New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5225: StreamsResetter doesn't allow custom Consumer properties #3970
Conversation
Call for review @guozhangwang @bbejeck @dguy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax, how can we test this?
@@ -178,7 +186,7 @@ private void parseArguments(final String[] args) throws IOException { | |||
} | |||
} | |||
|
|||
private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() { | |||
private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final HashMap consumerConfig) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Map
Added a test with SSL. \cc @ijuma |
throw new IllegalArgumentException("Unknown security protocol: " + listenerConfig.toString()); | ||
} | ||
} | ||
return kafka.config().hostName() + ":" + kafka.boundPort(ListenerName.forSecurityProtocol(securityProtocol)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can pass the listener name instead of the security protocol and remove a bunch of code.
Updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax, one minor comment. However, is there a way we can share some of the test code between ResetIntegrationTest
and ResetIntegrationWithSSLTest
? Seems like there are a lot of similarities? If you think it will distract from the test too much, i'm ok with it. Let me know.
@Override | ||
public void uncaughtException(Thread t, Throwable e) { | ||
handlerReference.close(10, TimeUnit.SECONDS); | ||
e.printStackTrace(System.err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error(e)
? same below
Updated this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax, LGTM
Merged to trunk |
No description provided.