New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5434: Console consumer hangs if not existing partition is specified #3335
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old consumer is deprecated now, so we'd generally not be making changes to it unless it was a critical bug. You probably want to be focusing on the new consumer at this point.
Also, does it actually hang indefinitely even if you later create the topic partition?
@ewencp actually it happens with the new consumer as well (using the NewShinyConsumer class in the tool). |
@ppatierno Actually, it looks like people have been a bit more active in this part of the code than I thought, so maybe this would be fine. I'm probably the wrong person to review this as I haven't done much with the old consumer code. I'd try to isolate truly separate issues into separate JIRAs. In terms of this patch, I think one question is whether we want this behavior generally or if you need to make it somehow conditional based on the fact that it's the console consumer running. Also, for your scenarios, there could also be cases where auto topic creation is off, but one process is responsible for creating the topic and it races with the consumer process. In that case you actually want to just wait until you can see the topic partition, so always throwing an exception might not be the right solution. In any case, from the client side, you can't really tell whether auto topic creation is enabled so I don't think there's a way to handle that case differently. |
|
||
// check if the requested partition exists for the topic | ||
val availablePartitions = consumer.partitionsFor(topic).asScala | ||
val partitionInfo = availablePartitions.find(_.partition() == partitionId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use exists
instead of find
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, done.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
I think that this PR is not so useful anymore. The old consumer is deprecated so no reasons for adding more. |
Added checking partition exists before assign request