Skip to content

Commit

Permalink
Nonthreadpool (#38)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* In case of Kstream style running, we dont need to spawn a executor service to run the message

* Update comment
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 7fbe342 commit e90b308
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
Expand Up @@ -100,7 +100,7 @@ class LocalRunner extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
LimitsConfig limitsConfig = new LimitsConfig(
60000, // 60 seconds
-1, // No timelimit
1024, // 1GB
1024 // 1024 outstanding tuples
);
Expand Down
Expand Up @@ -109,7 +109,10 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
throw new RuntimeException("User class must be either a Request or Raw Request Handler");
}

executorService = Executors.newFixedThreadPool(1);
if (config.getTimeBudgetInMs() > 0) {
log.info("Spinning up a executor service since time budget is infinite");
executorService = Executors.newFixedThreadPool(1);
}
}

private void computeInputAndOutputTypesAndVerifySerDe() {
Expand Down Expand Up @@ -145,25 +148,11 @@ private void verifySupportedType(Type type, boolean allowVoid) {
public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
context.setCurrentMessageContext(messageId, topicName);
JavaExecutionResult executionResult = new JavaExecutionResult();
if (executorService == null) {
return processMessage(executionResult, data);
}
Future<?> future = executorService.submit(() -> {
if (requestHandler != null) {
try {
Object input = inputSerDe.deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(output);
} catch (Exception ex) {
executionResult.setUserException(ex);
}
} else if (rawRequestHandler != null) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setResult(outputStream.toByteArray());
} catch (Exception ex) {
executionResult.setUserException(ex);
}
}
return processMessage(executionResult, data);
});
try {
future.get(context.getTimeBudgetInMs(), TimeUnit.MILLISECONDS);
Expand All @@ -181,4 +170,26 @@ public JavaExecutionResult handleMessage(String messageId, String topicName, byt

return executionResult;
}

private JavaExecutionResult processMessage(JavaExecutionResult executionResult, byte[] data) {
if (requestHandler != null) {
try {
Object input = inputSerDe.deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(output);
} catch (Exception ex) {
executionResult.setUserException(ex);
}
} else if (rawRequestHandler != null) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setResult(outputStream.toByteArray());
} catch (Exception ex) {
executionResult.setUserException(ex);
}
}
return executionResult;
}
}

0 comments on commit e90b308

Please sign in to comment.