diff --git a/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
index 80b2aa30852a..b526bce0981c 100644
--- a/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
@@ -25,7 +25,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink',
   description('Tests creation and execution of portable pipeline Jars on the Flink runner.')
 
   // Set common parameters.
-  commonJobProperties.setTopLevelMainJobProperties(delegate)
+  commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)
 
   // Gradle goals for this job.
   steps {
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index c7f8cc09ef67..3d87851d5841 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -128,6 +128,7 @@ test {
 
 configurations {
   validatesRunner
+  miniCluster
 }
 
 dependencies {
@@ -170,6 +171,7 @@ dependencies {
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
   validatesRunner project(project.path)
+  miniCluster "org.apache.flink:flink-runtime-web_2.11:$flink_version"
 }
 
 class ValidatesRunnerConfig {
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index f0cae9dc47dc..27a116fef70e 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -181,20 +181,76 @@ project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTas
   ]
 )
 
-task testPipelineJar() {
-  dependsOn shadowJar
-  dependsOn ":sdks:python:container:py35:docker"
-  doLast{
-    exec {
-      executable "sh"
-      def options = [
-          "--flink_job_server_jar ${shadowJar.archivePath}",
-          "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
-          "--python_root_dir ${project.rootDir}/sdks/python",
-          "--python_version 3.5",
-          "--python_container_image apachebeam/python3.5_sdk:${project['python_sdk_version']}",
-      ]
-      args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
+def addTestJavaJarCreator(String pyVersion) {
+  def pyBuildPath = pyVersion.startsWith("2") ? "2" : pyVersion.replaceAll("\\.", "")
+  project.tasks.create(name: "testJavaJarCreatorPy${pyBuildPath}") {
+    dependsOn shadowJar
+    dependsOn ":sdks:python:container:py${pyBuildPath}:docker"
+    doLast{
+      exec {
+        executable "sh"
+        def options = [
+            "--flink_job_server_jar ${shadowJar.archivePath}",
+            "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
+            "--python_root_dir ${project.rootDir}/sdks/python",
+            "--python_version ${pyVersion}",
+            "--python_container_image apachebeam/python${pyVersion}_sdk:${project['python_sdk_version']}",
+        ]
+        args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
+      }
     }
   }
 }
+
+// miniCluster jar starts an embedded Flink cluster intended for use in testing.
+task miniCluster(type: Jar, dependsOn: shadowJar) {
+  archiveBaseName = "${project.archivesBaseName}-mini-cluster"
+  dependencies {
+    runtime project(path: flinkRunnerProject, configuration: "miniCluster")
+  }
+  from zipTree(shadowJar.archivePath).matching {
+    // If these classes aren't excluded from the mini cluster jar, they will be loaded instead of
+    // the corresponding classes in the submitted job jar, preventing pipeline resources from
+    // loading successfully.
+    exclude "**/FlinkPipelineRunner*"
+    exclude "**/PortablePipelineJarUtils*"
+  }
+  manifest {
+    attributes('Main-Class': 'org.apache.beam.runners.flink.FlinkMiniClusterEntryPoint')
+  }
+  zip64 true // jar needs to contain more than 65535 files
+}
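+
+// For ad-hoc debugging, the mini cluster jar built above can also be run directly. The flags are
+// defined in FlinkMiniClusterEntryPoint below; the jar path and port here are illustrative:
+//   java -jar build/libs/<archivesBaseName>-mini-cluster-<version>.jar --rest-port 8081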
+ exclude "**/FlinkPipelineRunner*" + exclude "**/PortablePipelineJarUtils*" + } + manifest { + attributes('Main-Class': 'org.apache.beam.runners.flink.FlinkMiniClusterEntryPoint') + } + zip64 true // jar needs to contain more than 65535 files +} + +def addTestFlinkUberJarPy(String pyVersion) { + def pyBuildPath = pyVersion.startsWith("2") ? "2" : pyVersion.replaceAll("\\.", "") + project.tasks.create(name: "testFlinkUberJarPy${pyBuildPath}") { + dependsOn miniCluster + dependsOn shadowJar + dependsOn ":sdks:python:container:py${pyBuildPath}:docker" + doLast{ + exec { + executable "sh" + def options = [ + "--flink_job_server_jar ${shadowJar.archivePath}", + "--flink_mini_cluster_jar ${miniCluster.archivePath}", + "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}", + "--python_root_dir ${project.rootDir}/sdks/python", + "--python_version ${pyVersion}", + "--python_container_image apachebeam/python${pyVersion}_sdk:${project['python_sdk_version']}", + ] + args "-c", "../../job-server/test_flink_uber_jar.sh ${options.join(' ')}" + } + } + } +} + +["2.7", "3.5", "3.6", "3.7"].each{ pyVersion -> + addTestJavaJarCreator(pyVersion) + addTestFlinkUberJarPy(pyVersion) +} + +task testPipelineJar() { + dependsOn testJavaJarCreatorPy37 + dependsOn testFlinkUberJarPy36 + dependsOn testFlinkUberJarPy37 +} diff --git a/runners/flink/job-server/test_flink_uber_jar.sh b/runners/flink/job-server/test_flink_uber_jar.sh new file mode 100755 index 000000000000..f24c32ac879c --- /dev/null +++ b/runners/flink/job-server/test_flink_uber_jar.sh @@ -0,0 +1,141 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# test_flink_uber_jar.sh tests the Python FlinkRunner class. + +set -e +set -v + +while [[ $# -gt 0 ]] +do +key="$1" +case $key in + --flink_job_server_jar) + FLINK_JOB_SERVER_JAR="$2" + shift # past argument + shift # past value + ;; + --flink_mini_cluster_jar) + FLINK_MINI_CLUSTER_JAR="$2" + shift # past argument + shift # past value + ;; + --env_dir) + ENV_DIR="$2" + shift # past argument + shift # past value + ;; + --python_root_dir) + PYTHON_ROOT_DIR="$2" + shift # past argument + shift # past value + ;; + --python_version) + PYTHON_VERSION="$2" + shift # past argument + shift # past value + ;; + --python_container_image) + PYTHON_CONTAINER_IMAGE="$2" + shift # past argument + shift # past value + ;; + *) # unknown option + echo "Unknown option: $1" + exit 1 + ;; +esac +done + + +# Go to the root of the repository +cd "$(git rev-parse --show-toplevel)" + +# Verify docker command exists +command -v docker +docker -v + +# Verify container has already been built +docker images --format "{{.Repository}}:{{.Tag}}" | grep "$PYTHON_CONTAINER_IMAGE" + +# Set up Python environment +virtualenv -p "python$PYTHON_VERSION" "$ENV_DIR" +. 
+
+PIPELINE_PY="
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import Create
+from apache_beam.transforms import Map
+
+# To test that our main session is getting plumbed through artifact staging
+# correctly, create a global variable. If the main session is not plumbed
+# through properly, global_var will be undefined and the pipeline will fail.
+global_var = 1
+
+pipeline_options = PipelineOptions()
+pipeline_options.view_as(SetupOptions).save_main_session = True
+pipeline = beam.Pipeline(options=pipeline_options)
+pcoll = (pipeline
+         | Create([0, 1, 2])
+         | Map(lambda x: x + global_var))
+assert_that(pcoll, equal_to([1, 2, 3]))
+
+result = pipeline.run()
+result.wait_until_finish()
+"
+
+(python -c "$PIPELINE_PY" \
+  --runner FlinkRunner \
+  --flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
+  --parallelism 1 \
+  --environment_type DOCKER \
+  --environment_config "$PYTHON_CONTAINER_IMAGE" \
+  --flink_master "localhost:$FLINK_PORT" \
+  --flink_submit_uber_jar \
+) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
+
+kill %1 || echo "Failed to shut down Flink mini cluster"
+
+rm -rf "$ENV_DIR"
+
+if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
+  echo ">>> SUCCESS"
+else
+  echo ">>> FAILURE"
+fi
+exit $TEST_EXIT_CODE
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkMiniClusterEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkMiniClusterEntryPoint.java
new file mode 100644
index 000000000000..fec9a881b0f3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkMiniClusterEntryPoint.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Entry point for starting an embedded Flink cluster. */
+public class FlinkMiniClusterEntryPoint {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniClusterEntryPoint.class);
+
+  static class MiniClusterArgs {
+    @Option(name = "--rest-port")
+    int restPort = 0;
+
+    @Option(name = "--num-task-managers")
+    int numTaskManagers = 1;
+
+    @Option(name = "--num-task-slots-per-taskmanager")
+    int numSlotsPerTaskManager = 1;
+  }
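+
+  // Defaults above are a single TaskManager with one task slot and an ephemeral REST port.
+  // Sample invocation (jar path and values illustrative):
+  //   java -jar <mini-cluster jar> --rest-port 8081 --num-task-managers 2 \
+  //       --num-task-slots-per-taskmanager 2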
+
+  public static void main(String[] args) throws Exception {
+    MiniClusterArgs miniClusterArgs = parseArgs(args);
+
+    Configuration flinkConfig = new Configuration();
+    flinkConfig.setInteger(RestOptions.PORT, miniClusterArgs.restPort);
+
+    MiniClusterConfiguration clusterConfig =
+        new MiniClusterConfiguration.Builder()
+            .setConfiguration(flinkConfig)
+            .setNumTaskManagers(miniClusterArgs.numTaskManagers)
+            .setNumSlotsPerTaskManager(miniClusterArgs.numSlotsPerTaskManager)
+            .build();
+
+    try (MiniCluster miniCluster = new MiniCluster(clusterConfig)) {
+      miniCluster.start();
+      System.out.println(
+          String.format(
+              "Started Flink mini cluster (%s TaskManagers with %s task slots) with Rest API at %s",
+              miniClusterArgs.numTaskManagers,
+              miniClusterArgs.numSlotsPerTaskManager,
+              miniCluster.getRestAddress()));
+      Thread.sleep(Long.MAX_VALUE);
+    }
+  }
+
+  private static MiniClusterArgs parseArgs(String[] args) {
+    MiniClusterArgs configuration = new MiniClusterArgs();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments.", e);
+      printUsage(parser);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    }
+    return configuration;
+  }
+
+  private static void printUsage(CmdLineParser parser) {
+    System.err.println(
+        String.format(
+            "Usage: java %s arguments...", FlinkMiniClusterEntryPoint.class.getSimpleName()));
+    parser.printUsage(System.err);
+    System.err.println();
+  }
+}
diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
index b3189716ffbe..e40a7bcae524 100644
--- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
+++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
@@ -157,9 +157,11 @@ def artifact_staging_endpoint(self):
     return self._artifact_staging_endpoint
 
   def request(self, method, path, expected_status=200, **kwargs):
-    response = method('%s/%s' % (self._master_url, path), **kwargs)
+    url = '%s/%s' % (self._master_url, path)
+    response = method(url, **kwargs)
     if response.status_code != expected_status:
-      raise RuntimeError(response.text)
+      raise RuntimeError("Request to %s failed with status %d: %s" %
+                         (url, response.status_code, response.text))
     if response.text:
       return response.json()