New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: Move processor response queue into Processor #4542
MINOR: Move processor response queue into Processor #4542
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Thanks for the PR. LGTM, left one minor comment.
wakeup() | ||
} | ||
|
||
private def receiveResponse(): RequestChannel.Response = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this method name could be changed to use dequeue
or poll
instead of receive
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose dequeueResponse
since we also now have enqueueResponse
.
@hachikuji Thanks for the update, LGTM. |
@rajinisivaram Thanks for the review. I'll cherry-pick for 1.1 as well since it's harmless and then you'll only need to fix conflicts once for your dynamic broker patch. |
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
private[kafka] val metricTags = mutable.LinkedHashMap( | ||
"listener" -> listenerName.value, | ||
"networkProcessor" -> id.toString | ||
).asJava | ||
|
||
newGauge("ResponseQueueSize", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this change the full metric name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you are right. I forgot about the type attribute. @rajinisivaram Maybe you can fix it in #4539?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji I had forgotten too. Yes, I will update in #4539
Small refactor which moves the processor response queue into the Processor object itself. This simplifies the logic for dequeuing a response for sending and also eliminates the response listeners collection which was only used to wakeup the Processor after a new response had been enqueued.
Committer Checklist (excluded from commit message)