-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14785: Connect offset read REST API #13434
Conversation
bc6a744
to
f54aab6
Compare
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
Show resolved
Hide resolved
f54aab6
to
050d80f
Compare
050d80f
to
7f2af62
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Yash, this is looking great. I've finished a full pass of the changes to the main codebase; I'll probably wait to look at tests until the bigger questions/suggestions have been addressed.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
…orOffsets; use connector class loader while retrieving offsets in Worker; refactor Worker::connectorOffsets to be an async API; use timeout for admin client request; move sink related keys from ConnectorOffset to SinkUtils; log and ignore offsets with unexpected partition key formats
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review Chris!
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
Show resolved
Hide resolved
...ct/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Yash, looking great! Just a few more minor comments and then it should be time to start looking at the tests for this.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
Show resolved
Hide resolved
…handling in each branch
…erderRequestHandler::completeRequest; replace usages of HerderRequestHandler::completeOrForwardRequest with HerderRequestHandler::completeRequest for requests that don't involve forwarding
… duplicate partition key processing logic from OffsetBackingStore implementations to OffsetUtils; add unit tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Yash, looking great! Going to start going over testing changes next; all the comments here are fairly minor.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
Show resolved
Hide resolved
…der executor thread; other minor refactors
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Phew, done with the tests! This is looking really good.
...ct/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
Outdated
Show resolved
Hide resolved
...ct/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
Outdated
Show resolved
Hide resolved
...t/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
Outdated
Show resolved
Hide resolved
...ct/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
Show resolved
Hide resolved
…ts; other minor refactors
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
Show resolved
Hide resolved
Looking great! Few small comments left and then this should be good to merge. |
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
Outdated
Show resolved
Hide resolved
…andaloneHerder to avoid concurrent calls to Worker; fix failing unit test in AbstractHerderTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Great work, Yash.
FYI I'm going to be taking next week off relaxing on a warm beach somewhere. I don't think we'll have time to get the write/reset API merged in time for the April 12th feature freeze unless someone else can review the PRs for those changes; it may be worth pinging somebody for review on them for visibility. |
* @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead | ||
*/ | ||
@Deprecated | ||
public KafkaOffsetBackingStore() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for making noise on this PR. out of curiosity, should we remove deprecated constructors from KafkaStatusBackingStore
and KafkaConfigBackingStore
too? not sure whether those internal classes need the deprecation cycle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…re classes (#15865) - These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics #9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API #13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…re classes (apache#15865) - These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…re classes (apache#15865) - These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…re classes (apache#15865) - These constructors were deprecated over 3 years ago in KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics apache#9780. - While these classes are not a part of Connect's public API, deprecation was still introduced instead of outright removal because they are useful utility classes that might've been used outside of Connect. - The KafkaOffsetBackingStore's deprecated constructor was removed in KAFKA-14785: Connect offset read REST API apache#13434. - This patch removes the deprecated constructors for KafkaConfigBackingStore and KafkaStatusBackingStore. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
GET /connectors/{connector}/offsets
REST APICommitter Checklist (excluded from commit message)