From a7ac98f4b5595d7d0689a7b32c2d152bebd0c67a Mon Sep 17 00:00:00 2001 From: Andy Seidel Date: Wed, 13 Jun 2018 11:29:07 -0700 Subject: [PATCH] STORM-3102 Remove check for partition offset before every emit. IN kafka >0.10.2 this check became expensive, causing large performance decreases. --- .../java/org/apache/storm/kafka/spout/KafkaSpout.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index e8ecb3e2e73..76c546e9c13 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -461,17 +461,6 @@ private boolean emitOrRetryTuple(ConsumerRecord record) { } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { - final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); - if (isAtLeastOnceProcessing() - && committedOffset != null - && committedOffset.offset() > record.offset() - && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) { - // Ensures that after a topology with this id is started, the consumer fetch - // position never falls behind the committed offset (STORM-2844) - throw new IllegalStateException("Attempting to emit a message that has already been committed." - + " This should never occur when using the at-least-once processing guarantee."); - } - final List tuple = kafkaSpoutConfig.getTranslator().apply(record); if (isEmitTuple(tuple)) { final boolean isScheduled = retryService.isScheduled(msgId);