Skip to content

Commit

Permalink
STORM-3529: Catch and Log Kafka RetriableException
Browse files Browse the repository at this point in the history
  • Loading branch information
OliverMD committed Nov 10, 2019
1 parent 34ff637 commit 2f0de3f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,8 +77,17 @@ public Object getValueAndReset() {
Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
Set<TopicPartition> topicPartitions = offsetManagers.keySet();

Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
Map<TopicPartition, Long> beginningOffsets;
Map<TopicPartition, Long> endOffsets;

try {
beginningOffsets = consumer.beginningOffsets(topicPartitions);
endOffsets = consumer.endOffsets(topicPartitions);
} catch (RetriableException e) {
LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
return null;
}

//map to hold partition level and topic level metrics
Map<String, Long> result = new HashMap<>();

Expand Down
Expand Up @@ -21,26 +21,25 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyListOf;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
Expand Down Expand Up @@ -428,4 +427,33 @@ public void testOffsetMetrics() throws Exception {
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
}

@Test
public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
final int messageCount = 10;
prepareSpout(messageCount);

Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue(), 0);
// the offset of the last available message + 1.
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue(), 0);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 0);
//totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 10);

//Emit half messages and check that they are emitted. Ack the messages too
for (int i = 0; i < messageCount; i++) {
nextTuple_verifyEmitted_ack_resetCollector(i);
}

commitAndVerifyAllMessagesCommitted(messageCount);

// Ensure a timeout exception results in the return value being null
when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);

offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
assertNull(offsetMetric);
}
}

0 comments on commit 2f0de3f

Please sign in to comment.