Skip to content

Commit

Permalink
[FLINK-xxx3][Runtime/State] Declaring local variables in async proces…
Browse files Browse the repository at this point in the history
…sing
  • Loading branch information
Zakelly committed May 13, 2024
1 parent db1283c commit b98a0c6
Show file tree
Hide file tree
Showing 23 changed files with 253 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
/** The state executor where the {@link StateRequest} is actually executed. */
StateExecutor stateExecutor;

/** A manager that allows for declaring processing and variables. */
final DeclarationManager declarationManager;

/** The corresponding context that currently runs in task thread. */
RecordContext<K> currentContext;

Expand All @@ -106,6 +110,7 @@ public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
AsyncFrameworkExceptionHandler exceptionHandler,
StateExecutor stateExecutor,
DeclarationManager declarationManager,
int maxParallelism,
int batchSize,
long bufferTimeout,
Expand All @@ -114,6 +119,7 @@ public AsyncExecutionController(
this.mailboxExecutor = mailboxExecutor;
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
this.stateExecutor = stateExecutor;
this.declarationManager = declarationManager;
this.batchSize = batchSize;
this.bufferTimeout = bufferTimeout;
this.maxInFlightRecordNum = maxInFlightRecords;
Expand Down Expand Up @@ -152,13 +158,15 @@ public RecordContext<K> buildContext(Object record, K key) {
RecordContext.EMPTY_RECORD,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
declarationManager.variableCount());
}
return new RecordContext<>(
record,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
declarationManager.variableCount());
}

/**
Expand All @@ -169,6 +177,7 @@ public RecordContext<K> buildContext(Object record, K key) {
*/
public void setCurrentContext(RecordContext<K> switchingContext) {
currentContext = switchingContext;
declarationManager.assignVariables(switchingContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -54,20 +56,31 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
/** The keyGroup to which key belongs. */
private final int keyGroup;

private final ArrayList<AtomicReference<?>> declaredVariables;

/**
* The extra context info which is used to hold customized data defined by state backend. The
* state backend can use this field to cache some data that can be used multiple times in
* different stages of asynchronous state execution.
*/
private @Nullable volatile Object extra;

public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) {
public RecordContext(
Object record,
K key,
Consumer<RecordContext<K>> disposer,
int keyGroup,
int variableCount) {
super(0);
this.record = record;
this.key = key;
this.keyOccupied = false;
this.disposer = disposer;
this.keyGroup = keyGroup;
this.declaredVariables = new ArrayList<>(variableCount);
for (int i = 0; i < variableCount; i++) {
declaredVariables.add(null);
}
}

public Object getRecord() {
Expand Down Expand Up @@ -104,6 +117,14 @@ public int getKeyGroup() {
return keyGroup;
}

public AtomicReference<?> getVariableReference(int i) {
return i >= declaredVariables.size() ? null : declaredVariables.get(i);
}

public void setVariableReference(int i, AtomicReference<?> reference) {
declaredVariables.set(i, reference);
}

public void setExtra(Object extra) {
this.extra = extra;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.function.FunctionWithException;
Expand All @@ -7,6 +25,11 @@
import java.util.Deque;
import java.util.LinkedList;

/**
* A chain-style declaration that could execute in serial.
* @param <IN> The input type of this chain.
* @param <FIRST> The output type of the first block of this chain.
*/
public class DeclarationChain<IN, FIRST> implements ThrowingConsumer<IN, Exception> {

private final DeclarationContext context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.function.Supplier;

/** A context to declare parts of process in user-defined function/operator. */
public class DeclarationContext {

Expand Down Expand Up @@ -76,6 +79,7 @@ public <T, U, V> NamedBiFunction<T, U, V> declare(

/**
* Declaring a processing chain.
*
* @param first the first code block
* @return the chain itself.
* @param <IN> the in type of the first block
Expand All @@ -87,6 +91,21 @@ public <IN, T> DeclarationChain<IN, T>.DeclarationStage<T> declareChain(
return new DeclarationChain<>(this, first).firstStage();
}

/**
* Declare a variable used across the callbacks.
*
* @param type the type information of the variable
* @param name the unique name of this variable
* @param initialValue the initial value when the variable created.
* @return the variable itself that can used by lambdas.
* @param <T> the variable type.
*/
public <T> DeclaredVariable<T> declareVariable(
TypeInformation<T> type, String name, Supplier<T> initialValue)
throws DeclarationException {
return manager.register(type, name, initialValue);
}

DeclarationManager getManager() {
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

/** Exception thrown when something wrong with declaration happens. */
public class DeclarationException extends Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** The manager holds all the declaration information and manage the building procedure. */
public class DeclarationManager {

private final Map<String, NamedCallback> knownCallbacks;

private final Map<String, DeclaredVariable> knownVariables;

private int nextValidNameSequence = 0;

public DeclarationManager() {
this.knownCallbacks = new HashMap<>();
this.knownVariables = new HashMap<>();
}

<T extends NamedCallback> T register(T knownCallback) throws DeclarationException {
Expand All @@ -44,6 +50,16 @@ <T extends NamedCallback> T register(T knownCallback) throws DeclarationExceptio
return knownCallback;
}

<T> DeclaredVariable<T> register(
TypeInformation<T> typeInformation, String name, Supplier<T> initializer)
throws DeclarationException {
DeclaredVariable<T> variable = new DeclaredVariable<>(typeInformation, name, initializer);
if (knownVariables.put(name, variable) != null) {
throw new DeclarationException("Duplicated key " + name);
}
return variable;
}

String nextAssignedName() {
String name;
do {
Expand All @@ -52,10 +68,26 @@ String nextAssignedName() {
return name;
}

public <T> ThrowingConsumer<StreamRecord<T>, Exception> buildProcess(
public void assignVariables(RecordContext<?> context) {
int i = 0;
for (DeclaredVariable variable : knownVariables.values()) {
AtomicReference reference = context.getVariableReference(i);
if (reference == null) {
reference = new AtomicReference(variable.initializer.get());
context.setVariableReference(i, reference);
}
variable.setReference(reference);
}
}

public int variableCount() {
return knownVariables.size();
}

public <T> ThrowingConsumer<T, Exception> buildProcess(
FunctionWithException<
DeclarationContext,
ThrowingConsumer<StreamRecord<T>, Exception>,
ThrowingConsumer<T, Exception>,
DeclarationException>
declaringMethod) {
final DeclarationContext context = new DeclarationContext(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** A variable declared in async state processing. The value could be persisted in checkpoint. */
public class DeclaredVariable<T> {

final TypeInformation<T> typeInformation;

final String name;

final Supplier<T> initializer;

AtomicReference<T> reference;

DeclaredVariable(TypeInformation<T> typeInformation, String name, Supplier<T> initializer) {
this.typeInformation = typeInformation;
this.name = name;
this.initializer = initializer;
}

void setReference(AtomicReference<T> reference) {
this.reference = reference;
}

public T get() {
return reference.get();
}

public void set(T newValue) {
reference.set(newValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.BiFunctionWithException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

/** A named callback that can be identified and checkpoint. */
public abstract class NamedCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.ThrowingConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;
package org.apache.flink.runtime.asyncprocessing.declare;

import org.apache.flink.util.function.FunctionWithException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void setup(
mailboxExecutor,
exceptionHandler,
stateExecutor,
null,
128,
batchSize,
timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void testComplex() {

private <K> RecordContext<K> buildRecordContext(Object record, K key) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128);
return new RecordContext<>(record, key, (e) -> {}, keyGroup);
return new RecordContext<>(record, key, (e) -> {}, keyGroup, 0);
}

/** A runner that performs single-step debugging. */
Expand Down

0 comments on commit b98a0c6

Please sign in to comment.