Skip to content

Commit

Permalink
Add periodic observer cleanup to job kill service
Browse files Browse the repository at this point in the history
If an agent disconnects or reconnects to a new node, the old observers can be left in an orphaned state on the original node. This periodic task looks through the in-memory map and removes and closes any observers for jobs that are no longer attached to this server.
  • Loading branch information
tgianos committed Feb 15, 2021
1 parent 85c5d55 commit 2845660
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -187,4 +188,30 @@ public void killJob(
);
}
}

/**
 * Remove orphaned kill observers from the local map.
 * <p>
 * The logic as currently implemented is to have the Agent, once handshake is complete, open a connection to
 * the server which results in parking a response observer in the map stored in this implementation. Upon receiving
 * a kill request for the correct job this class will use the observer to send the "response" to the agent which
 * will begin the shut down process. The issue is that if the agent disconnects from this server the server will
 * never realize it's gone and these observers will build up in the map in memory forever. This method periodically
 * goes through the map, asks the routing service whether each job's agent connection is still local and, for any
 * that is not, removes the observer from the map and completes it.
 * <p>
 * A failure while completing one observer is logged and swallowed so that a single broken stream cannot abort the
 * sweep and leave the remaining orphans in place until the next run. The observer has already been removed from
 * the map by the time completion is attempted, so it is not retried.
 * <p>
 * Runs on a fixed delay of 30 seconds, with the first run 30 seconds after start up.
 *
 * @see "GRpcAgentJobKillServiceImpl"
 */
@Scheduled(fixedDelay = 30_000L, initialDelay = 30_000L)
public void cleanupOrphanedObservers() {
    // NOTE(review): entries are removed while iterating the key set, which presumably relies on the map being a
    // concurrent implementation -- confirm against the field declaration.
    for (final String jobId : this.parkedJobKillResponseObservers.keySet()) {
        if (!this.agentRoutingService.isAgentConnectionLocal(jobId)) {
            final StreamObserver<JobKillRegistrationResponse> observer
                = this.parkedJobKillResponseObservers.remove(jobId);
            // The entry may have been removed by another code path between the key lookup and the remove call.
            if (observer != null) {
                try {
                    observer.onCompleted();
                } catch (final RuntimeException e) {
                    // Best effort: the agent is gone from this node anyway, keep cleaning the other entries.
                    log.warn("Unable to complete orphaned kill observer for job {}. Ignoring.", jobId, e);
                }
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,65 @@ class GRpcJobKillServiceImplSpec extends Specification {
1 * this.requestForwardingService.kill(this.remoteHost, this.jobId, this.servletRequest)
noExceptionThrown()
}

// Exercises cleanupOrphanedObservers() in four phases: an empty map, a populated map, a sweep where every job is
// still reported as local, and a sweep where two of the three jobs are no longer local.
def "can clean up orphans"() {
// Three independent random job ids so each registration gets its own map entry
def jobId0 = UUID.randomUUID().toString()
def jobId1 = UUID.randomUUID().toString()
def jobId2 = UUID.randomUUID().toString()

// One registration request per job id
def request0 = JobKillRegistrationRequest.newBuilder().setJobId(jobId0).build()
def request1 = JobKillRegistrationRequest.newBuilder().setJobId(jobId1).build()
def request2 = JobKillRegistrationRequest.newBuilder().setJobId(jobId2).build()

// Mocked observers so calls to onCompleted() can be counted
def jobObserver0 = Mock(StreamObserver)
def jobObserver1 = Mock(StreamObserver)
def jobObserver2 = Mock(StreamObserver)

// Phase 1: sweeping with nothing registered leaves the map empty
when: "Nothing is registered"
this.service.cleanupOrphanedObservers()

then: "Nothing happens"
this.service.getParkedJobKillResponseObservers().isEmpty()

// Phase 2: registering parks each observer under its job id
when: "Jobs are registered"
this.service.registerForKillNotification(request0, jobObserver0)
this.service.registerForKillNotification(request1, jobObserver1)
this.service.registerForKillNotification(request2, jobObserver2)

then: "They're saved in map"
this.service.getParkedJobKillResponseObservers().size() == 3
this.service.getParkedJobKillResponseObservers().get(jobId0) == jobObserver0
this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
this.service.getParkedJobKillResponseObservers().get(jobId2) == jobObserver2

// Phase 3: the routing service reports every job as local, so nothing may be removed or completed
when: "All jobs still attached locally"
this.service.cleanupOrphanedObservers()

then: "Nothing happens"
// Each job id must be checked exactly once against the routing service
1 * this.agentRoutingService.isAgentConnectionLocal(jobId0) >> true
1 * this.agentRoutingService.isAgentConnectionLocal(jobId1) >> true
1 * this.agentRoutingService.isAgentConnectionLocal(jobId2) >> true
this.service.getParkedJobKillResponseObservers().size() == 3
this.service.getParkedJobKillResponseObservers().get(jobId0) == jobObserver0
this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
this.service.getParkedJobKillResponseObservers().get(jobId2) == jobObserver2
0 * jobObserver0.onCompleted()
0 * jobObserver1.onCompleted()
0 * jobObserver2.onCompleted()

// Phase 4: jobs 0 and 2 are no longer local; only job 1 stays parked
when: "The jobs switch servers or complete"
this.service.cleanupOrphanedObservers()

then: "It is removed from the map and the observer is completed"
1 * this.agentRoutingService.isAgentConnectionLocal(jobId0) >> false
1 * this.agentRoutingService.isAgentConnectionLocal(jobId1) >> true
1 * this.agentRoutingService.isAgentConnectionLocal(jobId2) >> false
this.service.getParkedJobKillResponseObservers().size() == 1
this.service.getParkedJobKillResponseObservers().get(jobId0) == null
this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
this.service.getParkedJobKillResponseObservers().get(jobId2) == null
// Removed observers are completed exactly once; the surviving one is left untouched
1 * jobObserver0.onCompleted()
0 * jobObserver1.onCompleted()
1 * jobObserver2.onCompleted()
}
}

0 comments on commit 2845660

Please sign in to comment.