diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 92929afa2e1dc..c1d86aad75064 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -19,7 +19,6 @@ from __future__ import absolute_import -import argparse import logging import re @@ -59,26 +58,29 @@ def process(self, context): return words +class WordCountOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--input', help='Input for the dataflow pipeline', + default='gs://dataflow-samples/shakespeare/kinglear.txt') + parser.add_argument( + '--output', help='Output for the dataflow pipeline', required=True) + + def run(argv=None): """Main entry point; defines and runs the wordcount pipeline.""" - parser = argparse.ArgumentParser() - parser.add_argument('--input', - dest='input', - default='gs://dataflow-samples/shakespeare/kinglear.txt', - help='Input file to process.') - parser.add_argument('--output', - dest='output', - required=True, - help='Output file to write results to.') - known_args, pipeline_args = parser.parse_known_args(argv) + pipeline_options = PipelineOptions(argv) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (e.g., a module imported at module level). - pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True + word_count_options = pipeline_options.view_as(WordCountOptions) + p = beam.Pipeline(options=pipeline_options) # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> ReadFromText(known_args.input) + lines = p | 'read' >> ReadFromText(word_count_options.input) # Count the occurrences of each word. counts = (lines @@ -93,7 +95,7 @@ def run(argv=None): # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) + output | 'write' >> WriteToText(word_count_options.output) # Actually run the pipeline (all operations above are deferred). result = p.run() diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py index 16b1640066bdc..1af3a0dfa6bf0 100644 --- a/sdks/python/apache_beam/utils/pipeline_options.py +++ b/sdks/python/apache_beam/utils/pipeline_options.py @@ -116,9 +116,16 @@ def get_all_options(self, drop_default=False): Returns: Dictionary of all args and values. """ + + # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be + # repeated. Pick last unique instance of each subclass to avoid conflicts. parser = argparse.ArgumentParser() + subset = {} for cls in PipelineOptions.__subclasses__(): + subset[str(cls)] = cls + for cls in subset.values(): cls._add_argparse_args(parser) # pylint: disable=protected-access + known_args, _ = parser.parse_known_args(self._flags) result = vars(known_args)