Skip to content

Commit

Permalink
Allow workers to deploy multiple function instances (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent d1699b2 commit 4b89453
Show file tree
Hide file tree
Showing 21 changed files with 463 additions and 263 deletions.
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
Expand All @@ -59,6 +60,7 @@
import java.io.File;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

@Slf4j
Expand Down Expand Up @@ -423,12 +425,18 @@ void runCmd() throws Exception {
admin.getServiceUrl().toString(),
stateStorageServiceUrl)) {

Spawner spawner = Spawner.createSpawner(
functionConfig,
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionConfig(functionConfig);
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId("0");
instanceConfig.setMaxBufferedTuples(1024);
Spawner spawner = new Spawner(
instanceConfig,
userCodeFile,
containerFactory,
null,
1024,
0);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;

import java.util.List;

Expand Down Expand Up @@ -138,6 +138,6 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;
FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;

}
Expand Up @@ -25,7 +25,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
Expand Down Expand Up @@ -80,14 +80,15 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}

@Override
public FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException {
public FunctionStatusList getFunctionStatus(
String tenant, String namespace, String function) throws PulsarAdminException {
try {
Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
JsonFormat.parser().merge(jsonResponse, functionStatusBuilder);
return functionStatusBuilder.build();
} catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/conf/example.yml
Expand Up @@ -27,3 +27,4 @@ userConfig:

output: "persistent://sample/standalone/ns1/test_result"
autoAck: true
parallelism: 2
12 changes: 9 additions & 3 deletions pulsar-functions/proto/src/main/proto/Function.proto
Expand Up @@ -40,7 +40,6 @@ message FunctionConfig {
string namespace = 2;
string name = 3;
string className = 4;
repeated string inputs = 14;
// map from input topic name to serde
map<string, string> customSerdeInputs = 5;
string outputSerdeClassName = 6;
Expand All @@ -50,6 +49,8 @@ message FunctionConfig {
SubscriptionType subscriptionType = 11;
Runtime runtime = 12;
bool autoAck = 13;
repeated string inputs = 14;
int32 parallelism = 15;
}

message PackageLocationMetaData {
Expand All @@ -68,7 +69,12 @@ message Snapshot {
bytes lastAppliedMessageId = 2;
}

message Assignment {
message Instance {
FunctionMetaData functionMetaData = 1;
int32 instanceId = 2;
}

message Assignment {
Instance instance = 1;
string workerId = 2;
}
}
Expand Up @@ -50,6 +50,10 @@ message FunctionStatus {
int64 lastInvocationTime = 13;
}

message FunctionStatusList {
repeated FunctionStatus functionStatusList = 1;
}

message MetricsData {
message DataDigest {
double count = 1;
Expand All @@ -63,4 +67,4 @@ message MetricsData {
service InstanceControl {
rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatus) {}
rpc GetAndResetMetrics(google.protobuf.Empty) returns (MetricsData) {}
}
}
Expand Up @@ -111,6 +111,7 @@ public void start() throws Exception {
instanceConfig.setFunctionId(functionId);
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
functionConfigBuilder.setTenant(tenant);
functionConfigBuilder.setNamespace(namespace);
Expand Down Expand Up @@ -157,12 +158,11 @@ public void start() throws Exception {
pulsarServiceUrl,
stateStorageServiceUrl);

Spawner spawner = Spawner.createSpawner(
functionConfig,
Spawner spawner = new Spawner(
instanceConfig,
jarFile,
containerFactory,
null,
maxBufferedTuples,
0);

server = ServerBuilder.forPort(port)
Expand Down

This file was deleted.

Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
Expand All @@ -40,27 +41,6 @@
@Slf4j
public class Spawner implements AutoCloseable {

public static Spawner createSpawner(FunctionConfig fnConfig,
String codeFile,
FunctionContainerFactory containerFactory,
MetricsSink metricsSink,
int maxBufferedTuples,
int metricsCollectionInterval) {
AssignmentInfo assignmentInfo = new AssignmentInfo(
fnConfig,
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString()
);
return new Spawner(
assignmentInfo,
codeFile,
containerFactory,
metricsSink,
maxBufferedTuples,
metricsCollectionInterval);
}

private final InstanceConfig instanceConfig;
private final FunctionContainerFactory functionContainerFactory;
private final String codeFile;
Expand All @@ -71,13 +51,12 @@ public static Spawner createSpawner(FunctionConfig fnConfig,
private Timer metricsCollectionTimer;
private int numRestarts;

private Spawner(AssignmentInfo assignmentInfo,
public Spawner(InstanceConfig instanceConfig,
String codeFile,
FunctionContainerFactory containerFactory,
MetricsSink metricsSink,
int maxBufferedTuples,
int metricsCollectionInterval) {
this.instanceConfig = createInstanceConfig(assignmentInfo, maxBufferedTuples);
this.instanceConfig = instanceConfig;
this.functionContainerFactory = containerFactory;
this.codeFile = codeFile;
this.metricsSink = metricsSink;
Expand All @@ -86,17 +65,24 @@ private Spawner(AssignmentInfo assignmentInfo,
}

public void start() throws Exception {
log.info("Spawner starting function {}", this.instanceConfig.getFunctionConfig().getName());
functionContainer = functionContainerFactory.createContainer(instanceConfig, codeFile);
log.info("Spawner starting function {} - {}", this.instanceConfig.getFunctionConfig().getName(),
this.instanceConfig.getInstanceId());
functionContainer = functionContainerFactory.createContainer(this.instanceConfig, codeFile);
functionContainer.start();
if (metricsSink != null) {
log.info("Scheduling Metrics Collection every " + metricsCollectionInterval + " secs for " + FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));
log.info("Scheduling Metrics Collection every {} secs for {} - {}",
metricsCollectionInterval,
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
instanceConfig.getInstanceId());
metricsCollectionTimer = new Timer();
metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (functionContainer.isAlive()) {
log.info("Collecting metrics for function" + FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));

log.info("Collecting metrics for function {} - {}",
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
instanceConfig.getInstanceId());
functionContainer.getAndResetMetrics().thenAccept(t -> {
if (t != null) {
log.debug("Collected metrics {}", t);
Expand Down Expand Up @@ -142,14 +128,4 @@ public void close() {
metricsCollectionTimer = null;
}
}

private InstanceConfig createInstanceConfig(AssignmentInfo assignmentInfo, int maxBufferedTuples) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionConfig(assignmentInfo.getFunctionConfig());
instanceConfig.setFunctionId(assignmentInfo.getFunctionId());
instanceConfig.setFunctionVersion(assignmentInfo.getFunctionVersion());
instanceConfig.setInstanceId(assignmentInfo.getInstanceId());
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
return instanceConfig;
}
}
Expand Up @@ -22,14 +22,17 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
import org.apache.pulsar.functions.runtime.metrics.MetricsSink;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -100,8 +103,10 @@ public void join() throws InterruptedException {
}

private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionMetaData();
log.info("Starting function {} ...", functionMetaData.getFunctionConfig().getName());
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
log.info("Starting function {} - {} ...",
functionMetaData.getFunctionConfig().getName(), instance.getInstanceId());
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
StringUtils.join(
Expand All @@ -123,17 +128,26 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
dlogNamespace,
new FileOutputStream(pkgFile),
functionMetaData.getPackageLocation().getPackagePath());
Spawner spawner = Spawner.createSpawner(functionMetaData.getFunctionConfig(),
pkgFile.getAbsolutePath(), functionContainerFactory,
metricsSink, 1024, metricsCollectionInterval);

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionConfig(functionMetaData.getFunctionConfig());
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(functionRuntimeInfo.getFunctionInstance().getInstanceId()));
instanceConfig.setMaxBufferedTuples(1024);
Spawner spawner = new Spawner(instanceConfig, pkgFile.getAbsolutePath(), functionContainerFactory,
metricsSink, metricsCollectionInterval);

functionRuntimeInfo.setSpawner(spawner);
spawner.start();
}

private boolean stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionMetaData();
log.info("Stopping function {}...", functionMetaData.getFunctionConfig().getName());
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
log.info("Stopping function {} - {}...",
functionMetaData.getFunctionConfig().getName(), instance.getInstanceId());
if (functionRuntimeInfo.getSpawner() != null) {
functionRuntimeInfo.getSpawner().close();
functionRuntimeInfo.setSpawner(null);
Expand Down
Expand Up @@ -20,6 +20,7 @@

import lombok.*;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

Expand All @@ -29,8 +30,8 @@
@Accessors(chain = true)
public class FunctionRuntimeInfo {

// function meta data
private FunctionMetaData functionMetaData;
private Instance functionInstance;

// The associated runtime with it if any
private Spawner spawner;
// Any exceptions on startup
Expand Down

0 comments on commit 4b89453

Please sign in to comment.