[FLINK-25569][core] Add decomposed Sink V2 interface #18302

Merged
merged 6 commits into from
Jan 13, 2022

@fapaul fapaul commented Jan 7, 2022

What is the purpose of the change

This is the first PR of FLIP-191 (https://issues.apache.org/jira/browse/FLINK-25555). It introduces the basic decomposed interfaces that replace the existing Sink V1 interfaces. The PR only adds the public-facing interfaces and does not implement the stream graph translation yet; that is a follow-up task.

Brief change log

  • fe71154 Clarifies the retry behaviour of committer and global committer
  • 5022abf Fixes a problem if Transformations do not properly implement hashcode and equals
  • d742c25 Expose parts of the internal ProcessingTimeService through an external facade
  • 412c841 Introduction of the Sink V2 interfaces

Verifying this change

The PR mostly consists of interface additions and file movements.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Author

fapaul commented Jan 7, 2022

@gaoyunhaii do you also want to have a look at this PR?

Collaborator

flinkbot commented Jan 7, 2022

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Collaborator

flinkbot commented Jan 7, 2022

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 412c841 (Fri Jan 07 16:37:39 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@gaoyunhaii
Contributor

Many thanks @fapaul for drafting the PR! I'll also have a look soon~

Contributor

@alpreu alpreu left a comment

I wanted to have a look at this because I'm interested in the new Sink interface as well. I left a few comments :)

Comment on lines +68 to +71
* <p>Currently calling this method only logs the error, discards the committable and
* continues. In the future the behaviour might be configurable.
Contributor

@alpreu alpreu Jan 10, 2022

I checked the usages (also of the other methods in the CommitRequest interface to which this applies as well) but I don't see any reference implementation. Is the docstring correct as it is then?

Author

This PR only introduces the public-facing API but not the internal implementation. I did this to split the PR into more reviewable chunks.

In general, the two failure methods are designed so that failure side channels can be added in the future, but in the first version they will only log or fail the job.
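
To make the intended contract a bit more concrete, here is a minimal sketch; it is not the PR's implementation, which is left to a follow-up. `CommitRequestSketch`, `RecordingRequest` and `CommitRequestDemo` are hypothetical names. `failedWithKnownReason` is the method quoted from the diff, while `failedWithUnknownReason` and the split into "log and discard" versus "fail the job" are assumptions based on the comments in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the CommitRequest discussed here.
interface CommitRequestSketch<CommT> {
    CommT getCommittable();

    // Name as quoted in the diff: log the error, discard the committable, continue.
    void failedWithKnownReason(Throwable t);

    // Assumed counterpart: in this sketch it fails the job.
    void failedWithUnknownReason(Throwable t);
}

// Toy request that records what a framework might do in a first version.
final class RecordingRequest implements CommitRequestSketch<String> {
    final String committable;
    final List<String> log = new ArrayList<>();
    boolean discarded = false;

    RecordingRequest(String committable) {
        this.committable = committable;
    }

    @Override
    public String getCommittable() {
        return committable;
    }

    @Override
    public void failedWithKnownReason(Throwable t) {
        log.add("known failure for '" + committable + "': " + t.getMessage());
        discarded = true; // the committable is dropped and processing continues
    }

    @Override
    public void failedWithUnknownReason(Throwable t) {
        // Stand-in for "fail the job".
        throw new IllegalStateException("failing job for '" + committable + "'", t);
    }
}

final class CommitRequestDemo {
    // Hypothetical committer logic that reports failures through the request.
    static void commit(CommitRequestSketch<String> request) {
        String c = request.getCommittable();
        if (c.isEmpty()) {
            request.failedWithKnownReason(new IllegalArgumentException("empty committable"));
        } else if (c.startsWith("corrupt")) {
            request.failedWithUnknownReason(new RuntimeException("unexpected state"));
        }
        // Otherwise the commit is considered successful and nothing is signalled.
    }
}
```

Routing failures through the request object rather than through exceptions is what would later allow a side channel to be plugged in without changing committer code.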

Contributor

@gaoyunhaii gaoyunhaii left a comment

Many thanks @fapaul for drafting the PR! I have left some comments~

* the given {@link ProcessingTimeCallback} when firing.
*/
@PublicEvolving
public interface ProcessingTimeService {
Contributor

Hi @fapaul~ could you elaborate a bit on why we want to split the ProcessingTimeService into two classes? I'm asking since the remaining methods, like scheduleWithFixedDelay, seem to be similar to registerTimer. Would it be possible to move the original ProcessingTimeService directly into core?

Author

The idea here is to decouple the internal ProcessingTimeService from the one we want to expose. The internal one for example implements ProcessingTimeService#quiesce which we should not expose to the user.

Regarding the methods you mentioned, we can migrate them to the public ProcessingTimeService in the future, but currently I do not see the need.
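
As an illustration of the facade idea described above, here is a minimal sketch. `PublicTimeService`, `InternalTimeService`, `advanceTo` and `asFacade` are hypothetical names chosen for this example and are not the Flink classes; only `registerTimer` and `quiesce` are names taken from this thread, and the time-advancing logic is a toy stand-in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical public-facing facade: only what users should see.
interface PublicTimeService {
    long getCurrentProcessingTime();

    void registerTimer(long time, Callback callback);

    interface Callback {
        void onProcessingTime(long time);
    }
}

// Hypothetical internal service with lifecycle methods that stay hidden.
final class InternalTimeService {
    private static final class Timer {
        final long time;
        final PublicTimeService.Callback callback;

        Timer(long time, PublicTimeService.Callback callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final List<Timer> timers = new ArrayList<>();
    private long now;
    private boolean quiesced;

    void register(long time, PublicTimeService.Callback callback) {
        if (!quiesced) {
            timers.add(new Timer(time, callback));
        }
    }

    // Internal-only: stop accepting new timers. Not reachable through the facade.
    void quiesce() {
        quiesced = true;
    }

    // Toy clock: move time forward and fire every timer that is due.
    void advanceTo(long newTime) {
        now = newTime;
        List<Timer> due = new ArrayList<>();
        for (Timer t : timers) {
            if (t.time <= now) {
                due.add(t);
            }
        }
        timers.removeAll(due);
        for (Timer t : due) {
            t.callback.onProcessingTime(t.time);
        }
    }

    // The facade delegates to the internal service but exposes no lifecycle methods.
    PublicTimeService asFacade() {
        return new PublicTimeService() {
            @Override
            public long getCurrentProcessingTime() {
                return now;
            }

            @Override
            public void registerTimer(long time, PublicTimeService.Callback callback) {
                register(time, callback);
            }
        };
    }
}
```

A user holding only a `PublicTimeService` can register timers but has no way to call `quiesce`, which is the decoupling argued for above.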

Contributor

I'm still a bit concerned, since tasks.ProcessingTimeService is also exposed to users via classes like ProcessingTimeServiceAware and AbstractStreamOperator. But because of that, we do have to keep tasks.ProcessingTimeService there, and if we want to break the reverse dependency, we can only either introduce a separate set of processing timer services or extract a new super interface. So let's keep the current option for now.

Contributor

We might reconsider this once API-breaking changes are acceptable, for example by renaming tasks.ProcessingTimeService.

* <p>Currently calling this method only logs the error, discards the committable and
* continues. In the future the behaviour might be configurable.
*/
void failedWithKnownReason(Throwable t);
Contributor

Are the methods expected to be called by Committer? If so would failWithKnownReason and failWithUnknownReason be better~?

Author

I think the term failed is correct here because it describes the state of the committable. In the future, we may add a general configuration for how to handle failures, e.g. submitting to a dead letter queue.

Since we cannot really rename the method once it is public, I already made it "future-proof".

Author

fapaul commented Jan 10, 2022

@zentol FYI 5c2fd49 adds public annotations to the metrics classes

Author

fapaul commented Jan 10, 2022

Thanks for the review @gaoyunhaii @alpreu I have addressed all your comments. PTAL.

Contributor

@gaoyunhaii gaoyunhaii left a comment

Many thanks @fapaul for the PR! LGTM~

AHeise and others added 6 commits January 12, 2022 15:24
…ng a IdentityHashMap to track transformations.

The already-transformed transformations are copied into a
different map and compared. If a transformation does not properly
implement equals, the isTransformed check may fail and the transformation
is copied multiple times. This is now hardened because we
check the object reference.
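
The effect of tracking by object reference can be sketched as follows. This is an illustration only and not the code from the commit: `SloppyTransformation` and `IdentityTrackingDemo` are hypothetical names, and the mutable `id` field is just one assumed way in which `equals`/`hashCode` can misbehave for map keys.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical transformation whose equals/hashCode depend on mutable state,
// so it is not a reliable key for an equality-based map.
final class SloppyTransformation {
    int id;

    SloppyTransformation(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SloppyTransformation && ((SloppyTransformation) o).id == id;
    }

    @Override
    public int hashCode() {
        return id;
    }
}

final class IdentityTrackingDemo {
    // Records a transformation as "already transformed", mutates it, and then
    // checks whether the map still recognises the very same object.
    static boolean trackedAfterMutation(Map<SloppyTransformation, Boolean> alreadyTransformed) {
        SloppyTransformation t = new SloppyTransformation(1);
        alreadyTransformed.put(t, true);
        t.id = 2; // the hash code changes after insertion
        return alreadyTransformed.containsKey(t);
    }
}
```

With a `HashMap` the lookup misses and the object would be treated as not yet transformed; with an `IdentityHashMap` the lookup compares references and still finds it.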
The new interface separates concerns and will make future refactorings and extensions easier. The user immediately sees which methods need to be implemented.

* permanently fail after reaching that maximum. Else the committable will be retried as
* long as this method is invoked after each attempt.
*/
void retryLater();
Contributor

I am not sure I understood it correctly after reading the Javadoc. Does it mean that this method will be called as long as the maximum is not exceeded? The name retryLater sounds like an async call. Is that your intention? The follow-up question is: how much later? Will the delay be controlled by configuration, since this method takes no input?

Author

So far none of our sinks sets a maximum number of retries, but we might consider it in the future. The retry mechanism will work internally similarly to the current implementation [1]. As soon as a committable is marked for retry, we enqueue it in the mailbox, which is polled "periodically", and the commit is attempted again. Moreover, the committable is also retried during the next checkpoint.

[1]
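
The retry flow described above could be sketched roughly like this. It is a toy model and not the mailbox-based implementation: `RetryableRequest`, `SketchCommitter`, `RetryLoop` and `runOnce` are hypothetical names, and a plain queue stands in for the mailbox and for checkpoint-triggered retries.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical request: only retryLater() from this thread is modelled.
final class RetryableRequest {
    final String committable;
    int attempts = 0;
    boolean retryRequested = false;

    RetryableRequest(String committable) {
        this.committable = committable;
    }

    // No arguments: the caller only signals "try again"; the framework decides when.
    void retryLater() {
        retryRequested = true;
    }
}

// Hypothetical committer callback.
interface SketchCommitter {
    void commit(RetryableRequest request);
}

final class RetryLoop {
    private final Queue<RetryableRequest> pending = new ArrayDeque<>();
    private final SketchCommitter committer;

    RetryLoop(SketchCommitter committer) {
        this.committer = committer;
    }

    void submit(String committable) {
        pending.add(new RetryableRequest(committable));
    }

    // One round, standing in for a mailbox poll or a checkpoint:
    // every pending request is attempted exactly once.
    void runOnce() {
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            RetryableRequest request = pending.poll();
            request.retryRequested = false;
            request.attempts++;
            committer.commit(request);
            if (request.retryRequested) {
                pending.add(request); // stays queued only if retryLater() was called again
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this model a committable keeps being retried only as long as the committer calls `retryLater()` after each attempt, which matches the Javadoc wording quoted above; a maximum retry count is deliberately left out because none is set so far.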

*
* @return the serializer of the writer's state type.
*/
SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
Contributor

Optional has been removed from multiple methods; this is one of them. Could you explain your reasoning a little more?

Author

Removing all these optionals was one of the intentions behind designing the new interfaces. Sink developers can now explicitly decide which functionality they want to support and implement the interfaces accordingly [1]. With the Sink V1 interfaces they basically always had to implement everything except that some of the methods have default implementations.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction#FLIP191:ExtendunifiedSinkinterfacetosupportsmallfilecompaction-SimpleSink
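
The difference between the two styles can be sketched as follows. All type names here (`SinkV1Sketch`, `SinkSketch`, `StatefulSinkSketch`, `TwoPhaseCommittingSinkSketch`, `PrintSinkSketch`, `FileSinkSketch`, `Capabilities`) are hypothetical and heavily simplified, with `String` standing in for writer, serializer and committer types; see the FLIP page for the actual interfaces.

```java
import java.util.Optional;

// V1 style (simplified): one interface, optional capabilities expressed via Optional.
interface SinkV1Sketch {
    String createWriter();

    Optional<String> getWriterStateSerializer();

    Optional<String> createCommitter();
}

// V2 style (simplified): one small interface per capability, no Optional.
interface SinkSketch {
    String createWriter();
}

interface StatefulSinkSketch extends SinkSketch {
    String getWriterStateSerializer();
}

interface TwoPhaseCommittingSinkSketch extends SinkSketch {
    String createCommitter();
}

// A sink without state or commit phase implements only the base interface.
final class PrintSinkSketch implements SinkSketch {
    @Override
    public String createWriter() {
        return "print-writer";
    }
}

// A sink that needs both capabilities opts in explicitly.
final class FileSinkSketch implements StatefulSinkSketch, TwoPhaseCommittingSinkSketch {
    @Override
    public String createWriter() {
        return "file-writer";
    }

    @Override
    public String getWriterStateSerializer() {
        return "file-writer-state-serializer";
    }

    @Override
    public String createCommitter() {
        return "file-committer";
    }
}

final class Capabilities {
    // The framework can inspect the implemented interfaces instead of probing Optionals.
    static String describe(SinkSketch sink) {
        StringBuilder sb = new StringBuilder("writer");
        if (sink instanceof StatefulSinkSketch) {
            sb.append("+state");
        }
        if (sink instanceof TwoPhaseCommittingSinkSketch) {
            sb.append("+committer");
        }
        return sb.toString();
    }
}
```

With this shape, `getWriterStateSerializer()` can return the serializer directly, because a sink that has no writer state simply does not implement the stateful interface.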

@fapaul fapaul merged commit 0619274 into apache:master Jan 13, 2022
7 participants