From f19de4f9d3c9e2434f3ce1ecf11a123b923e6be9 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Wed, 1 Apr 2020 11:48:59 -0700 Subject: [PATCH] Remove backslashes --- .../portability/fn_api_runner/execution.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 6dbfc9866d3a7..2fb3d838bef9b 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -324,8 +324,8 @@ def commit_side_inputs_to_state( data_side_input, # type: DataSideInput ): # type: (...) -> None - for (consuming_transform_id, tag), (buffer_id, func_spec) \ - in data_side_input.items(): + for (consuming_transform_id, + tag), (buffer_id, func_spec) in data_side_input.items(): _, pcoll_id = split_buffer_id(buffer_id) value_coder = self.pipeline_context.coders[self.safe_coders[ self.data_channel_coders[pcoll_id]]] @@ -343,8 +343,7 @@ def commit_side_inputs_to_state( transform_id=consuming_transform_id, side_input_id=tag, window=window)) - self.worker_handler_manager.state_servicer \ - .append_raw(state_key, elements_data) + self.state_servicer.append_raw(state_key, elements_data) elif func_spec.urn == common_urns.side_inputs.MULTIMAP.urn: for key, window, elements_data in elements_by_window.encoded_items(): state_key = beam_fn_api_pb2.StateKey( @@ -353,8 +352,7 @@ def commit_side_inputs_to_state( side_input_id=tag, window=window, key=key)) - self.worker_handler_manager.state_servicer \ - .append_raw(state_key, elements_data) + self.state_servicer.append_raw(state_key, elements_data) else: raise ValueError("Unknown access pattern: '%s'" % func_spec.urn) @@ -461,8 +459,8 @@ def extract_bundle_inputs(self): if pcoll_id not in self.execution_context.pcoll_buffers: self.execution_context.pcoll_buffers[pcoll_id] = ListBuffer( coder_impl=coder.get_impl()) - data_input[transform.unique_name] = \ - self.execution_context.pcoll_buffers[pcoll_id] + data_input[transform.unique_name] = ( + self.execution_context.pcoll_buffers[pcoll_id]) elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN: data_output[transform.unique_name] = pcoll_id coder_id = self.execution_context.data_channel_coders[only_element( @@ -470,8 +468,7 @@ def extract_bundle_inputs(self): else: raise NotImplementedError data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id) - data_api_service_descriptor = \ - self.data_api_service_descriptor() + data_api_service_descriptor = self.data_api_service_descriptor() if data_api_service_descriptor: data_spec.api_service_descriptor.url = ( data_api_service_descriptor.url)