From 86c173eb7991594168a53c609747ec6a91651133 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Wed, 1 Feb 2017 10:39:59 -0800 Subject: [PATCH] In some instances where a PipelineOptions subclass was defined in the main session and the save_main_session option is enabled, that subclass may appear multiple times in the PipelineOptions.__subclasses__() list. This causes problems with argparse because options are no longer unique. This change filters the subclasses by name and picks the last unique instance of each subclass. --- .../apache_beam/utils/pipeline_options.py | 7 +++++++ .../apache_beam/utils/pipeline_options_test.py | 17 +++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py index 16b1640066bd..1af3a0dfa6bf 100644 --- a/sdks/python/apache_beam/utils/pipeline_options.py +++ b/sdks/python/apache_beam/utils/pipeline_options.py @@ -116,9 +116,16 @@ def get_all_options(self, drop_default=False): Returns: Dictionary of all args and values. """ + + # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be + # repeated. Pick last unique instance of each subclass to avoid conflicts. 
parser = argparse.ArgumentParser() + subset = {} for cls in PipelineOptions.__subclasses__(): + subset[str(cls)] = cls + for cls in subset.values(): cls._add_argparse_args(parser) # pylint: disable=protected-access + known_args, _ = parser.parse_known_args(self._flags) result = vars(known_args) diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py index 054b6a5e4c89..507a8275500d 100644 --- a/sdks/python/apache_beam/utils/pipeline_options_test.py +++ b/sdks/python/apache_beam/utils/pipeline_options_test.py @@ -170,6 +170,23 @@ def test_template_location(self): options = PipelineOptions(flags=['']) self.assertEqual(options.get_all_options()['template_location'], None) + def test_redefine_options(self): + + class TestRedefinedOptios(PipelineOptions): # pylint: disable=unused-variable + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--redefined_flag', action='store_true') + + class TestRedefinedOptios(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--redefined_flag', action='store_true') + + options = PipelineOptions(['--redefined_flag']) + self.assertEqual(options.get_all_options()['redefined_flag'], True) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()