.test-infra/jenkins/job_PostCommit_PortableJar_Flink.groovy
@@ -25,7 +25,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink',
description('Tests creation and execution of portable pipeline Jars on the Flink runner.')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)

// Gradle goals for this job.
steps {
2 changes: 2 additions & 0 deletions runners/flink/flink_runner.gradle
@@ -128,6 +128,7 @@ test {

configurations {
validatesRunner
miniCluster
}

dependencies {
@@ -170,6 +171,7 @@ dependencies {
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(project.path)
miniCluster "org.apache.flink:flink-runtime-web_2.11:$flink_version"
}

class ValidatesRunnerConfig {
84 changes: 70 additions & 14 deletions runners/flink/job-server/flink_job_server.gradle
@@ -181,20 +181,76 @@ project.ext.validatesCrossLanguageRunner = createCrossLanguageValidatesRunnerTas
]
)

task testPipelineJar() {
dependsOn shadowJar
dependsOn ":sdks:python:container:py35:docker"
doLast{
exec {
executable "sh"
def options = [
"--flink_job_server_jar ${shadowJar.archivePath}",
"--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
"--python_root_dir ${project.rootDir}/sdks/python",
"--python_version 3.5",
"--python_container_image apachebeam/python3.5_sdk:${project['python_sdk_version']}",
]
args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
def addTestJavaJarCreator(String pyVersion) {
def pyBuildPath = pyVersion.startsWith("2") ? "2" : pyVersion.replaceAll("\\.", "")
project.tasks.create(name: "testJavaJarCreatorPy${pyBuildPath}") {
dependsOn shadowJar
dependsOn ":sdks:python:container:py${pyBuildPath}:docker"
doLast{
exec {
executable "sh"
def options = [
"--flink_job_server_jar ${shadowJar.archivePath}",
"--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
"--python_root_dir ${project.rootDir}/sdks/python",
"--python_version ${pyVersion}",
"--python_container_image apachebeam/python${pyVersion}_sdk:${project['python_sdk_version']}",
]
args "-c", "../../job-server/test_pipeline_jar.sh ${options.join(' ')}"
}
}
}
}

// miniCluster jar starts an embedded Flink cluster intended for use in testing.
task miniCluster(type: Jar, dependsOn: shadowJar) {
archiveBaseName = "${project.archivesBaseName}-mini-cluster"
dependencies {
runtime project(path: flinkRunnerProject, configuration: "miniCluster")
}
from zipTree(shadowJar.archivePath).matching {
// If these classes aren't excluded from the mini cluster jar, they will be loaded instead of
// the corresponding classes in the submitted job jar, preventing pipeline resources from
// loading successfully.
exclude "**/FlinkPipelineRunner*"
exclude "**/PortablePipelineJarUtils*"
}
manifest {
attributes('Main-Class': 'org.apache.beam.runners.flink.FlinkMiniClusterEntryPoint')
}
zip64 true // jar needs to contain more than 65535 files
}

def addTestFlinkUberJarPy(String pyVersion) {
def pyBuildPath = pyVersion.startsWith("2") ? "2" : pyVersion.replaceAll("\\.", "")
project.tasks.create(name: "testFlinkUberJarPy${pyBuildPath}") {
dependsOn miniCluster
dependsOn shadowJar
dependsOn ":sdks:python:container:py${pyBuildPath}:docker"
doLast{
exec {
executable "sh"
def options = [
"--flink_job_server_jar ${shadowJar.archivePath}",
"--flink_mini_cluster_jar ${miniCluster.archivePath}",
"--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
"--python_root_dir ${project.rootDir}/sdks/python",
"--python_version ${pyVersion}",
"--python_container_image apachebeam/python${pyVersion}_sdk:${project['python_sdk_version']}",
]
args "-c", "../../job-server/test_flink_uber_jar.sh ${options.join(' ')}"
}
}
}
}

["2.7", "3.5", "3.6", "3.7"].each{ pyVersion ->
addTestJavaJarCreator(pyVersion)
addTestFlinkUberJarPy(pyVersion)
}

task testPipelineJar() {
dependsOn testJavaJarCreatorPy37
dependsOn testFlinkUberJarPy36
dependsOn testFlinkUberJarPy37
}
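
The loop above stamps out one task per supported Python version, yielding names like testJavaJarCreatorPy27 and testFlinkUberJarPy37. A minimal sketch of invoking one of the generated tasks, assuming a Flink-versioned job-server project path such as :runners:flink:1.9:job-server (the exact path depends on the Flink versions configured in the build):

# Runs the uber jar test for Python 3.7 only; the project path is an assumption.
./gradlew :runners:flink:1.9:job-server:testFlinkUberJarPy37
# The aggregate task above runs the jar-creator test plus the Py36/Py37 uber jar tests.
./gradlew :runners:flink:1.9:job-server:testPipelineJar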
141 changes: 141 additions & 0 deletions runners/flink/job-server/test_flink_uber_jar.sh
@@ -0,0 +1,141 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# test_flink_uber_jar.sh tests the Python FlinkRunner class's uber jar
# submission path (--flink_submit_uber_jar) against an embedded Flink cluster.

set -e
set -v

while [[ $# -gt 0 ]]
do
key="$1"
case $key in
--flink_job_server_jar)
FLINK_JOB_SERVER_JAR="$2"
shift # past argument
shift # past value
;;
--flink_mini_cluster_jar)
FLINK_MINI_CLUSTER_JAR="$2"
shift # past argument
shift # past value
;;
--env_dir)
ENV_DIR="$2"
shift # past argument
shift # past value
;;
--python_root_dir)
PYTHON_ROOT_DIR="$2"
shift # past argument
shift # past value
;;
--python_version)
PYTHON_VERSION="$2"
shift # past argument
shift # past value
;;
--python_container_image)
PYTHON_CONTAINER_IMAGE="$2"
shift # past argument
shift # past value
;;
*) # unknown option
echo "Unknown option: $1"
exit 1
;;
esac
done


# Go to the root of the repository
cd "$(git rev-parse --show-toplevel)"

# Verify docker command exists
command -v docker
docker -v

# Verify container has already been built
docker images --format "{{.Repository}}:{{.Tag}}" | grep "$PYTHON_CONTAINER_IMAGE"

# Set up Python environment
virtualenv -p "python$PYTHON_VERSION" "$ENV_DIR"
. $ENV_DIR/bin/activate
pip install --retries 10 -e "$PYTHON_ROOT_DIR"

# Hacky python script to find a free port. Note there is a small chance the chosen port could
# get taken before being claimed.
SOCKET_SCRIPT="
from __future__ import print_function
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0))
print(s.getsockname()[1])
s.close()
"
FLINK_PORT=$(python -c "$SOCKET_SCRIPT")

echo "Starting Flink mini cluster listening on port $FLINK_PORT"
java -jar "$FLINK_MINI_CLUSTER_JAR" --rest-port "$FLINK_PORT" &

PIPELINE_PY="
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import Create
from apache_beam.transforms import Map

# To test that our main session is getting plumbed through artifact staging
# correctly, create a global variable. If the main session is not plumbed
# through properly, global_var will be undefined and the pipeline will fail.
global_var = 1

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline = beam.Pipeline(options=pipeline_options)
pcoll = (pipeline
| Create([0, 1, 2])
| Map(lambda x: x + global_var))
assert_that(pcoll, equal_to([1, 2, 3]))

result = pipeline.run()
result.wait_until_finish()
"

(python -c "$PIPELINE_PY" \
--runner FlinkRunner \
--flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
--parallelism 1 \
--environment_type DOCKER \
--environment_config "$PYTHON_CONTAINER_IMAGE" \
--flink_master "localhost:$FLINK_PORT" \
--flink_submit_uber_jar \
) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting

kill %1 || echo "Failed to shut down Flink mini cluster"

rm -rf "$ENV_DIR"

if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
echo ">>> SUCCESS"
else
echo ">>> FAILURE"
fi
exit $TEST_EXIT_CODE
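
For reference, a by-hand invocation of this script might look like the sketch below; the jar paths, env dir, and SDK image tag are placeholders, since the Gradle tasks above normally supply them:

sh runners/flink/job-server/test_flink_uber_jar.sh \
  --flink_job_server_jar /path/to/beam-flink-job-server.jar \
  --flink_mini_cluster_jar /path/to/beam-flink-mini-cluster.jar \
  --env_dir /tmp/beam-test-env \
  --python_root_dir sdks/python \
  --python_version 3.7 \
  --python_container_image apachebeam/python3.7_sdk:<sdk-version>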
91 changes: 91 additions & 0 deletions runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkMiniClusterEntryPoint.java
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Entry point for starting an embedded Flink cluster. */
public class FlinkMiniClusterEntryPoint {

private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniClusterEntryPoint.class);

static class MiniClusterArgs {
@Option(name = "--rest-port")
int restPort = 0;

@Option(name = "--num-task-managers")
int numTaskManagers = 1;

@Option(name = "--num-task-slots-per-taskmanager")
int numSlotsPerTaskManager = 1;
}

public static void main(String[] args) throws Exception {
MiniClusterArgs miniClusterArgs = parseArgs(args);

Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(RestOptions.PORT, miniClusterArgs.restPort);

MiniClusterConfiguration clusterConfig =
new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumTaskManagers(miniClusterArgs.numTaskManagers)
.setNumSlotsPerTaskManager(miniClusterArgs.numSlotsPerTaskManager)
.build();

try (MiniCluster miniCluster = new MiniCluster(clusterConfig)) {
miniCluster.start();
System.out.println(
String.format(
"Started Flink mini cluster (%s TaskManagers with %s task slots) with Rest API at %s",
miniClusterArgs.numTaskManagers,
miniClusterArgs.numSlotsPerTaskManager,
miniCluster.getRestAddress()));
Thread.sleep(Long.MAX_VALUE);
}
}

private static MiniClusterArgs parseArgs(String[] args) {
MiniClusterArgs configuration = new MiniClusterArgs();
CmdLineParser parser = new CmdLineParser(configuration);
try {
parser.parseArgument(args);
} catch (CmdLineException e) {
LOG.error("Unable to parse command line arguments.", e);
printUsage(parser);
throw new IllegalArgumentException("Unable to parse command line arguments.", e);
}
return configuration;
}

private static void printUsage(CmdLineParser parser) {
System.err.println(
String.format(
"Usage: java %s arguments...", FlinkMiniClusterEntryPoint.class.getSimpleName()));
parser.printUsage(System.err);
System.err.println();
}
}
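
Combined with the miniCluster jar task above, which sets this class as the manifest Main-Class, the cluster can be launched standalone. A sketch, with an assumed artifact name (the real name derives from the project's archivesBaseName):

# All three flags come from MiniClusterArgs; --rest-port 0 (the default) picks a free port.
java -jar beam-runners-flink-job-server-mini-cluster.jar \
  --rest-port 8081 \
  --num-task-managers 2 \
  --num-task-slots-per-taskmanager 2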
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
@@ -157,9 +157,11 @@ def artifact_staging_endpoint(self):
return self._artifact_staging_endpoint

def request(self, method, path, expected_status=200, **kwargs):
response = method('%s/%s' % (self._master_url, path), **kwargs)
url = '%s/%s' % (self._master_url, path)
response = method(url, **kwargs)
if response.status_code != expected_status:
raise RuntimeError(response.text)
raise RuntimeError("Request to %s failed with status %d: %s" %
(url, response.status_code, response.text))
if response.text:
return response.json()
