KAFKA-10370: consumer.seek() with SinkTaskContext's offsets when initialize #9145

Closed
wants to merge 1 commit into from

@ning2008wisc ning2008wisc commented Aug 8, 2020

In WorkerSinkTask.java, when the consumer should start from specific offsets rather than from the last committed offset, WorkerSinkTaskContext provides a way for external code (e.g. an implementation of SinkTask) to supply those offsets and rewind the consumer.

The poll() method first calls rewind(), which reads the offsets from WorkerSinkTaskContext. If the offsets are not empty, rewind() runs consumer.seek(tp, offset) to rewind the consumer.
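The flow described above can be sketched with a small toy model. This is only an illustration: `ToyContext`, `ToyWorker`, and the `seeks` list are hypothetical stand-ins and not the Kafka Connect classes, partitions are plain strings, and clearing the pending offsets after the rewind is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for WorkerSinkTaskContext: it only stores the offsets a task asked for.
class ToyContext {
    private final Map<String, Long> offsets = new HashMap<>();

    void offset(Map<String, Long> requested) { offsets.putAll(requested); }
    Map<String, Long> offsets() { return offsets; }
    void clearOffsets() { offsets.clear(); }
}

// Hypothetical stand-in for the worker: poll() rewinds first, then would fetch records.
class ToyWorker {
    final ToyContext context = new ToyContext();
    // Records each seek as "partition@offset" instead of calling a real consumer.
    final List<String> seeks = new ArrayList<>();

    void poll() {
        rewind();
        // A real worker would poll its consumer for records here.
    }

    private void rewind() {
        Map<String, Long> offsets = context.offsets();
        if (offsets.isEmpty()) {
            return; // nothing was requested, keep the current position
        }
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            seeks.add(e.getKey() + "@" + e.getValue());
        }
        // Assumption of this sketch: pending offsets are consumed exactly once.
        context.clearOffsets();
    }
}

class RewindFlowSketch {
    public static void main(String[] args) {
        ToyWorker worker = new ToyWorker();
        worker.context.offset(Collections.singletonMap("test-1", 3L));
        worker.poll(); // first poll performs the rewind
        worker.poll(); // second poll finds no pending offsets
        System.out.println(worker.seeks); // prints [test-1@3]
    }
}
```

In this model the rewind happens on the first poll only, because the pending offsets are emptied once they have been applied.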

As part of WorkerSinkTask initialization, a SinkTask can supply specific offsets by calling context.offset(supplied_offsets) in its start() method, so that the first poll in WorkerSinkTask rewinds the consumer to those offsets in the rewind() method. In practice, however, we saw the following IllegalStateException when running consumer.seek(tp, offset):

[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution proposed in this PR (initially verified) is to use consumer.assign() together with consumer.seek(), instead of consumer.subscribe(), to set the consumer's initial position when specific offsets are supplied externally through WorkerSinkTaskContext.
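To make the failure mode and the proposed ordering concrete, here is a minimal sketch. `ToyConsumer` is a hypothetical class, not KafkaConsumer: it models only the rule seen in the stack trace, that a seek on a partition without a current assignment throws IllegalStateException. Partitions are plain strings, and the actual change in this PR is not reproduced here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy consumer; it models only the "seek requires an assignment" rule.
class ToyConsumer {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Manual assignment takes effect immediately, without waiting for a group rebalance.
    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        positions.keySet().retainAll(assigned); // drop positions of partitions no longer assigned
    }

    void seek(String partition, long offset) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    // Returns null when no position has been set for the partition.
    Long position(String partition) { return positions.get(partition); }
}

class AssignThenSeekSketch {
    // Returns true when the seek is rejected because the partition is not assigned.
    static boolean seekFails(ToyConsumer consumer, String partition, long offset) {
        try {
            consumer.seek(partition, offset);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();

        // Seeking before any assignment exists reproduces the error from the log.
        System.out.println(seekFails(consumer, "test-1", 3L)); // prints true

        // Assigning first makes the same seek valid.
        consumer.assign(Collections.singleton("test-1"));
        consumer.seek("test-1", 3L);
        System.out.println(consumer.position("test-1")); // prints 3
    }
}
```

The point of the ordering is that the partition is known to the consumer before the seek is issued, whereas with a subscription the assignment only arrives later through the group rebalance.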

@ning2008wisc ning2008wisc changed the title KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init KAFKA-10370: consumer.assign /w SinkTaskContext's offsets when initialize Aug 10, 2020
@ning2008wisc ning2008wisc force-pushed the kafka-10370 branch 2 times, most recently from f794ad5 to 0000fc5 Compare August 18, 2020 08:36
@ning2008wisc ning2008wisc changed the title KAFKA-10370: consumer.assign /w SinkTaskContext's offsets when initialize KAFKA-10370: consumer.seek() with SinkTaskContext's offsets when initialize Aug 18, 2020