Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1319] Add conflict resolution to the PipelineOptions internal argparse. #1848

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 16 additions & 14 deletions sdks/python/apache_beam/examples/wordcount.py
Expand Up @@ -19,7 +19,6 @@

from __future__ import absolute_import

import argparse
import logging
import re

Expand Down Expand Up @@ -59,26 +58,29 @@ def process(self, context):
return words


class WordCountOptions(PipelineOptions):

@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input', help='Input for the dataflow pipeline',
default='gs://dataflow-samples/shakespeare/kinglear.txt')
parser.add_argument(
'--output', help='Output for the dataflow pipeline', required=True)


def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
word_count_options = pipeline_options.view_as(WordCountOptions)

p = beam.Pipeline(options=pipeline_options)

# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input)
lines = p | 'read' >> ReadFromText(word_count_options.input)

# Count the occurrences of each word.
counts = (lines
Expand All @@ -93,7 +95,7 @@ def run(argv=None):

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
output | 'write' >> WriteToText(word_count_options.output)

# Actually run the pipeline (all operations above are deferred).
result = p.run()
Expand Down
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/utils/pipeline_options.py
Expand Up @@ -116,9 +116,16 @@ def get_all_options(self, drop_default=False):
Returns:
Dictionary of all args and values.
"""

# TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
# repeated. Pick last unique instance of each subclass to avoid conflicts.
parser = argparse.ArgumentParser()
subset = {}
for cls in PipelineOptions.__subclasses__():
subset[str(cls)] = cls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for this ?

for cls in subset.values():
cls._add_argparse_args(parser) # pylint: disable=protected-access

known_args, _ = parser.parse_known_args(self._flags)
result = vars(known_args)

Expand Down