New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16528: Client HB timing fix #15698
Conversation
Hey @cadonna , could you take a look at this small fix when you have a chance? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @lianetm !
Here my feedback.
// Reset timer when sending the request, to make sure that, if waiting for the interval, | ||
// we don't include the response time (which may introduce delay) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this comment?
Additionally, I could not find a verification of this call in unit tests. Since you added a comment it seems to be important enough for a verification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, I don't think it's bringing anything not clear with the func name and action themselves. Removed.
This is covered in the new test I added here. I just included now an assert message along the lines of this comment to make it clearer in the test.
...s/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
Outdated
Show resolved
Hide resolved
// Expire timer to allow sending HB after a failure. After a failure, a next HB may be | ||
// needed with backoff (ex. errors that lead to retries, like coordinator load error), | ||
// or immediately (ex. errors that lead to rejoining, like fencing errors). | ||
heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about adding a method to Timer
that expires the timer without updating it with a time point in the future. Alternatively, I think you could reset the Timer
to 0 with heartbeatTimer.reset(0)
.
Do we need a verification in the unit tests for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, good point. I changed it to reset to 0, that shows the intention of not having an interval to wait for, which is what we want on these failure scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your change to heartbeatTimer.reset(0)
is not totally equivalent to heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());
because in the former the heartbeatTimer
is not expired until heartbeatTimer.update()
is called, whereas in the latter the heartbeat is expired after the call, but I think in this specific case it does not matter. Is my assumption correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, my previous comment is not correct. The heartbeatTimer
is expired when you call heartbeatTimer.reset(0)
.
...s/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
Outdated
Show resolved
Hide resolved
Hey @cadonna, thanks a lot for your feedback! All comments addressed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this fix, @lianetm!
One question: I noticed that the heartbeat timer is reset on errors. In testHeartbeatNotSentIfAnotherOneInFlight
, it mimics a delayed, but successful response. Does it make sense to add another unit test where instead the delayed response has an error? My understanding from this patch is that we could don't have to sleep at all and the next call to poll()
should return the next request.
Apologies if/that I'm confused 😄
Thanks!
logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}", | ||
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: are these logging changes of the ‘I'll just clean this up as long as I'm in here?’ variety, or dow it have some bearing on the correctness of the logs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both. The change for the default log is needed: it was not including the errorMessage, and that makes it hard to know what happened when you get errors like INVALID_REQUEST (I personally got it and lost time investigating, so fixed it). The other log changes are just improvements because I was already there.
// Reset timer to allow sending HB after a failure without waiting for the interval. | ||
// After a failure, a next HB may be needed with backoff (ex. errors that lead to | ||
// retries, like coordinator load error), or immediately (ex. errors that lead to | ||
// rejoining, like fencing errors). | ||
heartbeatTimer.reset(0); | ||
super.onFailedAttempt(currentTimeMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the decision to reset the heartbeat timer depend on what type of error is received?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it doesn't. The timer is only to indicate that an interval should be respected. In cases of failures, we don't want to follow the interval (so we reset timer to 0). Each error will :
- send a next HB based on other conditions (ex. as soon as the coordinator is discovered, when releasing assignment finishes after getting fenced)
- not send a next HB at all (fatal errors)
|
||
result = heartbeatRequestManager.poll(time.milliseconds()); | ||
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + | ||
"previous on in-flight"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit-picky, sorry 😞
"previous on in-flight"); | |
"previous one in-flight"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed (not nit-picky at all, ugly typo, good catch!)
Thanks for the comments @kirktrue, all addressed. Regarding your comment regarding tests here, we have that covered with the existing |
Hey @cadonna, could you take a look when you have chance? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this patch, @lianetm!
I appreciate the educational responses to my naive questions 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @lianetm !
I wondering why no unit test is failing after your changes. That tells me that unit tests are missing that verify that the interval is reset correctly.
I am also a bit concerned that I could not find unit tests that verify that HeartbeatRequestState#canSendRequest()
and HeartbeatRequestState#nextHeartbeatMs()
work consistently. With this I mean that when the former returns true the latter returns 0. Or that when the former returns false the latter returns a time > 0.
Hi @cadonna, thanks for the comments!
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @lianetm !
Here my feedback.
@Override | ||
public boolean canSendRequest(final long currentTimeMs) { | ||
update(currentTimeMs); | ||
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs); | ||
} | ||
|
||
public long nextHeartbeatMs(final long currentTimeMs) { | ||
public long timeToNextHeartbeatMs(final long currentTimeMs) { | ||
if (heartbeatTimer.remainingMs() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if I comment on code outside the PR. Isn't this the same as heartbeatTimer.isExpired()
? If yes, could we please change this to make the the code more readable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They achieve the same here, and totally agree that isExpired
is more readable, fixed. (Sensible "since we're here..." to me too btw)
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); | ||
|
||
result = heartbeatRequestManager.poll(time.milliseconds()); | ||
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you need a verification here that ensures that the heartbeat timer was reset after the poll?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! I missed it. Added.
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); | ||
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); | ||
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); | ||
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you know whether the heartbeat timer was reset in makeHeartbeatRequest()
or in onResponse()
if you verify the reset after you complete the future of the request?
I would verify the timer reset after the poll, progress the time a bit (less then the heartbeat interval), and then verify here that the time to the next heartbeat is the heartbeat interval minus the amount of time I progressed the time after the poll.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This relates to your comment above. You're right that we did not have the check to ensure the reset happened on the sent and not on the response, so I added above the check for the timeToNextHeartbeatMs
that would fail if the timer is not reset on the send, with a specific message for it. That check covers it, but still I also added the steps for advance the timer just a bit, check that no HB is sent and that the time is updated with the difference, as you suggested. All done.
Thanks for the helpful comments @cadonna , all addressed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates and considering my feedback @lianetm !
LGTM!
Fix for resetting HB timer when the request is sent, rather than when a response is received. This ensures a more accurate timing of the HB, so that a member always sends HB on the interval (not in the interval + any delay in receiving the response).
This change, along with the logic already in place for checking in-flights, ensures that if the interval expires but there is a HB in-flight, the next HB is only send after the response for the in-flight is received, without waiting for another full interval. This is btw consistent with the timer reset & inflight behaviour for the auto-commit interval.