Skip to content

Commit

Permalink
Extract GracefulExecutorServicesShutdown and use it in PulsarService
Browse files Browse the repository at this point in the history
- handle graceful / forceful shutdown also for PulsarService executors
  • Loading branch information
lhotari committed Apr 15, 2021
1 parent c656e60 commit 4e7488a
Show file tree
Hide file tree
Showing 5 changed files with 324 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -167,6 +168,7 @@
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
Expand Down Expand Up @@ -354,6 +356,15 @@ public CompletableFuture<Void> closeAsync() {
this.webSocketService.close();
}

GracefulExecutorServicesShutdown executorServiceShutdown =
GracefulExecutorServicesShutdown
.initiate()
.timeout(
Duration.ofMillis(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
asyncCloseFutures.add(this.brokerService.closeAsync());
Expand All @@ -375,7 +386,7 @@ public CompletableFuture<Void> closeAsync() {
this.leaderElectionService = null;
}

loadManagerExecutor.shutdown();
executorServiceShutdown.shutdown(loadManagerExecutor);

if (globalZkCache != null) {
globalZkCache.close();
Expand Down Expand Up @@ -411,24 +422,11 @@ public CompletableFuture<Void> closeAsync() {
nsService = null;
}

if (compactorExecutor != null) {
compactorExecutor.shutdown();
}

if (offloaderScheduler != null) {
offloaderScheduler.shutdown();
}

// executor is not initialized in mocks even when real close method is called
// guard against null executors
if (executor != null) {
executor.shutdown();
}

if (orderedExecutor != null) {
orderedExecutor.shutdown();
}
cacheExecutor.shutdown();
executorServiceShutdown.shutdown(compactorExecutor);
executorServiceShutdown.shutdown(offloaderScheduler);
executorServiceShutdown.shutdown(executor);
executorServiceShutdown.shutdown(orderedExecutor);
executorServiceShutdown.shutdown(cacheExecutor);

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
Expand All @@ -450,10 +448,7 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (transactionExecutor != null) {
transactionExecutor.shutdown();
transactionExecutor = null;
}
executorServiceShutdown.shutdown(transactionExecutor);

if (coordinationService != null) {
coordinationService.close();
Expand All @@ -466,6 +461,9 @@ public CompletableFuture<Void> closeAsync() {
configurationMetadataStore.close();
}

// add timeout handling for closing executors
asyncCloseFutures.add(executorServiceShutdown.handle());

closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
closeFuture.handle((v, t) -> {
state = State.Closed;
Expand All @@ -476,10 +474,10 @@ public CompletableFuture<Void> closeAsync() {
} catch (Exception e) {
PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
pse = new PulsarServerException(MetadataStoreException.unwrap(e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
} else {
pse = new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,18 @@
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -77,12 +74,10 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
Expand Down Expand Up @@ -749,10 +744,14 @@ public CompletableFuture<Void> closeAsync() {
log.warn("Error in closing delayedDeliveryTrackerFactory", e);
}

asyncCloseFutures.add(GracefulExecutorServiceShutdownHandler
.shutdownGracefully(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* pulsar.getConfiguration().getBrokerShutdownTimeoutMs()),
asyncCloseFutures.add(GracefulExecutorServicesShutdown
.initiate()
.timeout(
Duration.ofMillis(
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* pulsar.getConfiguration()
.getBrokerShutdownTimeoutMs())))
.shutdown(
statsUpdater,
inactivityMonitor,
messageExpiryMonitor,
Expand All @@ -763,7 +762,8 @@ public CompletableFuture<Void> closeAsync() {
topicOrderedExecutor,
topicPublishRateLimiterMonitor,
brokerPublishRateLimiterMonitor,
deduplicationSnapshotMonitor));
deduplicationSnapshotMonitor)
.handle());

CompletableFuture<Void> combined =
FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
Expand Down Expand Up @@ -802,85 +802,6 @@ CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGrou
timeout, TimeUnit.MILLISECONDS));
}

/**
 * Drives the shutdown of a fixed set of {@link ExecutorService}s.
 *
 * <p>All executors are asked to shut down in an orderly way first. A private single-threaded
 * scheduler then polls them for termination and completes the returned future once every
 * executor reports {@code isTerminated()}. When {@code timeoutMs} elapses, or when the
 * cancel/timeout hook registered through {@code FutureUtil.whenCancelledOrTimedOut} fires,
 * the executors that are still running are stopped with {@link ExecutorService#shutdownNow()}.
 */
@Slf4j
private static class GracefulExecutorServiceShutdownHandler {
    // Dedicated scheduler used both for the polling checks and for the forceful-termination timer.
    private final ScheduledExecutorService shutdownScheduler = Executors.newSingleThreadScheduledExecutor(
            new DefaultThreadFactory(getClass().getSimpleName()));
    private final List<ExecutorService> executors;
    private final CompletableFuture<Void> future;
    private final long timeoutMs;

    /**
     * @param timeoutMs        delay in milliseconds before the forceful termination is triggered
     * @param executorServices executors to handle; {@code null} entries are dropped
     */
    private GracefulExecutorServiceShutdownHandler(long timeoutMs, ExecutorService... executorServices) {
        List<ExecutorService> nonNullExecutors = new ArrayList<>();
        for (ExecutorService candidate : executorServices) {
            if (candidate != null) {
                nonNullExecutors.add(candidate);
            }
        }
        this.executors = nonNullExecutors;
        this.future = new CompletableFuture<>();
        this.timeoutMs = timeoutMs;
    }

    /**
     * Creates a handler for the given executors and starts the shutdown sequence.
     *
     * @param timeoutMs        delay in milliseconds before the forceful termination is triggered
     * @param executorServices executors to shut down; {@code null} entries are ignored
     * @return a future that is completed once all executors are observed as terminated
     */
    static CompletableFuture<Void> shutdownGracefully(long timeoutMs, ExecutorService... executorServices) {
        GracefulExecutorServiceShutdownHandler handler =
                new GracefulExecutorServiceShutdownHandler(timeoutMs, executorServices);
        return handler.doShutdownGracefully();
    }

    /**
     * Requests an orderly shutdown of every executor, wires up the cancel/timeout hook,
     * performs the first completion check and arms the forceful-termination timer.
     */
    private CompletableFuture<Void> doShutdownGracefully() {
        log.info("Shutting down {} executors.", executors.size());
        for (ExecutorService executorService : executors) {
            executorService.shutdown();
        }
        FutureUtil.whenCancelledOrTimedOut(future, this::terminate);
        // The first check runs inline; it may already complete the future and stop the scheduler.
        checkCompletion();
        scheduleUnlessShutDown(this::terminate, timeoutMs);
        return future;
    }

    /**
     * Calls {@code shutdownNow()} on every executor that has not terminated yet, logs the
     * runnables returned by that call, and finally stops the internal scheduler.
     */
    private void terminate() {
        for (ExecutorService executorService : executors) {
            if (executorService.isTerminated()) {
                continue;
            }
            log.info("Shutting down forcefully executor {}", executorService);
            List<Runnable> returnedTasks = executorService.shutdownNow();
            for (Runnable task : returnedTasks) {
                log.info("Execution in progress for runnable instance of {}: {}", task.getClass(),
                        task);
            }
        }
        stopScheduler();
    }

    /** Shuts down the internal scheduler unless that has already happened. */
    private void stopScheduler() {
        if (shutdownScheduler.isShutdown()) {
            return;
        }
        log.info("Shutting down scheduler.");
        shutdownScheduler.shutdown();
    }

    /** Queues the next completion check after max(timeoutMs / 100, 10) milliseconds. */
    private void scheduleCheck() {
        scheduleUnlessShutDown(this::checkCompletion, Math.max(timeoutMs / 100, 10));
    }

    /**
     * Schedules {@code task} on the internal scheduler when it is still accepting work.
     * A rejection caused by a concurrent scheduler shutdown is deliberately ignored.
     */
    private void scheduleUnlessShutDown(Runnable task, long delayMs) {
        if (shutdownScheduler.isShutdown()) {
            return;
        }
        try {
            shutdownScheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // ignore
        }
    }

    /**
     * Completes the future and stops the scheduler once every executor has terminated;
     * otherwise queues another check.
     */
    private void checkCompletion() {
        boolean allTerminated = executors.stream().allMatch(ExecutorService::isTerminated);
        if (allTerminated) {
            log.info("Shutdown completed.");
            future.complete(null);
            stopScheduler();
        } else {
            scheduleCheck();
        }
    }
}

private CompletableFuture<Void> closeChannel(Channel channel) {
return ChannelFutures.toCompletableFuture(channel.close())
.handle((c, t) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/**
 * A builder-like class that provides a fluent API for the graceful shutdown of
 * one or many {@link ExecutorService}s.
 *
 * <p>Executors are added with the {@link #shutdown(ExecutorService...)}
 * method. The {@link ExecutorService#shutdown()} method is called immediately.
 *
 * <p>Calling the {@link #handle()} method returns a future which completes when all executors
 * have been terminated. The executors will be polled frequently for completion. If the shutdown times out
 * or the future is cancelled, all executors will be terminated.
 */
public class GracefulExecutorServicesShutdown {
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(15);
    private final List<ExecutorService> executorServices = new ArrayList<>();
    private Duration timeout = DEFAULT_TIMEOUT;

    private GracefulExecutorServicesShutdown() {
        // instances are created with initiate()
    }

    /**
     * Initiates a new shutdown for one or many {@link ExecutorService}s.
     *
     * @return a new instance for controlling graceful shutdown
     */
    public static GracefulExecutorServicesShutdown initiate() {
        return new GracefulExecutorServicesShutdown();
    }

    /**
     * Calls {@link ExecutorService#shutdown()} and enlists the executor as part of the
     * shutdown handling.
     *
     * <p>{@code null} entries are skipped, so callers can pass executors that were never
     * initialized. A {@code null} array is treated the same way as an empty one.
     *
     * @param executorServices one or many executors to shutdown
     * @return the current instance for controlling graceful shutdown
     */
    public GracefulExecutorServicesShutdown shutdown(ExecutorService... executorServices) {
        if (executorServices == null) {
            // shutdown(null) binds null to the varargs array itself; handle it like "nothing to add"
            return this;
        }
        for (ExecutorService executorService : executorServices) {
            if (executorService != null) {
                executorService.shutdown();
                this.executorServices.add(executorService);
            }
        }
        return this;
    }

    /**
     * Sets the timeout for graceful shutdown. When this method is not called, a default of
     * 15 seconds is used.
     *
     * @param timeout duration for the timeout, must not be {@code null}
     * @return the current instance for controlling graceful shutdown
     * @throws NullPointerException if {@code timeout} is {@code null}
     */
    public GracefulExecutorServicesShutdown timeout(Duration timeout) {
        // Fail at the call site instead of later in handle(), where the cause is harder to trace.
        this.timeout = java.util.Objects.requireNonNull(timeout, "timeout");
        return this;
    }

    /**
     * Starts the handler for polling frequently for the completion of enlisted executors.
     *
     * <p>If the shutdown times out or the future is cancelled, all executors will be terminated.
     * Supports {@link CompletableFuture#cancel(boolean)} for forceful shutdown of executors.
     *
     * @return a future for completion of executors
     */
    public CompletableFuture<Void> handle() {
        return new GracefulExecutorServicesShutdownHandler(timeout.toMillis(), executorServices)
                .startShutdownHandler();
    }
}
Loading

0 comments on commit 4e7488a

Please sign in to comment.