KAFKA-15951: MissingSourceTopicException should include topic names #15573
Thanks for the PR, @chickenchickenlove!
Here are my comments!
    this.missingTopics = new HashSet<>();
}

public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
    super(message);
    this.missingTopics = missingTopics;
}

public Set<String> getMissingTopics() {
    return this.missingTopics;
This class is part of the public API. That means we cannot change it without a Kafka Improvement Proposal (KIP) [1].
I am not sure that adding the missing source topics to the exception makes much sense, because the exception is caught in the StreamsPartitionAssignor [2] and transformed into a group assignment error (INCOMPLETE_SOURCE_TOPIC_METADATA). The missing source topics are not propagated to the point where the actual MissingSourceTopicException is thrown to the users, which is in the StreamsRebalanceListener [3]. To achieve this, a protocol change would be needed, which I think is not worth it.
What we could do instead is log an error message with the missing source topics in RepartitionTopics or StreamsPartitionAssignor. I slightly prefer the latter.
[1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
[2] Line 449 in d8dd068: } catch (final MissingSourceTopicException e) {
[3] Line 58 in 4fe4cdc: throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
Hi, @cadonna. Thanks for your comment and your time. 🙇‍♂️
It is very helpful for me, and I am sorry to have taken up so much of your time.
After reading the KIP document, I now understand that changes to the public interface must be introduced carefully!
I have pushed a new commit to apply your comment.
When you have some free time, could you take a look? It would be very helpful for me. 🙇‍♂️
Don't worry! We are also here to guide new contributors.
@cadonna, thank you for your comment. It really encourages me 🙇‍♂️
        missingSourceTopics, subtopologyId, topologyName),
    missingSourceTopics),
Would it be possible to return a set of missing source topics from RepartitionTopics?
@cadonna, I have a question. ✋
May I use the package-private access modifier for this?
I think a package-private member is not part of the public interface, right? (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
If so, I think the solution will be quite simple.
FYI, I wrote some skeleton code below!
Does it make sense to you as well?
public class RepartitionTopics {
    ...
    // Add new field (private)
    private final Set<String> missingTopics = new HashSet<>();
    ...
    public Set<String> topologiesWithMissingInputTopics() { ... }
    public Queue<StreamsException> missingSourceTopicExceptions() { ... }
    // Add new method (package-private)
    Set<String> getMissingTopics() {
        return this.missingTopics;
    }
    ...
}
The public interface is defined as everything that shows up in the javadocs. Classes in a package whose name contains internals do not show up in the javadocs. Class RepartitionTopics is in package org.apache.kafka.streams.processor.internals. Thus, RepartitionTopics is not part of the public interface.
Regarding field missingTopics, there is already missingInputTopicsBySubtopology that includes all missing topics.
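To illustrate the point about reusing missingInputTopicsBySubtopology, here is a minimal sketch of how a flat set of missing topic names could be derived from such a map. It assumes the field maps each subtopology to a set of topic names; the class name MissingTopicsSketch, the method name missingSourceTopics, and the use of String keys instead of the real subtopology type are hypothetical simplifications, not the code in RepartitionTopics.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MissingTopicsSketch {

    // Hypothetical helper: flattens a per-subtopology map of missing input
    // topics into one sorted set. String keys stand in for the real
    // subtopology type, which is an assumption made to keep this runnable.
    public static Set<String> missingSourceTopics(final Map<String, Set<String>> missingInputTopicsBySubtopology) {
        final Set<String> allMissingTopics = new TreeSet<>();
        for (final Set<String> topics : missingInputTopicsBySubtopology.values()) {
            allMissingTopics.addAll(topics);
        }
        return allMissingTopics;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> bySubtopology = new HashMap<>();
        bySubtopology.put("0", new HashSet<>(Arrays.asList("orders", "payments")));
        bySubtopology.put("1", new HashSet<>(Arrays.asList("orders")));

        // Duplicates across subtopologies collapse into a single entry.
        System.out.println(missingSourceTopics(bySubtopology)); // prints [orders, payments]
    }
}
```

With a helper like this, no separate missingTopics field would be needed, since the set can be computed on demand from state the class already tracks.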
I really appreciate your kind explanation. 👍
I've understood it clearly, and thanks to you, I've established the correct direction for revision.
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
        final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
Here you could then use the method on RepartitionTopics I proposed in my other comment.
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
        final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
        if (isMissingInputTopics) {
            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                throw new MissingSourceTopicException("Missing source topics.");
In addition to throwing the exception you would also log the error.
What Bruno meant was that we need to add log.error(...) to log the error message before throwing the exception. Seems you did not add this yet?
Adding to a previous comment from Bruno, I am wondering if we should also change the error log in StreamsRebalanceListener to point out that the missing source topic names might be logged on a different instance?
The StreamsRebalanceListener is executed on every instance, but StreamsPartitionAssignor only on the group leader.
@mjsax thanks for your comments!
"What Bruno meant was that we need to add log.error(...) to log the error message before throwing the exception. Seems you did not add this yet?"
No, I created new commits. Please refer to the images below 🙇‍♂️
In this image, Bruno said "In addition to throwing the exception you would also log the error."
I then made these changes to reflect that comment while maintaining readability.
You can see this commit.
"Adding to a previous comment from Bruno, I am wondering if we should also change the error log in StreamsRebalanceListener to point out that the missing source topic names might be logged on a different instance? The StreamsRebalanceListener is executed on every instance, but StreamsPartitionAssignor only on the group leader."
@mjsax, IMHO, with this PR alone it is currently not possible to refer to the missing topics in StreamsRebalanceListener.
To get the missing topics in StreamsRebalanceListener as well, I wrote a suggestion in this PR. Please refer to "Idea for improving more" in the PR description. I think it can be done without modifying the public API.
Also, I will create some images for detail. Wait a sec, please.
- As you know, ThreadLocal provides storage specific to each thread.
- StreamThread, StreamsRebalanceListener, and StreamsPartitionAssignor are all in the internals package. That means they are not public API.
- StreamsRebalanceListener already has a reference to StreamThread.
kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Line 622 in 4ccbf16
this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode);
From this, I believe each StreamThread has its own StreamsRebalanceListener instance. Thus, I believe ThreadLocal is a suitable workaround in this case.
So all we need to do is the following three steps:
- Add a ThreadLocal field to StreamThread.
- Add a method that puts the missing source topics into the ThreadLocal before throwing MissingSourceTopicException.
- Add some code to StreamsRebalanceListener to get the missing source topics from the ThreadLocal.
Does it make sense to you?
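The three steps above can be sketched roughly as follows. This is only an illustration of the ThreadLocal idea under discussion, not Kafka code: the class ThreadLocalSketch and its methods recordMissingTopics, missingTopics, and clear are hypothetical names. The second thread in main shows the general ThreadLocal property that a value set on one thread is not visible on another, so this can only help when the writer and the reader run on the same thread.

```java
import java.util.Set;
import java.util.TreeSet;

public class ThreadLocalSketch {

    // Hypothetical per-thread holder for missing source topic names.
    // Each thread starts with its own empty set.
    private static final ThreadLocal<Set<String>> MISSING_TOPICS = ThreadLocal.withInitial(TreeSet::new);

    // Step 2 of the proposal: record the topics before throwing the exception.
    public static void recordMissingTopics(final Set<String> topics) {
        MISSING_TOPICS.set(new TreeSet<>(topics));
    }

    // Step 3 of the proposal: read the topics back on the same thread.
    public static Set<String> missingTopics() {
        return MISSING_TOPICS.get();
    }

    // Remove the value so it does not leak into later work on this thread.
    public static void clear() {
        MISSING_TOPICS.remove();
    }

    public static void main(final String[] args) throws InterruptedException {
        recordMissingTopics(Set.of("orders"));
        System.out.println(missingTopics()); // prints [orders]

        // A different thread sees only its own initial (empty) value.
        final Thread other = new Thread(() -> System.out.println(missingTopics())); // prints []
        other.start();
        other.join();

        clear();
    }
}
```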
Seems there is some misunderstanding. Sorry for causing confusion.
- This current code is not producing a log message:
final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
throw new MissingSourceTopicException(errorMsg);
The code should be something like:
final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
- I did not propose to include the topic names... As Bruno already pointed out, it would require a protocol change, which seems to be overkill. In StreamsRebalanceListener, we currently log "Received error code 1". I would propose to actually change AssignorError to contain a proper String. "Error code 1" does not mean anything to users. Additionally, it might be good to just change the error message of the log line and the exception to say something like: "To check which topics are missing, please look into the logs of the consumer group leader. Only the leader knows and logs the names of the missing topics."
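As a rough sketch of what "change AssignorError to contain a proper String" could look like, the enum below pairs each numeric code with a human-readable description. Everything here is illustrative: SketchError is a hypothetical stand-in, only the constant INCOMPLETE_SOURCE_TOPIC_METADATA with code 1 comes from this thread, and the NONE entry and both description texts are assumptions rather than the wording that was merged.

```java
public class AssignorErrorSketch {

    // Hypothetical, simplified stand-in for an assignor error enum that
    // carries a description alongside the numeric code.
    public enum SketchError {
        // Assumed entry for "no error"; not taken from the thread.
        NONE(0, "No error."),
        // Code 1 is taken from the thread; the description text is illustrative.
        INCOMPLETE_SOURCE_TOPIC_METADATA(1,
            "Missing source topics. To check which topics are missing, "
                + "look into the logs of the consumer group leader.");

        private final int code;
        private final String description;

        SketchError(final int code, final String description) {
            this.code = code;
            this.description = description;
        }

        public int code() {
            return code;
        }

        public String description() {
            return description;
        }

        // Maps a numeric code received from the group leader back to the enum.
        public static SketchError fromCode(final int code) {
            for (final SketchError error : values()) {
                if (error.code == code) {
                    return error;
                }
            }
            throw new IllegalArgumentException("Unknown error code: " + code);
        }
    }

    public static void main(final String[] args) {
        final SketchError error = SketchError.fromCode(1);
        // The log line now explains the code instead of printing a bare number.
        System.out.println("Received error code " + error.code() + ": " + error.description());
    }
}
```

Keeping the numeric code on the wire and resolving the description locally means no protocol change is needed for a more readable log line.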
@mjsax Thanks for your kind explanation. It was very helpful for me 🙇‍♂️
"1. This current code is not producing a log message:"
I misunderstood, thanks for your time!
I added log.error() in this commit.
"I would propose to actually change AssignorError to contain a proper String."
I agree with you.
Here's how I've implemented it based on your suggestion. (commit)
I don't know the exact meaning of all the AssignorError codes, so I referred to the existing error log messages related to the AssignorError codes to write the messages.
What do you think? Am I heading in the right direction?
When you have some free time, please take a look 🙇‍♂️.
b1650ea to 3f16d28
Gentle ping, @cadonna!
Thanks for the PR. Made a pass.
Aye, I hope so. 👍
Gentle ping, @mjsax.
@chickenchickenlove, sorry for the delay in reviewing this further.
Overall LGTM. A few suggestions for the error code descriptions.
...ams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
…nals/assignment/AssignorError.java Co-authored-by: Matthias J. Sax <mjsax@apache.org>
@mjsax thanks for your suggestions. They are very suitable 👍
There is a checkstyle error:
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
…nals/RepartitionTopics.java
I took the liberty to commit a fix to retrigger the build.
Hi @mjsax!
Thanks for the PR @chickenchickenlove! Merged to
Thanks for your guidance.
…pache#15573) MissingSourceTopicException should contain the name of the missing topic. There is one corner case for which we don't have the topic name at hand, but we can log the topic name somewhere else. Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is a minor change!
- MissingSourceTopicException has a field missingTopics to store missing topics.
- StreamsPartitionAssignor throws a MissingSourceTopicException, and this depends on the result of repartitionTopics.missingSourceTopicExceptions().isEmpty(). Thus, missingSourceTopicExceptions() must always contain an iterable MissingSourceTopicException, allowing for the aggregation and throwing of MissingSourceTopics.
- StreamsRebalanceListener can throw MissingSourceTopicException as well. However, StreamsRebalanceListener cannot get the missing topic list because StreamsRebalanceListener depends on the error code AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code().
- StreamsRebalanceListener should include the missing topic list when it throws MissingSourceTopicException as well; we can consider ThreadLocal to reach that target.
- ThreadLocal to StreamThread as member field.
- ThreadLocal.
- StreamsRebalanceListener can get missing topics through ThreadLocal.
Committer Checklist (excluded from commit message)