Skip to content

Commit

Permalink
Rename var names and add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
murong00 committed Apr 3, 2020
1 parent fa97765 commit e2d7c4b
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 18 deletions.
3 changes: 2 additions & 1 deletion conf/broker.conf
Expand Up @@ -473,7 +473,8 @@ httpMaxRequestSize=-1

# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
# and resolving its metadata service location.
bookkeeperServiceUri=
# For example: zk+hierarchical://localhost:2181/ledgers
bookkeeperMetadataServiceUri=

# Authentication plugin to use when connecting to bookies
bookkeeperClientAuthenticationPlugin=
Expand Down
Expand Up @@ -810,7 +810,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Metadata service uri that bookkeeper is used for loading corresponding metadata driver"
+ " and resolving its metadata service location"
)
private String bookkeeperServiceUri;
private String bookkeeperMetadataServiceUri;
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Authentication plugin to use when connecting to bookies"
Expand Down
Expand Up @@ -106,8 +106,8 @@ ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled());

if (StringUtils.isNotBlank(conf.getBookkeeperServiceUri())) {
bkConf.setMetadataServiceUri(conf.getBookkeeperServiceUri());
if (StringUtils.isNotBlank(conf.getBookkeeperMetadataServiceUri())) {
bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataServiceUri());
} else {
String metadataServiceUri = PulsarService.bookieMetadataServiceUri(conf);
bkConf.setMetadataServiceUri(metadataServiceUri);
Expand Down
Expand Up @@ -733,8 +733,8 @@ public InternalConfigurationData getInternalConfigurationData() {

String metadataServiceUri = getMetadataServiceUri();

if (StringUtils.isNotBlank(config.getBookkeeperServiceUri())) {
metadataServiceUri = this.getConfiguration().getBookkeeperServiceUri();
if (StringUtils.isNotBlank(config.getBookkeeperMetadataServiceUri())) {
metadataServiceUri = this.getConfiguration().getBookkeeperMetadataServiceUri();
}

return new InternalConfigurationData(
Expand Down Expand Up @@ -1205,7 +1205,7 @@ private void startWorkerService(AuthenticationService authenticationService,
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getMetadataServiceUri(), ioe);
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
LOG.info("Function worker service setup completed");
Expand Down
Expand Up @@ -28,10 +28,12 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -170,4 +172,21 @@ public void testSetExplicitLacInterval() {
assertEquals(factory.createBkClientConfiguration(conf).getExplictLacInterval(), 5);
}

@Test
public void testSetMetadataServiceUri() {
BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
ServiceConfiguration conf = new ServiceConfiguration();
conf.setZookeeperServers("localhost:2181");
try {
String defaultUri = "zk+null://localhost:2181/ledgers";
assertEquals(factory.createBkClientConfiguration(conf).getMetadataServiceUri(), defaultUri);
String expectedUri = "zk+hierarchical://localhost:2181/chroot/ledgers";
conf.setBookkeeperMetadataServiceUri(expectedUri);
assertEquals(factory.createBkClientConfiguration(conf).getMetadataServiceUri(), expectedUri);
} catch (ConfigurationException e) {
e.printStackTrace();
fail("Get metadata service uri should be successful", e);
}
}

}
Expand Up @@ -28,19 +28,19 @@ public class InternalConfigurationData {

private String zookeeperServers;
private String configurationStoreServers;
private String metadataServiceUri;
private String bookkeeperMetadataServiceUri;
private String stateStorageServiceUrl;

public InternalConfigurationData() {
}

public InternalConfigurationData(String zookeeperServers,
String configurationStoreServers,
String metadataServiceUri,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.zookeeperServers = zookeeperServers;
this.configurationStoreServers = configurationStoreServers;
this.metadataServiceUri = metadataServiceUri;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}

Expand All @@ -52,8 +52,8 @@ public String getConfigurationStoreServers() {
return configurationStoreServers;
}

public String getMetadataServiceUri() {
return metadataServiceUri;
public String getBookkeeperMetadataServiceUri() {
return bookkeeperMetadataServiceUri;
}

public String getStateStorageServiceUrl() {
Expand All @@ -68,21 +68,21 @@ public boolean equals(Object obj) {
InternalConfigurationData other = (InternalConfigurationData) obj;
return Objects.equals(zookeeperServers, other.zookeeperServers)
&& Objects.equals(configurationStoreServers, other.configurationStoreServers)
&& Objects.equals(metadataServiceUri, other.metadataServiceUri)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
}

@Override
public int hashCode() {
return Objects.hash(zookeeperServers, configurationStoreServers, metadataServiceUri, stateStorageServiceUrl);
return Objects.hash(zookeeperServers, configurationStoreServers, bookkeeperMetadataServiceUri, stateStorageServiceUrl);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("zookeeperServers", zookeeperServers)
.add("configurationStoreServers", configurationStoreServers)
.add("metadataServiceUri", metadataServiceUri)
.add("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri)
.add("stateStorageServiceUrl", stateStorageServiceUrl)
.toString();
}
Expand Down
Expand Up @@ -146,7 +146,7 @@ private static URI initialize(WorkerConfig workerConfig)
return WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getMetadataServiceUri(), ioe);
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
}
Expand Down
Expand Up @@ -149,7 +149,7 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig)

public static URI initializeDlogNamespace(InternalConfigurationData internalConf) throws IOException {
String zookeeperServers = internalConf.getZookeeperServers();
URI metadataServiceUri = URI.create(internalConf.getMetadataServiceUri());
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
String ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ",");
String ledgersRootPath = metadataServiceUri.getPath();
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
Expand Down
2 changes: 1 addition & 1 deletion site2/docs/reference-configuration.md
Expand Up @@ -168,7 +168,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|brokerClientAuthenticationParameters|||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
|bookkeeperServiceUri| Metadata service uri that bookkeeper is used for loading corresponding metadata driver and resolving its metadata service location ||
|bookkeeperMetadataServiceUri| Metadata service uri that bookkeeper is used for loading corresponding metadata driver and resolving its metadata service location. For example: zk+hierarchical://localhost:2181/ledgers ||
|bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies ||
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
Expand Down

0 comments on commit e2d7c4b

Please sign in to comment.