From edae441617d31d896c5a5ceb4550faaadbe1e58c Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 17 Apr 2018 13:45:23 -0700 Subject: [PATCH 1/3] initial commit to support CLI test, excluding YARN test --- flink-end-to-end-tests/flink-api-test/pom.xml | 105 ++++++++++++ .../runtime/tests/PeriodicStreamingJob.java | 127 ++++++++++++++ flink-end-to-end-tests/pom.xml | 1 + .../test-scripts/test_cli_api.sh | 155 ++++++++++++++++++ 4 files changed, 388 insertions(+) create mode 100644 flink-end-to-end-tests/flink-api-test/pom.xml create mode 100644 flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java create mode 100755 flink-end-to-end-tests/test-scripts/test_cli_api.sh diff --git a/flink-end-to-end-tests/flink-api-test/pom.xml b/flink-end-to-end-tests/flink-api-test/pom.xml new file mode 100644 index 0000000000000..24a85e02bef7b --- /dev/null +++ b/flink-end-to-end-tests/flink-api-test/pom.xml @@ -0,0 +1,105 @@ + + + + + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + + 4.0.0 + + flink-api-test + flink-api-test + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-connector-filesystem_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-runtime-web_${scala.binary.version} + ${project.version} + test-jar + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + package + + shade + + + PeriodicStreamingJob + + + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.apache.flink.runtime.tests.PeriodicStreamingJob + + + + + + + + + + + diff --git a/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java b/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java new file mode 100644 index 0000000000000..ee8a49b858350 --- /dev/null +++ b/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.tests; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.util.Collections; +import java.util.List; + +/** + * This is a periodic streaming job that runs for API testing purpose. + * + *

The stream is bounded and will complete after the specified duration. + * + *

Parameters: + * -outputPath Sets the path to where the result data is written. + * -recordsPerSecond Sets the output record frequency. + * -durationInSecond Sets the running duration of the job. + * -offsetInSecond Sets the startup delay before the processing first message. + */ +public class PeriodicStreamingJob { + + public static void main(String[] args) throws Exception { + ParameterTool params = ParameterTool.fromArgs(args); + String outputPath = params.getRequired("outputPath"); + int recordsPerSecond = params.getInt("recordsPerSecond", 10); + int duration = params.getInt("durationInSecond", 60); + int offset = params.getInt("offsetInSecond", 0); + + StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + sEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + sEnv.enableCheckpointing(4000); + sEnv.getConfig().setAutoWatermarkInterval(1000); + + // execute a simple pass through program. + PeriodicSourceGenerator generator = new PeriodicSourceGenerator( + recordsPerSecond, duration, offset); + DataStream rows = sEnv.addSource(generator); + + DataStream result = rows + .keyBy(1) + .timeWindow(Time.seconds(5)) + .sum(0); + + result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE) + .setParallelism(1); + + sEnv.execute(); + } + + /** + * Data-generating source function. + */ + public static class PeriodicSourceGenerator implements SourceFunction, ResultTypeQueryable, ListCheckpointed { + private final int sleepMs; + private final int durationMs; + private final int offsetSeconds; + private long ms = 0; + + public PeriodicSourceGenerator(float rowsPerSecond, int durationSeconds, int offsetSeconds) { + this.durationMs = durationSeconds * 1000; + this.sleepMs = (int) (1000 / rowsPerSecond); + this.offsetSeconds = offsetSeconds; + } + + @Override + public void run(SourceContext ctx) throws Exception { + long offsetMs = offsetSeconds * 1000L; + + while (ms < durationMs) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(new Tuple2<>(ms + offsetMs, "key")); + } + ms += sleepMs; + Thread.sleep(sleepMs); + } + } + + @Override + public void cancel() { } + + @Override + public TypeInformation getProducedType() { + return Types.TUPLE(Types.LONG, Types.STRING); + } + + @Override + public List snapshotState(long checkpointId, long timestamp) { + return Collections.singletonList(ms); + } + + @Override + public void restoreState(List state) { + for (Long l : state) { + ms += l; + } + } + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 581abc84d3c64..4b1d287297b10 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -35,6 +35,7 @@ under the License. flink-end-to-end-tests + flink-api-test flink-parent-child-classloading-test flink-dataset-allround-test flink-datastream-allround-test diff --git a/flink-end-to-end-tests/test-scripts/test_cli_api.sh b/flink-end-to-end-tests/test-scripts/test_cli_api.sh new file mode 100755 index 0000000000000..7af30c36c1d36 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_cli_api.sh @@ -0,0 +1,155 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { + if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; + then + JOB_ID="${BASH_REMATCH[1]}"; + else + JOB_ID="" + fi + echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { + if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; + then + SAVEPOINT_PATH="${BASH_REMATCH[1]}"; + else + SAVEPOINT_PATH="" + fi + echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==============================================================================\n" +printf "Test default job launch with non-detach mode\n" +printf "==============================================================================\n" +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" + EXIT_CODE=$? +fi + +printf "\n==============================================================================\n" +printf "Test run with complex parameter set\n" +printf "==============================================================================\n" +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" + EXIT_CODE=$? +fi + +printf "\n==============================================================================\n" +printf "Test information APIs\n" +printf "==============================================================================\n" +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink list" + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink list -s" + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink list -r" + EXIT_CODE=$? +fi + +printf "\n==============================================================================\n" +printf "Test operation on running streaming jobs\n" +printf "==============================================================================\n" +if [ $EXIT_CODE == 0 ]; then + RETURN=`$FLINK_DIR/bin/flink run -d \ + $PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` + echo "job submission returns: $RETURN" + JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` + eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}" + EXIT_CODE=$? +fi + +printf "\n==============================================================================\n" +printf "Test savepoint for a running streaming jobs\n" +printf "==============================================================================\n" +SAVEPOINT_JOB_ID="" +SAVEPOINT_PATH="" +if [ $EXIT_CODE == 0 ]; then + RETURN=`$FLINK_DIR/bin/flink run -d \ + $PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` + echo "job submission returns: $RETURN" + SAVEPOINT_JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink savepoint ${SAVEPOINT_JOB_ID} file:///${TEST_DATA_DIR}/savepoint" + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + RETURN=`$FLINK_DIR/bin/flink cancel -s file:///${TEST_DATA_DIR}/savepoint ${SAVEPOINT_JOB_ID}` + echo "job savepoint returns: $RETURN" + SAVEPOINT_PATH=`extract_savepoint_path_from_savepoint_return "$RETURN"` + EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink run -s ${SAVEPOINT_PATH} -d \ + ${PERIODIC_JOB_JAR} --outputPath file:///${TEST_DATA_DIR}/out/result" + EXIT_CODE=$? +fi + +printf "\n==============================================================================\n" +printf "Cleaning up... \n" +printf "==============================================================================\n" +trap cleanup_cli_test INT +trap cleanup_cli_test EXIT + +if [ $EXIT_CODE == 0 ]; + then + echo "All CLI test passed!"; + else + echo "CLI test failed: $EXIT_CODE"; + PASS="" + exit 1 +fi From 39c02f77c91357440700c1cbf50cbff70a0b7160 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Mon, 21 May 2018 11:23:07 -0700 Subject: [PATCH 2/3] address diff comments --- .../test-scripts/test_cli_api.sh | 113 ++++++++++++------ 1 file changed, 77 insertions(+), 36 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_cli_api.sh b/flink-end-to-end-tests/test-scripts/test_cli_api.sh index 7af30c36c1d36..e8302b05bed1b 100755 --- a/flink-end-to-end-tests/test-scripts/test_cli_api.sh +++ b/flink-end-to-end-tests/test-scripts/test_cli_api.sh @@ -20,15 +20,15 @@ source "$(dirname "$0")"/common.sh start_cluster -$FLINK_DIR/bin/taskmanager.sh start -$FLINK_DIR/bin/taskmanager.sh start -$FLINK_DIR/bin/taskmanager.sh start # Test for CLI commands. # verify only the return code the content correctness of the API results. PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." +JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\"" +JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\"" +JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :" EXIT_CODE=0 @@ -52,6 +52,35 @@ function extract_savepoint_path_from_savepoint_return() { echo "$SAVEPOINT_PATH" } +function extract_valid_pact_from_job_info_return() { + PACT_MATCH=0 + if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]]; + then + PACT_MATCH=$PACT_MATCH + else + PACT_MATCH=-1 + fi + if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]]; + then + PACT_MATCH=$PACT_MATCH + else + PACT_MATCH=-1 + fi + echo ${PACT_MATCH} +} + +function extract_valid_job_list_by_type_from_job_list_return() { + JOB_LIST_MATCH=0 + JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3" + if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]]; + then + JOB_LIST_MATCH=$JOB_LIST_MATCH + else + JOB_LIST_MATCH=-1 + fi + echo ${JOB_LIST_MATCH} +} + function cleanup_cli_test() { stop_cluster $FLINK_DIR/bin/taskmanager.sh stop-all @@ -68,7 +97,7 @@ if [ $EXIT_CODE == 0 ]; then fi printf "\n==============================================================================\n" -printf "Test run with complex parameter set\n" +printf "Test job launch with complex parameter set\n" printf "==============================================================================\n" if [ $EXIT_CODE == 0 ]; then eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ @@ -83,59 +112,71 @@ printf "\n====================================================================== printf "Test information APIs\n" printf "==============================================================================\n" if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" - EXIT_CODE=$? -fi -if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink list" - EXIT_CODE=$? -fi -if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink list -s" - EXIT_CODE=$? -fi -if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink list -r" - EXIT_CODE=$? + RETURN=`$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar` + echo "job info returns: $RETURN" + PACT_MATCH=`extract_valid_pact_from_job_info_return "$RETURN"` + echo "job info regex match: $PACT_MATCH" + if [[ $PACT_MATCH == -1 ]]; then # expect at least a Data Source and a Data Sink pact match + EXIT_CODE=-1 + else + EXIT_CODE=0 + fi fi printf "\n==============================================================================\n" printf "Test operation on running streaming jobs\n" printf "==============================================================================\n" +JOB_ID="" if [ $EXIT_CODE == 0 ]; then RETURN=`$FLINK_DIR/bin/flink run -d \ $PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` echo "job submission returns: $RETURN" JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` - eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}" - EXIT_CODE=$? + EXIT_CODE=$? # expect matching job id extraction fi printf "\n==============================================================================\n" -printf "Test savepoint for a running streaming jobs\n" +printf "Test list API on a streaming job \n" printf "==============================================================================\n" -SAVEPOINT_JOB_ID="" -SAVEPOINT_PATH="" if [ $EXIT_CODE == 0 ]; then - RETURN=`$FLINK_DIR/bin/flink run -d \ - $PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` - echo "job submission returns: $RETURN" - SAVEPOINT_JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` - EXIT_CODE=$? + RETURN=`$FLINK_DIR/bin/flink list -a` + echo "job list all returns: $RETURN" + JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" ""` + echo "job list all regex match: $JOB_LIST_MATCH" + if [[ $JOB_LIST_MATCH == -1 ]]; then # expect match for all job + EXIT_CODE=-1 + else + EXIT_CODE=0 + fi fi if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink savepoint ${SAVEPOINT_JOB_ID} file:///${TEST_DATA_DIR}/savepoint" - EXIT_CODE=$? + RETURN=`$FLINK_DIR/bin/flink list -r` + echo "job list running returns: $RETURN" + JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" "\(RUNNING\)"` + echo "job list running regex match: $JOB_LIST_MATCH" + if [[ $JOB_LIST_MATCH == -1 ]]; then # expect match for running job + EXIT_CODE=-1 + else + EXIT_CODE=0 + fi fi if [ $EXIT_CODE == 0 ]; then - RETURN=`$FLINK_DIR/bin/flink cancel -s file:///${TEST_DATA_DIR}/savepoint ${SAVEPOINT_JOB_ID}` - echo "job savepoint returns: $RETURN" - SAVEPOINT_PATH=`extract_savepoint_path_from_savepoint_return "$RETURN"` - EXIT_CODE=$? + RETURN=`$FLINK_DIR/bin/flink list -s` + echo "job list scheduled returns: $RETURN" + JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" "\(CREATED\)"` + echo "job list scheduled regex match: $JOB_LIST_MATCH" + if [[ $JOB_LIST_MATCH == -1 ]]; then # expect no match for scheduled job + EXIT_CODE=0 + else + EXIT_CODE=-1 + fi fi + +printf "\n==============================================================================\n" +printf "Test canceling a running streaming jobs\n" +printf "==============================================================================\n" if [ $EXIT_CODE == 0 ]; then - eval "$FLINK_DIR/bin/flink run -s ${SAVEPOINT_PATH} -d \ - ${PERIODIC_JOB_JAR} --outputPath file:///${TEST_DATA_DIR}/out/result" + eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}" EXIT_CODE=$? fi From 89049bc63e322f33bc80e691fad5562ed9d94fd2 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Thu, 24 May 2018 21:26:53 -0700 Subject: [PATCH 3/3] address diff comments --- .../test-scripts/test_cli_api.sh | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_cli_api.sh b/flink-end-to-end-tests/test-scripts/test_cli_api.sh index e8302b05bed1b..fd9d305ea6369 100755 --- a/flink-end-to-end-tests/test-scripts/test_cli_api.sh +++ b/flink-end-to-end-tests/test-scripts/test_cli_api.sh @@ -20,6 +20,9 @@ source "$(dirname "$0")"/common.sh start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start # Test for CLI commands. # verify only the return code the content correctness of the API results. @@ -81,25 +84,29 @@ function extract_valid_job_list_by_type_from_job_list_return() { echo ${JOB_LIST_MATCH} } +function extract_task_manager_slot_request_count() { + COUNT=`grep "Receive slot request" $FLINK_DIR/log/*taskexecutor*.log | wc -l` + echo $COUNT +} + function cleanup_cli_test() { - stop_cluster $FLINK_DIR/bin/taskmanager.sh stop-all cleanup } -printf "\n==============================================================================\n" -printf "Test default job launch with non-detach mode\n" -printf "==============================================================================\n" if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Test default job launch with non-detach mode\n" + printf "==============================================================================\n" eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" EXIT_CODE=$? fi -printf "\n==============================================================================\n" -printf "Test job launch with complex parameter set\n" -printf "==============================================================================\n" if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Test job launch with complex parameter set\n" + printf "==============================================================================\n" eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ -c org.apache.flink.examples.java.wordcount.WordCount \ $FLINK_DIR/examples/batch/WordCount.jar \ @@ -108,6 +115,19 @@ if [ $EXIT_CODE == 0 ]; then EXIT_CODE=$? fi +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Validate job launch parallelism configuration\n" + printf "==============================================================================\n" + RECEIVED_TASKMGR_REQUEST=`extract_task_manager_slot_request_count` + # expected 1 from default launch and 4 from complex parameter set. + if [[ $RECEIVED_TASKMGR_REQUEST == 5 ]]; then + EXIT_CODE=0 + else + EXIT_CODE=-1 + fi +fi + printf "\n==============================================================================\n" printf "Test information APIs\n" printf "==============================================================================\n"