New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6318: StreamsResetter should return non-zero return code on error #4305
Conversation
@@ -316,6 +317,7 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu | |||
for (final String topic : notFoundInputTopics) { | |||
System.out.println("Topic: " + topic); | |||
} | |||
topicNotFound = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You reuse 1 for EXIT_CODE_ERROR.
Assigning EXIT_CODE_ERROR directly would be clearer.
Thanks for the PR! We should add a test for this to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
We should add a test for this to ResetIntegrationTest
.
@@ -248,7 +249,7 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu | |||
|
|||
if (inputTopics.size() == 0 && intermediateTopics.size() == 0) { | |||
System.out.println("No input or intermediate topics specified. Skipping seek."); | |||
return; | |||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use EXIT_CODE_SUCCESS
final List<String> inputTopics = options.valuesOf(inputTopicsOption); | ||
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption); | ||
int topicNotFound = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
init with EXIT_CODE_SUCCESS
System.err.println("ERROR: Resetting offsets failed."); | ||
throw e; | ||
} | ||
System.out.println("Done."); | ||
return topicNotFound; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove empty line.
@@ -326,10 +328,13 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu | |||
} | |||
|
|||
} catch (final Exception e) { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove empty line
@mjsax I don't seem to see the test component in my fork. Looks like i don't have the latest code. I will send out a seperate PR for the test. |
Please add the test here and include in this PR (otherwise, we might drop adding a test on the floor): https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. The added test is a good test, but does not test what it is supposed to test...
I would keep and rename the test to shouldNotAllowToResetWhileStreamsIsRunning
and add a two new tests for non-existing topics (one test for input topic, one test for intermediate topic). I think, for this two new test, we don't even need to create/start/stop a KafakStreams
application at all.
Last comment. I don't think we need to add this new test to AbstractResetIntegrationTest
but should put the code directly int ResetIntegerationTest
final List<String> parameterList = new ArrayList<>( | ||
Arrays.asList("--application-id", APP_ID + testNo, | ||
"--bootstrap-servers", bootstrapServers, | ||
"--input-topics", INPUT_TOPIC)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to set a non-existing topic name here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure will change it.
KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); | ||
streams.start(); | ||
|
||
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this fails, because KS is still running. This is a valid test, but does test if StreamsResetter
fails for non-existing topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me change the input-topics in the parameterlist. Then i believe this should fail for sure.
Will add more tests without kafka streams. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Call for second review @bbejeck @dguy @guozhangwang
@@ -81,4 +92,44 @@ public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() | |||
public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { | |||
super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(); | |||
} | |||
|
|||
@Test | |||
public void testInputTopicAbsentWithoutIntermediateUserTopic() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a nit: I understand what you are doing here, but IMHO there should be two separate tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test just needs a rename: it calls shouldNotAllowToResetWhileStreamsIsRunning()
and should have the same name. There are two separate test below: one for invalid input topic and one for invalid intermediate topic. (or did you mean something else, @bbejeck )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to apologize. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ssanthalingam thanks for updating, no apologies necessary!
one minor comment, otherwise LGTM. |
LGTM. Merged to trunk. |
If users specify a non-existing topic as input parameter, StreamsResetter only prints out an error message that the topic was not found, but return code is still zero. We should return a non-zero return code for this case.