From 0444ced6c857c1dbbbbb8821f25ad367c0b47012 Mon Sep 17 00:00:00 2001 From: sandeshh Date: Wed, 25 May 2016 08:56:56 -0700 Subject: [PATCH] Kafka 0.9.0 output operators and unit tests. 1. Abstract Base class 2. Kafka Output operator 3. Exactly Once output operator Key in the Kafka message is used by the operator to track the tuples written by it. --- .../kafka/AbstractKafkaOutputOperator.java | 122 ++++++ ...kaSinglePortExactlyOnceOutputOperator.java | 401 ++++++++++++++++++ .../kafka/KafkaSinglePortOutputOperator.java | 43 ++ .../malhar/kafka/KafkaOutputOperatorTest.java | 375 ++++++++++++++++ kafka/src/test/resources/log4j.properties | 5 +- 5 files changed, 944 insertions(+), 2 deletions(-) create mode 100644 kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java create mode 100644 kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java create mode 100644 kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java create mode 100644 kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java new file mode 100644 index 0000000000..56d9611dd4 --- /dev/null +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; +import javax.validation.constraints.NotNull; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +/** + * This is the base implementation of a Kafka output operator(0.9.0), which writes data to the Kafka message bus. + * + * @displayName Abstract Kafka Output + * @category Messaging + * @tags output operator + * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractKafkaOutputOperator implements Operator +{ + private transient Producer producer; + @NotNull + private String topic; + private Properties properties = new Properties(); + + @Override + public void setup(Context.OperatorContext context) + { + producer = new KafkaProducer(properties); + } + + /** + * Implement Component Interface. + */ + @Override + public void teardown() + { + producer.close(); + } + + /** + * Implement Operator Interface. + */ + @Override + public void beginWindow(long windowId) + { + } + + /** + * Implement Operator Interface. 
+ */ + @Override + public void endWindow() + { + } + + public Properties getProperties() + { + return properties; + } + + /** + * Set the Kafka producer properties. + * + * @param properties Producer properties + */ + public void setProperties(Properties properties) + { + this.properties.putAll(properties); + } + + /** + * Set the Kafka producer property. + * + * @param key Producer Property name + * @param val Producer Property value + */ + public void setProperty(Object key, Object val) + { + properties.put(key,val); + } + + public String getTopic() + { + return topic; + } + + /** + * Set the Kafka topic + * @param topic Kafka topic for which the data is sent + */ + public void setTopic(String topic) + { + this.topic = topic; + } + + protected Producer getProducer() + { + return producer; + } +} + diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java new file mode 100644 index 0000000000..09ae1cbe1b --- /dev/null +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +/** + * Kafka output operator with exactly once processing semantics. + *
+ * + *

+ * <p>
+ * <b>Recovery handling</b>

+ * <ul>
+ * <li>Offsets of the Kafka partitions are stored in the WindowDataManager in endWindow()
+ *     (see the sketch below).</li>
+ * <li>During recovery:
+ *   <ul>
+ *   <li>The Streaming Window that was partially written before the crash is reconstructed (explained below).</li>
+ *   <li>Tuples belonging to completed Streaming Windows are skipped.</li>
+ *   <li>Tuples arriving for the partially written Streaming Window are skipped.
+ *       (No assumption is made about the order or the uniqueness of the tuples.)</li>
+ *   </ul>
+ * </li>
+ * </ul>
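+ * <p>
+ * A condensed sketch of that bookkeeping; the calls shown are the ones this operator makes in
+ * endWindow() and beginWindow(), with getPartitionsAndOffsets() being its private helper that
+ * queries the current offset of every partition of the topic:
+ * <pre>{@code
+ * // endWindow(): make sure every tuple has reached Kafka, then persist the offsets for this window.
+ * getProducer().flush();
+ * windowDataManager.save(getPartitionsAndOffsets(true), operatorId, windowId);
+ *
+ * // beginWindow(): while replaying the largest recovered window, rebuild the partially written window.
+ * if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+ *   rebuildPartialWindow();
+ * }
+ * }</pre>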

    + * + *

+ * <p>
+ * <b>Partial Window Construction</b>

+ * <ul>
+ * <li>The operator uses the Key of the Kafka message, so the Key is not available to users of the operator.</li>
+ * <li>The Key uniquely identifies the messages written by this particular operator instance, which allows
+ *     multiple writers to the same Kafka partitions. The format of the Key is "APPLICATION_ID#OPERATOR_ID"
+ *     (see the sketch below).</li>
+ * <li>During recovery, the Kafka partitions are read from the last written offsets up to the latest offsets.</li>
+ * <li>All the tuples written by this particular instance are kept in a Map.</li>
+ * </ul>
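+ * <p>
+ * Key construction and the ownership check, roughly as this operator performs them; appName and
+ * operatorId come from the operator context in setup(), and recordKey stands for the key of a
+ * record consumed during recovery:
+ * <pre>{@code
+ * // Sent as the record key with every tuple, so a recovering instance can recognize its own writes.
+ * String key = appName + "#" + operatorId;
+ *
+ * // Only records whose key matches this application and operator id are counted during recovery.
+ * String[] parts = recordKey.split("#");
+ * boolean writtenByThisInstance = parts.length == 2
+ *     && parts[0].equals(appName)
+ *     && Integer.parseInt(parts[1]) == operatorId;
+ * }</pre>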

    + * + *

+ * <p>
+ * <b>Limitations</b>

+ * <ul>
+ * <li>The Key of the Kafka message is reserved for the operator's use.</li>
+ * <li>During recovery, the operator needs to read the tuples between two offsets; if there is a lot of data
+ *     to read, the operator may appear blocked to the STRAM, which can then kill the operator.</li>
+ * </ul>
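+ * <p>
+ * A minimal configuration sketch, assuming String tuples and a broker on localhost; the operator and
+ * stream names, the topic and the upstream operator are illustrative only:
+ * <pre>{@code
+ * KafkaSinglePortExactlyOnceOutputOperator<String> out =
+ *     dag.addOperator("kafkaExactlyOnceOutput", new KafkaSinglePortExactlyOnceOutputOperator<String>());
+ * out.setTopic("outputTopic");
+ *
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:9092");
+ * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * out.setProperties(props);
+ *
+ * dag.addStream("toKafka", upstream.output, out.inputPort);
+ * }</pre>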

    + * + * @displayName Kafka Single Port Exactly Once Output(0.9.0) + * @category Messaging + * @tags output operator + * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator + implements Operator.CheckpointNotificationListener +{ + private transient String key; + private transient String appName; + private transient Integer operatorId; + private transient Long windowId; + private transient Map partialWindowTuples = new HashMap<>(); + private transient KafkaConsumer consumer; + + private WindowDataManager windowDataManager = new FSWindowDataManager(); + private final int KAFKA_CONNECT_ATTEMPT = 10; + private final String KEY_SEPARATOR = "#"; + + private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + + public final transient DefaultInputPort inputPort = new DefaultInputPort() + { + @Override + public void process(T tuple) + { + sendTuple(tuple); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + this.operatorId = context.getId(); + this.windowDataManager.setup(context); + this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME); + this.key = appName + KEY_SEPARATOR + (new Integer(operatorId)); + this.consumer = KafkaConsumerInit(); + } + + @Override + public void beginWindow(long windowId) + { + this.windowId = windowId; + + if (windowId == windowDataManager.getLargestRecoveryWindow()) { + rebuildPartialWindow(); + } + } + + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + try { + windowDataManager.deleteUpTo(operatorId, windowId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override + public void teardown() + { + consumer.close(); + super.teardown(); + } + + @Override + public void endWindow() + { + if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestRecoveryWindow()) { + throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset."); + } + + // Every tuples should be written before the offsets are stored in the window data manager. 
+ getProducer().flush(); + + try { + this.windowDataManager.save(getPartitionsAndOffsets(true), operatorId, windowId); + } catch (IOException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + + public void setWindowDataManager(WindowDataManager windowDataManager) + { + this.windowDataManager = windowDataManager; + } + + private boolean doesKeyBelongsToThisInstance(int operatorId, String key) + { + String[] split = key.split(KEY_SEPARATOR); + + if (split.length != 2) { + return false; + } + + if ((Integer.parseInt(split[1]) == operatorId) && (split[0].equals(appName))) { + return true; + } + + return false; + } + + private boolean alreadyInKafka(T message) + { + if ( windowId <= windowDataManager.getLargestRecoveryWindow() ) { + return true; + } + + if (partialWindowTuples.containsKey(message)) { + + Integer val = partialWindowTuples.get(message); + + if ( val == 0 ) { + return false; + } else if ( val == 1 ) { + partialWindowTuples.remove(message); + } else { + partialWindowTuples.put(message, val - 1); + } + + return true; + } + + return false; + } + + private Map getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException + { + List partitionInfoList = consumer.partitionsFor(getTopic()); + List topicPartitionList = new java.util.ArrayList<>(); + + for ( PartitionInfo partitionInfo: partitionInfoList) { + topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()) ); + } + + Map parttionsAndOffset = new HashMap<>(); + consumer.assign(topicPartitionList); + + for ( PartitionInfo partitionInfo: partitionInfoList) { + + try { + + TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition()); + if (latest) { + consumer.seekToEnd(topicPartition); + } else { + consumer.seekToBeginning(topicPartition); + } + parttionsAndOffset.put(partitionInfo.partition(), consumer.position(topicPartition)); + + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + return parttionsAndOffset; + } + + private void rebuildPartialWindow() + { + logger.info("Rebuild the partial window after " + windowDataManager.getLargestRecoveryWindow()); + + Map storedOffsets; + Map currentOffsets; + + try { + storedOffsets = (Map)this.windowDataManager.load(operatorId, windowId); + currentOffsets = getPartitionsAndOffsets(true); + } catch (IOException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + if (currentOffsets == null) { + logger.debug("No tuples found while building partial window " + windowDataManager.getLargestRecoveryWindow()); + return; + } + + if (storedOffsets == null) { + + logger.debug("Stored offset not available, seeking to the beginning of the Kafka Partition."); + + try { + storedOffsets = getPartitionsAndOffsets(false); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + List topicPartitions = new ArrayList<>(); + + for (Map.Entry entry: currentOffsets.entrySet()) { + + topicPartitions.add(new TopicPartition(getTopic(), entry.getKey())); + } + + consumer.assign(topicPartitions); + + for (Map.Entry entry: currentOffsets.entrySet()) { + + Long storedOffset = 0L; + Integer currentPartition = entry.getKey(); + Long currentOffset = entry.getValue(); + + if (storedOffsets.containsKey(currentPartition)) { + storedOffset = storedOffsets.get(currentPartition); + } + + if (storedOffset >= currentOffset) { + 
continue; + } + + try { + consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset); + } catch (Exception ex) { + logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible."); + throw new RuntimeException(ex); + } + + int kafkaAttempt = 0; + + while ( true ) { + + ConsumerRecords consumerRecords = consumer.poll(100); + + if (consumerRecords.count() == 0) { + if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) { + break; + } + } else { + kafkaAttempt = 0; + } + + boolean crossedBoundary = false; + + for (ConsumerRecord consumerRecord : consumerRecords) { + + if (!doesKeyBelongsToThisInstance(operatorId, (String)consumerRecord.key())) { + continue; + } + + T value = (T)consumerRecord.value(); + + if ( partialWindowTuples.containsKey(value)) { + Integer count = partialWindowTuples.get(value); + partialWindowTuples.put(value, count + 1); + } else { + partialWindowTuples.put(value, 1); + } + + if (consumerRecord.offset() >= currentOffset) { + crossedBoundary = true; + break; + } + } + + if (crossedBoundary) { + break; + } + } + } + } + + private KafkaConsumer KafkaConsumerInit() + { + Properties props = new Properties(); + + props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG)); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); + + return new KafkaConsumer<>(props); + } + + protected void sendTuple(T tuple) + { + if ( alreadyInKafka(tuple) ) { + return; + } + + getProducer().send(new ProducerRecord<>(getTopic(), key, tuple),new Callback() + { + public void onCompletion(RecordMetadata metadata, Exception e) + { + if (e != null) { + logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage()); + throw new RuntimeException(e); + } + } + }); + } + + private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class); +} + diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java new file mode 100644 index 0000000000..a08c7a2685 --- /dev/null +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import org.apache.kafka.clients.producer.ProducerRecord; +import com.datatorrent.api.DefaultInputPort; + +/** + * Kafka output operator with single input port (inputPort). 
+ * It supports atleast once processing guarantees + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class KafkaSinglePortOutputOperator extends AbstractKafkaOutputOperator +{ + /** + * This input port receives tuples that will be written out to Kafka. + */ + public final transient DefaultInputPort inputPort = new DefaultInputPort() + { + @Override + public void process(V tuple) + { + getProducer().send(new ProducerRecord(getTopic(),tuple)); + } + }; +} diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java new file mode 100644 index 0000000000..db27be02c0 --- /dev/null +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.kafka; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.stram.StramLocalCluster; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +@Ignore +public class KafkaOutputOperatorTest extends KafkaOperatorTestBase +{ + String testName; + private static List tupleCollection = new LinkedList<>(); + private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + public static String APPLICATION_PATH = baseDir + 
File.separator + StramLocalCluster.class.getName() + File.separator; + + @Before + public void before() + { + FileUtils.deleteQuietly(new File(APPLICATION_PATH)); + testName = TEST_TOPIC + testCounter++; + createTopic(0, testName); + if (hasMultiCluster) { + createTopic(1, testName); + } + } + + @Test + public void testExactlyOnceWithFailure() throws Exception + { + List toKafka = GenerateList(); + + sendDataToKafka(true, toKafka, true, false); + + List fromKafka = ReadFromKafka(); + + Assert.assertTrue("With Failure", compare(fromKafka, toKafka)); + } + + @Test + public void testExactlyOnceWithNoFailure() throws Exception + { + List toKafka = GenerateList(); + + sendDataToKafka(true, toKafka, false, false); + + List fromKafka = ReadFromKafka(); + + Assert.assertTrue("With No Failure", compare(fromKafka, toKafka)); + } + + @Test + public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception + { + List toKafka = GenerateList(); + + try { + sendDataToKafka(true, toKafka, true, true); + } catch (RuntimeException ex) { + + boolean expectedException = false; + if ( ex.getMessage().contains("Violates")) { + expectedException = true; + } + + Assert.assertTrue("Different tuples after recovery", expectedException); + return; + } + + Assert.assertTrue("Wrong tuples during replay, should throw exception", false); + } + + @Test + public void testKafkaOutput() throws Exception + { + List toKafka = GenerateList(); + + sendDataToKafka(false, toKafka, false, false); + + List fromKafka = ReadFromKafka(); + + Assert.assertTrue("No failure", compare(fromKafka, toKafka)); + } + + @Test + public void testKafkaOutputWithFailure() throws Exception + { + List toKafka = GenerateList(); + + sendDataToKafka(false, toKafka, true, true); + + List fromKafka = ReadFromKafka(); + + Assert.assertTrue("No failure", fromKafka.size() > toKafka.size()); + } + + private void sendDataToKafka(boolean exactlyOnce, List toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException + { + Properties props = new Properties(); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp"); + + OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap); + + cleanUp(operatorContext); + + Operator kafkaOutput; + DefaultInputPort inputPort; + + if ( exactlyOnce ) { + KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + inputPort = kafkaOutputTemp.inputPort; + kafkaOutput = kafkaOutputTemp; + } else { + KafkaSinglePortOutputOperator kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + inputPort = kafkaOutputTemp.inputPort; + kafkaOutput = kafkaOutputTemp; + } + + kafkaOutput.beginWindow(1); + inputPort.getSink().put(toKafka.get(0)); + inputPort.getSink().put(toKafka.get(1)); + inputPort.getSink().put(toKafka.get(2)); + kafkaOutput.endWindow(); + kafkaOutput.beginWindow(2); + inputPort.getSink().put(toKafka.get(3)); + inputPort.getSink().put(toKafka.get(4)); + inputPort.getSink().put(toKafka.get(5)); + kafkaOutput.endWindow(); + kafkaOutput.beginWindow(3); + inputPort.getSink().put(toKafka.get(6)); + 
inputPort.getSink().put(toKafka.get(7)); + + if ( hasFailure ) { + + if ( exactlyOnce ) { + KafkaSinglePortExactlyOnceOutputOperator kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext); + inputPort = kafkaOutputTemp.inputPort; + kafkaOutput = kafkaOutputTemp; + } else { + KafkaSinglePortOutputOperator kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext); + inputPort = kafkaOutputTemp.inputPort; + kafkaOutput = kafkaOutputTemp; + } + + kafkaOutput.beginWindow(2); + inputPort.getSink().put(toKafka.get(3)); + inputPort.getSink().put(toKafka.get(4)); + inputPort.getSink().put(toKafka.get(5)); + kafkaOutput.endWindow(); + kafkaOutput.beginWindow(3); + inputPort.getSink().put(toKafka.get(6)); + + if (!differentTuplesAfterRecovery) { + inputPort.getSink().put(toKafka.get(7)); + } + } + + inputPort.getSink().put(toKafka.get(8)); + inputPort.getSink().put(toKafka.get(9)); + kafkaOutput.endWindow(); + kafkaOutput.beginWindow(4); + inputPort.getSink().put(toKafka.get(10)); + inputPort.getSink().put(toKafka.get(11)); + kafkaOutput.endWindow(); + + cleanUp(operatorContext); + } + + private KafkaSinglePortExactlyOnceOutputOperator ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext) + { + KafkaSinglePortExactlyOnceOutputOperator kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>(); + kafkaOutput.setTopic(testName); + kafkaOutput.setProperties(props); + kafkaOutput.setup(operatorContext); + + return kafkaOutput; + } + + private KafkaSinglePortOutputOperator ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext) + { + KafkaSinglePortOutputOperator kafkaOutput = new KafkaSinglePortOutputOperator<>(); + kafkaOutput.setTopic(testName); + kafkaOutput.setProperties(props); + kafkaOutput.setup(operatorContext); + + return kafkaOutput; + } + + private void cleanUp(Context.OperatorContext operatorContext) + { + WindowDataManager windowDataManager = new FSWindowDataManager(); + windowDataManager.setup(operatorContext); + try { + windowDataManager.deleteUpTo(operatorContext.getId(),windowDataManager.getLargestRecoveryWindow()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private boolean compare(List fromKafka, List toKafka) + { + if (fromKafka.size() != toKafka.size()) { + return false; + } + + for (int i = 0; i < fromKafka.size(); ++i) { + if ( !fromKafka.get(i).equals(toKafka.get(i))) { + return false; + } + } + + return true; + } + + private String getClusterConfig() + { + String l = "localhost:"; + return l + TEST_KAFKA_BROKER_PORT[0][0] + + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + + (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + + (hasMultiCluster && hasMultiPartition ? 
"," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + } + + private List GenerateList() + { + List strings = new ArrayList<>(); + + for (Integer i = 0; i < 12; ++i) { + + strings.add(i.toString()); + } + + return strings; + } + + public List ReadFromKafka() + { + tupleCollection.clear(); + + // Create KafkaSinglePortStringInputOperator + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); + props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig()); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); + props.put(GROUP_ID_CONFIG, "KafkaTest"); + + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + // Create KafkaSinglePortStringInputOperator + KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); + node.setConsumerProps(props); + node.setInitialPartitionCount(1); + // set topic + node.setTopics(testName); + node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); + node.setClusters(getClusterConfig()); + node.setStrategy("one_to_one"); + + // Create Test tuple collector + CollectorModule collector1 = dag.addOperator("collector", new CollectorModule()); + + // Connect ports + dag.addStream("Kafka message", node.outputPort, collector1.inputPort); + + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + + lc.run(30000); + + return tupleCollection; + } + + public static class CollectorModule extends BaseOperator + { + + public final transient CollectorInputPort inputPort = new CollectorInputPort(this); + + long currentWindowId; + long operatorId; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + operatorId = context.getId(); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + currentWindowId = windowId; + } + + @Override + public void endWindow() + { + super.endWindow(); + } + + } + + public static class CollectorInputPort extends DefaultInputPort + { + CollectorModule ownerNode; + + CollectorInputPort(CollectorModule node) + { + this.ownerNode = node; + } + + @Override + public void process(byte[] bt) + { + String tuple = new String(bt); + tupleCollection.add(tuple); + } + } +} diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties index c115950b06..910e44a8b2 100644 --- a/kafka/src/test/resources/log4j.properties +++ b/kafka/src/test/resources/log4j.properties @@ -38,12 +38,13 @@ log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n log4j.appender.SYSLOG.Facility=LOCAL1 -log4j.logger.org=INFO +#log4j.logger.org=INFO #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=INFO log4j.logger.org.apache.apex=INFO -log4j.logger.org.apacke.kafka=WARN +log4j.logger.org.apache.kafka=WARN log4j.logger.kafka.consumer=WARN log4j.logger.kafka=WARN +log4j.logger.org.apache.zookeeper=WARN