diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java index da84979b017..496e1d8127b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.metric.api.IMetric; import org.slf4j.Logger; @@ -76,8 +77,17 @@ public Object getValueAndReset() { Map topicMetricsMap = new HashMap<>(); Set topicPartitions = offsetManagers.keySet(); - Map beginningOffsets = consumer.beginningOffsets(topicPartitions); - Map endOffsets = consumer.endOffsets(topicPartitions); + Map beginningOffsets; + Map endOffsets; + + try { + beginningOffsets = consumer.beginningOffsets(topicPartitions); + endOffsets = consumer.endOffsets(topicPartitions); + } catch (RetriableException e) { + LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e); + return null; + } + //map to hold partition level and topic level metrics Map result = new HashMap<>(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java index 512d274564d..d7f563f3634 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java @@ -21,17 +21,14 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyListOf; import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.HashSet; import java.util.List; @@ -39,8 +36,10 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Time; @@ -428,4 +427,16 @@ public void testOffsetMetrics() throws Exception { assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10); assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0); } + + @Test + public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + // Ensure a timeout exception results in the return value being null + when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class); + + Map offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); + assertNull(offsetMetric); + } }