Navigation Menu

Skip to content

Commit

Permalink
Added cli commands to get function cluster related information (#2426)
Browse files Browse the repository at this point in the history
* Added command line to get cluster/cluster leader/function assignment information.
Also refactored such kind of meta requests to a seperate endpoint

* Removed left-over debug statements

* Added back /functionsmetrics

* Added /functionsmetrics back to the broker worker

* Removed leftover references of getcluster

* Fix integration tests

* Fixed instantiation of service in worker only mode

* Removed log statement

* Seperated stats calls to a seperate endpoint to mimic broker
  • Loading branch information
srkukarni committed Aug 25, 2018
1 parent d4b214c commit f44d367
Show file tree
Hide file tree
Showing 21 changed files with 640 additions and 296 deletions.
Expand Up @@ -198,50 +198,6 @@ public Response listFunctions(final @PathParam("tenant") String tenant,


} }


@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
return functions.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster/leader")
public WorkerInfo getClusterLeader() {
return functions.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Assignment.class,
responseContainer = "Map"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/assignments")
public Response getAssignments() {
return functions.getAssignments();
}

@POST @POST
@ApiOperation( @ApiOperation(
value = "Triggers a Pulsar Function with a user-specified value or file data", value = "Triggers a Pulsar Function with a user-specified value or file data",
Expand Down
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;

import java.util.function.Supplier;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.WorkerService;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

@Slf4j
@Path("/worker")
public class Worker extends AdminResource implements Supplier<WorkerService> {

private final WorkerImpl worker;

public Worker() {
this.worker = new WorkerImpl(this);
}

@Override
public WorkerService get() {
return pulsar().getWorkerService();
}

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public Response getCluster() {
return worker.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterLeader() {
return worker.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Function.Assignment.class,
responseContainer = "Map"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/assignments")
public Response getAssignments() {
return worker.getAssignments();
}
}
Expand Up @@ -18,39 +18,51 @@
*/ */
package org.apache.pulsar.broker.admin.v2; package org.apache.pulsar.broker.admin.v2;


import java.io.IOException;
import java.util.Collection;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;

import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses; import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Supplier;


@Slf4j @Slf4j
@Path("/worker-stats") @Path("/worker-stats")
public class WorkerStats extends FunctionApiResource { public class WorkerStats extends AdminResource implements Supplier<WorkerService> {


@GET private final WorkerImpl worker;
@Path("/functions")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class) public WorkerStats() {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), this.worker = new WorkerImpl(this);
@ApiResponse(code = 503, message = "Worker service is not running") }) }
public Response getStats() throws IOException {
return functions.getFunctionsMetrcis(clientAppId()); @Override
public WorkerService get() {
return pulsar().getWorkerService();
} }

@GET @GET
@Path("/metrics") @Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List") @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") }) @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception { public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return functions.getWorkerMetrcis(clientAppId()); return worker.getWorkerMetrcis(clientAppId());
}

@GET
@Path("/functionsmetrics")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
} }
} }
Expand Up @@ -209,12 +209,4 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {


return new WorkerService(workerConfig); return new WorkerService(workerConfig);
} }

@Test
public void testGetWokersApi() throws Exception {
List<WorkerInfo> workers = admin.functions().getCluster();
Assert.assertEquals(workers.size(), 1);
Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
}

} }
Expand Up @@ -320,11 +320,4 @@ public interface Functions {
* *
*/ */
Set<String> getSinks() throws PulsarAdminException; Set<String> getSinks() throws PulsarAdminException;

/**
* Get list of workers present under a cluster
* @return
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;
} }
Expand Up @@ -43,7 +43,7 @@
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl; import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl; import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerStatsImpl; import org.apache.pulsar.client.admin.internal.WorkerImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl; import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl; private final String serviceUrl;
private final Lookup lookups; private final Lookup lookups;
private final Functions functions; private final Functions functions;
private final WorkerStats workerStats; private final Worker worker;
private final Schemas schemas; private final Schemas schemas;
protected final WebTarget root; protected final WebTarget root;
protected final Authentication auth; protected final Authentication auth;
Expand Down Expand Up @@ -189,7 +189,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.resourceQuotas = new ResourceQuotasImpl(root, auth); this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls); this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth); this.functions = new FunctionsImpl(root, auth);
this.workerStats = new WorkerStatsImpl(root, auth); this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth); this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth); this.bookies = new BookiesImpl(root, auth);
} }
Expand Down Expand Up @@ -361,8 +361,8 @@ public Functions functions() {
* *
* @return the Worker stats * @return the Worker stats
*/ */
public WorkerStats workerStats() { public Worker worker() {
return workerStats; return worker;
} }


/** /**
Expand Down
Expand Up @@ -19,13 +19,16 @@
package org.apache.pulsar.client.admin; package org.apache.pulsar.client.admin;


import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map;


import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics; import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerInfo;


/** /**
* Admin interface for worker stats management. * Admin interface for worker stats management.
*/ */
public interface WorkerStats { public interface Worker {




/** /**
Expand All @@ -41,4 +44,25 @@ public interface WorkerStats {
* @throws PulsarAdminException * @throws PulsarAdminException
*/ */
Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException; Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException;

/**
* Get List of all workers belonging to this cluster
* @return
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;

/**
* Get the worker who is the leader of the cluster
* @return
* @throws PulsarAdminException
*/
WorkerInfo getClusterLeader() throws PulsarAdminException;

/**
* Get the function assignment among the cluster
* @return
* @throws PulsarAdminException
*/
Map<String, Collection<String>> getAssignments() throws PulsarAdminException;
} }

0 comments on commit f44d367

Please sign in to comment.