Skip to content

Commit

Permalink
[FLINK-xxx2][Runtime] Declaring processing by chain
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed May 13, 2024
1 parent 22bec8b commit db1283c
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.Deque;
import java.util.LinkedList;

public class DeclarationChain<IN, FIRST> implements ThrowingConsumer<IN, Exception> {

private final DeclarationContext context;

private final FunctionWithException<IN, StateFuture<FIRST>, Exception> first;

private final Deque<Transformation<?, ?>> transformations;

private DeclarationStage<?> currentStage;

DeclarationChain(
DeclarationContext context,
FunctionWithException<IN, StateFuture<FIRST>, Exception> first) {
this.context = context;
this.first = first;
this.transformations = new LinkedList<>();
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void accept(IN in) throws Exception {
StateFuture future = first.apply(in);
for (Transformation trans : transformations) {
future = trans.apply(future);
}
}

public DeclarationStage<FIRST> firstStage() throws DeclarationException {
if (currentStage != null) {
throw new DeclarationException(
"Diverged declaration. Please make sure you call firstStage() once.");
}
DeclarationStage<FIRST> declarationStage = new DeclarationStage<>();
currentStage = declarationStage;
return declarationStage;
}

Transformation<?, ?> getLastTransformation() {
return transformations.getLast();
}

public class DeclarationStage<T> {

private boolean afterThen = false;

private void preCheck() throws DeclarationException {
if (afterThen) {
throw new DeclarationException(
"Double thenCompose called for single declaration block.");
}
if (currentStage != this) {
throw new DeclarationException(
"Diverged declaration. Please make sure you are declaring on the last point.");
}
afterThen = true;
}

public <U> DeclarationStage<U> thenCompose(
FunctionWithException<T, StateFuture<U>, Exception> action)
throws DeclarationException {
preCheck();
DeclarationStage<U> next = new DeclarationStage<>();
ComposeTransformation<T, U> trans = new ComposeTransformation<>(action, next);
transformations.add(trans);
currentStage = next;
getLastTransformation().declare();
return next;
}

public DeclarationStage<Void> thenAccept(ThrowingConsumer<T, Exception> action)
throws DeclarationException {
preCheck();
DeclarationStage<Void> next = new DeclarationStage<>();
AcceptTransformation<T> trans = new AcceptTransformation<>(action, next);
transformations.add(trans);
currentStage = next;
getLastTransformation().declare();
return next;
}

public DeclarationStage<T> withName(String name) throws DeclarationException {
getLastTransformation().withName(name);
return this;
}

public DeclarationChain<IN, FIRST> finish() throws DeclarationException {
preCheck();
getLastTransformation().declare();
return DeclarationChain.this;
}
}

interface Transformation<FROM, TO> {
StateFuture<TO> apply(StateFuture<FROM> upstream) throws Exception;

void withName(String name) throws DeclarationException;

void declare() throws DeclarationException;
}

abstract static class AbstractTransformation<FROM, TO> implements Transformation<FROM, TO> {

String name = null;

@Override
public void withName(String newName) throws DeclarationException {
if (name != null) {
throw new DeclarationException("Double naming");
}
name = newName;
declare();
}
}

class ComposeTransformation<FROM, TO> extends AbstractTransformation<FROM, TO> {

DeclarationStage<TO> to;

FunctionWithException<FROM, StateFuture<TO>, ? extends Exception> action;

NamedFunction<FROM, StateFuture<TO>> namedFunction;

ComposeTransformation(
FunctionWithException<FROM, StateFuture<TO>, Exception> action,
DeclarationStage<TO> to) {
this.action = action;
this.to = to;
}

@Override
public StateFuture<TO> apply(StateFuture<FROM> upstream) throws Exception {
return upstream.thenCompose(namedFunction);
}

@Override
public void declare() throws DeclarationException {
if (namedFunction == null) {
if (name == null) {
namedFunction = context.declare(action);
} else {
namedFunction = context.declare(name, action);
}
}
}
}

class AcceptTransformation<FROM> extends AbstractTransformation<FROM, Void> {

DeclarationStage<Void> to;

ThrowingConsumer<FROM, Exception> action;

NamedConsumer<FROM> namedFunction;

AcceptTransformation(ThrowingConsumer<FROM, Exception> action, DeclarationStage<Void> to) {
this.action = action;
this.to = to;
}

@Override
public StateFuture<Void> apply(StateFuture<FROM> upstream) throws Exception {
return upstream.thenAccept(namedFunction);
}

@Override
public void declare() throws DeclarationException {
if (namedFunction == null) {
if (name == null) {
namedFunction = context.declare(action);
} else {
namedFunction = context.declare(name, action);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare;

import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -73,6 +74,19 @@ public <T, U, V> NamedBiFunction<T, U, V> declare(
return declare(manager.nextAssignedName(), callback);
}

/**
* Declaring a processing chain.
* @param first the first code block
* @return the chain itself.
* @param <IN> the in type of the first block
* @param <T> the out type of the state future given by the first block
*/
public <IN, T> DeclarationChain<IN, T>.DeclarationStage<T> declareChain(
FunctionWithException<IN, StateFuture<T>, Exception> first)
throws DeclarationException {
return new DeclarationChain<>(this, first).firstStage();
}

DeclarationManager getManager() {
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ public class AbstractAsyncStateStreamOperatorDeclarationTest {
subtaskIndex);
}

protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
createTestHarnessWithChain(
int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder)
throws Exception {
TestDeclarationChainOperator testOperator = new TestDeclarationChainOperator(elementOrder);
return new KeyedOneInputStreamOperatorTestHarness<>(
testOperator,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
maxParalelism,
numSubtasks,
subtaskIndex);
}

@Test
public void testRecordProcessor() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
Expand All @@ -71,6 +85,20 @@ public void testRecordProcessor() throws Exception {
}
}

@Test
public void testRecordProcessorWithChain() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarnessWithChain(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestDeclarationChainOperator testOperator =
(TestDeclarationChainOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
assertThat(testOperator.getValue()).isEqualTo(12);
}
}

/** A simple testing operator. */
private static class TestDeclarationOperator extends AbstractAsyncStateStreamOperator<String>
implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
Expand Down Expand Up @@ -138,6 +166,29 @@ public int getValue() {
}
}

private static class TestDeclarationChainOperator extends TestDeclarationOperator {

TestDeclarationChainOperator(ElementOrder elementOrder) {
super(elementOrder);
}

@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(
DeclarationContext context) throws DeclarationException {

return context.<StreamRecord<Tuple2<Integer, String>>, Void>declareChain(
e -> {
value.addAndGet(e.getValue().f0);
return StateFutureUtils.completedVoidFuture();
})
.thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet()))
.withName("adder")
.thenAccept(value::addAndGet)
.withName("doubler")
.finish();
}
}

/** {@link KeySelector} for tests. */
static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
private static final long serialVersionUID = 1L;
Expand Down

0 comments on commit db1283c

Please sign in to comment.