Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperato…
Browse files Browse the repository at this point in the history
…rTest and AbstractKafkaInputOperator
  • Loading branch information
brightchen committed Jun 16, 2016
1 parent d2f0586 commit 6f99fb2
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 87 deletions.
4 changes: 2 additions & 2 deletions kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<version>0.9.0.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
Expand All @@ -222,7 +222,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Lon
if (meta.getTopicPartition().equals(tp)) {
kc.resume(tp);
} else {
try {
kc.position(tp);
} catch (NoOffsetForPartitionException e) {
kc.seekToBeginning(tp);
}
kc.pause(tp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -55,7 +54,7 @@
* A bunch of tests to verify that the input operator is automatically partitioned per Kafka partition. This test launches its
* own Kafka cluster.
*/
@Ignore
//@Ignore
@RunWith(Parameterized.class)
public class KafkaInputOperatorTest extends KafkaOperatorTestBase
{
Expand All @@ -64,19 +63,20 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase

private String partition = null;

private String testName = "";
private transient String testName = "";

public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;

@Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
public static Collection<Object[]> testScenario()
{
return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
return Arrays.asList(new Object[][]{
{true, false, "one_to_one"},// multi cluster with single partition
{true, false, "one_to_many"},
{true, true, "one_to_one"},// multi cluster with multi partitions
{true, true, "one_to_many"},
{true, true, "one_to_many"}, //test failed, no data received.
{false, true, "one_to_one"}, // single cluster with multi partitions
{false, true, "one_to_many"},
{false, true, "one_to_many"}, //test failed, no data received.
{false, false, "one_to_one"}, // single cluster with single partitions
{false, false, "one_to_many"}
});
Expand Down Expand Up @@ -107,10 +107,17 @@ public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition

private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
private static List<String> tupleCollection = new LinkedList<>();
private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>();

private static final int scale = 2;
private static final int totalCount = 10 * scale;
private static final int failureTrigger = 3 * scale;
private static final int tuplesPerWindow = 5 * scale;
private static final int waitTime = 60000 + 300 * scale;

//This latch was used to count the END_TUPLE messages, but the order of tuples can't be guaranteed,
//so it counts valid tuples instead.
private static CountDownLatch latch;
private static boolean hasFailure = false;
private static int failureTrigger = 3000;
private static int k = 0;

/**
Expand All @@ -120,8 +127,14 @@ public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition
*/
public static class CollectorModule extends BaseOperator
{

public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
{
@Override
public void process(byte[] bt)
{
processTuple(bt);
}
};

long currentWindowId;

Expand All @@ -130,7 +143,9 @@ public static class CollectorModule extends BaseOperator
boolean isIdempotentTest = false;

transient Set<String> windowTupleCollector = new HashSet<>();

private transient Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>();
private transient int endTupleCount = 0;

@Override
public void setup(Context.OperatorContext context)
{
Expand All @@ -143,8 +158,26 @@ public void beginWindow(long windowId)
{
super.beginWindow(windowId);
currentWindowId = windowId;
endTupleCount = 0;
}


public void processTuple(byte[] bt)
{
String tuple = new String(bt);
if (hasFailure && k++ == failureTrigger) {
//you can only kill yourself once
hasFailure = false;
throw new RuntimeException();
}
if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
endTupleCount++;
return;
}

windowTupleCollector.add(tuple);
}

@Override
public void endWindow()
{
Expand All @@ -160,42 +193,26 @@ public void endWindow()
tupleCollectedInWindow.put(key, newSet);
}
}
windowTupleCollector.clear();
}

}

public static class CollectorInputPort extends DefaultInputPort<byte[]>
{
CollectorModule ownerNode;

CollectorInputPort(CollectorModule node) {
this.ownerNode = node;
}

@Override
public void process(byte[] bt)
{
String tuple = new String(bt);
if (hasFailure && k++ == failureTrigger) {
//you can only kill yourself once
hasFailure = false;
throw new RuntimeException();
}
if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
if (latch != null) {
latch.countDown();
//discard the tuples of this window if an exception happened
int tupleSize = windowTupleCollector.size();
tupleCollection.addAll(windowTupleCollector);
windowTupleCollector.clear();

if (latch != null) {
while (tupleSize-- > 0) {
if (latch.getCount() == 0) {
logger.warn("Receive extra data; Total received tuple size: {}", tupleCollection.size());
} else {
latch.countDown();
}
}
return;
}
tupleCollection.add(tuple);
if (ownerNode.isIdempotentTest) {
ownerNode.windowTupleCollector.add(tuple);
}
}

}


/**
* Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
* data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
Expand Down Expand Up @@ -230,24 +247,29 @@ public void testIdempotentInputOperatorWithFailure() throws Exception

public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
{

// each broker should get a END_TUPLE message
latch = new CountDownLatch(totalBrokers);

int totalCount = 10000;

// Start producer

// Start producer and generate tuples
KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
p.setSendCount(totalCount);
Thread t = new Thread(p);
t.start();
p.run();
p.close();


// the latch is counted down once per collected tuple, so it starts at the number of tuples sent
latch = new CountDownLatch(totalCount);

k = 0;
tupleCollection.clear();

logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);

// Create DAG for testing.
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();

// Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class);
node.setInitialPartitionCount(1);
// set topic
node.setTopics(testName);
Expand All @@ -264,8 +286,7 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc
collector.isIdempotentTest = idempotent;

// Connect ports
dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);

dag.addStream("Kafka message"+ testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);

if (hasFailure) {
setupHasFailureTest(node, dag);
Expand All @@ -276,9 +297,16 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc
lc.setHeartbeatMonitoringEnabled(false);

lc.runAsync();

// Wait up to waitTime ms (about 60s) for the consumer to finish consuming all the messages
boolean notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
lc.shutdown();

logger.info("Number of emitted tuples: {}, testName: {}", tupleCollection.size(), testName);
Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout);

// Wait 30s for consumer finish consuming all the messages
boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
//waitMillis(500);

Collections.sort(tupleCollection, new Comparator<String>()
{
@Override
Expand All @@ -287,23 +315,29 @@ public int compare(String o1, String o2)
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);

// Check results
Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is not expected", totalCount <=+ tupleCollection.size());
logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size()));

t.join();
p.close();
lc.shutdown();
Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + totalCount + "; data: \n" + tupleCollection,
totalCount == tupleCollection.size());

logger.info("End of test case: {}", testName);
}

/**
 * Pauses the calling thread for roughly the given number of milliseconds.
 *
 * This helper never throws. If the thread is interrupted while sleeping, the
 * method returns early and restores the thread's interrupt flag so callers
 * further up the stack can still observe the interruption.
 *
 * @param millis how long to sleep, in milliseconds; negative values are treated as zero
 */
protected void waitMillis(long millis)
{
  try {
    // Thread.sleep rejects negative durations; clamp so this stays a no-throw helper
    Thread.sleep(Math.max(0L, millis));
  } catch (InterruptedException e) {
    // do not swallow the interruption: re-assert the flag for the caller
    Thread.currentThread().interrupt();
  }
}

private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
{
operator.setHoldingBufferSize(5000);
dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
//dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
operator.setMaxTuplesPerWindow(500);
operator.setMaxTuplesPerWindow(tuplesPerWindow);
}

private String getClusterConfig() {
Expand Down

0 comments on commit 6f99fb2

Please sign in to comment.