diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java index 8882b455446f7..5123240389393 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DockerContainerDeletionTask.java @@ -56,6 +56,7 @@ public void run() { LinuxContainerExecutor exec = ((LinuxContainerExecutor) getDeletionService().getContainerExecutor()); exec.removeDockerContainer(containerId); + deletionTaskFinished(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 87f4a1c2fd175..197a630c554c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -33,9 +33,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DockerContainerDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; @@ -370,4 +374,61 @@ public void testRecovery() throws Exception { stateStore.close(); } } + + @Test + public void testDockerContainerDeletionTaskLeak() throws Exception { + + String user = "foo"; + String containerId = "container_e123_123456_000001"; + + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 1); + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + + LinuxContainerExecutor lce = Mockito.mock(LinuxContainerExecutor.class); + DeletionService del = new DeletionService(lce, stateStore); + + try { + del.init(conf); + del.start(); + DockerContainerDeletionTask deletionTask = new DockerContainerDeletionTask(del, + user, containerId); + del.delete(deletionTask); + + List deleteTaskProtos = + loadDeletionTaskProtos(stateStore); + + assertEquals(1, deleteTaskProtos.size()); + DeletionServiceDeleteTaskProto deleteTaskProto = deleteTaskProtos.get(0); + assertEquals(user, deleteTaskProto.getUser()); + assertEquals(containerId, deleteTaskProto.getDockerContainerId()); + + int msecToWait = 5 * 1000; + Thread.sleep(msecToWait); + + deleteTaskProtos = loadDeletionTaskProtos(stateStore); + System.out.println("TEST deleteTaskProtos Size: " + deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.isEmpty()); + + } finally { + del.close(); + stateStore.close(); + } + } + + private List loadDeletionTaskProtos( + NMStateStoreService stateStore) throws IOException { + + RecoveryIterator it = + stateStore.loadDeletionServiceState().getIterator(); + List deleteTaskProtos = + new ArrayList(); + while (it.hasNext()) { + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } }