New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35025][Runtime/State] Abstract stream operators for async state processing #24657
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments as below.
*/ | ||
@Internal | ||
@SuppressWarnings("rawtypes") | ||
public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making the async functionalities an abstract class would prevent the operator from extending from other abstract classes. For example, a RichMapFunction should be a subclass of AbstractUdfStreamOperator
, and since it has extended from this abstract class, it cannot extend from AbstractAsyncStateStreamOperator
at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The async processing starts variant at AbstractStreamOperator
and AbstractStreamOperatorV2
. My brief idea is, if some operator need integrate with the async processing, they should change to extends AbstractAsyncStateStreamOperator
instead of AbstractStreamOperator
. Moreover, the AbstractAsyncStateStreamOperator
is fully compatible with AbstractStreamOperator
, if we make AbstractAsyncStateStreamOperator#isAsyncStateProcessingEnabled
configurable to false, everything is not changed compared with AbstractStreamOperator
. That's what this PR trying to introduce.
Back to the AbstractUdfStreamOperator
, it is a subclass of AbstractStreamOperator
. We have two options, the first of which is a new introduced identical class AbstractAsyncStateUdfStreamOperator
extending AbstractAsyncStateStreamOperator
, the second is to make AbstractUdfStreamOperator
extend AbstractAsyncStateStreamOperator
. I'd prefer the former, we can discuss this in following PR.
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of introducing setAsyncKeyedContextElement
, postProcessElement
and getRecordProcessor
, how about the following implementation?
@Override
public void processElement(StreamRecord<T> record){
lastProcessContext =
asyncExecutionController.buildContext(
record.getValue(), keySelector.getKey(record.getValue()));
lastProcessContext.retain();
asyncExecutionController.setCurrentContext(lastProcessContext);
super.processElement(record);
lastProcessContext.release();
// getRecordProcessor is removed, since the implementation in RecordProcessUtils should be enough
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The processElement
will be overriden by subclasses, not defined by the superclass. We should leave the processElement
abstract and ship our implementation around this.
|
||
@Override | ||
public final boolean isAsyncStateProcessingEnabled() { | ||
// TODO: Read from config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to configure isAsyncStateProcessingEnabled
globally on the whole Flink job instead of at operators' granularity, and in that case this method would be unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be and will be read from the global config. All operators will have the same option here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Zakelly Thanks for the PR, I left some comments, PTAL
...streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
Show resolved
Hide resolved
...ache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
Outdated
Show resolved
Hide resolved
...he/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
.../flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
Show resolved
Hide resolved
@fredia @yunfengzhou-hub Thanks a lot for your review. I have made another commit to introduce the element order of processElement as FLIP-425 said. This is strictly internal and not exposed to users for now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Zakelly Thanks for the update, I left some minor comments.
...ache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
Show resolved
Hide resolved
.../java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java
Show resolved
Hide resolved
...ache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
Outdated
Show resolved
Hide resolved
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
Outdated
Show resolved
Hide resolved
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, overall LGTM.
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR. Left some minor comments as below.
...g/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java
Show resolved
Hide resolved
.../java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java
Outdated
Show resolved
Hide resolved
...he/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
…n AEC and related components.
f2107a2
to
576ad6a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update! The changes on the stream operators LGTM.
Thanks @fredia and @yunfengzhou-hub for your detailed review! Really appreciate it. 🚀 |
What is the purpose of the change
As part of the async execution model of disaggregated state management, this PR gives the basic definition of
StreamingOperator
integrated with async execution.The philosophy behind this PR is we are trying NOT to invade the previous code path. The most strict thing is that it MUST NOT affect any hot path without this feature.
By the way, the type parameter of record around
AsyncExecutionController
is erased, since the stream operator does not have the knowledge of record type, and theAEC
does not need it.Brief change log
AsyncExecutionController
AbstractAsyncStateStreamOperator
andAbstractAsyncStateStreamOperatorV2
corresponding to theAbstracStreamOperator
andAbstracStreamOperatorV2
respectively.RecordProcessorUtils
.Verifying this change
This change around
AEC
is already covered by existing tests. Basic UTs for new operators are added.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation