From 50714921fb543bb09d1b3ed06f0b0c90c6c9c95b Mon Sep 17 00:00:00 2001 From: linjunhua Date: Thu, 4 Mar 2021 14:40:09 +0800 Subject: [PATCH 1/9] [pulsar-admin] add cmd to get service url of the leader broker --- .../pulsar/broker/admin/impl/BrokersBase.java | 23 +++++++++++++ .../pulsar/broker/admin/AdminApiTest.java | 3 ++ .../apache/pulsar/broker/admin/AdminTest.java | 6 ++++ .../apache/pulsar/client/admin/Brokers.java | 34 +++++++++++++++++++ .../client/admin/internal/BrokersImpl.java | 33 ++++++++++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 ++ .../apache/pulsar/admin/cli/CmdBrokers.java | 14 ++++++-- 7 files changed, 114 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 25b51493cf066..2b2e7f0999e56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -43,6 +43,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.PulsarService.State; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; @@ -92,6 +93,28 @@ public Set getActiveBrokers(@PathParam("cluster") String cluster) throws } } + @GET + @Path("/leaderBroker") + @ApiOperation( + value = "Get the service url of the leader broker in the cluster.", + response = String.class) + @ApiResponses( + value = { + @ApiResponse(code = 401, message = "Authentication required"), + @ApiResponse(code = 403, message = "This operation requires super-user access") }) + public String getLeaderBroker() throws Exception { + validateSuperUserAccess(); + + try { + return pulsar().getLeaderElectionService().getCurrentLeader() + .map(LeaderBroker::getServiceUrl) + .orElse("None"); + } catch (Exception e) { + LOG.error("[{}] Failed to get the service url of the leader broker.", clientAppId(), e); + throw new RestException(e); + } + } + @GET @Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces") @ApiOperation(value = "Get the list of namespaces served by the specific broker", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 6d3a662a04bc9..549e991948494 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -441,6 +441,9 @@ public void brokers() throws Exception { Assert.assertNotNull(list2); Assert.assertEquals(list2.size(), 1); + String leaderBroker = admin.brokers().getLeaderBroker(); + Assert.assertNotNull(leaderBroker); + Map nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) Assert.assertEquals(nsMap.size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 6f1ce0569615d..97f019a631a72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -70,6 +70,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.conf.InternalConfigurationData; @@ -620,6 +621,11 @@ public void brokers() throws Exception { Set activeBrokers = brokers.getActiveBrokers("use"); assertEquals(activeBrokers.size(), 1); assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get())); + + String leaderBroker = brokers.getLeaderBroker(); + assertEquals(leaderBroker, pulsar.getLeaderElectionService().getCurrentLeader() + .map(LeaderBroker::getServiceUrl) + .orElse("None")); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index d0bb9b9db4f90..287bc64a9374e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -72,6 +72,40 @@ public interface Brokers { */ CompletableFuture> getActiveBrokersAsync(String cluster); + /** + * Get the service url of the leader broker. + *

+ * Get the service url of the leader broker. + *

+ * Response Example: + * + *

+     * "prod1-broker1.messaging.use.example.com:8080"
+     * 
+ * + * @return the service url of the leader broker + * @throws PulsarAdminException + * Unexpected error + */ + String getLeaderBroker() throws PulsarAdminException; + + /** + * Get the service url of the leader broker asynchronously. + *

+ * Get the service url of the leader broker. + *

+ * Response Example: + * + *

+     * "prod1-broker1.messaging.use.example.com:8080"
+     * 
+ * + * @return the service url of the leader broker + * @throws PulsarAdminException + * Unexpected error + */ + CompletableFuture getLeaderBrokerAsync() throws PulsarAdminException; + /** * Get the map of owned namespaces and their status from a single broker in the cluster. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 3db8dfe4fe65d..a04d7971e6cde 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -76,6 +76,39 @@ public void failed(Throwable throwable) { return future; } + @Override + public String getLeaderBroker() throws PulsarAdminException { + try { + return getLeaderBrokerAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getLeaderBrokerAsync() { + WebTarget path = adminBrokers.path("leaderBroker"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(String leaderBroker) { + future.complete(leaderBroker); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 63515f1880675..1ca1c6fcfdf31 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -75,6 +75,9 @@ public void brokers() throws Exception { brokers.run(split("list use")); verify(mockBrokers).getActiveBrokers("use"); + brokers.run(split("leader-broker")); + verify(mockBrokers).getLeaderBroker(); + brokers.run(split("namespaces use --url http://my-service.url:8080")); verify(mockBrokers).getOwnedNamespaces("use", "http://my-service.url:8080"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java index 4ce74100dc0f2..12d200a0f3b6b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java @@ -40,6 +40,15 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Get the service url of the leader broker") + private class LeaderBroker extends CliCommand { + + @Override + void run() throws Exception { + print(getAdmin().brokers().getLeaderBroker()); + } + } + @Parameters(commandDescription = "List namespaces owned by the broker") private class Namespaces extends CliCommand { @Parameter(description = "cluster-name\n", required = true) @@ -77,7 +86,7 @@ void run() throws Exception { getAdmin().brokers().deleteDynamicConfiguration(configName); } } - + @Parameters(commandDescription = "Get all overridden dynamic-configuration values") private class GetAllConfigurationsCmd extends CliCommand { @@ -86,7 +95,7 @@ void run() throws Exception { print(getAdmin().brokers().getAllDynamicConfigurations()); } } - + @Parameters(commandDescription = "Get list of updatable configuration name") private class GetUpdatableConfigCmd extends CliCommand { @@ -140,6 +149,7 @@ void run() throws Exception { public CmdBrokers(Supplier admin) { super("brokers", admin); jcommander.addCommand("list", new List()); + jcommander.addCommand("leader-broker", new LeaderBroker()); jcommander.addCommand("namespaces", new Namespaces()); jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd()); jcommander.addCommand("delete-dynamic-config", new DeleteConfigurationCmd()); From 61648ed0dc6bbdb670d6701e1c25a9fc57cc3117 Mon Sep 17 00:00:00 2001 From: linjunhua Date: Fri, 5 Mar 2021 13:22:49 +0800 Subject: [PATCH 2/9] [doc] Add content for leader-broker cmd --- site2/docs/admin-api-brokers.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/site2/docs/admin-api-brokers.md b/site2/docs/admin-api-brokers.md index 01ed0053f79a0..514ef702672ed 100644 --- a/site2/docs/admin-api-brokers.md +++ b/site2/docs/admin-api-brokers.md @@ -48,6 +48,33 @@ admin.brokers().getActiveBrokers(clusterName) +### Get the service url of the leader broker + +Fetch the service url of the leader broker, or print None when it not exists. + + + + +```shell +$ pulsar-admin brokers leader-broker +``` + +``` +broker1.use.org.com:8080 +``` + + + +{@inject: endpoint|GET|/admin/v2/brokers/leaderBroker?version=[[pulsar:version_number]]} + + + +```java +admin.brokers().getLeaderBroker() +``` + + + #### list of namespaces owned by a given broker It finds all namespaces which are owned and served by a given broker. From 5d720e339ba83f983d37a6e394a2b3eaa04d7bc0 Mon Sep 17 00:00:00 2001 From: linjunhua Date: Sat, 6 Mar 2021 18:57:07 +0800 Subject: [PATCH 3/9] print NoLeader when leader broker not exists --- .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 2 +- .../test/java/org/apache/pulsar/broker/admin/AdminTest.java | 2 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java | 3 ++- site2/docs/admin-api-brokers.md | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 2b2e7f0999e56..5e7d9ebf9621f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -108,7 +108,7 @@ public String getLeaderBroker() throws Exception { try { return pulsar().getLeaderElectionService().getCurrentLeader() .map(LeaderBroker::getServiceUrl) - .orElse("None"); + .orElse(null); } catch (Exception e) { LOG.error("[{}] Failed to get the service url of the leader broker.", clientAppId(), e); throw new RestException(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 97f019a631a72..8284f0811a412 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -625,7 +625,7 @@ public void brokers() throws Exception { String leaderBroker = brokers.getLeaderBroker(); assertEquals(leaderBroker, pulsar.getLeaderElectionService().getCurrentLeader() .map(LeaderBroker::getServiceUrl) - .orElse("None")); + .orElse(null)); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java index 12d200a0f3b6b..0eaaf8e34a6d3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.admin.cli; +import java.util.Optional; import org.apache.pulsar.client.admin.PulsarAdmin; import com.beust.jcommander.Parameter; @@ -45,7 +46,7 @@ private class LeaderBroker extends CliCommand { @Override void run() throws Exception { - print(getAdmin().brokers().getLeaderBroker()); + print(Optional.ofNullable(getAdmin().brokers().getLeaderBroker()).orElse("NoLeader")); } } diff --git a/site2/docs/admin-api-brokers.md b/site2/docs/admin-api-brokers.md index 514ef702672ed..a01cdf8c668f8 100644 --- a/site2/docs/admin-api-brokers.md +++ b/site2/docs/admin-api-brokers.md @@ -50,7 +50,7 @@ admin.brokers().getActiveBrokers(clusterName) ### Get the service url of the leader broker -Fetch the service url of the leader broker, or print None when it not exists. +Fetch the service url of the leader broker, or print NoLeader when it not exists. From f21daf3e6471cb0509bc01d1aedbd349ca4a6d8a Mon Sep 17 00:00:00 2001 From: linjunhua Date: Sat, 6 Mar 2021 19:04:55 +0800 Subject: [PATCH 4/9] fix test case --- .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 549e991948494..c1ec79da950c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -442,7 +443,9 @@ public void brokers() throws Exception { Assert.assertEquals(list2.size(), 1); String leaderBroker = admin.brokers().getLeaderBroker(); - Assert.assertNotNull(leaderBroker); + Assert.assertEquals(leaderBroker, pulsar.getLeaderElectionService().getCurrentLeader() + .map(LeaderBroker::getServiceUrl) + .orElse(null)); Map nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) @@ -637,7 +640,7 @@ public void properties() throws PulsarAdminException { } catch (PulsarAdminException e) { assertTrue(e instanceof NotFoundException); } - + Set allowedClusters = Sets.newHashSet("test"); TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters); admin.tenants().updateTenant("prop-xyz", tenantInfo); From dd03672db01da27cddc73ab4b7becef7408411bd Mon Sep 17 00:00:00 2001 From: linjunhua Date: Mon, 8 Mar 2021 16:16:14 +0800 Subject: [PATCH 5/9] return broker info instead of single string --- .../pulsar/broker/admin/impl/BrokersBase.java | 20 +++++++++++-------- .../pulsar/broker/admin/AdminApiTest.java | 9 ++++----- .../apache/pulsar/broker/admin/AdminTest.java | 7 +++---- .../apache/pulsar/client/admin/Brokers.java | 15 +++++++------- .../client/admin/internal/BrokersImpl.java | 11 +++++----- .../apache/pulsar/admin/cli/CmdBrokers.java | 5 ++--- .../common/policies/data/BrokerInfo.java | 15 ++++++++++++++ 7 files changed, 50 insertions(+), 32 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 5e7d9ebf9621f..bb04c1b597e1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,21 +97,24 @@ public Set getActiveBrokers(@PathParam("cluster") String cluster) throws @GET @Path("/leaderBroker") @ApiOperation( - value = "Get the service url of the leader broker in the cluster.", - response = String.class) + value = "Get the information of the leader broker.", + response = BrokerInfo.class) @ApiResponses( value = { @ApiResponse(code = 401, message = "Authentication required"), - @ApiResponse(code = 403, message = "This operation requires super-user access") }) - public String getLeaderBroker() throws Exception { + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 404, message = "Leader broker not found") }) + public BrokerInfo getLeaderBroker() throws Exception { validateSuperUserAccess(); try { - return pulsar().getLeaderElectionService().getCurrentLeader() - .map(LeaderBroker::getServiceUrl) - .orElse(null); + LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader() + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker")); + BrokerInfo brokerInfo = new BrokerInfo(); + brokerInfo.setServiceUrl(leaderBroker.getServiceUrl()); + return brokerInfo; } catch (Exception e) { - LOG.error("[{}] Failed to get the service url of the leader broker.", clientAppId(), e); + LOG.error("[{}] Failed to get the information of the leader broker.", clientAppId(), e); throw new RestException(e); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c1ec79da950c4..579a9f0f6bc37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import org.apache.pulsar.broker.loadbalance.LeaderBroker; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -67,6 +66,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -110,6 +110,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.BrokerAssignment; +import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -442,10 +443,8 @@ public void brokers() throws Exception { Assert.assertNotNull(list2); Assert.assertEquals(list2.size(), 1); - String leaderBroker = admin.brokers().getLeaderBroker(); - Assert.assertEquals(leaderBroker, pulsar.getLeaderElectionService().getCurrentLeader() - .map(LeaderBroker::getServiceUrl) - .orElse(null)); + BrokerInfo leaderBroker = admin.brokers().getLeaderBroker(); + Assert.assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get()); Map nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 8284f0811a412..caa75b427312c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -80,6 +80,7 @@ import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.Policies; @@ -622,10 +623,8 @@ public void brokers() throws Exception { assertEquals(activeBrokers.size(), 1); assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get())); - String leaderBroker = brokers.getLeaderBroker(); - assertEquals(leaderBroker, pulsar.getLeaderElectionService().getCurrentLeader() - .map(LeaderBroker::getServiceUrl) - .orElse(null)); + BrokerInfo leaderBroker = brokers.getLeaderBroker(); + assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get()); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index 287bc64a9374e..11f7fd7c74af1 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -24,6 +24,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; /** @@ -73,21 +74,21 @@ public interface Brokers { CompletableFuture> getActiveBrokersAsync(String cluster); /** - * Get the service url of the leader broker. + * Get the information of the leader broker. *

- * Get the service url of the leader broker. + * Get the information of the leader broker. *

* Response Example: * *

-     * "prod1-broker1.messaging.use.example.com:8080"
+     * {serviceUrl:"prod1-broker1.messaging.use.example.com:8080"}
      * 
* - * @return the service url of the leader broker + * @return the information of the leader broker. * @throws PulsarAdminException * Unexpected error */ - String getLeaderBroker() throws PulsarAdminException; + BrokerInfo getLeaderBroker() throws PulsarAdminException; /** * Get the service url of the leader broker asynchronously. @@ -97,14 +98,14 @@ public interface Brokers { * Response Example: * *
-     * "prod1-broker1.messaging.use.example.com:8080"
+     * {serviceUrl:"prod1-broker1.messaging.use.example.com:8080"}
      * 
* * @return the service url of the leader broker * @throws PulsarAdminException * Unexpected error */ - CompletableFuture getLeaderBrokerAsync() throws PulsarAdminException; + CompletableFuture getLeaderBrokerAsync() throws PulsarAdminException; /** * Get the map of owned namespaces and their status from a single broker in the cluster. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index a04d7971e6cde..53b9e075709f6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.Codec; @@ -77,7 +78,7 @@ public void failed(Throwable throwable) { } @Override - public String getLeaderBroker() throws PulsarAdminException { + public BrokerInfo getLeaderBroker() throws PulsarAdminException { try { return getLeaderBrokerAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { @@ -91,13 +92,13 @@ public String getLeaderBroker() throws PulsarAdminException { } @Override - public CompletableFuture getLeaderBrokerAsync() { + public CompletableFuture getLeaderBrokerAsync() { WebTarget path = adminBrokers.path("leaderBroker"); - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, - new InvocationCallback() { + new InvocationCallback() { @Override - public void completed(String leaderBroker) { + public void completed(BrokerInfo leaderBroker) { future.complete(leaderBroker); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java index 0eaaf8e34a6d3..707629c5db1f9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.admin.cli; -import java.util.Optional; import org.apache.pulsar.client.admin.PulsarAdmin; import com.beust.jcommander.Parameter; @@ -41,12 +40,12 @@ void run() throws Exception { } } - @Parameters(commandDescription = "Get the service url of the leader broker") + @Parameters(commandDescription = "Get the information of the leader broker") private class LeaderBroker extends CliCommand { @Override void run() throws Exception { - print(Optional.ofNullable(getAdmin().brokers().getLeaderBroker()).orElse("NoLeader")); + print(getAdmin().brokers().getLeaderBroker()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java new file mode 100644 index 0000000000000..68e44e1b22c60 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java @@ -0,0 +1,15 @@ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Broker Information + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BrokerInfo { + private String serviceUrl; +} From c6f5969d4716af05b3a7e9e4e30d1d5912548482 Mon Sep 17 00:00:00 2001 From: linjunhua Date: Mon, 8 Mar 2021 16:27:39 +0800 Subject: [PATCH 6/9] update the desc of cmd 'leader-broker' --- site2/docs/admin-api-brokers.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/site2/docs/admin-api-brokers.md b/site2/docs/admin-api-brokers.md index a01cdf8c668f8..27d145033c2b6 100644 --- a/site2/docs/admin-api-brokers.md +++ b/site2/docs/admin-api-brokers.md @@ -48,9 +48,9 @@ admin.brokers().getActiveBrokers(clusterName) -### Get the service url of the leader broker +### Get the information of the leader broker -Fetch the service url of the leader broker, or print NoLeader when it not exists. +Fetch the information of the leader broker. From 6f8133347ab17685f35fb66caca0bea1429e0322 Mon Sep 17 00:00:00 2001 From: linjunhua Date: Mon, 8 Mar 2021 19:16:28 +0800 Subject: [PATCH 7/9] update the content of cmd 'leader-broker' --- site2/docs/admin-api-brokers.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/site2/docs/admin-api-brokers.md b/site2/docs/admin-api-brokers.md index 27d145033c2b6..33ab56941daa9 100644 --- a/site2/docs/admin-api-brokers.md +++ b/site2/docs/admin-api-brokers.md @@ -50,7 +50,7 @@ admin.brokers().getActiveBrokers(clusterName) ### Get the information of the leader broker -Fetch the information of the leader broker. +Fetch the information of the leader broker, for example, the service url. @@ -60,7 +60,7 @@ $ pulsar-admin brokers leader-broker ``` ``` -broker1.use.org.com:8080 +BrokerInfo(serviceUrl=broker1.use.org.com:8080) ``` From 08dffb553f772a070f8ea60b06b3525a99b2be13 Mon Sep 17 00:00:00 2001 From: linjunhua Date: Tue, 9 Mar 2021 10:20:08 +0800 Subject: [PATCH 8/9] [doc] add the content of cmd 'leader-broker' --- site2/docs/admin-api-brokers.md | 1 + site2/docs/reference-pulsar-admin.md | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/site2/docs/admin-api-brokers.md b/site2/docs/admin-api-brokers.md index 33ab56941daa9..e01f84100677a 100644 --- a/site2/docs/admin-api-brokers.md +++ b/site2/docs/admin-api-brokers.md @@ -72,6 +72,7 @@ BrokerInfo(serviceUrl=broker1.use.org.com:8080) ```java admin.brokers().getLeaderBroker() ``` +For the detail of the code above, see [here](https://github.com/apache/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java#L80) diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 291d4fc7dd3fc..c8cf7bdcccb26 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -135,6 +135,14 @@ Usage $ pulsar-admin brokers list cluster-name ``` +### `leader-broker` +Get the information of the leader broker + +Usage +```bash +$ pulsar-admin brokers leader-broker +``` + ### `namespaces` List namespaces owned by the broker From 89198252a281c1458064904a88ff9248428f5caa Mon Sep 17 00:00:00 2001 From: linjunhua Date: Tue, 9 Mar 2021 11:53:22 +0800 Subject: [PATCH 9/9] add asf license --- .../common/policies/data/BrokerInfo.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java index 68e44e1b22c60..c194a7867e8e9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerInfo.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.common.policies.data; import lombok.AllArgsConstructor;