/
flink_runner.py
113 lines (98 loc) · 4.66 KB
/
flink_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A runner for executing portable pipelines on Flink."""
from __future__ import absolute_import
from __future__ import print_function
import logging
import re
import sys
from apache_beam.options import pipeline_options
from apache_beam.runners.portability import flink_uber_jar_job_server
from apache_beam.runners.portability import job_server
from apache_beam.runners.portability import portable_runner
# Flink versions for which a Beam job-server shadow jar is published; used
# both as the allowed values of --flink_version and to build the Gradle
# artifact path 'runners:flink:<version>:job-server:shadowJar'.
PUBLISHED_FLINK_VERSIONS = ['1.7', '1.8', '1.9']
# Special --flink_master values: '[local]' starts a local cluster for the
# execution, '[auto]' executes locally or lets the job server infer the
# cluster address (see the --flink_master help text below).
MAGIC_HOST_NAMES = ['[local]', '[auto]']
class FlinkRunner(portable_runner.PortableRunner):
  """A runner for executing portable pipelines on Flink."""

  def run_pipeline(self, pipeline, options):
    """Runs the pipeline, defaulting the environment for local execution.

    When the target master is one of the magic host names and the user has
    not explicitly chosen an environment type or an output jar, default to
    LOOPBACK so user code runs in the submitting process.
    """
    portable_options = options.view_as(pipeline_options.PortableOptions)
    if (options.view_as(FlinkRunnerOptions).flink_master in MAGIC_HOST_NAMES
        and not portable_options.environment_type
        and not portable_options.output_executable_path):
      portable_options.environment_type = 'LOOPBACK'
    return super(FlinkRunner, self).run_pipeline(pipeline, options)

  def default_job_server(self, options):
    """Picks the job server implementation for the given options.

    Local (magic host name) execution, or Python < 3.6, uses the Java-jar
    job server; otherwise the pipeline is submitted through the Flink uber
    jar job server pointed at the remote master.
    """
    flink_master = self.add_http_scheme(
        options.view_as(FlinkRunnerOptions).flink_master)
    options.view_as(FlinkRunnerOptions).flink_master = flink_master
    if flink_master in MAGIC_HOST_NAMES or sys.version_info < (3, 6):
      return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
    else:
      # This has to be changed to [auto], otherwise we will attempt to
      # submit the pipeline remotely on the Flink JobMaster which will
      # _fail_. DO NOT CHANGE the following line, unless you have tested it.
      options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
      return flink_uber_jar_job_server.FlinkUberJarJobServer(flink_master)

  @staticmethod
  def add_http_scheme(flink_master):
    """Adds a http protocol scheme if none provided."""
    flink_master = flink_master.strip()
    # Magic host names are not URLs and must be left untouched.
    if (flink_master not in MAGIC_HOST_NAMES
        and not re.search('^http[s]?://', flink_master)):
      logging.info('Adding HTTP protocol scheme to flink_master parameter: '
                   'http://%s', flink_master)
      flink_master = 'http://' + flink_master
    return flink_master
class FlinkRunnerOptions(pipeline_options.PipelineOptions):
  """Pipeline options specific to the Flink runner."""

  @classmethod
  def _add_argparse_args(cls, parser):
    # Target master address, or a magic host name for local/auto execution.
    parser.add_argument(
        '--flink_master',
        default='[auto]',
        help='Flink master address (http://host:port)'
             ' Use "[local]" to start a local cluster'
             ' for the execution. Use "[auto]" if you'
             ' plan to either execute locally or let the'
             ' Flink job server infer the cluster address.')
    # Restricted to versions that have a published job-server jar; the most
    # recent published version is the default.
    parser.add_argument(
        '--flink_version',
        default=PUBLISHED_FLINK_VERSIONS[-1],
        choices=PUBLISHED_FLINK_VERSIONS,
        help='Flink version to use.')
    parser.add_argument(
        '--flink_job_server_jar',
        help='Path or URL to a flink jobserver jar.')
    parser.add_argument('--artifacts_dir', default=None)
class FlinkJarJobServer(job_server.JavaJarJobServer):
  """A job server backed by the Flink job-server shadow jar."""

  def __init__(self, options):
    super(FlinkJarJobServer, self).__init__()
    options = options.view_as(FlinkRunnerOptions)
    self._jar = options.flink_job_server_jar  # may be None; see path_to_jar
    self._master_url = options.flink_master
    self._flink_version = options.flink_version
    self._artifacts_dir = options.artifacts_dir

  def path_to_jar(self):
    """Returns the user-supplied jar, falling back to the released Beam jar
    matching the requested Flink version."""
    if self._jar:
      return self._jar
    else:
      return self.path_to_beam_jar(
          'runners:flink:%s:job-server:shadowJar' % self._flink_version)

  def java_arguments(self, job_port, artifacts_dir):
    """Builds the argument list passed to the job server jar.

    Args:
      job_port: port to serve the job API on.
      artifacts_dir: default staging directory, used unless the user
        supplied --artifacts_dir.
    """
    return [
        '--flink-master', self._master_url,
        # Prefer an explicitly configured artifacts dir over the default.
        '--artifacts-dir', self._artifacts_dir or artifacts_dir,
        '--job-port', job_port,
        '--artifact-port', 0,
        '--expansion-port', 0,
    ]