-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4706: Unify StreamsKafkaClient instances #2631
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @sharad-develop
Please wait for additional comments from @enothereska or @dguy before updating the PR (to avoid going forth and back :))
\cc @enothereska @dguy for second review
time, | ||
streamsMetadataState, | ||
cacheSizeBytes); | ||
config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: move builder
one line down to get aligned indention.
} catch (final IOException e) { | ||
log.warn("Could not close StreamKafkaClient.", e); | ||
} | ||
streamsKafkaClient.checkBrokerCompatibility(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaStreams
should close StreamsKafkaClient
in it's own close()
method. Also, InternalTopicManager
should not close StreamKafkaClient
anymore.
@@ -207,7 +207,7 @@ public void configure(Map<String, ?> configs) { | |||
} | |||
|
|||
internalTopicManager = new InternalTopicManager( | |||
new StreamsKafkaClient(this.streamThread.config), | |||
this.streamThread.getStreamsKafkaClient(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove this
(we try to avoid this
when possible)
@@ -1103,6 +1104,22 @@ public String toString(final String indent) { | |||
} | |||
|
|||
/** | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add JavaDocs (if we keep the setter)
} | ||
|
||
/** | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add JavaDocs (if we keep the getter)
time, | ||
streamsMetadataState, | ||
cacheSizeBytes); | ||
threads[i].setStreamsKafkaClient(streamsKafkaClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add this to the constructor instead of a setter.
\cc @enothereska @dguy WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially started with a constructor approach. But the checkstyle was making the build fail as the constructor exceeded 10 arguments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think constructor is a better approach , but not sure if there is a way around checkstyle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you can remove parameter applicationId
because it is contained in the config
anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Should I go ahead with the constructor approach or are we still deliberating on alternative approaches which are getting discussed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we are still discussing...
@@ -203,6 +203,7 @@ private synchronized void setStateWhenNotInPendingShutdown(final State newState) | |||
final StateDirectory stateDirectory; | |||
private String originalReset; | |||
private StreamPartitionAssignor partitionAssignor = null; | |||
private StreamsKafkaClient streamsKafkaClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be final
is we set it via constructor instead of setter.
@sharad-develop Can you please update the PR title to |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There is a bunch of streams test failures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking we can extract an interface from StreamsKafkaClient
- i'm not 100% sure what will be called yet (maybe AdminClient
?). Then we add a method to KafkaClientSupplier
, i.e., AdminClient getAdminClient(final StreamsConfig config)
This would mean that creation of all Kafka related clients is behind the same interface. Which means that it will eventually help us with being able to decouple our integration tests from real brokers.
It also means that we don't need to add any setters or constructor params to the StreamThread
as it already accepts a KafkaClientSupplier
as an argument. If we do only really want one instance of the StreamsKafkaClient
then this can be done easily enough in the DefaultKafkaClientSupplier
.
@mjsax @enothereska @guozhangwang - thoughts?
I like the |
@dguy I like your idea, and I think the name of |
BTW I think we can also piggy-back this with KIP-117 which will introduce an admin client in common package that is can be generally used. Today we have different impls of such client in streams and connect, and that KIP is aimed to condense all these clients. |
@guozhangwang Do you suggest to delay this PR until KIP-117 is done? This might take some time, as the KIP is not even voted yet. |
That's a good point: let's not wait for KIP-117 then, and we can do the merge in a separate JIRA after that is done. |
Sounds good to me. I did create a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-4857 @sharad-develop Can you follow up with this PR and address the comments. Thanks. |
Closing this PR for now. |
KAFKA:4706 - Unify StreamsKafkaClient instances