Skip to content

Commit

Permalink
[pulsar-broker] get list of bundles under a namespace (apache#8450)
Browse files Browse the repository at this point in the history
* [pulsar-broker] get list of bundles under a namespace

* add bundles

* add command
  • Loading branch information
rdhabalia authored and flowchartsman committed Nov 17, 2020
1 parent 6de1f22 commit fffcb28
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;

import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,17 @@ public void testMaxNumPartitionsPerPartitionedTopicFailure() {
}
}

@Test
public void testListOfNamespaceBundles() throws Exception {
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("prop-xyz2", tenantInfo);
admin.namespaces().createNamespace("prop-xyz2/ns1", 10);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz2/ns1", Sets.newHashSet("test"));
admin.namespaces().createNamespace("prop-xyz2/test/ns2", 10);
assertEquals(admin.namespaces().getBundles("prop-xyz2/ns1").numBundles, 10);
assertEquals(admin.namespaces().getBundles("prop-xyz2/test/ns2").numBundles, 10);
}

@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = new ClusterData(pulsar.getWebServiceAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,35 @@ public interface Namespaces {
*/
CompletableFuture<List<String>> getTopicsAsync(String namespace);

/**
* Get the list of bundles.
* <p/>
* Get the list of all the bundles under a certain namespace.
* <p/>
*
* @param namespace
* Namespace name
*
* @throws NotAuthorizedException
* You don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
BundlesData getBundles(String namespace) throws PulsarAdminException;

/**
* Get the list of bundles asynchronously.
* <p/>
* Get the list of all the bundles under a certain namespace.
* <p/>
*
* @param namespace
* Namespace name
*/
CompletableFuture<BundlesData> getBundlesAsync(String namespace);

/**
* Get policies for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,41 @@ public List<String> getTopics(String namespace) throws PulsarAdminException {
}
}

@Override
public BundlesData getBundles(String namespace) throws PulsarAdminException {
try {
return getBundlesAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<BundlesData> getBundlesAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
String action = "bundles";
WebTarget path = namespacePath(ns, action);
final CompletableFuture<BundlesData> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<BundlesData>() {
@Override
public void completed(BundlesData bundles) {
future.complete(bundles);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public CompletableFuture<List<String>> getTopicsAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the list of bundles for a namespace")
private class GetBundles extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(admin.namespaces().getBundles(namespace));
}
}

@Parameters(commandDescription = "Get the list of destinations for a namespace", hidden = true)
private class GetDestinations extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -1710,6 +1722,7 @@ public CmdNamespaces(PulsarAdmin admin) {
jcommander.addCommand("list-cluster", new GetNamespacesPerCluster());

jcommander.addCommand("topics", new GetTopics());
jcommander.addCommand("bundles", new GetBundles());
jcommander.addCommand("destinations", new GetDestinations());
jcommander.addCommand("policies", new GetPolicies());
jcommander.addCommand("create", new Create());
Expand Down

0 comments on commit fffcb28

Please sign in to comment.