Skip to content

Commit

Permalink
Remove backslashes
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem committed Apr 7, 2020
1 parent 32e8965 commit f19de4f
Showing 1 changed file with 7 additions and 10 deletions.
Expand Up @@ -324,8 +324,8 @@ def commit_side_inputs_to_state(
data_side_input, # type: DataSideInput
):
# type: (...) -> None
for (consuming_transform_id, tag), (buffer_id, func_spec) \
in data_side_input.items():
for (consuming_transform_id,
tag), (buffer_id, func_spec) in data_side_input.items():
_, pcoll_id = split_buffer_id(buffer_id)
value_coder = self.pipeline_context.coders[self.safe_coders[
self.data_channel_coders[pcoll_id]]]
Expand All @@ -343,8 +343,7 @@ def commit_side_inputs_to_state(
transform_id=consuming_transform_id,
side_input_id=tag,
window=window))
self.worker_handler_manager.state_servicer \
.append_raw(state_key, elements_data)
self.state_servicer.append_raw(state_key, elements_data)
elif func_spec.urn == common_urns.side_inputs.MULTIMAP.urn:
for key, window, elements_data in elements_by_window.encoded_items():
state_key = beam_fn_api_pb2.StateKey(
Expand All @@ -353,8 +352,7 @@ def commit_side_inputs_to_state(
side_input_id=tag,
window=window,
key=key))
self.worker_handler_manager.state_servicer \
.append_raw(state_key, elements_data)
self.state_servicer.append_raw(state_key, elements_data)
else:
raise ValueError("Unknown access pattern: '%s'" % func_spec.urn)

Expand Down Expand Up @@ -461,17 +459,16 @@ def extract_bundle_inputs(self):
if pcoll_id not in self.execution_context.pcoll_buffers:
self.execution_context.pcoll_buffers[pcoll_id] = ListBuffer(
coder_impl=coder.get_impl())
data_input[transform.unique_name] = \
self.execution_context.pcoll_buffers[pcoll_id]
data_input[transform.unique_name] = (
self.execution_context.pcoll_buffers[pcoll_id])
elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN:
data_output[transform.unique_name] = pcoll_id
coder_id = self.execution_context.data_channel_coders[only_element(
transform.inputs.values())]
else:
raise NotImplementedError
data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
data_api_service_descriptor = \
self.data_api_service_descriptor()
data_api_service_descriptor = self.data_api_service_descriptor()
if data_api_service_descriptor:
data_spec.api_service_descriptor.url = (
data_api_service_descriptor.url)
Expand Down

0 comments on commit f19de4f

Please sign in to comment.