Skip to content

Commit

Permalink
Added getstatus function (#87)
Browse files Browse the repository at this point in the history
* Added getstatus function

* Changed to GetFunctionStatus

* Added licence header
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent d5955d8 commit f4130f8
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 72 deletions.
Expand Up @@ -46,6 +46,7 @@ public class CmdFunctions extends CmdBase {
private final DeleteFunction deleter;
private final UpdateFunction updater;
private final GetFunction getter;
private final GetFunctionStatus statuser;
private final ListFunctions lister;

/**
Expand Down Expand Up @@ -261,6 +262,17 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "GetStatus function")
class GetFunctionStatus extends FunctionCommand {
@Override
void runCmd() throws Exception {
if (tenant == null || namespace == null || functionName == null) {
throw new RuntimeException("Missing arguments");
}
print(fnAdmin.functions().getFunctionStatus(tenant, namespace, functionName));
}
}

@Parameters(commandDescription = "Delete function")
class DeleteFunction extends FunctionCommand {
@Override
Expand Down Expand Up @@ -301,12 +313,14 @@ public CmdFunctions(PulsarAdmin admin) {
deleter = new DeleteFunction();
updater = new UpdateFunction();
getter = new GetFunction();
statuser = new GetFunctionStatus();
lister = new ListFunctions();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
jcommander.addCommand("update", getUpdater());
jcommander.addCommand("get", getGetter());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("list", getLister());
}

Expand Down Expand Up @@ -335,6 +349,9 @@ GetFunction getGetter() {
return getter;
}

@VisibleForTesting
GetFunctionStatus getStatuser() { return statuser; }

@VisibleForTesting
ListFunctions getLister() {
return lister;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;

import java.util.List;

Expand Down Expand Up @@ -76,19 +77,11 @@ public interface Functions {
FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Create a new cluster.
* <p>
* Provisions a new cluster. This operation requires Pulsar super-user privileges.
* <p>
* The name cannot contain '/' characters.
* Create a new function.
*
* @param functionConfig
* the function configuration object
*
* @throws NotAuthorized
* You don't have admin permission to create the cluster
* @throws ConflictException
* Cluster already exists
* @throws PulsarAdminException
* Unexpected error
*/
Expand Down Expand Up @@ -132,4 +125,20 @@ public interface Functions {
* Unexpected error
*/
void deleteFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Gets the current status of a function.
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @throws PulsarAdminException
* Unexpected error
*/
FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;

}
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
Expand Down Expand Up @@ -62,6 +63,15 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}
}

@Override
public FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException {
try {
return request(functions.path(tenant).path(namespace).path(function).path("status")).get(FunctionStatus.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.pulsar.functions.runtime.container;

import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;

import java.util.concurrent.CompletableFuture;

Expand All @@ -38,4 +39,6 @@ public interface FunctionContainer {

FunctionConfig getFunctionConfig();

CompletableFuture<FunctionStatus> getFunctionStatus();

}
Expand Up @@ -21,6 +21,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.TimeUnit;
Expand All @@ -38,6 +39,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.runtime.instance.JavaExecutionResult;
import org.apache.pulsar.functions.runtime.instance.JavaInstance;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
Expand Down Expand Up @@ -208,9 +210,9 @@ private void processResult(Message msg, JavaExecutionResult result, long process
} else if (result.getTimeoutException() != null) {
log.info("Timedout when processing message {}", msg, result.getTimeoutException());
stats.incrementTimeoutException();
} else if (result.getResult() != null) {
} else {
stats.incrementProcessSuccess(System.nanoTime() - processAt);
if (sinkProducer != null) {
if (result.getResult() != null && sinkProducer != null) {
byte[] output = null;
if (result.getResult() != null) {
output = serDe.serialize(result.getResult());
Expand All @@ -233,9 +235,7 @@ private void processResult(Message msg, JavaExecutionResult result, long process
} else if (processingGuarantees == FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
sourceConsumer.acknowledgeAsync(msg);
}
} else if (processingGuarantees == FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
sourceConsumer.acknowledgeAsync(msg);
}
}
}

@Override
Expand Down Expand Up @@ -290,6 +290,17 @@ public FunctionConfig getFunctionConfig() {
return javaInstanceConfig.getFunctionConfig();
}

@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
FunctionStatus retval = new FunctionStatus();
retval.setNumProcessed(stats.getTotalProcessed());
retval.setNumSuccessfullyProcessed(stats.getTotalSuccessfullyProcessed());
retval.setNumUserExceptions(stats.getTotalUserExceptions());
retval.setNumSystemExceptions(stats.getTotalSystemExceptions());
retval.setNumTimeouts(stats.getTotalTimeoutExceptions());
return CompletableFuture.completedFuture(retval);
}

private static String convertMessageIdToString(MessageId messageId) {
return messageId.toByteArray().toString();
}
Expand Down
Expand Up @@ -24,9 +24,11 @@
package org.apache.pulsar.functions.runtime.spawner;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
Expand Down Expand Up @@ -80,6 +82,10 @@ public void join() throws Exception {
}
}

public CompletableFuture<FunctionStatus> getFunctionStatus() {
return functionContainer.getFunctionStatus();
}

@Override
public void close() {
if (null != functionContainer) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
Expand Down Expand Up @@ -77,7 +78,12 @@ public FunctionActioner(WorkerConfig workerConfig, LimitsConfig limitsConfig,
FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS);
if (action == null) continue;
if (action.getAction() == FunctionAction.Action.START) {
startFunction(action.getFunctionMetaData());
try {
startFunction(action.getFunctionMetaData());
} catch (Exception ex) {
log.info("Error starting function", ex);
action.getFunctionMetaData().setStartupException(ex);
}
} else {
stopFunction(action.getFunctionMetaData());
}
Expand All @@ -102,45 +108,36 @@ public void join() throws InterruptedException {
actioner.join();
}

private boolean startFunction(FunctionMetaData functionMetaData) {
private void startFunction(FunctionMetaData functionMetaData) throws Exception {
log.info("Starting function {} ...", functionMetaData.getFunctionConfig().getName());
try {
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
StringUtils.join(
new String[]{
functionMetaData.getFunctionConfig().getTenant(),
functionMetaData.getFunctionConfig().getNamespace(),
functionMetaData.getFunctionConfig().getName(),
},
File.separatorChar));
pkgDir.mkdirs();
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
StringUtils.join(
new String[]{
functionMetaData.getFunctionConfig().getTenant(),
functionMetaData.getFunctionConfig().getNamespace(),
functionMetaData.getFunctionConfig().getName(),
},
File.separatorChar));
pkgDir.mkdirs();

File pkgFile = new File(pkgDir, new File(functionMetaData.getPackageLocation().getPackagePath()).getName());
if (!pkgFile.exists()) {
log.info("Function package file {} doesn't exist, downloading from {}",
pkgFile, functionMetaData.getPackageLocation());
if (!Utils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(pkgFile),
functionMetaData.getPackageLocation().getPackagePath())) {
log.error("Not able to download {} to {}", functionMetaData.getPackageLocation().getPackagePath(), pkgFile.getPath());
return false;
}
}
Spawner spawner = Spawner.createSpawner(functionMetaData.getFunctionConfig(), limitsConfig,
pkgFile.getAbsolutePath(), functionContainerFactory);

AssignmentInfo assignmentInfo = new AssignmentInfo();
assignmentInfo.setFunctionMetaData(functionMetaData);
assignmentInfo.setSpawner(spawner);
assignments.put(functionMetaData.getFunctionConfig().getFullyQualifiedName(), assignmentInfo);
spawner.start();
return true;
} catch (Exception ex) {
log.error("Function {} failed to start", functionMetaData.getFunctionConfig().getName(), ex);
return false;
File pkgFile = new File(pkgDir, new File(functionMetaData.getPackageLocation().getPackagePath()).getName());
if (!pkgFile.exists()) {
log.info("Function package file {} doesn't exist, downloading from {}",
pkgFile, functionMetaData.getPackageLocation());
Utils.downloadFromBookkeeper(
dlogNamespace,
new FileOutputStream(pkgFile),
functionMetaData.getPackageLocation().getPackagePath());
}
Spawner spawner = Spawner.createSpawner(functionMetaData.getFunctionConfig(), limitsConfig,
pkgFile.getAbsolutePath(), functionContainerFactory);

AssignmentInfo assignmentInfo = new AssignmentInfo();
assignmentInfo.setFunctionMetaData(functionMetaData);
assignmentInfo.setSpawner(spawner);
assignments.put(functionMetaData.getFunctionConfig().getFullyQualifiedName(), assignmentInfo);
spawner.start();
}

private boolean stopFunction(FunctionMetaData functionMetaData) {
Expand All @@ -153,4 +150,16 @@ private boolean stopFunction(FunctionMetaData functionMetaData) {
}
return false;
}

public boolean containsAssignment(FunctionConfig functionConfig) {
return assignments.containsKey(functionConfig.getFullyQualifiedName());
}

public Spawner getSpawner(FunctionConfig functionConfig) {
if (!containsAssignment(functionConfig)) {
return null;
} else {
return assignments.get(functionConfig.getFullyQualifiedName()).getSpawner();
}
}
}
Expand Up @@ -52,6 +52,8 @@ public class FunctionMetaData implements Serializable, Cloneable {
// the timestamp when the function was created
private long createTime;
private String workerId;
// Did we encounter any exception starting this function
private Exception startupException;

public void incrementVersion() {
this.version = this.version + 1;
Expand Down
Expand Up @@ -121,23 +121,17 @@ public static void uploadToBookeeper(Namespace dlogNamespace,
}
}

public static boolean downloadFromBookkeeper(Namespace namespace,
public static void downloadFromBookkeeper(Namespace namespace,
OutputStream outputStream,
String packagePath) {
try (DistributedLogManager dlm = namespace.openLog(packagePath)) {
try (InputStream in = new DLInputStream(dlm)) {
int read = 0;
byte[] bytes = new byte[1024];
while ((read = in.read(bytes)) != -1) {
outputStream.write(bytes, 0, read);
}
outputStream.flush();
return true;
}
} catch (Exception ex) {
log.error("failed to download from bookeeper with exception", ex);
return false;
String packagePath) throws Exception {
DistributedLogManager dlm = namespace.openLog(packagePath);
InputStream in = new DLInputStream(dlm);
int read = 0;
byte[] bytes = new byte[1024];
while ((read = in.read(bytes)) != -1) {
outputStream.write(bytes, 0, read);
}
outputStream.flush();
}

public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) {
Expand Down
Expand Up @@ -124,7 +124,7 @@ protected void doStart() {
throw new RuntimeException(e);
}

WorkerServer server = new WorkerServer(workerConfig, functionMetaDataManager, dlogNamespace);
WorkerServer server = new WorkerServer(workerConfig, functionMetaDataManager, dlogNamespace, functionActioner);
this.serverThread = new Thread(server, server.getThreadName());

log.info("Start worker server on port {}...", workerConfig.getWorkerPort());
Expand Down

0 comments on commit f4130f8

Please sign in to comment.