Skip to content

Commit

Permalink
remove redundant endpoint to get workers in cluster (#2240)
Browse files Browse the repository at this point in the history
### Motivation

There are two endpoints to do the same thing for worker functions which is get a list of workers

### Modifications

Remove one of the redundant endpoints
  • Loading branch information
jerrypeng authored and sijie committed Jul 27, 2018
1 parent 46e6ea8 commit 8318977
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 44 deletions.
Expand Up @@ -209,7 +209,8 @@ public Response listFunctions(final @PathParam("tenant") String tenant,


}) })
@Path("/cluster") @Path("/cluster")
public Response getCluster() { @Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
return functions.getCluster(); return functions.getCluster();
} }


Expand Down Expand Up @@ -302,14 +303,4 @@ public Response downloadFunction(final @QueryParam("path") String path) {
public List<ConnectorDefinition> getConnectorsList() throws IOException { public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors(); return functions.getListOfConnectors();
} }

@GET
@Path("/workers")
@ApiOperation(value = "Get all current member workers.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getWorkers() {
return functions.getWorkers();
}

} }
Expand Up @@ -212,7 +212,7 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {


@Test @Test
public void testGetWokersApi() throws Exception { public void testGetWokersApi() throws Exception {
List<WorkerInfo> workers = admin.functions().getWorkers(); List<WorkerInfo> workers = admin.functions().getCluster();
Assert.assertEquals(workers.size(), 1); Assert.assertEquals(workers.size(), 1);
Assert.assertEquals(workers.get(0).getPort(), workerServicePort); Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
} }
Expand Down
Expand Up @@ -258,5 +258,5 @@ public interface Functions {
* @return * @return
* @throws PulsarAdminException * @throws PulsarAdminException
*/ */
List<WorkerInfo> getWorkers() throws PulsarAdminException; List<WorkerInfo> getCluster() throws PulsarAdminException;
} }
Expand Up @@ -273,9 +273,9 @@ public Set<String> getSinks() throws PulsarAdminException {
.map(ConnectorDefinition::getName).collect(Collectors.toSet()); .map(ConnectorDefinition::getName).collect(Collectors.toSet());
} }


public List<WorkerInfo> getWorkers() throws PulsarAdminException { public List<WorkerInfo> getCluster() throws PulsarAdminException {
try { try {
return request(functions.path("workers")).get(new GenericType<List<WorkerInfo>>() { return request(functions.path("cluster")).get(new GenericType<List<WorkerInfo>>() {
}); });
} catch (Exception e) { } catch (Exception e) {
throw getApiException(e); throw getApiException(e);
Expand Down
Expand Up @@ -107,7 +107,7 @@ public class CmdFunctions extends CmdBase {
private final TriggerFunction triggerer; private final TriggerFunction triggerer;
private final UploadFunction uploader; private final UploadFunction uploader;
private final DownloadFunction downloader; private final DownloadFunction downloader;
private final GetWorkers workers; private final GetCluster cluster;


/** /**
* Base command * Base command
Expand Down Expand Up @@ -888,11 +888,11 @@ void runCmd() throws Exception {
} }
} }


@Parameters(commandDescription = "Get list of workers registered into cluster") @Parameters(commandDescription = "Get list of workers registered in cluster")
class GetWorkers extends BaseCommand { class GetCluster extends BaseCommand {
@Override @Override
void runCmd() throws Exception { void runCmd() throws Exception {
String json = (new Gson()).toJson(admin.functions().getWorkers()); String json = (new Gson()).toJson(admin.functions().getCluster());
Gson gson = new GsonBuilder().setPrettyPrinting().create(); Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json))); System.out.println(gson.toJson(new JsonParser().parse(json)));
} }
Expand All @@ -911,7 +911,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
triggerer = new TriggerFunction(); triggerer = new TriggerFunction();
uploader = new UploadFunction(); uploader = new UploadFunction();
downloader = new DownloadFunction(); downloader = new DownloadFunction();
workers = new GetWorkers(); cluster = new GetCluster();
jcommander.addCommand("localrun", getLocalRunner()); jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater()); jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter()); jcommander.addCommand("delete", getDeleter());
Expand All @@ -923,7 +923,7 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
jcommander.addCommand("trigger", getTriggerer()); jcommander.addCommand("trigger", getTriggerer());
jcommander.addCommand("upload", getUploader()); jcommander.addCommand("upload", getUploader());
jcommander.addCommand("download", getDownloader()); jcommander.addCommand("download", getDownloader());
jcommander.addCommand("workers", workers); jcommander.addCommand("cluster", cluster);
} }


@VisibleForTesting @VisibleForTesting
Expand Down
Expand Up @@ -488,7 +488,7 @@ public List<ConnectorDefinition> getListOfConnectors() {
return this.worker().getConnectorsManager().getConnectors(); return this.worker().getConnectorsManager().getConnectors();
} }


public List<WorkerInfo> getWorkers() { public List<WorkerInfo> getCluster() {
if (!isWorkerServiceAvailable()) { if (!isWorkerServiceAvailable()) {
throw new WebApplicationException( throw new WebApplicationException(
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON) Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
Expand All @@ -497,17 +497,6 @@ public List<WorkerInfo> getWorkers() {
return worker().getMembershipManager().getCurrentMembership(); return worker().getMembershipManager().getCurrentMembership();
} }


public Response getCluster() {

if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}

MembershipManager membershipManager = worker().getMembershipManager();
List<WorkerInfo> members = membershipManager.getCurrentMembership();
return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
}

public WorkerInfo getClusterLeader() { public WorkerInfo getClusterLeader() {
if (!isWorkerServiceAvailable()) { if (!isWorkerServiceAvailable()) {
throw new WebApplicationException( throw new WebApplicationException(
Expand Down
Expand Up @@ -131,7 +131,9 @@ public Response listFunctions(final @PathParam("tenant") String tenant,


@GET @GET
@Path("/cluster") @Path("/cluster")
public Response getCluster() { @Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions")
public List<WorkerInfo> getCluster() {
return functions.getCluster(); return functions.getCluster();
} }


Expand Down Expand Up @@ -179,14 +181,4 @@ public Response downloadFunction(final @QueryParam("path") String path) {
public List<ConnectorDefinition> getConnectorsList() throws IOException { public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors(); return functions.getListOfConnectors();
} }

@GET
@Path("/workers")
@ApiOperation(value = "Get all current member workers.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getWorkers() {
return functions.getWorkers();
}

} }

0 comments on commit 8318977

Please sign in to comment.