Skip to content

Commit

Permalink
fix: adds a handler to gracefully shutdown (#5895)
Browse files Browse the repository at this point in the history
* fix: adds a handler to gracefully shutdown service

Couple changes related to application lifecycle management. Firstly, adds a shutdown handler
that gracefully shuts down the service when the jvm determines its time to shut down (e.g.
when it receives a termination signal). Secondly, this patch reorganizes some of the startup,
steady-state, and shutdown code to make shutdown easier to reason about. Specifically, all
these methods are now called from the same thread, so both the thread-safety and order of
execution are guaranteed. All the shutdown hook does is notify the main thread and then wait
for it to exit.
  • Loading branch information
rodesai committed Jul 30, 2020
1 parent 1ff099b commit 5fbf171
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -35,6 +36,7 @@ public final class ConnectExecutable implements Executable {
private final ConnectDistributed connectDistributed;
private final Map<String, String> workerProps;
private Connect connect;
private final CountDownLatch terminateLatch = new CountDownLatch(1);

public static ConnectExecutable of(final String configFile) throws IOException {
final Map<String, String> workerProps = !configFile.isEmpty()
Expand Down Expand Up @@ -64,16 +66,19 @@ public void startAsync() {
}

@Override
public void triggerShutdown() {
public void shutdown() {
if (connect != null) {
connect.stop();
}
}

@Override
public void awaitTerminated() {
if (connect != null) {
connect.awaitStop();
}
public void notifyTerminated() {
terminateLatch.countDown();
}

@Override
public void awaitTerminated() throws InterruptedException {
terminateLatch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@
public interface Executable {

/**
* Starts the executable asynchronously.
* Starts the executable asynchronously. Guaranteed to be called before shutdown.
*/
default void startAsync() throws Exception {}

/**
* Triggers a shutdown asynchronously, in order to ensure that the shutdown
* has finished use {@link #awaitTerminated()}
* Called to notify threads awaiting termination (see #awaitTerminated)
* that it's time to shutdown.
*/
default void triggerShutdown() throws Exception {}
default void notifyTerminated() {}

/**
* Awaits the {@link #triggerShutdown()} to finish. This is a blocking
* operation.
* Shutdown the service.
*/
default void shutdown() throws Exception {}

/**
* Awaits the {@link #notifyTerminated()} notification. This is a blocking
* operation. Guaranteed to be called before shutdown.
*/
default void awaitTerminated() throws InterruptedException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -169,7 +169,6 @@ public final class KsqlRestApplication implements Executable {
private final Optional<HeartbeatAgent> heartbeatAgent;
private final Optional<LagReportingAgent> lagReportingAgent;
private final PullQueryExecutor pullQueryExecutor;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private final ServerInfoResource serverInfoResource;
private final Optional<HeartbeatResource> heartbeatResource;
private final Optional<ClusterStatusResource> clusterStatusResource;
Expand All @@ -185,7 +184,7 @@ public final class KsqlRestApplication implements Executable {

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
private volatile Thread startAsyncThread;
private AtomicReference<Thread> startAsyncThreadRef = new AtomicReference<>(null);

public static SourceName getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
Expand Down Expand Up @@ -305,7 +304,7 @@ public void startAsync() {
pullQueryExecutor
);

startAsyncThread = Thread.currentThread();
startAsyncThreadRef.set(Thread.currentThread());
try {
final Endpoints endpoints = new KsqlServerEndpoints(
ksqlEngine,
Expand Down Expand Up @@ -340,7 +339,7 @@ public void startAsync() {
} catch (AbortApplicationStartException e) {
log.error("Aborting application start", e);
} finally {
startAsyncThread = null;
startAsyncThreadRef.set(null);
}
}

Expand Down Expand Up @@ -392,14 +391,14 @@ private void waitForPreconditions() {
1000,
30000,
this::checkPreconditions,
shuttingDown::get,
terminatedFuture::isDone,
predicates
);
} catch (KsqlFailedPrecondition e) {
log.error("Failed to meet preconditions. Exiting...", e);
}

if (shuttingDown.get()) {
if (terminatedFuture.isDone()) {
throw new AbortApplicationStartException(
"Shutting down application during waitForPreconditions");
}
Expand Down Expand Up @@ -441,17 +440,19 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
serverState.setReady();
}

@SuppressWarnings("checkstyle:NPathComplexity")
@Override
public void triggerShutdown() {
log.debug("ksqlDB triggerShutdown called");
// First, make sure the server wasn't stuck in startup. Set the shutdown flag and interrupt the
// startup thread if it's been hanging.
shuttingDown.set(true);
public void notifyTerminated() {
terminatedFuture.complete(null);
final Thread startAsyncThread = startAsyncThreadRef.get();
if (startAsyncThread != null) {
startAsyncThread.interrupt();
}
}

@SuppressWarnings("checkstyle:NPathComplexity")
@Override
public void shutdown() {
log.info("ksqlDB shutdown called");
try {
streamedQueryResource.closeMetrics();
} catch (final Exception e) {
Expand Down Expand Up @@ -496,9 +497,7 @@ public void triggerShutdown() {

shutdownAdditionalAgents();

log.debug("ksqlDB triggerShutdown complete");

terminatedFuture.complete(null);
log.info("ksqlDB shutdown complete");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +34,7 @@ public class KsqlServerMain {

private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class);

private final Executor shutdownHandler;
private final Executable executable;

public static void main(final String[] args) {
Expand All @@ -55,29 +58,46 @@ public static void main(final String[] args) {
final Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
final Executable executable = createExecutable(
properties, queriesFile, installDir, ksqlConfig);
new KsqlServerMain(executable).tryStartApp();
new KsqlServerMain(
executable,
r -> Runtime.getRuntime().addShutdownHook(new Thread(r))
).tryStartApp();
} catch (final Exception e) {
log.error("Failed to start KSQL", e);
System.exit(-1);
}
}

KsqlServerMain(final Executable executable) {
KsqlServerMain(final Executable executable, final Executor shutdownHandler) {
this.executable = Objects.requireNonNull(executable, "executable");
this.shutdownHandler = Objects.requireNonNull(shutdownHandler, "shutdownHandler");
}

void tryStartApp() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
shutdownHandler.execute(() -> {
executable.notifyTerminated();
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
});
try {
log.info("Starting server");
executable.startAsync();
log.info("Server up and running");
executable.awaitTerminated();
} catch (Throwable t) {
log.error("Unhandled exception in server startup", t);
throw t;
try {
log.info("Starting server");
executable.startAsync();
log.info("Server up and running");
executable.awaitTerminated();
} catch (Throwable t) {
log.error("Unhandled exception in server startup", t);
throw t;
} finally {
log.info("Server shutting down");
executable.shutdown();
}
} finally {
log.info("Server shutting down");
executable.triggerShutdown();
latch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ public void startAsync() throws Exception {
}

@Override
public void triggerShutdown() throws Exception {
doAction(Executable::triggerShutdown);
public void shutdown() throws Exception {
doAction(Executable::shutdown);
}

@Override
public void notifyTerminated() {
doAction(Executable::notifyTerminated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,17 @@ public void startAsync() {
versionChecker.start(KsqlModuleType.SERVER, properties);
} catch (final Exception e) {
log.error("Failed to start KSQL Server with query file: " + queriesFile, e);
triggerShutdown();
throw e;
}
}

public void triggerShutdown() {
@Override
public void notifyTerminated() {
shutdownLatch.countDown();
}

@Override
public void shutdown() {
try {
ksqlEngine.close();
} catch (final Exception e) {
Expand All @@ -145,7 +150,6 @@ public void triggerShutdown() {
} catch (final Exception e) {
log.warn("Failed to cleanly shutdown services", e);
}
shutdownLatch.countDown();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static void setUpClass() {

@AfterClass
public static void tearDownClass() {
CONNECT.triggerShutdown();
CONNECT.shutdown();
}

private KsqlRestClient ksqlRestClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void tearDown() {
@Test
public void shouldCloseServiceContextOnClose() {
// When:
app.triggerShutdown();
app.shutdown();

// Then:
verify(serviceContext).close();
Expand All @@ -211,7 +211,7 @@ public void shouldCloseServiceContextOnClose() {
@Test
public void shouldCloseSecurityExtensionOnClose() {
// When:
app.triggerShutdown();
app.shutdown();

// Then:
verify(securityExtension).close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package io.confluent.ksql.rest.server;

import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -27,6 +29,8 @@

import io.confluent.ksql.util.KsqlServerException;
import java.io.File;
import java.util.concurrent.Executor;
import org.easymock.Capture;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
Expand All @@ -40,12 +44,14 @@ public class KsqlServerMainTest {

@Mock(MockType.NICE)
private Executable executable;
@Mock(MockType.NICE)
private Executor shutdownHandler;

private final File mockStreamsStateDir = mock(File.class);

@Before
public void setUp() {
main = new KsqlServerMain(executable);
main = new KsqlServerMain(executable, shutdownHandler);
when(mockStreamsStateDir.exists()).thenReturn(true);
when(mockStreamsStateDir.mkdirs()).thenReturn(true);
when(mockStreamsStateDir.isDirectory()).thenReturn(true);
Expand All @@ -57,7 +63,7 @@ public void setUp() {
@Test
public void shouldStopAppOnJoin() throws Exception {
// Given:
executable.triggerShutdown();
executable.shutdown();
expectLastCall();
replay(executable);

Expand All @@ -74,7 +80,7 @@ public void shouldStopAppOnErrorStarting() throws Exception {
executable.startAsync();
expectLastCall().andThrow(new RuntimeException("Boom"));

executable.triggerShutdown();
executable.shutdown();
expectLastCall();
replay(executable);

Expand All @@ -90,6 +96,24 @@ public void shouldStopAppOnErrorStarting() throws Exception {
verify(executable);
}

@Test
public void shouldNotifyAppOnTerminate() throws Exception {
// Given:
final Capture<Runnable> captureShutdownHandler = newCapture();
shutdownHandler.execute(capture(captureShutdownHandler));
executable.notifyTerminated();
expectLastCall();
replay(shutdownHandler, executable);
main.tryStartApp();
final Runnable handler = captureShutdownHandler.getValue();

// When:
handler.run();

// Then:
verify(executable);
}

@Test
public void shouldFailIfStreamsStateDirectoryCannotBeCreated() {
// Given:
Expand Down

0 comments on commit 5fbf171

Please sign in to comment.