Skip to content

Commit

Permalink
[improve][broker] Close protocol handlers before unloading namespace …
Browse files Browse the repository at this point in the history
…bundles (#22728)
  • Loading branch information
BewareMyPower authored May 21, 2024
1 parent ae9616b commit a66ff17
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,20 @@ public void close() throws PulsarServerException {
public CompletableFuture<Void> closeAsync() {
mutex.lock();
try {
// Close protocol handler before unloading namespace bundles because protocol handlers might maintain
// Pulsar clients that could send lookup requests that affect unloading.
if (protocolHandlers != null) {
protocolHandlers.close();
protocolHandlers = null;
}
if (closeFuture != null) {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
state = State.Closing;

// close the service in reverse order v.s. in which they are started
Expand Down Expand Up @@ -512,11 +519,6 @@ public CompletableFuture<Void> closeAsync() {
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
// close protocol handler before closing broker service
if (protocolHandlers != null) {
protocolHandlers.close();
protocolHandlers = null;
}

// cancel loadShedding task and shutdown the loadManager executor before shutting down the broker
cancelLoadBalancerTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;

public class PulsarClientBasedHandler implements ProtocolHandler {

static final String PROTOCOL = "test";

private String topic;
private int partitions;
private String cluster;
private PulsarClient client;
private List<Reader<byte[]>> readers;
private ExecutorService executor;
private volatile boolean running = false;
volatile long closeTimeMs;

@Override
public String protocolName() {
return PROTOCOL;
}

@Override
public boolean accept(String protocol) {
return protocol.equals(PROTOCOL);
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
final var properties = conf.getProperties();
topic = (String) properties.getOrDefault("metadata.topic", "metadata-topic");
partitions = (Integer) properties.getOrDefault("metadata.partitions", 1);
cluster = conf.getClusterName();
}

@Override
public String getProtocolDataToAdvertise() {
return "";
}

@Override
public void start(BrokerService service) {
try {
final var port = service.getPulsar().getListenPortHTTP().orElseThrow();
@Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build();
try {
admin.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(service.getPulsar().getWebServiceAddress())
.serviceUrlTls(service.getPulsar().getWebServiceAddressTls())
.brokerServiceUrl(service.getPulsar().getBrokerServiceUrl())
.brokerServiceUrlTls(service.getPulsar().getBrokerServiceUrlTls())
.build());
} catch (PulsarAdminException ignored) {
}
try {
admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
} catch (PulsarAdminException ignored) {
}
try {
admin.namespaces().createNamespace("public/default");
} catch (PulsarAdminException ignored) {
}
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
try {
final var port = service.getListenPort().orElseThrow();
client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + port).build();
readers = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
readers.add(client.newReader().topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
.startMessageId(MessageId.earliest).create());
}
running = true;
executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
while (running) {
readers.forEach(reader -> {
try {
reader.readNext(1, TimeUnit.MILLISECONDS);
} catch (PulsarClientException ignored) {
}
});
}
});
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
return Map.of();
}

@Override
public void close() {
final var start = System.currentTimeMillis();
running = false;
if (client != null) {
try {
client.close();
} catch (PulsarClientException ignored) {
}
client = null;
}
if (executor != null) {
executor.shutdown();
executor = null;
}
closeTimeMs = System.currentTimeMillis() - start;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

import java.io.File;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
public class PulsarClientBasedHandlerTest {

private final static String clusterName = "cluster";
private final static int shutdownTimeoutMs = 100;
private final int zkPort = PortManager.nextFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
private File tempDirectory;
private PulsarService pulsar;

@BeforeClass
public void setup() throws Exception {
bk.start();
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);

tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
PulsarClientBasedHandler.class.getName(), true);

config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs);

pulsar = new PulsarService(config);
pulsar.start();
}

@Test(timeOut = 30000)
public void testStopBroker() throws PulsarServerException {
final var beforeStop = System.currentTimeMillis();
final var handler = (PulsarClientBasedHandler) pulsar.getProtocolHandlers()
.protocol(PulsarClientBasedHandler.PROTOCOL);
pulsar.close();
final var elapsedMs = System.currentTimeMillis() - beforeStop;
log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
}

@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
bk.stop();
if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,18 @@ public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
@BeforeClass
@Override
protected void setup() throws Exception {
tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
tempDirectory = configureProtocolHandler(conf, MyProtocolHandler.class.getName(), useSeparateThreadPool);
super.baseSetup();
}

static File configureProtocolHandler(ServiceConfiguration conf, String className, boolean useSeparateThreadPool)
throws Exception {
final var tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
conf.setMessagingProtocols(Collections.singleton("test"));
buildMockNarFile(tempDirectory);
super.baseSetup();
buildMockNarFile(tempDirectory, className);
return tempDirectory;
}

@Test
Expand Down Expand Up @@ -163,7 +169,7 @@ protected void cleanup() throws Exception {
}
}

private static void buildMockNarFile(File tempDirectory) throws Exception {
private static void buildMockNarFile(File tempDirectory, String className) throws Exception {
File file = new File(tempDirectory, "temp.nar");
try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {

Expand All @@ -176,7 +182,7 @@ private static void buildMockNarFile(File tempDirectory) throws Exception {
zipfile.putNextEntry(manifest);
String yaml = "name: test\n" +
"description: this is a test\n" +
"handlerClass: " + MyProtocolHandler.class.getName() + "\n";
"handlerClass: " + className + "\n";
zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
zipfile.closeEntry();
}
Expand Down

0 comments on commit a66ff17

Please sign in to comment.