Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 5935] Support multi pulsar clusters to use the same bk cluster #5985

Merged
merged 5 commits into from Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/broker.conf
Expand Up @@ -478,6 +478,11 @@ httpMaxRequestSize=-1

### --- BookKeeper Client --- ###

# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
# and resolving its metadata service location.
# For example: zk+hierarchical://localhost:2181/ledgers
bookkeeperMetadataServiceUri=

# Authentication plugin to use when connecting to bookies
bookkeeperClientAuthenticationPlugin=

Expand Down
Expand Up @@ -816,6 +816,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String kinitCommand = "/usr/bin/kinit";

/**** --- BookKeeper Client --- ****/
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Metadata service uri that bookkeeper is used for loading corresponding metadata driver"
+ " and resolving its metadata service location"
)
private String bookkeeperMetadataServiceUri;
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Authentication plugin to use when connecting to bookies"
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
Expand Down Expand Up @@ -66,7 +67,6 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
try {
return BookKeeper.forConfig(bkConf)
.allocator(PulsarByteBufAllocator.DEFAULT)
.zk(zkClient)
.build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
Expand Down Expand Up @@ -106,6 +106,13 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());

if (StringUtils.isNotBlank(conf.getBookkeeperMetadataServiceUri())) {
bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataServiceUri());
} else {
String metadataServiceUri = PulsarService.bookieMetadataServiceUri(conf);
bkConf.setMetadataServiceUri(metadataServiceUri);
}

if (conf.isBookkeeperClientHealthCheckEnabled()) {
bkConf.enableBookieHealthCheck();
bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
Expand Down
Expand Up @@ -59,7 +59,7 @@ public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());

this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);

BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
BookKeeper bkClient = null;
Expand Down
Expand Up @@ -66,6 +66,7 @@
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.PulsarVersion;
Expand Down Expand Up @@ -721,6 +722,28 @@ public ZooKeeper getZkClient() {
return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
}

/**
* Get default bookkeeper metadata service uri.
*/
public String getMetadataServiceUri() {
return bookieMetadataServiceUri(this.getConfiguration());
}

public InternalConfigurationData getInternalConfigurationData() {

String metadataServiceUri = getMetadataServiceUri();

if (StringUtils.isNotBlank(config.getBookkeeperMetadataServiceUri())) {
metadataServiceUri = this.getConfiguration().getBookkeeperMetadataServiceUri();
}

return new InternalConfigurationData(
this.getConfiguration().getZookeeperServers(),
this.getConfiguration().getConfigurationStoreServers(),
metadataServiceUri,
this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
}

public ConfigurationCacheService getConfigurationCache() {
return configurationCacheService;
}
Expand Down Expand Up @@ -1064,6 +1087,27 @@ public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}

/**
* Get bookkeeper metadata service uri.
*
* @param config broker configuration
* @return the metadata service uri that bookkeeper is used
*/
public static String bookieMetadataServiceUri(ServiceConfiguration config) {
ClientConfiguration bkConf = new ClientConfiguration();
// init bookkeeper metadata service uri
String metadataServiceUri = null;
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add this logic here. We should reply on the logic that bookkeeper already provides. You can do this by using the following code snippet.

ClientConfiguration bkConf = new ClientConfiguration();
bkConf.setZkServers(config.getZooKeeperServers());
return bkConf.getMetadataServiceUri();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

String zkServers = config.getZookeeperServers();
String ledgerManagerType = bkConf.getLedgerManagerLayoutStringFromFactoryClass();
metadataServiceUri = String.format("zk+%s://%s%s", ledgerManagerType,
zkServers.replace(",", ";"), "/ledgers");
} catch (ConfigurationException e) {
LOG.error("Failed to get bookkeeper metadata service uri", e);
}
return metadataServiceUri;
}

private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
Expand Down Expand Up @@ -1153,21 +1197,15 @@ private void startWorkerService(AuthenticationService authenticationService,
throw e;
}

InternalConfigurationData internalConf = new InternalConfigurationData(
this.getConfiguration().getZookeeperServers(),
this.getConfiguration().getConfigurationStoreServers(),
new ClientConfiguration().getZkLedgersRootPath(),
this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
InternalConfigurationData internalConf = this.getInternalConfigurationData();

URI dlogURI;
try {
// initializing dlog namespace for function worker
dlogURI = WorkerUtils.initializeDlogNamespace(
internalConf.getZookeeperServers(),
internalConf.getLedgersRootPath());
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages",
internalConf.getZookeeperServers(), ioe);
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
LOG.info("Function worker service setup completed");
Expand Down
Expand Up @@ -239,12 +239,7 @@ private synchronized void updateDynamicConfigurationOnZk(String configName, Stri
@Path("/internal-configuration")
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
public InternalConfigurationData getInternalConfigurationData() {
ClientConfiguration conf = new ClientConfiguration();
return new InternalConfigurationData(
pulsar().getConfiguration().getZookeeperServers(),
pulsar().getConfiguration().getConfigurationStoreServers(),
conf.getZkLedgersRootPath(),
pulsar().getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
return pulsar().getInternalConfigurationData();
}

@GET
Expand Down
Expand Up @@ -28,10 +28,12 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -154,6 +156,7 @@ public void testSetDefaultEnsemblePlacementPolicyRackAwareEnabledChangedValues()
public void testSetDiskWeightBasedPlacementEnabled() {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
conf.setZookeeperServers("localhost:2181");
assertFalse(factory.createBkClientConfiguration(conf).getDiskWeightBasedPlacementEnabled());
conf.setBookkeeperDiskWeightBasedPlacementEnabled(true);
assertTrue(factory.createBkClientConfiguration(conf).getDiskWeightBasedPlacementEnabled());
Expand All @@ -163,9 +166,27 @@ public void testSetDiskWeightBasedPlacementEnabled() {
public void testSetExplicitLacInterval() {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
conf.setZookeeperServers("localhost:2181");
assertEquals(factory.createBkClientConfiguration(conf).getExplictLacInterval(), 0);
conf.setBookkeeperExplicitLacIntervalInMills(5);
assertEquals(factory.createBkClientConfiguration(conf).getExplictLacInterval(), 5);
}

@Test
public void testSetMetadataServiceUri() {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
conf.setZookeeperServers("localhost:2181");
try {
String defaultUri = "zk+null://localhost:2181/ledgers";
assertEquals(factory.createBkClientConfiguration(conf).getMetadataServiceUri(), defaultUri);
String expectedUri = "zk+hierarchical://localhost:2181/chroot/ledgers";
conf.setBookkeeperMetadataServiceUri(expectedUri);
assertEquals(factory.createBkClientConfiguration(conf).getMetadataServiceUri(), expectedUri);
} catch (ConfigurationException e) {
e.printStackTrace();
fail("Get metadata service uri should be successful", e);
}
}

}
Expand Up @@ -220,7 +220,7 @@ void internalConfiguration() throws Exception {
InternalConfigurationData expectedData = new InternalConfigurationData(
pulsar.getConfiguration().getZookeeperServers(),
pulsar.getConfiguration().getConfigurationStoreServers(),
new ClientConfiguration().getZkLedgersRootPath(),
pulsar.getMetadataServiceUri(),
pulsar.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));

assertEquals(brokers.getInternalConfigurationData(), expectedData);
Expand Down
Expand Up @@ -28,19 +28,19 @@ public class InternalConfigurationData {

private String zookeeperServers;
private String configurationStoreServers;
private String ledgersRootPath;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for backward compatibility, you shouldn't change the field directly.

You can add a new field bookkeeperMetadataServiceUri and deprecate the old field ledgersRootPath .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out, done.

private String bookkeeperMetadataServiceUri;
private String stateStorageServiceUrl;

public InternalConfigurationData() {
}

public InternalConfigurationData(String zookeeperServers,
String configurationStoreServers,
String ledgersRootPath,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.zookeeperServers = zookeeperServers;
this.configurationStoreServers = configurationStoreServers;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}

Expand All @@ -52,8 +52,8 @@ public String getConfigurationStoreServers() {
return configurationStoreServers;
}

public String getLedgersRootPath() {
return ledgersRootPath;
public String getBookkeeperMetadataServiceUri() {
return bookkeeperMetadataServiceUri;
}

public String getStateStorageServiceUrl() {
Expand All @@ -68,21 +68,24 @@ public boolean equals(Object obj) {
InternalConfigurationData other = (InternalConfigurationData) obj;
return Objects.equals(zookeeperServers, other.zookeeperServers)
&& Objects.equals(configurationStoreServers, other.configurationStoreServers)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
}

@Override
public int hashCode() {
return Objects.hash(zookeeperServers, configurationStoreServers, ledgersRootPath, stateStorageServiceUrl);
return Objects.hash(zookeeperServers,
configurationStoreServers,
bookkeeperMetadataServiceUri,
stateStorageServiceUrl);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("zookeeperServers", zookeeperServers)
.add("configurationStoreServers", configurationStoreServers)
.add("ledgersRootPath", ledgersRootPath)
.add("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri)
.add("stateStorageServiceUrl", stateStorageServiceUrl)
.toString();
}
Expand Down
Expand Up @@ -143,12 +143,10 @@ private static URI initialize(WorkerConfig workerConfig)
// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
try {
return WorkerUtils.initializeDlogNamespace(
internalConf.getZookeeperServers(),
internalConf.getLedgersRootPath());
return WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages",
internalConf.getZookeeperServers(), ioe);
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand Down Expand Up @@ -146,10 +147,14 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig)
return conf;
}

public static URI initializeDlogNamespace(String zkServers, String ledgersRootPath) throws IOException {
BKDLConfig dlConfig = new BKDLConfig(zkServers, ledgersRootPath);
public static URI initializeDlogNamespace(InternalConfigurationData internalConf) throws IOException {
String zookeeperServers = internalConf.getZookeeperServers();
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
String ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ",");
String ledgersRootPath = metadataServiceUri.getPath();
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);
URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zkServers));
URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));

try {
dlMetadata.create(dlogUri);
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Expand Up @@ -168,6 +168,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|brokerClientAuthenticationParameters|||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
|bookkeeperMetadataServiceUri| Metadata service uri that bookkeeper is used for loading corresponding metadata driver and resolving its metadata service location. For example: zk+hierarchical://localhost:2181/ledgers ||
|bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies ||
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
Expand Down