Skip to content

Commit

Permalink
Extend Spark Streaming Receiver UT Coverage (#1448)
Browse files Browse the repository at this point in the history
  • Loading branch information
erenavsarogullari authored and merlimat committed Mar 27, 2018
1 parent dafe123 commit b1fbd3a
Showing 1 changed file with 36 additions and 10 deletions.
Expand Up @@ -22,11 +22,12 @@
import static org.mockito.Mockito.doNothing;

import org.apache.pulsar.client.api.*;
import org.apache.spark.storage.StorageLevel;
import org.mockito.ArgumentCaptor;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -35,6 +36,11 @@

public class SparkStreamingPulsarReceiverTest extends MockedPulsarServiceBaseTest {

private final String URL = "pulsar://127.0.0.1:" + BROKER_PORT + "/";
private static final String TOPIC = "persistent://p1/c1/ns1/topic1";
private static final String SUBS = "sub1";
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";

@BeforeClass
@Override
protected void setup() throws Exception {
Expand All @@ -51,12 +57,9 @@ protected void cleanup() throws Exception {
public void testReceivedMessage() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();
String url = "pulsar://127.0.0.1:" + BROKER_PORT + "/";
String topic = "persistent://p1/c1/ns1/topic1";
String subs = "sub1";

SparkStreamingPulsarReceiver receiver = spy(
new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
new SparkStreamingPulsarReceiver(clientConf, consConf, URL, TOPIC, SUBS));
MessageListener msgListener = spy(new MessageListener() {
@Override
public void received(Consumer consumer, Message msg) {
Expand All @@ -70,17 +73,40 @@ public void received(Consumer consumer, Message msg) {

receiver.onStart();
waitForTransmission();
PulsarClient pulsarClient = PulsarClient.create(url, clientConf);
Producer producer = pulsarClient.createProducer(topic, new ProducerConfiguration());
producer.send("pulsar-spark test message".getBytes());
PulsarClient pulsarClient = PulsarClient.create(URL, clientConf);
Producer producer = pulsarClient.createProducer(TOPIC, new ProducerConfiguration());
producer.send(EXPECTED_MESSAGE.getBytes());
waitForTransmission();
receiver.onStop();
assertEquals(new String(msgCaptor.getValue().getData()), "pulsar-spark test message");
assertEquals(new String(msgCaptor.getValue().getData()), EXPECTED_MESSAGE);
}

@Test
public void testDefaultSettingsOfReceiver() {
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();
SparkStreamingPulsarReceiver receiver =
new SparkStreamingPulsarReceiver(clientConf, consConf, URL, TOPIC, SUBS);
assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
assertEquals(consConf.getAckTimeoutMillis(), 60_000);
assertNotNull(consConf.getMessageListener());
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ClientConfiguration must not be null")
public void testReceiverWhenClientConfigurationIsNull() {
new SparkStreamingPulsarReceiver(null, new ConsumerConfiguration(), URL, TOPIC, SUBS);
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfiguration must not be null")
public void testReceiverWhenConsumerConfigurationIsNull() {
new SparkStreamingPulsarReceiver(new ClientConfiguration(), null, URL, TOPIC, SUBS);
}

private static void waitForTransmission() {
try {
Thread.sleep(1000);
Thread.sleep(1_000);
} catch (InterruptedException e) {
}
}
Expand Down

0 comments on commit b1fbd3a

Please sign in to comment.