Skip to content

Commit

Permalink
Made upload/download generic so that it doesnt have to be function sp…
Browse files Browse the repository at this point in the history
…ecific (#1629)
  • Loading branch information
srkukarni authored and sijie committed Apr 21, 2018
1 parent 4f73c54 commit b4658e2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 123 deletions.
Expand Up @@ -155,24 +155,17 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
}

@POST
@Path("/{tenant}/{namespace}/{functionName}/upload")
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail) {
return functions.uploadFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail);
public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
return functions.uploadFunction(uploadedInputStream, path);
}

@GET
@Path("/{tenant}/{namespace}/{functionName}/download")
public Response downloadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @QueryParam("filename") String fileName) {
return functions.downloadFunction(tenant, namespace, functionName, fileName);
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
return functions.downloadFunction(path);
}

}
Expand Up @@ -160,38 +160,30 @@ public interface Functions {
String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;

/**
* Upload Function Code.
* Upload Data.
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param fileName
* Code where file is located
* @param sourceFile
* dataFile that needs to be uploaded
* @param path
* Path where data should be stored
*
* @throws PulsarAdminException
* Unexpected error
*/
void uploadFunction(String tenant, String namespace, String function, String fileName) throws PulsarAdminException;
void uploadFunction(String sourceFile, String path) throws PulsarAdminException;

/**
* Download Function Code.
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param fileName
* Code where function should be downloaded
* @param destinationFile
* file where data should be downloaded to
* @param path
* Path where data is located
*
* @throws PulsarAdminException
* Unexpected error
*/
void downloadFunction(String tenant, String namespace, String function, String fileName) throws PulsarAdminException;
void downloadFunction(String destinationFile, String path) throws PulsarAdminException;


}
Expand Up @@ -169,27 +169,27 @@ public String triggerFunction(String tenant, String namespace, String functionNa
}

@Override
public void uploadFunction(String tenant, String namespace, String functionName, String fileName) throws PulsarAdminException {
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();

mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FileDataBodyPart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM_TYPE));

request(functions.path(tenant).path(namespace).path(functionName).path("upload"))
mp.bodyPart(new FormDataBodyPart("path", path, MediaType.TEXT_PLAIN_TYPE));
request(functions.path("upload"))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void downloadFunction(String tenant, String namespace, String function, String path) throws PulsarAdminException {
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
try {
Path pathToFile = Paths.get(path);
InputStream response = request(functions.path(tenant).path(namespace).path(function).path("download")
.queryParam("filename", pathToFile.getFileName().toString())).get(InputStream.class);
InputStream response = request(functions.path("download")
.queryParam("path", path)).get(InputStream.class);
if (response != null) {
File targetFile = new File(path);
File targetFile = new File(destinationPath);
java.nio.file.Files.copy(
response,
targetFile.toPath(),
Expand Down
Expand Up @@ -693,46 +693,41 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "Upload Pulsar Function code to Pulsar")
class UploadFunction extends FunctionCommand {
@Parameters(commandDescription = "Upload File Data to Pulsar")
class UploadFunction extends BaseCommand {
@Parameter(
names = "--jar",
description = "Path to the jar file for the function (if the function is written in Java)",
listConverter = StringConverter.class)
protected String jarFile;
names = "--sourceFile",
description = "The file whose contents need to be uploaded",
listConverter = StringConverter.class, required = true)
protected String sourceFile;
@Parameter(
names = "--py",
description = "Path to the main Python file for the function (if the function is written in Python)",
listConverter = StringConverter.class)
protected String pyFile;

names = "--path",
description = "Path where the contents need to be stored",
listConverter = StringConverter.class, required = true)
protected String path;
@Override
void runCmd() throws Exception {
if (jarFile == null && pyFile == null) {
throw new RuntimeException("Either a jar File or a python file needs to be specified");
}
String userCodeFile;
if (jarFile != null) {
userCodeFile = jarFile;
} else {
userCodeFile = pyFile;
}
admin.functions().uploadFunction(tenant, namespace, functionName, userCodeFile);
admin.functions().uploadFunction(sourceFile, path);
print("Uploaded successfully");
}
}

@Parameters(commandDescription = "Download Pulsar Function code from Pulsar")
class DownloadFunction extends FunctionCommand {
@Parameters(commandDescription = "Download File Data from Pulsar")
class DownloadFunction extends BaseCommand {
@Parameter(
names = "--destinationFile",
description = "The file where downloaded contents need to be stored",
listConverter = StringConverter.class, required = true)
protected String destinationFile;
@Parameter(
names = "--downloadPath",
description = "Path where the file needs to be downloaded)",
names = "--path",
description = "Path where the contents are to be stored",
listConverter = StringConverter.class, required = true)
protected String downloadPath;
protected String path;

@Override
void runCmd() throws Exception {
admin.functions().downloadFunction(tenant, namespace, functionName, downloadPath);
admin.functions().downloadFunction(destinationFile, path);
print("Downloaded successfully");
}
}
Expand Down
Expand Up @@ -529,52 +529,32 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
}

@POST
@Path("/{tenant}/{namespace}/{functionName}/upload")
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail) {
public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
// validate parameters
try {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
}
if (namespace == null) {
throw new IllegalArgumentException("Namespace is not provided");
}
if (functionName == null) {
throw new IllegalArgumentException("Function Name is not provided");
}
if (uploadedInputStream == null || fileDetail == null) {
throw new IllegalArgumentException("Function Package is not provided");
if (uploadedInputStream == null || path == null) {
throw new IllegalArgumentException("Function Package is not provided " + path);
}
} catch (IllegalArgumentException e) {
log.error("Invalid upload function request @ /{}/{}/{}",
tenant, namespace, functionName, e);
log.error("Invalid upload function request @ /{}", path, e);
return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}

String packageLocation = String.format(
"%s/%s/%s/%s",
tenant,
namespace,
functionName,
fileDetail.getFileName());

// Upload to bookkeeper
try {
log.info("Uploading function package to {}", packageLocation);
log.info("Uploading function package to {}", path);

Utils.uploadToBookeeper(
worker().getDlogNamespace(),
uploadedInputStream,
packageLocation);
path);
} catch (IOException e) {
log.error("Error uploading file {}", packageLocation, e);
log.error("Error uploading file {}", path, e);
return Response.serverError()
.type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage()))
Expand All @@ -585,23 +565,14 @@ public Response uploadFunction(final @PathParam("tenant") String tenant,
}

@GET
@Path("/{tenant}/{namespace}/{functionName}/download")
public Response downloadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @QueryParam("filename") String fileName) {
String packageLocation = String.format(
"%s/%s/%s/%s",
tenant,
namespace,
functionName,
fileName);
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
return Response.status(Status.OK).entity(
new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException {
Utils.downloadFromBookkeeper(worker().getDlogNamespace(),
output, packageLocation);
output, path);
}
}).build();
}
Expand Down
Expand Up @@ -142,24 +142,17 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
}

@POST
@Path("/{tenant}/{namespace}/{functionName}/upload")
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response uploadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail) {
return functions.uploadFunction(
tenant, namespace, functionName, uploadedInputStream, fileDetail);
public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("path") String path) {
return functions.uploadFunction(uploadedInputStream, path);
}

@GET
@Path("/{tenant}/{namespace}/{functionName}/download")
public Response downloadFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @QueryParam("filename") String fileName) {
return functions.downloadFunction(tenant, namespace, functionName, fileName);
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
return functions.downloadFunction(path);
}

}

0 comments on commit b4658e2

Please sign in to comment.