Skip to content

Commit

Permalink
Made use of LimitsConfig in JavaInstanceConfig (#92)
Browse files Browse the repository at this point in the history
* Made use of LimitsConfig in JavaInstanceConfig

* Add uncommited file
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 71997bb commit 493e54a
Show file tree
Hide file tree
Showing 16 changed files with 31 additions and 27 deletions.
Expand Up @@ -25,13 +25,12 @@
import lombok.Getter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.functions.annotation.Annotations;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.serde.SerDe;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.utils.Reflections;

Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.spawner;
package org.apache.pulsar.functions.fs;

import java.io.File;
import java.io.IOException;
Expand Down
Expand Up @@ -76,12 +76,12 @@ public String getFunctionVersion() {

@Override
public long getMemoryLimit() {
return config.getMaxMemory();
return config.getLimitsConfig().getMaxMemoryMb() * 1024 * 1024;
}

@Override
public long getTimeBudgetInMs() {
return config.getTimeBudgetInMs();
return config.getLimitsConfig().getMaxTimeMs();
}

@Override
Expand Down
Expand Up @@ -114,7 +114,7 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
throw new RuntimeException("User class must be either a Request or Raw Request Handler");
}

if (config.getTimeBudgetInMs() > 0) {
if (config.getLimitsConfig().getMaxTimeMs() > 0) {
log.info("Spinning up a executor service since time budget is infinite");
executorService = Executors.newFixedThreadPool(1);
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;
import org.apache.pulsar.functions.fs.LimitsConfig;

/**
* This is the config passed to the Java Instance. Contains all the information
Expand All @@ -38,9 +39,8 @@
@ToString
public class JavaInstanceConfig {
private InstanceID instanceId;
private FunctionConfig functionConfig;
private FunctionID functionId;
private String functionVersion;
private int timeBudgetInMs;
private int maxMemory;
private FunctionConfig functionConfig;
private LimitsConfig limitsConfig;
}
Expand Up @@ -29,6 +29,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
Expand Down Expand Up @@ -98,8 +99,7 @@ private JavaInstanceConfig createJavaInstanceConfig() {
javaInstanceConfig.setFunctionConfig(assignmentInfo.getFunctionConfig());
javaInstanceConfig.setFunctionId(assignmentInfo.getFunctionId());
javaInstanceConfig.setFunctionVersion(assignmentInfo.getFunctionVersion());
javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getMaxTimeMs());
javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemoryMb());
javaInstanceConfig.setLimitsConfig(limitsConfig);
return javaInstanceConfig;
}
}
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;
Expand Down Expand Up @@ -81,8 +82,10 @@ JavaInstanceConfig createJavaInstanceConfig() {
config.setFunctionId(new FunctionID());
config.setFunctionVersion("1.0");
config.setInstanceId(new InstanceID());
config.setMaxMemory(2048);
config.setTimeBudgetInMs(2000);
LimitsConfig limitsConfig = new LimitsConfig();
limitsConfig.setMaxTimeMs(2000);
limitsConfig.setMaxMemoryMb(2048);
config.setLimitsConfig(limitsConfig);

return config;
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.serde.JavaSerDe;
import org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -82,10 +83,12 @@ public Void handleRequest(String input, Context context) throws Exception {

private static JavaInstanceConfig createInstanceConfig() {
FunctionConfig functionConfig = new FunctionConfig();
LimitsConfig limitsConfig = new LimitsConfig();
functionConfig.setInputSerdeClassName(Utf8StringSerDe.class.getName());
functionConfig.setOutputSerdeClassName(Utf8StringSerDe.class.getName());
JavaInstanceConfig instanceConfig = new JavaInstanceConfig();
instanceConfig.setFunctionConfig(functionConfig);
instanceConfig.setLimitsConfig(limitsConfig);
return instanceConfig;
}

Expand All @@ -96,7 +99,7 @@ private static JavaInstanceConfig createInstanceConfig() {
@Test
public void testLongRunningFunction() throws Exception {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new LongRunningHandler(), Thread.currentThread().getContextClassLoader());
String testString = "ABC123";
Expand All @@ -113,7 +116,7 @@ public void testLongRunningFunction() throws Exception {
@Test
public void testLambda() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Expand Down Expand Up @@ -165,7 +168,7 @@ public void testVoidInputClasses() {
@Test
public void testVoidOutputClasses() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new VoidOutputHandler(), Thread.currentThread().getContextClassLoader());
String testString = "ABC123";
Expand All @@ -181,7 +184,7 @@ public void testVoidOutputClasses() {
@Test
public void testInconsistentInputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
config.getFunctionConfig().setInputSerdeClassName(JavaSerDe.class.getName());

try {
Expand All @@ -201,7 +204,7 @@ public void testInconsistentInputType() {
@Test
public void testInconsistentOutputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
config.getFunctionConfig().setOutputSerdeClassName(JavaSerDe.class.getName());

try {
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.io.File;
Expand Down
Expand Up @@ -29,7 +29,7 @@
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

@Data
@Setter
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.slf4j.bridge.SLF4JBridgeHandler;

/**
Expand Down
Expand Up @@ -27,15 +27,14 @@
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.worker.rest.WorkerServer;

Expand Down
Expand Up @@ -29,7 +29,7 @@
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

@Data
@Setter
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.pulsar.functions.annotation.Annotations;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.worker.FunctionMetaData;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.functions.worker;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

import java.net.URISyntaxException;
import org.testng.annotations.Test;
Expand Down
Expand Up @@ -45,7 +45,7 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.worker.FunctionMetaData;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.PackageLocationMetaData;
Expand Down

0 comments on commit 493e54a

Please sign in to comment.