Skip to content

Commit

Permalink
[BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatib…
Browse files Browse the repository at this point in the history
…le with Flink 1.10
  • Loading branch information
sunjincheng121 committed Mar 10, 2020
1 parent 6e69b26 commit 1f513ad
Show file tree
Hide file tree
Showing 51 changed files with 592 additions and 218 deletions.
4 changes: 2 additions & 2 deletions .test-infra/dataproc/flink_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
# HARNESS_IMAGES_TO_PULL='gcr.io/<IMAGE_REPOSITORY>/python:latest gcr.io/<IMAGE_REPOSITORY>/java:latest' \
# JOB_SERVER_IMAGE=gcr.io/<IMAGE_REPOSITORY>/job-server-flink:latest \
# ARTIFACTS_DIR=gs://<bucket-for-artifacts> \
# FLINK_DOWNLOAD_URL=https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz \
# HADOOP_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar \
# FLINK_DOWNLOAD_URL=https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz \
# HADOOP_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-9.0/flink-shaded-hadoop-2-uber-2.8.3-9.0.jar \
# FLINK_NUM_WORKERS=2 \
# FLINK_TASKMANAGER_SLOTS=1 \
# DETACHED_MODE=false \
Expand Down
2 changes: 1 addition & 1 deletion .test-infra/jenkins/CommonTestProperties.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class CommonTestProperties {
DATAFLOW: ":runners:google-cloud-dataflow-java",
SPARK: ":runners:spark",
SPARK_STRUCTURED_STREAMING: ":runners:spark",
FLINK: ":runners:flink:1.9",
FLINK: ":runners:flink:1.10",
DIRECT: ":runners:direct-java"
],
PYTHON: [
Expand Down
4 changes: 2 additions & 2 deletions .test-infra/jenkins/Flink.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/

class Flink {
private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'
private static final String hadoopDownloadUrl = 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar'
private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz'
private static final String hadoopDownloadUrl = 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-9.0/flink-shaded-hadoop-2-uber-2.8.3-9.0.jar'
private static final String FLINK_DIR = '"$WORKSPACE/src/.test-infra/dataproc"'
private static final String FLINK_SCRIPT = 'flink_cluster.sh'
private def job
Expand Down
4 changes: 2 additions & 2 deletions .test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def batchLoadTestJob = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'beam_python2.7_sdk')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'beam_flink1.9_job_server')
publisher.publish(':runners:flink:1.10:job-server-container:docker', 'beam_flink1.10_job_server')
def flink = new Flink(scope, 'beam_LoadTests_Python_Combine_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.9_job_server'))
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.10_job_server'))

defineTestSteps(scope, testScenarios, [
'Combine Python Load test: 2GB Fanout 4',
Expand Down
4 changes: 2 additions & 2 deletions .test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'beam_python2.7_sdk')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'beam_flink1.9_job_server')
publisher.publish(':runners:flink:1.10:job-server-container:docker', 'beam_flink1.10_job_server')
def flink = new Flink(scope, 'beam_LoadTests_Python_GBK_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.9_job_server'))
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.10_job_server'))

def configurations = testScenarios.findAll { it.pipelineOptions?.parallelism?.value == numberOfWorkers }
loadTestsBuilder.loadTests(scope, sdk, configurations, "GBK", "batch")
Expand Down
4 changes: 2 additions & 2 deletions .test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'beam_python2.7_sdk')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'beam_flink1.9_job_server')
publisher.publish(':runners:flink:1.10:job-server-container:docker', 'beam_flink1.10_job_server')
Flink flink = new Flink(scope, 'beam_LoadTests_Python_ParDo_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.9_job_server'))
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.10_job_server'))

loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'ParDo', 'batch')
}
Expand Down
4 changes: 2 additions & 2 deletions .test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'beam_python2.7_sdk')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'beam_flink1.9_job_server')
publisher.publish(':runners:flink:1.10:job-server-container:docker', 'beam_flink1.10_job_server')
def flink = new Flink(scope, 'beam_LoadTests_Python_CoGBK_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.9_job_server'))
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.10_job_server'))

loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'CoGBK', 'batch')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Flink',
shell('echo *** RUN CROSS-LANGUAGE FLINK USING PYTHON 2.7 ***')
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:job-server:validatesCrossLanguageRunner')
tasks(':runners:flink:1.10:job-server:validatesCrossLanguageRunner')
commonJobProperties.setGradleSwitches(delegate)
switches('-PpythonVersion=2.7')
}
shell('echo *** RUN CROSS-LANGUAGE FLINK USING PYTHON 3.5 ***')
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:job-server:validatesCrossLanguageRunner')
tasks(':runners:flink:1.10:job-server:validatesCrossLanguageRunner')
commonJobProperties.setGradleSwitches(delegate)
switches('-PpythonVersion=3.5')
}
Expand Down
8 changes: 4 additions & 4 deletions .test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:flink:1.9"' +
switches('-Pnexmark.runner=":runners:flink:1.10"' +
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
Expand All @@ -57,7 +57,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:flink:1.9"' +
switches('-Pnexmark.runner=":runners:flink:1.10"' +
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
Expand All @@ -74,7 +74,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:flink:1.9"' +
switches('-Pnexmark.runner=":runners:flink:1.10"' +
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
Expand All @@ -91,7 +91,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:flink:1.9"' +
switches('-Pnexmark.runner=":runners:flink:1.10"' +
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Flink_Batch',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:job-server:validatesPortableRunnerBatch')
tasks(':runners:flink:1.10:job-server:validatesPortableRunnerBatch')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Flink_Streaming',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:job-server:validatesPortableRunnerStreaming')
tasks(':runners:flink:1.10:job-server:validatesPortableRunnerStreaming')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_Flink',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:validatesRunner')
tasks(':runners:flink:1.10:validatesRunner')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_PortableJar_Flink',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:flink:1.9:job-server:testPipelineJar')
tasks(':runners:flink:1.10:job-server:testPipelineJar')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def chicagoTaxiJob = { scope ->

Docker publisher = new Docker(scope, LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY)
publisher.publish(':sdks:python:container:py2:docker', 'beam_python2.7_sdk')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'beam_flink1.9_job_server')
publisher.publish(':runners:flink:1.10:job-server-container:docker', 'beam_flink1.10_job_server')
String pythonHarnessImageTag = publisher.getFullImageName('beam_python2.7_sdk')
Flink flink = new Flink(scope, 'beam_PostCommit_Python_Chicago_Taxi_Flink')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.9_job_server'))
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('beam_flink1.10_job_server'))

def pipelineOptions = [
parallelism : numberOfWorkers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ class BeamModulePlugin implements Plugin<Project> {
}

if (runner?.equalsIgnoreCase('flink')) {
testRuntime it.project(path: ":runners:flink:1.9", configuration: 'testRuntime')
testRuntime it.project(path: ":runners:flink:1.10", configuration: 'testRuntime')
}

if (runner?.equalsIgnoreCase('spark')) {
Expand Down Expand Up @@ -1961,8 +1961,8 @@ class BeamModulePlugin implements Plugin<Project> {
project.task('portableWordCount' + (runner.equals("PortableRunner") ? "" : runner) + (isStreaming ? 'Streaming' : 'Batch')) {
dependsOn = ['installGcpTest']
mustRunAfter = [
':runners:flink:1.9:job-server-container:docker',
':runners:flink:1.9:job-server:shadowJar',
':runners:flink:1.10:job-server-container:docker',
':runners:flink:1.10:job-server:shadowJar',
':runners:spark:job-server:shadowJar',
':sdks:python:container:py2:docker',
':sdks:python:container:py35:docker',
Expand All @@ -1978,7 +1978,7 @@ class BeamModulePlugin implements Plugin<Project> {
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
"--sdk_worker_parallelism=1",
"--flink_job_server_jar=${project.project(':runners:flink:1.9:job-server').shadowJar.archivePath}",
"--flink_job_server_jar=${project.project(':runners:flink:1.10:job-server').shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:job-server').shadowJar.archivePath}",
]
if (isStreaming)
Expand Down
2 changes: 1 addition & 1 deletion examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ dependencies {
// https://issues.apache.org/jira/browse/BEAM-3583
// apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.9")
flinkRunnerPreCommit project(":runners:flink:1.10")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
Expand Down
2 changes: 1 addition & 1 deletion examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ dependencies {
// https://issues.apache.org/jira/browse/BEAM-3583
// apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.9")
flinkRunnerPreCommit project(":runners:flink:1.10")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
Expand Down
2 changes: 1 addition & 1 deletion release/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ task runJavaExamplesValidationTask {
dependsOn ":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow"
dependsOn ":runners:apex:runQuickstartJavaApex"
dependsOn ":runners:spark:runQuickstartJavaSpark"
dependsOn ":runners:flink:1.9:runQuickstartJavaFlinkLocal"
dependsOn ":runners:flink:1.10:runQuickstartJavaFlinkLocal"
dependsOn ":runners:direct-java:runMobileGamingJavaDirect"
dependsOn ":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow"
}
2 changes: 1 addition & 1 deletion release/src/main/scripts/build_release_candidate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ DOCKER_IMAGE_DEFAULT_REPO_ROOT=apache
DOCKER_IMAGE_DEFAULT_REPO_PREFIX=beam_

PYTHON_VER=("python2.7" "python3.5" "python3.6" "python3.7")
FLINK_VER=("1.7" "1.8" "1.9")
FLINK_VER=("1.7" "1.8" "1.9" "1.10")

echo "================Setting Up Environment Variables==========="
echo "Which release version are you working on: "
Expand Down
2 changes: 1 addition & 1 deletion release/src/main/scripts/run_rc_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ if [[ "$java_quickstart_flink_local" = true ]]; then
echo "*************************************************************"
echo "* Running Java Quickstart with Flink local runner"
echo "*************************************************************"
./gradlew :runners:flink:1.9:runQuickstartJavaFlinkLocal \
./gradlew :runners:flink:1.10:runQuickstartJavaFlinkLocal \
-Prepourl=${REPO_URL} \
-Pver=${RELEASE_VER}
else
Expand Down
33 changes: 33 additions & 0 deletions runners/flink/1.10/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.10.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.7/src/main/java", "${basePath}/1.8/src/main/java", "${basePath}/1.9/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.7/src/test/java", "${basePath}/1.8/src/test/java", "${basePath}/1.9/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.10'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.10/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.10/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.10-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"

0 comments on commit 1f513ad

Please sign in to comment.