Skip to content

Commit

Permalink
[FLINK-9128] [flip6] Add support for scheduleRunAsync for FencedRpcEn…
Browse files Browse the repository at this point in the history
…dpoints

Wrap self messages in the FencedRpcEndpoints in a LocalFencedMessage to not drop them
due to a missing fencing token.

This closes #5812.
  • Loading branch information
tillrohrmann committed Apr 4, 2018
1 parent 6ffe22d commit 826d51d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 66 deletions.
Expand Up @@ -303,7 +303,9 @@ private void handleRunAsync(RunAsync runAsync) {
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);


getContext().system().scheduler().scheduleOnce(delay, getSelf(), message, final Object envelopedSelfMessage = envelopeSelfMessage(message);

getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage,
getContext().dispatcher(), ActorRef.noSender()); getContext().dispatcher(), ActorRef.noSender());
} }
} }
Expand Down Expand Up @@ -332,4 +334,14 @@ protected void sendErrorIfSender(Throwable throwable) {
getSender().tell(new Status.Failure(throwable), getSelf()); getSender().tell(new Status.Failure(throwable), getSelf());
} }
} }

/**
* Hook to envelope self messages.
*
* @param message to envelope
* @return enveloped message
*/
protected Object envelopeSelfMessage(Object message) {
return message;
}
} }
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage; import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage; import org.apache.flink.runtime.rpc.messages.UnfencedMessage;


import java.io.Serializable; import java.io.Serializable;
Expand Down Expand Up @@ -92,4 +93,11 @@ protected void handleMessage(Object message) {
FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.')); FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
} }
} }

@Override
protected Object envelopeSelfMessage(Object message) {
final F fencingToken = rpcEndpoint.getFencingToken();

return new LocalFencedMessage<>(fencingToken, message);
}
} }
Expand Up @@ -83,106 +83,123 @@ public static void shutdown() throws InterruptedException, ExecutionException, T


@Test @Test
public void testScheduleWithNoDelay() throws Exception { public void testScheduleWithNoDelay() throws Exception {
runScheduleWithNoDelayTest(TestEndpoint::new);
}


@Test
public void testFencedScheduleWithNoDelay() throws Exception {
runScheduleWithNoDelayTest(FencedTestEndpoint::new);
}

private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Exception {
// to collect all the thread references // to collect all the thread references
final ReentrantLock lock = new ReentrantLock(); final ReentrantLock lock = new ReentrantLock();
final AtomicBoolean concurrentAccess = new AtomicBoolean(false); final AtomicBoolean concurrentAccess = new AtomicBoolean(false);


TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
testEndpoint.start(); rpcEndpoint.start();
TestGateway gateway = testEndpoint.getSelfGateway(TestGateway.class);


// a bunch of gateway calls try {
gateway.someCall(); TestGateway gateway = rpcEndpoint.getSelfGateway(TestGateway.class);
gateway.anotherCall();
gateway.someCall();


// run something asynchronously // a bunch of gateway calls
for (int i = 0; i < 10000; i++) { gateway.someCall();
testEndpoint.runAsync(new Runnable() { gateway.anotherCall();
@Override gateway.someCall();
public void run() {
// run something asynchronously
for (int i = 0; i < 10000; i++) {
rpcEndpoint.runAsync(() -> {
boolean holdsLock = lock.tryLock(); boolean holdsLock = lock.tryLock();
if (holdsLock) { if (holdsLock) {
lock.unlock(); lock.unlock();
} else { } else {
concurrentAccess.set(true); concurrentAccess.set(true);
} }
} });
}); }
}

CompletableFuture<String> result = testEndpoint.callAsync(
() -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
return "test";
},
Time.seconds(30L));


String str = result.get(30, TimeUnit.SECONDS); CompletableFuture<String> result = rpcEndpoint.callAsync(
assertEquals("test", str); () -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
return "test";
},
Time.seconds(30L));


// validate that no concurrent access happened String str = result.get(30, TimeUnit.SECONDS);
assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess()); assertEquals("test", str);
assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());


testEndpoint.shutDown(); // validate that no concurrent access happened
assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
} finally {
rpcEndpoint.shutDown();
}
} }


@Test @Test
public void testScheduleWithDelay() throws Exception { public void testScheduleWithDelay() throws Exception {
runScheduleWithDelayTest(TestEndpoint::new);
}


@Test
public void testFencedScheduleWithDelay() throws Exception {
runScheduleWithDelayTest(FencedTestEndpoint::new);
}

private void runScheduleWithDelayTest(RpcEndpointFactory factory) throws Exception {
// to collect all the thread references // to collect all the thread references
final ReentrantLock lock = new ReentrantLock(); final ReentrantLock lock = new ReentrantLock();
final AtomicBoolean concurrentAccess = new AtomicBoolean(false); final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
final OneShotLatch latch = new OneShotLatch(); final OneShotLatch latch = new OneShotLatch();


final long delay = 100; final long delay = 10L;


TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
testEndpoint.start(); rpcEndpoint.start();


// run something asynchronously try {
testEndpoint.runAsync(new Runnable() { // run something asynchronously
@Override rpcEndpoint.runAsync(() -> {
public void run() {
boolean holdsLock = lock.tryLock(); boolean holdsLock = lock.tryLock();
if (holdsLock) { if (holdsLock) {
lock.unlock(); lock.unlock();
} else { } else {
concurrentAccess.set(true); concurrentAccess.set(true);
} }
} });
});


final long start = System.nanoTime(); final long start = System.nanoTime();


testEndpoint.scheduleRunAsync(new Runnable() { rpcEndpoint.scheduleRunAsync(() -> {
@Override
public void run() {
boolean holdsLock = lock.tryLock(); boolean holdsLock = lock.tryLock();
if (holdsLock) { if (holdsLock) {
lock.unlock(); lock.unlock();
} else { } else {
concurrentAccess.set(true); concurrentAccess.set(true);
} }
latch.trigger(); latch.trigger();
} }, delay, TimeUnit.MILLISECONDS);
}, delay, TimeUnit.MILLISECONDS);


latch.await(); latch.await();
final long stop = System.nanoTime(); final long stop = System.nanoTime();


// validate that no concurrent access happened // validate that no concurrent access happened
assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess()); assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay);
} finally {
RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
}
}


assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay); @FunctionalInterface
private interface RpcEndpointFactory {
RpcEndpoint create(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess);
} }


/** /**
Expand Down Expand Up @@ -321,16 +338,16 @@ public interface TestGateway extends RpcGateway {
void anotherCall(); void anotherCall();
} }


@SuppressWarnings("unused") private static class TestEndpoint extends RpcEndpoint implements TestGateway {
public static class TestEndpoint extends RpcEndpoint implements TestGateway {


private final ReentrantLock lock; private final ReentrantLock lock;


private volatile boolean concurrentAccess; private final AtomicBoolean concurrentAccess;


public TestEndpoint(RpcService rpcService, ReentrantLock lock) { TestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
super(rpcService); super(rpcService);
this.lock = lock; this.lock = lock;
this.concurrentAccess = concurrentAccess;
} }


@Override @Override
Expand All @@ -339,7 +356,7 @@ public void someCall() {
if (holdsLock) { if (holdsLock) {
lock.unlock(); lock.unlock();
} else { } else {
concurrentAccess = true; concurrentAccess.set(true);
} }
} }


Expand All @@ -349,36 +366,67 @@ public void anotherCall() {
if (holdsLock) { if (holdsLock) {
lock.unlock(); lock.unlock();
} else { } else {
concurrentAccess = true; concurrentAccess.set(true);
} }
} }


public boolean hasConcurrentAccess() {
return concurrentAccess;
}

@Override @Override
public CompletableFuture<Void> postStop() { public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
} }


public interface FencedTestGateway extends FencedRpcGateway<UUID> { public interface FencedTestGateway extends FencedRpcGateway<UUID>, TestGateway {
CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, @RpcTimeout Time timeout); CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, @RpcTimeout Time timeout);
} }


public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway { public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway {


private final ReentrantLock lock;
private final AtomicBoolean concurrentAccess;

private final OneShotLatch enteringSetNewFencingToken; private final OneShotLatch enteringSetNewFencingToken;
private final OneShotLatch triggerSetNewFencingToken; private final OneShotLatch triggerSetNewFencingToken;


protected FencedTestEndpoint(
RpcService rpcService,
ReentrantLock lock,
AtomicBoolean concurrentAccess) {
this(
rpcService,
lock,
concurrentAccess,
UUID.randomUUID(),
new OneShotLatch(),
new OneShotLatch());
}

protected FencedTestEndpoint( protected FencedTestEndpoint(
RpcService rpcService, RpcService rpcService,
UUID initialFencingToken, UUID initialFencingToken,
OneShotLatch enteringSetNewFencingToken, OneShotLatch enteringSetNewFencingToken,
OneShotLatch triggerSetNewFencingToken) { OneShotLatch triggerSetNewFencingToken) {
this(
rpcService,
new ReentrantLock(),
new AtomicBoolean(false),
initialFencingToken,
enteringSetNewFencingToken,
triggerSetNewFencingToken);
}

private FencedTestEndpoint(
RpcService rpcService,
ReentrantLock lock,
AtomicBoolean concurrentAccess,
UUID initialFencingToken,
OneShotLatch enteringSetNewFencingToken,
OneShotLatch triggerSetNewFencingToken) {
super(rpcService); super(rpcService);


this.lock = lock;
this.concurrentAccess = concurrentAccess;

this.enteringSetNewFencingToken = enteringSetNewFencingToken; this.enteringSetNewFencingToken = enteringSetNewFencingToken;
this.triggerSetNewFencingToken = triggerSetNewFencingToken; this.triggerSetNewFencingToken = triggerSetNewFencingToken;


Expand Down Expand Up @@ -410,5 +458,25 @@ public CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, Time
public CompletableFuture<Void> postStop() { public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

@Override
public void someCall() {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
}

@Override
public void anotherCall() {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
}
} }
} }

0 comments on commit 826d51d

Please sign in to comment.