Skip to content

Commit

Permalink
Use advertisedAddress for bookies in pulsar standalone (#1254)
Browse files Browse the repository at this point in the history
* Use advertisedAddress for bookies in pulsar standalone (#233)

* Use advertised address for zookeeper server in standalone as well
  • Loading branch information
sijie authored and merlimat committed Feb 20, 2018
1 parent de06bd4 commit 1061906
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
Expand Up @@ -107,20 +107,23 @@ public PulsarStandaloneStarter(String[] args) throws Exception {

this.config = PulsarConfigurationLoader.create((new FileInputStream(configFile)), ServiceConfiguration.class);

// Set ZK server's host to localhost
config.setZookeeperServers("127.0.0.1:" + zkPort);
config.setGlobalZookeeperServers("127.0.0.1:" + zkPort);
String zkServers = "127.0.0.1";

if (advertisedAddress != null) {
// Use advertised address from command line
config.setAdvertisedAddress(advertisedAddress);
zkServers = advertisedAddress;
} else if (isBlank(config.getAdvertisedAddress())) {
// Use advertised address as local hostname
config.setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve());
} else {
// Use advertised address from config file
}

// Set ZK server's host to localhost
config.setZookeeperServers(zkServers + ":" + zkPort);
config.setGlobalZookeeperServers(zkServers + ":" + zkPort);

Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
Expand Down Expand Up @@ -148,7 +151,7 @@ void start() throws Exception {

if (!onlyBroker) {
// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData);
bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData, config.getAdvertisedAddress());
bkEnsemble.startStandalone();
}

Expand Down
Expand Up @@ -69,17 +69,24 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort)

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData) {
this(numberOfBookies, zkPort, bkBasePort, zkDataDirName, bkDataDirName, clearOldData, null);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData, String advertisedAddress) {
this.numberOfBookies = numberOfBookies;
this.HOSTPORT = "127.0.0.1:" + zkPort;
this.ZooKeeperDefaultPort = zkPort;
this.initialPort = bkBasePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
this.clearOldData = clearOldData;
LOG.info("Running " + this.numberOfBookies + " bookie(s).");
this.advertisedAddress = null == advertisedAddress ? "127.0.0.1" : advertisedAddress;
LOG.info("Running {} bookie(s) and advertised them at {}.", this.numberOfBookies, advertisedAddress);
}

private final String HOSTPORT;
private final String advertisedAddress;
NIOServerCnxnFactory serverFactory;
ZooKeeperServer zks;
ZooKeeper zkc;
Expand Down Expand Up @@ -224,7 +231,7 @@ public void startStandalone() throws Exception {
conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 64);
conf.setFlushInterval(60000);
conf.setProperty("journalMaxGroupWaitMSec", 1L);
conf.setAdvertisedAddress("127.0.0.1");
conf.setAdvertisedAddress(advertisedAddress);

runZookeeper(1000);
initializeZookeper();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.File;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import org.testng.annotations.AfterMethod;
Expand All @@ -41,6 +42,21 @@ void setup() throws Exception {
void teardown() throws Exception {
}

@Test
void testAdvertisedAddress() throws Exception {
final int numBk = 1;
final int zkPort = PortManager.nextFreePort();
final int bkPort = PortManager.nextFreePort();

LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(
numBk, zkPort, bkPort, null, null, true, "127.0.0.2");
ensemble.startStandalone();

assertNotNull(ensemble.getZkClient().exists("/ledgers/available/127.0.0.2:" + bkPort, false));

ensemble.stop();
}

@Test
void testStartStop() throws Exception {

Expand Down

0 comments on commit 1061906

Please sign in to comment.