Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable #6305

Merged
merged 3 commits into from Mar 7, 2019

Conversation

@nickbp
Copy link
Contributor

nickbp commented Feb 22, 2019

When attempting to get topic list via KafkaAdminClient against a server that isn't resolvable, the worker thread can get killed as follows, leading to a zombie KafkaAdminClient:

ERROR [kafka-admin-client-thread | adminclient-1] 2019-02-18 01:00:45,597 KafkaThread.java:51 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1':
java.lang.IllegalStateException: No entry found for connection 0
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:898)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1113)
    at java.lang.Thread.run(Thread.java:748)

It looks like cause is a bug in state handling between NetworkClient and ClusterConnectionStates:

  • NetworkClient.ready() invokes this.initiateConnect() as seen in the above stacktrace
  • NetworkClient.initiateConnect() invokes ClusterConnectionStates.connecting(), which internally invokes ClientUtils.resolve() to resolve the host when creating an entry for the connection.
  • If this host lookup fails, a UnknownHostException can be thrown back to NetworkClient.initiateConnect() and the connection entry is not created in ClusterConnectionStates. This exception doesn't currently get logged so this is a guess on my part.
  • NetworkClient.initiateConnect() catches the exception and attempts to call ClusterConnectionStates.disconnected(), which throws an IllegalStateException because no entry had yet been created due to the lookup failure.
  • This IllegalStateException ends up killing the worker thread and KafkaAdminClient gets stuck, never returning from listTopics().

This PR includes a unit test which reproduces the original issue (matching stacktrace) and verifies the fix.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
@nickbp nickbp changed the title Fix for KAFKA-7974: Avoid calling disconnect() when not yet connecting Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable Feb 22, 2019
@ijuma

This comment has been minimized.

Copy link
Contributor

ijuma commented Feb 22, 2019

@cmccabe

This comment has been minimized.

Copy link
Contributor

cmccabe commented Feb 22, 2019

Good find, @nickbp . The fix seems a bit incomplete in the sense that there are more exceptions that we could get, besides UnknownHostException. It seems like perhaps this line:

        } else {
            nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
                this.reconnectBackoffInitMs, host, clientDnsLookup));
        }

Could catch the possible exceptions and then create a Disconnected connection, and rethrow, so that we didn't get more issues later on if something else throws here.

Or perhaps there is another way to make this more robust-- that was just the first that came to mind.

Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.
@nickbp

This comment has been minimized.

Copy link
Contributor Author

nickbp commented Feb 24, 2019

Good point. I was able to change the strategy of the fix to instead clean up when the host resolution actually occurs within ClusterConnectionStates.

Previously the resolve was happening automatically with the initial construction of the CONNECTING entry, leading to a missing entry and IllegalStateException when the connection was then marked disconnected. This fix switches to performing no resolution when the entry is first created, instead only performing it on-demand when currentAddress() is called explicitly. This allows NetworkClient to cleanly mark the entry as disconnected when resolution fails.

* @param id the id of the connection
* @param now the current time
* @throws UnknownHostException
* @param now the current time in ms
*/

This comment has been minimized.

Copy link
@cmccabe

cmccabe Feb 26, 2019

Contributor

Can we add JavaDoc for host and clientDnsLookup, while we're changing this?

This comment has been minimized.

Copy link
@nickbp

nickbp Feb 27, 2019

Author Contributor

Done

@cmccabe

This comment has been minimized.

Copy link
Contributor

cmccabe commented Feb 26, 2019

LGTM

@cmccabe

This comment has been minimized.

Copy link
Contributor

cmccabe commented Feb 26, 2019

retest this please

Copy link
Contributor Author

nickbp left a comment

FWIW I locally ran the failing CI tests against the branch and they were successful:

$ gradle :core:test --tests=*ConsumerBounce*
> Configure project :
Building project 'core' with Scala version 2.12.8
Building project 'streams-scala' with Scala version 2.12.8
[...]
> Task :core:test
kafka.api.ConsumerBounceTest > testCloseDuringRebalance PASSED
kafka.api.ConsumerBounceTest > testClose PASSED
kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED
kafka.api.ConsumerBounceTest > testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize PASSED
kafka.api.ConsumerBounceTest > testSubscribeWhenTopicUnavailable PASSED
kafka.api.ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup PASSED
kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures SKIPPED

BUILD SUCCESSFUL in 4m 41s
14 actionable tasks: 9 executed, 5 up-to-date
$ gradle :core:test --tests=*CustomQuotaCallbackTest*
> Configure project :
Building project 'core' with Scala version 2.12.8
Building project 'streams-scala' with Scala version 2.12.8
[...]
> Task :core:test
kafka.api.CustomQuotaCallbackTest > testCustomQuotaCallback PASSED

BUILD SUCCESSFUL in 1m 24s
14 actionable tasks: 8 executed, 6 up-to-date
* @param id the id of the connection
* @param now the current time
* @throws UnknownHostException
* @param now the current time in ms
*/

This comment has been minimized.

Copy link
@nickbp

nickbp Feb 27, 2019

Author Contributor

Done

@nickbp

This comment has been minimized.

Copy link
Contributor Author

nickbp commented Mar 4, 2019

Looks like the build is green now, btw. Anything else needed before a merge?

@ijuma

This comment has been minimized.

Copy link
Contributor

ijuma commented Mar 4, 2019

@cmccabe looks like this is ready to be merged since you approved it and the build is green.

@cmccabe cmccabe merged commit 7f6bf95 into apache:trunk Mar 7, 2019
2 checks passed
2 checks passed
JDK 11 and Scala 2.12 SUCCESS 10592 tests run, 69 skipped, 0 failed.
Details
JDK 8 and Scala 2.11 SUCCESS 10592 tests run, 69 skipped, 0 failed.
Details
@cmccabe

This comment has been minimized.

Copy link
Contributor

cmccabe commented Mar 7, 2019

Thanks, @nickbp !

ijuma added a commit that referenced this pull request Apr 3, 2019
…olvable (#6305)

* Fix for KAFKA-7974: Avoid calling disconnect() when not connecting

* Resolve host only when currentAddress() is called

Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.

* Add Javadoc to ClusterConnectionStates.connecting()
@ijuma

This comment has been minimized.

Copy link
Contributor

ijuma commented Apr 3, 2019

@cmccabe I cherry-picked this to the 2.1 and 2.2 branches as it seems like an important fix. Also cc @rajinisivaram who is familiar with this code.

@manderson23

This comment has been minimized.

Copy link

manderson23 commented Apr 3, 2019

@ijuma is this likely to go in to the 2.2 branch soon?

@nickbp nickbp deleted the nickbp:KAFKA-7974-dns-failure-kills-adminclient-thread branch Apr 14, 2019
ijuma added a commit that referenced this pull request Apr 24, 2019
…olvable (#6305)

* Fix for KAFKA-7974: Avoid calling disconnect() when not connecting

* Resolve host only when currentAddress() is called

Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.

* Add Javadoc to ClusterConnectionStates.connecting()
@ijuma

This comment has been minimized.

Copy link
Contributor

ijuma commented Apr 24, 2019

@manderson23 it's done now. I was running the tests locally before doing the push and had ran out of time.

@kpmeen

This comment has been minimized.

Copy link

kpmeen commented May 19, 2019

I stumbled onto this issue a couple of days ago when trying to run an application in K8s. Any estimates for when a build will be ready with this fix?

@nickbp

This comment has been minimized.

Copy link
Contributor Author

nickbp commented May 20, 2019

I likewise originally encountered this in a K8s environment. The core issue leading to this bug arising was the result of DNS requests round-robining between CoreDNS instances which were individually eventually consistent but as a group not consistent at all, with entries popping in and out of existence as pods were brought up and some but not all CoreDNS instances knew about them at any given moment.

To avoid the inconsistency issues, I was able to use the following workarounds in my K8s 1.13.x environment. With these workarounds a patch to the Kafka clients was not required to avoid the issue. YMMV:

  • kubectl edit service kube-dns --namespace=kube-system: Edit the sessionAffinity value to be ClientIP rather than None. This avoids rotating DNS lookups between CoreDNS instances, which can result in host records appearing to pop in and out of existence. Confirm the change with kubectl get service kube-dns --namespace=kube-system -o yaml
  • kubectl edit configmap coredns --namespace=kube-system: Edit the Corefile entry to have cache 30. By default this is 300. This ensures that DNS caches are updated every 30s rather than 5 minutes, which should result in pods deploying more quickly after having set sessionAffinity=ClientIP in the previous step. Confirm the change with kubectl get configmap coredns --namespace=kube-system -o yaml
@kpmeen

This comment has been minimized.

Copy link

kpmeen commented May 21, 2019

@nickbp interesting...

I'll give that a go too. My workaround was to implement a simple, blocking, utility function that will attempt to resolve the host(s) for a finite duration n. If the host(s) cannot be resolved within the configured time, the application is terminated. It works for me at least.

@nickbp

This comment has been minimized.

Copy link
Contributor Author

nickbp commented May 21, 2019

Yeah the issue I was seeing was that after a utility check found the entry to be resolvable, it could still then become "unresolvable" due to dns lookup round-robining to a dns instance that didn't know about the host yet

Pengxiaolong added a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…olvable (apache#6305)

* Fix for KAFKA-7974: Avoid calling disconnect() when not connecting

* Resolve host only when currentAddress() is called

Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.

* Add Javadoc to ClusterConnectionStates.connecting()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants
You can’t perform that action at this time.