Skip to content

Commit

Permalink
Cleaning up and improving worker endpoints (#3191)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and srkukarni committed Dec 14, 2018
1 parent 018fb04 commit 23a6622
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 214 deletions.
Expand Up @@ -18,24 +18,24 @@
*/
package org.apache.pulsar.broker.admin.v2;

import java.util.function.Supplier;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.WorkerService;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

@Slf4j
@Path("/worker")
public class Worker extends AdminResource implements Supplier<WorkerService> {
Expand All @@ -53,42 +53,47 @@ public WorkerService get() {

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions"
value = "Fetches information about the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public Response getCluster() {
public List<WorkerInfo> getCluster() {
return worker.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterLeader() {
public WorkerInfo getClusterLeader() {
return worker.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Function.Assignment.class,
responseContainer = "Map"
response = Map.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/assignments")
public Response getAssignments() {
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
}
Expand Up @@ -23,15 +23,17 @@
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

@Slf4j
Expand All @@ -51,18 +53,33 @@ public WorkerService get() {

@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
@ApiOperation(
value = "Gets the metrics for Monitoring",
notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics",
response = org.apache.pulsar.common.stats.Metrics.class,
responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Produces(MediaType.APPLICATION_JSON)
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrics(clientAppId());
}

@GET
@Path("/functionsmetrics")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getStats() throws IOException {
@ApiOperation(
value = "Get metrics for all functions owned by worker",
notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics",
response = WorkerFunctionInstanceStats.class,
responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerFunctionInstanceStats> getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
}
}
Expand Up @@ -22,8 +22,8 @@
import java.util.List;
import java.util.Map;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;

/**
* Admin interface for worker stats management.
Expand All @@ -36,8 +36,8 @@ public interface Worker {
* @return
* @throws PulsarAdminException
*/
Metrics getFunctionsStats() throws PulsarAdminException;
List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException;

/**
* Get worker metrics.
* @return
Expand Down
Expand Up @@ -18,27 +18,20 @@
*/
package org.apache.pulsar.client.admin.internal;

import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@Slf4j
public class WorkerImpl extends BaseResource implements Worker {
Expand All @@ -53,27 +46,28 @@ public WorkerImpl(WebTarget web, Authentication auth) {
}

@Override
public Metrics getFunctionsStats() throws PulsarAdminException {
public List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException {
try {
Response response = request(workerStats.path("functionsmetrics")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
Metrics.Builder metricsBuilder = Metrics.newBuilder();
mergeJson(jsonResponse, metricsBuilder);
return metricsBuilder.build();
} catch (Exception e) {
throw getApiException(e);
}
}
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
List<WorkerFunctionInstanceStats> metricsList
= response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
return metricsList;
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException {
try {
return request(workerStats.path("metrics"))
.get(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
});
Response response = request(workerStats.path("metrics")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -82,9 +76,11 @@ public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Pu
@Override
public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
return request(worker.path("cluster"))
.get(new GenericType<List<WorkerInfo>>() {
});
Response response = request(worker.path("cluster")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<List<WorkerInfo>>() {});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -93,8 +89,11 @@ public List<WorkerInfo> getCluster() throws PulsarAdminException {
@Override
public WorkerInfo getClusterLeader() throws PulsarAdminException {
try {
return request(worker.path("cluster").path("leader"))
.get(new GenericType<WorkerInfo>(){});
Response response = request(worker.path("cluster").path("leader")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<WorkerInfo>(){});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -107,9 +106,8 @@ public Map<String, Collection<String>> getAssignments() throws PulsarAdminExcept
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
Type type = new TypeToken<Map<String, Collection<String>>>(){}.getType();
Map<String, Collection<String>> assignments = new Gson().fromJson(jsonResponse, type);
Map<String, Collection<String>> assignments
= response.readEntity(new GenericType<Map<String, Collection<String>>>() {});
return assignments;
} catch (Exception e) {
throw getApiException(e);
Expand Down
Expand Up @@ -175,6 +175,14 @@ <T> void print(List<T> items) {
}
}

<T> void printList(T item) {
try {
System.out.println(writer.writeValueAsString(item));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

<T> void print(T item) {
try {
System.out.println(writer.writeValueAsString(item));
Expand Down

0 comments on commit 23a6622

Please sign in to comment.