From 786083b27dc9750075dbddc335bd64f3f81e7c0d Mon Sep 17 00:00:00 2001 From: Tandy <982963934@qq.com> Date: Thu, 1 Dec 2016 11:43:07 +0800 Subject: [PATCH] fix NullPointException with acked.get(rtp) acked.get(rtp) maybe null when antocommit is true. --- .../org/apache/storm/kafka/spout/KafkaSpout.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 439492b26fb..1dff51152d8 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -262,11 +262,13 @@ private void doSeekRetriableTopicPartitions() { final Set retriableTopicPartitions = retryService.retriableTopicPartitions(); for (TopicPartition rtp : retriableTopicPartitions) { - final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); - if (offsetAndMeta != null) { - kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle - } else { - kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset + if (acked.get(rtp)!=null) { + final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); + if (offsetAndMeta != null) { + kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle + } else { + kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset + } } } }