Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE 9757][pulsar-admin] add cmd to get service url of the leader broker #9799

Merged
merged 10 commits into from
Mar 10, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
Expand All @@ -56,6 +57,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -92,6 +94,31 @@ public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) throws
}
}

@GET
@Path("/leaderBroker")
@ApiOperation(
value = "Get the information of the leader broker.",
response = BrokerInfo.class)
@ApiResponses(
value = {
@ApiResponse(code = 401, message = "Authentication required"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Leader broker not found") })
public BrokerInfo getLeaderBroker() throws Exception {
validateSuperUserAccess();

try {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = new BrokerInfo();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create BrokerInfo which is the same as LeaderBroker, Because I can not access LeaderBroker in module pulsar-client-admin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli Please help to review this PR.

brokerInfo.setServiceUrl(leaderBroker.getServiceUrl());
return brokerInfo;
} catch (Exception e) {
LOG.error("[{}] Failed to get the information of the leader broker.", clientAppId(), e);
throw new RestException(e);
}
}

@GET
@Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
@ApiOperation(value = "Get the list of namespaces served by the specific broker",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
Expand Down Expand Up @@ -441,6 +443,9 @@ public void brokers() throws Exception {
Assert.assertNotNull(list2);
Assert.assertEquals(list2.size(), 1);

BrokerInfo leaderBroker = admin.brokers().getLeaderBroker();
Assert.assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());

Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
Assert.assertEquals(nsMap.size(), 1);
Expand Down Expand Up @@ -634,7 +639,7 @@ public void properties() throws PulsarAdminException {
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}

Set<String> allowedClusters = Sets.newHashSet("test");
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters);
admin.tenants().updateTenant("prop-xyz", tenantInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
Expand All @@ -79,6 +80,7 @@
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -620,6 +622,9 @@ public void brokers() throws Exception {
Set<String> activeBrokers = brokers.getActiveBrokers("use");
assertEquals(activeBrokers.size(), 1);
assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get()));

BrokerInfo leaderBroker = brokers.getLeaderBroker();
assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;

/**
Expand Down Expand Up @@ -72,6 +73,40 @@ public interface Brokers {
*/
CompletableFuture<List<String>> getActiveBrokersAsync(String cluster);

/**
* Get the information of the leader broker.
* <p/>
* Get the information of the leader broker.
* <p/>
* Response Example:
*
* <pre>
* <code>{serviceUrl:"prod1-broker1.messaging.use.example.com:8080"}</code>
* </pre>
*
* @return the information of the leader broker.
* @throws PulsarAdminException
* Unexpected error
*/
BrokerInfo getLeaderBroker() throws PulsarAdminException;

/**
* Get the service url of the leader broker asynchronously.
* <p/>
* Get the service url of the leader broker.
* <p/>
* Response Example:
*
* <pre>
* <code>{serviceUrl:"prod1-broker1.messaging.use.example.com:8080"}</code>
* </pre>
*
* @return the service url of the leader broker
* @throws PulsarAdminException
* Unexpected error
*/
CompletableFuture<BrokerInfo> getLeaderBrokerAsync() throws PulsarAdminException;

/**
* Get the map of owned namespaces and their status from a single broker in the cluster.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.Codec;

Expand Down Expand Up @@ -76,6 +77,39 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public BrokerInfo getLeaderBroker() throws PulsarAdminException {
try {
return getLeaderBrokerAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<BrokerInfo> getLeaderBrokerAsync() {
WebTarget path = adminBrokers.path("leaderBroker");
final CompletableFuture<BrokerInfo> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<BrokerInfo>() {
@Override
public void completed(BrokerInfo leaderBroker) {
future.complete(leaderBroker);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerUrl)
throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void brokers() throws Exception {
brokers.run(split("list use"));
verify(mockBrokers).getActiveBrokers("use");

brokers.run(split("leader-broker"));
verify(mockBrokers).getLeaderBroker();

brokers.run(split("namespaces use --url http://my-service.url:8080"));
verify(mockBrokers).getOwnedNamespaces("use", "http://my-service.url:8080");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get the information of the leader broker")
private class LeaderBroker extends CliCommand {

@Override
void run() throws Exception {
print(getAdmin().brokers().getLeaderBroker());
}
}

@Parameters(commandDescription = "List namespaces owned by the broker")
private class Namespaces extends CliCommand {
@Parameter(description = "cluster-name\n", required = true)
Expand Down Expand Up @@ -77,7 +86,7 @@ void run() throws Exception {
getAdmin().brokers().deleteDynamicConfiguration(configName);
}
}

@Parameters(commandDescription = "Get all overridden dynamic-configuration values")
private class GetAllConfigurationsCmd extends CliCommand {

Expand All @@ -86,7 +95,7 @@ void run() throws Exception {
print(getAdmin().brokers().getAllDynamicConfigurations());
}
}

@Parameters(commandDescription = "Get list of updatable configuration name")
private class GetUpdatableConfigCmd extends CliCommand {

Expand Down Expand Up @@ -140,6 +149,7 @@ void run() throws Exception {
public CmdBrokers(Supplier<PulsarAdmin> admin) {
super("brokers", admin);
jcommander.addCommand("list", new List());
jcommander.addCommand("leader-broker", new LeaderBroker());
jcommander.addCommand("namespaces", new Namespaces());
jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd());
jcommander.addCommand("delete-dynamic-config", new DeleteConfigurationCmd());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Broker Information
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BrokerInfo {
private String serviceUrl;
}
28 changes: 28 additions & 0 deletions site2/docs/admin-api-brokers.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,34 @@ admin.brokers().getActiveBrokers(clusterName)

<!--END_DOCUSAURUS_CODE_TABS-->

### Get the information of the leader broker

Fetch the information of the leader broker, for example, the service url.

<!--DOCUSAURUS_CODE_TABS-->
<!--pulsar-admin-->

```shell
$ pulsar-admin brokers leader-broker
```

Copy link
Member

@Anonymitaet Anonymitaet Mar 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. If pulsar-admin brokers leader-broker has flags and descriptions, do not forget to add them to pulsar-admin docs.

  2. To allow users to have more info (flags and descriptions), it is better to add the URL of the pulsar-admin brokers leader-broker command on pulsar-admin. Like For more information, see [here](xxx).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your tip.
There is no more args for this command.

```
BrokerInfo(serviceUrl=broker1.use.org.com:8080)
```

<!--REST API-->

{@inject: endpoint|GET|/admin/v2/brokers/leaderBroker?version=[[pulsar:version_number]]}

<!--JAVA-->

```java
admin.brokers().getLeaderBroker()
```
For the detail of the code above, see [here](https://github.com/apache/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java#L80)

Copy link
Member

@Anonymitaet Anonymitaet Mar 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add the Java admin URL to give more info to users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URL was contained in this doc.
The brokers method of the {@Inject: javadoc:PulsarAdmin:/admin/org/apache/pulsar/client/admin/PulsarAdmin.html} object in the Java API

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your explanations. Sorry I do not get you. I mean we can add a link w/ the line pointed to this command? Like https://github.com/apache/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java#L80

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

<!--END_DOCUSAURUS_CODE_TABS-->

#### list of namespaces owned by a given broker

It finds all namespaces which are owned and served by a given broker.
Expand Down
8 changes: 8 additions & 0 deletions site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ Usage
$ pulsar-admin brokers list cluster-name
```

### `leader-broker`
Get the information of the leader broker

Usage
```bash
$ pulsar-admin brokers leader-broker
```

### `namespaces`
List namespaces owned by the broker

Expand Down