diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index a95ffc6e54237..04f8802287c48 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -35,6 +35,7 @@ from apache_beam.io.gcp.pubsub import WriteToPubSub from apache_beam.io.gcp.pubsub import _PubSubSink from apache_beam.io.gcp.pubsub import _PubSubSource +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.direct import transform_evaluator from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub @@ -109,8 +110,9 @@ def test_repr(self): class TestReadFromPubSubOverride(unittest.TestCase): def test_expand_with_topic(self): - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label', with_attributes=False, @@ -119,7 +121,7 @@ def test_expand_with_topic(self): self.assertEqual(bytes, pcoll.element_type) # Apply the necessary PTransformOverrides. - overrides = _get_transform_overrides(p.options) + overrides = _get_transform_overrides(options) p.replace_all(overrides) # Note that the direct output of ReadFromPubSub will be replaced @@ -132,8 +134,9 @@ def test_expand_with_topic(self): self.assertEqual('a_label', source.id_label) def test_expand_with_subscription(self): - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub( None, 'projects/fakeprj/subscriptions/a_subscription', @@ -142,7 +145,7 @@ def test_expand_with_subscription(self): self.assertEqual(bytes, pcoll.element_type) # Apply the necessary PTransformOverrides. - overrides = _get_transform_overrides(p.options) + overrides = _get_transform_overrides(options) p.replace_all(overrides) # Note that the direct output of ReadFromPubSub will be replaced @@ -167,8 +170,9 @@ def test_expand_with_both_topic_and_subscription(self): with_attributes=False, timestamp_attribute=None) def test_expand_with_other_options(self): - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label', with_attributes=True, @@ -177,7 +181,7 @@ def test_expand_with_other_options(self): self.assertEqual(PubsubMessage, pcoll.element_type) # Apply the necessary PTransformOverrides. 
- overrides = _get_transform_overrides(p.options) + overrides = _get_transform_overrides(options) p.replace_all(overrides) # Note that the direct output of ReadFromPubSub will be replaced @@ -193,15 +197,16 @@ def test_expand_with_other_options(self): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestWriteStringsToPubSubOverride(unittest.TestCase): def test_expand_deprecated(self): - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/baz') | WriteStringsToPubSub('projects/fakeprj/topics/a_topic') | beam.Map(lambda x: x)) # Apply the necessary PTransformOverrides. - overrides = _get_transform_overrides(p.options) + overrides = _get_transform_overrides(options) p.replace_all(overrides) # Note that the direct output of ReadFromPubSub will be replaced @@ -212,8 +217,9 @@ def test_expand_deprecated(self): self.assertEqual('a_topic', write_transform.dofn.short_topic_name) def test_expand(self): - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/baz') | WriteToPubSub('projects/fakeprj/topics/a_topic', @@ -221,7 +227,7 @@ def test_expand(self): | beam.Map(lambda x: x)) # Apply the necessary PTransformOverrides. - overrides = _get_transform_overrides(p.options) + overrides = _get_transform_overrides(options) p.replace_all(overrides) # Note that the direct output of ReadFromPubSub will be replaced @@ -342,8 +348,9 @@ def test_read_messages_success(self, mock_pubsub): [window.GlobalWindow()])] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None, with_attributes=True)) @@ -362,8 +369,9 @@ def test_read_strings_success(self, mock_pubsub): expected_elements = [data] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic', None, None)) @@ -380,8 +388,9 @@ def test_read_data_success(self, mock_pubsub): expected_elements = [data_encoded] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None)) assert_that(pcoll, equal_to(expected_elements)) @@ -407,8 +416,9 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub): ] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub( 'projects/fakeprj/topics/a_topic', None, None, @@ -436,8 +446,9 @@ def 
test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub): ] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub( 'projects/fakeprj/topics/a_topic', None, None, @@ -466,8 +477,9 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub): ] mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) pcoll = (p | ReadFromPubSub( 'projects/fakeprj/topics/a_topic', None, None, @@ -489,8 +501,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub): ]) mock_pubsub.return_value.pull.return_value = pull_response - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | ReadFromPubSub( 'projects/fakeprj/topics/a_topic', None, None, @@ -501,8 +514,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub): def test_read_message_id_label_unsupported(self, unused_mock_pubsub): # id_label is unsupported in DirectRunner. - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label')) with self.assertRaisesRegexp(NotImplementedError, r'id_label is not supported'): @@ -517,8 +531,9 @@ def test_write_messages_success(self, mock_pubsub): data = 'data' payloads = [data] - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteToPubSub('projects/fakeprj/topics/a_topic', @@ -531,8 +546,9 @@ def test_write_messages_deprecated(self, mock_pubsub): data = 'data' payloads = [data] - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')) @@ -545,8 +561,9 @@ def test_write_messages_with_attributes_success(self, mock_pubsub): attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteToPubSub('projects/fakeprj/topics/a_topic', @@ -560,8 +577,9 @@ def test_write_messages_with_attributes_error(self, mock_pubsub): # Sending raw data when WriteToPubSub expects a PubsubMessage object. 
payloads = [data] - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteToPubSub('projects/fakeprj/topics/a_topic', @@ -575,8 +593,9 @@ def test_write_messages_unsupported_features(self, mock_pubsub): attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteToPubSub('projects/fakeprj/topics/a_topic', @@ -584,8 +603,9 @@ def test_write_messages_unsupported_features(self, mock_pubsub): with self.assertRaisesRegexp(NotImplementedError, r'id_label is not supported'): p.run() - p = TestPipeline() - p.options.view_as(StandardOptions).streaming = True + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) _ = (p | Create(payloads) | WriteToPubSub('projects/fakeprj/topics/a_topic', diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 10f4a02c065db..4618c2c6cbd9a 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -415,7 +415,7 @@ def run(self, test_runner_api=True): pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle')) finally: shutil.rmtree(tmpdir) - return self.runner.run_pipeline(self) + return self.runner.run_pipeline(self, self._options) def __enter__(self): return self @@ -512,7 +512,7 @@ def apply(self, transform, pvalueish=None, label=None): if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) - pvalueish_result = self.runner.apply(transform, pvalueish) + pvalueish_result = self.runner.apply(transform, pvalueish, self._options) if type_options is not None and type_options.pipeline_type_check: transform.type_check_outputs(pvalueish_result) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index a135251fa8e29..88b03d47a3049 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -311,7 +311,7 @@ def visit_transform(self, transform_node): return FlattenInputVisitor() - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. try: @@ -323,7 +323,7 @@ def run_pipeline(self, pipeline): 'please install apache_beam[gcp]') # Convert all side inputs into a form acceptable to Dataflow. - if apiclient._use_fnapi(pipeline._options): + if apiclient._use_fnapi(options): pipeline.visit(self.side_input_visitor()) # Performing configured PTransform overrides. 
Note that this is currently @@ -336,7 +336,7 @@ def run_pipeline(self, pipeline): return_context=True) # Add setup_options for all the BeamPlugin imports - setup_options = pipeline._options.view_as(SetupOptions) + setup_options = options.view_as(SetupOptions) plugins = BeamPlugin.get_all_plugin_paths() if setup_options.beam_plugins is not None: plugins = list(set(plugins + setup_options.beam_plugins)) @@ -344,15 +344,15 @@ def run_pipeline(self, pipeline): # Elevate "min_cpu_platform" to pipeline option, but using the existing # experiment. - debug_options = pipeline._options.view_as(DebugOptions) - worker_options = pipeline._options.view_as(WorkerOptions) + debug_options = options.view_as(DebugOptions) + worker_options = options.view_as(WorkerOptions) if worker_options.min_cpu_platform: experiments = ["min_cpu_platform=%s" % worker_options.min_cpu_platform] if debug_options.experiments is not None: experiments = list(set(experiments + debug_options.experiments)) debug_options.experiments = experiments - self.job = apiclient.Job(pipeline._options, self.proto_pipeline) + self.job = apiclient.Job(options, self.proto_pipeline) # Dataflow runner requires a KV type for GBK inputs, hence we enforce that # here. @@ -363,20 +363,19 @@ def run_pipeline(self, pipeline): pipeline.visit(self.flatten_input_visitor()) # The superclass's run will trigger a traversal of all reachable nodes. - super(DataflowRunner, self).run_pipeline(pipeline) + super(DataflowRunner, self).run_pipeline(pipeline, options) - test_options = pipeline._options.view_as(TestOptions) + test_options = options.view_as(TestOptions) # If it is a dry run, return without submitting the job. if test_options.dry_run: return None # Get a Dataflow API client and set its options - self.dataflow_client = apiclient.DataflowApplicationClient( - pipeline._options) + self.dataflow_client = apiclient.DataflowApplicationClient(options) dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None) if dataflow_worker_jar is not None: - if not apiclient._use_fnapi(pipeline._options): + if not apiclient._use_fnapi(options): logging.fatal( 'Typical end users should not use this worker jar feature. 
' 'It can only be used when fnapi is enabled.') @@ -509,9 +508,8 @@ def _add_singleton_step( self.serialize_windowing_strategy(windowing_strategy)) return step - def run_Impulse(self, transform_node): - standard_options = ( - transform_node.outputs[None].pipeline._options.view_as(StandardOptions)) + def run_Impulse(self, transform_node, options): + standard_options = options.view_as(StandardOptions) step = self._add_step( TransformNames.READ, transform_node.full_label, transform_node) if standard_options.streaming: @@ -535,7 +533,7 @@ def run_Impulse(self, transform_node): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def run_Flatten(self, transform_node): + def run_Flatten(self, transform_node, options): step = self._add_step(TransformNames.FLATTEN, transform_node.full_label, transform_node) inputs = [] @@ -554,16 +552,16 @@ def run_Flatten(self, transform_node): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def apply_WriteToBigQuery(self, transform, pcoll): + def apply_WriteToBigQuery(self, transform, pcoll, options): # Make sure this is the WriteToBigQuery class that we expected if not isinstance(transform, beam.io.WriteToBigQuery): - return self.apply_PTransform(transform, pcoll) - standard_options = pcoll.pipeline._options.view_as(StandardOptions) + return self.apply_PTransform(transform, pcoll, options) + standard_options = options.view_as(StandardOptions) if standard_options.streaming: if (transform.write_disposition == beam.io.BigQueryDisposition.WRITE_TRUNCATE): raise RuntimeError('Can not use write truncation mode in streaming') - return self.apply_PTransform(transform, pcoll) + return self.apply_PTransform(transform, pcoll, options) else: return pcoll | 'WriteToBigQuery' >> beam.io.Write( beam.io.BigQuerySink( @@ -574,7 +572,7 @@ def apply_WriteToBigQuery(self, transform, pcoll): transform.create_disposition, transform.write_disposition)) - def apply_GroupByKey(self, transform, pcoll): + def apply_GroupByKey(self, transform, pcoll, options): # Infer coder of parent. # # TODO(ccy): make Coder inference and checking less specialized and more @@ -594,7 +592,7 @@ def apply_GroupByKey(self, transform, pcoll): return pvalue.PCollection(pcoll.pipeline) - def run_GroupByKey(self, transform_node): + def run_GroupByKey(self, transform_node, options): input_tag = transform_node.inputs[0].tag input_step = self._cache.get_pvalue(transform_node.inputs[0]) step = self._add_step( @@ -617,7 +615,7 @@ def run_GroupByKey(self, transform_node): PropertyNames.SERIALIZED_FN, self.serialize_windowing_strategy(windowing)) - def run_ParDo(self, transform_node): + def run_ParDo(self, transform_node, options): transform = transform_node.transform input_tag = transform_node.inputs[0].tag input_step = self._cache.get_pvalue(transform_node.inputs[0]) @@ -672,7 +670,7 @@ def run_ParDo(self, transform_node): from apache_beam.runners.dataflow.internal import apiclient transform_proto = self.proto_context.transforms.get_proto(transform_node) transform_id = self.proto_context.transforms.get_id(transform_node) - if (apiclient._use_fnapi(transform_node.inputs[0].pipeline._options) + if (apiclient._use_fnapi(options) and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn): # Patch side input ids to be unique across a given pipeline. 
if (label_renames and @@ -742,10 +740,10 @@ def _pardo_fn_data(transform_node, get_label): return (transform.fn, transform.args, transform.kwargs, si_tags_and_types, transform_node.inputs[0].windowing) - def apply_CombineValues(self, transform, pcoll): + def apply_CombineValues(self, transform, pcoll, options): return pvalue.PCollection(pcoll.pipeline) - def run_CombineValues(self, transform_node): + def run_CombineValues(self, transform_node, options): transform = transform_node.transform input_tag = transform_node.inputs[0].tag input_step = self._cache.get_pvalue(transform_node.inputs[0]) @@ -755,7 +753,7 @@ def run_CombineValues(self, transform_node): # The data transmitted in SERIALIZED_FN is different depending on whether # this is a fnapi pipeline or not. from apache_beam.runners.dataflow.internal import apiclient - if apiclient._use_fnapi(transform_node.inputs[0].pipeline._options): + if apiclient._use_fnapi(options): # Fnapi pipelines send the transform ID of the CombineValues transform's # parent composite because Dataflow expects the ID of a CombinePerKey # transform. @@ -792,28 +790,30 @@ def run_CombineValues(self, transform_node): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}) step.add_property(PropertyNames.OUTPUT_INFO, outputs) - def apply_Read(self, transform, pbegin): + def apply_Read(self, transform, pbegin, options): if hasattr(transform.source, 'format'): # Consider native Read to be a primitive for dataflow. return beam.pvalue.PCollection(pbegin.pipeline) else: - options = pbegin.pipeline.options.view_as(DebugOptions) - if options.experiments and 'beam_fn_api' in options.experiments: + debug_options = options.view_as(DebugOptions) + if ( + debug_options.experiments and + 'beam_fn_api' in debug_options.experiments + ): # Expand according to FnAPI primitives. - return self.apply_PTransform(transform, pbegin) + return self.apply_PTransform(transform, pbegin, options) else: # Custom Read is also a primitive for non-FnAPI on dataflow. return beam.pvalue.PCollection(pbegin.pipeline) - def run_Read(self, transform_node): + def run_Read(self, transform_node, options): transform = transform_node.transform step = self._add_step( TransformNames.READ, transform_node.full_label, transform_node) # TODO(mairbek): refactor if-else tree to use registerable functions. # Initialize the source specific properties. - standard_options = transform_node.inputs[0].pipeline.options.view_as( - StandardOptions) + standard_options = options.view_as(StandardOptions) if not hasattr(transform.source, 'format'): # If a format is not set, we assume the source to be a custom source. 
source_dict = {} @@ -919,7 +919,7 @@ def run_Read(self, transform_node): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def run__NativeWrite(self, transform_node): + def run__NativeWrite(self, transform_node, options): transform = transform_node.transform input_tag = transform_node.inputs[0].tag input_step = self._cache.get_pvalue(transform_node.inputs[0]) @@ -965,8 +965,7 @@ def run__NativeWrite(self, transform_node): step.add_property( PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json()) elif transform.sink.format == 'pubsub': - standard_options = ( - transform_node.inputs[0].pipeline.options.view_as(StandardOptions)) + standard_options = options.view_as(StandardOptions) if not standard_options.streaming: raise ValueError('Cloud Pub/Sub is currently available for use ' 'only in streaming pipelines.') diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index b1a845a2784f9..219f34b879e3b 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -38,18 +38,19 @@ class TestDataflowRunner(DataflowRunner): - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): """Execute test pipeline and verify test matcher""" - options = pipeline._options.view_as(TestOptions) - on_success_matcher = options.on_success_matcher - wait_duration = options.wait_until_finish_duration + test_options = options.view_as(TestOptions) + on_success_matcher = test_options.on_success_matcher + wait_duration = test_options.wait_until_finish_duration is_streaming = options.view_as(StandardOptions).streaming # [BEAM-1889] Do not send this to remote workers also, there is no need to # send this option to remote executors. - options.on_success_matcher = None + test_options.on_success_matcher = None - self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) + self.result = super(TestDataflowRunner, self).run_pipeline( + pipeline, options) if self.result.has_job: # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index d410992ab1d9f..b882eb385ea72 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -68,11 +68,11 @@ class SwitchingDirectRunner(PipelineRunner): implemented in the FnApiRunner. """ - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): use_fnapi_runner = True # Streaming mode is not yet supported on the FnApiRunner. - if pipeline._options.view_as(StandardOptions).streaming: + if options.view_as(StandardOptions).streaming: use_fnapi_runner = False from apache_beam.pipeline import PipelineVisitor @@ -136,7 +136,7 @@ def visit_transform(self, applied_ptransform): else: runner = BundleBasedDirectRunner() - return runner.run_pipeline(pipeline) + return runner.run_pipeline(pipeline, options) # Type variables. @@ -346,7 +346,7 @@ def get_replacement_transform(self, transform): class BundleBasedDirectRunner(PipelineRunner): """Executes a single pipeline on the local machine.""" - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): """Execute the entire pipeline and returns an DirectPipelineResult.""" # TODO: Move imports to top. 
Pipeline <-> Runner dependency cause problems @@ -362,7 +362,7 @@ def run_pipeline(self, pipeline): from apache_beam.testing.test_stream import TestStream # Performing configured PTransform overrides. - pipeline.replace_all(_get_transform_overrides(pipeline.options)) + pipeline.replace_all(_get_transform_overrides(options)) # If the TestStream I/O is used, use a mock test clock. class _TestStreamUsageVisitor(PipelineVisitor): @@ -387,8 +387,8 @@ def visit_transform(self, applied_ptransform): pipeline.visit(self.consumer_tracking_visitor) evaluation_context = EvaluationContext( - pipeline._options, - BundleFactory(stacked=pipeline._options.view_as(DirectOptions) + options, + BundleFactory(stacked=options.view_as(DirectOptions) .direct_runner_use_stacked_bundle), self.consumer_tracking_visitor.root_transforms, self.consumer_tracking_visitor.value_to_consumers, diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py index 23dfeabc2abe5..04dbe50938703 100644 --- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py @@ -30,17 +30,17 @@ class TestDirectRunner(DirectRunner): - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): """Execute test pipeline and verify test matcher""" - options = pipeline._options.view_as(TestOptions) - on_success_matcher = options.on_success_matcher + test_options = options.view_as(TestOptions) + on_success_matcher = test_options.on_success_matcher is_streaming = options.view_as(StandardOptions).streaming # [BEAM-1889] Do not send this to remote workers also, there is no need to # send this option to remote executors. - options.on_success_matcher = None + test_options.on_success_matcher = None - self.result = super(TestDirectRunner, self).run_pipeline(pipeline) + self.result = super(TestDirectRunner, self).run_pipeline(pipeline, options) try: if not is_streaming: diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index 45bc246d4acbb..391f3f0451f32 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -98,11 +98,11 @@ def end_session(self): def cleanup(self): self._cache_manager.cleanup() - def apply(self, transform, pvalueish): + def apply(self, transform, pvalueish, options): # TODO(qinyeli, BEAM-646): Remove runner interception of apply. - return self._underlying_runner.apply(transform, pvalueish) + return self._underlying_runner.apply(transform, pvalueish, options) - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): if not hasattr(self, '_desired_cache_labels'): self._desired_cache_labels = set() @@ -111,7 +111,7 @@ def run_pipeline(self, pipeline): pipeline = beam.pipeline.Pipeline.from_runner_api( pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, - pipeline._options) + options) # Snapshot the pipeline in a portable proto before mutating it. pipeline_proto, original_context = pipeline.to_runner_api( @@ -121,7 +121,7 @@ def run_pipeline(self, pipeline): analyzer = pipeline_analyzer.PipelineAnalyzer(self._cache_manager, pipeline_proto, self._underlying_runner, - pipeline._options, + options, self._desired_cache_labels) # Should be only accessed for debugging purpose. 
self._analyzer = analyzer @@ -129,7 +129,7 @@ def run_pipeline(self, pipeline): pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api( analyzer.pipeline_proto_to_execute(), self._underlying_runner, - pipeline._options) + options) display = display_manager.DisplayManager( pipeline_proto=pipeline_proto, diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index e8e1f9063a599..76155492fe577 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -234,7 +234,7 @@ def _next_uid(self): self._last_uid += 1 return str(self._last_uid) - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): MetricsEnvironment.set_metrics_supported(False) RuntimeValueProvider.set_runtime_options({}) # This is sometimes needed if type checking is disabled @@ -242,10 +242,10 @@ def run_pipeline(self, pipeline): # are known to be KVs. from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner pipeline.visit(DataflowRunner.group_by_key_input_visitor()) - self._bundle_repeat = self._bundle_repeat or pipeline._options.view_as( + self._bundle_repeat = self._bundle_repeat or options.view_as( pipeline_options.DirectOptions).direct_runner_bundle_repeat self._profiler_factory = profiler.Profile.factory_from_options( - pipeline._options.view_as(pipeline_options.ProfilingOptions)) + options.view_as(pipeline_options.ProfilingOptions)) return self.run_via_runner_api(pipeline.to_runner_api()) def run_via_runner_api(self, pipeline_proto): diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 0b539af4535a4..025cb1066780f 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -105,14 +105,14 @@ def _create_environment(options): env=(config.get('env') or '') ).SerializeToString()) - def run_pipeline(self, pipeline): - portable_options = pipeline.options.view_as(PortableOptions) + def run_pipeline(self, pipeline, options): + portable_options = options.view_as(PortableOptions) job_endpoint = portable_options.job_endpoint # TODO: https://issues.apache.org/jira/browse/BEAM-5525 # portable runner specific default - if pipeline.options.view_as(SetupOptions).sdk_location == 'default': - pipeline.options.view_as(SetupOptions).sdk_location = 'container' + if options.view_as(SetupOptions).sdk_location == 'default': + options.view_as(SetupOptions).sdk_location = 'container' if not job_endpoint: docker = DockerizedJobServer() @@ -134,9 +134,9 @@ def run_pipeline(self, pipeline): # TODO: Define URNs for options. 
# convert int values: https://issues.apache.org/jira/browse/BEAM-5509 - options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v) - for k, v in pipeline._options.get_all_options().items() - if v is not None} + p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v) + for k, v in options.get_all_options().items() + if v is not None} channel = grpc.insecure_channel(job_endpoint) grpc.channel_ready_future(channel).result() @@ -153,7 +153,7 @@ def send_prepare_request(max_retries=5): return job_service.Prepare( beam_job_api_pb2.PrepareJobRequest( job_name='job', pipeline=proto_pipeline, - pipeline_options=job_utils.dict_to_struct(options))) + pipeline_options=job_utils.dict_to_struct(p_options))) except grpc._channel._Rendezvous as e: num_retries += 1 if num_retries > max_retries: @@ -165,7 +165,7 @@ def send_prepare_request(max_retries=5): grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url), prepare_response.staging_session_token) retrieval_token, _ = stager.stage_job_resources( - pipeline._options, + options, staging_location='') else: retrieval_token = None diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index b0bafa55d68cf..71845620d81db 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -151,7 +151,7 @@ def run_async(self, transform, options=None): transform(PBegin(p)) return p.run() - def run_pipeline(self, pipeline): + def run_pipeline(self, pipeline, options): """Execute the entire pipeline or the sub-DAG reachable from a node. Runners should override this method. @@ -168,14 +168,14 @@ def __init__(self, runner): def visit_transform(self, transform_node): try: - self.runner.run_transform(transform_node) + self.runner.run_transform(transform_node, options) except: logging.error('Error while visiting %s', transform_node.full_label) raise pipeline.visit(RunVisitor(self)) - def apply(self, transform, input): + def apply(self, transform, input, options): """Runner callback for a pipeline.apply call. Args: @@ -190,15 +190,15 @@ def apply(self, transform, input): for cls in transform.__class__.mro(): m = getattr(self, 'apply_%s' % cls.__name__, None) if m: - return m(transform, input) + return m(transform, input, options) raise NotImplementedError( 'Execution of [%s] not implemented in runner %s.' % (transform, self)) - def apply_PTransform(self, transform, input): + def apply_PTransform(self, transform, input, options): # The base case of apply is to call the transform's expand. return transform.expand(input) - def run_transform(self, transform_node): + def run_transform(self, transform_node, options): """Runner callback for a pipeline.run call. Args: @@ -211,7 +211,7 @@ def run_transform(self, transform_node): for cls in transform_node.transform.__class__.mro(): m = getattr(self, 'run_%s' % cls.__name__, None) if m: - return m(transform_node) + return m(transform_node, options) raise NotImplementedError( 'Execution of [%s] not implemented in runner %s.' % ( transform_node.transform, self))
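
The patch above threads PipelineOptions explicitly through every runner entry point (run_pipeline, apply, run_transform, and the per-transform run_*/apply_* hooks) instead of letting runners reach into the private pipeline._options attribute, and it updates the Pub/Sub tests to build the options object first and hand it to TestPipeline. The sketch below is a minimal illustration of the resulting calling convention, assuming the post-patch API; it is not part of the patch, and LoggingRunner is a hypothetical example class.

    # Minimal sketch (not part of the patch) of the calling convention this
    # change introduces: runners receive PipelineOptions explicitly rather
    # than reading pipeline._options. LoggingRunner is hypothetical.
    import logging

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.runners.direct.direct_runner import DirectRunner
    from apache_beam.testing.test_pipeline import TestPipeline


    class LoggingRunner(DirectRunner):
      """Hypothetical runner showing the run_pipeline(pipeline, options) hook."""

      def run_pipeline(self, pipeline, options):
        # The options argument replaces lookups such as
        # pipeline._options.view_as(StandardOptions); the same object is
        # forwarded unchanged to the parent runner.
        if options.view_as(StandardOptions).streaming:
          logging.info('submitting a streaming pipeline')
        return super(LoggingRunner, self).run_pipeline(pipeline, options)


    # Test-side pattern used throughout pubsub_test.py after this change:
    # build the options up front, then pass the same object to TestPipeline
    # and to helpers such as _get_transform_overrides(options).
    options = PipelineOptions([])
    options.view_as(StandardOptions).streaming = True
    p = TestPipeline(runner=LoggingRunner(), options=options)

Passing the options alongside the pipeline keeps a single source of configuration and removes the runners' dependence on the private pipeline._options attribute, which is what most of the hunks above rewrite mechanically.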