From ce8e87c028eb90c18319126fec1d6f1d54af8ab4 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 7 Jun 2017 23:00:43 -0700 Subject: [PATCH] Avoid flakiness in data channel for empty streams. As empty stream is used as end-of-stream marker, don't ever send it as the data itself. --- sdks/python/apache_beam/runners/worker/data_plane.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 5edd0b490750..0cfbb12238d6 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -168,11 +168,13 @@ def input_elements(self, instruction_id, expected_targets): def output_stream(self, instruction_id, target): def add_to_send_queue(data): - self._to_send.put( - beam_fn_api_pb2.Elements.Data( - instruction_reference=instruction_id, - target=target, - data=data)) + if data: + self._to_send.put( + beam_fn_api_pb2.Elements.Data( + instruction_reference=instruction_id, + target=target, + data=data)) + # End of stream marker. self._to_send.put( beam_fn_api_pb2.Elements.Data( instruction_reference=instruction_id,