From 3ac302a474b1a3ce3b2a2130827c17f1c70f092d Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 23 Jan 2017 17:34:37 +0800 Subject: [PATCH] Change emit order && change manual partition order --- .../org/apache/storm/kafka/spout/KafkaSpout.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 207ba23fa29..c42ec462ebc 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -336,19 +336,18 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { collector.emit(tuple); } } else { - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId); - } else { - collector.emit(tuple, msgId); - } - emitted.add(msgId); - if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule. retryService.remove(msgId); } else { //New tuple, hence increment the uncommitted offset counter numUncommittedOffsets++; } + + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId); + } else { + collector.emit(tuple, msgId); + } } LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); return true;