Skip to content

Conversation

@PatrickRen
Copy link
Contributor

What is the purpose of the change

This pull request improves the logic of KafkaPartitionSplitReader that wakeup() should only wake up the blocking KafkaConsumer#poll operation, otherwise the wake-up might be delayed to the next ConsumerNetworkClient#poll call when handling new splits, which will throw a WakeupException.

Brief change log

  • Add a volatile variable isPolling to make sure the wakeup method only unblocks KafkaConsumer#poll
  • Add new unit test for KafkaPartitionSplitReader

Verifying this change

This change is covered by the new KafkaPartitionSplitReaderTest#testWakeupThenAssign case.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented May 27, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@zou-can
Copy link

zou-can commented May 30, 2022

@PatrickRen

Sorry to bother you.
I'm not sure if it's an appropriate place to continue our discussion.

I'm trying to simplify the question i had:

You have improved the logic of KafkaPartitionSplitReader that wakeup() should only wake up the blocking KafkaConsumer#poll.
But I'm not sure whether the exception will be thrown immediately even consumer is blocking in poll operation.

I'll make an example by referring source code of KafkaConsumer#poll

// KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        do {
            // above method will raise WakeupException if KafkaConsumer#wakeup is called before.
            client.maybeTriggerWakeup();

            // irrelavant code...

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // If KafkaConsumer#wakeup is called by another thread when poll operation is executed here,
                // current poll operation will return.
                // So WakeupException will still be raised by next call.

                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

Is it correct?
Look forward to your reply~

@leonardBang leonardBang self-requested a review June 9, 2022 10:37
@PatrickRen PatrickRen changed the title [FLINK-27762][connector/kafka] KafkaPartitionSplitReader#wakeup should only unblock KafkaConsumer#poll invocation [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment Jun 14, 2022
@PatrickRen
Copy link
Contributor Author

Thanks for the review @zou-can ! Initially I was trying to fix this case from the root that KafkaPartitionSplitReader#wakeup should only wake up the blocking KafkaConsumer#poll invocation, but I realized that it's not possible to do so because Kafka consumer doesn't expose such an API. I have updated my code to add a wrapper on Kafka consumer calls that catch WakeupException and retry on exception.

@PatrickRen
Copy link
Contributor Author

@becketqin @leonardBang Please take a look when you are available. Thanks!

try {
return consumerCall.get();
} catch (WakeupException we) {
// Do nothing here and the loop will retry the consumer call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add log here which is helpful if we can not leave the loop

* <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
*/
private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we‘d better remove the loop condition as all calls on consumer is thread-safe that means we can meet the WakeupException at most once, we can retry in catch code block

try {
return consumerCall.get();
} catch (WakeupException we) {
LOG.debug(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug level log is meaningless for troubleshooting, because we still need to change the online logger config when exception happens, could we improve it to INFO level?

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @PatrickRen for the update, LGTM.

@zou-can
Copy link

zou-can commented Jun 15, 2022

@PatrickRen It looks good. Thanks for your efforts!

@leonardBang leonardBang merged commit 50c19d9 into apache:master Jun 15, 2022
@leonardBang
Copy link
Contributor

@PatrickRen Could you also open the PR for release-1.15 and release-1.14?

PatrickRen added a commit to PatrickRen/flink that referenced this pull request Jun 16, 2022
…onsumer invocations in split assignment

This closes apache#19828.
PatrickRen added a commit to PatrickRen/flink that referenced this pull request Jun 16, 2022
…onsumer invocations in split assignment

This closes apache#19828.
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Jun 27, 2022
…onsumer invocations in split assignment

This closes apache#19828.
zstraw pushed a commit to zstraw/flink that referenced this pull request Jul 4, 2022
…onsumer invocations in split assignment

This closes apache#19828.
liujiawinds pushed a commit to liujiawinds/flink that referenced this pull request Jul 22, 2022
…onsumer invocations in split assignment

This closes apache#19828.
jnh5y pushed a commit to jnh5y/flink that referenced this pull request Dec 18, 2023
…onsumer invocations in split assignment

This closes apache#19828.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants