Skip to content

Commit

Permalink
[integration tests] add a smoketest for pulsar standalone (#2732)
Browse files Browse the repository at this point in the history
*Motivation*

Ideally we should run all integration tests on both cluster mode and standalone
mode. However the apache ci can't really afford to do so. so we run all the integration
tests on cluster mode. We only run basic validation and test new features (e.g. state)
on standalone.

*Changes*

Add PulsarStandalone related test base and suite and a simmple smoke test.
This would set the framework for integration tests for state
  • Loading branch information
sijie committed Oct 5, 2018
1 parent da83322 commit d26bea8
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 65 deletions.
Expand Up @@ -101,6 +101,8 @@ protected void configure() {
} }
} }


protected void beforeStart() {}

@Override @Override
public void start() { public void start() {
if (httpPort > 0 && servicePort < 0) { if (httpPort > 0 && servicePort < 0) {
Expand All @@ -118,7 +120,8 @@ public void start() {
createContainerCmd.withName(getContainerName()); createContainerCmd.withName(getContainerName());
createContainerCmd.withEntrypoint(serviceEntryPoint); createContainerCmd.withEntrypoint(serviceEntryPoint);
}); });


beforeStart();
super.start(); super.start();
log.info("Start pulsar service {} at container {}", serviceName, containerName); log.info("Start pulsar service {} at container {}", serviceName, containerName);
} }
Expand Down
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;

import static java.time.temporal.ChronoUnit.SECONDS;

import java.time.Duration;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;

/**
* A pulsar container that runs standalone.
*/
public class StandaloneContainer extends PulsarContainer<StandaloneContainer> {

public static final String NAME = "standalone";

public StandaloneContainer(String clusterName) {
super(clusterName,
NAME,
NAME + "-cluster",
"bin/pulsar",
BROKER_PORT,
BROKER_HTTP_PORT);
}

@Override
protected void configure() {
super.configure();
setCommand("standalone");
}

@Override
protected void beforeStart() {
// update the wait strategy until public/default namespace is created
this.waitStrategy = new HttpWaitStrategy()
.forPort(BROKER_HTTP_PORT)
.forStatusCode(200)
.forPath("/admin/v2/namespaces/public/default")
.withStartupTimeout(Duration.of(300, SECONDS));
}

public String getPlainTextServiceUrl() {
return "pulsar://" + getContainerIpAddress() + ":" + getMappedPort(BROKER_PORT);
}

public String getHttpServiceUrl() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(BROKER_HTTP_PORT);
}
}
Expand Up @@ -41,50 +41,23 @@
import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.testng.collections.Lists; import org.testng.collections.Lists;


/** /**
* Test pulsar produce/consume semantics * Test pulsar produce/consume semantics
*/ */
@Slf4j @Slf4j
public class SemanticsTest extends PulsarClusterTestBase { public class SemanticsTest extends PulsarTestSuite {


// //
// Test Basic Publish & Consume Operations // Test Basic Publish & Consume Operations
// //


@Test(dataProvider = "ServiceUrlAndTopics") @Test(dataProvider = "ServiceUrlAndTopics")
public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
String topicName = generateTopicName("testpubconsume", isPersistent); super.testPublishAndConsume(serviceUrl, isPersistent);

int numMessages = 10;

try (PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build()) {

try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-sub")
.subscribe()) {

try (Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.create()) {

for (int i = 0; i < numMessages; i++) {
producer.send("smoke-message-" + i);
}
}

for (int i = 0; i < numMessages; i++) {
Message<String> m = consumer.receive();
assertEquals("smoke-message-" + i, m.getValue());
}
}
}
} }


@Test(dataProvider = "ServiceUrls") @Test(dataProvider = "ServiceUrls")
Expand Down
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.standalone;

import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.testng.annotations.Test;


public class SmokeTest extends PulsarStandaloneTestSuite {

@Test(dataProvider = "StandaloneServiceUrlAndTopics")
public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
super.testPublishAndConsume(serviceUrl, isPersistent);
}

}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.suites;

import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase;
import org.testng.ITest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;

public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implements ITest {

@BeforeSuite
public void setUpCluster() throws Exception {
super.startCluster();
}

@AfterSuite
public void tearDownCluster() throws Exception {
super.stopCluster();
}


@Override
public String getTestName() {
return "pulsar-standalone-suite";
}
}
Expand Up @@ -21,13 +21,12 @@
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;


import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream; import java.util.stream.Stream;


import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.joining;


@Slf4j @Slf4j
public abstract class PulsarClusterTestBase { public abstract class PulsarClusterTestBase extends PulsarTestBase {


@DataProvider(name = "ServiceUrlAndTopics") @DataProvider(name = "ServiceUrlAndTopics")
public static Object[][] serviceUrlAndTopics() { public static Object[][] serviceUrlAndTopics() {
Expand Down Expand Up @@ -112,36 +111,4 @@ public void tearDownCluster() {
} }
} }


public static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChars; i++) {
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
}
return sb.toString();
}

protected static String generateNamespaceName() {
return "ns-" + randomName(8);
}

protected static String generateTopicName(String topicPrefix, boolean isPersistent) {
return generateTopicName("default", topicPrefix, isPersistent);
}

protected static String generateTopicName(String namespace, String topicPrefix, boolean isPersistent) {
String topicName = new StringBuilder(topicPrefix)
.append("-")
.append(randomName(8))
.append("-")
.append(System.currentTimeMillis())
.toString();
if (isPersistent) {
return "persistent://public/" + namespace + "/" + topicName;
} else {
return "non-persistent://public/" + namespace + "/" + topicName;
}
}



} }
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.topologies;

import static org.testng.Assert.assertEquals;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testcontainers.containers.Network;
import org.testng.annotations.DataProvider;

/**
* A test base to run tests on standalone cluster.
*
* <p>Ideally we should run all integration tests on both cluster mode and standalone
* mode. However the apache ci can't really afford to do so. so we run all the integration
* tests on cluster mode. We only run basic validation and test new features (e.g. state)
* on standalone.
*/
@Slf4j
public abstract class PulsarStandaloneTestBase extends PulsarTestBase {

@DataProvider(name = "StandaloneServiceUrlAndTopics")
public static Object[][] serviceUrlAndTopics() {
return new Object[][] {
// plain text, persistent topic
{
container.getPlainTextServiceUrl(),
true,
},
// plain text, non-persistent topic
{
container.getPlainTextServiceUrl(),
false
}
};
}

protected static Network network;
protected static StandaloneContainer container;

protected void startCluster() throws Exception {
network = Network.newNetwork();
String clusterName = PulsarClusterTestBase.randomName(8);
container = new StandaloneContainer(clusterName)
.withNetwork(network)
.withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName);
container.tailContainerLog();
container.start();
log.info("Pulsar cluster {} is up running:", clusterName);
log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
log.info("\tHttp Service Url : {}", container.getHttpServiceUrl());

// add cluster to public tenant
ContainerExecResult result = container.execCmd(
"/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default");
assertEquals(0, result.getExitCode());
log.info("public/default namespace policies are {}", result.getStdout());
}

protected void stopCluster() throws Exception {
container.stop();
network.close();
}

}

0 comments on commit d26bea8

Please sign in to comment.