From 6189aeffdbd1e179c2ffe0c9d0873898c2a26f3a Mon Sep 17 00:00:00 2001 From: zhangminglei Date: Sun, 22 Apr 2018 15:14:57 +0800 Subject: [PATCH] [FLINK-8999] [e2e] Ensure the job has an operator with operator state --- .../flink-operator-state-test/pom.xml | 85 +++++++++++++++ .../test/OperatorStateTestProgram.java | 103 ++++++++++++++++++ flink-end-to-end-tests/pom.xml | 1 + .../run-pre-commit-tests.sh | 7 ++ .../test_streaming_operator_state.sh | 39 +++++++ 5 files changed, 235 insertions(+) create mode 100644 flink-end-to-end-tests/flink-operator-state-test/pom.xml create mode 100644 flink-end-to-end-tests/flink-operator-state-test/src/main/java/org/apache/flink/streaming/test/OperatorStateTestProgram.java create mode 100755 flink-end-to-end-tests/test-scripts/test_streaming_operator_state.sh diff --git a/flink-end-to-end-tests/flink-operator-state-test/pom.xml b/flink-end-to-end-tests/flink-operator-state-test/pom.xml new file mode 100644 index 0000000000000..0630a633199fe --- /dev/null +++ b/flink-end-to-end-tests/flink-operator-state-test/pom.xml @@ -0,0 +1,85 @@ + + + + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + 4.0.0 + + flink-operator-state-test + flink-operator-state-test + jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + package + + shade + + + OperatorStateTestProgram + + + com.google.code.findbugs:jsr305 + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.apache.flink.streaming.test.OperatorStateTestProgram + + + + + + + + + + diff --git a/flink-end-to-end-tests/flink-operator-state-test/src/main/java/org/apache/flink/streaming/test/OperatorStateTestProgram.java b/flink-end-to-end-tests/flink-operator-state-test/src/main/java/org/apache/flink/streaming/test/OperatorStateTestProgram.java new file mode 100644 index 0000000000000..cb5f89dc1c89a --- /dev/null +++ b/flink-end-to-end-tests/flink-operator-state-test/src/main/java/org/apache/flink/streaming/test/OperatorStateTestProgram.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.Collections; +import java.util.List; + +/** + * End to end test for operator state in a job. + * + *

Program to test a operator state within an operator, to count the number to + * a new value is the old value + 1. + * + *

Program parameters: + * -outputPath Sets the path to where the result data is written. + */ +public class OperatorStateTestProgram { + public static void main(String[] args) throws Exception { + ParameterTool params = ParameterTool.fromArgs(args); + String outputPath = params.getRequired("outputPath"); + + StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + sEnv.setParallelism(1); + sEnv + .addSource(new DataGenerator()) + .map(new CountUpMap()) + .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE); + + sEnv.execute(); + } + + /** + * The map operator that record the number by using operator state. + */ + public static class CountUpMap implements MapFunction, Tuple2>, + ListCheckpointed { + + private int count; + + @Override + public Tuple2 map(Tuple1 value) throws Exception { + count++; + return new Tuple2<>(value.f0, count); + } + + @Override + public List snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(count); + } + + @Override + public void restoreState(List state) throws Exception { + for (Integer i : state) { + count += i; + } + } + } + + /** + * Data-generating source function. + */ + public static class DataGenerator implements SourceFunction> { + + @Override + public void run(SourceContext> ctx) throws Exception { + for (int i = 0; i < 1000; i++) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(Tuple1.of("Some payloads......")); + } + } + } + + @Override + public void cancel() { + + } + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index f7a6a70563f95..4d6d81fad2d11 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -39,6 +39,7 @@ under the License. flink-dataset-allround-test flink-stream-sql-test flink-bucketing-sink-test + flink-operator-state-test diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index 2c1810b91c8bf..787cb0b052328 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -93,6 +93,13 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running Streaming Operator State end-to-end test\n" + printf "==============================================================================\n" + $END_TO_END_DIR/test-scripts/test_streaming_operator_state.sh + EXIT_CODE=$? +fi # Exit code for Travis build success/failure exit $EXIT_CODE diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_operator_state.sh b/flink-end-to-end-tests/test-scripts/test_streaming_operator_state.sh new file mode 100755 index 0000000000000..65eb994ddc85b --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_operator_state.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-operator-state-test/target/OperatorStateTestProgram.jar + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR \ +--outputPath $TEST_DATA_DIR/complete_result + +function operator_state_cleanup() { + stop_cluster + + # make sure to run regular cleanup as well + cleanup +} + +trap operator_state_cleanup INT +trap operator_state_cleanup EXIT + +check_result_hash "Operator State" $TEST_DATA_DIR/complete_result "4a12dbf4937219c60e27b69aecc023d8"