From ce97bfedf7374c8e2a9ea93e772fdac5548f7454 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Fri, 20 Apr 2018 13:02:28 +0800 Subject: [PATCH 1/2] fix YarnResourceManager sometimes does not request new Containers --- .../flink/yarn/YarnResourceManager.java | 49 +++++++- .../flink/yarn/YarnResourceManagerTest.java | 106 ++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 4eb4fc90dfe7f..f1c35ed86038d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -29,7 +30,9 @@ import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; @@ -37,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -114,6 +118,9 @@ public class YarnResourceManager extends ResourceManager impleme private final Map resourcePriorities = new HashMap<>(); + /** The containers that we expected to register with ResourceManager. */ + private final Map pendingContainersExpectedToRegister = new ConcurrentHashMap<>(); + public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, @@ -265,6 +272,29 @@ public CompletableFuture postStop() { } } + @Override + public CompletableFuture registerTaskExecutor( + String taskExecutorAddress, + ResourceID taskExecutorResourceId, + SlotReport slotReport, + int dataPort, + HardwareDescription hardwareDescription, + Time timeout) { + + CompletableFuture responseFuture = super.registerTaskExecutor( + taskExecutorAddress, + taskExecutorResourceId, + slotReport, + dataPort, + hardwareDescription, + timeout); + + // ack the pending register + pendingContainersExpectedToRegister.remove(taskExecutorResourceId); + + return responseFuture; + } + @Override protected void internalDeregisterApplication( ApplicationStatus finalStatus, @@ -334,7 +364,16 @@ public void onContainersCompleted(List list) { closeTaskManagerConnection(new ResourceID( container.getContainerId().toString()), new Exception(container.getDiagnostics())); } - workerNodeMap.remove(new ResourceID(container.getContainerId().toString())); + + ResourceID completedResourceID = new ResourceID(container.getContainerId().toString()); + workerNodeMap.remove(completedResourceID); + + // this means the TM exists without registering with ResourceManager successfully, + // we just fire a new request + Container pendingContainer = pendingContainersExpectedToRegister.remove(completedResourceID); + if (pendingContainer != null) { + requestYarnContainer(pendingContainer.getResource(), pendingContainer.getPriority()); + } } } ); @@ -356,6 +395,8 @@ public void onContainersAllocated(List containers) { workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); + ResourceID resourceID = new ResourceID(containerIdStr); + try { // Context information used to start a TaskExecutor Java process ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( @@ -363,10 +404,16 @@ public void onContainersAllocated(List containers) { containerIdStr, container.getNodeId().getHost()); + // remember the pending container that need to be registered with ResourceManager. + pendingContainersExpectedToRegister.put(resourceID, container); + nodeManagerClient.startContainer(container, taskExecutorLaunchContext); } catch (Throwable t) { log.error("Could not start TaskManager in container {}.", container.getId(), t); + // remove the failed container + pendingContainersExpectedToRegister.remove(resourceID); + // release the failed container resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 0d37b8ed6bf98..294a1e6be85b3 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -95,6 +96,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -388,4 +390,108 @@ public void testStopWorker() throws Exception { assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0); }}; } + + /** + * Tests the case that containers are killed before registering with ResourceManager successfully. + */ + @Test + public void testKillContainerBeforeTMRegisterSuccessfully() throws Exception { + new Context() {{ + startResourceManager(); + + // Request slot from SlotManager. + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // step1 ------------> test container completed before registering with master <------------ + + ApplicationId testingApplicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ContainerId containerCompletedBeforeRegisteringWithRMId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + testingApplicationId, + 1), + 1); + + // Callback from YARN when container is allocated. + Container containerCompletedBeforeRegisteringWithRM = mock(Container.class); + when(containerCompletedBeforeRegisteringWithRM.getId()).thenReturn(containerCompletedBeforeRegisteringWithRMId); + + when(containerCompletedBeforeRegisteringWithRM.getNodeId()).thenReturn(NodeId.newInstance("containerCompletedBeforeRegisteringWithRM", 1234)); + when(containerCompletedBeforeRegisteringWithRM.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(containerCompletedBeforeRegisteringWithRM.getPriority()).thenReturn(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(containerCompletedBeforeRegisteringWithRM)); + verify(mockResourceManagerClient, times(1)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient, times(1)).startContainer(eq(containerCompletedBeforeRegisteringWithRM), any(ContainerLaunchContext.class)); + + ContainerStatus containerCompletedBeforeRegisteringWithRMStatus = mock(ContainerStatus.class); + when(containerCompletedBeforeRegisteringWithRMStatus.getExitStatus()).thenReturn(-1); + when(containerCompletedBeforeRegisteringWithRMStatus.getDiagnostics()).thenReturn("the mock diagnostics."); + when(containerCompletedBeforeRegisteringWithRMStatus.getContainerId()).thenReturn(containerCompletedBeforeRegisteringWithRMId); + + resourceManager.onContainersCompleted(ImmutableList.of(containerCompletedBeforeRegisteringWithRMStatus)); + + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + // step2 -----------> test container completed after registering with master <----------- + + ContainerId containerRegisterSuccessfullyId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + testingApplicationId, + 1), + 2); + + Container containerRegisterSuccessfully = mock(Container.class); + when(containerRegisterSuccessfully.getId()).thenReturn(containerRegisterSuccessfullyId); + + when(containerRegisterSuccessfully.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(containerRegisterSuccessfully.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(containerRegisterSuccessfully.getPriority()).thenReturn(Priority.UNDEFINED); + + resourceManager.onContainersAllocated(ImmutableList.of(containerRegisterSuccessfully)); + verify(mockNMClient, times(1)).startContainer(eq(containerRegisterSuccessfully), any(ContainerLaunchContext.class)); + + // Remote task executor registers with YarnResourceManager. + TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); + rpcService.registerGateway(taskHost, mockTaskExecutorGateway); + + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + + final ResourceID taskManagerResourceId = new ResourceID(containerRegisterSuccessfully.getId().toString()); + final SlotReport slotReport = new SlotReport( + new SlotStatus( + new SlotID(taskManagerResourceId, 1), + new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); + + CompletableFuture numberRegisteredSlotsFuture = rmGateway + .registerTaskExecutor( + taskHost, + taskManagerResourceId, + slotReport, + dataPort, + hardwareDescription, + Time.seconds(10L)) + .handleAsync( + (RegistrationResponse response, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), + resourceManager.getMainThreadExecutorForTesting()); + + final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); + + assertEquals(1, numberRegisteredSlots); + + ContainerStatus containerRegisterSuccessfullyStatus = mock(ContainerStatus.class); + when(containerRegisterSuccessfullyStatus.getExitStatus()).thenReturn(-1); + when(containerRegisterSuccessfullyStatus.getDiagnostics()).thenReturn("the mock diagnostics."); + when(containerRegisterSuccessfullyStatus.getContainerId()).thenReturn(containerRegisterSuccessfullyId); + + resourceManager.onContainersCompleted(ImmutableList.of(containerRegisterSuccessfullyStatus)); + + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + }}; + } } From 3ea3a646317c48996ab8ebb3d32c47f91d0de52a Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Mon, 23 Apr 2018 11:31:00 +0800 Subject: [PATCH 2/2] Gary's comments. --- .../org/apache/flink/yarn/YarnResourceManager.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index f1c35ed86038d..db0c170b26a28 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -289,8 +289,13 @@ public CompletableFuture registerTaskExecutor( hardwareDescription, timeout); - // ack the pending register - pendingContainersExpectedToRegister.remove(taskExecutorResourceId); + // ack the pending registration only when the registration is truly completed. + responseFuture.whenComplete((RegistrationResponse response, Throwable throwable) -> { + // ack the pending registration + if (response instanceof RegistrationResponse.Success) { + pendingContainersExpectedToRegister.remove(taskExecutorResourceId); + } + }); return responseFuture; } @@ -414,6 +419,9 @@ public void onContainersAllocated(List containers) { // remove the failed container pendingContainersExpectedToRegister.remove(resourceID); + // remove the resourceID from workerNodeMap eagerly + workerNodeMap.remove(resourceID); + // release the failed container resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one