Skip to content

Commit

Permalink
Prepare admin client to support v2 api. (#1384)
Browse files Browse the repository at this point in the history
* Prepare admin client to support v2 api.

* Fix admin with functions.
  • Loading branch information
cckellogg authored and sijie committed Mar 15, 2018
1 parent 323f4f1 commit cad2039
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 153 deletions.
Expand Up @@ -1649,7 +1649,7 @@ public void testPulsarAdminForUriAndUrlEncoding(String topicName) throws Excepti
pulsarClient.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();

PersistentTopicsImpl persistent = (PersistentTopicsImpl) admin.persistentTopics();
Field field = PersistentTopicsImpl.class.getDeclaredField("persistentTopics");
Field field = PersistentTopicsImpl.class.getDeclaredField("adminPersistentTopics");
field.setAccessible(true);
WebTarget persistentTopics = (WebTarget) field.get(persistent);

Expand Down
Expand Up @@ -73,10 +73,12 @@ public class PulsarAdmin implements Closeable {

private final Client client;
private final URL serviceUrl;
protected final WebTarget web;
private final Lookup lookups;
protected final WebTarget root;
protected final Authentication auth;



static {
/**
* The presence of slf4j-jdk14.jar, that is the jul binding for SLF4J, will force SLF4J calls to be delegated to
Expand Down Expand Up @@ -169,17 +171,16 @@ public PulsarAdmin(URL serviceUrl, ClientConfigurationData pulsarConfig) throws
this.client = clientBuilder.build();

this.serviceUrl = serviceUrl;
WebTarget root = client.target(serviceUrl.toString());
web = root.path("/admin");

this.clusters = new ClustersImpl(web, auth);
this.brokers = new BrokersImpl(web, auth);
this.brokerStats = new BrokerStatsImpl(web, auth);
this.properties = new PropertiesImpl(web, auth);
this.namespaces = new NamespacesImpl(web, auth);
this.persistentTopics = new PersistentTopicsImpl(web, auth);
this.nonPersistentTopics = new NonPersistentTopicsImpl(web, auth);
this.resourceQuotas = new ResourceQuotasImpl(web, auth);
root = client.target(serviceUrl.toString());

this.clusters = new ClustersImpl(root, auth);
this.brokers = new BrokersImpl(root, auth);
this.brokerStats = new BrokerStatsImpl(root, auth);
this.properties = new PropertiesImpl(root, auth);
this.namespaces = new NamespacesImpl(root, auth);
this.persistentTopics = new PersistentTopicsImpl(root, auth);
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, pulsarConfig.isUseTls());
}

Expand Down
Expand Up @@ -38,17 +38,17 @@
*/
public class BrokerStatsImpl extends BaseResource implements BrokerStats {

private final WebTarget brokerStats;
private final WebTarget adminBrokerStats;

public BrokerStatsImpl(WebTarget target, Authentication auth) {
super(auth);
brokerStats = target.path("/broker-stats");
adminBrokerStats = target.path("/admin/broker-stats");
}

@Override
public JsonArray getMetrics() throws PulsarAdminException {
try {
String json = request(brokerStats.path("/metrics")).get(String.class);
String json = request(adminBrokerStats.path("/metrics")).get(String.class);
return new Gson().fromJson(json, JsonArray.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -58,7 +58,7 @@ public JsonArray getMetrics() throws PulsarAdminException {
@Override
public AllocatorStats getAllocatorStats(String allocatorName) throws PulsarAdminException {
try {
return request(brokerStats.path("/allocator-stats").path(allocatorName)).get(AllocatorStats.class);
return request(adminBrokerStats.path("/allocator-stats").path(allocatorName)).get(AllocatorStats.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -67,7 +67,7 @@ public AllocatorStats getAllocatorStats(String allocatorName) throws PulsarAdmin
@Override
public JsonArray getMBeans() throws PulsarAdminException {
try {
String json = request(brokerStats.path("/mbeans")).get(String.class);
String json = request(adminBrokerStats.path("/mbeans")).get(String.class);
return new Gson().fromJson(json, JsonArray.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -77,7 +77,7 @@ public JsonArray getMBeans() throws PulsarAdminException {
@Override
public JsonObject getTopics() throws PulsarAdminException {
try {
String json = request(brokerStats.path("/destinations")).get(String.class);
String json = request(adminBrokerStats.path("/destinations")).get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -87,7 +87,7 @@ public JsonObject getTopics() throws PulsarAdminException {
@Override
public LoadManagerReport getLoadReport() throws PulsarAdminException {
try {
return request(brokerStats.path("/load-report")).get(LocalBrokerData.class);
return request(adminBrokerStats.path("/load-report")).get(LocalBrokerData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -96,7 +96,7 @@ public LoadManagerReport getLoadReport() throws PulsarAdminException {
@Override
public JsonObject getPendingBookieOpsStats() throws PulsarAdminException {
try {
String json = request(brokerStats.path("/bookieops")).get(String.class);
String json = request(adminBrokerStats.path("/bookieops")).get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -107,7 +107,7 @@ public JsonObject getBrokerResourceAvailability(String property, String cluster,
throws PulsarAdminException {
try {
String json = request(
brokerStats.path("/broker-resource-availability").path(property).path(cluster).path(namespace))
adminBrokerStats.path("/broker-resource-availability").path(property).path(cluster).path(namespace))
.get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
Expand Down
Expand Up @@ -33,17 +33,17 @@
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;

public class BrokersImpl extends BaseResource implements Brokers {
private final WebTarget brokers;
private final WebTarget adminBrokers;

public BrokersImpl(WebTarget web, Authentication auth) {
super(auth);
brokers = web.path("/brokers");
adminBrokers = web.path("/admin/brokers");
}

@Override
public List<String> getActiveBrokers(String cluster) throws PulsarAdminException {
try {
return request(brokers.path(cluster)).get(new GenericType<List<String>>() {
return request(adminBrokers.path(cluster)).get(new GenericType<List<String>>() {
});
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -54,7 +54,7 @@ public List<String> getActiveBrokers(String cluster) throws PulsarAdminException
public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerUrl)
throws PulsarAdminException {
try {
return request(brokers.path(cluster).path(brokerUrl).path("ownedNamespaces")).get(
return request(adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces")).get(
new GenericType<Map<String, NamespaceOwnershipStatus>>() {
});
} catch (Exception e) {
Expand All @@ -65,7 +65,7 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster,
@Override
public void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException {
try {
request(brokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""),
request(adminBrokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""),
ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -75,7 +75,7 @@ public void updateDynamicConfiguration(String configName, String configValue) th
@Override
public Map<String, String> getAllDynamicConfigurations() throws PulsarAdminException {
try {
return request(brokers.path("/configuration/").path("values")).get(new GenericType<Map<String, String>>() {
return request(adminBrokers.path("/configuration/").path("values")).get(new GenericType<Map<String, String>>() {
});
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -85,7 +85,7 @@ public Map<String, String> getAllDynamicConfigurations() throws PulsarAdminExcep
@Override
public List<String> getDynamicConfigurationNames() throws PulsarAdminException {
try {
return request(brokers.path("/configuration")).get(new GenericType<List<String>>() {
return request(adminBrokers.path("/configuration")).get(new GenericType<List<String>>() {
});
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -95,7 +95,7 @@ public List<String> getDynamicConfigurationNames() throws PulsarAdminException {
@Override
public InternalConfigurationData getInternalConfigurationData() throws PulsarAdminException {
try {
return request(brokers.path("/internal-configuration")).get(InternalConfigurationData.class);
return request(adminBrokers.path("/internal-configuration")).get(InternalConfigurationData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand Down
Expand Up @@ -38,17 +38,18 @@

public class ClustersImpl extends BaseResource implements Clusters {

private final WebTarget clusters;
private final WebTarget adminClusters;
//private final WebTarget clusters;

public ClustersImpl(WebTarget web, Authentication auth) {
super(auth);
clusters = web.path("/clusters");
adminClusters = web.path("/admin/clusters");
}

@Override
public List<String> getClusters() throws PulsarAdminException {
try {
return request(clusters).get(new GenericType<List<String>>() {
return request(adminClusters).get(new GenericType<List<String>>() {
});
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -58,7 +59,7 @@ public List<String> getClusters() throws PulsarAdminException {
@Override
public ClusterData getCluster(String cluster) throws PulsarAdminException {
try {
return request(clusters.path(cluster)).get(ClusterData.class);
return request(adminClusters.path(cluster)).get(ClusterData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -67,7 +68,7 @@ public ClusterData getCluster(String cluster) throws PulsarAdminException {
@Override
public void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
try {
request(clusters.path(cluster))
request(adminClusters.path(cluster))
.put(Entity.entity(clusterData, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -77,7 +78,7 @@ public void createCluster(String cluster, ClusterData clusterData) throws Pulsar
@Override
public void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
try {
request(clusters.path(cluster)).post(Entity.entity(clusterData, MediaType.APPLICATION_JSON),
request(adminClusters.path(cluster)).post(Entity.entity(clusterData, MediaType.APPLICATION_JSON),
ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -87,7 +88,7 @@ public void updateCluster(String cluster, ClusterData clusterData) throws Pulsar
@Override
public void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException {
try {
request(clusters.path(cluster).path("peers")).post(Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON),
request(adminClusters.path(cluster).path("peers")).post(Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON),
ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -98,7 +99,7 @@ public void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClu
@Override
public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException {
try {
return request(clusters.path(cluster).path("peers")).get(LinkedHashSet.class);
return request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -107,7 +108,7 @@ public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminExcepti
@Override
public void deleteCluster(String cluster) throws PulsarAdminException {
try {
request(clusters.path(cluster)).delete(ErrorData.class);
request(adminClusters.path(cluster)).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -116,7 +117,7 @@ public void deleteCluster(String cluster) throws PulsarAdminException {
@Override
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster) throws PulsarAdminException {
try {
return request(clusters.path(cluster).path("namespaceIsolationPolicies")).get(
return request(adminClusters.path(cluster).path("namespaceIsolationPolicies")).get(
new GenericType<Map<String, NamespaceIsolationData>>() {
});
} catch (Exception e) {
Expand All @@ -138,13 +139,14 @@ public void updateNamespaceIsolationPolicy(String cluster, String policyName,

@Override
public void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException {
request(clusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).delete(ErrorData.class);
request(adminClusters.path(cluster)
.path("namespaceIsolationPolicies").path(policyName)).delete(ErrorData.class);
}

private void setNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
try {
request(clusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).post(
request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).post(
Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -155,7 +157,7 @@ private void setNamespaceIsolationPolicy(String cluster, String policyName,
public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName)
throws PulsarAdminException {
try {
return request(clusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).get(
return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).get(
NamespaceIsolationData.class);
} catch (Exception e) {
throw getApiException(e);
Expand All @@ -174,13 +176,13 @@ public void updateFailureDomain(String cluster, String domainName, FailureDomain

@Override
public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
request(clusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
request(adminClusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
}

@Override
public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
try {
return request(clusters.path(cluster).path("failureDomains"))
return request(adminClusters.path(cluster).path("failureDomains"))
.get(new GenericType<Map<String, FailureDomain>>() {
});
} catch (Exception e) {
Expand All @@ -191,7 +193,8 @@ public Map<String, FailureDomain> getFailureDomains(String cluster) throws Pulsa
@Override
public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
try {
return request(clusters.path(cluster).path("failureDomains").path(domainName)).get(FailureDomain.class);
return request(adminClusters.path(cluster).path("failureDomains")
.path(domainName)).get(FailureDomain.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -200,7 +203,7 @@ public FailureDomain getFailureDomain(String cluster, String domainName) throws
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
try {
request(clusters.path(cluster).path("failureDomains").path(domainName)).post(
request(adminClusters.path(cluster).path("failureDomains").path(domainName)).post(
Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
Expand Down

0 comments on commit cad2039

Please sign in to comment.