New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-3529: Catch and Log Kafka RetriableException #3164
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't mind adding a test that we return null on RetriableException. There's a test in KafkaSpoutSingleTopicTest (testOffsetMetrics) that you could maybe take inspiration from.
beginningOffsets = consumer.beginningOffsets(topicPartitions); | ||
endOffsets = consumer.endOffsets(topicPartitions); | ||
} catch (RetriableException e) { | ||
LOG.error("Failed to get offsets from Kafka", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider lowering to WARN level, and write that we will retry on next metrics tick. I think this is only a concern if it is recurring, so error level seems a little strong.
2f0de3f
to
16d7042
Compare
...storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
Outdated
Show resolved
Hide resolved
Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset(); | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue(), 0); | ||
// the offset of the last available message + 1. | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue(), 10); | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue(), 10); | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue(), 0); | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 0); | ||
//totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset | ||
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 10); | ||
|
||
//Emit all messages and check that they are emitted. Ack the messages too | ||
for (int i = 0; i < messageCount; i++) { | ||
nextTuple_verifyEmitted_ack_resetCollector(i); | ||
} | ||
|
||
commitAndVerifyAllMessagesCommitted(messageCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this bit necessary?
+1 assuming the test passes. |
No description provided.