diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index f994035eb777f7..95d7e9bcaabc3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; @@ -88,6 +89,9 @@ public class AsyncExecutionController implements StateRequestHandler { /** The state executor where the {@link StateRequest} is actually executed. */ StateExecutor stateExecutor; + /** A manager that allows for declaring processing and variables. */ + final DeclarationManager declarationManager; + /** The corresponding context that currently runs in task thread. */ RecordContext currentContext; @@ -106,6 +110,7 @@ public AsyncExecutionController( MailboxExecutor mailboxExecutor, AsyncFrameworkExceptionHandler exceptionHandler, StateExecutor stateExecutor, + DeclarationManager declarationManager, int maxParallelism, int batchSize, long bufferTimeout, @@ -114,6 +119,7 @@ public AsyncExecutionController( this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler); this.stateExecutor = stateExecutor; + this.declarationManager = declarationManager; this.batchSize = batchSize; this.bufferTimeout = bufferTimeout; this.maxInFlightRecordNum = maxInFlightRecords; @@ -152,13 +158,15 @@ public RecordContext buildContext(Object record, K key) { RecordContext.EMPTY_RECORD, key, this::disposeContext, - KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)); + KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), + declarationManager.variableCount()); } return new RecordContext<>( record, key, this::disposeContext, - KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)); + KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), + declarationManager.variableCount()); } /** @@ -169,6 +177,7 @@ public RecordContext buildContext(Object record, K key) { */ public void setCurrentContext(RecordContext switchingContext) { currentContext = switchingContext; + declarationManager.assignVariables(switchingContext); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java index 82ae4f07155dc8..d87083d3d57fd3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java @@ -20,7 +20,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -54,6 +56,8 @@ public class RecordContext extends ReferenceCounted> declaredVariables; + /** * The extra context info which is used to hold customized data defined by state backend. The * state backend can use this field to cache some data that can be used multiple times in @@ -61,13 +65,22 @@ public class RecordContext extends ReferenceCounted> disposer, int keyGroup) { + public RecordContext( + Object record, + K key, + Consumer> disposer, + int keyGroup, + int variableCount) { super(0); this.record = record; this.key = key; this.keyOccupied = false; this.disposer = disposer; this.keyGroup = keyGroup; + this.declaredVariables = new ArrayList<>(variableCount); + for (int i = 0; i < variableCount; i++) { + declaredVariables.add(null); + } } public Object getRecord() { @@ -104,6 +117,14 @@ public int getKeyGroup() { return keyGroup; } + public AtomicReference getVariableReference(int i) { + return i >= declaredVariables.size() ? null : declaredVariables.get(i); + } + + public void setVariableReference(int i, AtomicReference reference) { + declaredVariables.set(i, reference); + } + public void setExtra(Object extra) { this.extra = extra; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java similarity index 85% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java index fea494e6f9302c..b3a26914e4e90b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java @@ -1,4 +1,22 @@ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.util.function.FunctionWithException; @@ -7,6 +25,11 @@ import java.util.Deque; import java.util.LinkedList; +/** + * A chain-style declaration that could execute in serial. + * @param The input type of this chain. + * @param The output type of the first block of this chain. + */ public class DeclarationChain implements ThrowingConsumer { private final DeclarationContext context; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java similarity index 83% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java index 4addb603620968..e53bd5925c87e0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java @@ -16,13 +16,16 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; +import java.util.function.Supplier; + /** A context to declare parts of process in user-defined function/operator. */ public class DeclarationContext { @@ -76,6 +79,7 @@ public NamedBiFunction declare( /** * Declaring a processing chain. + * * @param first the first code block * @return the chain itself. * @param the in type of the first block @@ -87,6 +91,21 @@ public DeclarationChain.DeclarationStage declareChain( return new DeclarationChain<>(this, first).firstStage(); } + /** + * Declare a variable used across the callbacks. + * + * @param type the type information of the variable + * @param name the unique name of this variable + * @param initialValue the initial value when the variable created. + * @return the variable itself that can used by lambdas. + * @param the variable type. + */ + public DeclaredVariable declareVariable( + TypeInformation type, String name, Supplier initialValue) + throws DeclarationException { + return manager.register(type, name, initialValue); + } + DeclarationManager getManager() { return manager; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationException.java similarity index 92% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationException.java index 1a8e2542fd8f50..c281cadd01089d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; /** Exception thrown when something wrong with declaration happens. */ public class DeclarationException extends Exception { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationManager.java similarity index 60% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationManager.java index 8231b53a00f79d..eb15bdd3971b79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationManager.java @@ -16,25 +16,31 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; /** The manager holds all the declaration information and manage the building procedure. */ public class DeclarationManager { private final Map knownCallbacks; + private final Map knownVariables; + private int nextValidNameSequence = 0; public DeclarationManager() { this.knownCallbacks = new HashMap<>(); + this.knownVariables = new HashMap<>(); } T register(T knownCallback) throws DeclarationException { @@ -44,6 +50,16 @@ T register(T knownCallback) throws DeclarationExceptio return knownCallback; } + DeclaredVariable register( + TypeInformation typeInformation, String name, Supplier initializer) + throws DeclarationException { + DeclaredVariable variable = new DeclaredVariable<>(typeInformation, name, initializer); + if (knownVariables.put(name, variable) != null) { + throw new DeclarationException("Duplicated key " + name); + } + return variable; + } + String nextAssignedName() { String name; do { @@ -52,10 +68,26 @@ String nextAssignedName() { return name; } - public ThrowingConsumer, Exception> buildProcess( + public void assignVariables(RecordContext context) { + int i = 0; + for (DeclaredVariable variable : knownVariables.values()) { + AtomicReference reference = context.getVariableReference(i); + if (reference == null) { + reference = new AtomicReference(variable.initializer.get()); + context.setVariableReference(i, reference); + } + variable.setReference(reference); + } + } + + public int variableCount() { + return knownVariables.size(); + } + + public ThrowingConsumer buildProcess( FunctionWithException< DeclarationContext, - ThrowingConsumer, Exception>, + ThrowingConsumer, DeclarationException> declaringMethod) { final DeclarationContext context = new DeclarationContext(this); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclaredVariable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclaredVariable.java new file mode 100644 index 00000000000000..1289590cc95f56 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclaredVariable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** A variable declared in async state processing. The value could be persisted in checkpoint. */ +public class DeclaredVariable { + + final TypeInformation typeInformation; + + final String name; + + final Supplier initializer; + + AtomicReference reference; + + DeclaredVariable(TypeInformation typeInformation, String name, Supplier initializer) { + this.typeInformation = typeInformation; + this.name = name; + this.initializer = initializer; + } + + void setReference(AtomicReference reference) { + this.reference = reference; + } + + public T get() { + return reference.get(); + } + + public void set(T newValue) { + reference.set(newValue); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedBiFunction.java similarity index 94% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedBiFunction.java index 3d07372a228d42..7dd24861c15784 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedBiFunction.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; import org.apache.flink.util.function.BiFunctionWithException; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedCallback.java similarity index 93% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedCallback.java index 97810454973f02..81c732a3f01db7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedCallback.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; /** A named callback that can be identified and checkpoint. */ public abstract class NamedCallback { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedConsumer.java similarity index 94% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedConsumer.java index 309f1253656d52..0230d505ce0ab8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedConsumer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; import org.apache.flink.util.function.ThrowingConsumer; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedFunction.java similarity index 94% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedFunction.java index 3af7c1837a22d5..aeeeb1544cf464 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/NamedFunction.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +package org.apache.flink.runtime.asyncprocessing.declare; import org.apache.flink.util.function.FunctionWithException; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 09de8dfb1177da..b7065687227e22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -91,6 +91,7 @@ void setup( mailboxExecutor, exceptionHandler, stateExecutor, + null, 128, batchSize, timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java index 811fc95cae546a..3eb2538725fbe0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java @@ -306,7 +306,7 @@ public void testComplex() { private RecordContext buildRecordContext(Object record, K key) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128); - return new RecordContext<>(record, key, (e) -> {}, keyGroup); + return new RecordContext<>(record, key, (e) -> {}, keyGroup, 0); } /** A runner that performs single-step debugging. */ diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index 7c6887a2a2d1ca..0eff3f1ae7d59c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -87,7 +87,7 @@ public InternalStateFuture handleRequest( protected ContextKey buildContextKey(int i) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128); - RecordContext recordContext = new RecordContext<>(i, i, t -> {}, keyGroup); + RecordContext recordContext = new RecordContext<>(i, i, t -> {}, keyGroup, 0); return new ContextKey<>(recordContext); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index 3e17f30d34b14e..9f062ebe9a8d94 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -127,7 +127,7 @@ public void testExecuteValueStateRequest() throws Exception { V value, R record) { int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 128); - RecordContext recordContext = new RecordContext<>(record, key, t -> {}, keyGroup); + RecordContext recordContext = new RecordContext<>(record, key, t -> {}, keyGroup, 0); TestStateFuture stateFuture = new TestStateFuture<>(); return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java index 40eda40b1b8a06..c8b83654d238c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -117,7 +117,7 @@ public final ThrowingConsumer, Exception> getRecordProcessor(in (KeySelector) stateKeySelector, ((AsyncStateProcessingOperator) owner) .getDeclarationManager() - .buildProcess( + .>buildProcess( ((DeclarativeProcessingInput) this)::declareProcess)); } else { return AsyncStateProcessing.makeRecordProcessor( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index 901d5dc9eac964..77c4cc91919ac7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.AsyncStateException; import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; @@ -39,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarativeProcessingInput; import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarativeProcessingTwoInputOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -89,6 +89,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) final long asyncBufferTimeout = environment.getExecutionConfig().getAsyncStateBufferTimeout(); + this.declarationManager = new DeclarationManager(); AsyncKeyedStateBackend asyncKeyedStateBackend = stateHandler.getAsyncKeyedStateBackend(); if (asyncKeyedStateBackend != null) { this.asyncExecutionController = @@ -96,6 +97,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) mailboxExecutor, this::handleAsyncStateException, asyncKeyedStateBackend.createStateExecutor(), + declarationManager, maxParallelism, asyncBufferSize, asyncBufferTimeout, @@ -105,7 +107,6 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); } - this.declarationManager = new DeclarationManager(); } private void handleAsyncStateException(String message, Throwable exception) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index 20ec746a353f3d..d41ea4f2b914c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.AsyncStateException; import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; @@ -37,7 +38,6 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; -import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -87,6 +87,7 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana environment.getMainMailboxExecutor(), this::handleAsyncStateException, asyncKeyedStateBackend.createStateExecutor(), + declarationManager, maxParallelism, asyncBufferSize, asyncBufferTimeout, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java index 14c2650c873370..8116d7fe8ca7c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingRunnable; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java index a2478c969eaa3a..b1e7ea299bcb3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingConsumer; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java index f124ebcae212bb..3f53ade2f151f2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingConsumer; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index 4f78cd2035966f..9aea17f37ef192 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -63,6 +63,7 @@ void setup() throws Exception { new SyncMailboxExecutor(), exceptionHandler, new MockStateExecutor(), + null, 128, 2, 1000L, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java index 873083aabc5aa0..837a965576a74b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java @@ -23,6 +23,12 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException; +import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; +import org.apache.flink.runtime.asyncprocessing.declare.NamedCallback; +import org.apache.flink.runtime.asyncprocessing.declare.NamedConsumer; +import org.apache.flink.runtime.asyncprocessing.declare.NamedFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -71,6 +77,20 @@ public class AbstractAsyncStateStreamOperatorDeclarationTest { subtaskIndex); } + protected KeyedOneInputStreamOperatorTestHarness, String> + createTestHarnessWithVariable( + int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) + throws Exception { + TestDeclareVariableOperator testOperator = new TestDeclareVariableOperator(elementOrder); + return new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + } + @Test public void testRecordProcessor() throws Exception { try (KeyedOneInputStreamOperatorTestHarness, String> @@ -99,6 +119,22 @@ public void testRecordProcessorWithChain() throws Exception { } } + @Test + public void testRecordProcessorWithVariable() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarnessWithVariable(128, 1, 0, ElementOrder.RECORD_ORDER)) { + testHarness.open(); + TestDeclareVariableOperator testOperator = + (TestDeclareVariableOperator) testHarness.getOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator); + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + assertThat(testOperator.getValue()).isEqualTo(6); + processor.accept(new StreamRecord<>(Tuple2.of(6, "6"))); + assertThat(testOperator.getValue()).isEqualTo(13); + } + } + /** A simple testing operator. */ private static class TestDeclarationOperator extends AbstractAsyncStateStreamOperator implements OneInputStreamOperator, String>, @@ -189,6 +225,35 @@ public ThrowingConsumer>, Exception> declar } } + private static class TestDeclareVariableOperator extends TestDeclarationOperator { + + TestDeclareVariableOperator(ElementOrder elementOrder) { + super(elementOrder); + } + + @Override + public ThrowingConsumer>, Exception> declareProcess( + DeclarationContext context) throws DeclarationException { + DeclaredVariable local = + context.declareVariable(BasicTypeInfo.INT_TYPE_INFO, "local count", () -> 0); + + return context.>, Void>declareChain( + e -> { + local.set(e.getValue().f0); + return StateFutureUtils.completedVoidFuture(); + }) + .thenCompose( + v -> { + local.set(local.get() + 1); + return StateFutureUtils.completedFuture(local.get()); + }) + .withName("adder") + .thenAccept(value::addAndGet) + .withName("aggregate") + .finish(); + } + } + /** {@link KeySelector} for tests. */ static class TestKeySelector implements KeySelector, Integer> { private static final long serialVersionUID = 1L;