Skip to content

Commit

Permalink
Spawner (#11)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Added the first cut of the Spawner

* Added more func

* Use already existing FunctionConfig

* ompiling version

* Rationalized the use of data structures

* Completed Spawner

* Fixed the unittest
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 920277b commit e30808c
Show file tree
Hide file tree
Showing 19 changed files with 689 additions and 160 deletions.
Expand Up @@ -38,7 +38,10 @@
@EqualsAndHashCode
@ToString
public class FunctionConfig {

// namespace that the function resides in
private String nameSpace;
// Username that the function belongs to
private String userName;
// function name
private String name;
// function class name
Expand Down
Expand Up @@ -30,7 +30,7 @@ class ContextImpl implements Context {

// Per Message related
private String messageId;
private String topicName;
private String currentTopicName;
private long startTime;

public ContextImpl(JavaInstanceConfig config, Logger logger) {
Expand All @@ -40,7 +40,7 @@ public ContextImpl(JavaInstanceConfig config, Logger logger) {

public void setCurrentMessageContext(String messageId, String topicName) {
this.messageId = messageId;
this.topicName = topicName;
this.currentTopicName = topicName;
this.startTime = System.currentTimeMillis();
}

Expand All @@ -51,12 +51,12 @@ public String getMessageId() {

@Override
public String getTopicName() {
return topicName;
return currentTopicName;
}

@Override
public String getFunctionName() {
return config.getFunctionName();
return config.getFunctionConfig().getName();
}

@Override
Expand Down
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import lombok.*;
import org.apache.pulsar.functions.spawner.ExecutionResult;


import java.io.ByteArrayOutputStream;
import java.util.concurrent.TimeoutException;

/**
* This is the Java Instance. This is started by the spawner using the JavaInstanceClient
* program if invoking via a process based invocation or using JavaInstance using a thread
* based invocation.
*/
@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
public class JavaExecutionResult implements ExecutionResult {
private Exception userException;
private TimeoutException timeoutException;
private byte[] result;

public void reset() {
this.setUserException(null);
this.setTimeoutException(null);
this.setResult((byte[])null);
}
}
Expand Up @@ -21,11 +21,11 @@
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.*;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.List;
Expand All @@ -38,6 +38,7 @@
* based invocation.
*/
public class JavaInstance {
private static final Logger log = LoggerFactory.getLogger(JavaInstance.class);
enum SupportedTypes {
INTEGER,
STRING,
Expand All @@ -50,59 +51,12 @@ enum SupportedTypes {
LIST
}
private ContextImpl context;
private Logger logger;
private SupportedTypes inputType;
private SupportedTypes outputType;
private RequestHandler requestHandler;
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private ExecutionResult executionResult;

class ExecutionResult {
private Exception userException;
private TimeoutException timeoutException;
private Object resultValue;
private ByteArrayOutputStream outputStream;

public Exception getUserException() {
return userException;
}

public void setUserException(Exception userException) {
this.userException = userException;
}

public TimeoutException getTimeoutException() {
return timeoutException;
}

public void setTimeoutException(TimeoutException timeoutException) {
this.timeoutException = timeoutException;
}

public Object getResultValue() {
return resultValue;
}

public void setResultValue(Object resultValue) {
this.resultValue = resultValue;
}

public ByteArrayOutputStream getOutputStream() {
return outputStream;
}

public void setOutputStream(ByteArrayOutputStream outputStream) {
this.outputStream = outputStream;
}

public void reset() {
this.setUserException(null);
this.setTimeoutException(null);
this.setResultValue(null);
this.setOutputStream(null);
}
}
private JavaExecutionResult executionResult;

public static Object createObject(String userClassName) {
Object object;
Expand All @@ -119,13 +73,12 @@ public static Object createObject(String userClassName) {
return object;
}

public JavaInstance(JavaInstanceConfig config, String userClassName, Logger logger) {
this(config, createObject(userClassName), logger);
public JavaInstance(JavaInstanceConfig config) {
this(config, createObject(config.getFunctionConfig().getClassName()));
}

public JavaInstance(JavaInstanceConfig config, Object object, Logger logger) {
this.context = new ContextImpl(config, logger);
this.logger = logger;
public JavaInstance(JavaInstanceConfig config, Object object) {
this.context = new ContextImpl(config, log);
if (object instanceof RequestHandler) {
requestHandler = (RequestHandler) object;
computeInputAndOutputTypes();
Expand All @@ -136,7 +89,7 @@ public JavaInstance(JavaInstanceConfig config, Object object, Logger logger) {
}

executorService = Executors.newFixedThreadPool(1);
this.executionResult = new ExecutionResult();
this.executionResult = new JavaExecutionResult();
}

private void computeInputAndOutputTypes() {
Expand Down Expand Up @@ -169,16 +122,17 @@ private SupportedTypes computeSupportedType(Type type) {
}
}

public ExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
context.setCurrentMessageContext(messageId, topicName);
executionResult.reset();
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
if (requestHandler != null) {
try {
Object obj = deserialize(data);
executionResult.setResultValue(requestHandler.handleRequest(obj, context));
Object input = deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(serialize(output));
} catch (Exception ex) {
executionResult.setUserException(ex);
}
Expand All @@ -187,7 +141,7 @@ public void run() {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setOutputStream(outputStream);
executionResult.setResult(outputStream.toByteArray());
} catch (Exception ex) {
executionResult.setUserException(ex);
}
Expand All @@ -197,49 +151,66 @@ public void run() {
try {
future.get(context.getTimeBudgetInMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("handleMessage was interrupted");
log.error("handleMessage was interrupted");
executionResult.setUserException(e);
} catch (ExecutionException e) {
logger.error("handleMessage threw exception: " + e.getCause());
log.error("handleMessage threw exception: " + e.getCause());
executionResult.setUserException(e);
} catch (TimeoutException e) {
future.cancel(true); // <-- interrupt the job
logger.error("handleMessage timed out");
log.error("handleMessage timed out");
executionResult.setTimeoutException(e);
}

return executionResult;
}

private Object deserialize(byte[] data) throws Exception {
switch (inputType) {
case INTEGER: {
return ByteBuffer.wrap(data).getInt();
}
case LONG: {
return ByteBuffer.wrap(data).getLong();
}
case DOUBLE: {
return ByteBuffer.wrap(data).getDouble();
}
case FLOAT: {
return ByteBuffer.wrap(data).getFloat();
}
case SHORT: {
return ByteBuffer.wrap(data).getShort();
private byte[] serialize(Object resultValue) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(resultValue);
out.flush();
return bos.toByteArray();
} catch (Exception ex) {
} finally {
try {
bos.close();
} catch (IOException ex) {
// ignore close exception
}
case BYTE: {
return ByteBuffer.wrap(data).get();
}
return null;
}

private Object deserialize(byte[] data) throws Exception {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
bis = new ByteArrayInputStream(data);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} finally {
if (bis != null) {
bis.close();
}
case STRING: {
return new String(data);
if (ois != null) {
ois.close();
}
}
switch (inputType) {
case INTEGER:
case LONG:
case DOUBLE:
case FLOAT:
case SHORT:
case BYTE:
case STRING:
case MAP:
case LIST: {
ByteArrayInputStream byteIn = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(byteIn);
return in.readObject();
}
case LIST:
return obj;
default: {
throw new RuntimeException("Unknown SupportedType " + inputType);
}
Expand Down
Expand Up @@ -37,13 +37,10 @@
@EqualsAndHashCode
@ToString
public class JavaInstanceConfig {
private String functionName;
private FunctionID functionId;
private InstanceID instanceId;
private String functionVersion;
private FunctionConfig functionConfig;
private String nameSpace;
private String userName;
private FunctionID functionId;
private String functionVersion;
private int timeBudgetInMs;
private int maxMemory;
}
}
Expand Up @@ -19,15 +19,26 @@

package org.apache.pulsar.functions.runtime.container;

import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.spawner.ExecutionResult;

import java.util.concurrent.CompletableFuture;

/**
* A function container is an environment for invoking functions.
*/
public interface FunctionContainer {

String getId();

void start() throws Exception;

void join() throws InterruptedException;

void stop();

FunctionConfig getFunctionConfig();

CompletableFuture<ExecutionResult> sendMessage(String topicName, String messageId, byte[] data);

}
Expand Up @@ -30,12 +30,10 @@ public interface FunctionContainerFactory extends AutoCloseable {
* Create a function container to execute a java instance.
*
* @param instanceConfig java instance config
* @param instanceRunnable java instance runnable
* @return function container to start/stop instance
*/
FunctionContainer createContainer(
JavaInstanceConfig instanceConfig,
Runnable instanceRunnable);
JavaInstanceConfig instanceConfig);

@Override
void close();
Expand Down

0 comments on commit e30808c

Please sign in to comment.