From 28951ab397cef9e655cda6ab0892f29d4f006ba7 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Thu, 15 Mar 2018 13:13:46 +0100 Subject: [PATCH 1/2] [FLINK-8973] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. --- .../run-pre-commit-tests.sh | 7 + flink-end-to-end-tests/test-scripts/common.sh | 144 +++++++++++++++++- .../test-scripts/test_ha.sh | 108 +++++++++++++ .../flink-examples-streaming/pom.xml | 8 +- .../statemachine/StateMachineExample.java | 32 +++- 5 files changed, 292 insertions(+), 7 deletions(-) create mode 100755 flink-end-to-end-tests/test-scripts/test_ha.sh diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index 1de66cdce35a3..91a3a1cf82896 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -117,6 +117,13 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +#if [ $EXIT_CODE == 0 ]; then +# printf "\n==============================================================================\n" +# printf "Running HA end-to-end test\n" +# printf "==============================================================================\n" +# $END_TO_END_DIR/test-scripts/test_ha.sh +# EXIT_CODE=$? +#fi # Exit code for Travis build success/failure exit $EXIT_CODE diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index d4b91266ac865..22d2ebe094690 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -39,6 +39,93 @@ cd $TEST_ROOT export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N) echo "TEST_DATA_DIR: $TEST_DATA_DIR" +function revert_default_config() { + sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL + #============================================================================== + # Common + #============================================================================== + + jobmanager.rpc.address: localhost + jobmanager.rpc.port: 6123 + jobmanager.heap.mb: 1024 + taskmanager.heap.mb: 1024 + taskmanager.numberOfTaskSlots: 1 + parallelism.default: 1 + + #============================================================================== + # Web Frontend + #============================================================================== + + web.port: 8081 +EOL +} + +function create_ha_conf() { + + # create the masters file (only one currently). + # This must have all the masters to be used in HA. + echo "localhost:8081" > ${FLINK_DIR}/conf/masters + + # then move on to create the flink-conf.yaml + + if [ -e $TEST_DATA_DIR/recovery ]; then + echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..." 
+ rm -rf $TEST_DATA_DIR/recovery + fi + + sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL + #============================================================================== + # Common + #============================================================================== + + jobmanager.rpc.address: localhost + jobmanager.rpc.port: 6123 + jobmanager.heap.mb: 1024 + taskmanager.heap.mb: 1024 + taskmanager.numberOfTaskSlots: 4 + parallelism.default: 1 + + #============================================================================== + # High Availability + #============================================================================== + + high-availability: zookeeper + high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/ + high-availability.zookeeper.quorum: localhost:2181 + high-availability.zookeeper.path.root: /flink + high-availability.cluster-id: /test_cluster_one + + #============================================================================== + # Web Frontend + #============================================================================== + + web.port: 8081 +EOL +} + +function start_ha_cluster { + echo "Setting up HA Cluster..." + create_ha_conf + start_local_zk + start_cluster +} + +function start_local_zk { + while read server ; do + server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim + + # match server.id=address[:port[:port]] + if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then + id=${BASH_REMATCH[1]} + address=${BASH_REMATCH[2]} + + ${FLINK_DIR}/bin/zookeeper.sh start $id + else + echo "[WARN] Parse error. Skipping config entry '$server'." + fi + done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg") +} + function start_cluster { "$FLINK_DIR"/bin/start-cluster.sh @@ -59,6 +146,57 @@ function start_cluster { done } +function jm_watchdog() { + expectedJms=$1 + ipPort=$2 + + while true; do + runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`; + missingJms=$((expectedJms-runningJms)) + for (( c=0; c /dev/null + wait ${watchdogPid} 2> /dev/null + + stop_ha_cluster +} + +verify_logs() { + expectedRetries=$1 + + # verify that we have no alerts + if ! [ `cat ${output} | wc -l` -eq 0 ]; then + echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate." + PASS="" + fi + + # checks that all apart from the first JM recover the failes jobgraph. + if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then + echo "FAILURE: A JM did not take over." + PASS="" + fi + + # search the logs for JMs that log completed checkpoints + if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then + echo "FAILURE: A JM did not execute the job." + PASS="" + fi +} + +run_ha_test() { + parallelism=$1 + backend=$2 + async=$3 + incremental=$4 + maxAttempts=$5 + rstrtInterval=$6 + output=$7 + + jmKillAndRetries=2 + checkpointDir="${TEST_DATA_DIR}/checkpoints/" + + # start the cluster on HA mode and + # verify that all JMs are running + start_ha_cluster + + echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}." 
+ + # submit a job in detached mode and let it run + $FLINK_DIR/bin/flink run -d -p ${parallelism} \ + $TEST_PROGRAM_JAR \ + --stateBackend ${backend} \ + --checkpointDir "file://${checkpointDir}" \ + --asyncCheckpoints ${async} \ + --incrementalCheckpoints ${incremental} \ + --restartAttempts ${maxAttempts} \ + --restartDelay ${rstrtInterval} \ + --output ${output} > /dev/null + + # start the watchdog that keeps the number of JMs stable + jm_watchdog 1 "8081" & + watchdogPid=$! + + # let the job run for a while to take some checkpoints + sleep 50 + + for (( c=0; c<${jmKillAndRetries}; c++ )); do + # kill the JM and wait for watchdog to + # create a new JM which will take over + kill_jm 0 + sleep 50 + done + + verify_logs ${jmKillAndRetries} + + # kill the cluster and zookeeper + stop_cluster_and_watchdog +} + +run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt" +trap stop_cluster_and_watchdog EXIT diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index ea253d8ea9771..6ff5512e810c1 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -84,7 +84,13 @@ under the License. test-jar - + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${project.version} + + + diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 14757fb325e6f..d6bb028439e99 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -19,10 +19,14 @@ package org.apache.flink.streaming.examples.statemachine; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -92,7 +96,29 @@ public static void main(String[] args) throws Exception { // create the environment to create streams and configure execution final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); + + final String checkpointDir = params.getRequired("checkpointDir"); + final String stateBackend = params.get("stateBackend", "file"); + if ("file".equals(stateBackend)) { + boolean asyncCheckpoints = params.getBoolean("asyncCheckpoints", false); + env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints)); + } else if ("rocks".equals(stateBackend)) { + boolean incrementalCheckpoints = params.getBoolean("incrementalCheckpoints", false); + env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints)); + } else { + throw new IllegalArgumentException("Unknown backend: " + stateBackend); + } + + env.enableCheckpointing(2000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart( + params.getInt("restartAttempts", Integer.MAX_VALUE), + params.getLong("restartDelay", 0L)) + ); + + final String outputFile = params.getRequired("output"); + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(params); DataStream events = env.addSource(source); @@ -105,7 +131,7 @@ public static void main(String[] args) throws Exception { .flatMap(new StateMachineMapper()); // output the alerts to std-out - alerts.print(); + alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE); // trigger program execution env.execute("State machine job"); @@ -140,7 +166,7 @@ public void flatMap(Event evt, Collector out) throws Exception { state = State.Initial; } - // ask the state machine what state we should go to based on teh given event + // ask the state machine what state we should go to based on the given event State nextState = state.transition(evt.type()); if (nextState == State.InvalidTransition) { From 552746d646a5e95a268382f402e614d17c10b8e3 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Fri, 23 Mar 2018 16:22:45 +0100 Subject: [PATCH 2/2] Comments + improvements --- flink-end-to-end-tests/run-nightly-tests.sh | 9 + .../run-pre-commit-tests.sh | 7 - flink-end-to-end-tests/test-scripts/common.sh | 114 ++++------- .../test-scripts/test_ha.sh | 188 ++++++++++++++---- .../statemachine/StateMachineExample.java | 33 +-- 5 files changed, 209 insertions(+), 142 deletions(-) diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index eca2e4db47af5..c173bfbb7ae9d 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -47,6 +47,15 @@ EXIT_CODE=0 # EXIT_CODE=$? # fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running HA end-to-end test\n" + printf "==============================================================================\n" + $END_TO_END_DIR/test-scripts/test_ha.sh + EXIT_CODE=$? +fi + if [ $EXIT_CODE == 0 ]; then printf "\n==============================================================================\n" printf "Running DataSet allround nightly end-to-end test\n" diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index 91a3a1cf82896..1de66cdce35a3 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -117,13 +117,6 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi -#if [ $EXIT_CODE == 0 ]; then -# printf "\n==============================================================================\n" -# printf "Running HA end-to-end test\n" -# printf "==============================================================================\n" -# $END_TO_END_DIR/test-scripts/test_ha.sh -# EXIT_CODE=$? 
-#fi # Exit code for Travis build success/failure exit $EXIT_CODE diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 22d2ebe094690..0db735a1fdf93 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -40,39 +40,38 @@ export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N) echo "TEST_DATA_DIR: $TEST_DATA_DIR" function revert_default_config() { - sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL - #============================================================================== - # Common - #============================================================================== - - jobmanager.rpc.address: localhost - jobmanager.rpc.port: 6123 - jobmanager.heap.mb: 1024 - taskmanager.heap.mb: 1024 - taskmanager.numberOfTaskSlots: 1 - parallelism.default: 1 - #============================================================================== - # Web Frontend - #============================================================================== + # revert our modifications to the masters file + if [ -f $FLINK_DIR/conf/masters.bak ]; then + rm $FLINK_DIR/conf/masters + mv $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters + fi - web.port: 8081 -EOL + # revert our modifications to the Flink conf yaml + if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then + rm $FLINK_DIR/conf/flink-conf.yaml + mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml + fi } -function create_ha_conf() { +function create_ha_config() { - # create the masters file (only one currently). - # This must have all the masters to be used in HA. - echo "localhost:8081" > ${FLINK_DIR}/conf/masters - - # then move on to create the flink-conf.yaml + # back up the masters and flink-conf.yaml + cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak + cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak + # clean up the dir that will be used for zookeeper storage + # (see high-availability.zookeeper.storageDir below) if [ -e $TEST_DATA_DIR/recovery ]; then echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..." rm -rf $TEST_DATA_DIR/recovery fi + # create the masters file (only one currently). + # This must have all the masters to be used in HA. + echo "localhost:8081" > ${FLINK_DIR}/conf/masters + + # then move on to create the flink-conf.yaml sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL #============================================================================== # Common @@ -104,13 +103,17 @@ EOL } function start_ha_cluster { - echo "Setting up HA Cluster..." - create_ha_conf + create_ha_config start_local_zk start_cluster } function start_local_zk { + # Parses the zoo.cfg and starts locally zk. + + # This is almost the same code as the + # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost. + while read server ; do server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim @@ -119,6 +122,11 @@ function start_local_zk { id=${BASH_REMATCH[1]} address=${BASH_REMATCH[2]} + if [ "${address}" != "localhost" ]; then + echo "[ERROR] Parse error. Only available for localhost." + PASS="" + exit 1 + fi ${FLINK_DIR}/bin/zookeeper.sh start $id else echo "[WARN] Parse error. Skipping config entry '$server'." 
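For context, start_local_zk above parses the server entries in conf/zoo.cfg and starts one local ZooKeeper instance per entry, refusing anything that is not bound to localhost. A minimal sketch of that matching, assuming a hypothetical single-node zoo.cfg entry (the concrete values below are illustrative, not taken from the patch):

    #!/usr/bin/env bash
    # assumption: zoo.cfg holds exactly one localhost entry of this shape
    server="server.1=localhost:2888:3888"

    # same idea as the pattern used in start_local_zk, kept in a variable
    # so the bracket expression needs no embedded literal space
    regex='^server\.([0-9]+)[[:space:]]*=[[:space:]]*([^:[:space:]#]+)'

    if [[ $server =~ $regex ]]; then
        id=${BASH_REMATCH[1]}        # "1"         -> handed to bin/zookeeper.sh start
        address=${BASH_REMATCH[2]}   # "localhost" -> any other host is rejected
    fi

Entries pointing at a remote host make the test fail fast, since unlike bin/start-zookeeper-quorum.sh this helper does not SSH anywhere.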
@@ -146,60 +154,14 @@ function start_cluster { done } -function jm_watchdog() { - expectedJms=$1 - ipPort=$2 - - while true; do - runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`; - missingJms=$((expectedJms-runningJms)) - for (( c=0; c /dev/null + wait ${JM_WATCHDOG_PID} 2> /dev/null + fi -stop_cluster_and_watchdog() { - kill ${watchdogPid} 2> /dev/null - wait ${watchdogPid} 2> /dev/null + if ! [ ${TM_WATCHDOG_PID} -eq 0 ]; then + echo "Killing TM watchdog @ ${TM_WATCHDOG_PID}" + kill ${TM_WATCHDOG_PID} 2> /dev/null + wait ${TM_WATCHDOG_PID} 2> /dev/null + fi - stop_ha_cluster + cleanup + CLEARED=1 + fi } -verify_logs() { - expectedRetries=$1 +function verify_logs() { + local OUTPUT=$1 + local JM_FAILURES=$2 # verify that we have no alerts - if ! [ `cat ${output} | wc -l` -eq 0 ]; then + if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate." PASS="" fi # checks that all apart from the first JM recover the failes jobgraph. - if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then + if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then echo "FAILURE: A JM did not take over." PASS="" fi # search the logs for JMs that log completed checkpoints - if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then + if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then echo "FAILURE: A JM did not execute the job." PASS="" fi + + if [[ ! "$PASS" ]]; then + echo "One or more tests FAILED." + exit 1 + fi +} + +function jm_watchdog() { + local EXPECTED_JMS=$1 + local IP_PORT=$2 + + while true; do + local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; + local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) + for (( c=0; c because when the JM goes down, + # the job starts with reporting 0 successful checkpoints + + local RUNNING_TMS=`jps | grep 'TaskManagerRunner' | wc -l` + local TM_PIDS=`jps | grep 'TaskManagerRunner' | cut -d " " -f 1` + + local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS)) + if [ ${MISSING_TMS} -eq 0 ]; then + # start a new TM only if we have exactly the expected number + "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null + fi + + # kill an existing one + local TM_PIDS=(${TM_PIDS[@]}) + local PID=${TM_PIDS[0]} + kill -9 ${PID} + + echo "Killed TM @ ${PID}" + + SUCCESSFUL_CHCKP=${CHECKPOINTS} + fi + + sleep 11; + done } -run_ha_test() { - parallelism=$1 - backend=$2 - async=$3 - incremental=$4 - maxAttempts=$5 - rstrtInterval=$6 - output=$7 +function run_ha_test() { + PARALLELISM=$1 + BACKEND=$2 + ASYNC=$3 + INCREM=$4 + OUTPUT=$5 - jmKillAndRetries=2 - checkpointDir="${TEST_DATA_DIR}/checkpoints/" + JM_KILLS=3 + CLEARED=0 + CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/" - # start the cluster on HA mode and - # verify that all JMs are running + # start the cluster on HA mode start_ha_cluster - echo "Running on HA mode: parallelism=${parallelism}, backend=${backend}, asyncSnapshots=${async}, and incremSnapshots=${incremental}." + echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}." 
# submit a job in detached mode and let it run - $FLINK_DIR/bin/flink run -d -p ${parallelism} \ + JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \ $TEST_PROGRAM_JAR \ - --stateBackend ${backend} \ - --checkpointDir "file://${checkpointDir}" \ - --asyncCheckpoints ${async} \ - --incrementalCheckpoints ${incremental} \ - --restartAttempts ${maxAttempts} \ - --restartDelay ${rstrtInterval} \ - --output ${output} > /dev/null + --backend ${BACKEND} \ + --checkpoint-dir "file://${CHECKPOINT_DIR}" \ + --async-checkpoints ${ASYNC} \ + --incremental-checkpoints ${INCREM} \ + --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g') + + wait_job_running ${JOB_ID} # start the watchdog that keeps the number of JMs stable jm_watchdog 1 "8081" & - watchdogPid=$! + JM_WATCHDOG_PID=$! + echo "Running JM watchdog @ ${JM_WATCHDOG_PID}" + + sleep 5 + + # start the watchdog that keeps the number of TMs stable + tm_watchdog ${JOB_ID} 1 & + TM_WATCHDOG_PID=$! + echo "Running TM watchdog @ ${TM_WATCHDOG_PID}" # let the job run for a while to take some checkpoints - sleep 50 + sleep 20 - for (( c=0; c<${jmKillAndRetries}; c++ )); do + for (( c=0; c<${JM_KILLS}; c++ )); do # kill the JM and wait for watchdog to - # create a new JM which will take over - kill_jm 0 - sleep 50 + # create a new one which will take over + kill_jm + sleep 60 done - verify_logs ${jmKillAndRetries} + verify_logs "${TEST_DATA_DIR}/output.txt}" ${JM_KILLS} # kill the cluster and zookeeper stop_cluster_and_watchdog } -run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt" -run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt" -run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt" -run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt" trap stop_cluster_and_watchdog EXIT +run_ha_test 1 "file" "false" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "file" "true" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 1 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt" diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index d6bb028439e99..79c42c75597e7 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.examples.statemachine; import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; @@ -59,6 +58,12 @@ public static void main(String[] args) throws Exception { System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate ] [--sleep ]"); System.out.println("Usage with Kafka: StateMachineExample --kafka-topic [--brokers ]"); + System.out.println("Options for both the above setups: "); + System.out.println("\t[--backend ]"); + System.out.println("\t[--checkpoint-dir ]"); + System.out.println("\t[--async-checkpoints 
]"); + System.out.println("\t[--incremental-checkpoints ]"); + System.out.println("\t[--output OR null for stdout]"); System.out.println(); // ---- determine whether to use the built-in source, or read from Kafka ---- @@ -96,26 +101,20 @@ public static void main(String[] args) throws Exception { // create the environment to create streams and configure execution final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(2000L); - final String checkpointDir = params.getRequired("checkpointDir"); - final String stateBackend = params.get("stateBackend", "file"); + final String stateBackend = params.get("backend", "memory"); if ("file".equals(stateBackend)) { - boolean asyncCheckpoints = params.getBoolean("asyncCheckpoints", false); + final String checkpointDir = params.get("checkpoint-dir"); + boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false); env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints)); } else if ("rocks".equals(stateBackend)) { - boolean incrementalCheckpoints = params.getBoolean("incrementalCheckpoints", false); + final String checkpointDir = params.get("checkpoint-dir"); + boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false); env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints)); - } else { - throw new IllegalArgumentException("Unknown backend: " + stateBackend); } - env.enableCheckpointing(2000L); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart( - params.getInt("restartAttempts", Integer.MAX_VALUE), - params.getLong("restartDelay", 0L)) - ); - - final String outputFile = params.getRequired("output"); + final String outputFile = params.get("output"); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); @@ -131,7 +130,11 @@ public static void main(String[] args) throws Exception { .flatMap(new StateMachineMapper()); // output the alerts to std-out - alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE); + if (outputFile == null) { + alerts.print(); + } else { + alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE); + } // trigger program execution env.execute("State machine job");