Skip to content

Commit

Permalink
Run functions runtime test with thread mode (#2070)
Browse files Browse the repository at this point in the history
*Motivation*

Pulsar Functions supports both process and thread modes to run functions.
Currently the test case only test process mode.

*Changes*

Improve the existing test to support testing pulsar functions in thread mode
  • Loading branch information
sijie committed Jul 3, 2018
1 parent 7559632 commit e16f91c
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 146 deletions.
28 changes: 22 additions & 6 deletions docker/pulsar/scripts/gen-yml-from-env.py
Expand Up @@ -27,6 +27,18 @@
import os, sys
import yaml

INT_KEYS = [
'workerPort',
'numFunctionPackageReplicas',
'failureCheckFreqMs',
'rescheduleTimeoutMs',
'initialBrokerReconnectMaxRetries',
'assignmentWriteMaxRetries',
'instanceLivenessCheckFreqMs'
]

PF_ENV_PREFIX = 'PF_'

if len(sys.argv) < 2:
print 'Usage: %s' % (sys.argv[0])
sys.exit(1)
Expand All @@ -39,25 +51,29 @@
# update the config
modified = False
for k in sorted(os.environ.keys()):
key_parts = k.split('_')
if not k.startswith(PF_ENV_PREFIX):
continue

v = os.environ[k]

k = k[len(PF_ENV_PREFIX):]
key_parts = k.split('_')

i = 0
conf_to_modify = conf
while i < len(key_parts):
key_part = key_parts[i]
if not key_part in conf_to_modify:
break

if i == (len(key_parts) - 1):
if key_part == 'workerPort':
if key_part in INT_KEYS:
conf_to_modify[key_part] = int(v)
else:
conf_to_modify[key_part] = v

modified = True
else:
if not key_part in conf_to_modify:
conf_to_modify[key_part] = {}
conf_to_modify = conf_to_modify[key_part]
modified = True
i += 1
# Store back the updated config in the same file
f = open(conf_filename , 'w')
Expand Down
Expand Up @@ -54,7 +54,6 @@ public class WorkerConfig implements Serializable {
private String pulsarFunctionsCluster;
private int numFunctionPackageReplicas;
private String downloadDirectory;
private long snapshotFreqMs;
private String stateStorageServiceUrl;
private String functionAssignmentTopicName;
private String schedulerClassName;
Expand Down
Expand Up @@ -183,7 +183,7 @@ public void stop() {
}
}

public void startFunctionWorkers(int numFunctionWorkers) {
public void startFunctionWorkersWithProcessContainerFactory(int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
Expand All @@ -193,14 +193,39 @@ public void startFunctionWorkers(int numFunctionWorkers) {
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
.withEnv("workerId", name)
.withEnv("workerHostname", name)
.withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
.withEnv("pulsarFunctionsCluster", clusterName)
.withEnv("pulsarServiceUrl", serviceUrl)
.withEnv("pulsarWebServiceUrl", httpServiceUrl)
.withEnv("PF_workerId", name)
.withEnv("PF_workerHostname", name)
.withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
.withEnv("PF_pulsarFunctionsCluster", clusterName)
.withEnv("PF_pulsarServiceUrl", serviceUrl)
.withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
// script
.withEnv("clusterName", clusterName)
.withEnv("zookeeperServers", ZKContainer.NAME)
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
));
}

public void startFunctionWorkersWithThreadContainerFactory(int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
"functions-worker",
numFunctionWorkers,
(name) -> new WorkerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
.withEnv("PF_workerId", name)
.withEnv("PF_workerHostname", name)
.withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
.withEnv("PF_pulsarFunctionsCluster", clusterName)
.withEnv("PF_pulsarServiceUrl", serviceUrl)
.withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
.withEnv("PF_threadContainerFactory_threadGroupName", "pf-container-group")
// script
.withEnv("clusterName", clusterName)
.withEnv("zookeeperServers", ZKContainer.NAME)
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
Expand Down
Expand Up @@ -18,20 +18,10 @@
*/
package org.apache.pulsar.tests.integration.functions;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
Expand Down Expand Up @@ -94,126 +84,6 @@ public void checkDownload() throws Exception {
assertTrue(output.getStderr().isEmpty());
}

//
// Test CRUD functions on different runtimes.
//

@Test(dataProvider = "FunctionRuntimes")
public void testExclamationFunction(Runtime runtime) throws Exception {
String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
String functionName = "test-exclamation-fn-" + randomName(8);
final int numMessages = 10;

// submit the exclamation function
submitExclamationFunction(
inputTopicName, outputTopicName, functionName);

// get function info
getFunctionInfoSuccess(functionName);

// publish and consume result
publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);

// get function status
getFunctionStatus(functionName, numMessages);

// delete function
deleteFunction(functionName);

// get function info
getFunctionInfoNotFound(functionName);
}

private static void submitExclamationFunction(String inputTopicName,
String outputTopicName,
String functionName) throws Exception {
CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName, EXCLAMATION_FUNC_CLASS);
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
String command = generator.generateCreateFunctionCommand();
String[] commands = {
"sh", "-c", command
};
ExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
}

private static void getFunctionInfoSuccess(String functionName) throws Exception {
ExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
}

private static void getFunctionInfoNotFound(String functionName) throws Exception {
ExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
assertTrue(result.getStderr().contains("Reason: Function " + functionName + " doesn't exist"));
}

private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
ExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"getstatus",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
assertTrue(result.getStdout().contains("\"running\": true"));
assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages + "\""));
}

private static void publishAndConsumeMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
@Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopic)
.create();

for (int i = 0; i < numMessages; i++) {
producer.send("message-" + i);
}

for (int i = 0; i < numMessages; i++) {
Message<String> msg = consumer.receive();
assertEquals("message-" + i + "!", msg.getValue());
}
}

private static void deleteFunction(String functionName) throws Exception {
ExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);
assertTrue(result.getStdout().contains("Deleted successfully"));
assertTrue(result.getStderr().isEmpty());
}

}
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.pulsar.tests.integration.functions;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;

/**
* A cluster to run pulsar functions for testing functions related features.
*/
public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
@Slf4j
public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {

public static final String EXCLAMATION_FUNC_CLASS =
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
Expand All @@ -35,7 +38,10 @@ public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();

pulsarCluster.startFunctionWorkers(1);
pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);

ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout());
}

@DataProvider(name = "FunctionRuntimes")
Expand Down
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions.runtime;

/**
* Run runtime tests in process mode.
*/
public class PulsarFunctionsProcessRuntimeTest extends PulsarFunctionsRuntimeTest {
}

0 comments on commit e16f91c

Please sign in to comment.