Skip to content

Commit

Permalink
Add admin api to support stop function (#2415)
Browse files Browse the repository at this point in the history
### Motivation

We have a usecase where we want to stop the function temporary to avoid draining on that function topics. So, adding an admin api/cli to stop function.

### Modifications

- add REST/CLI api to stop function.

### Result

we can stop and then restart function with admin/cli api.
  • Loading branch information
rdhabalia authored and sijie committed Aug 21, 2018
1 parent 05f8560 commit d804701
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 26 deletions.
Expand Up @@ -292,6 +292,31 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
return functions.restartFunctionInstances(tenant, namespace, functionName);
}

@POST
@ApiOperation(value = "Stop function instance", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId);
}

@POST
@ApiOperation(value = "Stop all function instances", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
return functions.stopFunctionInstances(tenant, namespace, functionName);
}

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
Expand Down
Expand Up @@ -509,7 +509,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
}

@Test(timeOut = 20000)
public void testFunctionRestartApi() throws Exception {
public void testFunctionStopAndRestartApi() throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
Expand Down Expand Up @@ -543,6 +543,21 @@ public void testFunctionRestartApi() throws Exception {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
assertEquals(subStats.consumers.size(), 1);

// it should stop consumer : so, check none of the consumer connected on subscription
admin.functions().stopFunction(tenant, namespacePortion, functionName);

retryStrategically((test) -> {
try {
SubscriptionStats subStat = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStat != null && subStat.consumers.size() == 0;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);

subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
assertEquals(subStats.consumers.size(), 0);

// it should restart consumer : so, check if consumer came up again after restarting function
admin.functions().restartFunction(tenant, namespacePortion, functionName);

Expand Down
Expand Up @@ -215,6 +215,40 @@ public interface Functions {
*/
void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;


/**
* Stop function instance
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @param instanceId
* Function instanceId
*
* @throws PulsarAdminException
* Unexpected error
*/
void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;

/**
* Stop all function instances
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @throws PulsarAdminException
* Unexpected error
*/
void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Triggers the function by writing to the input topic.
*
Expand Down
Expand Up @@ -236,6 +236,27 @@ public void restartFunction(String tenant, String namespace, String functionName
}
}

@Override
public void stopFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
.path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void stopFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path("stop"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
Expand Down Expand Up @@ -243,6 +244,34 @@ public void restartFunctionInstances() throws Exception {
verify(functions, times(1)).restartFunction(tenant, namespace, fnName);
}

@Test
public void stopFunction() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
int instanceId = 0;
cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", namespace, "--name", fnName,
"--instance-id", Integer.toString(instanceId)});

StopFunction stop = cmd.getStopper();
assertEquals(fnName, stop.getFunctionName());

verify(functions, times(1)).stopFunction(tenant, namespace, fnName, instanceId);
}

@Test
public void stopFunctionInstances() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
cmd.run(new String[] { "stop", "--tenant", tenant, "--namespace", namespace, "--name", fnName });

StopFunction stop = cmd.getStopper();
assertEquals(fnName, stop.getFunctionName());

verify(functions, times(1)).stopFunction(tenant, namespace, fnName);
}

@Test
public void testCreateFunctionWithHttpUrl() throws Exception {
String fnName = TEST_NAME + "-function";
Expand Down
Expand Up @@ -104,6 +104,7 @@ public class CmdFunctions extends CmdBase {
private final GetFunction getter;
private final GetFunctionStatus functionStatus;
private final RestartFunction restart;
private final StopFunction stop;
private final ListFunctions lister;
private final StateGetter stateGetter;
private final TriggerFunction triggerer;
Expand Down Expand Up @@ -853,7 +854,28 @@ void runCmd() throws Exception {
System.out.println("Restarted successfully");
}
}


@Parameters(commandDescription = "Temporary stops function instance. (If worker restarts then it reassigns and starts functiona again")
class StopFunction extends FunctionCommand {

@Parameter(names = "--instance-id", description = "The function instanceId (stop all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
try {
admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
} catch (NumberFormatException e) {
System.err.println("instance-id must be a number");
}
} else {
admin.functions().stopFunction(tenant, namespace, functionName);
}
System.out.println("Restarted successfully");
}
}

@Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster")
class DeleteFunction extends FunctionCommand {
@Override
Expand Down Expand Up @@ -1066,12 +1088,14 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
downloader = new DownloadFunction();
cluster = new GetCluster();
restart = new RestartFunction();
stop = new StopFunction();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
jcommander.addCommand("update", getUpdater());
jcommander.addCommand("get", getGetter());
jcommander.addCommand("restart", getRestarter());
jcommander.addCommand("stop", getStopper());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("list", getLister());
jcommander.addCommand("querystate", getStateGetter());
Expand Down Expand Up @@ -1139,6 +1163,11 @@ RestartFunction getRestarter() {
return restart;
}

@VisibleForTesting
StopFunction getStopper() {
return stop;
}

private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) {
String[] args = fqfn.split("/");
if (args.length != 3) {
Expand Down
Expand Up @@ -90,7 +90,7 @@ public class FunctionRuntimeManager implements AutoCloseable{

private MembershipManager membershipManager;
private final ConnectorsManager connectorsManager;

public FunctionRuntimeManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
Namespace dlogNamespace,
Expand Down Expand Up @@ -326,7 +326,8 @@ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String ten
return functionStatus;
}

public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId) throws Exception {
public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
boolean restart) throws Exception {
Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
if (assignment == null) {
Expand All @@ -336,9 +337,9 @@ public Response restartFunctionInstance(String tenant, String namespace, String

final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();

if (assignedWorkerId.equals(workerId)) {
restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), restart);
return Response.status(Status.OK).build();
} else {
// query other worker
Expand All @@ -355,8 +356,10 @@ public Response restartFunctionInstance(String tenant, String namespace, String
}

URI redirect = null;
final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId);
String action = restart ? "restart" : "stop";
final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId,
action);
try {
redirect = new URI(redirectUrl);
} catch (URISyntaxException e) {
Expand All @@ -369,7 +372,8 @@ public Response restartFunctionInstance(String tenant, String namespace, String
}
}

public Response restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception {
public Response stopFunctionInstances(String tenant, String namespace, String functionName, boolean restart)
throws Exception {
final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);

Expand All @@ -382,7 +386,7 @@ public Response restartFunctionInstances(String tenant, String namespace, String
final String workerId = this.workerConfig.getWorkerId();
String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
if (assignedWorkerId.equals(workerId)) {
restartFunction(fullyQualifiedInstanceId);
stopFunction(fullyQualifiedInstanceId, restart);
} else {
List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
Expand All @@ -398,26 +402,29 @@ public Response restartFunctionInstances(String tenant, String namespace, String
continue;
}
Client client = ClientBuilder.newClient();
String action = restart ? "restart" : "stop";
// TODO: create and use pulsar-admin to support authorization and authentication and manage redirect
final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart",
final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/%s",
workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
assignment.getInstance().getInstanceId(), action);
client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON)
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
}
}
return Response.status(Status.OK).build();
}

private void restartFunction(String fullyQualifiedInstanceId) throws Exception {
log.info("[{}] restarting..", fullyQualifiedInstanceId);
private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception {
log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId);
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
this.functionActioner.stopFunction(functionRuntimeInfo);
try {
this.functionActioner.startFunction(functionRuntimeInfo);
if(restart) {
this.functionActioner.startFunction(functionRuntimeInfo);
}
} catch (Exception ex) {
log.info("{} Error starting function", fullyQualifiedInstanceId, ex);
log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
functionRuntimeInfo.setStartupException(ex);
throw ex;
}
Expand Down

0 comments on commit d804701

Please sign in to comment.