Skip to content

Commit

Permalink
Fileinterface (#45)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Adhere to rest semantics

* Convert create function to a post request
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 0444ac4 commit 5ea8065
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 13 deletions.
Expand Up @@ -29,9 +29,6 @@
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.nio.file.Files;
import java.nio.file.Paths;

@Parameters(commandDescription = "Operations about functions")
public class CmdFunctions extends CmdBase {
private LocalRunner localRunner;
Expand Down Expand Up @@ -124,7 +121,7 @@ class CreateFunction extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
a.functions().createFunction(functionConfig, Files.readAllBytes(Paths.get(jarFile)));
a.functions().createFunction(functionConfig, jarFile);
}
}

Expand All @@ -151,7 +148,7 @@ class UpdateFunction extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
a.functions().updateFunction(functionConfig, Files.readAllBytes(Paths.get(jarFile)));
a.functions().updateFunction(functionConfig, jarFile);
}
}

Expand Down
Expand Up @@ -92,7 +92,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
void createFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException;
void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;

/**
* Update the configuration for a function.
Expand All @@ -108,7 +108,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
void updateFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException;
void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;

/**
* Delete an existing function
Expand Down
Expand Up @@ -30,6 +30,7 @@
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import java.io.FileInputStream;
import java.util.List;

public class FunctionsImpl extends BaseResource implements Functions {
Expand Down Expand Up @@ -61,10 +62,10 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}

@Override
public void createFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException {
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart("data", code, MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
MediaType.APPLICATION_JSON_TYPE));
mp.bodyPart(new FormDataBodyPart("sinkTopic", functionConfig.getSinkTopic(),
Expand All @@ -76,7 +77,7 @@ public void createFunction(FunctionConfig functionConfig, byte[] code) throws Pu
mp.bodyPart(new FormDataBodyPart("className", functionConfig.getClassName(),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNameSpace()).path(functionConfig.getName()))
.put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -93,11 +94,11 @@ public void deleteFunction(String cluster, String namespace, String function) th
}

@Override
public void updateFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException {
public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
if (code != null) {
mp.bodyPart(new FormDataBodyPart("data", code, MediaType.APPLICATION_OCTET_STREAM_TYPE));
if (fileName != null) {
mp.bodyPart(new FormDataBodyPart("data", new FileInputStream(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
}
if (functionConfig.getSourceTopic() != null) {
mp.bodyPart(new FormDataBodyPart("sourceTopic", functionConfig.getSourceTopic(),
Expand Down

0 comments on commit 5ea8065

Please sign in to comment.