-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14972: Support async runtimes in consumer #13914
Conversation
KafkaConsumer contains a check that rejects nested invocations from different threads (method acquire). For users that use an async runtime, this is an impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. In this PR we replace the thread-id check with an access-key that is stored on a thread-local variable and also put on a stack of access keys stored in the consumer. Developers that work in an async runtime can get the access-key and then use it on the thread-local variable in a thread of their choosing. Every time `acquire` is invoked, a new access-key is generated and put on the stack. When `release` is invoked, the previous access-key is restored from the stack. By validating the access-key from the thread-local variable with the stack of expected access-keys we can guarantee that only a single thread invokes the consumer from a callback, even for nested callbacks. Incorrect usage of the access key is detected and leads to a ConcurrentModificationException. For programs that are unaware of the access-key, the semantics remain unchanged. This PR solves KAFKA-14972 and KAFKA-7143.
Please advice on how to unit test this change. |
Thanks for the PR. This introduces public methods in KafkaConsumer which is part of the public API. SO in order to accept this, we first need to vote a KIP. See the process in the Kafka Improvement Proposal page on the wiki. |
- Use object identity to compare thread access key with new class ThreadAccessKey. - Move new methods lower in the file. - Throw AssertionError when unbalanced acquire/release is detected. - Small javadoc fixes.
KIP-944 has been created: https://cwiki.apache.org/confluence/x/chw0Dw |
* A key that can be used to pass access to the Kafka consumer to another thread. | ||
* Can be obtained via {@link KafkaConsumer#getThreadAccessKey()}. | ||
*/ | ||
public class ThreadAccessKey { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
?
clients/src/main/java/org/apache/kafka/clients/consumer/ThreadAccessKey.java
Show resolved
Hide resolved
Hey @erikvanoosten - Thanks for the PR. While your suggestions make sense, could you add a simple test case that demonstrates the use case mentioned in the KIP? Thanks! |
Hello @philipnee . Yes, that is definitely the intention. Unfortunately I still don't understand how the unit tests are working, the mocks/expectations require deep knowledge of the underlying layer. Anyways, I wanted to be sure that this work is going to be accepted (when executed well of course) before I spend more time on this. |
@philipnee what will you vote on KIP-944? |
Hey from the first look, I think I think this is a reasonable suggestion. Have you gotten much feedback from the discussion thread yet? |
Not a single comment. |
@erikvanoosten - I'll try to help you out on this. |
@dajac Can you please post your opinion about KIP-944 to the mailing list? |
... so that alternative locking mechanisms can be explored. For example PR apache#13914 could be implemented without changes to the Kafka library.
Withdrawn because the committers do not seem to be convinced that you cannot control on what thread code runs with an async runtime. |
The JVM based KafkaConsumer contains a check that rejects nested invocations from different threads (in method acquire). For programs that use an async runtime, this is an almost impossible requirement. Also, the check is more strict than is required; we only need to validate that there is no concurrent access to the consumer.
Examples of affected async runtimes are Kotlin co-routines (KAFKA-7143) and Zio.
In this PR we replace the thread-id check with an access-key that allows a callback to pass on its capability to access the Kafka consumer to another thread.
To keep existing programs working without changes, the access key is stored on a thread-local variable. Developers that work in an async runtime can get the access-key via
getThreadAccessKey
and then activate it on the thread-local variable in a thread of their choosing withsetThreadAccessKey
.Kafka consumer methods that need to be protected against multi-threaded access start with invoking private method
acquire
and end with invoking private methodrelease
. This commit changes the implementation ofacquire
andrelease
.For programs that are unaware of the access-key, the semantics remain unchanged.
This PR solves KAFKA-14972 and KAFKA-7143.
See also KIP-944 https://cwiki.apache.org/confluence/x/chw0Dw for more information.
Committer Checklist (excluded from commit message)