Skip to content

Commit

Permalink
Use metadata service uri to init bookkeeper client
Browse files Browse the repository at this point in the history
  • Loading branch information
murong00 committed Jan 19, 2020
1 parent c45aa69 commit f9acd25
Show file tree
Hide file tree
Showing 19 changed files with 107 additions and 137 deletions.
11 changes: 3 additions & 8 deletions conf/broker.conf
Expand Up @@ -430,14 +430,9 @@ saslJaasBrokerSectionName=

### --- BookKeeper Client --- ###

# BookKeeper ledger storage connection string when using a separated BookKeeper cluster.
# If not set it will use local zookeeper quorum of Pulsar cluster.
bookkeeperLedgersStore=

# Root Zookeeper path to store ledger metadata
# This parameter is used by zookeeper-based ledger manager as a root znode to
# store all ledgers.
bookkeeperLedgersRootPath=/ledgers
# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
# and resolving its metadata service location.
bookkeeperServiceUri=

# Authentication plugin to use when connecting to bookies
bookkeeperClientAuthenticationPlugin=
Expand Down
Expand Up @@ -745,16 +745,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
/**** --- BookKeeper Client --- ****/
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "BookKeeper ledgers storage connection string when using a separated BookKeeper cluster."
+ " If not set it will use local zookeeper quorum of Pulsar cluster"
doc = "Metadata service uri that bookkeeper is used for loading corresponding metadata driver"
+ " and resolving its metadata service location"
)
private String bookkeeperLedgersStore;
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Root Zookeeper path to store ledger metadata. This parameter is used by zookeeper-based "
+ "ledger manager as a root znode to store all ledgers"
)
private String bookkeeperLedgersRootPath = "/ledgers";
private String bookkeeperServiceUri;
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Authentication plugin to use when connecting to bookies"
Expand Down
Expand Up @@ -170,11 +170,16 @@ public static void main(String[] args) throws Exception {
ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);

// Format BookKeeper stream storage metadata
// Format BookKeeper ledger storage metadata
ServerConfiguration bkConf = new ServerConfiguration();
bkConf.setZkServers(arguments.zookeeper);
bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist
&& !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
throw new IOException("Failed to initialize BookKeeper metadata");
}

// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
ServiceURI bkMetadataServiceUri = ServiceURI.create(bkConf.getMetadataServiceUri());
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
Expand Down
Expand Up @@ -31,7 +31,7 @@
* Provider of a new BookKeeper client instance
*/
public interface BookKeeperClientFactory {
BookKeeper create(PulsarService pulsar,
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;

Expand Down
Expand Up @@ -53,28 +53,20 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();

@Override
public BookKeeper create(PulsarService pulsar,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties) throws IOException {
ServiceConfiguration conf = pulsar.getConfiguration();
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) throws IOException {
ClientConfiguration bkConf = createBkClientConfiguration(conf);
if (properties != null) {
properties.forEach((key, value) -> bkConf.setProperty(key, value));
}

// Choose which zookeeper client to use
ZooKeeper pulsarZkClient = pulsar.getZkClient();
ZooKeeper ledgerZkClient = pulsar.isLedgerSeparated() ? pulsar.getLedgersZkClient() : pulsarZkClient;

if (ensemblePlacementPolicyClass.isPresent()) {
setEnsemblePlacementPolicy(bkConf, conf, pulsarZkClient, ensemblePlacementPolicyClass.get());
setEnsemblePlacementPolicy(bkConf, conf, zkClient, ensemblePlacementPolicyClass.get());
} else {
setDefaultEnsemblePlacementPolicy(rackawarePolicyZkCache, clientIsolationZkCache, bkConf, conf, pulsarZkClient);
setDefaultEnsemblePlacementPolicy(rackawarePolicyZkCache, clientIsolationZkCache, bkConf, conf, zkClient);
}
try {
return BookKeeper.forConfig(bkConf)
.allocator(PulsarByteBufAllocator.DEFAULT)
.zk(ledgerZkClient)
.build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
Expand Down Expand Up @@ -114,9 +106,12 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());

if (StringUtils.isNotBlank(conf.getBookkeeperLedgersStore())) {
bkConf.setZkServers(conf.getBookkeeperLedgersStore());
bkConf.setZkLedgersRootPath(conf.getBookkeeperLedgersRootPath());
if (StringUtils.isNotBlank(conf.getBookkeeperServiceUri())) {
bkConf.setMetadataServiceUri(conf.getBookkeeperServiceUri());
} else {
PulsarService pulsar = new PulsarService(conf);
String metadataServiceUri = pulsar.getMetadataServiceUri();
bkConf.setMetadataServiceUri(metadataServiceUri);
}

if (conf.isBookkeeperClientHealthCheckEnabled()) {
Expand Down
Expand Up @@ -30,6 +30,8 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,8 +46,8 @@ public class ManagedLedgerClientFactory implements Closeable {
private final BookKeeper defaultBkClient;
private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();

public ManagedLedgerClientFactory(BookKeeperClientFactory bookkeeperProvider, PulsarService pulsar) throws Exception {
ServiceConfiguration conf = pulsar.getConfiguration();
public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
Expand All @@ -56,30 +58,30 @@ public ManagedLedgerClientFactory(BookKeeperClientFactory bookkeeperProvider, Pu
managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());

this.defaultBkClient = bookkeeperProvider.create(pulsar, Optional.empty(), null);
this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);

BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
BookKeeper bkClient = null;
// find or create bk-client in cache for a specific ensemblePlacementPolicy
if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) {
bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> {
try {
return bookkeeperProvider.create(pulsar,
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
ensemblePlacementPolicyConfig.getProperties());
return bookkeeperProvider.create(conf, zkClient,
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
ensemblePlacementPolicyConfig.getProperties());
} catch (Exception e) {
log.error("Failed to initialize bk-client for policy {}, properties {}",
ensemblePlacementPolicyConfig.getPolicyClass(),
ensemblePlacementPolicyConfig.getProperties(), e);
ensemblePlacementPolicyConfig.getPolicyClass(),
ensemblePlacementPolicyConfig.getProperties(), e);
}
return this.defaultBkClient;
});
}
return bkClient != null ? bkClient : defaultBkClient;
};

this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, pulsar.getZkClient(), managedLedgerFactoryConfig);
this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig);
}

public ManagedLedgerFactory getManagedLedgerFactory() {
Expand Down
Expand Up @@ -53,18 +53,17 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.PulsarVersion;
Expand Down Expand Up @@ -152,7 +151,6 @@ public class PulsarService implements AutoCloseable {
private ZooKeeperCache localZkCache;
private GlobalZooKeeperCache globalZkCache;
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private LocalZooKeeperConnectionService ledgerZooKeeperConnectionProvider = null;
private Compactor compactor;

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
Expand Down Expand Up @@ -259,14 +257,8 @@ public void close() throws PulsarServerException {
if (globalZkCache != null) {
globalZkCache.close();
globalZkCache = null;
if (localZooKeeperConnectionProvider != null) {
localZooKeeperConnectionProvider.close();
localZooKeeperConnectionProvider = null;
}
if (ledgerZooKeeperConnectionProvider != null) {
ledgerZooKeeperConnectionProvider.close();
ledgerZooKeeperConnectionProvider = null;
}
localZooKeeperConnectionProvider.close();
localZooKeeperConnectionProvider = null;
}

configurationCacheService = null;
Expand Down Expand Up @@ -399,23 +391,7 @@ public void start() throws PulsarServerException {
this.startZkCacheService();

this.bkClientFactory = newBookKeeperClientFactory();

// Initialize BookKeeper ledger storage metadata
if (!isLedgerSeparated()) {
ServerConfiguration bkConf = new ServerConfiguration();
bkConf.setZkServers(config.getZookeeperServers());
bkConf.setZkTimeout(new Long(config.getZooKeeperSessionTimeoutMillis()).intValue());
if (getZkClient().exists("/ledgers", false) == null // only format if /ledgers doesn't exist
&& !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
throw new IOException("Failed to initialize BookKeeper metadata");
}
} else {
ledgerZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getBookkeeperLedgersStore(), config.getZooKeeperSessionTimeoutMillis());
ledgerZooKeeperConnectionProvider.start(shutdownService);
}

managedLedgerClientFactory = new ManagedLedgerClientFactory(bkClientFactory, this);
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);

this.brokerService = new BrokerService(this);

Expand Down Expand Up @@ -728,32 +704,33 @@ public ZooKeeper getZkClient() {
return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
}

public boolean isLedgerSeparated() {
boolean separated = false;
if (StringUtils.isNotBlank(config.getBookkeeperLedgersStore())) {
separated = true;
public String getMetadataServiceUri() {
ClientConfiguration bkConf = new ClientConfiguration();
// init bookkeeper metadata service uri
String metadataServiceUri = null;
try {
String zkServers = this.getConfiguration().getZookeeperServers();
String ledgerManagerType = bkConf.getLedgerManagerLayoutStringFromFactoryClass();
metadataServiceUri = String.format("zk+%s://%s%s", ledgerManagerType,
zkServers.replace(",", ";"), "/ledgers");
} catch (ConfigurationException e) {
LOG.error("Failed to set bookkeeper metadata service uri", e);
}
return separated;
}

public ZooKeeper getLedgersZkClient() throws IOException {
return this.ledgerZooKeeperConnectionProvider.getLocalZooKeeper();
return metadataServiceUri;
}

public InternalConfigurationData getInternalConfigurationData() {
ClientConfiguration bkConf = new ClientConfiguration();
String zookeeperServers = this.getConfiguration().getZookeeperServers();
String zkLedgersServers = zookeeperServers;
String zkLedgersRootPath = bkConf.getZkLedgersRootPath();
if (isLedgerSeparated()) {
zkLedgersServers = this.getConfiguration().getBookkeeperLedgersStore();
zkLedgersRootPath = this.getConfiguration().getBookkeeperLedgersRootPath();

String metadataServiceUri = getMetadataServiceUri();

if (StringUtils.isNotBlank(config.getBookkeeperServiceUri())) {
metadataServiceUri = this.getConfiguration().getBookkeeperServiceUri();
}

return new InternalConfigurationData(
zookeeperServers,
this.getConfiguration().getZookeeperServers(),
this.getConfiguration().getConfigurationStoreServers(),
zkLedgersServers,
zkLedgersRootPath,
metadataServiceUri,
this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
}

Expand Down Expand Up @@ -1147,8 +1124,8 @@ private void startWorkerService(AuthenticationService authenticationService,
// initializing dlog namespace for function worker
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at ledgers store {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getLedgersStoreServers(), ioe);
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getMetadataServiceUri(), ioe);
throw ioe;
}
LOG.info("Function worker service setup completed");
Expand Down
Expand Up @@ -99,7 +99,8 @@ public void init() throws KeeperException, InterruptedException {
@Override
public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar,
pulsar.getConfiguration(),
pulsar.getZkClient(),
Optional.empty(),
null
);
Expand Down

0 comments on commit f9acd25

Please sign in to comment.