Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34986][Runtime/State] Basic framework of async execution for state #24614

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public <U, V> StateFuture<V> thenCombine(
});
}

@Override
public void complete(T result) {
throw new UnsupportedOperationException("This state future has already been completed.");
}

@Override
public void thenSyncAccept(Consumer<? super T> action) {
action.accept(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
@Internal
public interface InternalStateFuture<T> extends StateFuture<T> {

/** Complete this future. */
void complete(T result);

/**
* Accept the action in the same thread with the one of complete (or current thread if it has
* been completed).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,33 @@
@Internal
public class StateFutureImpl<T> implements InternalStateFuture<T> {

/** The future holds the result. The completes in async threads. */
CompletableFuture<T> completableFuture;
/** The future holds the result. This may complete in async threads. */
private final CompletableFuture<T> completableFuture;

/** The callback runner. */
CallbackRunner callbackRunner;
protected final CallbackRunner callbackRunner;

StateFutureImpl(CallbackRunner callbackRunner) {
public StateFutureImpl(CallbackRunner callbackRunner) {
this.completableFuture = new CompletableFuture<>();
this.callbackRunner = callbackRunner;
}

@Override
public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
U r = fn.apply(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedFuture(r);
} else {
StateFutureImpl<U> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<U> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
ret.complete(fn.apply(t));
callbackFinished();
});
});
return ret;
Expand All @@ -75,18 +78,21 @@ public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {

@Override
public StateFuture<Void> thenAccept(Consumer<? super T> action) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
action.accept(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedVoidFuture();
} else {
StateFutureImpl<Void> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<Void> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
action.accept(t);
ret.complete(null);
callbackFinished();
});
});
return ret;
Expand All @@ -98,17 +104,20 @@ public StateFuture<Void> thenAccept(Consumer<? super T> action) {

@Override
public <U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<U>> action) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
callbackFinished();
return action.apply(completableFuture.get());
} else {
StateFutureImpl<U> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<U> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
StateFuture<U> su = action.apply(t);
su.thenAccept(ret::complete);
callbackFinished();
});
});
return ret;
Expand All @@ -121,20 +130,22 @@ public <U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<
@Override
public <U, V> StateFuture<V> thenCombine(
StateFuture<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
return other.thenCompose(
(u) -> {
try {
V v = fn.apply(completableFuture.get(), u);
callbackFinished();
return StateFutureUtils.completedFuture(v);
} catch (Throwable e) {
throw new FlinkRuntimeException(
"Error binding or executing callback", e);
}
});
} else {
StateFutureImpl<V> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<V> ret = makeNewStateFuture();
((InternalStateFuture<? extends U>) other)
.thenSyncAccept(
(u) -> {
Expand All @@ -143,6 +154,7 @@ public <U, V> StateFuture<V> thenCombine(
callbackRunner.submit(
() -> {
ret.complete(fn.apply(t, u));
callbackFinished();
});
});
});
Expand All @@ -153,12 +165,35 @@ public <U, V> StateFuture<V> thenCombine(
}
}

/**
* Make a new future based on context of this future. Subclasses need to overload this method to
* generate their own instances (if needed).
*
* @return the new created future.
*/
public <A> StateFutureImpl<A> makeNewStateFuture() {
return new StateFutureImpl<>(callbackRunner);
}

@Override
public void complete(T result) {
completableFuture.complete(result);
postComplete();
}

/** Will be triggered when a callback is registered. */
public void callbackRegistered() {
// does nothing by default.
}

/** Will be triggered when this future completes. */
public void postComplete() {
// does nothing by default.
}

/** Will be triggered when a callback finishes processing. */
public void callbackFinished() {
// does nothing by default.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

/**
* The Async Execution Controller (AEC) receives processing requests from operators, and put them
* into execution according to some strategies.
*
* <p>It is responsible for:
* <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests
* until the processing of preceding ones is finalized.
* <li>Tracking the in-flight data(records) and blocking the input if too much data in flight
* (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations,
* allowing for the execution of callbacks (mails in Mailbox).
*
* @param <R> the type of the record
* @param <K> the type of the key
*/
public class AsyncExecutionController<R, K> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;

/** The max allowed number of in-flight records. */
private final int maxInFlightRecordNum;

/** The key accounting unit which is used to detect the key conflict. */
final KeyAccountingUnit<R, K> keyAccountingUnit;
Zakelly marked this conversation as resolved.
Show resolved Hide resolved

/**
* A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto
* wire the created future with mailbox executor. Also conducting the context switch.
*/
private final StateFutureFactory<R, K> stateFutureFactory;

/** The state executor where the {@link StateRequest} is actually executed. */
final StateExecutor stateExecutor;
Zakelly marked this conversation as resolved.
Show resolved Hide resolved

/** The corresponding context that currently runs in task thread. */
RecordContext<R, K> currentContext;
Zakelly marked this conversation as resolved.
Show resolved Hide resolved

public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
}

public AsyncExecutionController(
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
this.stateExecutor = stateExecutor;
this.maxInFlightRecordNum = maxInFlightRecords;
LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
}

/**
* Build a new context based on record and key. Also wired with internal {@link
* KeyAccountingUnit}.
*
* @param record the given record.
* @param key the given key.
* @return the built record context.
*/
public RecordContext<R, K> buildContext(R record, K key) {
masteryhx marked this conversation as resolved.
Show resolved Hide resolved
return new RecordContext<>(record, key, this::disposeContext);
}

/**
* Each time before a code segment (callback) is about to run in mailbox (task thread), this
* method should be called to switch a context in AEC.
*
* @param switchingContext the context to switch.
*/
public void setCurrentContext(RecordContext<R, K> switchingContext) {
currentContext = switchingContext;
}

/**
* Dispose a context.
*
* @param toDispose the context to dispose.
*/
public void disposeContext(RecordContext<R, K> toDispose) {
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
}

/**
* Try to occupy a key by a given context.
*
* @param recordContext the given context.
* @return true if occupy succeed or the key has already occupied by this context.
*/
boolean tryOccupyKey(RecordContext<R, K> recordContext) {
boolean occupied = recordContext.isKeyOccupied();
if (!occupied
&& keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
recordContext.setKeyOccupied();
occupied = true;
}
return occupied;
}

/**
* Submit a {@link StateRequest} to this AEC and trigger if needed.
*
* @param state the state to request. Could be {@code null} if the type is {@link
* StateRequestType#SYNC_POINT}.
* @param type the type of this request.
* @param payload the payload input for this request.
* @return the state future.
*/
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
@Nullable State state, StateRequestType type, @Nullable IN payload) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remind me of the cases when a null state would be passed to AEC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some description in javadoc. This is for the strict order of 'processElement' for same-key records.

// Step 1: build state future & assign context.
InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext);
StateRequest<K, IN, OUT> request =
new StateRequest<>(state, type, payload, stateFuture, currentContext);
// Step 2: try to occupy the key and place it into right buffer.
if (tryOccupyKey(currentContext)) {
insertActiveBuffer(request);
} else {
insertBlockingBuffer(request);
}
// Step 3: trigger the (active) buffer if needed.
triggerIfNeeded(false);
return stateFuture;
}

<IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the active buffer.
}

<IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the blocking buffer.
}

/**
* Trigger a batch of requests.
*
* @param force whether to trigger requests in force.
*/
void triggerIfNeeded(boolean force) {
// TODO: implement the trigger logic.
}
}
Loading