Skip to content

Commit c8375e7

Browse files
BewareMyPowersrinath-ctds
authored andcommitted
[improve][broker] Reduce the broker close time to avoid useless wait for event loop shutdown (apache#24895)
(cherry picked from commit 411aea9) (cherry picked from commit 61f6fcd)
1 parent f7c4f95 commit c8375e7

File tree

2 files changed

+91
-14
lines changed

2 files changed

+91
-14
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.apache.commons.lang3.StringUtils;
9898
import org.apache.commons.lang3.mutable.MutableBoolean;
9999
import org.apache.commons.lang3.tuple.ImmutablePair;
100+
import org.apache.commons.lang3.tuple.Pair;
100101
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
101102
import org.apache.pulsar.broker.PulsarServerException;
102103
import org.apache.pulsar.broker.PulsarService;
@@ -206,7 +207,6 @@ public class BrokerService implements Closeable {
206207
private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
207208
FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
208209
"futureWithDeadline(...)");
209-
private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
210210
private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
211211
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
212212

@@ -308,7 +308,7 @@ public class BrokerService implements Closeable {
308308
// fallback if recover BucketDelayedDeliveryTracker failed.
309309
private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory;
310310
private final ServerBootstrap defaultServerBootstrap;
311-
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();
311+
private final List<Pair<String, EventLoopGroup>> protocolHandlersWorkerGroups = new ArrayList<>();
312312

313313
@Getter
314314
private final BundlesQuotas bundlesQuotas;
@@ -524,7 +524,7 @@ private void startProtocolHandler(String protocol,
524524
EventLoopGroup dedicatedWorkerGroup =
525525
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
526526
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
527-
protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
527+
protocolHandlersWorkerGroups.add(Pair.of(protocol, dedicatedWorkerGroup));
528528
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
529529
} else {
530530
bootstrap = defaultServerBootstrap.clone();
@@ -835,10 +835,10 @@ public CompletableFuture<Void> closeAsync() {
835835
CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
836836
log.info("Event loops shutting down gracefully...");
837837
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
838-
shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup));
839-
shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup));
840-
for (EventLoopGroup group : protocolHandlersWorkerGroups) {
841-
shutdownEventLoops.add(shutdownEventLoopGracefully(group));
838+
shutdownEventLoops.add(shutdownEventLoopGracefully("acceptor", acceptorGroup));
839+
shutdownEventLoops.add(shutdownEventLoopGracefully("worker", workerGroup));
840+
for (final var pair : protocolHandlersWorkerGroups) {
841+
shutdownEventLoops.add(shutdownEventLoopGracefully(pair.getLeft(), pair.getRight()));
842842
}
843843

844844
CompletableFuture<Void> shutdownFuture =
@@ -933,15 +933,21 @@ public CompletableFuture<Void> closeAsync() {
933933
}
934934
}
935935

936-
CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
936+
CompletableFuture<Void> shutdownEventLoopGracefully(String name, EventLoopGroup eventLoopGroup) {
937937
long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
938-
long quietPeriod = Math.min((long) (
939-
GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs),
940-
GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
941938
long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
942-
return NettyFutureUtil.toCompletableFutureVoid(
943-
eventLoopGroup.shutdownGracefully(quietPeriod,
944-
timeout, MILLISECONDS));
939+
long periodMs = (timeout > 0) ? 1 : 0;
940+
long startNs = System.nanoTime();
941+
return NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully(
942+
periodMs, timeout, MILLISECONDS)
943+
).whenComplete((__, e) -> {
944+
final var elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
945+
if (e == null) {
946+
log.info("Event loop {} shut down after {} ms", name, elapsedMs);
947+
} else {
948+
log.warn("Failed to shut down event loop {} after {} ms: {}", name, elapsedMs, e.getMessage());
949+
}
950+
});
945951
}
946952

947953
private CompletableFuture<Void> closeChannel(Channel channel) {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.Optional;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Supplier;
24+
import lombok.Cleanup;
25+
import org.apache.pulsar.broker.PulsarService;
26+
import org.apache.pulsar.broker.ServiceConfiguration;
27+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
28+
import org.testng.Assert;
29+
import org.testng.annotations.AfterClass;
30+
import org.testng.annotations.BeforeClass;
31+
import org.testng.annotations.Test;
32+
33+
@Test
34+
public class BrokerEventLoopShutdownTest {
35+
36+
private LocalBookkeeperEnsemble bk;
37+
38+
@BeforeClass(alwaysRun = true)
39+
public void setup() throws Exception {
40+
bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
41+
bk.start();
42+
}
43+
44+
@AfterClass(alwaysRun = true, timeOut = 30000)
45+
public void cleanup() throws Exception {
46+
bk.stop();
47+
}
48+
49+
@Test(timeOut = 60000)
50+
public void testCloseOneBroker() throws Exception {
51+
final var clusterName = "test";
52+
final Supplier<ServiceConfiguration> configSupplier = () -> {
53+
final var config = new ServiceConfiguration();
54+
config.setClusterName(clusterName);
55+
config.setAdvertisedAddress("localhost");
56+
config.setBrokerServicePort(Optional.of(0));
57+
config.setWebServicePort(Optional.of(0));
58+
config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
59+
return config;
60+
};
61+
@Cleanup final var broker0 = new PulsarService(configSupplier.get());
62+
@Cleanup final var broker1 = new PulsarService(configSupplier.get());
63+
broker0.start();
64+
broker1.start();
65+
66+
final var startNs = System.nanoTime();
67+
broker0.close();
68+
final var closeTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
69+
Assert.assertTrue(closeTimeMs < 1000, "close time: " + closeTimeMs + " ms");
70+
}
71+
}

0 commit comments

Comments
 (0)