[FLINK-35024][Runtime/State] Implement the record buffer of AsyncExecutionController #24633
Thanks for the PR! I'm posting some concerns in advance.
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
/** Migrate the blocking requests to the active buffer. */
@VisibleForTesting
void migrateBlockingToActive() {
Personally, I'd suggest a queueing mechanism in which each blocked request queues under its key. When the reference count of the active request/context reaches zero, it pops one request from the queue for that key and puts it into the active buffer. I think this would be a high-performance implementation.
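The queueing idea suggested above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the class `KeyedRequestQueue` and its methods `submit`, `release`, `pollActive` and `blockedCount` are hypothetical names, and it assumes one request per record and single-threaded access.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; this is NOT Flink's StateRequestsBuffer.
// Assumes a single thread and one request per record.
final class KeyedRequestQueue<K, R> {
    // Keys that currently have a request in flight.
    private final Set<K> occupiedKeys = new HashSet<>();
    // Blocked requests, queued per key in arrival order.
    private final Map<K, Deque<R>> blocking = new HashMap<>();
    // Requests that are ready to be executed.
    private final Deque<R> active = new ArrayDeque<>();

    /** Puts the request into the active buffer if its key is free, otherwise queues it. */
    boolean submit(K key, R request) {
        if (occupiedKeys.add(key)) {
            active.add(request);
            return true;
        }
        blocking.computeIfAbsent(key, k -> new ArrayDeque<>()).add(request);
        return false;
    }

    /** Called when the in-flight request of the key is done (its reference count hit zero). */
    void release(K key) {
        Deque<R> queue = blocking.get(key);
        if (queue == null) {
            // Nothing is waiting, so the key becomes free.
            occupiedKeys.remove(key);
            return;
        }
        // Hand the key over to the oldest waiting request; the key stays occupied.
        active.add(queue.poll());
        if (queue.isEmpty()) {
            blocking.remove(key);
        }
    }

    /** Returns the next ready request, or null if there is none. */
    R pollActive() {
        return active.poll();
    }

    /** Total number of blocked requests across all keys. */
    int blockedCount() {
        int n = 0;
        for (Deque<R> q : blocking.values()) {
            n += q.size();
        }
        return n;
    }
}
```

With this layout, releasing a key costs one map lookup and one queue poll, so no scan over all blocked requests is needed.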
Thanks for the suggestion. I added a new class, StateRequestsBuffer, which groups the state requests in the blocking buffer by key. Migrating one state request from the blocking buffer to the active buffer is triggered in RecordContext#disposer.
Thanks for your update! I left some further comments. PTAL.
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestsBuffer.java
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestsBuffer.java
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
// 3. Ensure the currentContext is restored.
setCurrentContext(storedContext);
inFlightRecordNum.incrementAndGet();
} catch (InterruptedException e) {
IIUC, the InterruptedException should be ignored; otherwise it will produce a fatal error during a normal TM exit (as required by the JM)?
👍 Good catch. Thread.sleep(50); has been deleted, so the InterruptedException can no longer be thrown.
// be less than the max in-flight record number.
// Note: the currentContext may be updated by {@code StateFutureFactory#build}.
try {
while (inFlightRecordNum.get() > maxInFlightRecordNum) {
I think we could clarify the definition of inFlightRecordNum. IIUC, currently inFlightRecordNum == keyAccountingUnit.occupiedCount. I'm not sure which size/count we should control: the records in the AEC or the running records?
inFlightRecordNum is the number of records in the AEC, including the records in both the active buffer and the blocking buffer.
Also, inFlightRecordNum == keyAccountingUnit.occupiedCount may not always be true.
I rewrote the description and added some asserts in testRecordsRunInOrder and testBasicRun.
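To make the counting semantics discussed here concrete, below is a rough sketch of an in-flight limit that counts every admitted record until it finishes, whether it is active or blocked. It is not the PR's implementation: `InFlightLimiter`, `admit`, `onRecordFinished` and the `drainOnce` callback are hypothetical names, and the check-then-increment order simply mirrors the snippets quoted in this review.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; NOT the AsyncExecutionController from this PR.
final class InFlightLimiter {
    private final int maxInFlight;
    // Counts every admitted record that has not finished yet (active or blocked).
    private final AtomicInteger inFlight = new AtomicInteger();
    // Assumed callback that executes buffered requests; it must eventually
    // finish at least one record, otherwise admit() would spin forever.
    private final Runnable drainOnce;

    InFlightLimiter(int maxInFlight, Runnable drainOnce) {
        this.maxInFlight = maxInFlight;
        this.drainOnce = drainOnce;
    }

    /** Drains while the count exceeds the limit, then admits one more record. */
    void admit() {
        while (inFlight.get() > maxInFlight) {
            drainOnce.run();
        }
        inFlight.incrementAndGet();
    }

    /** Called when a record is completely processed. */
    void onRecordFinished() {
        inFlight.decrementAndGet();
    }

    int inFlightCount() {
        return inFlight.get();
    }
}
```

Because the loop uses `>` and the increment happens afterwards, the counter in this sketch can reach `maxInFlight + 1` right after an admission; whether that is intended is a design decision the sketch does not settle.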
@Zakelly Thanks for the suggestions. I updated and rebased this PR; please take a look again.
Thanks for your update!
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 * @param <R> the type of the record
 * @param <K> the type of the key
 */
@NotThreadSafe
Better to add a description stating that this class should only be manipulated within the task thread.
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
...runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
Thanks for the update. LGTM, with only one minor thing.
 * @param N the number of state requests to pop.
 * @return A list of state requests.
 */
List<StateRequest<?, ?, ?>> popActive(int N) {
- List<StateRequest<?, ?, ?>> popActive(int N) {
+ List<StateRequest<?, ?, ?>> popActive(int n) {
@Zakelly Thanks for the review; updated and squashed.
Thanks for the PR.
I just left some minor suggestions. PTAL.
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
...ime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
Thanks for the PR! I checked the changes in production code and it LGTM.
Thanks for the update. LGTM.
What is the purpose of the change
As part of FLIP-425, this PR implements the record buffer of AsyncExecutionController.
Brief change log
activeBuffer, blockingBuffer and inFlightRecordNum into AsyncExecutionController.
AsyncExecutionController
Verifying this change
AsyncExecutionControllerTest#testInFlightRecordControl
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation