Skip to content

Commit

Permalink
[FLINK-7655] [flip6] Set fencing token to null if not leader
Browse files Browse the repository at this point in the history
This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction allows different exceptions
to be thrown depending on whether there is a token mismatch or whether the receiver
has no fencing token set (i.e., it is not the leader).

This closes #4689.
  • Loading branch information
tillrohrmann committed Sep 21, 2017
1 parent a86b646 commit 42cc3a2
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 81 deletions.
Expand Up @@ -94,7 +94,7 @@ protected Dispatcher(
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
Optional<String> restAddress) throws Exception {
super(rpcService, endpointId, DispatcherId.generate());
super(rpcService, endpointId);

this.configuration = Preconditions.checkNotNull(configuration);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
Expand Down Expand Up @@ -399,7 +399,8 @@ public void revokeLeadership() {
log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e);
}

setFencingToken(DispatcherId.generate());
// clear the fencing token to indicate that we don't have the leadership right now
setFencingToken(null);
});
}

Expand Down
Expand Up @@ -212,7 +212,7 @@ public JobMaster(
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader) throws Exception {

super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID);
super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));

selfGateway = getSelfGateway(JobMasterGateway.class);

Expand Down Expand Up @@ -735,7 +735,7 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti
return Acknowledge.get();
}

if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) {
if (getFencingToken() != null) {
log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId);

// first we have to suspend the current execution
Expand Down Expand Up @@ -791,13 +791,13 @@ private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Excepti
private Acknowledge suspendExecution(final Throwable cause) {
validateRunsInMainThread();

if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, getFencingToken())) {
if (getFencingToken() == null) {
log.debug("Job has already been suspended or shutdown.");
return Acknowledge.get();
}

// not leader anymore --> set the JobMasterId to the initial id
setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID);
// not leader anymore --> set the JobMasterId to null
setFencingToken(null);

try {
resourceManagerLeaderRetriever.stop();
Expand Down
Expand Up @@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID {

private static final long serialVersionUID = -933276753644003754L;

public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L);

public JobMasterId(byte[] bytes) {
super(bytes);
}
Expand Down
Expand Up @@ -139,7 +139,7 @@ public ResourceManager(
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {

super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate());
super(rpcService, resourceManagerEndpointId);

this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
Expand Down Expand Up @@ -772,13 +772,11 @@ public void grantLeadership(final UUID newLeaderSessionID) {
public void revokeLeadership() {
runAsyncWithoutFencing(
() -> {
final ResourceManagerId newResourceManagerId = ResourceManagerId.generate();

log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId);
log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());

clearState();

setFencingToken(newResourceManagerId);
setFencingToken(null);

slotManager.suspend();
});
Expand Down
Expand Up @@ -19,7 +19,8 @@
package org.apache.flink.runtime.rpc;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.UUID;
Expand All @@ -39,25 +40,26 @@ public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {
private volatile F fencingToken;
private volatile MainThreadExecutor fencedMainThreadExecutor;

protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken) {
protected FencedRpcEndpoint(RpcService rpcService, String endpointId) {
super(rpcService, endpointId);

this.fencingToken = Preconditions.checkNotNull(initialFencingToken);
// no fencing token == no leadership
this.fencingToken = null;
this.fencedMainThreadExecutor = new MainThreadExecutor(
getRpcService().fenceRpcServer(
rpcServer,
initialFencingToken));
null));
}

protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) {
this(rpcService, UUID.randomUUID().toString(), initialFencingToken);
protected FencedRpcEndpoint(RpcService rpcService) {
this(rpcService, UUID.randomUUID().toString());
}

public F getFencingToken() {
return fencingToken;
}

protected void setFencingToken(F newFencingToken) {
protected void setFencingToken(@Nullable F newFencingToken) {
// this method should only be called from within the main thread
validateRunsInMainThread();

Expand Down
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
Expand All @@ -45,23 +45,36 @@ public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> terminationFutu
@Override
protected void handleMessage(Object message) {
if (message instanceof FencedMessage) {
@SuppressWarnings("unchecked")
FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);

F fencingToken = fencedMessage.getFencingToken();
final F expectedFencingToken = rpcEndpoint.getFencingToken();

if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) {
super.handleMessage(fencedMessage.getPayload());
} else {
if (expectedFencingToken == null) {
if (log.isDebugEnabled()) {
log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message);
}

sendErrorIfSender(
new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message +
" because the fencing token " + fencingToken + " did not match the expected fencing token " +
rpcEndpoint.getFencingToken() + '.'));
new FencingTokenException(
"Fencing token not set: Ignoring message " + message + " because the fencing token is null."));
} else {
@SuppressWarnings("unchecked")
FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);

F fencingToken = fencedMessage.getFencingToken();

if (Objects.equals(expectedFencingToken, fencingToken)) {
super.handleMessage(fencedMessage.getPayload());
} else {
if (log.isDebugEnabled()) {
log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
"not match the expected fencing token {}.", message, fencingToken, expectedFencingToken);
}

sendErrorIfSender(
new FencingTokenException("Fencing token mismatch: Ignoring message " + message +
" because the fencing token " + fencingToken + " did not match the expected fencing token " +
expectedFencingToken + '.'));
}
}
} else if (message instanceof UnfencedMessage) {
super.handleMessage(((UnfencedMessage<?>) message).getPayload());
Expand Down
Expand Up @@ -25,18 +25,18 @@
* Exception which is thrown if a message's fencing token does not match the fencing
* token of a {@link FencedRpcEndpoint}, or if the endpoint has no fencing token set.
*/
public class FencingTokenMismatchException extends RpcException {
public class FencingTokenException extends RpcException {
private static final long serialVersionUID = -500634972988881467L;

public FencingTokenMismatchException(String message) {
public FencingTokenException(String message) {
super(message);
}

public FencingTokenMismatchException(String message, Throwable cause) {
public FencingTokenException(String message, Throwable cause) {
super(message, cause);
}

public FencingTokenMismatchException(Throwable cause) {
public FencingTokenException(Throwable cause) {
super(cause);
}
}
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
Expand All @@ -34,8 +36,8 @@ public class LocalFencedMessage<F extends Serializable, P> implements FencedMess
private final F fencingToken;
private final P payload;

public LocalFencedMessage(F fencingToken, P payload) {
this.fencingToken = Preconditions.checkNotNull(fencingToken);
public LocalFencedMessage(@Nullable F fencingToken, P payload) {
this.fencingToken = fencingToken;
this.payload = Preconditions.checkNotNull(payload);
}

Expand Down
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
Expand All @@ -35,8 +37,8 @@ public class RemoteFencedMessage<F extends Serializable, P extends Serializable>
private final F fencingToken;
private final P payload;

public RemoteFencedMessage(F fencingToken, P payload) {
this.fencingToken = Preconditions.checkNotNull(fencingToken);
public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
this.fencingToken = fencingToken;
this.payload = Preconditions.checkNotNull(payload);
}

Expand Down
Expand Up @@ -108,7 +108,7 @@ public void revokeLeadership() {
try {
resourceManager.start();

Assert.assertNotNull(resourceManager.getFencingToken());
Assert.assertNull(resourceManager.getFencingToken());
final UUID leaderId = UUID.randomUUID();
leaderElectionService.isLeader(leaderId);
// after leadership has been granted, the resourceManager's leaderId has a value
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
Expand All @@ -47,6 +47,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,10 +82,14 @@ public void testRegisterJobMaster() throws Exception {
JobMasterId jobMasterId = JobMasterId.generate();
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);

// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

// test response successful
CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
jobMasterId,
Expand Down Expand Up @@ -127,7 +132,7 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exceptio
unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
fail("Should fail because we are using the wrong fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}

if (testingFatalErrorHandler.hasExceptionOccurred()) {
Expand All @@ -151,6 +156,9 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exceptio
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);

// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

// test that an exception is thrown when receiving a registration from a job master with an unmatched leaderSessionId
JobMasterId differentJobMasterId = JobMasterId.generate();
CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
Expand Down Expand Up @@ -182,6 +190,9 @@ public void testRegisterJobMasterFromInvalidAddress() throws Exception {
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);

// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

// test that an exception is thrown when receiving a registration from a job master with an invalid address
String invalidAddress = "/jobMasterAddress2";
CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager(
Expand Down Expand Up @@ -219,6 +230,9 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {

JobID unknownJobIDToHAServices = new JobID();

// wait until the leader election has been completed
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

// this should fail because we try to register a job leader listener for an unknown job id
CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager(
new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
Expand Down Expand Up @@ -134,7 +134,7 @@ public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Except
unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -231,7 +231,7 @@ public void testCallAsyncWithFencing() throws Exception {

fail("The async call operation should fail due to the changed fencing token.");
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
}
}

Expand Down Expand Up @@ -346,10 +346,19 @@ protected FencedTestEndpoint(
UUID initialFencingToken,
OneShotLatch enteringSetNewFencingToken,
OneShotLatch triggerSetNewFencingToken) {
super(rpcService, initialFencingToken);
super(rpcService);

this.enteringSetNewFencingToken = enteringSetNewFencingToken;
this.triggerSetNewFencingToken = triggerSetNewFencingToken;

// make it look as if we are running in the main thread
currentMainThread.set(Thread.currentThread());

try {
setFencingToken(initialFencingToken);
} finally {
currentMainThread.set(null);
}
}

@Override
Expand Down

0 comments on commit 42cc3a2

Please sign in to comment.