KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug #15186
This change restores the previous behavior for the Fetcher to keep its internal sessionHandlers cache even after close() is invoked.

During the close() of a Consumer, the Fetcher attempts to send out FetchRequests to signal to the relevant brokers that the consumer is closing. This allows the broker to clean up relevant resources on its end. The Fetcher was recently changed (3.5) so that it cleared its sessionHandlers cache immediately after it finished creating the final FetchRequests. The reasoning was that since the fetcher is being closed, those session handlers were no longer needed, and thus the cache could safely be cleared.

However, it is evidently possible that the fetcher's response handler can still be invoked if a response for an in-flight fetch is received shortly after that cache was cleared. (There doesn't appear to be anything in the Fetcher that blocks response handling after it is closed.) In this case, the Fetcher's response handler aborts with the error log once it cannot find the FetchSessionHandler in the sessionHandlers cache.
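For illustration, the failure mode described above can be modeled in isolation. This is a simplified sketch, not the actual Kafka client code: `SessionCacheSketch`, `prepareClose()`, and `handleResponse()` are hypothetical names, and a plain `HashMap<Integer, String>` stands in for the `sessionHandlers` cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the close-time lookup failure; not the real Kafka Fetcher.
class SessionCacheSketch {
    // Stand-in for the sessionHandlers cache: node id -> handler (a String here).
    private final Map<Integer, String> sessionHandlers = new HashMap<>();
    private final boolean clearOnClose;

    SessionCacheSketch(boolean clearOnClose) {
        this.clearOnClose = clearOnClose;
        sessionHandlers.put(1, "handler-1");
        sessionHandlers.put(2, "handler-2");
    }

    // Models building the final close requests; returns the node ids to contact.
    List<Integer> prepareClose() {
        List<Integer> targets = new ArrayList<>(sessionHandlers.keySet());
        if (clearOnClose) {
            sessionHandlers.clear(); // models the behavior this PR removes
        }
        return targets;
    }

    // Models a response callback that runs after prepareClose() has returned.
    boolean handleResponse(int nodeId) {
        String handler = sessionHandlers.get(nodeId);
        if (handler == null) {
            System.out.println("Unable to find FetchSessionHandler for node " + nodeId);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        SessionCacheSketch cleared = new SessionCacheSketch(true);
        for (int node : cleared.prepareClose()) {
            cleared.handleResponse(node); // lookup fails: the cache is already empty
        }
        SessionCacheSketch kept = new SessionCacheSketch(false);
        for (int node : kept.prepareClose()) {
            kept.handleResponse(node); // lookup succeeds: the cache was kept
        }
    }
}
```

With `clearOnClose` set to `true`, every late response misses its handler; with `false`, the lookup still succeeds after close has been prepared.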
Test failures from most recent build are known flaky tests.
sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
// set the session handler to notify close. This will set the next metadata request to send close message.
sessionHandler.notifyClose();
sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
Seems like you took out the `sessionHandlers.clear();` in the finally block - is there any reason why we don't need it?
Yes, if we clear the sessions, we potentially end up with inflight response handlers that fail to find their corresponding session handler. This lookup failure results in the error log line reported in the Jira.
There are some potential alternatives to solve this problem, but the path I took was to simply revert to the previous behavior which did not clear the cache.
Thanks, Kirk, for the explanation. It seems like there are cases where we want to clear the cache. One I can think of is when there's a topology change, but this is probably an unnoticeable optimization. I guess the size of the handler lookup never grows so large that it becomes a problem.
I think that's worth exploring, because I don't see that we ever remove any of the `FetchSessionHandler` entries once they're created 😬
Don't we remove the map and that eventually leads to their GC, or does something else hold a reference?
We don't remove the `sessionHandlers` map explicitly, no. It's garbage collected when the `AbstractFetch`/`Fetcher` object itself is garbage collected.
@philipnee, it doesn't look like there's actually an issue with respect to stale partition data in the `FetchSessionHandler`. It's reset on every call to `prepareFetchRequests()`.

The `FetchSessionHandler` uses an inner `Builder` class to keep track of the partitions the user is requesting. For each distinct fetch cycle a new `Builder` is created. Each partition that is requested is added to the newly-created `Builder` instance. When all the fetchable partitions have been added, the `Builder.build()` method is called which creates the `FetchRequestData`.

So the data kept in the `FetchSessionHandler` is ephemeral. Any partition leadership changes would happen somewhere upstream and get updated in the `SubscriptionState`, well before we get to the fetch request generation.
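As a rough illustration of the builder-per-fetch-cycle pattern described above, here is a stripped-down stand-in. It is not the real `FetchSessionHandler`: `HandlerSketch`, its `add(String)` method, and the use of plain strings for partitions are assumptions made only for this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified stand-in for a handler that creates a fresh builder per fetch cycle.
class HandlerSketch {
    // Inner builder: holds only the partitions added during one cycle.
    class Builder {
        private final Set<String> partitions = new LinkedHashSet<>();

        void add(String partition) {
            partitions.add(partition);
        }

        // Returns a snapshot of what was requested in this cycle.
        Set<String> build() {
            return new LinkedHashSet<>(partitions);
        }
    }

    // Each call starts from an empty builder, so nothing carries over between cycles.
    Builder newBuilder() {
        return new Builder();
    }

    public static void main(String[] args) {
        HandlerSketch handler = new HandlerSketch();

        HandlerSketch.Builder firstCycle = handler.newBuilder();
        firstCycle.add("topic-0");
        firstCycle.add("topic-1");
        System.out.println(firstCycle.build());

        // A later cycle requests fewer partitions; the earlier ones are not retained.
        HandlerSketch.Builder secondCycle = handler.newBuilder();
        secondCycle.add("topic-1");
        System.out.println(secondCycle.build());
    }
}
```

Because each cycle starts from a new builder, the second request in this sketch contains only what was added in that cycle, which is the sense in which the per-cycle data is ephemeral.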
> There are some potential alternatives to solve this problem, but the path I took was to simply revert to the previous behavior which did not clear the cache.

I think that the cache was actually cleared in `Fetcher.close()` before the refactoring. I suppose that we could bring it back too even if it does not bring much in the end.
Are there any network resources held by the
No. The
Thanks!
@ijuma / @stanislavkozlovski are either of you able to review? Thanks!
lgtm
@kirktrue Thanks for the patch. I left a few comments/questions.
fetchable.put(fetchTarget, sessionHandler.newBuilder());
});
} finally {
sessionHandlers.clear();
For reference, `sessionHandlers.clear()` is indeed incorrect here. The issue is that the caller of `prepareCloseFetchSessionRequests` calls `client.poll()` to complete the requests created here. When they do complete, the sessions are not there anymore so the warning is logged.
@@ -376,25 +376,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessi
final Cluster cluster = metadata.fetch();
On a slightly different topic, should `prepareCloseFetchSessionRequests` be synchronized too? We used to have synchronization in the `close` method but it was removed during the refactoring.
I made the `closeInternal()` method `synchronized` as that more closely emulates the synchronization in <= 3.5. It covers `prepareCloseFetchSessionRequests()` too.
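To illustrate why synchronizing the outer method also covers the methods it calls, here is a minimal sketch. `CloseSyncSketch` and its members are hypothetical and only mirror the shape of the call chain discussed here; `Thread.holdsLock(Object)` is the standard JDK check for whether the current thread owns an object's monitor.

```java
// Hypothetical sketch: the monitor acquired by a synchronized method is still
// held inside any method it calls on the same thread.
class CloseSyncSketch {
    private boolean lockHeldDuringPrepare;

    // Not synchronized itself; relies on the caller already holding the monitor.
    private void prepareCloseRequests() {
        lockHeldDuringPrepare = Thread.holdsLock(this);
    }

    // Acquires this object's monitor for the whole close sequence.
    synchronized void closeInternal() {
        prepareCloseRequests();
    }

    boolean wasLockHeldDuringPrepare() {
        return lockHeldDuringPrepare;
    }

    public static void main(String[] args) {
        CloseSyncSketch sketch = new CloseSyncSketch();
        sketch.closeInternal();
        System.out.println(sketch.wasLockHeldDuringPrepare()); // prints "true"
    }
}
```

The nested call needs no `synchronized` modifier of its own as long as it is only reached through the synchronized entry point.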
LGTM. Thanks @kirktrue!
@dajac, the PR description was updated. Let me know if there's anything left before we're ready to merge. Thanks!
…15186) Change `AbstractFetcher`/`Fetcher` to _not_ clear the `sessionHandlers` cache during `prepareCloseFetchSessionRequests()`. During `close()`, `Fetcher` calls `maybeCloseFetchSessions()` which, in turn, calls `prepareCloseFetchSessionRequests()` and then calls `NetworkClient.poll()` to complete the requests. Since `prepareCloseFetchSessionRequests()` (erroneously) clears the `sessionHandlers` cache, when the response is processed, the sessions are missing, and the warning is logged. Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
Merged it to trunk and 3.7. cc @stanislavkozlovski
Thanks @dajac!