diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6b9999c673e33..153ee53ad4800 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -94,7 +94,7 @@ protected Dispatcher( MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional restAddress) throws Exception { - super(rpcService, endpointId, DispatcherId.generate()); + super(rpcService, endpointId); this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); @@ -399,7 +399,8 @@ public void revokeLeadership() { log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); } - setFencingToken(DispatcherId.generate()); + // clear the fencing token indicating that we don't have the leadership right now + setFencingToken(null); }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2bfe27789bc31..343fbf6fd5a08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -212,7 +212,7 @@ public JobMaster( FatalErrorHandler errorHandler, ClassLoader userCodeLoader) throws Exception { - super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID); + super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME)); selfGateway = getSelfGateway(JobMasterGateway.class); @@ -735,7 +735,7 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti return Acknowledge.get(); } - if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) { + if (getFencingToken() != null) { log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId); // first we have to suspend the current execution @@ -791,13 +791,13 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti private Acknowledge suspendExecution(final Throwable cause) { validateRunsInMainThread(); - if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, getFencingToken())) { + if (getFencingToken() == null) { log.debug("Job has already been suspended or shutdown."); return Acknowledge.get(); } - // not leader anymore --> set the JobMasterId to the initial id - setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID); + // not leader anymore --> set the JobMasterId to null + setFencingToken(null); try { resourceManagerLeaderRetriever.stop(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java index ffd53b31b6864..39f7dedd074e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java @@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID { private static final long serialVersionUID = -933276753644003754L; - public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L); - public JobMasterId(byte[] bytes) { super(bytes); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 87cf7d10fbcaa..f69998c2971ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -139,7 +139,7 @@ public ResourceManager( JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { - super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate()); + super(rpcService, resourceManagerEndpointId); this.resourceId = checkNotNull(resourceId); this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); @@ -772,13 +772,11 @@ public void grantLeadership(final UUID newLeaderSessionID) { public void revokeLeadership() { runAsyncWithoutFencing( () -> { - final ResourceManagerId newResourceManagerId = ResourceManagerId.generate(); - - log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId); + log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress()); clearState(); - setFencingToken(newResourceManagerId); + setFencingToken(null); slotManager.suspend(); }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index 81bae2924e674..ff74f478d6ad7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -19,7 +19,8 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; -import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; import java.io.Serializable; import java.util.UUID; @@ -39,25 +40,26 @@ public class FencedRpcEndpoint extends RpcEndpoint { private volatile F fencingToken; private volatile MainThreadExecutor fencedMainThreadExecutor; - protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken) { + protected FencedRpcEndpoint(RpcService rpcService, String endpointId) { super(rpcService, endpointId); - this.fencingToken = Preconditions.checkNotNull(initialFencingToken); + // no fencing token == no leadership + this.fencingToken = null; this.fencedMainThreadExecutor = new MainThreadExecutor( getRpcService().fenceRpcServer( rpcServer, - initialFencingToken)); + null)); } - protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) { - this(rpcService, UUID.randomUUID().toString(), initialFencingToken); + protected FencedRpcEndpoint(RpcService rpcService) { + this(rpcService, UUID.randomUUID().toString()); } public F getFencingToken() { return fencingToken; } - protected void setFencingToken(F newFencingToken) { + protected void setFencingToken(@Nullable F newFencingToken) { // this method should only be called from within the main thread validateRunsInMainThread(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java index 1ace3b79fdb23..369af6aa4d25d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java @@ -20,7 +20,7 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.messages.FencedMessage; import org.apache.flink.runtime.rpc.messages.UnfencedMessage; @@ -45,23 +45,36 @@ public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture terminationFutu @Override protected void handleMessage(Object message) { if (message instanceof FencedMessage) { - @SuppressWarnings("unchecked") - FencedMessage fencedMessage = ((FencedMessage) message); - F fencingToken = fencedMessage.getFencingToken(); + final F expectedFencingToken = rpcEndpoint.getFencingToken(); - if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) { - super.handleMessage(fencedMessage.getPayload()); - } else { + if (expectedFencingToken == null) { if (log.isDebugEnabled()) { - log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " + - "not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken()); + log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message); } sendErrorIfSender( - new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message + - " because the fencing token " + fencingToken + " did not match the expected fencing token " + - rpcEndpoint.getFencingToken() + '.')); + new FencingTokenException( + "Fencing token not set: Ignoring message " + message + " because the fencing token is null.")); + } else { + @SuppressWarnings("unchecked") + FencedMessage fencedMessage = ((FencedMessage) message); + + F fencingToken = fencedMessage.getFencingToken(); + + if (Objects.equals(expectedFencingToken, fencingToken)) { + super.handleMessage(fencedMessage.getPayload()); + } else { + if (log.isDebugEnabled()) { + log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " + + "not match the expected fencing token {}.", message, fencingToken, expectedFencingToken); + } + + sendErrorIfSender( + new FencingTokenException("Fencing token mismatch: Ignoring message " + message + + " because the fencing token " + fencingToken + " did not match the expected fencing token " + + expectedFencingToken + '.')); + } } } else if (message instanceof UnfencedMessage) { super.handleMessage(((UnfencedMessage) message).getPayload()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java similarity index 82% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java index 9a59101a0e813..71520c8ca0523 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java @@ -25,18 +25,18 @@ * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do * not match. */ -public class FencingTokenMismatchException extends RpcException { +public class FencingTokenException extends RpcException { private static final long serialVersionUID = -500634972988881467L; - public FencingTokenMismatchException(String message) { + public FencingTokenException(String message) { super(message); } - public FencingTokenMismatchException(String message, Throwable cause) { + public FencingTokenException(String message, Throwable cause) { super(message, cause); } - public FencingTokenMismatchException(Throwable cause) { + public FencingTokenException(Throwable cause) { super(cause); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java index 248106558d3b8..0ee4940670a6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java @@ -20,6 +20,8 @@ import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; /** @@ -34,8 +36,8 @@ public class LocalFencedMessage implements FencedMess private final F fencingToken; private final P payload; - public LocalFencedMessage(F fencingToken, P payload) { - this.fencingToken = Preconditions.checkNotNull(fencingToken); + public LocalFencedMessage(@Nullable F fencingToken, P payload) { + this.fencingToken = fencingToken; this.payload = Preconditions.checkNotNull(payload); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java index 5cf9b98d6202a..ad8c34978c127 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java @@ -20,6 +20,8 @@ import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; /** @@ -35,8 +37,8 @@ public class RemoteFencedMessage private final F fencingToken; private final P payload; - public RemoteFencedMessage(F fencingToken, P payload) { - this.fencingToken = Preconditions.checkNotNull(fencingToken); + public RemoteFencedMessage(@Nullable F fencingToken, P payload) { + this.fencingToken = fencingToken; this.payload = Preconditions.checkNotNull(payload); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 2b8792b2eba6f..d0dd973687c86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -108,7 +108,7 @@ public void revokeLeadership() { try { resourceManager.start(); - Assert.assertNotNull(resourceManager.getFencingToken()); + Assert.assertNull(resourceManager.getFencingToken()); final UUID leaderId = UUID.randomUUID(); leaderElectionService.isLeader(leaderId); // after grant leadership, resourceManager's leaderId has value diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 156bc73ca3972..73c5b5c72e4ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; @@ -47,6 +47,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -81,10 +82,14 @@ public void testRegisterJobMaster() throws Exception { JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final ResourceManager resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test response successful CompletableFuture successfulFuture = rmGateway.registerJobManager( jobMasterId, @@ -127,7 +132,7 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exceptio unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS); fail("Should fail because we are using the wrong fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -151,6 +156,9 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exceptio final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test throw exception when receive a registration from job master which takes unmatched leaderSessionId JobMasterId differentJobMasterId = JobMasterId.generate(); CompletableFuture unMatchedLeaderFuture = rmGateway.registerJobManager( @@ -182,6 +190,9 @@ public void testRegisterJobMasterFromInvalidAddress() throws Exception { final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test throw exception when receive a registration from job master which takes invalid address String invalidAddress = "/jobMasterAddress2"; CompletableFuture invalidAddressFuture = rmGateway.registerJobManager( @@ -219,6 +230,9 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { JobID unknownJobIDToHAServices = new JobID(); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // this should fail because we try to register a job leader listener for an unknown job id CompletableFuture registrationFuture = rmGateway.registerJobManager( new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 8add1685dbea7..0206adebc744a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; @@ -134,7 +134,7 @@ public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Except unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index f8eca1692c97f..9fe9904159eff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -231,7 +231,7 @@ public void testCallAsyncWithFencing() throws Exception { fail("The async call operation should fail due to the changed fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } @@ -346,10 +346,19 @@ protected FencedTestEndpoint( UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) { - super(rpcService, initialFencingToken); + super(rpcService); this.enteringSetNewFencingToken = enteringSetNewFencingToken; this.triggerSetNewFencingToken = triggerSetNewFencingToken; + + // make it look as if we are running in the main thread + currentMainThread.set(Thread.currentThread()); + + try { + setFencingToken(initialFencingToken); + } finally { + currentMainThread.set(null); + } } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index 62d5354fd51be..6162a2d608e7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -22,7 +22,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.exceptions.RpcException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -67,17 +68,15 @@ public static void teardown() throws ExecutionException, InterruptedException, T */ @Test public void testFencingTokenSetting() throws Exception { - final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); - FencedTestingGateway fencedTestingGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); + FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value); FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); try { fencedTestingEndpoint.start(); - assertEquals(initialFencingToken, fencedGateway.getFencingToken()); - assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken()); + assertNull(fencedGateway.getFencingToken()); + assertNull(fencedTestingEndpoint.getFencingToken()); final UUID newFencingToken = UUID.randomUUID(); @@ -88,9 +87,9 @@ public void testFencingTokenSetting() throws Exception { // expected to fail } - assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken()); + assertNull(fencedTestingEndpoint.getFencingToken()); - CompletableFuture setFencingFuture = fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture setFencingFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the completion of the set fencing token operation setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -109,15 +108,15 @@ public void testFencingTokenSetting() throws Exception { */ @Test public void testFencing() throws Exception { - final UUID initialFencingToken = UUID.randomUUID(); + final UUID fencingToken = UUID.randomUUID(); final UUID wrongFencingToken = UUID.randomUUID(); final String value = "barfoo"; - FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, fencingToken); try { fencedTestingEndpoint.start(); - final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, FencedTestingGateway.class) + final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), fencingToken, FencedTestingGateway.class) .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); final FencedTestingGateway wronglyFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, FencedTestingGateway.class) .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -128,12 +127,12 @@ public void testFencing() throws Exception { wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); fail("This should fail since we have the wrong fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } final UUID newFencingToken = UUID.randomUUID(); - CompletableFuture newFencingTokenFuture = properFencedGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the new fencing token to be set newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -144,7 +143,7 @@ public void testFencing() throws Exception { fail("This should fail since we have the wrong fencing token by now."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { @@ -163,7 +162,7 @@ public void testRemoteAndSelfGateways() throws Exception { final UUID newFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -178,7 +177,7 @@ public void testRemoteAndSelfGateways() throws Exception { assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); assertEquals(value, remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); - CompletableFuture newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the new fencing token to be set newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -192,7 +191,7 @@ public void testRemoteAndSelfGateways() throws Exception { remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); fail("This should have failed because we don't have the right fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { fencedTestingEndpoint.shutDown(); @@ -208,7 +207,7 @@ public void testMainThreadExecutorUnderChangingFencingToken() throws Exception { final Time shortTimeout = Time.milliseconds(100L); final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -221,7 +220,7 @@ public void testMainThreadExecutorUnderChangingFencingToken() throws Exception { // therefore, we know that the change fencing token call is executed after the trigger MainThreadExecutor // computation final UUID newFencingToken = UUID.randomUUID(); - CompletableFuture newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -253,7 +252,7 @@ public void testUnfencedRemoteGateway() throws Exception { final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -283,8 +282,6 @@ public void testUnfencedRemoteGateway() throws Exception { public interface FencedTestingGateway extends FencedRpcGateway { CompletableFuture foobar(@RpcTimeout Time timeout); - CompletableFuture rpcSetFencingToken(UUID fencingToken, @RpcTimeout Time timeout); - CompletableFuture triggerMainThreadExecutorComputation(@RpcTimeout Time timeout); CompletableFuture triggerComputationLatch(@RpcTimeout Time timeout); @@ -296,12 +293,25 @@ private static class FencedTestingEndpoint extends FencedRpcEndpoint imple private final String value; - protected FencedTestingEndpoint(RpcService rpcService, UUID initialFencingToken, String value) { - super(rpcService, initialFencingToken); + protected FencedTestingEndpoint(RpcService rpcService, String value) { + this(rpcService, value, null); + } + + protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) { + super(rpcService); computationLatch = new OneShotLatch(); this.value = value; + + // make sure that it looks as if we are running in the main thread + currentMainThread.set(Thread.currentThread()); + + try { + setFencingToken(initialFencingToken); + } finally { + currentMainThread.set(null); + } } @Override @@ -309,13 +319,6 @@ public CompletableFuture foobar(Time timeout) { return CompletableFuture.completedFuture(value); } - @Override - public CompletableFuture rpcSetFencingToken(UUID fencingToken, Time timeout) { - setFencingToken(fencingToken); - - return CompletableFuture.completedFuture(Acknowledge.get()); - } - @Override public CompletableFuture triggerMainThreadExecutorComputation(Time timeout) { return CompletableFuture.supplyAsync( @@ -340,5 +343,15 @@ public CompletableFuture triggerComputationLatch(Time timeout) { return CompletableFuture.completedFuture(Acknowledge.get()); } + + public CompletableFuture setFencingTokenInMainThread(UUID fencingToken, Time timeout) { + return callAsyncWithoutFencing( + () -> { + setFencingToken(fencingToken); + + return Acknowledge.get(); + }, + timeout); + } } }