Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs #10199

Merged
merged 17 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void run() {
future.get(service.getConfiguration().getBrokerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);

LOG.info("Completed graceful shutdown. Exiting");
} catch (TimeoutException e) {
} catch (TimeoutException | CancellationException e) {
LOG.warn("Graceful shutdown timeout expired. Closing now");
} catch (Exception e) {
LOG.error("Failed to perform graceful shutdown, Exiting anyway", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -89,6 +92,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -164,6 +168,7 @@
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
Expand Down Expand Up @@ -236,7 +241,7 @@ public class PulsarService implements AutoCloseable {
private PulsarResources pulsarResources;

public enum State {
Init, Started, Closed
Init, Started, Closing, Closed
}

private volatile State state;
Expand Down Expand Up @@ -308,10 +313,15 @@ public void close() throws PulsarServerException {
try {
closeAsync().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof PulsarServerException) {
throw (PulsarServerException) e.getCause();
Throwable cause = e.getCause();
if (cause instanceof PulsarServerException) {
throw (PulsarServerException) cause;
} else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
&& (cause instanceof TimeoutException || cause instanceof CancellationException)) {
// ignore shutdown timeout when timeout is 0, which is primarily used in tests
// to forcefully shutdown the broker
} else {
throw new PulsarServerException(e.getCause());
throw new PulsarServerException(cause);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -327,6 +337,7 @@ public CompletableFuture<Void> closeAsync() {
if (closeFuture != null) {
return closeFuture;
}
state = State.Closing;

// close the service in reverse order v.s. in which they are started
if (this.webService != null) {
Expand All @@ -345,6 +356,15 @@ public CompletableFuture<Void> closeAsync() {
this.webSocketService.close();
}

GracefulExecutorServicesShutdown executorServicesShutdown =
GracefulExecutorServicesShutdown
.initiate()
.timeout(
Duration.ofMillis(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
asyncCloseFutures.add(this.brokerService.closeAsync());
Expand All @@ -366,7 +386,7 @@ public CompletableFuture<Void> closeAsync() {
this.leaderElectionService = null;
}

loadManagerExecutor.shutdown();
executorServicesShutdown.shutdown(loadManagerExecutor);

if (globalZkCache != null) {
globalZkCache.close();
Expand Down Expand Up @@ -402,24 +422,11 @@ public CompletableFuture<Void> closeAsync() {
nsService = null;
}

if (compactorExecutor != null) {
compactorExecutor.shutdown();
}

if (offloaderScheduler != null) {
offloaderScheduler.shutdown();
}

// executor is not initialized in mocks even when real close method is called
// guard against null executors
if (executor != null) {
executor.shutdown();
}

if (orderedExecutor != null) {
orderedExecutor.shutdown();
}
cacheExecutor.shutdown();
executorServicesShutdown.shutdown(compactorExecutor);
executorServicesShutdown.shutdown(offloaderScheduler);
executorServicesShutdown.shutdown(executor);
executorServicesShutdown.shutdown(orderedExecutor);
executorServicesShutdown.shutdown(cacheExecutor);

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
Expand All @@ -441,10 +448,7 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (transactionExecutor != null) {
transactionExecutor.shutdown();
transactionExecutor = null;
}
executorServicesShutdown.shutdown(transactionExecutor);

if (coordinationService != null) {
coordinationService.close();
Expand All @@ -457,20 +461,23 @@ public CompletableFuture<Void> closeAsync() {
configurationMetadataStore.close();
}

state = State.Closed;
isClosedCondition.signalAll();
// add timeout handling for closing executors
asyncCloseFutures.add(executorServicesShutdown.handle());

CompletableFuture<Void> shutdownFuture =
CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
closeFuture = shutdownFuture;
return shutdownFuture;
closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
closeFuture.handle((v, t) -> {
state = State.Closed;
isClosedCondition.signalAll();
return null;
});
return closeFuture;
} catch (Exception e) {
PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
pse = new PulsarServerException(MetadataStoreException.unwrap(e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
} else {
pse = new PulsarServerException(e);
}
Expand All @@ -480,6 +487,20 @@ public CompletableFuture<Void> closeAsync() {
}
}

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
// shutdown the shutdown executor
shutdownExecutor.shutdownNow();
return null;
});
return future;
}

/**
* Get the current service configuration.
*
Expand Down
Loading