Skip to content

Commit

Permalink
Use binary-proto-lookup url for replication lookup (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Dec 21, 2016
1 parent 6f1fcd3 commit 9609885
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
Expand Up @@ -29,11 +29,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -44,6 +41,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.commons.lang.SystemUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -381,11 +379,11 @@ public PulsarClient getReplicationClient(String cluster) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
if (configuration.isUseTls() && !data.getServiceUrlTls().isEmpty()) {
return new PulsarClientImpl(data.getServiceUrlTls(), configuration, this.workerGroup);
} else {
return new PulsarClientImpl(data.getServiceUrl(), configuration, this.workerGroup);
}
String clusterUrl = configuration.isUseTls() ? (isNotBlank(data.getBrokerServiceUrlTls())
? data.getBrokerServiceUrlTls() : data.getServiceUrlTls()) : null;
clusterUrl = (isNotBlank(clusterUrl)) ? clusterUrl
: (isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -169,9 +169,12 @@ void setup() throws Exception {
admin3 = new PulsarAdmin(url3, (Authentication) null);

// Provision the global namespace
admin1.clusters().createCluster("r1", new ClusterData(url1.toString()));
admin1.clusters().createCluster("r2", new ClusterData(url2.toString()));
admin1.clusters().createCluster("r3", new ClusterData(url3.toString()));
admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));

admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
admin1.properties().createProperty("pulsar",
Expand All @@ -182,6 +185,9 @@ void setup() throws Exception {
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
/*
* assertEquals(admin2.clusters().getCluster("global").getServiceUrl(), "http://global:8080");
* assertEquals(admin2.properties().getPropertyAdmin("pulsar").getAdminRoles(), Lists.newArrayList("appid1",
Expand Down

0 comments on commit 9609885

Please sign in to comment.