From e90b30882df4f7046b481bf1a281bc0574271ec9 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Thu, 21 Dec 2017 15:44:40 -0800 Subject: [PATCH] Nonthreadpool (#38) * Create pulsar-functions module (#1) * Create pulsar-functions module * rename `sdk` package to `api` * Added the first cut of the Java interface for Pulsar functions (#2) * In case of Kstream style running, we dont need to spawn a executor service to run the message * Update comment --- .../apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../runtime/instance/JavaInstance.java | 49 ++++++++++++------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 39c19ba73494b..cbd1ff574c34b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -100,7 +100,7 @@ class LocalRunner extends FunctionsCommand { @Override void run_functions_cmd() throws Exception { LimitsConfig limitsConfig = new LimitsConfig( - 60000, // 60 seconds + -1, // No timelimit 1024, // 1GB 1024 // 1024 outstanding tuples ); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java index e10a86e5f74f0..4af11c5f5da10 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstance.java @@ -109,7 +109,10 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) { throw new RuntimeException("User class must be either a Request or Raw Request Handler"); } - executorService = Executors.newFixedThreadPool(1); + if (config.getTimeBudgetInMs() > 0) { + log.info("Spinning up a executor service since time budget is infinite"); + executorService = Executors.newFixedThreadPool(1); + } } private void computeInputAndOutputTypesAndVerifySerDe() { @@ -145,25 +148,11 @@ private void verifySupportedType(Type type, boolean allowVoid) { public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) { context.setCurrentMessageContext(messageId, topicName); JavaExecutionResult executionResult = new JavaExecutionResult(); + if (executorService == null) { + return processMessage(executionResult, data); + } Future future = executorService.submit(() -> { - if (requestHandler != null) { - try { - Object input = inputSerDe.deserialize(data); - Object output = requestHandler.handleRequest(input, context); - executionResult.setResult(output); - } catch (Exception ex) { - executionResult.setUserException(ex); - } - } else if (rawRequestHandler != null) { - try { - ByteArrayInputStream inputStream = new ByteArrayInputStream(data); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - rawRequestHandler.handleRequest(inputStream, outputStream, context); - executionResult.setResult(outputStream.toByteArray()); - } catch (Exception ex) { - executionResult.setUserException(ex); - } - } + return processMessage(executionResult, data); }); try { future.get(context.getTimeBudgetInMs(), TimeUnit.MILLISECONDS); @@ -181,4 +170,26 @@ public JavaExecutionResult handleMessage(String messageId, String topicName, byt return executionResult; } + + private JavaExecutionResult processMessage(JavaExecutionResult executionResult, byte[] data) { + if (requestHandler != null) { + try { + Object input = inputSerDe.deserialize(data); + Object output = requestHandler.handleRequest(input, context); + executionResult.setResult(output); + } catch (Exception ex) { + executionResult.setUserException(ex); + } + } else if (rawRequestHandler != null) { + try { + ByteArrayInputStream inputStream = new ByteArrayInputStream(data); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + rawRequestHandler.handleRequest(inputStream, outputStream, context); + executionResult.setResult(outputStream.toByteArray()); + } catch (Exception ex) { + executionResult.setUserException(ex); + } + } + return executionResult; + } }