Skip to content

Commit

Permalink
Add numFunctionWorkers and externalServices to cluster spec (#2160)
Browse files Browse the repository at this point in the history
*Motivation*

ClusterSpec is used for defining how a cluster looks like for integration
testing. Add `numFunctionWorker` and `externalServices` in cluster spec,
so we have a common place for setting up a cluster.

*Changes*

- `numFunctionWorkers`: define how many function workers to run in the cluster. When a cluster is created from the cluster spec, it will start
the same number of worker containers.

- `functionRuntimeType`: define how the worker will invoke functions,
whether it is in process mode or thread mode.

- `externalServices`: define whether there are more external services to
run along with the cluster. for example, we need cassandra or kafka for
testing connectors, and we need s3 mock for testing offloaders.
  • Loading branch information
sijie committed Jul 14, 2018
1 parent ea6eae7 commit 10f273a
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 59 deletions.
Expand Up @@ -16,28 +16,33 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.apache.pulsar.tests.integration.functions.runtime; package org.apache.pulsar.tests.containers;


import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testng.annotations.BeforeClass;


/** /**
* Run the runtime test cases in thread mode. * Cassandra Container.
*/ */
@Slf4j @Slf4j
public class PulsarFunctionsThreadRuntimeTest extends PulsarFunctionsRuntimeTest { public class CassandraContainer<SelfT extends ChaosContainer<SelfT>> extends ChaosContainer<SelfT> {


public PulsarFunctionsThreadRuntimeTest() { public static final String NAME = "cassandra";
super(RuntimeFactory.THREAD); public static final int PORT = 9042;
}


@BeforeClass public CassandraContainer(String clusterName) {
public void setupCluster() throws Exception { super(clusterName, "cassandra:3");
super.setupCluster(RuntimeFactory.THREAD.toString());
pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);
ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout());
} }


@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORT)
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
})
.waitingFor(new HostPortWaitStrategy());
}
} }
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.containers;

import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

/**
* Cassandra Container.
*/
@Slf4j
public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends ChaosContainer<SelfT> {

public static final String NAME = "kafka";
public static final int INTERNAL_PORT = 9092;
public static final int PORT = 9093;

public KafkaContainer(String clusterName) {
super(clusterName, "confluentinc/cp-kafka:4.1.1");
}

@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(INTERNAL_PORT, PORT)
.withClasspathResourceMapping(
"kafka-zookeeper.properties", "/zookeeper.properties",
BindMode.READ_ONLY)
.withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run")
.withEnv("KAFKA_LISTENERS",
"INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" + "0.0.0.0" + ":" + PORT)
.withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
.withEnv("KAFKA_BROKER_ID", "1")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
})
.waitingFor(new HostPortWaitStrategy());
}
}
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.containers;

import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.SocatContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

/**
* Cassandra Container.
*/
@Slf4j
public class KafkaProxyContainer extends SocatContainer {

public static final String NAME = "kafka-proxy";

private final String clusterName;

public KafkaProxyContainer(String clusterName) {
super();
this.clusterName = clusterName;
}

@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
})
.waitingFor(new HostPortWaitStrategy());
}
}
Expand Up @@ -16,26 +16,12 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.apache.pulsar.tests.integration.functions.runtime; package org.apache.pulsar.tests.topologies;

import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Container;
import org.testng.annotations.BeforeClass;


/** /**
* Run runtime tests in process mode. * Runtime type to run functions.
*/ */
@Slf4j public enum FunctionRuntimeType {
public class PulsarFunctionsProcessRuntimeTest extends PulsarFunctionsRuntimeTest { PROCESS,
public PulsarFunctionsProcessRuntimeTest() { THREAD
super(RuntimeFactory.PROCESS);
}

@BeforeClass
public void setupCluster() throws Exception {
super.setupCluster(RuntimeFactory.PROCESS.toString());
pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
Container.ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout());
}
} }
Expand Up @@ -21,20 +21,15 @@
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT; import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;


import com.github.dockerjava.api.command.CreateContainerCmd;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


import lombok.Getter; import lombok.Getter;
Expand All @@ -49,7 +44,6 @@
import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network; import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;


/** /**
* Pulsar Cluster in containers. * Pulsar Cluster in containers.
Expand Down Expand Up @@ -167,6 +161,27 @@ public void start() throws Exception {
log.info("Pulsar cluster {} is up running:", clusterName); log.info("Pulsar cluster {} is up running:", clusterName);
log.info("\tBinary Service Url : {}", getPlainTextServiceUrl()); log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
log.info("\tHttp Service Url : {}", getHttpServiceUrl()); log.info("\tHttp Service Url : {}", getHttpServiceUrl());

// start function workers
if (spec.numFunctionWorkers() > 0) {
switch (spec.functionRuntimeType()) {
case THREAD:
startFunctionWorkersWithThreadContainerFactory(spec.numFunctionWorkers());
break;
case PROCESS:
startFunctionWorkersWithProcessContainerFactory(spec.numFunctionWorkers());
break;
}
}

// start external services
Map<String, GenericContainer<?>> externalServices = spec.externalServices;
if (null != externalServices) {
externalServices.entrySet().forEach(service -> {
service.getValue().start();
log.info("Successfully start external service {}.", service.getKey());
});
}
} }


private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName, private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName,
Expand All @@ -186,13 +201,15 @@ private static <T extends PulsarContainer> Map<String, T> runNumContainers(Strin
} }


public void stop() { public void stop() {

Stream.of(proxyContainer, csContainer, zkContainer).parallel().forEach(GenericContainer::stop);
Stream<GenericContainer> list1 = Stream.of(proxyContainer, csContainer, zkContainer); workerContainers.values().parallelStream().forEach(GenericContainer::stop);
Stream<GenericContainer> list2 = brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
Stream.of(workerContainers.values(), brokerContainers.values(), bookieContainers.values()) bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
.flatMap(Collection::stream); if (null != spec.externalServices()) {
Stream<GenericContainer> list3 = Stream.concat(list1, list2); spec.externalServices().values()
list3.parallel().forEach(GenericContainer::stop); .parallelStream()
.forEach(GenericContainer::stop);
}


try { try {
network.close(); network.close();
Expand All @@ -201,7 +218,7 @@ public void stop() {
} }
} }


public void startFunctionWorkersWithProcessContainerFactory(int numFunctionWorkers) { private void startFunctionWorkersWithProcessContainerFactory(int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT; String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT; String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers( workerContainers.putAll(runNumContainers(
Expand All @@ -225,7 +242,7 @@ public void startFunctionWorkersWithProcessContainerFactory(int numFunctionWorke
)); ));
} }


public void startFunctionWorkersWithThreadContainerFactory(int numFunctionWorkers) { private void startFunctionWorkersWithThreadContainerFactory(int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT; String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT; String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers( workerContainers.putAll(runNumContainers(
Expand Down
Expand Up @@ -18,11 +18,15 @@
*/ */
package org.apache.pulsar.tests.topologies; package org.apache.pulsar.tests.topologies;


import java.util.Map;
import lombok.Builder; import lombok.Builder;
import lombok.Builder.Default; import lombok.Builder.Default;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.pulsar.tests.containers.ChaosContainer;
import org.testcontainers.containers.GenericContainer;
import org.testng.collections.Maps;


/** /**
* Spec to build a pulsar cluster. * Spec to build a pulsar cluster.
Expand Down Expand Up @@ -64,6 +68,30 @@ public class PulsarClusterSpec {
@Default @Default
int numProxies = 1; int numProxies = 1;


/**
* Returns number of function workers.
*
* @return number of function workers.
*/
@Default
int numFunctionWorkers = 0;

/**
* Returns the function runtime type.
*
* @return the function runtime type.
*/
@Default
FunctionRuntimeType functionRuntimeType = FunctionRuntimeType.PROCESS;

/**
* Returns the list of external services to start with
* this cluster.
*
* @return the list of external services to start with the cluster.
*/
Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();

/** /**
* Returns the flag whether to enable/disable container log. * Returns the flag whether to enable/disable container log.
* *
Expand Down
Expand Up @@ -62,7 +62,6 @@ public static Object[][] serviceUrls() {
@BeforeClass @BeforeClass
public void setupCluster() throws Exception { public void setupCluster() throws Exception {
this.setupCluster(""); this.setupCluster("");
pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
} }


public void setupCluster(String namePrefix) throws Exception { public void setupCluster(String namePrefix) throws Exception {
Expand All @@ -75,7 +74,7 @@ public void setupCluster(String namePrefix) throws Exception {
setupCluster(spec); setupCluster(spec);
} }


private void setupCluster(PulsarClusterSpec spec) throws Exception { protected void setupCluster(PulsarClusterSpec spec) throws Exception {
log.info("Setting up cluster {} with {} bookies, {} brokers", log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers()); spec.clusterName(), spec.numBookies(), spec.numBrokers());


Expand Down
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

clientPort=2181
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log

0 comments on commit 10f273a

Please sign in to comment.