Skip to content

Commit

Permalink
Introduce functioncache and functioncontainer (#9)
Browse files Browse the repository at this point in the history
* Introduce `functioncache` and `functioncontainer`

- introduce `functioncache` for managing function dependencies (e.g. setting up function class loader and its classpath). The `functioncache` is shared across multiple instances.
- introduce `functioncontainer` for providing a runtime environment to invoke functions.
- provide a thread based `functioncontainer` implementation, which uses functioncache to load classpath and initial its function class loader.

* Revert FunctionID/InstanceID to String

* Add test cases for functioncache and functioncontainer
  • Loading branch information
sijie committed Mar 4, 2018
1 parent c1b68ed commit 920277b
Show file tree
Hide file tree
Showing 32 changed files with 1,675 additions and 6 deletions.
Expand Up @@ -20,7 +20,10 @@

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.StringConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.List;
import lombok.Getter;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand All @@ -38,6 +41,13 @@ class LocalRunner extends CliCommand {

@Parameter(names = "--name", description = "Function Name\n")
private String name;
@Parameter(names = "--function-classname", description = "Function Class Name\n")
private String className;
@Parameter(
names = "--function-classpath",
description = "Function Classpath\n",
listConverter = StringConverter.class)
private List<String> jarFiles;
@Parameter(names = "--source-topic", description = "Input Topic Name\n")
private String sourceTopicName;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
Expand All @@ -63,6 +73,14 @@ void run() throws Exception {
if (null != name) {
fc.setName(name);
}
if (null != className) {
fc.setClassName(className);
}
if (null != jarFiles) {
fc.setJarFiles(jarFiles);
} else {
fc.setJarFiles(Lists.newArrayList());
}
// TODO: execute the runner here

System.out.println(ReflectionToStringBuilder.toString(fc, ToStringStyle.MULTI_LINE_STYLE));
Expand Down
Expand Up @@ -52,6 +52,13 @@ public interface Context {
*/
String getFunctionId();

/**
* The id of the instance that invokes this function.
*
* @return the instance id
*/
String getInstanceId();

/**
* The version of the function that we are executing
* @return The version id
Expand Down
Expand Up @@ -22,18 +22,29 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
* Function Configuration.
*/
@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
public class FunctionConfig {

// function name
private String name;
// function class name
private String className;
// function jar name
private List<String> jarFiles;
// source topic
private String sourceTopic;
// sink topic
Expand Down
Expand Up @@ -61,7 +61,12 @@ public String getFunctionName() {

@Override
public String getFunctionId() {
return config.getFunctionId();
return config.getFunctionId().toString();
}

@Override
public String getInstanceId() {
return config.getInstanceId().toString();
}

@Override
Expand Down
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance;

import java.lang.reflect.ParameterizedType;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
Expand All @@ -27,7 +26,6 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.List;
Expand All @@ -39,7 +37,7 @@
* program if invoking via a process based invocation or using JavaInstance using a thread
* based invocation.
*/
class JavaInstance {
public class JavaInstance {
enum SupportedTypes {
INTEGER,
STRING,
Expand Down
Expand Up @@ -18,19 +18,30 @@
*/
package org.apache.pulsar.functions.instance;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;

/**
* This is the config passed to the Java Instance. Contains all the information
* passed to run functions
*/
@Data
@Getter
@Setter
class JavaInstanceConfig {
@EqualsAndHashCode
@ToString
public class JavaInstanceConfig {
private String functionName;
private String functionId;
private FunctionID functionId;
private InstanceID instanceId;
private String functionVersion;
private FunctionConfig functionConfig;
private String nameSpace;
private String userName;
private int timeBudgetInMs;
Expand Down
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime;

/**
* A identifier for a function.
*/
public class FunctionID extends UUID {

private static final long serialVersionUID = 1L;

public FunctionID() {
super();
}

public FunctionID(long lowestBits, long highestBits) {
super(lowestBits, highestBits);
}

@Override
public String toString() {
return "fn-" + super.toString();
}
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime;

/**
* Unique identifier for an execution of a function.
*/
public class InstanceID extends UUID {

private static final long serialVersionUID = 1L;

public static final InstanceID INVALID_INSTANCE_ID = new InstanceID(-1L, -1L);

public InstanceID() {
super();
}

public InstanceID(long lowestBits, long highestBits) {
super(lowestBits, highestBits);
}

@Override
public String toString() {
return "instance-" + super.toString();
}
}
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime;

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;

/**
* A identifier for an object.
*/
public class UUID implements Serializable {

private static final long serialVersionUID = 1L;

private final java.util.UUID uuid;

public UUID() {
this.uuid = java.util.UUID.randomUUID();
}

public UUID(long mostSigBits, long leastSigBits) {
this.uuid = new java.util.UUID(mostSigBits, leastSigBits);
}

@VisibleForTesting
java.util.UUID getInternalUUID() {
return uuid;
}

@Override
public int hashCode() {
return uuid.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof UUID)) {
return false;
}
UUID another = (UUID) obj;
return uuid.equals(another.uuid);
}

@Override
public String toString() {
return uuid.toString();
}
}
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime.container;

import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManagerImpl;

/**
* An abstract implementation of {@link FunctionContainerFactory}.
*/
public abstract class AbstractFunctionContainerFactory implements FunctionContainerFactory {

protected final FunctionCacheManager fnCache;

protected AbstractFunctionContainerFactory() {
this.fnCache = new FunctionCacheManagerImpl();
}

@Override
public void close() {
fnCache.close();
}

}
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime.container;

/**
* A function container is an environment for invoking functions.
*/
public interface FunctionContainer {

void start() throws Exception;

void join() throws InterruptedException;

void stop();

}

0 comments on commit 920277b

Please sign in to comment.