Skip to content

Commit

Permalink
[FLINK-8678] [flip6] Make RpcService shut down non blocking
Browse files Browse the repository at this point in the history
Changes the RpcService#stopService method to be non-blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future which is completed once the RpcService has been completely shut
down.

This closes #5517.
  • Loading branch information
tillrohrmann committed Feb 23, 2018
1 parent af3ea81 commit c27e2a7
Show file tree
Hide file tree
Showing 33 changed files with 271 additions and 198 deletions.
Expand Up @@ -510,7 +510,7 @@ public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) {

@Override
public void close() throws Exception {
rpcService.stopService();
rpcService.stopService().get();
}
}

Expand Down
Expand Up @@ -437,7 +437,7 @@ protected void stopClusterServices(boolean cleanupHaData) throws FlinkException

if (commonRpcService != null) {
try {
commonRpcService.stopService();
commonRpcService.stopService().get();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
Expand Down
Expand Up @@ -756,7 +756,7 @@ private void initializeIOFormatClasses(Configuration configuration) {
private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
if (rpcService != null) {
try {
rpcService.stopService();
rpcService.stopService().get();
}
catch (Throwable t) {
return ExceptionUtils.firstOrSuppressed(t, priorException);
Expand All @@ -773,7 +773,7 @@ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorE
for (RpcService service : rpcServices) {
try {
if (service != null) {
service.stopService();
service.stopService().get();
}
}
catch (Throwable t) {
Expand Down
Expand Up @@ -116,9 +116,12 @@ <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> con
void stopServer(RpcServer selfGateway);

/**
* Stop the rpc service shutting down all started rpc servers.
* Trigger the asynchronous stopping of the {@link RpcService}.
*
* @return Future which is completed once the {@link RpcService} has been
* fully stopped.
*/
void stopService();
CompletableFuture<Void> stopService();

/**
* Returns a future indicating when the RPC service has been shut down.
Expand Down
Expand Up @@ -50,7 +50,7 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
while (clazz != null) {
for (Class<?> interfaze : clazz.getInterfaces()) {
if (RpcGateway.class.isAssignableFrom(interfaze)) {
interfaces.add((Class<? extends RpcGateway>)interfaze);
interfaces.add((Class<? extends RpcGateway>) interfaze);
}
}

Expand All @@ -65,7 +65,7 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
*
* @param rpcEndpoint to terminate
* @param timeout for this operation
* @throws ExecutionException if a problem occurs
* @throws ExecutionException if a problem occurred
* @throws InterruptedException if the operation has been interrupted
* @throws TimeoutException if a timeout occurred
*/
Expand All @@ -74,6 +74,19 @@ public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout) t
rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}

/**
* Shuts the given rpc service down and waits for its termination.
*
* <p>Calls {@link RpcService#stopService()} on the given service and then blocks the
* calling thread on the returned future for at most the given timeout.
*
* @param rpcService to shut down
* @param timeout maximum time to wait for the returned future to complete
* @throws InterruptedException if the operation has been interrupted
* @throws ExecutionException if a problem occurred
* @throws TimeoutException if a timeout occurred
*/
public static void terminateRpcService(RpcService rpcService, Time timeout) throws InterruptedException, ExecutionException, TimeoutException {
rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}

// We don't want this class to be instantiable
private RpcUtils() {}
}
Expand Up @@ -42,6 +42,7 @@
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
Expand Down Expand Up @@ -98,6 +99,8 @@ public class AkkaRpcService implements RpcService {

private final ScheduledExecutor internalScheduledExecutor;

private final CompletableFuture<Void> terminationFuture;

private volatile boolean stopped;

public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
Expand Down Expand Up @@ -127,6 +130,8 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {

internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

terminationFuture = new CompletableFuture<>();

stopped = false;
}

Expand Down Expand Up @@ -311,33 +316,40 @@ public void stopServer(RpcServer selfGateway) {
}

@Override
public void stopService() {
LOG.info("Stopping Akka RPC service.");

public CompletableFuture<Void> stopService() {
synchronized (lock) {
if (stopped) {
return;
return terminationFuture;
}

stopped = true;

}

actorSystem.shutdown();
actorSystem.awaitTermination();
LOG.info("Stopping Akka RPC service.");

synchronized (lock) {
actors.clear();
}
final CompletableFuture<Terminated> actorSytemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

actorSytemTerminationFuture.whenComplete(
(Terminated ignored, Throwable throwable) -> {
synchronized (lock) {
actors.clear();
}

if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}

LOG.info("Stopped Akka RPC service.");
});

LOG.info("Stopped Akka RPC service.");
return terminationFuture;
}

@Override
public CompletableFuture<Void> getTerminationFuture() {
return CompletableFuture.runAsync(
actorSystem::awaitTermination,
getExecutor());
return terminationFuture;
}

@Override
Expand Down
Expand Up @@ -173,7 +173,15 @@ protected void shutDownInternally() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

rpcService.stopService();
try {
rpcService.stopService().get();
} catch (InterruptedException ie) {
exception = ExceptionUtils.firstOrSuppressed(ie, exception);

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
highAvailabilityServices.close();
Expand Down
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
Expand Down Expand Up @@ -578,7 +579,7 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
verify(taskExecutorGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class));

} finally {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, timeout);
}
}

Expand Down Expand Up @@ -680,7 +681,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId), any(TimeoutException.class));

} finally {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, timeout);
}
}
}
Expand Up @@ -134,9 +134,9 @@ public static void setupClass() {
}

@AfterClass
public static void teardownClass() {
public static void teardownClass() throws Exception {
if (rpcService != null) {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, TIMEOUT);

rpcService = null;
}
Expand Down
Expand Up @@ -61,6 +61,8 @@
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -143,13 +145,13 @@ public void teardown() throws Exception {
}

@AfterClass
public static void teardownClass() throws IOException {
public static void teardownClass() throws IOException, InterruptedException, ExecutionException, TimeoutException {
if (blobServer != null) {
blobServer.close();
}

if (rpcService != null) {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, timeout);
}
}

Expand Down
Expand Up @@ -117,7 +117,7 @@ public void teardown() throws Exception {
}

if (rpcService != null) {
rpcService.stopService();
rpcService.stopService().get();
rpcService = null;
}
}
Expand Down
Expand Up @@ -92,9 +92,9 @@ public static void setup() {
}

@AfterClass
public static void shutdown() {
public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
if (rpcService != null) {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, timeout);
rpcService = null;
}
}
Expand Down
Expand Up @@ -64,9 +64,9 @@ public static void setup() {
}

@AfterClass
public static void teardown() {
public static void teardown() throws ExecutionException, InterruptedException {
if (testingRpcService != null) {
testingRpcService.stopService();
testingRpcService.stopService().get();
testingRpcService = null;
}
}
Expand Down
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
rpcService.stopService();
RpcUtils.terminateRpcService(rpcService, timeout);
}

@Test
Expand Down

0 comments on commit c27e2a7

Please sign in to comment.