From 428d5427227343479b6d63daf7fced8f1bf9a69c Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 25 Jul 2018 14:58:46 +0800 Subject: [PATCH] [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction --- .../DataStreamAllroundTestJobFactory.java | 45 +++++++++++++++---- .../tests/DataStreamAllroundTestProgram.java | 3 +- .../builder/ArtificialListStateBuilder.java | 13 +++--- .../builder/ArtificialValueStateBuilder.java | 12 +++-- .../StatefulStreamJobUpgradeTestProgram.java | 25 ++++++----- 5 files changed, 63 insertions(+), 35 deletions(-) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 123807297efa5..fb92960bb8684 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; @@ -382,13 +384,38 @@ static MapFunction createExceptionThrowingFailureMapper(ParameterT static ArtificialKeyedStateMapper createArtificialKeyedStateMapper( MapFunction mapFunction, JoinFunction inputAndOldStateToNewState, - List> stateSerializers) { + List> stateSerializers, + List> stateClasses) { List> artificialStateBuilders = new ArrayList<>(stateSerializers.size()); for (TypeSerializer typeSerializer : stateSerializers) { - artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer)); - artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer)); + artificialStateBuilders.add(createValueStateBuilder( + inputAndOldStateToNewState, + new ValueStateDescriptor<>( + "valueState-" + typeSerializer.getClass().getSimpleName(), + typeSerializer))); + + artificialStateBuilders.add(createListStateBuilder( + inputAndOldStateToNewState, + new ListStateDescriptor<>( + "listState-" + typeSerializer.getClass().getSimpleName(), + typeSerializer))); } + + for (Class stateClass : stateClasses) { + artificialStateBuilders.add(createValueStateBuilder( + inputAndOldStateToNewState, + new ValueStateDescriptor<>( + "valueState-" + stateClass.getSimpleName(), + stateClass))); + + artificialStateBuilders.add(createListStateBuilder( + inputAndOldStateToNewState, + new ListStateDescriptor<>( + "listState-" + stateClass.getSimpleName(), + stateClass))); + } + return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders); } @@ -400,17 +427,17 @@ static ArtificalOperatorStateMapper createArtificialOperatorS static ArtificialStateBuilder createValueStateBuilder( JoinFunction inputAndOldStateToNewState, - TypeSerializer typeSerializer) { + ValueStateDescriptor valueStateDescriptor) { return new ArtificialValueStateBuilder<>( - "valueState-" + typeSerializer.getClass().getSimpleName(), + valueStateDescriptor.getName(), inputAndOldStateToNewState, - typeSerializer); + valueStateDescriptor); } static ArtificialStateBuilder createListStateBuilder( JoinFunction inputAndOldStateToNewState, - TypeSerializer typeSerializer) { + ListStateDescriptor listStateDescriptor) { JoinFunction, List> listStateGenerator = (first, second) -> { List newState = new ArrayList<>(); @@ -421,9 +448,9 @@ static ArtificialStateBuilder createListStateBuilder( }; return new ArtificialListStateBuilder<>( - "listState-" + typeSerializer.getClass().getSimpleName(), + listStateDescriptor.getName(), listStateGenerator, listStateGenerator, - typeSerializer); + listStateDescriptor); } } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index ea90e6551038e..30c1c24ee3fee 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -86,7 +86,8 @@ public static void main(String[] args) throws Exception { return new ComplexPayload(first, KEYED_STATE_OPER_NAME); }, Collections.singletonList( - new KryoSerializer<>(ComplexPayload.class, env.getConfig())) + new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer + Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction ) ) .name(KEYED_STATE_OPER_NAME) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java index a2c63877dd3d4..b29e535cfb2ab 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java @@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.util.Preconditions; import java.util.List; @@ -35,7 +35,7 @@ public class ArtificialListStateBuilder extends ArtificialStateBuilde private transient ListState listOperatorState; private transient ListState listKeyedState; - private final TypeSerializer typeSerializer; + private final ListStateDescriptor listStateDescriptor; private final JoinFunction, List> keyedStateGenerator; private final JoinFunction, List> operatorStateGenerator; @@ -43,11 +43,11 @@ public ArtificialListStateBuilder( String stateName, JoinFunction, List> keyedStateGenerator, JoinFunction, List> operatorStateGenerator, - TypeSerializer typeSerializer) { + ListStateDescriptor listStateDescriptor) { super(stateName); - this.typeSerializer = typeSerializer; - this.keyedStateGenerator = keyedStateGenerator; - this.operatorStateGenerator = operatorStateGenerator; + this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor); + this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator); + this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator); } @Override @@ -58,7 +58,6 @@ public void artificialStateForElement(IN event) throws Exception { @Override public void initialize(FunctionInitializationContext initializationContext) throws Exception { - ListStateDescriptor listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer); listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor); listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor); } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java index 6d74e0964f532..421a682351db2 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java @@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.util.Preconditions; /** * An {@link ArtificialStateBuilder} for user {@link ValueState}s. @@ -32,16 +32,16 @@ public class ArtificialValueStateBuilder extends ArtificialStateBuild private static final long serialVersionUID = -1205814329756790916L; private transient ValueState valueState; - private final TypeSerializer typeSerializer; + private final ValueStateDescriptor valueStateDescriptor; private final JoinFunction stateValueGenerator; public ArtificialValueStateBuilder( String stateName, JoinFunction stateValueGenerator, - TypeSerializer typeSerializer) { + ValueStateDescriptor valueStateDescriptor) { super(stateName); - this.typeSerializer = typeSerializer; - this.stateValueGenerator = stateValueGenerator; + this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor); + this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator); } @Override @@ -51,8 +51,6 @@ public void artificialStateForElement(IN event) throws Exception { @Override public void initialize(FunctionInitializationContext initializationContext) { - ValueStateDescriptor valueStateDescriptor = - new ValueStateDescriptor<>(stateName, typeSerializer); valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor); } } diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java index 0b3b5ed4b8964..4f77f954d3e81 100644 --- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java @@ -84,8 +84,8 @@ public static void main(String[] args) throws Exception { Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig())); KeyedStream afterStatefulOperations = isOriginalJobVariant(pt) ? - applyOriginalStatefulOperations(source, stateSer) : - applyUpgradedStatefulOperations(source, stateSer); + applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) : + applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList()); afterStatefulOperations .flatMap(createSemanticsCheckMapper(pt)) @@ -109,26 +109,29 @@ private static boolean isOriginalJobVariant(final ParameterTool pt) { private static KeyedStream applyOriginalStatefulOperations( KeyedStream source, - List> stateSer) { - source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer); - return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer); + List> stateSer, + List> stateClass) { + source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer, stateClass); + return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass); } private static KeyedStream applyUpgradedStatefulOperations( KeyedStream source, - List> stateSer) { - source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer); - source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer); - return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer); + List> stateSer, + List> stateClass) { + source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass); + source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass); + return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass); } private static KeyedStream applyTestStatefulOperator( String name, JoinFunction stateFunc, KeyedStream source, - List> stateSer) { + List> stateSer, + List> stateClass) { return source - .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer)) + .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass)) .name(name) .uid(name) .returns(Event.class)