From 21d0e3356e545b55a929f0b694763cd443b1a8d7 Mon Sep 17 00:00:00 2001 From: Sourabh Bajaj Date: Wed, 14 Jun 2017 16:35:45 -0700 Subject: [PATCH] [BEAM-1585] Add beam plugins as pipeline options --- sdks/python/apache_beam/io/filesystem.py | 14 ++----- .../apache_beam/options/pipeline_options.py | 8 ++++ .../runners/dataflow/dataflow_runner.py | 10 +++++ sdks/python/apache_beam/utils/plugin.py | 42 +++++++++++++++++++ 4 files changed, 63 insertions(+), 11 deletions(-) create mode 100644 sdks/python/apache_beam/utils/plugin.py diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index db6a1d0f6e6c..f5530262b4ca 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -26,6 +26,8 @@ import logging import time +from apache_beam.utils.plugin import BeamPlugin + logger = logging.getLogger(__name__) DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 @@ -409,7 +411,7 @@ def __init__(self, msg, exception_details=None): self.exception_details = exception_details -class FileSystem(object): +class FileSystem(BeamPlugin): """A class that defines the functions that can be performed on a filesystem. All methods are abstract and they are for file system providers to @@ -428,16 +430,6 @@ def _get_compression_type(path, compression_type): 'was %s' % type(compression_type)) return compression_type - @classmethod - def get_all_subclasses(cls): - """Get all the subclasses of the FileSystem class - """ - all_subclasses = [] - for subclass in cls.__subclasses__(): - all_subclasses.append(subclass) - all_subclasses.extend(subclass.get_all_subclasses()) - return all_subclasses - @classmethod def scheme(cls): """URI scheme for the FileSystem diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index daef3a71bb28..9a0cc737aaa2 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -553,6 +553,14 @@ def _add_argparse_args(cls, parser): 'During job submission a source distribution will be built and the ' 'worker will install the resulting package before running any custom ' 'code.')) + parser.add_argument( + '--beam_plugins', + default=None, + help= + ('Bootstrap the python process before executing any code by importing ' + 'all the plugins used in the pipeline. Please pass a comma separated' + 'list of import paths to be included. This is currently an ' + 'experimental flag and provides no stability.')) parser.add_argument( '--save_main_session', default=False, diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d6944b280264..cc9274ec40c7 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -46,6 +46,8 @@ from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.utils.plugin import BeamPlugin __all__ = ['DataflowRunner'] @@ -226,6 +228,14 @@ def run(self, pipeline): raise ImportError( 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') + + # Add setup_options for all the BeamPlugin imports + setup_options = pipeline._options.view_as(SetupOptions) + plugins = BeamPlugin.get_all_plugin_paths() + if setup_options.beam_plugins is not None: + plugins = list(set(plugins + setup_options.beam_plugins.split(','))) + setup_options.beam_plugins = plugins + self.job = apiclient.Job(pipeline._options) # Dataflow runner requires a KV type for GBK inputs, hence we enforce that diff --git a/sdks/python/apache_beam/utils/plugin.py b/sdks/python/apache_beam/utils/plugin.py new file mode 100644 index 000000000000..563b93c54c7d --- /dev/null +++ b/sdks/python/apache_beam/utils/plugin.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A BeamPlugin base class. + +For experimental usage only; no backwards-compatibility guarantees. +""" + + +class BeamPlugin(object): + """Plugin base class to be extended by dependent users such as FileSystem. + Any instantiated subclass will be imported at worker startup time.""" + + @classmethod + def get_all_subclasses(cls): + """Get all the subclasses of the BeamPlugin class.""" + all_subclasses = [] + for subclass in cls.__subclasses__(): + all_subclasses.append(subclass) + all_subclasses.extend(subclass.get_all_subclasses()) + return all_subclasses + + @classmethod + def get_all_plugin_paths(cls): + """Get full import paths of the BeamPlugin subclass.""" + def fullname(o): + return o.__module__ + "." + o.__name__ + return [fullname(o) for o in cls.get_all_subclasses()]