Skip to content

Commit

Permalink
Added ProcessContainerFactory (#105)
Browse files Browse the repository at this point in the history
* Added Process based container factory

* Added java instance log4j

* Generate uberjar

* Added log directory for process container

* Better naming

* Corrected java command line

* More changes to get things to work

* Fixed process args

* Temp change that needs to be reverted

* Merge conflict

* Siwthced to gprc 1.5.0 for netty compat

* Getter for ProcessBuilder

* Added test

* Added container info

* Send Internal Server Error if we have trouble with the request

* Dont use unsupported apis

* Revert changes

* Make Thread container as default

* Fix build
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent d02ae48 commit cb74f21
Show file tree
Hide file tree
Showing 19 changed files with 845 additions and 19 deletions.
Expand Up @@ -225,6 +225,7 @@ void runCmd() throws Exception {
); );


try (ThreadFunctionContainerFactory containerFactory = new ThreadFunctionContainerFactory( try (ThreadFunctionContainerFactory containerFactory = new ThreadFunctionContainerFactory(
"LocalRunnerThreadGroup",
limitsConfig.getMaxBufferedTuples(), limitsConfig.getMaxBufferedTuples(),
admin.getServiceUrl().toString())) { admin.getServiceUrl().toString())) {


Expand Down
5 changes: 5 additions & 0 deletions pulsar-functions/conf/function_worker.yml
Expand Up @@ -29,3 +29,8 @@ limitsConfig:
maxMemoryMb: -1 maxMemoryMb: -1
maxCpuCores: -1 maxCpuCores: -1
maxBufferedTuples: 1024 maxBufferedTuples: 1024
threadContainerFactory:
threadGroupName: "Thread Function Container Group"
# processContainerFactory:
# javaInstanceJarLocation: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/java-instance.jar"
# logDirectory: "/Users/sanjeevkulkarni/workspace/srkukarni/pulsar_private/pulsar-functions/runtime/target/logs"
6 changes: 6 additions & 0 deletions pulsar-functions/proto/pom.xml
Expand Up @@ -41,6 +41,12 @@
<version>3.5.1</version> <version>3.5.1</version>
</dependency> </dependency>


<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.5.0</version>
</dependency>

</dependencies> </dependencies>
<build> <build>
<extensions> <extensions>
Expand Down
38 changes: 38 additions & 0 deletions pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";
package proto;

import "google/protobuf/empty.proto";

option java_package = "org.apache.pulsar.functions.proto";
option java_outer_classname = "InstanceCommunication";

message FunctionStatusResponseProto {
string failureException = 1;
int64 numProcessed = 2;
int64 numSuccessfullyProcessed = 3;
int64 numTimeouts = 4;
int64 numUserExceptions = 5;
int64 numSystemExceptions = 6;
}

service InstanceControl {
rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatusResponseProto) {}
}
64 changes: 58 additions & 6 deletions pulsar-functions/runtime/pom.xml
Expand Up @@ -69,12 +69,6 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>


<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.5.1</version>
</dependency>

<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId> <artifactId>jackson-dataformat-yaml</artifactId>
Expand All @@ -96,6 +90,20 @@
<version>${gson.version}</version> <version>${gson.version}</version>
</dependency> </dependency>



<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.5.0</version>
</dependency>


<dependency> <dependency>
<groupId>net.jodah</groupId> <groupId>net.jodah</groupId>
<artifactId>typetools</artifactId> <artifactId>typetools</artifactId>
Expand All @@ -113,6 +121,50 @@
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
</dependency> </dependency>


<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>

</dependencies> </dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>java-instance</finalName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<!-- Shading signed JARs will fail without
this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project> </project>
@@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.runtime.container;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* A function container implemented using java thread.
*/
@Slf4j
class ProcessFunctionContainer implements FunctionContainer {

// The thread that invokes the function
@Getter
private Process process;
@Getter
private final ProcessBuilder processBuilder;
private final int instancePort;
private Exception startupException;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;

ProcessFunctionContainer(JavaInstanceConfig instanceConfig,
int maxBufferedTuples,
String javaInstanceJarFile,
String logDirectory,
String jarFile,
String pulsarServiceUrl) {
List<String> args = new LinkedList<>();
args.add("java");
args.add("-cp");
args.add(javaInstanceJarFile);
args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
args.add("-Dpulsar.log.dir=" + logDirectory);
args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionId());
args.add("org.apache.pulsar.functions.runtime.instance.JavaInstanceMain");
args.add("--instance-id");
args.add(instanceConfig.getInstanceId());
args.add("--function-id");
args.add(instanceConfig.getFunctionId());
args.add("--function-version");
args.add(instanceConfig.getFunctionVersion());
args.add("--tenant");
args.add(instanceConfig.getFunctionConfig().getTenant());
args.add("--namespace");
args.add(instanceConfig.getFunctionConfig().getNamespace());
args.add("--name");
args.add(instanceConfig.getFunctionConfig().getName());
args.add("--function-classname");
args.add(instanceConfig.getFunctionConfig().getClassName());
args.add("--source-topic");
args.add(instanceConfig.getFunctionConfig().getSourceTopic());
args.add("--input-serde-classname");
args.add(instanceConfig.getFunctionConfig().getInputSerdeClassName());
if (instanceConfig.getFunctionConfig().getSinkTopic() != null) {
args.add("--sink-topic");
args.add(instanceConfig.getFunctionConfig().getSinkTopic());
}
if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null) {
args.add("--output-serde-classname");
args.add(instanceConfig.getFunctionConfig().getOutputSerdeClassName());
}
args.add("--processing-guarantees");
if (instanceConfig.getFunctionConfig().getProcessingGuarantees() != null) {
args.add(String.valueOf(instanceConfig.getFunctionConfig().getProcessingGuarantees()));
} else {
args.add("ATMOST_ONCE");
}
args.add("--jar");
args.add(jarFile);
args.add("--pulsar-serviceurl");
args.add(pulsarServiceUrl);
args.add("--max-buffered-tuples");
args.add(String.valueOf(maxBufferedTuples));
Map<String, String> userConfig = instanceConfig.getFunctionConfig().getUserConfig();
String userConfigString = "";
if (userConfig != null && !userConfig.isEmpty()) {
for (Map.Entry<String, String> entry : userConfig.entrySet()) {
if (!userConfigString.isEmpty()) {
userConfigString = userConfigString + ",";
}
userConfigString = userConfigString + entry.getKey() + ":" + entry.getValue();
}
args.add("--user-config");
args.add(userConfigString);
}
instancePort = findAvailablePort();
args.add("--port");
args.add(String.valueOf(instancePort));

processBuilder = new ProcessBuilder(args);
}

/**
* The core logic that initialize the thread container and executes the function
*/
@Override
public void start() throws Exception {
try {
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start();
} catch (Exception ex) {
log.error("Starting process failed", ex);
startupException = ex;
throw ex;
}
try {
int exitValue = process.exitValue();
log.error("Instance Process quit unexpectedly with return value " + exitValue);
} catch (IllegalThreadStateException ex) {
log.info("Started process successfully");
}
channel = ManagedChannelBuilder.forAddress("127.0.0.1", instancePort)
.usePlaintext(true)
.build();
stub = InstanceControlGrpc.newFutureStub(channel);
}

@Override
public void stop() {
process.destroy();
channel.shutdown();
}

@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
CompletableFuture<FunctionStatus> retval = new CompletableFuture<>();
ListenableFuture<InstanceCommunication.FunctionStatusResponseProto> response = stub.getFunctionStatus(Empty.newBuilder().build());
Futures.addCallback(response, new FutureCallback<InstanceCommunication.FunctionStatusResponseProto>() {
@Override
public void onFailure(Throwable throwable) {
retval.completeExceptionally(throwable);
}

@Override
public void onSuccess(InstanceCommunication.FunctionStatusResponseProto t) {
retval.complete(convertToFunctionStatus(t));
}
});
return retval;
}

private FunctionStatus convertToFunctionStatus(InstanceCommunication.FunctionStatusResponseProto status) {
FunctionStatus retval = new FunctionStatus();
retval.setRunning(true);
retval.setNumProcessed(status.getNumProcessed());
retval.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
retval.setNumUserExceptions(status.getNumUserExceptions());
retval.setNumSystemExceptions(status.getNumSystemExceptions());
retval.setNumTimeouts(status.getNumTimeouts());
return retval;
}

private int findAvailablePort() {
// The logic here is a little flaky. There is no guarantee that this
// port returned will be available later on when the instance starts
// TODO(sanjeev):- Fix this
try {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
return port;
} catch (IOException ex){
throw new RuntimeException("No free port found", ex);
}
}
}

0 comments on commit cb74f21

Please sign in to comment.