Skip to content

Commit

Permalink
Allow Pulsar to use BookieID instead of Bookie Network Addresses (Upd…
Browse files Browse the repository at this point in the history
…ate BK to 4.12.1) (#9019)

This change allows ZkBookieRackAffinityMapping to deal with BookieId instead of raw BookieSocketAddresses.

Summary of changes:
- upgrade to BK 4.12.1
- use BookieAddressResolver in ZkBookieRackAffinityMapping
- Start BookieServer passing "null" as BookieServiceInfo provider instead of BookieServiceInfo.NO_INFO (this change allows the Bookie bundled with Pulsar to publish local endpoints)
- indirectly the update of BK to 4.12.1 brings the update of Apache Curator from 4.x to 5.1. It is used by BK StreamStorage
  • Loading branch information
eolivelli committed Jan 14, 2021
1 parent bb4cb31 commit 4c60262
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 55 deletions.
58 changes: 29 additions & 29 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -395,32 +395,32 @@ The Apache Software License, Version 2.0
- org.apache.logging.log4j-log4j-1.2-api-2.14.0.jar
* Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
* BookKeeper
- org.apache.bookkeeper-bookkeeper-common-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-proto-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-server-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.12.0.jar
- org.apache.bookkeeper-circe-checksum-4.12.0.jar
- org.apache.bookkeeper-cpu-affinity-4.12.0.jar
- org.apache.bookkeeper-statelib-4.12.0.jar
- org.apache.bookkeeper-stream-storage-api-4.12.0.jar
- org.apache.bookkeeper-stream-storage-common-4.12.0.jar
- org.apache.bookkeeper-stream-storage-java-client-4.12.0.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.12.0.jar
- org.apache.bookkeeper-stream-storage-proto-4.12.0.jar
- org.apache.bookkeeper-stream-storage-server-4.12.0.jar
- org.apache.bookkeeper-stream-storage-service-api-4.12.0.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.12.0.jar
- org.apache.bookkeeper.http-http-server-4.12.0.jar
- org.apache.bookkeeper.http-vertx-http-server-4.12.0.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.0.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.0.jar
- org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.0.jar
- org.apache.distributedlog-distributedlog-common-4.12.0.jar
- org.apache.distributedlog-distributedlog-core-4.12.0-tests.jar
- org.apache.distributedlog-distributedlog-core-4.12.0.jar
- org.apache.distributedlog-distributedlog-protocol-4.12.0.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.0.jar
- org.apache.bookkeeper-bookkeeper-common-4.12.1.jar
- org.apache.bookkeeper-bookkeeper-common-allocator-4.12.1.jar
- org.apache.bookkeeper-bookkeeper-proto-4.12.1.jar
- org.apache.bookkeeper-bookkeeper-server-4.12.1.jar
- org.apache.bookkeeper-bookkeeper-tools-framework-4.12.1.jar
- org.apache.bookkeeper-circe-checksum-4.12.1.jar
- org.apache.bookkeeper-cpu-affinity-4.12.1.jar
- org.apache.bookkeeper-statelib-4.12.1.jar
- org.apache.bookkeeper-stream-storage-api-4.12.1.jar
- org.apache.bookkeeper-stream-storage-common-4.12.1.jar
- org.apache.bookkeeper-stream-storage-java-client-4.12.1.jar
- org.apache.bookkeeper-stream-storage-java-client-base-4.12.1.jar
- org.apache.bookkeeper-stream-storage-proto-4.12.1.jar
- org.apache.bookkeeper-stream-storage-server-4.12.1.jar
- org.apache.bookkeeper-stream-storage-service-api-4.12.1.jar
- org.apache.bookkeeper-stream-storage-service-impl-4.12.1.jar
- org.apache.bookkeeper.http-http-server-4.12.1.jar
- org.apache.bookkeeper.http-vertx-http-server-4.12.1.jar
- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.1.jar
- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.1.jar
- org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.1.jar
- org.apache.distributedlog-distributedlog-common-4.12.1.jar
- org.apache.distributedlog-distributedlog-core-4.12.1-tests.jar
- org.apache.distributedlog-distributedlog-core-4.12.1.jar
- org.apache.distributedlog-distributedlog-protocol-4.12.1.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.1.jar
* Apache HTTP Client
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
Expand Down Expand Up @@ -476,9 +476,9 @@ The Apache Software License, Version 2.0
- org.apache.avro-avro-1.9.1.jar
- org.apache.avro-avro-protobuf-1.9.1.jar
* Apache Curator
- org.apache.curator-curator-client-4.0.1.jar
- org.apache.curator-curator-framework-4.0.1.jar
- org.apache.curator-curator-recipes-4.0.1.jar
- org.apache.curator-curator-client-5.1.0.jar
- org.apache.curator-curator-framework-5.1.0.jar
- org.apache.curator-curator-recipes-5.1.0.jar
* Apache Yetus
- org.apache.yetus-audience-annotations-0.5.0.jar
* @FreeBuilder
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ flexible messaging model and an intuitive client API.</description>
<!-- apache commons -->
<commons-compress.version>1.19</commons-compress.version>

<bookkeeper.version>4.12.0</bookkeeper.version>
<bookkeeper.version>4.12.1</bookkeeper.version>
<zookeeper.version>3.5.7</zookeeper.version>
<netty.version>4.1.51.Final</netty.version>
<netty-tc-native.version>2.0.33.Final</netty-tc-native.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Optional;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.stats.StatsProvider;
Expand Down Expand Up @@ -225,7 +224,7 @@ && isBlank(starterArguments.bookieConfigFile)) {
checkNotNull(bookieConfig, "No ServerConfiguration for Bookie");
checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie");
bookieServer = new BookieServer(
bookieConfig, bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO);
bookieConfig, bookieStatsProvider.getStatsLogger(""), null);
} else {
bookieServer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void setup() throws Exception {
String addr = String.format("10.0.0.%d", i + 1);
conf.setAdvertisedAddress(addr);

BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE, null);

bs.start();
bookies.add(bs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.test.ZooKeeperUtil;

/**
* A class runs several bookie servers for testing.
Expand Down Expand Up @@ -126,7 +127,7 @@ public void tearDown() throws Exception {
* @throws Exception
*/
protected void startZKCluster() throws Exception {
zkUtil.startServer();
zkUtil.startCluster();
zkc = zkUtil.getZooKeeperClient();
}

Expand All @@ -136,7 +137,7 @@ protected void startZKCluster() throws Exception {
* @throws Exception
*/
protected void stopZKCluster() throws Exception {
zkUtil.killServer();
zkUtil.stopCluster();
}

/**
Expand Down
24 changes: 12 additions & 12 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -411,18 +411,18 @@ The Apache Software License, Version 2.0
- async-http-client-2.12.1.jar
- async-http-client-netty-utils-2.12.1.jar
* Apache Bookkeeper
- bookkeeper-common-4.12.0.jar
- bookkeeper-common-allocator-4.12.0.jar
- bookkeeper-proto-4.12.0.jar
- bookkeeper-server-4.12.0.jar
- bookkeeper-stats-api-4.12.0.jar
- bookkeeper-tools-framework-4.12.0.jar
- circe-checksum-4.12.0.jar
- codahale-metrics-provider-4.12.0jar
- cpu-affinity-4.12.0.jar
- http-server-4.12.0.jar
- prometheus-metrics-provider-4.12.0.jar
- codahale-metrics-provider-4.12.0.jar
- bookkeeper-common-4.12.1.jar
- bookkeeper-common-allocator-4.12.1.jar
- bookkeeper-proto-4.12.1.jar
- bookkeeper-server-4.12.1.jar
- bookkeeper-stats-api-4.12.1.jar
- bookkeeper-tools-framework-4.12.1.jar
- circe-checksum-4.12.1.jar
- codahale-metrics-provider-4.12.1jar
- cpu-affinity-4.12.1.jar
- http-server-4.12.1.jar
- prometheus-metrics-provider-4.12.1.jar
- codahale-metrics-provider-4.12.1.jar
* Apache Commons
- commons-cli-1.2.jar
- commons-codec-1.10.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -299,7 +298,7 @@ private void runBookies(ServerConfiguration baseConf) throws Exception {
bsConfs[i].setAllowEphemeralPorts(true);

try {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
} catch (InvalidCookieException e) {
// InvalidCookieException can happen if the machine IP has changed
// Since we are running here a local bookie that is always accessed
Expand All @@ -312,7 +311,7 @@ private void runBookies(ServerConfiguration baseConf) throws Exception {
new File(new File(bkDataDir, "current"), "VERSION").delete();

// Retry to start the bookie after cleaning the old left cookie
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
}
bs[i].start();
LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
Expand Down Expand Up @@ -446,7 +445,7 @@ public void stopBK() {

public void startBK(int i) throws Exception {
try {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
} catch (InvalidCookieException e) {
// InvalidCookieException can happen if the machine IP has changed
// Since we are running here a local bookie that is always accessed
Expand All @@ -459,7 +458,7 @@ public void startBK(int i) throws Exception {
new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();

// Retry to start the bookie after cleaning the old left cookie
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);

}
bs[i].start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.common.policies.data.BookieInfo;
Expand Down Expand Up @@ -92,7 +93,8 @@ private void updateRacksWithHost(BookiesRackConfiguration racks) {
racks.forEach((group, bookies) ->
bookies.forEach((addr, bi) -> {
try {
BookieSocketAddress bsa = new BookieSocketAddress(addr);
BookieId bookieId = BookieId.parse(addr);
BookieSocketAddress bsa = getBookieAddressResolver().resolve(bookieId);
newRacksWithHost.updateBookie(group, bsa.toString(), bi);

String hostname = bsa.getSocketAddress().getHostName();
Expand All @@ -107,8 +109,8 @@ private void updateRacksWithHost(BookiesRackConfiguration racks) {
} else {
LOG.info("Network address for {} is unresolvable yet.", addr);
}
} catch (UnknownHostException e) {
throw new RuntimeException(e);
} catch (BookieAddressResolver.BookieIdNotResolvedException e) {
LOG.info("Network address for {} is unresolvable yet. error is {}", addr, e);
}
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testBasic() throws Exception {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
assertNull(bkClientConf1.getProperty(ZkBookieRackAffinityMapping.ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE));
mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
Expand All @@ -95,6 +96,7 @@ public void testBasic() throws Exception {
ClientConfiguration bkClientConf2 = new ClientConfiguration();
bkClientConf2.setZkServers("127.0.0.1" + ":" + localZkS.getZookeeperPort());
bkClientConf2.setZkTimeout(1000);
mapping2.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping2.setConf(bkClientConf2);
List<String> racks2 = mapping2
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
Expand All @@ -111,6 +113,7 @@ public void testNoBookieInfo() throws Exception {
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
assertEquals(racks.get(0), null);
Expand Down Expand Up @@ -155,6 +158,7 @@ public void testBookieInfoChange() throws Exception {
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping.setConf(bkClientConf);
List<String> racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
Expand Down

0 comments on commit 4c60262

Please sign in to comment.