-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24530][datastream] GlobalCommitter might not commit all records on drain #17536
base: master
Are you sure you want to change the base?
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit a629d69 (Thu Oct 21 11:45:31 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
…tructor into factory method. The nature of this constructor is transformative and should be a factory method accordingly to differentiate between primary and secondary construction.
…or improved commit performance. * Splits batch committer from global commmitter. * Ensures blocking exchange between writer and committer in batch mode. * Simplify CommitterHandler signatures. Note that the construction of the (global) committer handler will be overhauled in a future commit. The implementation in this commit is used for easier review and successful CI.
…bal committer. Before this commit, all committables where immediately forwarded to the global committer. Retried committables where not emitted at all since they have already been sent. This commit: * CommitterHandler's only emit successful committables. All non-failed committables are deemed successful. Note that this change assumes that failed committables are a subset of the input committables and no new instances are created to reflect failures. JavaDoc is adjusted accordingly. * Allows CommitterHandlers to return which committables have been successfully retried. * CommitterRetrier can send successfully retried elements downstream with a callback.
…tirely into SinkTransformationTranslator. Before this commit SinkOperatorFactory and CommitterOperatorFactory had knowledge about the execution mode and created the operator accordingly. With this commit: * SinkTransformationTranslator is the only place where knowledge about the execution mode exists. The translator directly chooses the appropriate CommitterHandler and passes that information to the operator factories. * Factories are now much simplified. The SinkOperatorFactory retained the logic of choosing the appropriate writer state handler as that is independent of the execution mode. * Since operator factories are serializable, CommitterHandler received a serializable Factory layer, such that we do not need to make the CommitterHandler serializable. This refactoring is a preparation for cases where the sink pipeline will become more complex in the future as now SinkTransformationTranslator is the only place that needs to be touched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The refactoring looks great! I left some comments regarding the cleanup
Additional I was wondering whether we also need a blocking exchange between the committer and globalCommitter.
*/ | ||
class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, CommT, CommT> { | ||
class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, Void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure whether the abstraction of ForwardingCommittingHandler
and NoopCommittingHandler
really makes sense anymore after the refactoring. In the end, it could be a simple boolean flag whether the committables should be sent downstream if there is a global committer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are still needed in the current SinkOperator
:
- In batch with committer, we need
ForwardingCommittingHandler
. - In stream/batch without any committer, we need
NoopCommittingHandler
.
We could replace them by booleans but it's getting a bit ugly:
- In streaming, we only emit on
notifyCheckpointCompleted
. - In batch, we only emit on
preSnapshotBarrier
.
So you'd need two booleans afaik. It's certainly less code but I'm not sure if it's easier to understand.
@@ -51,6 +53,7 @@ | |||
Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> { | |||
|
|||
protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class); | |||
public static final TypeInformation<byte[]> BYTES = TypeInformation.of(byte[].class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
} | ||
|
||
protected abstract void retry(List<StateT> recoveredCommittables) | ||
throws IOException, InterruptedException; | ||
protected Collection<CommT> retry(List<StateT> recoveredCommittables) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICT StreamingCommitterHandler
and BatchCommitterHandler
override this method but have identical implementations
@Override
protected Collection<CommT> retry(List<CommT> recoveredCommittables)
throws IOException, InterruptedException {
return commitAndReturnSuccess(recoveredCommittables);
}
Why can't we move the implementation to this class in the retry
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's unfortunately not that easy because of the global committers: Currently all committers are emitting CommT
and not GlobalCommT
anymore after this refactor. This is possible because in fact the global committers are not emitting anything.
Now commitAndReturnSuccess
is working on the internal type (GlobalCommT
in case of global committers). Hence, the signature is conflicting here.
We could create mix-ins interfaces for non-global and global committers where we can implement them. The question is if that's simpler. We could also re-introduce an emit type to CommitterHandler
.
this(processingTimeService, committerHandler, SystemClock.getInstance()); | ||
ProcessingTimeService processingTimeService, | ||
CommitterHandler<CommT> committerHandler, | ||
ThrowingConsumer<? super Collection<CommT>, IOException> committableConsumer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like passing a lambda here because it is only used for emitting. I think we can simplify it by only passing a boolean to determine if emitting is necessary.
Also currently the CommitterOperator#emitCommittables
references the commiterRetrier
I think this may lead to weird situations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used a lambda here as a callback for timer-based retry. Not sure how this can be solved differently.
I certainly would try to avoid having the retrier in the emitCommittables - that can probably lead to nasty stack exceptions.
default <T> T checkSerializerPresent(Optional<T> optional, boolean global) { | ||
String scope = global ? " global" : ""; | ||
checkState( | ||
optional.isPresent(), | ||
"Internal error: a%s committer should only be created if the sink has a%s committable serializer.", | ||
scope, | ||
scope); | ||
return optional.get(); | ||
} | ||
|
||
default <T> T checkCommitterPresent(Optional<T> optional, boolean global) { | ||
String scope = global ? " global" : ""; | ||
checkState( | ||
optional.isPresent(), | ||
"Expected a%s committer because%s committable serializer is set.", | ||
scope, | ||
scope); | ||
return optional.get(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I really like this approach of sharing utility functions but having them in separate class probably does not change much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about I change the Factory interface to an abstract class? It might be weird because they are default methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abstract class sounds better then the scope is also more limited and the methods are not exposed to external calleers.
What is the purpose of the change
This PR refactors
SinkOperator
setup and ensures thatGlobalCommitter
does not usenotifyCheckpointComplete
anymore since it may actually invoked before allCommitters
are notified. Thus, the global committer receives an incomplete set ofCommittables
which will cause incorrect results in a final checkpoint setting.Brief change log
SinkOperator
andCommitterOperator
setup such that the actual distribution logic resides inSinkTransformationTranslator
.Committer
to p and ensure a blocking pipeline in batch mode.GlobalCommitter
when all downstreamCommitters
emitted the committables.Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation