Skip to content

Commit

Permalink
Fix process runtime with extra dependencies and re-enable python inte…
Browse files Browse the repository at this point in the history
…gration tests (#2926)

### Motivation

fix python process with extra dependencies

```
java.io.IOException: Cannot run program “PYTHONPATH=${PYTHONPATH}:/Users/jerrypeng/workspace/incubator-pulsar/instances/deps”: error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) ~[?:1.8.0_144]
    at org.apache.pulsar.functions.runtime.ProcessRuntime.startProcess(ProcessRuntime.java:291) [pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
    at org.apache.pulsar.functions.runtime.ProcessRuntime.start(ProcessRuntime.java:124) [pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
    at org.apache.pulsar.functions.runtime.RuntimeSpawner$1.run(RuntimeSpawner.java:94) [pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
    at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_144]
    at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_144]
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method) ~[?:1.8.0_144]
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247) ~[?:1.8.0_144]
    at java.lang.ProcessImpl.start(ProcessImpl.java:134) ~[?:1.8.0_144]
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) ~[?:1.8.0_144]
    ... 5 more
```

### Modifications

- set environment variables in process builder for process runtime
- enable python function integration tests
- add integration test for python function with extra dependencies

### Result

python functions test passed and integration test for python function with extra deps is tested.
  • Loading branch information
sijie committed Nov 5, 2018
1 parent 6617b32 commit ee22e0a
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 37 deletions.
Expand Up @@ -28,13 +28,13 @@
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc; import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;


import java.io.InputStream; import java.io.InputStream;
Expand Down Expand Up @@ -65,6 +65,7 @@ class ProcessRuntime implements Runtime {
private InstanceConfig instanceConfig; private InstanceConfig instanceConfig;
private final Long expectedHealthCheckInterval; private final Long expectedHealthCheckInterval;
private final SecretsProviderConfigurator secretsProviderConfigurator; private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String extraDependenciesDir;
private static final long GRPC_TIMEOUT_SECS = 5; private static final long GRPC_TIMEOUT_SECS = 5;


ProcessRuntime(InstanceConfig instanceConfig, ProcessRuntime(InstanceConfig instanceConfig,
Expand Down Expand Up @@ -95,10 +96,14 @@ class ProcessRuntime implements Runtime {
logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini"; logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
break; break;
} }
this.extraDependenciesDir = extraDependenciesDir;
this.processArgs = RuntimeUtils.composeArgs( this.processArgs = RuntimeUtils.composeArgs(
instanceConfig, instanceConfig,
instanceFile, instanceFile,
extraDependenciesDir, // DONT SET extra dependencies here (for python runtime),
// since process runtime is using Java ProcessBuilder,
// we have to set the environment variable via ProcessBuilder
FunctionDetails.Runtime.JAVA == instanceConfig.getFunctionDetails().getRuntime() ? extraDependenciesDir : null,
logDirectory, logDirectory,
codeFile, codeFile,
pulsarServiceUrl, pulsarServiceUrl,
Expand Down Expand Up @@ -286,6 +291,9 @@ private void startProcess() {
deathException = null; deathException = null;
try { try {
ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO(); ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
processBuilder.environment().put("PYTHONPATH", "${PYTHONPATH}:" + extraDependenciesDir);
}
secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails()); secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails());
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command())); log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start(); process = processBuilder.start();
Expand Down
Expand Up @@ -303,21 +303,10 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) th
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l); ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs(); List<String> args = container.getProcessArgs();


int totalArgs; int totalArgs = 30;
int portArg; int portArg = 23;
String pythonPath; String pythonPath = "";
int configArg; int configArg = 9;
if (null == extraDepsDir) {
totalArgs = 30;
portArg = 23;
configArg = 9;
pythonPath = "";
} else {
totalArgs = 31;
portArg = 24;
configArg = 10;
pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
}


assertEquals(args.size(), totalArgs); assertEquals(args.size(), totalArgs);
String expectedArgs = pythonPath + "python " + pythonInstanceFile String expectedArgs = pythonPath + "python " + pythonInstanceFile
Expand Down
7 changes: 7 additions & 0 deletions tests/docker-images/latest-version-image/Dockerfile
Expand Up @@ -33,3 +33,10 @@ COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \ COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \ scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \
/pulsar/bin/ /pulsar/bin/

# copy python test examples

RUN mkdir -p /pulsar/instances/deps

COPY python-examples/exclamation_lib.py /pulsar/instances/deps/
COPY python-examples/exclamation_with_extra_deps.py /pulsar/examples/python-examples/
@@ -0,0 +1,22 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

def exclamation(input):
return input + '!'
@@ -0,0 +1,31 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from pulsar import Function
from exclamation_lib import exclamation

# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass

def process(self, input, context):
return exclamation(input)
Expand Up @@ -18,6 +18,7 @@
*/ */
package org.apache.pulsar.tests.integration.functions; package org.apache.pulsar.tests.integration.functions;


import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
Expand Down Expand Up @@ -611,49 +612,64 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou
// Test CRUD functions on different runtimes. // Test CRUD functions on different runtimes.
// //


@Test(enabled = false) @Test
public void testPythonExclamationFunction() throws Exception { public void testPythonExclamationFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false); testExclamationFunction(Runtime.PYTHON, false, false, false);
} }


@Test(enabled = false) @Test
public void testPythonExclamationFunctionWithExtraDeps() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, true);
}

@Test
public void testPythonExclamationZipFunction() throws Exception { public void testPythonExclamationZipFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, true); testExclamationFunction(Runtime.PYTHON, false, true, false);
} }


@Test(enabled = false) @Test
public void testPythonExclamationTopicPatternFunction() throws Exception { public void testPythonExclamationTopicPatternFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, true, false); testExclamationFunction(Runtime.PYTHON, true, false, false);
} }


@Test @Test
public void testJavaExclamationFunction() throws Exception { public void testJavaExclamationFunction() throws Exception {
testExclamationFunction(Runtime.JAVA, false, false); testExclamationFunction(Runtime.JAVA, false, false, false);
} }


@Test @Test
public void testJavaExclamationTopicPatternFunction() throws Exception { public void testJavaExclamationTopicPatternFunction() throws Exception {
testExclamationFunction(Runtime.JAVA, true, false); testExclamationFunction(Runtime.JAVA, true, false, false);
} }


private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, boolean pyZip) throws Exception { private void testExclamationFunction(Runtime runtime,
boolean isTopicPattern,
boolean pyZip,
boolean withExtraDeps) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) { if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode // python can only run on process mode
return; return;
} }


Schema<?> schema;
if (Runtime.JAVA == runtime) {
schema = Schema.STRING;
} else {
schema = Schema.BYTES;
}

String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8); String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8); String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
if (isTopicPattern) { if (isTopicPattern) {
@Cleanup PulsarClient client = PulsarClient.builder() @Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build(); .build();
@Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING) @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
.topic(inputTopicName + "1") .topic(inputTopicName + "1")
.subscriptionType(SubscriptionType.Exclusive) .subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub") .subscriptionName("test-sub")
.subscribe(); .subscribe();
@Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING) @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
.topic(inputTopicName + "2") .topic(inputTopicName + "2")
.subscriptionType(SubscriptionType.Exclusive) .subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub") .subscriptionName("test-sub")
Expand All @@ -665,13 +681,19 @@ private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, bo


// submit the exclamation function // submit the exclamation function
submitExclamationFunction( submitExclamationFunction(
runtime, inputTopicName, outputTopicName, functionName, pyZip); runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema);


// get function info // get function info
getFunctionInfoSuccess(functionName); getFunctionInfoSuccess(functionName);


// publish and consume result // publish and consume result
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages); if (Runtime.JAVA == runtime) {
// java supports schema
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
} else {
// python doesn't support schema
publishAndConsumeMessagesBytes(inputTopicName, outputTopicName, numMessages);
}


// get function status // get function status
getFunctionStatus(functionName, numMessages); getFunctionStatus(functionName, numMessages);
Expand All @@ -687,22 +709,26 @@ private static void submitExclamationFunction(Runtime runtime,
String inputTopicName, String inputTopicName,
String outputTopicName, String outputTopicName,
String functionName, String functionName,
boolean pyZip) throws Exception { boolean pyZip,
boolean withExtraDeps,
Schema<?> schema) throws Exception {
submitFunction( submitFunction(
runtime, runtime,
inputTopicName, inputTopicName,
outputTopicName, outputTopicName,
functionName, functionName,
pyZip, pyZip,
getExclamationClass(runtime, pyZip), withExtraDeps,
Schema.STRING); getExclamationClass(runtime, pyZip, withExtraDeps),
schema);
} }


private static <T> void submitFunction(Runtime runtime, private static <T> void submitFunction(Runtime runtime,
String inputTopicName, String inputTopicName,
String outputTopicName, String outputTopicName,
String functionName, String functionName,
boolean pyZip, boolean pyZip,
boolean withExtraDeps,
String functionClass, String functionClass,
Schema<T> inputTopicSchema) throws Exception { Schema<T> inputTopicSchema) throws Exception {
CommandGenerator generator; CommandGenerator generator;
Expand All @@ -723,6 +749,8 @@ private static <T> void submitFunction(Runtime runtime,
generator.setRuntime(runtime); generator.setRuntime(runtime);
if (pyZip) { if (pyZip) {
command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE); command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
} else if (withExtraDeps) {
command = generator.generateCreateFunctionCommand(EXCLAMATION_WITH_DEPS_PYTHON_FILE);
} else { } else {
command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE); command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
} }
Expand Down Expand Up @@ -850,6 +878,56 @@ private static void publishAndConsumeMessages(String inputTopic,
} }
} }


private static void publishAndConsumeMessagesBytes(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
if (inputTopic.endsWith(".*")) {
@Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
.create();
@Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
.topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
.create();

for (int i = 0; i < numMessages / 2; i++) {
producer1.send(("message-" + i).getBytes(UTF_8));
}

for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send(("message-" + i).getBytes(UTF_8));
}
} else {
@Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(inputTopic)
.create();

for (int i = 0; i < numMessages; i++) {
producer.send(("message-" + i).getBytes(UTF_8));
}
}

Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
expectedMessages.add("message-" + i + "!");
}

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
String msgValue = new String(msg.getValue(), UTF_8);
log.info("Received: {}", msgValue);
assertTrue(expectedMessages.contains(msgValue));
expectedMessages.remove(msgValue);
}
}

private static void deleteFunction(String functionName) throws Exception { private static void deleteFunction(String functionName) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT, PulsarCluster.ADMIN_SCRIPT,
Expand All @@ -872,7 +950,7 @@ public void testAutoSchemaFunction() throws Exception {


// submit the exclamation function // submit the exclamation function
submitFunction( submitFunction(
Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false,
AutoSchemaFunction.class.getName(), AutoSchemaFunction.class.getName(),
Schema.AVRO(CustomObject.class)); Schema.AVRO(CustomObject.class));


Expand Down
Expand Up @@ -74,20 +74,28 @@ public void teardownFunctionWorkers() {
"org.apache.pulsar.functions.api.examples.ExclamationFunction"; "org.apache.pulsar.functions.api.examples.ExclamationFunction";


public static final String EXCLAMATION_PYTHON_CLASS = public static final String EXCLAMATION_PYTHON_CLASS =
"exclamation.ExclamationFunction"; "exclamation_function.ExclamationFunction";

public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
"exclamation_with_extra_deps.ExclamationFunction";


public static final String EXCLAMATION_PYTHONZIP_CLASS = public static final String EXCLAMATION_PYTHONZIP_CLASS =
"exclamation"; "exclamation";


public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py"; public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip"; public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";


protected static String getExclamationClass(Runtime runtime, boolean pyZip) { protected static String getExclamationClass(Runtime runtime,
boolean pyZip,
boolean extraDeps) {
if (Runtime.JAVA == runtime) { if (Runtime.JAVA == runtime) {
return EXCLAMATION_JAVA_CLASS; return EXCLAMATION_JAVA_CLASS;
} else if (Runtime.PYTHON == runtime) { } else if (Runtime.PYTHON == runtime) {
if (pyZip) { if (pyZip) {
return EXCLAMATION_PYTHONZIP_CLASS; return EXCLAMATION_PYTHONZIP_CLASS;
} else if (extraDeps) {
return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
} else { } else {
return EXCLAMATION_PYTHON_CLASS; return EXCLAMATION_PYTHON_CLASS;
} }
Expand Down

0 comments on commit ee22e0a

Please sign in to comment.