Skip to content

Commit

Permalink
Share EventLoopGroup between Broker and BK client
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed May 19, 2021
1 parent 403b57a commit b58e135
Show file tree
Hide file tree
Showing 23 changed files with 134 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
Expand All @@ -30,11 +31,11 @@
* Provider of a new BookKeeper client instance.
*/
public interface BookKeeperClientFactory {
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;

BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties,
StatsLogger statsLogger) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE;
import static org.apache.bookkeeper.net.CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -57,14 +58,15 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) throws IOException {
return create(conf, zkClient, ensemblePlacementPolicyClass, properties, NullStatsLogger.INSTANCE);
return create(conf, zkClient, eventLoopGroup, ensemblePlacementPolicyClass, properties,
NullStatsLogger.INSTANCE);
}

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
ClientConfiguration bkConf = createBkClientConfiguration(conf);
Expand All @@ -79,6 +81,7 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
try {
return BookKeeper.forConfig(bkConf)
.allocator(PulsarByteBufAllocator.DEFAULT)
.eventLoopGroup(eventLoopGroup)
.statsLogger(statsLogger)
.build();
} catch (InterruptedException | BKException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -52,7 +53,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
private StatsProvider statsProvider = new NullStatsProvider();

public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
Expand All @@ -78,7 +80,7 @@ public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
statsProvider.start(configuration);
StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client");

this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, eventLoopGroup, Optional.empty(), null);

BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
Expand All @@ -87,7 +89,7 @@ public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) {
bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> {
try {
return bookkeeperProvider.create(conf, zkClient,
return bookkeeperProvider.create(conf, zkClient, eventLoopGroup,
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
ensemblePlacementPolicyConfig.getProperties());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -111,9 +112,11 @@
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
Expand All @@ -124,6 +127,7 @@
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
Expand Down Expand Up @@ -218,6 +222,7 @@ public class PulsarService implements AutoCloseable {
private ProtocolHandlers protocolHandlers = null;

private final ShutdownService shutdownService;
private final EventLoopGroup ioEventLoopGroup;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -298,7 +303,10 @@ public PulsarService(ServiceConfiguration config,
} else {
this.transactionReplayExecutor = null;
}
}

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
new DefaultThreadFactory("pulsar-io"));
}

public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return MetadataStoreExtended.create(config.getConfigurationStoreServers(),
Expand Down Expand Up @@ -471,6 +479,8 @@ public CompletableFuture<Void> closeAsync() {
transactionReplayExecutor.shutdown();
}

ioEventLoopGroup.shutdownGracefully();

// add timeout handling for closing executors
asyncCloseFutures.add(executorServicesShutdown.handle());

Expand Down Expand Up @@ -615,11 +625,12 @@ public void start() throws PulsarServerException {
this.startZkCacheService();

this.bkClientFactory = newBookKeeperClientFactory();

managedLedgerClientFactory = ManagedLedgerStorage.create(
config, getZkClient(), bkClientFactory
config, getZkClient(), bkClientFactory, ioEventLoopGroup
);

this.brokerService = new BrokerService(this);
this.brokerService = new BrokerService(this, ioEventLoopGroup);

// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));
Expand Down Expand Up @@ -1247,32 +1258,35 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies of
public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
ClientBuilder builder = PulsarClient.builder()
.serviceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl)
.enableTls(this.getConfiguration().isTlsEnabled())
.allowTlsInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection())
.tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
conf.setTlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());

if (this.getConfiguration().isBrokerClientTlsEnabled()) {
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
builder.useKeyStoreTls(true)
.tlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType())
.tlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore())
.tlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
conf.setUseKeyStoreTls(true);
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
} else {
builder.tlsTrustCertsFilePath(
conf.setTlsTrustCertsFilePath(
isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
? this.getConfiguration().getBrokerClientTrustCertsFilePath()
: this.getConfiguration().getTlsCertificateFilePath());
}
}

if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
builder.authentication(this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters());
conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
conf.setAuthParamMap(null);
conf.setAuthentication(AuthenticationFactory.create(
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters()));
}
this.client = builder.build();
this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
} catch (Exception e) {
throw new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

public BrokerService(PulsarService pulsar) throws Exception {
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
this.pulsar = pulsar;
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
Expand All @@ -288,13 +288,10 @@ public BrokerService(PulsarService pulsar) throws Exception {
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
.name("broker-topic-workers").build();
final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-io");
final int numThreads = pulsar.getConfiguration().getNumIOThreads();
log.info("Using {} threads for broker service IO", numThreads);

this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
this.workerGroup = eventLoopGroup;
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
Expand Down Expand Up @@ -1792,7 +1789,7 @@ public PulsarService pulsar() {
return pulsar;
}

public ScheduledExecutorService executor() {
public EventLoopGroup executor() {
return workerGroup;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static com.google.common.collect.Lists.newArrayList;
import static com.google.protobuf.ByteString.copyFrom;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
Expand Down Expand Up @@ -101,6 +100,7 @@ public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar.getConfiguration(),
pulsar.getZkClient(),
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
);
Expand Down Expand Up @@ -237,9 +237,7 @@ public SchemaVersion versionFromBytes(byte[] version) {

@Override
public void close() throws Exception {
if (nonNull(bookKeeper)) {
bookKeeper.close();
}
// nothing to do
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.storage;

import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
Expand Down Expand Up @@ -45,7 +46,8 @@ public interface ManagedLedgerStorage extends AutoCloseable {
*/
void initialize(ServiceConfiguration conf,
ZooKeeper zkClient,
BookKeeperClientFactory bookkeperProvider) throws Exception;
BookKeeperClientFactory bookkeperProvider,
EventLoopGroup eventLoopGroup) throws Exception;

/**
* Return the factory to create {@link ManagedLedgerFactory}.
Expand Down Expand Up @@ -85,10 +87,11 @@ void initialize(ServiceConfiguration conf,
*/
static ManagedLedgerStorage create(ServiceConfiguration conf,
ZooKeeper zkClient,
BookKeeperClientFactory bkProvider) throws Exception {
BookKeeperClientFactory bkProvider,
EventLoopGroup eventLoopGroup) throws Exception {
final Class<?> storageClass = Class.forName(conf.getManagedLedgerStorageClassName());
final ManagedLedgerStorage storage = (ManagedLedgerStorage) storageClass.newInstance();
storage.initialize(conf, zkClient, bkProvider);
storage.initialize(conf, zkClient, bkProvider, eventLoopGroup);
return storage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.concurrent.Executors;
Expand All @@ -37,6 +39,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -123,7 +126,9 @@ public static void main(String[] args) throws Exception {
ZooKeeperClientFactory.SessionType.ReadWrite,
(int) brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();
BookKeeper bk = bkClientFactory.create(brokerConfig, zk, Optional.empty(), null);

EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("compactor-io"));
BookKeeper bk = bkClientFactory.create(brokerConfig, zk, eventLoopGroup, Optional.empty(), null);
try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
Expand All @@ -134,6 +139,7 @@ public static void main(String[] args) throws Exception {
zk.close();
scheduler.shutdownNow();
executor.shutdown();
eventLoopGroup.shutdownGracefully();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;

import java.io.IOException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -55,14 +56,14 @@ public MockedBookKeeperClientFactory() {
}

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) throws IOException {
return mockedBk;
}

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
return mockedBk;
Expand Down
Loading

0 comments on commit b58e135

Please sign in to comment.