diff --git a/genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.java b/genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.java
index b428798fb1c..f36468b9f05 100644
--- a/genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.java
+++ b/genie-web/src/main/java/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.java
@@ -37,6 +37,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
+import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@@ -187,4 +188,30 @@ public void killJob(
);
}
}
+
+ /**
+ * Remove orphaned kill observers from local map.
+ *
+ * The logic as currently implemented is to have the Agent, once handshake is complete, open a connection to
+ * the server which results in parking a response observer in the map stored in this implementation. Upon receiving
+ * a kill request for the correct job this class will use the observer to send the "response" to the agent which
+ * will begin shut down process. The issue is that if the agent disconnects from this server the server will never
+ * realize it's gone and these observers will build up in the map in memory forever. This method will periodically
+ * go through the map and determine if the observers are still valid and remove any that aren't.
+ *
+ * @see "GRpcAgentJobKillServiceImpl"
+ */
+ @Scheduled(fixedDelay = 30_000L, initialDelay = 30_000L)
+ public void cleanupOrphanedObservers() {
+ for (final String jobId : this.parkedJobKillResponseObservers.keySet()) {
+ if (!this.agentRoutingService.isAgentConnectionLocal(jobId)) {
+ final StreamObserver observer = this.parkedJobKillResponseObservers.remove(
+ jobId
+ );
+ if (observer != null) {
+ observer.onCompleted();
+ }
+ }
+ }
+ }
}
diff --git a/genie-web/src/test/groovy/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImplSpec.groovy b/genie-web/src/test/groovy/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImplSpec.groovy
index 7cd05d4dc68..d9f8011e06c 100644
--- a/genie-web/src/test/groovy/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImplSpec.groovy
+++ b/genie-web/src/test/groovy/com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImplSpec.groovy
@@ -185,4 +185,65 @@ class GRpcJobKillServiceImplSpec extends Specification {
1 * this.requestForwardingService.kill(this.remoteHost, this.jobId, this.servletRequest)
noExceptionThrown()
}
+
+ def "can clean up orphans"() {
+ def jobId0 = UUID.randomUUID().toString()
+ def jobId1 = UUID.randomUUID().toString()
+ def jobId2 = UUID.randomUUID().toString()
+
+ def request0 = JobKillRegistrationRequest.newBuilder().setJobId(jobId0).build()
+ def request1 = JobKillRegistrationRequest.newBuilder().setJobId(jobId1).build()
+ def request2 = JobKillRegistrationRequest.newBuilder().setJobId(jobId2).build()
+
+ def jobObserver0 = Mock(StreamObserver)
+ def jobObserver1 = Mock(StreamObserver)
+ def jobObserver2 = Mock(StreamObserver)
+
+ when: "Nothing is registered"
+ this.service.cleanupOrphanedObservers()
+
+ then: "Nothing happens"
+ this.service.getParkedJobKillResponseObservers().isEmpty()
+
+ when: "Jobs are registered"
+ this.service.registerForKillNotification(request0, jobObserver0)
+ this.service.registerForKillNotification(request1, jobObserver1)
+ this.service.registerForKillNotification(request2, jobObserver2)
+
+ then: "They're saved in map"
+ this.service.getParkedJobKillResponseObservers().size() == 3
+ this.service.getParkedJobKillResponseObservers().get(jobId0) == jobObserver0
+ this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
+ this.service.getParkedJobKillResponseObservers().get(jobId2) == jobObserver2
+
+ when: "All jobs still attached locally"
+ this.service.cleanupOrphanedObservers()
+
+ then: "Nothing happens"
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId0) >> true
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId1) >> true
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId2) >> true
+ this.service.getParkedJobKillResponseObservers().size() == 3
+ this.service.getParkedJobKillResponseObservers().get(jobId0) == jobObserver0
+ this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
+ this.service.getParkedJobKillResponseObservers().get(jobId2) == jobObserver2
+ 0 * jobObserver0.onCompleted()
+ 0 * jobObserver1.onCompleted()
+ 0 * jobObserver2.onCompleted()
+
+ when: "The jobs switch servers or complete"
+ this.service.cleanupOrphanedObservers()
+
+ then: "It is removed from the map and the observer is completed"
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId0) >> false
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId1) >> true
+ 1 * this.agentRoutingService.isAgentConnectionLocal(jobId2) >> false
+ this.service.getParkedJobKillResponseObservers().size() == 1
+ this.service.getParkedJobKillResponseObservers().get(jobId0) == null
+ this.service.getParkedJobKillResponseObservers().get(jobId1) == jobObserver1
+ this.service.getParkedJobKillResponseObservers().get(jobId2) == null
+ 1 * jobObserver0.onCompleted()
+ 0 * jobObserver1.onCompleted()
+ 1 * jobObserver2.onCompleted()
+ }
}