Skip to content

Commit

Permalink
add/fixes the following: (#62)
Browse files Browse the repository at this point in the history
1. upload packages to bookkeeper will include a UUID
2. incorrect setName - > setNamespace
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent 38a5987 commit 43a09e6
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 45 deletions.
Expand Up @@ -21,6 +21,7 @@
import com.google.gson.Gson;

import java.io.Serializable;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -42,7 +43,7 @@ public class FunctionMetaData implements Serializable, Cloneable {
// resource / limits config
private LimitsConfig limitsConfig;
// function package location
private String packageLocation;
private PackageLocationMetaData packageLocation;
private String runtime;
// the version of this function state
private long version;
Expand Down
Expand Up @@ -263,9 +263,9 @@ public void processUpdate(UpdateRequest updateRequest) {
FunctionConfig functionConfig = updateRequestFs.getFunctionConfig();
FunctionMetaData existingMetaData = getFunction(functionConfig.getTenant(),
functionConfig.getNamespace(), functionConfig.getName());
if (existingMetaData.getSpawner() != null) {
stopFunction(existingMetaData);
}

stopFunction(existingMetaData);

// update the function metadata
addFunctionToFunctionMap(updateRequestFs);
// check if this worker should run the update
Expand Down Expand Up @@ -314,7 +314,7 @@ private boolean startFunction(FunctionMetaData functionMetaData) {
+ functionMetaData.getFunctionConfig().getName();
try {
File fileLocation = File.createTempFile(prefix, ".jar", new File(workerConfig.getDownloadDirectory()));
if (!Utils.downloadFromBookkeeper(URI.create(functionMetaData.getPackageLocation()), new FileOutputStream(fileLocation), workerConfig)) {
if (!Utils.downloadFromBookkeeper(URI.create(functionMetaData.getPackageLocation().getPackageURI()), new FileOutputStream(fileLocation), workerConfig)) {
return false;
};
Spawner spawner = Spawner.createSpawner(functionMetaData.getFunctionConfig(), limitsConfig,
Expand Down
@@ -0,0 +1,35 @@
package org.apache.pulsar.functions.runtime.worker;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;
import java.net.URI;

@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
public class PackageLocationMetaData implements Serializable{
private String packageName;
private String packageNamespace;
private String dlogUri;

public URI getPackageNamespaceURI() {
URI baseUri = URI.create(this.dlogUri);
String zookeeperHost = baseUri.getHost();
int zookeeperPort = baseUri.getPort();
return URI.create(
String.format("distributedlog://%s:%d/%s",
zookeeperHost, zookeeperPort, this.packageNamespace));
}

public String getPackageURI() {
return String.format("%s/%s", this.getPackageNamespaceURI().toString(), this.packageName);
}

}
Expand Up @@ -26,10 +26,10 @@
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.functions.runtime.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.runtime.worker.dlog.DLOutputStream;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

import java.io.*;
import java.net.URI;
import java.util.UUID;

@Slf4j
public final class Utils {
Expand Down Expand Up @@ -100,23 +100,24 @@ public static String getDestPackageNamespaceURI(WorkerConfig workerConfig, Strin
}

public static URI getPackageURI(String destPackageNamespaceURI, String packageName) {
return URI.create(String.format("%s/%s", destPackageNamespaceURI, packageName));
return URI.create(String.format("%s/%s-%s", destPackageNamespaceURI, packageName, UUID.randomUUID()));
}

public static URI uploadToBookeeper(InputStream uploadedInputStream,
FormDataContentDisposition fileDetail,
String namespace, WorkerConfig workerConfig)
public static String getUniquePackageName(String packageName) {
return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
}

public static void uploadToBookeeper(InputStream uploadedInputStream,
FunctionMetaData functionMetaData, WorkerConfig workerConfig)
throws IOException {
String packageName = fileDetail.getFileName();
String destPackageNamespaceURI = getDestPackageNamespaceURI(workerConfig, namespace);
URI packageURI = getPackageURI(destPackageNamespaceURI, packageName);

String packageName = functionMetaData.getPackageLocation().getPackageName();
String packageURI = functionMetaData.getPackageLocation().getPackageURI();
DistributedLogConfiguration conf = getDlogConf(workerConfig);

URI uri = URI.create(destPackageNamespaceURI);
URI packageNamespaceURI = functionMetaData.getPackageLocation().getPackageNamespaceURI();

Namespace dlogNamespace = NamespaceBuilder.newBuilder()
.clientId("pulsar-functions-uploader").conf(conf).uri(uri).build();
.clientId("pulsar-functions-uploader").conf(conf).uri(packageNamespaceURI).build();

// if the dest directory does not exist, create it.
DistributedLogManager dlm = null;
Expand All @@ -125,15 +126,15 @@ public static URI uploadToBookeeper(InputStream uploadedInputStream,
if (dlogNamespace.logExists(packageName)) {
// if the destination file exists, write a log message
log.info(String.format("Target function file already exists at '%s'. Overwriting it now",
packageURI.toString()));
packageURI));
dlogNamespace.deleteLog(packageName);
}
// copy the topology package to target working directory
log.info(String.format("Uploading function package '%s' to target DL at '%s'",
fileDetail.getName(), packageURI.toString()));
packageName, packageURI));


dlm = dlogNamespace.openLog(fileDetail.getFileName());
dlm = dlogNamespace.openLog(packageName);
writer = dlm.getAppendOnlyStreamWriter();

try (OutputStream out = new DLOutputStream(dlm, writer)) {
Expand All @@ -144,7 +145,6 @@ public static URI uploadToBookeeper(InputStream uploadedInputStream,
}
out.flush();
}
return packageURI;
}

public static boolean downloadFromBookkeeper(URI uri, OutputStream outputStream, WorkerConfig workerConfig) {
Expand Down
Expand Up @@ -67,6 +67,8 @@ protected void doStart() {
consumerConf));

log.info("Start worker {}...", workerConfig.getWorkerId());
log.info("Worker Configs: {}", workerConfig);
log.info("Limits Configs: {}", limitsConfig);
} catch (Exception e) {
log.error("Failed to create pulsar client to {}",
workerConfig.getPulsarServiceUrl(), e);
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.worker.FunctionMetaData;
import org.apache.pulsar.functions.runtime.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.runtime.worker.PackageLocationMetaData;
import org.apache.pulsar.functions.runtime.worker.Utils;
import org.apache.pulsar.functions.runtime.worker.request.RequestResult;
import org.apache.pulsar.functions.runtime.worker.WorkerConfig;
Expand Down Expand Up @@ -88,7 +89,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
// function configuration
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setName(namespace);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setSourceTopic(sourceTopic);
functionConfig.setSinkTopic(sinkTopic);
Expand All @@ -111,15 +112,14 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
functionMetaData.setVersion(0);

WorkerConfig workerConfig = getWorkerConfig();
functionMetaData.setPackageLocation(
Utils.getPackageURI(
Utils.getDestPackageNamespaceURI(workerConfig, namespace),
fileDetail.getFileName()
).toString()
);
PackageLocationMetaData packageLocationMetaData = new PackageLocationMetaData();
packageLocationMetaData.setPackageName(Utils.getUniquePackageName(fileDetail.getFileName()));
packageLocationMetaData.setPackageNamespace(namespace);
packageLocationMetaData.setDlogUri(workerConfig.getDlogUri());
functionMetaData.setPackageLocation(packageLocationMetaData);
functionMetaData.setWorkerId(workerConfig.getWorkerId());

return updateRequest(functionMetaData, uploadedInputStream, fileDetail);
return updateRequest(functionMetaData, uploadedInputStream);
}

@PUT
Expand Down Expand Up @@ -158,7 +158,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
// function configuration
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setName(namespace);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setSourceTopic(sourceTopic);
functionConfig.setSinkTopic(sinkTopic);
Expand All @@ -181,15 +181,14 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
functionMetaData.setVersion(0);

WorkerConfig workerConfig = getWorkerConfig();
functionMetaData.setPackageLocation(
Utils.getPackageURI(
Utils.getDestPackageNamespaceURI(workerConfig, namespace),
fileDetail.getFileName()
).toString()
);
PackageLocationMetaData packageLocationMetaData = new PackageLocationMetaData();
packageLocationMetaData.setPackageName(Utils.getUniquePackageName(fileDetail.getFileName()));
packageLocationMetaData.setPackageNamespace(namespace);
packageLocationMetaData.setDlogUri(workerConfig.getDlogUri());
functionMetaData.setPackageLocation(packageLocationMetaData);
functionMetaData.setWorkerId(workerConfig.getWorkerId());

return updateRequest(functionMetaData, uploadedInputStream, fileDetail);
return updateRequest(functionMetaData, uploadedInputStream);
}


Expand Down Expand Up @@ -286,25 +285,20 @@ public Response listFunctions(final @PathParam("tenant") String tenant,
}

private Response updateRequest(FunctionMetaData functionMetaData,
InputStream uploadedInputStream,
FormDataContentDisposition fileDetail) {
InputStream uploadedInputStream) {
WorkerConfig workerConfig = getWorkerConfig();

// Upload to bookeeper
URI packageURI = null;
try {
packageURI = Utils.uploadToBookeeper(uploadedInputStream, fileDetail,
functionMetaData.getFunctionConfig().getNamespace(), workerConfig);
Utils.uploadToBookeeper(uploadedInputStream, functionMetaData, workerConfig);
} catch (IOException e) {
log.error("Error uploading file {}", fileDetail.getFileName(), e);
log.error("Error uploading file {}", functionMetaData.getPackageLocation().getPackageName(), e);
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(RestUtils.createMessage(e.getMessage()))
.build();
}

functionMetaData.setPackageLocation(packageURI.toString());

// Submit to FMT
FunctionMetaDataManager functionMetaDataManager = getWorkerFunctionStateManager();

Expand Down
Expand Up @@ -34,6 +34,7 @@ private static void runWorker(String workerId, int port) throws PulsarClientExce
workerConfig.setFunctionMetadataTopic("persistent://sample/standalone/ns1/fmt");
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setWorkerId(workerId);
workerConfig.setDownloadDirectory("/tmp");
Worker worker = new Worker(workerConfig, new LimitsConfig(-1, -1, -1, 1024));
worker.startAsync();
worker.awaitRunning();
Expand Down

0 comments on commit 43a09e6

Please sign in to comment.