From 3c4d8452021ea3e84d5f78eeb1b24dcc04adf865 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 9 Jan 2018 20:37:08 +0100 Subject: [PATCH] [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to reconnect to the last known JobMaster location. This closes #5267. --- .../registration/RegisteredRpcConnection.java | 101 ++++++++++++++---- .../taskexecutor/JobLeaderService.java | 45 +++++++- .../runtime/taskexecutor/TaskExecutor.java | 27 ++++- .../taskexecutor/slot/TaskSlotTable.java | 13 ++- .../RegisteredRpcConnectionTest.java | 79 +++++++++++--- 5 files changed, 219 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index c76bcf8f5fa01..7d2c35a7a4b90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -27,6 +27,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -46,6 +47,11 @@ */ public abstract class RegisteredRpcConnection { + private static final AtomicReferenceFieldUpdater REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + RegisteredRpcConnection.class, + RetryingRegistration.class, + "pendingRegistration"); + /** The logger for all log messages of this class. */ protected final Logger log; @@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection pendingRegistration; + private volatile RetryingRegistration pendingRegistration; /** The gateway to register, it's null until the registration is completed. */ private volatile G targetGateway; @@ -85,27 +91,47 @@ public void start() { checkState(!closed, "The RPC connection is already closed"); checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started"); - pendingRegistration = checkNotNull(generateRegistration()); - pendingRegistration.startRegistration(); + final RetryingRegistration newRegistration = createNewRegistration(); - CompletableFuture> future = pendingRegistration.getFuture(); + if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) { + newRegistration.startRegistration(); + } else { + // concurrent start operation + newRegistration.cancel(); + } + } - future.whenCompleteAsync( - (Tuple2 result, Throwable failure) -> { - if (failure != null) { - if (failure instanceof CancellationException) { - // we ignore cancellation exceptions because they originate from cancelling - // the RetryingRegistration - log.debug("Retrying registration towards {} was cancelled.", targetAddress); - } else { - // this future should only ever fail if there is a bug, not if the registration is declined - onRegistrationFailure(failure); - } - } else { - targetGateway = result.f0; - onRegistrationSuccess(result.f1); - } - }, executor); + public boolean tryReconnect() { + checkState(isConnected(), "Cannot reconnect to an unknown destination."); + + if (closed) { + return false; + } else { + final RetryingRegistration currentPendingRegistration = pendingRegistration; + + if (currentPendingRegistration != null) { + currentPendingRegistration.cancel(); + } + + final RetryingRegistration newRegistration = createNewRegistration(); + + if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) { + newRegistration.startRegistration(); + } else { + // concurrent modification + newRegistration.cancel(); + return false; + } + + // double check for concurrent close operations + if (closed) { + newRegistration.cancel(); + + return false; + } else { + return true; + } + } } /** @@ -175,13 +201,42 @@ public String toString() { } if (isClosed()) { - connectionInfo = connectionInfo + " is closed"; + connectionInfo += " is closed"; } else if (isConnected()){ - connectionInfo = connectionInfo + " is established"; + connectionInfo += " is established"; } else { - connectionInfo = connectionInfo + " is connecting"; + connectionInfo += " is connecting"; } return connectionInfo; } + + // ------------------------------------------------------------------------ + // Internal methods + // ------------------------------------------------------------------------ + + private RetryingRegistration createNewRegistration() { + RetryingRegistration newRegistration = checkNotNull(generateRegistration()); + + CompletableFuture> future = newRegistration.getFuture(); + + future.whenCompleteAsync( + (Tuple2 result, Throwable failure) -> { + if (failure != null) { + if (failure instanceof CancellationException) { + // we ignore cancellation exceptions because they originate from cancelling + // the RetryingRegistration + log.debug("Retrying registration towards {} was cancelled.", targetAddress); + } else { + // this future should only ever fail if there is a bug, not if the registration is declined + onRegistrationFailure(failure); + } + } else { + targetGateway = result.f0; + onRegistrationSuccess(result.f1); + } + }, executor); + + return newRegistration; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 77737e19ba208..3b4da4ef5ce7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -204,6 +204,23 @@ public void addJob(final JobID jobId, final String defaultTargetAddress) throws jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener)); } + /** + * Triggers reconnection to the last known leader of the given job. + * + * @param jobId specifying the job for which to trigger reconnection + */ + public void reconnect(final JobID jobId) { + Preconditions.checkNotNull(jobId, "JobID must not be null."); + + final Tuple2 jobLeaderService = jobLeaderServices.get(jobId); + + if (jobLeaderService != null) { + jobLeaderService.f1.reconnect(); + } else { + LOG.info("Cannot reconnect to job {} because it is not registered.", jobId); + } + } + /** * Leader listener which tries to establish a connection to a newly detected job leader. */ @@ -213,7 +230,7 @@ private final class JobManagerLeaderListener implements LeaderRetrievalListener private final JobID jobId; /** Rpc connection to the job leader. */ - private RegisteredRpcConnection rpcConnection; + private volatile RegisteredRpcConnection rpcConnection; /** State of the listener. */ private volatile boolean stopped; @@ -237,6 +254,32 @@ public void stop() { } } + public void reconnect() { + if (stopped) { + LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped."); + } else { + final RegisteredRpcConnection currentRpcConnection = rpcConnection; + + if (currentRpcConnection != null) { + if (currentRpcConnection.isConnected()) { + + if (currentRpcConnection.tryReconnect()) { + // double check for concurrent stop operation + if (stopped) { + currentRpcConnection.close(); + } + } else { + LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress()); + } + } else { + LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress()); + } + } else { + LOG.debug("Cannot reconnect to an unknown JobMaster."); + } + } + } + @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) { if (stopped) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 55774720b1b50..3c7d1cb2e155a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -687,6 +687,7 @@ public CompletableFuture freeSlot(AllocationID allocationId, Throwa @Override public void disconnectJobManager(JobID jobId, Exception cause) { closeJobManagerConnection(jobId, cause); + jobLeaderService.reconnect(jobId); } @Override @@ -1079,16 +1080,34 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) { Preconditions.checkNotNull(allocationId); try { - int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause); + TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause); - if (freedSlotIndex != -1 && isConnectedToResourceManager()) { + if (taskSlot != null && isConnectedToResourceManager()) { // the slot was freed. Tell the RM about it ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); resourceManagerGateway.notifySlotAvailable( resourceManagerConnection.getRegistrationId(), - new SlotID(getResourceID(), freedSlotIndex), + new SlotID(getResourceID(), taskSlot.getIndex()), allocationId); + + // check whether we still have allocated slots for the same job + final JobID jobId = taskSlot.getJobId(); + final Iterator tasks = taskSlotTable.getTasks(jobId); + + if (!tasks.hasNext()) { + // we can remove the job from the job leader service + try { + jobLeaderService.removeJob(jobId); + } catch (Exception e) { + log.info("Could not remove job {} from JobLeaderService.", jobId, e); + } + + closeJobManagerConnection( + jobId, + new FlinkException("TaskExecutor " + getAddress() + + " has no more allocated slots for job " + jobId + '.')); + } } } catch (SlotNotFoundException e) { log.debug("Could not free slot for allocation id {}.", allocationId, e); @@ -1295,6 +1314,8 @@ public void notifyHeartbeatTimeout(final ResourceID resourceID) { closeJobManagerConnection( jobManagerConnection.getJobID(), new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.")); + + jobLeaderService.reconnect(jobManagerConnection.getJobID()); } } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 62101e7318a9f..ab62a86f89c96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -266,7 +268,7 @@ public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) thr * @throws SlotNotFoundException if there is not task slot for the given allocation id * @return Index of the freed slot if the slot could be freed; otherwise -1 */ - public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { + public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException { return freeSlot(allocationId, new Exception("The task slot of this task is being freed.")); } @@ -278,9 +280,10 @@ public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { * @param allocationId identifying the task slot to be freed * @param cause to fail the tasks with if slot is not empty * @throws SlotNotFoundException if there is not task slot for the given allocation id - * @return Index of the freed slot if the slot could be freed; otherwise -1 + * @return The freed TaskSlot. If the TaskSlot cannot be freed then null. */ - public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + @Nullable + public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); TaskSlot taskSlot = getTaskSlot(allocationId); @@ -314,7 +317,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo slotsPerJob.remove(jobId); } - return taskSlot.getIndex(); + return taskSlot; } else { // we couldn't free the task slot because it still contains task, fail the tasks // and set the slot state to releasing so that it gets eventually freed @@ -326,7 +329,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo taskIterator.next().failExternally(cause); } - return -1; + return null; } } else { throw new SlotNotFoundException(allocationId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index 19a57563c3815..650a0f2112b9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -27,12 +27,15 @@ import org.slf4j.LoggerFactory; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; @@ -60,14 +63,14 @@ public void testSuccessfulRpcConnection() throws Exception { connection.start(); //wait for connection established - Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + final String actualConnectionId = connection.getConnectionFuture().get(); // validate correct invocation and result assertTrue(connection.isConnected()); assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); assertEquals(leaderId, connection.getTargetLeaderId()); assertEquals(testGateway, connection.getTargetGateway()); - assertEquals(connectionID, connection.getConnectionId()); + assertEquals(connectionID, actualConnectionId); } finally { testGateway.stop(); @@ -86,8 +89,9 @@ public void testRpcConnectionFailures() throws Exception { try { // gateway that upon calls Throw an exception TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( - new RuntimeException(connectionFailureMessage)); + registrationException); rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -95,14 +99,18 @@ public void testRpcConnectionFailures() throws Exception { connection.start(); //wait for connection failure - Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + try { + connection.getConnectionFuture().get(); + fail("expected failure."); + } catch (ExecutionException ee) { + assertEquals(registrationException, ee.getCause()); + } // validate correct invocation and result assertFalse(connection.isConnected()); assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); assertEquals(leaderId, connection.getTargetLeaderId()); assertNull(connection.getTargetGateway()); - assertEquals(connectionFailureMessage, connection.getFailareMessage()); } finally { rpcService.stopService(); @@ -137,21 +145,53 @@ public void testRpcConnectionClose() throws Exception { } } + @Test + public void testReconnect() throws Exception { + final String connectionId1 = "Test RPC Connection ID 1"; + final String connectionId2 = "Test RPC Connection ID 2"; + final TestingRpcService rpcService = new TestingRpcService(); + final String testRpcConnectionEndpointAddress = ""; + final UUID leaderId = UUID.randomUUID(); + final TestRegistrationGateway testGateway = new TestRegistrationGateway( + new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1), + new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2)); + + try { + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); + + final String actualConnectionId1 = connection.getConnectionFuture().get(); + + assertEquals(actualConnectionId1, connectionId1); + + assertTrue(connection.tryReconnect()); + + final String actualConnectionId2 = connection.getConnectionFuture().get(); + + assertEquals(actualConnectionId2, connectionId2); + } finally { + rpcService.stopService(); + } + } + // ------------------------------------------------------------------------ // test RegisteredRpcConnection // ------------------------------------------------------------------------ private static class TestRpcConnection extends RegisteredRpcConnection { - private final RpcService rpcService; + private final Object lock = new Object(); - private String connectionId; + private final RpcService rpcService; - private String failureMessage; + private CompletableFuture connectionFuture; public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor, RpcService rpcService) { super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor); this.rpcService = rpcService; + this.connectionFuture = new CompletableFuture<>(); } @Override @@ -161,20 +201,31 @@ protected RetryingRegistration(); + } + return super.tryReconnect(); } - public String getFailareMessage() { - return failureMessage; + public CompletableFuture getConnectionFuture() { + synchronized (lock) { + return connectionFuture; + } } } }