Conversation

JingsongLi
Contributor

@JingsongLi JingsongLi commented Jan 20, 2022

What is the purpose of the change

In Table Store, we want to get the offsets of the records written by the Kafka writer. Only the offset returned by the callback inside the KafkaWriter is accurate, so we need this callback mechanism.

This ticket wants to add metadataConsumer to InitContext in Sink:

/**
 * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type
 * {@link MetaT} to the consumer. The consumer can accept metadata events in an asynchronous
 * thread, and the {@link Consumer#accept} method is executed very fast.
 */
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
    return Optional.empty();
}

SinkWriter can get this consumer, and publish metadata to the consumer implemented by table store sink.
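
A minimal sketch of how a writer might use such a hook. Everything except the `metadataConsumer()` signature is an assumption made for illustration: `SketchInitContext`, `SketchRecordMeta`, `SketchWriter` and `CollectingContext` are hypothetical stand-ins, not Flink or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical stand-in for the sink's init context; only metadataConsumer() mirrors the PR.
interface SketchInitContext {
    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}

// Hypothetical metadata event: a partition and the offset assigned to a record.
final class SketchRecordMeta {
    final int partition;
    final long offset;

    SketchRecordMeta(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical writer that forwards metadata only when a consumer was provided.
final class SketchWriter {
    private final Consumer<SketchRecordMeta> metadataConsumer; // null when absent

    SketchWriter(SketchInitContext context) {
        this.metadataConsumer =
                context.<SketchRecordMeta>metadataConsumer().orElse(null);
    }

    // Stands in for the producer's send callback.
    void onSendCompleted(SketchRecordMeta meta) {
        if (metadataConsumer != null) {
            metadataConsumer.accept(meta);
        }
    }
}

// Hypothetical context that hands out a consumer collecting events into a list.
final class CollectingContext implements SketchInitContext {
    final List<SketchRecordMeta> received = new ArrayList<>();

    @Override
    @SuppressWarnings("unchecked")
    public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        Consumer<SketchRecordMeta> consumer = received::add;
        // Unchecked cast: the caller and the context must agree on the metadata type.
        return Optional.of((Consumer<MetaT>) consumer);
    }
}
```

A context that does not override `metadataConsumer()` yields an empty `Optional`, so a writer built this way simply publishes nothing.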

Brief change log

  • Introduce metadataConsumer method
  • KafkaWriter gets metadataConsumer

Verifying this change

KafkaWriterITCase.testMetadataPublisher

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@JingsongLi JingsongLi requested a review from fapaul January 20, 2022 05:57
@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 91a7a14 (Thu Jan 20 06:00:01 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Jan 20, 2022

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Contributor

@fapaul fapaul left a comment

Thanks for drafting this PR. It looks mostly good; I left two inline questions.

Comment on lines 405 to 417
for (Consumer<RecordMetadata> metadataConsumer : metadataConsumers) {
metadataConsumer.accept(metadata);
Contributor

Is this really safe? Currently one of the Kafka producer threads will update the metadata.

Contributor Author

Good point, I've summarized below

class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {
class KafkaWriter<IN>
implements SinkWriter<IN, KafkaCommittable, KafkaWriterState>,
MetadataPublisher<RecordMetadata> {
Contributor

Actually, my initial idea was to implement the MetadataPublisher with the Sink and only pass the list of subscribers to the writer.

I am not sure about the final design of the table store but my assumption was that there are only new subscribers during the writer creation and not during runtime. So exposing adding new subscribers through the writer is unnecessary.
Does that make sense?

Contributor Author

The big question is whether the subscribe call occurs before or after serialization

expected.add("testMetadataPublisher-0@" + i);
}
writer.prepareCommit(false);
assertThat(metadataList, equalTo(expected));
Contributor

Nit: please use assertJ for new assertions

@JingsongLi
Contributor Author

Thanks @fapaul for the review, I've rethought it and it's probably not a good idea either way.

  • approach 1: SinkWriter implements MetadataPublisher. The subscribe method should then behave like open and be called before any other call; otherwise there are thread-safety issues. But SinkWriter does not have an open method. It is opened in Sink.createWriter.
  • approach 2: Sink implements MetadataPublisher. The question is when to call subscribe: on the client side or the server side? If it is called on the client side, does the Consumer still work properly after serialization?

@JingsongLi
Contributor Author

So I am thinking maybe we can move forward in JingsongLi#18 (comment)
Add a method to InitContext:

/**
* Returns a metadata subscriber, the {@link SinkWriter} can publish metadata events of type
* {@link MetaT} to the subscriber.
*/
<MetaT> Optional<Consumer<MetaT>> metadataSubscriber();

Sink can decide for itself whether to publish meta information to subscriber.

What do you think?

@JingsongLi JingsongLi requested a review from fapaul January 21, 2022 08:46
@fapaul
Contributor

fapaul commented Jan 21, 2022

Thanks @fapaul for the review, I've rethought it and it's probably not a good idea either way.

  • approach 1: SinkWriter implements MetadataPublisher. The subscribe method should then behave like open and be called before any other call; otherwise there are thread-safety issues. But SinkWriter does not have an open method. It is opened in Sink.createWriter.

I think we have two different thread-safety issues here. One is that the consumer is invoked by the KafkaProducer thread, which you probably need to fix by triggering the callback via the Mailbox. The other issue is around adding the subscribers concurrently.

  • approach 2: Sink implements MetadataPublisher. The question is when to call subscribe: on the client side or the server side? If it is called on the client side, does the Consumer still work properly after serialization?

I agree the Consumer needs to be serializable, but if we pass an unmodifiable list to the sink writer, it solves a lot of the thread-safety issues because it is not possible to add new subscribers after the sink translation. Whether that works depends, I guess, on the implementation you have in mind for using the metadata.
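
The unmodifiable-list idea can be sketched as follows. `FixedSubscriberPublisher` is a hypothetical class written only for illustration and is not part of the PR. It covers the concurrent-add concern only; it says nothing about which thread ends up calling `accept`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical publisher whose subscriber list is frozen at construction time.
final class FixedSubscriberPublisher<T> {
    private final List<Consumer<T>> subscribers;

    FixedSubscriberPublisher(List<Consumer<T>> subscribers) {
        // Defensive copy wrapped as unmodifiable: later changes to the caller's
        // list are not visible here, and nobody can add through our view.
        this.subscribers =
                Collections.unmodifiableList(new ArrayList<>(subscribers));
    }

    // Deliver one metadata event to every subscriber known at construction time.
    void publish(T metadata) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(metadata);
        }
    }

    List<Consumer<T>> subscribers() {
        return subscribers;
    }
}
```

Because the list never changes after construction, `publish` can iterate it from any thread without synchronizing on the list itself. The subscribers still have to be safe to call from that thread.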

So I am thinking maybe we can move forward in JingsongLi#18 (comment) Add a method to InitContext:

/**
* Returns a metadata subscriber, the {@link SinkWriter} can publish metadata events of type
* {@link MetaT} to the subscriber.
*/
<MetaT> Optional<Consumer<MetaT>> metadataSubscriber();

Sink can decide for itself whether to publish meta information to subscriber.

What do you think?

Isn't this approach very similar to option 2 you have outlined? You probably still need a serializable consumer that you can pass via the context to the sink writer. How is the consumer set in the init context?

@JingsongLi
Contributor Author

Isn't this approach very similar to option 2 you have outlined? You probably still need a serializable consumer that you can pass via the context to the sink writer. How is the consumer set in the init context?

The TableStoreSink will hold a KafkaSink, and it can create a new context when createWriter is called. The code looks like this:

TableStoreSink implements Sink {
    Sink kafkaSink;
    TableStoreSink(Sink kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    SinkWriter createWriter(InitContext context) {
         Consumer metaSubscriber = new Consumer();
         InitContext contextWithSubscriber = new WrappedInitContext();
         SinkWriter kafkaWriter = kafkaSink.createWriter(contextWithSubscriber);
         return new TableStoreSinkWriter(kafkaWriter, metaSubscriber, .....);
    }
}
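
A self-contained sketch of the same wrapping idea, under loud assumptions: `DemoContext`, `DemoWriter`, `DemoSink`, `InnerSink`, `OuterSink` and `OuterWriter` are simplified hypothetical types, not Flink's interfaces, and the inner sink publishes a fake counter instead of real Kafka offsets.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the sink interfaces.
interface DemoContext {
    int subtaskId();

    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}

interface DemoWriter {
    void write(String element);
}

interface DemoSink {
    DemoWriter createWriter(DemoContext context);
}

// Plays the role of the Kafka sink: publishes a fake offset per record if asked to.
final class InnerSink implements DemoSink {
    @Override
    public DemoWriter createWriter(DemoContext context) {
        Optional<Consumer<Long>> consumer = context.metadataConsumer();
        long[] nextOffset = {0L};
        return element -> {
            long offset = nextOffset[0]++;
            consumer.ifPresent(c -> c.accept(offset));
        };
    }
}

// Plays the role of the table store writer: delegates writes, keeps published offsets.
final class OuterWriter implements DemoWriter {
    private final DemoWriter inner;
    final List<Long> offsets;

    OuterWriter(DemoWriter inner, List<Long> offsets) {
        this.inner = inner;
        this.offsets = offsets;
    }

    @Override
    public void write(String element) {
        inner.write(element);
    }
}

// Plays the role of the table store sink: wraps the context it receives.
final class OuterSink implements DemoSink {
    private final DemoSink inner;

    OuterSink(DemoSink inner) {
        this.inner = inner;
    }

    @Override
    public OuterWriter createWriter(DemoContext context) {
        List<Long> offsets = new CopyOnWriteArrayList<>();
        Consumer<Long> subscriber = offsets::add;
        DemoContext wrapped = new DemoContext() {
            @Override
            public int subtaskId() {
                return context.subtaskId(); // delegate everything else
            }

            @Override
            @SuppressWarnings("unchecked")
            public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
                return Optional.of((Consumer<MetaT>) subscriber);
            }
        };
        return new OuterWriter(inner.createWriter(wrapped), offsets);
    }
}
```

In this sketch the consumer is created inside `createWriter`, at writer-creation time, so it is never part of the sink object itself. That appears to be the property the comment above relies on to sidestep the serialization question.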

@JingsongLi
Contributor Author

I updated in 256bff6

@JingsongLi
Contributor Author

I think we have two different thread-safety issues here. One is that the consumer is invoked by the KafkaProducer thread, which you probably need to fix by triggering the callback via the Mailbox. The other issue is around adding the subscribers concurrently.

I don't think we need to put it inside the mailbox. That would be very expensive because it's a per-record operation. For a callback consumer, I think asynchronous processing is reasonable.

I agree the Consumer needs to be serializable, but if we pass an unmodifiable list to the sink writer, it solves a lot of the thread-safety issues because it is not possible to add new subscribers after the sink translation. Whether that works depends, I guess, on the implementation you have in mind for using the metadata.

Yes, the problem is that this is a runtime statistic that needs to be exposed to an external caller, and that is hard to implement with a serializable class.

@JingsongLi
Contributor Author

@flinkbot run azure

@fapaul
Contributor

fapaul commented Jan 24, 2022

I don't think we need to put it inside the mailbox. That would be very expensive because it's a per-record operation. For a callback consumer, I think asynchronous processing is reasonable.

From the Kafka connector side, the subscriber is not updated on every record. It is only updated for a batch of records when the KafkaProducer is flushed (either during a checkpoint or when the internal buffer size is reached).
I would definitely prefer to handle the consumer in the mailbox and not on the Kafka thread. The Kafka threads might have surprising effects on the overall pipeline stability, e.g. the shutdown is blocked because the producer cannot be stopped while it is executing the metadata consumer.
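
The hand-off being argued for here can be sketched with a plain queue. This is not Flink's mailbox; `HandOffPublisher` is a hypothetical class that only illustrates the threading split: the producer's thread enqueues, and the task thread runs the consumer.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical hand-off: user code never runs on the producer's I/O thread.
final class HandOffPublisher<T> {
    private final ConcurrentLinkedQueue<T> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<T> consumer;

    HandOffPublisher(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    // Intended to be called from the producer's callback thread: enqueue only.
    void onCompletion(T metadata) {
        pending.add(metadata);
    }

    // Intended to be called from the task thread: runs the consumer there and
    // returns how many events were delivered.
    int drain() {
        int delivered = 0;
        T next;
        while ((next = pending.poll()) != null) {
            consumer.accept(next);
            delivered++;
        }
        return delivered;
    }
}
```

The cost of this design is one extra enqueue and dequeue per event, plus a delay until the task thread next drains. The benefit is that a slow consumer cannot hold up the producer's thread.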

Regarding adding a method to the InitContext, I think that is okay. Do you think there will ever be multiple subscribers? Maybe it is safer to add a list now instead of an optional.

I am still a bit surprised that the TableStoreSink reads all metadata offsets regardless of whether they are committed in Kafka or not.

@JingsongLi
Contributor Author

JingsongLi commented Jan 25, 2022

From the Kafka connector side, the subscriber is not updated on every record. It is only updated for a batch of records when the KafkaProducer is flushed (either during a checkpoint or when the internal buffer size is reached).
I would definitely prefer to handle the consumer in the mailbox and not on the Kafka thread. The Kafka threads might have surprising effects on the overall pipeline stability, e.g. the shutdown is blocked because the producer cannot be stopped while it is executing the metadata consumer.

Thanks for the information. Here are my concerns:

  • Performance problem: if we put every metadata event into the mailbox (we don't know the batch boundaries inside the callback), performance will be very poor. Look at TaskMailboxImpl.put: taking a lock for every record leads to very poor throughput.
  • Consistency issues: for example, data is flushed at transaction commit time, and its metadata is then put back into the mailbox. That metadata now belongs to the next checkpoint rather than the current one, but TableStore wants the metadata of the current checkpoint.
  • Blocking problem: I think interfaces like Callback should by contract execute quickly, and we can add comments saying so. See org.apache.kafka.clients.producer.Callback, which also expects implementations to avoid heavy logic.

Regarding adding a method to the InitContext, I think that is okay. Do you think there will ever be multiple subscribers? Maybe it is safer to add a list now instead of an optional.

I think we can let implementers assemble it themselves if a list is needed.

I am still a bit surprised that the TableStoreSink reads all metadata offsets regardless of whether they are committed in Kafka or not.

TableStoreSink will only read these offsets at preSnapshot time, when they are used to synchronize full (file) and incremental (log) data. The records must have been flushed by then.
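
One way a consumer could keep `accept` cheap and still serve a read at snapshot time is to track only the highest offset per partition. The sketch below is an assumption about how such a consumer might look, not the table store's code; `PartitionOffset` and `LatestOffsetTracker` are hypothetical names, and the flush that must come before `snapshot()` is left to the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical metadata event carrying a partition and the acknowledged offset.
final class PartitionOffset {
    final int partition;
    final long offset;

    PartitionOffset(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical consumer: accept() is a single atomic map update, safe to call
// from a callback thread while another thread takes snapshots.
final class LatestOffsetTracker implements Consumer<PartitionOffset> {
    private final ConcurrentHashMap<Integer, Long> latest = new ConcurrentHashMap<>();

    @Override
    public void accept(PartitionOffset meta) {
        // Keep the maximum offset seen so far for this partition.
        latest.merge(meta.partition, meta.offset, Long::max);
    }

    // Copy of the current state; assumes the caller has already flushed the
    // producer so that every pending callback has run.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(latest);
    }
}
```

Acknowledgements that arrive out of order do not move an offset backwards, because `merge` keeps the larger value.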

@JingsongLi JingsongLi changed the title from "[FLINK-25696][datastream] Introduce MetadataPublisher interface to SinkWriter" to "[FLINK-25696][datastream] Introduce metadataConsumer to InitContext in Sink" on Jan 25, 2022
Contributor

@fapaul fapaul left a comment

  • Consistency issues: for example, data is flushed at transaction commit time, and its metadata is then put back into the mailbox. That metadata now belongs to the next checkpoint rather than the current one, but TableStore wants the metadata of the current checkpoint.

You can also have consistency problems with the current approach because updating the metadata consumer does not imply that the offsets are committed with the same checkpoint. The commit can either fail and be retried or the notifyCheckpointComplete may get lost and the committable is committed with one of the next checkpoints.

Overall I am a bit skeptical about the benefit of the current metadata consumer if the sink works with transactions.

* {@link MetaT} to the consumer. The consumer can accept metadata events in an asynchronous
* thread, and the {@link Consumer#accept} method is executed very fast.
*/
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
Contributor

I'd mark this @Experimental to leave some room to change the threading model later if necessary.

@JingsongLi
Contributor Author

You can also have consistency problems with the current approach because updating the metadata consumer does not imply that the offsets are committed with the same checkpoint. The commit can either fail and be retried or the notifyCheckpointComplete may get lost and the committable is committed with one of the next checkpoints.

Overall I am a bit skeptical about the benefit of the current metadata consumer if the sink works with transactions.

KafkaProducer.send:

Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.

KafkaProducer.flush:

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.

The default value of acks is 1, so in this mode the Callback must have been called by the time flush returns. If I read the offset at that point, there is no consistency problem.
As for notifyCheckpointComplete, the table store will handle the idempotency of commits.

Contributor

@fapaul fapaul left a comment

Thanks for providing all the helpful information. From my side, it is good to move forward with the PR.

Please just make sure that you add the MetadataConsumer to the new Sink [1] interface and not the old one. We will deprecate the old ones in this release.

[1]

@JingsongLi
Contributor Author

Thanks for providing all the helpful information. From my side, it is good to move forward with the PR.

Please just make sure that you add the MetadataConsumer to the new Sink [1] interface and not the old one. We will deprecate the old ones in this release.

[1]

Thanks, I have reverted the modification of the old one.

Contributor Author

@JingsongLi JingsongLi left a comment

Merging...

@JingsongLi JingsongLi merged commit bb1e504 into apache:master Feb 9, 2022
MrWhiteSike pushed a commit to MrWhiteSike/flink that referenced this pull request Mar 3, 2022
@JingsongLi JingsongLi deleted the metadata branch March 8, 2022 07:11
jnh5y pushed a commit to jnh5y/flink that referenced this pull request Dec 18, 2023