From b45b6fdf66acdefe65a7fd0513e256460a5eb10f Mon Sep 17 00:00:00 2001 From: brightchen Date: Wed, 25 May 2016 13:01:26 -0700 Subject: [PATCH 1/2] APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator --- .../kafka/AbstractKafkaOutputOperator.java | 2 +- ...eUniqueExactlyOnceKafkaOutputOperator.java | 612 ++++++++++++++++++ .../contrib/kafka/KafkaMetadataUtil.java | 95 ++- .../datatorrent/contrib/kafka/KafkaUtil.java | 358 ++++++++++ ...leUniqueExactlyOnceOutputOperatorTest.java | 512 +++++++++++++++ .../contrib/kafka/KafkaUtilTester.java | 128 ++++ contrib/src/test/resources/log4j.properties | 1 + 7 files changed, 1700 insertions(+), 8 deletions(-) create mode 100644 contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java create mode 100644 contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java create mode 100644 contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java create mode 100644 contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java index f0835c4df5..8003669709 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java @@ -100,7 +100,7 @@ protected ProducerConfig createKafkaProducerConfig(){ return new ProducerConfig(configProperties); } - + public Producer getProducer() { return producer; diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java new file mode 100644 index 0000000000..31d371b682 --- /dev/null +++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java @@ -0,0 +1,612 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.Pair; + +import kafka.api.OffsetRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.serializer.StringDecoder; + +/** + * Assumptions:
+ * - assume the values of incoming tuples are not duplicated across the operator partitions.
+ * - assume one Kafka partition can be written to by multiple operator partitions at the same time.
+ * - assume the Kafka partition is determined by the tuple value itself (it does not depend on the operator partition).
+ *

+ * + * Notes:
+ * - the order of the data may change on replay. + * - the data may go to another operator partition on replay, for example if the upstream operator failed.
+ *

+ * + * Implementation: for each Kafka partition, load the minimum last window id and the + * minimum offset of that window across all operator partitions, and then load + * the tuples from Kafka starting at this minimum offset. When processing a tuple, if + * its window id is less than the minimum last window id, just ignore the tuple. If + * the window id equals the loaded minimum window id and the tuple equals any of the loaded + * tuples, ignore it. Otherwise, send it to Kafka. + *

+ * + * @displayName Abstract Tuple Unique Exactly Once Kafka Output + * @category Messaging + * @tags output operator + */ +@InterfaceStability.Evolving +public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator + extends AbstractKafkaOutputOperator +{ + public static final String DEFAULT_CONTROL_TOPIC = "ControlTopic"; + protected transient int partitionNum = 1; + + /** + * allow client set the partitioner as partitioner may need some attributes + */ + protected kafka.producer.Partitioner partitioner; + + protected transient int operatorPartitionId; + + protected String controlTopic = DEFAULT_CONTROL_TOPIC; + + //The control info includes the time, use this time to track the head of control info we care. + protected int controlInfoTrackBackTime = 120000; + + /** + * max number of offset need to check + */ + protected int maxNumOffsetsOfControl = 1000; + + protected String controlProducerProperties; + protected Set brokerSet; + + protected transient long currentWindowId; + + /** + * the map from Kafka partition id to the control offset. this one is + * checkpointed and as the start offset to load the recovery control + * information Note: this only keep the information of this operator + * partition. + */ + protected transient Map partitionToLastControlOffset = Maps.newHashMap(); + + /** + * keep the minimal last window id for recovery. If only one partition + * crashed, it is ok just use the last window id of this operator partition as + * the recovery window id If all operator partitions crashed, should use the + * minimal last window id as the recovery window id, as the data may go to the + * other partitions. But as the operator can't distinguish which is the case. + * use the most general one. + */ + protected transient long minRecoveryWindowId = -2; + protected transient long maxRecoveryWindowId = -2; + + /** + * A map from Kafka partition id to lastMessages written to this kafka + * partition. 
This information was loaded depends on the + * RecoveryControlInfo.kafkaPartitionIdToOffset + */ + protected transient Map>> partitionToLastMsgs = Maps.newHashMap(); + + /** + * The messages are assume to written to the kafka partition decided by + * tupleToKeyValue(T tuple) and partitioner. But it also depended on the + * system. for example, it could be only one partition when create topic. + * Don't distinguish kafka partitions if partition is not reliable. + */ + protected transient Set> totalLastMsgs = Sets.newHashSet(); + + protected transient RecoveryControlInfo controlInfo = new RecoveryControlInfo(); + protected transient Producer controlDataProducer; + protected transient StringDecoder controlInfoDecoder; + + @Override + public void setup(OperatorContext context) + { + getBrokerSet(); + + super.setup(context); + controlInfoDecoder = new StringDecoder(null); + + operatorPartitionId = context.getId(); + + controlDataProducer = new Producer(createKafkaControlProducerConfig()); + + if (partitioner == null) { + createDefaultPartitioner(); + } + + loadControlData(); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + } + + /** + * Implement Operator Interface. + */ + @Override + public void endWindow() + { + //we'd better flush the cached tuples, but Kafka 0.8.1 doesn't support flush. 
+ //keep the control information of this operator partition to control topic + saveControlData(); + } + + protected void createDefaultPartitioner() + { + try { + String className = (String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER); + if (className != null) { + partitioner = (kafka.producer.Partitioner)Class.forName(className).newInstance(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to initialize partitioner", e); + } + } + + /** + * load control data OUTPUT: lastMsgs and partitionToMinLastWindowId + */ + protected void loadControlData() + { + long loadDataTime = System.currentTimeMillis(); + + final String clientNamePrefix = getClientNamePrefix(); + Map consumers = KafkaUtil.createSimpleConsumers(clientNamePrefix, brokerSet, controlTopic); + if (consumers == null || consumers.size() != 1) { + logger.error("The consumer for recovery information was not expected. {}", consumers); + return; + } + final SimpleConsumer consumer = consumers.get(0); + if (consumer == null) { + logger.error("No consumer for recovery information."); + return; + } + + long latestOffset = KafkaMetadataUtil.getLastOffset(consumer, controlTopic, 0, OffsetRequest.LatestTime(), + KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0)); + logger.debug("latestOffsets: {}", latestOffset); + if (latestOffset <= 0) { + return; + } + + int batchMessageSize = 100; + List> messages = Lists.newArrayList(); + + boolean isControlMessageEnough = false; + Map operatorPartitionIdToLastControlInfo = Maps.newHashMap(); + + while (latestOffset > 0 && !isControlMessageEnough) { + long startOffset = latestOffset - batchMessageSize + 1; + if (startOffset < 0) { + startOffset = 0; + } + + //read offsets as batch and handle them. 
+ messages.clear(); + KafkaUtil.readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0), + controlTopic, 0, startOffset, latestOffset - 1, messages, 3); + for (Pair message : messages) { + //handle the message; we have to handle all the message. + RecoveryControlInfo rci = RecoveryControlInfo.fromString((String)controlInfoDecoder.fromBytes(message.second)); + isControlMessageEnough = (loadControlInfoIntermedia(rci, loadDataTime, + operatorPartitionIdToLastControlInfo) == 0); + + if (isControlMessageEnough) { + break; + } + } + + latestOffset = startOffset - 1; + } + + loadRecoveryWindowId(operatorPartitionIdToLastControlInfo); + loadLastMessages(operatorPartitionIdToLastControlInfo); + } + + /** + * load the recovery window id. right now use the minimal window id as the + * recovery window id Different Operator partitions maybe crashed at different + * window. use the minimal window of all operator partitions as the window for + * recovery. + * + * @param operatorPartitionIdToLastWindowId + */ + protected void loadRecoveryWindowId(Map operatorPartitionIdToLastControlInfo) + { + for (RecoveryControlInfo rci : operatorPartitionIdToLastControlInfo.values()) { + if (minRecoveryWindowId < 0 || rci.windowId < minRecoveryWindowId) { + minRecoveryWindowId = rci.windowId; + } + if (maxRecoveryWindowId < 0 || rci.windowId > maxRecoveryWindowId) { + maxRecoveryWindowId = rci.windowId; + } + } + } + + /** + * load control information from intermedia to + * + * @param operatorPartitionIdToLastWindowId + * @param operatorToKafkaToOffset + */ + protected void loadLastMessages(Map operatorPartitionIdToLastControlInfo) + { + partitionToLastControlOffset.clear(); + + for (Map.Entry entry : operatorPartitionIdToLastControlInfo.entrySet()) { + RecoveryControlInfo rci = entry.getValue(); + if (rci.windowId == this.minRecoveryWindowId) { + //get the minimal offset + for (Map.Entry kafkaPartitionEntry : rci.kafkaPartitionIdToOffset.entrySet()) { + 
Long offset = partitionToLastControlOffset.get(kafkaPartitionEntry.getKey()); + if (offset == null || offset > kafkaPartitionEntry.getValue()) { + partitionToLastControlOffset.put(kafkaPartitionEntry.getKey(), kafkaPartitionEntry.getValue()); + } + } + } + } + + partitionToLastMsgs.clear(); + + KafkaUtil.readMessagesAfterOffsetTo(getClientNamePrefix(), brokerSet, getTopic(), partitionToLastControlOffset, + partitionToLastMsgs); + + loadTotalLastMsgs(); + } + + /** + * load Total Last Messages from partitionToLastMsgs; + */ + protected void loadTotalLastMsgs() + { + totalLastMsgs.clear(); + if (partitionToLastMsgs == null || partitionToLastMsgs.isEmpty()) { + return; + } + for (List> msgs : partitionToLastMsgs.values()) { + totalLastMsgs.addAll(msgs); + } + } + + protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime, + Map operatorPartitionIdToLastControlInfo) + { + if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) { + return 0; + } + + //The record should be in ascent order, so the later should override the previous + operatorPartitionIdToLastControlInfo.put(controlInfo.partitionIdOfOperator, controlInfo); + + return 1; + } + + /** + * Current implementation we can get the number of operator partitions. So we + * we use the controlInfoTrackBackTime to control the trace back of control + * information. 
+ * + * @param controlInfo + * @param loadDataTime + * @param operatorPartitionIdToLastWindowId + * @param operatorToKafkaToOffset + * @return 0 if control information is enough and don't need to load any more + */ + protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime, + Map operatorPartitionIdToLastWindowId, Map> operatorToKafkaToOffset) + { + if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) { + return 0; + } + + //The record should be in ascent order, so the later should override the previous + operatorPartitionIdToLastWindowId.put(controlInfo.partitionIdOfOperator, controlInfo.windowId); + operatorToKafkaToOffset.put(controlInfo.partitionIdOfOperator, controlInfo.kafkaPartitionIdToOffset); + + return 1; + } + + /** + * save the control data. each operator partition only save its control data + */ + protected void saveControlData() + { + controlInfo.generateTime = System.currentTimeMillis(); + controlInfo.partitionIdOfOperator = operatorPartitionId; + controlInfo.windowId = this.currentWindowId; + if (controlInfo.kafkaPartitionIdToOffset == null) { + controlInfo.kafkaPartitionIdToOffset = Maps.newHashMap(); + } else { + controlInfo.kafkaPartitionIdToOffset.clear(); + } + KafkaMetadataUtil.getLastOffsetsTo(getClientNamePrefix(), brokerSet, getTopic(), + controlInfo.kafkaPartitionIdToOffset); + + //send to control topic + controlDataProducer.send(new KeyedMessage(getControlTopic(), null, 0, controlInfo.toString())); + } + + protected String getClientNamePrefix() + { + return getClass().getName().replace('$', '.'); + } + + + protected Set getBrokerSet() + { + if (brokerSet == null) { + brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + return brokerSet; + } + + /** + * This input port receives tuples that will be written out to Kafka. 
+ */ + public final transient DefaultInputPort inputPort = new DefaultInputPort() + { + @Override + public void process(T tuple) + { + processTuple(tuple); + } + }; + + /** + * separate to a method to give sub-class chance to override + * + * @param tuple + */ + protected void processTuple(T tuple) + { + Pair keyValue = tupleToKeyValue(tuple); + final int pid = getPartitionKey(keyValue.first); + + if (!skipTuple(pid, keyValue)) { + getProducer().send(new KeyedMessage(getTopic(), keyValue.first, pid, keyValue.second)); + sendCount++; + } + } + + protected boolean skipTuple(int partitionId, Pair msg) + { + if (currentWindowId <= minRecoveryWindowId) { + return true; + } + if (currentWindowId > maxRecoveryWindowId + 1) { + return false; + } + + return isDuplicateTuple(partitionId, msg); + } + + protected boolean isDuplicateTuple(int partitionId, Pair msg) + { + Collection> lastMsgs = partitionToLastMsgs.get(partitionId); + + //check depended on the partition only + if (lastMsgs == null || lastMsgs.isEmpty()) { + lastMsgs = totalLastMsgs; + } + + for (Pair cachedMsg : lastMsgs) { + if (equals(cachedMsg, msg)) { + return true; + } + } + return false; + + } + + protected boolean equals(Pair cachedMsg, Pair msg) + { + if (cachedMsg.first == null ^ msg.first == null) { + return false; + } + if (cachedMsg.second == null ^ msg.second == null) { + return false; + } + + if (cachedMsg.first == null && msg.first == null && cachedMsg.second == null && msg.second == null) { + return true; + } + + if (!equals(cachedMsg.first, msg.first)) { + return false; + } + + return equals(cachedMsg.second, msg.second); + } + + /** + * + * @param bytes + * @param value + * @return + */ + protected abstract boolean equals(byte[] bytes, M value); + + /** + * get the partition key. for 0.8.1, If a partition key is provided it will + * override the key for the purpose of partitioning but will not be stored. 
+ * + * @return the partitioner's result when a partitioner is set, else the key's hash code, else 0 + */ + protected int getPartitionKey(K key) + { + if (partitioner != null) { + return partitioner.partition(key, partitionNum); + } + + if (key != null) { + return key.hashCode(); + } + + //stick to the Kafka partition, so can't use round robin + return 0; + } + + /** + * Set up the configuration for the control producer: the operator's configuration overlaid with controlProducerProperties (or getProducerProperties() when unset) and the String encoders. + * The operator's own configuration is copied, not modified, so the String encoders never leak into the data producer's settings. + * @return the configuration for the control producer + */ + protected ProducerConfig createKafkaControlProducerConfig() + { + if (controlProducerProperties == null || controlProducerProperties.isEmpty()) { + controlProducerProperties = getProducerProperties(); + } + + Properties prop = new Properties(); + for (String propString : controlProducerProperties.split(",")) { + if (!propString.contains("=")) { + continue; + } + String[] keyVal = StringUtils.trim(propString).split("=", 2); //limit 2: keep '=' inside the value, accept an empty value ("key=") + prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1])); + } + + //only support String encoder now, overwrite + prop.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + prop.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + //merge into a copy: getConfigProperties() is the object the data producer is configured from + Properties configProperties = new Properties(); + configProperties.putAll(this.getConfigProperties()); + configProperties.putAll(prop); + return new ProducerConfig(configProperties); + } + + /** + * Tell the operator how to convert a input tuple to a kafka key value pair + * + * @param tuple + * @return A kafka key value pair. 
+ */ + protected abstract Pair tupleToKeyValue(T tuple); + + public kafka.producer.Partitioner getPartitioner() + { + return partitioner; + } + + public void setPartitioner(kafka.producer.Partitioner partitioner) + { + this.partitioner = partitioner; + } + + public String getControlTopic() + { + return controlTopic; + } + + public void setControlTopic(String controlTopic) + { + this.controlTopic = controlTopic; + } + + public String getControlProducerProperties() + { + return controlProducerProperties; + } + + public void setControlProducerProperties(String controlProducerProperties) + { + this.controlProducerProperties = controlProducerProperties; + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractTupleUniqueExactlyOnceKafkaOutputOperator.class); + + /** + * This class is used to keep the recovery information. Its String form is partitionIdOfOperator#generateTime#windowId#{kafkaPartitionId=offset, ...}. + * fromString() accepts the ", " entry separator and the empty map "{}" that toString() writes (via Map.toString()). + */ + protected static class RecoveryControlInfo + { + protected static final String SEPERATOR = "#"; + protected int partitionIdOfOperator; + protected long generateTime; + protected long windowId; + protected Map<Integer, Long> kafkaPartitionIdToOffset; + //( operatorPartitionId => ( lastWindowId, (KafkaPartitionId => offset) ) ) + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append(partitionIdOfOperator).append(SEPERATOR).append(generateTime).append(SEPERATOR).append(windowId); + sb.append(SEPERATOR).append(kafkaPartitionIdToOffset); + return sb.toString(); + } + + public static RecoveryControlInfo fromString(String str) + { + if (str == null || str.isEmpty()) { + throw new IllegalArgumentException("Input parameter is null or empty."); + } + String[] fields = str.split(SEPERATOR); + if (fields == null || fields.length != 4) { + throw new IllegalArgumentException( + "Invalid input String: \"" + str + "\", " + "expected fields seperated by '" + SEPERATOR + "'"); + } + + RecoveryControlInfo rci = new RecoveryControlInfo(); + rci.partitionIdOfOperator = Integer.valueOf(fields[0]); + rci.generateTime = 
Long.valueOf(fields[1]); + rci.windowId = Long.valueOf(fields[2]); + + String mapString = fields[3].trim(); + if (mapString.startsWith("{") && mapString.endsWith("}")) { + mapString = mapString.substring(1, mapString.length() - 1); + } + Map<String, String> idToOffsetAsString = Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator("=").split(mapString); + rci.kafkaPartitionIdToOffset = Maps.newHashMap(); + for (Map.Entry<String, String> entry : idToOffsetAsString.entrySet()) { + rci.kafkaPartitionIdToOffset.put(Integer.valueOf(entry.getKey()), Long.valueOf(entry.getValue())); + } + return rci; + } + } + +} diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index f6057cd42b..6dfdf14ed0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -59,9 +59,9 @@ */ public class KafkaMetadataUtil { - + public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class"; - + public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list"; private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class); @@ -88,7 +88,7 @@ public static List getPartitionsForTopic(Set brokerLi } return tmd.partitionsMetadata(); } - + /** * @param brokers in multiple clusters, keyed by cluster id * @param topic @@ -104,14 +104,14 @@ public List transformEntry(String key, Collection bs) return getPartitionsForTopic(new HashSet(bs), topic); }}); } - - + + public static Set getBrokers(Set zkHost){ - + ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); Set brokerHosts = new HashSet(); for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { - brokerHosts.add(b.getConnectionString()); + brokerHosts.add(b.connectionString()); } zkclient.close(); return brokerHosts; @@ -213,4 +213,85 @@ public 
static long getLastOffset(SimpleConsumer consumer, String topic, int part } + /** + * this method wrapper kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(OffsetRequest) + * @param consumer + * @param clientName + * @param topic + * @param partitionId + * @param time + * @param maxNumOffsets + * @return + */ + public static long[] getOffsetsBefore(SimpleConsumer consumer, String clientName, String topic, int partitionId, long time, int maxNumOffsets) + { + if (consumer == null) { + throw new IllegalArgumentException("consumer is not suppose to be null."); + } + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); + Map requestInfo = new HashMap(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, maxNumOffsets)); + OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + logger.error("Error fetching data Offset Data the Broker. 
Reason: " + response.errorCode(topic, partitionId)); + return null; + } + return response.offsets(topic, partitionId); + } + + + /** + * get the last offset of each partition to the partitionToOffset map + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param time + * @param partitionToOffset + */ + public static void getLastOffsetsTo(String clientNamePrefix, Set brokerSet, String topic, + Map partitionToOffset) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + int partitionId = pm.partitionId(); + + String leadBroker = pm.leader().host(); + int port = pm.leader().port(); + final String clientName = getClientName(clientNamePrefix, topic, partitionId); + consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); + Map requestInfo = new HashMap(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); + OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + logger.error("Error fetching data Offset Data the Broker. 
Reason: " + response.errorCode(topic, partitionId)); + } + partitionToOffset.put(partitionId, response.offsets(topic, partitionId)[0]); + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + } + + public static String getClientName(String clientNamePrefix, String topic, int partitionId) + { + return clientNamePrefix + "_" + topic + "_" + partitionId; + } + } diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java new file mode 100644 index 0000000000..d49e462faf --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.kafka; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.common.util.Pair; + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; + +public class KafkaUtil +{ + private static Logger logger = LoggerFactory.getLogger(KafkaUtil.class); + public static final int DEFAULT_TIMEOUT = 200; + public static final int DEFAULT_BUFFER_SIZE = 64 * 10240; + public static final int DEFAULT_FETCH_SIZE = 200; + + /** + * read last message ( the start offset send from partitionToOffset ) of all + * partition to partitionToMessages + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param partitionToStartOffset + * @param partitionToMessages + */ + public static void readMessagesAfterOffsetTo(String clientNamePrefix, Set brokerSet, String topic, + Map partitionToStartOffset, Map>> partitionToMessages) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + List> messagesOfPartition = partitionToMessages.get(pm.partitionId()); + if (messagesOfPartition == null) { + messagesOfPartition = Lists.newArrayList(); + partitionToMessages.put(pm.partitionId(), messagesOfPartition); + } + + long startOffset = partitionToStartOffset.get(pm.partitionId()) == null ? 
0 + : partitionToStartOffset.get(pm.partitionId()); + final String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, tm.topic(), pm); + + //the returned lastOffset is the offset which haven't written data to. + long lastOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), + kafka.api.OffsetRequest.LatestTime(), clientName); + logger.debug("lastOffset = {}", lastOffset); + if (lastOffset <= 0) { + continue; + } + + readMessagesBetween(consumer, clientName, topic, pm.partitionId(), startOffset, lastOffset - 1, + messagesOfPartition); + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + } + + public static void readMessagesBetween(String clientNamePrefix, Set brokerSet, String topic, int partitionId, + long startOffset, long endOffset, List> messages) + { + Map consumers = createSimpleConsumers(clientNamePrefix, brokerSet, topic); + if (consumers == null) { + throw new RuntimeException("Can't find any consumer."); + } + + SimpleConsumer consumer = consumers.get(partitionId); + if (consumer == null) { + throw new IllegalArgumentException("No consumer for partition: " + partitionId); + } + + readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, topic, partitionId), topic, + partitionId, startOffset, endOffset, messages); + } + + /** + * get A map of partition id to SimpleConsumer + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @return A map of partition id to SimpleConsumer + */ + public static Map createSimpleConsumers(String clientNamePrefix, Set brokerSet, + String topic) + { + return createSimpleConsumers(clientNamePrefix, brokerSet, topic, DEFAULT_TIMEOUT); + } + + /** + * get A map of partition id to SimpleConsumer + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param timeOut + * @return A map of partition id to SimpleConsumer + */ + public static 
Map createSimpleConsumers(String clientNamePrefix, Set brokerSet, + String topic, int timeOut) + { + if (clientNamePrefix == null || clientNamePrefix.isEmpty() || brokerSet == null || brokerSet.isEmpty() + || topic == null || topic.isEmpty()) { + throw new IllegalArgumentException( + "clientNamePrefix = " + clientNamePrefix + ", brokerSet = " + brokerSet + ", topic = " + topic); + } + + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + Map consumers = Maps.newHashMap(); + for (PartitionMetadata pm : tm.partitionsMetadata()) { + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumers.put(pm.partitionId(), createSimpleConsumer(clientName, tm.topic(), pm)); + } + return consumers; + } + + public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId, + long startOffset, long endOffset, List> messages) + { + readMessagesBetween(consumer, clientName, topic, partitionId, startOffset, endOffset, messages, 1); + } + + /** + * read messages of a certain partition into messages + * + * @param consumer + * @param clientNamePrefix + * @param topic + * @param partitionId + * @param startOffset + * inclusive + * @param endOffset + * inclusive + * @param messages + * @param tryTimesOnEmptyMessage + * how many times should to try when response message is empty. <=0 + * means try forever. + */ + public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId, + long startOffset, long endOffset, List> messages, int tryTimesOnEmptyMessage) + { + if (startOffset < 0 || endOffset < 0 || endOffset < startOffset) { + throw new IllegalArgumentException( + "Both offset should not less than zero and endOffset should not less than startOffset. 
startOffset = " + + startOffset + ", endoffset = " + endOffset); + } + + int readSize = 0; + int wantedSize = (int)(endOffset - startOffset + 1); + + int triedTimesOnEmptyMessage = 0; + while (readSize < wantedSize + && (tryTimesOnEmptyMessage <= 0 || triedTimesOnEmptyMessage < tryTimesOnEmptyMessage)) { + logger.debug("startOffset = {}", startOffset); + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(topic, partitionId, startOffset, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error( + "Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(topic, partitionId)); + return; + } + + triedTimesOnEmptyMessage++; + ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, partitionId); + for (MessageAndOffset messageAndOffset : messageSet) { + long offset = messageAndOffset.offset(); + logger.debug("offset = " + offset); + + if (offset > endOffset || offset < startOffset) { + continue; + } + triedTimesOnEmptyMessage = 0; + startOffset = offset + 1; + ++readSize; + messages.add(kafkaMessageToPair(messageAndOffset.message())); + } + } + } + + /** + * read last message of each partition into lastMessages + * + * @param clientNamePrefix + * @param brokerSet + * @param topic + * @param lastMessages + */ + public static void readLastMessages(String clientNamePrefix, Set brokerSet, String topic, + Map> lastMessages) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, tm.topic(), pm); + + long readOffset = 
KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), + kafka.api.OffsetRequest.LatestTime(), clientName); + + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error("Error fetching data Offset Data the Broker. Reason: " + + fetchResponse.errorCode(topic, pm.partitionId())); + return; + } + + for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) { + lastMessages.put(pm.partitionId(), kafkaMessageToPair(messageAndOffset.message())); + } + } finally { + if (consumer != null) { + consumer.close(); + } + } + + } + } + + public static Pair readLastMessage(String clientNamePrefix, Set brokerSet, String topic, + int partitionId) + { + // read last received kafka message + TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic); + + if (tm == null) { + throw new RuntimeException("Failed to retrieve topic metadata"); + } + + for (PartitionMetadata pm : tm.partitionsMetadata()) { + SimpleConsumer consumer = null; + try { + if (pm.partitionId() != partitionId) { + continue; + } + + String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); + consumer = createSimpleConsumer(clientName, topic, pm); + + long readOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId, + kafka.api.OffsetRequest.LatestTime(), clientName); + + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build(); + + FetchResponse fetchResponse = consumer.fetch(req); + if (fetchResponse.hasError()) { + logger.error("Error fetching data Offset Data the Broker. 
Reason: " + + fetchResponse.errorCode(topic, pm.partitionId())); + return null; + } + + for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { + return kafkaMessageToPair(messageAndOffset.message()); + } + } finally { + if (consumer != null) { + consumer.close(); + } + } + } + return null; + } + + /** + * convert Kafka message to pair + * + * @param m + * @return + */ + public static Pair kafkaMessageToPair(Message m) + { + ByteBuffer payload = m.payload(); + ByteBuffer key = m.key(); + byte[] keyBytes = null; + if (key != null) { + keyBytes = new byte[key.limit()]; + key.get(keyBytes); + } + + byte[] valueBytes = new byte[payload.limit()]; + payload.get(valueBytes); + return new Pair(keyBytes, valueBytes); + } + + public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm) + { + return createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT, DEFAULT_BUFFER_SIZE); + } + + public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm, int timeout, + int bufferSize) + { + String leadBroker = pm.leader().host(); + int port = pm.leader().port(); + return new SimpleConsumer(leadBroker, port, timeout, bufferSize, clientName); + } +} diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java new file mode 100644 index 0000000000..abdcb01ebd --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.kafka; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.api.OperatorDeployInfo; + +import kafka.producer.ProducerConfig; +import kafka.serializer.StringDecoder; + +public class KafkaTupleUniqueExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase +{ + public static final int TUPLE_NUM_IN_ONE_WINDOW = 10; + public static final String topic1 = "OperatorTest1"; + public static final String controlTopic1 = "ControlTopic1"; + + public static final String topic2 = "OperatorTest2"; + public static final String controlTopic2 = "ControlTopic2"; + + public static final String topic3 = "OperatorTest3"; + public static final String controlTopic3 = 
"ControlTopic3"; + + public static class TupleUniqueExactlyOnceKafkaOutputTestOperator + extends AbstractTupleUniqueExactlyOnceKafkaOutputOperator + { + protected transient StringDecoder decoder = null; + + @Override + public void setup(OperatorContext context) + { + decoder = new StringDecoder(null); + super.setup(context); + } + + @Override + protected Pair tupleToKeyValue(Integer tuple) + { + return new Pair<>(String.valueOf(tuple % 2), String.valueOf(tuple)); + } + + @Override + protected boolean equals(byte[] bytes, T value) + { + if (bytes == null && value == null) { + return true; + } + if (value == null) { + return false; + } + return value.equals(decoder.fromBytes(bytes)); + } + + } + + protected void createTopic(String topicName) + { + createTopic(0, topicName); + if (hasMultiCluster) { + createTopic(1, topicName); + } + } + + protected ProducerConfig createKafkaControlProducerConfig() + { + return new ProducerConfig(this.getKafkaProperties()); + } + + /** + * This test case there are only one operator partition, and the order of data + * changed when recovery. 
+ */ + @Test + public void testOutOfOrder() + { + OperatorDeployInfo context = new OperatorDeployInfo(); + context.id = 1; + int[] expectedTuple = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 3)]; + int tupleIndex = 0; + long windowId = 0; + { + //create required topics + createTopic(topic1); + createTopic(controlTopic1); + + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = createOperator(topic1, controlTopic1, 1); + + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator.beginWindow(windowId++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1); ++i) { + operator.processTuple(i); + expectedTuple[tupleIndex++] = i; + } + waitMills(500); + operator.endWindow(); + } + + //last window, the crash window + operator.beginWindow(windowId++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5; ++i) { + operator.processTuple(i); + expectedTuple[tupleIndex++] = i; + } + + //crashed now. + } + + //let kafka message send to server + waitMills(1000); + + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + operator.setTopic(topic1); + operator.setControlTopic(controlTopic1); + operator.setConfigProperties(getKafkaProperties()); + + operator.setup(context); + + //assume replay start with 2nd window, but different order + int i = TUPLE_NUM_IN_ONE_WINDOW; + + windowId = 1; + operator.beginWindow(windowId++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) { + operator.processTuple(i); + } + i = TUPLE_NUM_IN_ONE_WINDOW + 1; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) { + operator.processTuple(i); + } + waitMills(500); + operator.endWindow(); + + //3rd window, in different order + operator.beginWindow(windowId++); + i = TUPLE_NUM_IN_ONE_WINDOW * 2; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) { + operator.processTuple(i); + if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) { + expectedTuple[tupleIndex++] = i; + } + } + + i = TUPLE_NUM_IN_ONE_WINDOW * 2 + 1; + for (; i < 
TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) { + operator.processTuple(i); + if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) { + expectedTuple[tupleIndex++] = i; + } + } + } + + int[] actualTuples = readTuplesFromKafka(topic1); + Assert.assertArrayEquals(expectedTuple, actualTuples); + } + + protected TupleUniqueExactlyOnceKafkaOutputTestOperator createOperator(String topic, String controlTopic, int id) + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + operator.setTopic(topic); + operator.setControlTopic(controlTopic); + + operator.setConfigProperties(getKafkaProperties()); + OperatorDeployInfo context = new OperatorDeployInfo(); + context.id = id; + operator.setup(context); + + return operator; + } + + /** + * This test case covers tuples going to a different operator partition on + * recovery. + */ + @Test + public void testDifferentPartition() + { + //hasMultiPartition = true; + + int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)]; + int tupleIndex = 0; + long windowId1 = 0; + long windowId2 = 0; + + //create required topics + createTopic(topic2); + createTopic(controlTopic2); + + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2); + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator1, operator2 }; + + //send as round robin + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + + //last window, the crash window + operator1.beginWindow(windowId1++); + 
operator2.beginWindow(windowId2++); + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + + //crashed now. + } + + //let kafka message send to server + waitMills(1000); + int lastTuple = tupleIndex - 1; + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2); + //tuple go to different partition + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator2, operator1 }; + + //assume replay start with 2nd window, but different order + int i = TUPLE_NUM_IN_ONE_WINDOW * 2; + + windowId1 = 1; + windowId2 = 1; + + //window id: 1, 2 + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; ++i) { + operators[i % 2].processTuple(i); + if (i > lastTuple) { + expectedTuples[tupleIndex++] = i; + } + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + } + + int[] actualTuples = readTuplesFromKafka(topic2); + Arrays.sort(actualTuples); + Arrays.sort(expectedTuples); + + assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples); + } + + /** + * This test case tests that only one operator partition crashes, while the other + * operator partition keeps writing data to the same Kafka partition. 
+ */ + @Test + public void testOnePartitionCrash() + { + + int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)]; + int tupleIndex = 0; + long windowId1 = 0; + long windowId2 = 0; + + //create required topics + createTopic(topic3); + createTopic(controlTopic3); + + { + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1); + TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic3, controlTopic3, 2); + TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] { + operator1, operator2 }; + + //send as round robin + int i = 0; + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + operator2.beginWindow(windowId2++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + waitMills(500); + operator1.endWindow(); + operator2.endWindow(); + } + + //operator1 crash, while operator2 alive + operator1.beginWindow(windowId1++); + //operator1 handle even number; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; i += 2) { + operators[i % 2].processTuple(i); + expectedTuples[tupleIndex++] = i; + } + + //operator1 crashed now. 
+ + //operator2 still alive, operator2 handle odd number + operator2.beginWindow(windowId2++); + i = TUPLE_NUM_IN_ONE_WINDOW * 4 + 1; + for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3 * 2; i += 2) { + operator2.processTuple(i); + expectedTuples[tupleIndex++] = i; + } + operator2.endWindow(); + + } + + //let kafka message send to server + waitMills(1000); + + //operator1 recover from second window + int lastTuple = (int)(TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2) - 1; + { + //recovery + TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1); + + //assume replay start with 2nd window, same order + int i = TUPLE_NUM_IN_ONE_WINDOW * 2; + + windowId1 = 1; + + //window id: 1, 2 + for (int windowCount = 0; windowCount < 2; ++windowCount) { + operator1.beginWindow(windowId1++); + + for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; i += 2) { + operator1.processTuple(i); + if (i > lastTuple) { + expectedTuples[tupleIndex++] = i; + } + } + waitMills(500); + operator1.endWindow(); + } + } + + int[] actualTuples = readTuplesFromKafka(topic3); + Arrays.sort(actualTuples); + Arrays.sort(expectedTuples); + + assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples); + } + + /** + * Test that the application which uses TupleUniqueExactlyOnceKafkaOutputTestOperator is launchable in local mode + */ + @Test + public void testLaunchApp() throws Exception + { + Configuration conf = new Configuration(false); + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + TupleGenerateOperator generateOperator = new TupleGenerateOperator(); + dag.addOperator("GenerateOperator", generateOperator); + + TupleUniqueExactlyOnceKafkaOutputTestOperator testOperator = new TupleUniqueExactlyOnceKafkaOutputTestOperator(); + dag.addOperator("TestOperator", testOperator); + + dag.addStream("stream", generateOperator.outputPort, testOperator.inputPort); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG 
dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(5000); + + lc.shutdown(); + } + + public static void assertArrayEqualsWithDetailInfo(int[] expectedTuples, int[] actualTuples) + { + Assert.assertTrue("Length incorrect. expected " + expectedTuples.length + "; actual " + actualTuples.length, + actualTuples.length == expectedTuples.length); + for (int i = 0; i < actualTuples.length; ++i) { + Assert.assertEquals("Not equal. index=" + i + ", expected=" + expectedTuples[i] + ", actual=" + actualTuples[i], + actualTuples[i], expectedTuples[i]); + } + } + + public void waitMills(long millis) + { + try { + Thread.sleep(millis); + } catch (Exception e) { + //ignore + } + } + + public int[] readTuplesFromKafka(String topic) + { + StringDecoder decoder = new StringDecoder(null); + Map partitionToStartOffset = Maps.newHashMap(); + partitionToStartOffset.put(0, 0L); + + this.waitMills(1000); + + Map>> partitionToMessages = Maps.newHashMap(); + KafkaUtil.readMessagesAfterOffsetTo("TestOperator", getBrokerSet(), topic, partitionToStartOffset, + partitionToMessages); + + List> msgList = partitionToMessages.get(0); + int[] values = new int[msgList.size()]; + int index = 0; + for (Pair msg : msgList) { + values[index++] = Integer.valueOf(decoder.fromBytes(msg.second)); + } + return values; + } + + + protected Set getBrokerSet() + { + return Sets.newHashSet((String)getKafkaProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + + public Properties getKafkaProperties() + { + Properties props = new Properties(); + props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + props.put("metadata.broker.list", "localhost:9092"); + //props.setProperty("producer.type", "sync"); + 
props.setProperty("producer.type", "async"); + props.setProperty("queue.buffering.max.ms", "100"); + props.setProperty("queue.buffering.max.messages", "5"); + props.setProperty("batch.num.messages", "5"); + return props; + } + + + public static class TupleGenerateOperator extends BaseOperator implements InputOperator + { + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort<>(); + protected int value = 0; + + @Override + public void emitTuples() + { + if (!outputPort.isConnected()) { + return; + } + + for (int i = 0; i < 100; ++i) { + outputPort.emit(++value); + } + } + } +} diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java new file mode 100644 index 0000000000..c27803dea8 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.common.util.Pair; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +public class KafkaUtilTester extends KafkaOperatorTestBase +{ + public static final String topic = "UtilTestTopic"; + public static final String clientNamePrefix = "UtilTestClient"; + public static final int DATA_SIZE = 50; + + protected Producer producer; + private transient Set brokerSet; + + public void beforeTest() + { + //Got exception when using multiple partition. + //java.io.FileNotFoundException: target/kafka-server-data/1/1/replication-offset-checkpoint (No such file or directory) + //hasMultiPartition = true; + + super.beforeTest(); + createTopic(topic); + + producer = new Producer(createKafkaProducerConfig()); + getBrokerSet(); + + sendData(); + } + + public void sendData() + { + for (int i = 0; i < DATA_SIZE; ++i) { + producer.send(new KeyedMessage(topic, null, "message " + i)); + } + + waitMills(1000); + } + + @Test + public void testReadMessagesAfterOffsetTo() + { + Map partitionToStartOffset = Maps.newHashMap(); + partitionToStartOffset.put(1, 0L); + Map>> partitionToMessages = Maps.newHashMap(); + KafkaUtil.readMessagesAfterOffsetTo(clientNamePrefix, brokerSet, topic, partitionToStartOffset, + partitionToMessages); + final int dataSize = partitionToMessages.entrySet().iterator().next().getValue().size(); + Assert.assertTrue("data size is: " + dataSize, dataSize == DATA_SIZE); + } + + public void waitMills(long millis) + { + try { + Thread.sleep(millis); + } catch (Exception e) { + //ignore + } + } + + protected void createTopic(String topicName) + { + createTopic(0, topicName); + if (hasMultiCluster) { + 
createTopic(1, topicName); + } + } + + protected Properties getConfigProperties() + { + Properties props = new Properties(); + props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); + //props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + //props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder"); + props.put("metadata.broker.list", "localhost:9092"); + //props.setProperty("producer.type", "sync"); + props.setProperty("producer.type", "async"); + props.setProperty("queue.buffering.max.ms", "10"); + props.setProperty("queue.buffering.max.messages", "10"); + props.setProperty("batch.num.messages", "5"); + + return props; + } + + protected ProducerConfig createKafkaProducerConfig() + { + return new ProducerConfig(getConfigProperties()); + } + + protected Set getBrokerSet() + { + if (brokerSet == null) { + brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)); + } + return brokerSet; + } +} diff --git a/contrib/src/test/resources/log4j.properties b/contrib/src/test/resources/log4j.properties index 2fcbe3812a..cfc50cfa0b 100644 --- a/contrib/src/test/resources/log4j.properties +++ b/contrib/src/test/resources/log4j.properties @@ -39,3 +39,4 @@ log4j.logger.org=info #log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=debug log4j.logger.org.apache.apex=debug +log4j.logger.kafka=info From d20ee902f9b5cfd1209a021daf1d01534f9a5ac8 Mon Sep 17 00:00:00 2001 From: brightchen Date: Mon, 19 Sep 2016 11:17:24 -0700 Subject: [PATCH 2/2] APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator --- ...eUniqueExactlyOnceKafkaOutputOperator.java | 21 +++++++++++---- .../contrib/kafka/KafkaMetadataUtil.java | 2 +- .../datatorrent/contrib/kafka/KafkaUtil.java | 27 ++++++++++++++----- 3 files changed, 38 insertions(+), 12 deletions(-) diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java index 31d371b682..08f5b4b62c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java @@ -75,7 +75,7 @@ public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator extends AbstractKafkaOutputOperator { public static final String DEFAULT_CONTROL_TOPIC = "ControlTopic"; - protected transient int partitionNum = 1; + protected static final transient int partitionNum = 1; /** * allow client set the partitioner as partitioner may need some attributes @@ -87,14 +87,14 @@ public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator protected String controlTopic = DEFAULT_CONTROL_TOPIC; //The control info includes the time, use this time to track the head of control info we care. - protected int controlInfoTrackBackTime = 120000; + private int controlInfoTrackBackTime = 120000; /** * max number of offset need to check */ protected int maxNumOffsetsOfControl = 1000; - protected String controlProducerProperties; + private String controlProducerProperties; protected Set brokerSet; protected transient long currentWindowId; @@ -126,10 +126,10 @@ public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator protected transient Map>> partitionToLastMsgs = Maps.newHashMap(); /** - * The messages are assume to written to the kafka partition decided by + * The messages which written to the Kafka partition decided by * tupleToKeyValue(T tuple) and partitioner. But it also depended on the * system. for example, it could be only one partition when create topic. - * Don't distinguish kafka partitions if partition is not reliable. 
+ * Don't distinguish Kafka partitions if partition is not reliable. */ protected transient Set> totalLastMsgs = Sets.newHashSet(); @@ -609,4 +609,15 @@ public static RecoveryControlInfo fromString(String str) } } + public int getControlInfoTrackBackTime() + { + return controlInfoTrackBackTime; + } + + public void setControlInfoTrackBackTime(int controlInfoTrackBackTime) + { + this.controlInfoTrackBackTime = controlInfoTrackBackTime; + } + + } diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index 6dfdf14ed0..fbe3daa3df 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -111,7 +111,7 @@ public static Set getBrokers(Set zkHost){ ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); Set brokerHosts = new HashSet(); for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { - brokerHosts.add(b.connectionString()); + brokerHosts.add(b.getConnectionString()); } zkclient.close(); return brokerHosts; diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java index d49e462faf..95388e5560 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java @@ -80,7 +80,7 @@ public static void readMessagesAfterOffsetTo(String clientNamePrefix, Set createSimpleConsumers(String clientNa Map consumers = Maps.newHashMap(); for (PartitionMetadata pm : tm.partitionsMetadata()) { String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); - consumers.put(pm.partitionId(), createSimpleConsumer(clientName, tm.topic(), pm)); + 
consumers.put(pm.partitionId(), getOrCreateSimpleConsumer(clientName, tm.topic(), pm)); } return consumers; } @@ -250,7 +250,7 @@ public static void readLastMessages(String clientNamePrefix, Set brokerS SimpleConsumer consumer = null; try { String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); - consumer = createSimpleConsumer(clientName, tm.topic(), pm); + consumer = getOrCreateSimpleConsumer(clientName, tm.topic(), pm); long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName); @@ -295,7 +295,7 @@ public static Pair readLastMessage(String clientNamePrefix, Set< } String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId()); - consumer = createSimpleConsumer(clientName, topic, pm); + consumer = getOrCreateSimpleConsumer(clientName, topic, pm); long readOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId, kafka.api.OffsetRequest.LatestTime(), clientName); @@ -343,9 +343,24 @@ public static Pair kafkaMessageToPair(Message m) return new Pair(keyBytes, valueBytes); } - public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm) + private static Map cachedConsumer = Maps.newHashMap(); + + public static SimpleConsumer getOrCreateSimpleConsumer(String clientName, String topic, PartitionMetadata pm) + { + String key = getKey(clientName, topic, pm); + SimpleConsumer consumer = cachedConsumer.get(key); + if (consumer == null) { + consumer = createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT, DEFAULT_BUFFER_SIZE); + cachedConsumer.put(key, consumer); + } + return consumer; + } + + private static final String SEPERATOR = "|"; + + private static String getKey(String clientName, String topic, PartitionMetadata pm) { - return createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT, DEFAULT_BUFFER_SIZE); + return clientName + SEPERATOR + 
topic + SEPERATOR + pm.leader().host() + SEPERATOR + pm.leader().port(); } public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm, int timeout,