From 0cacb109050e15ac7fd8831eb691109c0ed3bd88 Mon Sep 17 00:00:00 2001 From: Aniket Alhat Date: Mon, 9 Jul 2018 12:11:28 +0900 Subject: [PATCH] Added support to handle absent topics --- .../storm/kafka/spout/NamedTopicFilter.java | 12 ++++++++++-- .../storm/kafka/spout/NamedTopicFilterTest.java | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java index 3d1ec1efc75..e6b79dcd2b6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java @@ -26,12 +26,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Filter that returns all partitions for the specified topics. */ public class NamedTopicFilter implements TopicFilter { + private static final Logger LOG = LoggerFactory.getLogger(NamedTopicFilter.class); private final Set topics; /** @@ -54,8 +57,13 @@ public NamedTopicFilter(String... topics) { public List getFilteredTopicPartitions(KafkaConsumer consumer) { List allPartitions = new ArrayList<>(); for (String topic : topics) { - for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + List partitionInfoList = consumer.partitionsFor(topic); + if(partitionInfoList != null) { + for (PartitionInfo partitionInfo : partitionInfoList) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } else { + LOG.warn("Topic {} not found, skipping addition of the topic", topic); } } return allPartitions; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java index fe3325cf2f5..c196a328c15 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -18,6 +18,7 @@ import org.apache.storm.kafka.spout.NamedTopicFilter; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; @@ -63,6 +64,20 @@ public void testFilter() { } + @Test + public void testFilterOnAbsentTopic() { + String presentTopic = "present"; + String absentTopic = "absent"; + + NamedTopicFilter filter = new NamedTopicFilter(presentTopic, absentTopic); + when(consumerMock.partitionsFor(presentTopic)).thenReturn(Collections.singletonList(createPartitionInfo(presentTopic, 2))); + when(consumerMock.partitionsFor(absentTopic)).thenReturn(null); + + List presentPartitions = filter.getFilteredTopicPartitions(consumerMock); + assertThat("Expected filter to pass only topics which are present", presentPartitions, + contains(new TopicPartition(presentTopic, 2))); + } + private PartitionInfo createPartitionInfo(String topic, int partition) { return new PartitionInfo(topic, partition, null, null, null); }