Skip to content
Permalink
Browse files
end to end tests use unique topic/regions, in case of cyclical/endless looping of events
  • Loading branch information
jhuynh1 committed Jan 24, 2020
1 parent a570805 commit 04bdfc9e4aa3abd336ff98e47d832367de851291
Showing 2 changed files with 25 additions and 19 deletions.
@@ -33,14 +33,13 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class GeodeKafkaTestCluster {
@@ -49,8 +48,11 @@ public class GeodeKafkaTestCluster {
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static boolean debug = true;

public static String TEST_TOPICS = "someTopic";
public static String TEST_REGIONS = "someRegion";
public static String TEST_TOPIC = "someTopic";
public static String TEST_REGION = "someRegion";

public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
public static String TEST_REGION_FOR_SINK = "someTopicForSink";

private static ZooKeeperLocalCluster zooKeeperLocalCluster;
private static KafkaLocalCluster kafkaLocalCluster;
@@ -83,7 +85,8 @@ public static void shutdown() {
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000,
15000,10, Time.SYSTEM, "myGroup","myMetricType", null);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(TEST_TOPICS);
adminZkClient.deleteTopic(TEST_TOPIC);
adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK);

kafkaLocalCluster.stop();
geodeLocalCluster.stop();
@@ -103,7 +106,9 @@ private static void createTopic() {
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.createTopic(TEST_TOPICS,1
adminZkClient.createTopic(TEST_TOPIC,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
}

@@ -174,7 +179,7 @@ public static Consumer<String,String> createConsumer() {
final Consumer<String, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TEST_TOPICS));
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
return consumer;
}

@@ -196,33 +201,32 @@ public static Producer<String,String> createProducer() {
@Test
public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION);

//right now just verify something makes it end to end
AtomicInteger valueReceived = new AtomicInteger(0);
await().atMost(10, TimeUnit.SECONDS).until(() -> {
region.put("KEY", "VALUE" + System.currentTimeMillis());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4));
for (ConsumerRecord<String, String> record: records) {
// System.out.println("WE consumed a record:" + record);
valueReceived.incrementAndGet();
}
return valueReceived.get() > 0;
return valueReceived.get() == 10;
});
}

@Test
public void endToEndSinkTest() {
ClientCache client = createGeodeClient();
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK);

Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i));
producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i));
}

int i = 0;
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i)));
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
}

}
@@ -20,8 +20,10 @@

import static geode.kafka.GeodeConnectorConfig.REGIONS;
import static geode.kafka.GeodeConnectorConfig.TOPICS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION;
import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC;
import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;

public class WorkerAndHerderWrapper {

@@ -57,8 +59,8 @@ public static void main(String[] args) throws IOException {
sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
sourceProps.put(REGIONS, TEST_REGIONS);
sourceProps.put(TOPICS, TEST_TOPICS);
sourceProps.put(REGIONS, TEST_REGION);
sourceProps.put(TOPICS, TEST_TOPIC);

herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -69,8 +71,8 @@ public static void main(String[] args) throws IOException {
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
sinkProps.put(REGIONS, TEST_REGIONS);
sinkProps.put(TOPICS, TEST_TOPICS);
sinkProps.put(REGIONS, TEST_REGION_FOR_SINK);
sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK);

herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),

0 comments on commit 04bdfc9

Please sign in to comment.