Skip to content

Commit

Permalink
Enhance Pulsar cluster with container logs (#2078)
Browse files Browse the repository at this point in the history
### Motivation
This is continuation of the existing work to move from the arquiilian framework to test containers library , here we focus on more traceability in the pulsar test containers with logging and migrate the smoke test from the old framework.

### Result
Smoke test based on testcontainers should be removing the older arquillian based version.
  • Loading branch information
aahmed-se authored and sijie committed Jul 6, 2018
1 parent 2f6dc80 commit 99892d7
Show file tree
Hide file tree
Showing 17 changed files with 183 additions and 189 deletions.
Expand Up @@ -23,6 +23,8 @@
*/ */
public class BKContainer extends PulsarContainer<BKContainer> { public class BKContainer extends PulsarContainer<BKContainer> {


public static final String NAME = "bookie";

public BKContainer(String clusterName, String hostName) { public BKContainer(String clusterName, String hostName) {
super( super(
clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT); clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
Expand Down
Expand Up @@ -23,6 +23,8 @@
*/ */
public class BrokerContainer extends PulsarContainer<BrokerContainer> { public class BrokerContainer extends PulsarContainer<BrokerContainer> {


public static final String NAME = "pulsar-broker";

public BrokerContainer(String clusterName, String hostName) { public BrokerContainer(String clusterName, String hostName) {
super( super(
clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, INVALID_PORT); clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, INVALID_PORT);
Expand Down
Expand Up @@ -104,13 +104,13 @@ public void onNext(Frame item) {
public ExecResult execCmd(String... cmd) throws Exception { public ExecResult execCmd(String... cmd) throws Exception {
String cmdString = StringUtils.join(cmd, " "); String cmdString = StringUtils.join(cmd, " ");


log.info("DOCKER.exec({}:{}): Executing ...", containerId, cmdString); log.info("DOCKER.exec({}:{}): Executing ...", containerName.substring(1), cmdString);


ExecResult result = execInContainer(cmd); ExecResult result = execInContainer(cmd);


log.info("Docker.exec({}:{}): Done", containerId, cmdString); log.info("Docker.exec({}:{}): Done", containerName.substring(1), cmdString);
log.info("Docker.exec({}:{}): Stdout -\n{}", containerId, cmdString, result.getStdout()); log.info("Docker.exec({}:{}): Stdout -\n{}", containerName.substring(1), cmdString, result.getStdout());
log.info("Docker.exec({}:{}): Stderr -\n{}", containerId, cmdString, result.getStderr()); log.info("Docker.exec({}:{}): Stderr -\n{}", containerName.substring(1), cmdString, result.getStderr());


return result; return result;
} }
Expand Down
Expand Up @@ -23,6 +23,8 @@
*/ */
public class ProxyContainer extends PulsarContainer<ProxyContainer> { public class ProxyContainer extends PulsarContainer<ProxyContainer> {


public static final String NAME = "pulsar-proxy";

public ProxyContainer(String clusterName, String hostName) { public ProxyContainer(String clusterName, String hostName) {
super( super(
clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_HTTP_PORT); clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_HTTP_PORT);
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects; import java.util.Objects;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.DockerUtils; import org.apache.pulsar.tests.DockerUtils;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;


/** /**
Expand All @@ -43,20 +44,20 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte


private final String hostname; private final String hostname;
private final String serviceName; private final String serviceName;
private final String serviceEntrypoint; private final String serviceEntryPoint;
private final int servicePort; private final int servicePort;
private final int httpPort; private final int httpPort;


public PulsarContainer(String clusterName, public PulsarContainer(String clusterName,
String hostname, String hostname,
String serviceName, String serviceName,
String serviceEntrypoint, String serviceEntryPoint,
int servicePort, int servicePort,
int httpPort) { int httpPort) {
super(clusterName, IMAGE_NAME); super(clusterName, IMAGE_NAME);
this.hostname = hostname; this.hostname = hostname;
this.serviceName = serviceName; this.serviceName = serviceName;
this.serviceEntrypoint = serviceEntrypoint; this.serviceEntryPoint = serviceEntryPoint;
this.servicePort = servicePort; this.servicePort = servicePort;
this.httpPort = httpPort; this.httpPort = httpPort;
} }
Expand Down Expand Up @@ -97,9 +98,9 @@ public void start() {
this.withCreateContainerCmdModifier(createContainerCmd -> { this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname); createContainerCmd.withHostName(hostname);
createContainerCmd.withName(getContainerName()); createContainerCmd.withName(getContainerName());
createContainerCmd.withEntrypoint(serviceEntrypoint); createContainerCmd.withEntrypoint(serviceEntryPoint);
}); });

super.start(); super.start();
log.info("Start pulsar service {} at container {}", serviceName, containerName); log.info("Start pulsar service {} at container {}", serviceName, containerName);
} }
Expand Down
Expand Up @@ -23,6 +23,8 @@
*/ */
public class WorkerContainer extends PulsarContainer<WorkerContainer> { public class WorkerContainer extends PulsarContainer<WorkerContainer> {


public static final String NAME = "pulsar-worker";

public WorkerContainer(String clusterName, String hostname) { public WorkerContainer(String clusterName, String hostname) {
super( super(
clusterName, hostname, hostname, "bin/run-functions-worker.sh", -1, BROKER_HTTP_PORT); clusterName, hostname, hostname, "bin/run-functions-worker.sh", -1, BROKER_HTTP_PORT);
Expand Down
Expand Up @@ -23,7 +23,7 @@
/** /**
* A pulsar container that runs zookeeper. * A pulsar container that runs zookeeper.
*/ */
public class ZKContainer<SelfT extends ZKContainer<SelfT>> extends PulsarContainer<SelfT> { public class ZKContainer<SelfT extends PulsarContainer<SelfT>> extends PulsarContainer<SelfT> {


public static final String NAME = "zookeeper"; public static final String NAME = "zookeeper";


Expand All @@ -42,14 +42,14 @@ public ZKContainer(String clusterName) {
public ZKContainer(String clusterName, public ZKContainer(String clusterName,
String hostname, String hostname,
String serviceName, String serviceName,
String serviceEntrypoint, String serviceEntryPoint,
int servicePort, int servicePort,
int httpPort) { int httpPort) {
super( super(
clusterName, clusterName,
hostname, hostname,
serviceName, serviceName,
serviceEntrypoint, serviceEntryPoint,
servicePort, servicePort,
httpPort); httpPort);
} }
Expand Down
Expand Up @@ -21,14 +21,22 @@
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT; import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;


import com.github.dockerjava.api.command.CreateContainerCmd;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.containers.BKContainer; import org.apache.pulsar.tests.containers.BKContainer;
Expand All @@ -39,7 +47,9 @@
import org.apache.pulsar.tests.containers.WorkerContainer; import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.containers.ZKContainer; import org.apache.pulsar.tests.containers.ZKContainer;
import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network; import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;


/** /**
* Pulsar Cluster in containers. * Pulsar Cluster in containers.
Expand All @@ -61,6 +71,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
} }


private final PulsarClusterSpec spec; private final PulsarClusterSpec spec;

@Getter @Getter
private final String clusterName; private final String clusterName;
private final Network network; private final Network network;
Expand All @@ -72,10 +83,13 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
private final ProxyContainer proxyContainer; private final ProxyContainer proxyContainer;


private PulsarCluster(PulsarClusterSpec spec) { private PulsarCluster(PulsarClusterSpec spec) {

this.spec = spec; this.spec = spec;
this.clusterName = spec.clusterName(); this.clusterName = spec.clusterName();
this.network = Network.newNetwork(); this.network = Network.newNetwork();
this.zkContainer = (ZKContainer) new ZKContainer(clusterName)
this.zkContainer = new ZKContainer(clusterName);
this.zkContainer
.withNetwork(network) .withNetwork(network)
.withNetworkAliases(ZKContainer.NAME) .withNetworkAliases(ZKContainer.NAME)
.withEnv("clusterName", clusterName) .withEnv("clusterName", clusterName)
Expand All @@ -86,10 +100,12 @@ private PulsarCluster(PulsarClusterSpec spec) {
this.csContainer = new CSContainer(clusterName) this.csContainer = new CSContainer(clusterName)
.withNetwork(network) .withNetwork(network)
.withNetworkAliases(CSContainer.NAME); .withNetworkAliases(CSContainer.NAME);

this.bookieContainers = Maps.newTreeMap(); this.bookieContainers = Maps.newTreeMap();
this.brokerContainers = Maps.newTreeMap(); this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap(); this.workerContainers = Maps.newTreeMap();
this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
this.proxyContainer = new ProxyContainer(clusterName, ProxyContainer.NAME)
.withNetwork(network) .withNetwork(network)
.withNetworkAliases("pulsar-proxy") .withNetworkAliases("pulsar-proxy")
.withEnv("zkServers", ZKContainer.NAME) .withEnv("zkServers", ZKContainer.NAME)
Expand Down Expand Up @@ -170,12 +186,14 @@ private static <T extends PulsarContainer> Map<String, T> runNumContainers(Strin
} }


public void stop() { public void stop() {
proxyContainer.stop();
workerContainers.values().forEach(WorkerContainer::stop); Stream<GenericContainer> list1 = Stream.of(proxyContainer, csContainer, zkContainer);
brokerContainers.values().forEach(BrokerContainer::stop); Stream<GenericContainer> list2 =
bookieContainers.values().forEach(BKContainer::stop); Stream.of(workerContainers.values(), brokerContainers.values(), bookieContainers.values())
csContainer.stop(); .flatMap(Collection::stream);
zkContainer.stop(); Stream<GenericContainer> list3 = Stream.concat(list1, list2);
list3.parallel().forEach(GenericContainer::stop);

try { try {
network.close(); network.close();
} catch (Exception e) { } catch (Exception e) {
Expand Down
Expand Up @@ -18,14 +18,18 @@
*/ */
package org.apache.pulsar.tests.topologies; package org.apache.pulsar.tests.topologies;


import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;


import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;

@Slf4j @Slf4j
public class PulsarClusterTestBase { public abstract class PulsarClusterTestBase {


@DataProvider(name = "ServiceUrlAndTopics") @DataProvider(name = "ServiceUrlAndTopics")
public static Object[][] serviceUrlAndTopics() { public static Object[][] serviceUrlAndTopics() {
Expand Down Expand Up @@ -56,19 +60,22 @@ public static Object[][] serviceUrls() {
protected static PulsarCluster pulsarCluster; protected static PulsarCluster pulsarCluster;


@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public void setupCluster() throws Exception {
StringBuilder sb = new StringBuilder(); this.setupCluster("");
for (int i = 0; i < 8; i++) { pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a')); }
}
public void setupCluster(String namePrefix) throws Exception {
PulsarClusterSpec spec = PulsarClusterSpec.builder() PulsarClusterSpec spec = PulsarClusterSpec.builder()
.clusterName(sb.toString()) .clusterName(Stream.of(this.getClass().getSimpleName(), namePrefix, randomName(5))
.filter(s -> s != null && !s.isEmpty())
.collect(joining("-")))
.build(); .build();


setupCluster(spec); setupCluster(spec);
} }


protected static void setupCluster(PulsarClusterSpec spec) throws Exception { private void setupCluster(PulsarClusterSpec spec) throws Exception {
log.info("Setting up cluster {} with {} bookies, {} brokers", log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers()); spec.clusterName(), spec.numBookies(), spec.numBrokers());


Expand All @@ -79,14 +86,14 @@ protected static void setupCluster(PulsarClusterSpec spec) throws Exception {
} }


@AfterClass @AfterClass
public static void teardownCluster() { public void tearDownCluster() {
if (null != pulsarCluster) { if (null != pulsarCluster) {
pulsarCluster.stop(); pulsarCluster.stop();
} }
} }


protected static String randomName(int numChars) { protected static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();;;; StringBuilder sb = new StringBuilder();
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a')); sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
} }
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.functions; package org.apache.pulsar.tests.integration.functions;


import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase; import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.Container.ExecResult;
Expand Down Expand Up @@ -49,16 +50,6 @@ protected static String getExclamationClass(Runtime runtime) {
} }
} }


@BeforeClass
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();

pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);

ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout());
}

@DataProvider(name = "FunctionRuntimes") @DataProvider(name = "FunctionRuntimes")
public static Object[][] functionRuntimes() { public static Object[][] functionRuntimes() {
return new Object[][] { return new Object[][] {
Expand Down
Expand Up @@ -18,11 +18,24 @@
*/ */
package org.apache.pulsar.tests.integration.functions.runtime; package org.apache.pulsar.tests.integration.functions.runtime;


import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Container;
import org.testng.annotations.BeforeClass;

/** /**
* Run runtime tests in process mode. * Run runtime tests in process mode.
*/ */
@Slf4j
public class PulsarFunctionsProcessRuntimeTest extends PulsarFunctionsRuntimeTest { public class PulsarFunctionsProcessRuntimeTest extends PulsarFunctionsRuntimeTest {
public PulsarFunctionsProcessRuntimeTest() { public PulsarFunctionsProcessRuntimeTest() {
super(ContainerFactory.PROCESS); super(RuntimeFactory.PROCESS);
}

@BeforeClass
public void setupCluster() throws Exception {
super.setupCluster(RuntimeFactory.PROCESS.toString());
pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
Container.ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout());
} }
} }
Expand Up @@ -40,15 +40,15 @@
*/ */
public abstract class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase { public abstract class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {


protected enum ContainerFactory { public enum RuntimeFactory {
PROCESS, PROCESS,
THREAD THREAD
} }


private final ContainerFactory containerFactory; private final RuntimeFactory runtimeFactory;


public PulsarFunctionsRuntimeTest(ContainerFactory containerFactory) { public PulsarFunctionsRuntimeTest(RuntimeFactory runtimeFactory) {
this.containerFactory = containerFactory; this.runtimeFactory = runtimeFactory;
} }


// //
Expand All @@ -57,7 +57,7 @@ public PulsarFunctionsRuntimeTest(ContainerFactory containerFactory) {


@Test(dataProvider = "FunctionRuntimes") @Test(dataProvider = "FunctionRuntimes")
public void testExclamationFunction(Runtime runtime) throws Exception { public void testExclamationFunction(Runtime runtime) throws Exception {
if (ContainerFactory.THREAD == containerFactory && Runtime.PYTHON == runtime) { if (runtimeFactory == RuntimeFactory.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode // python can only run on process mode
return; return;
} }
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.functions.runtime; package org.apache.pulsar.tests.integration.functions.runtime;


import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;


Expand All @@ -30,15 +29,13 @@
public class PulsarFunctionsThreadRuntimeTest extends PulsarFunctionsRuntimeTest { public class PulsarFunctionsThreadRuntimeTest extends PulsarFunctionsRuntimeTest {


public PulsarFunctionsThreadRuntimeTest() { public PulsarFunctionsThreadRuntimeTest() {
super(ContainerFactory.THREAD); super(RuntimeFactory.THREAD);
} }


@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster(); super.setupCluster(RuntimeFactory.THREAD.toString());

pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1); pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);

ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml"); ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", "/pulsar/conf/functions_worker.yml");
log.info("Functions Worker Config : \n{}", result.getStdout()); log.info("Functions Worker Config : \n{}", result.getStdout());
} }
Expand Down

0 comments on commit 99892d7

Please sign in to comment.