diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index 66a369ba3bbb..6f5bb60762cf 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -217,6 +217,8 @@ def update_from_batch(self, windowed_batch): batch_length = self.producer_batch_converter.get_length( windowed_batch.values) self.element_counter.update(batch_length) + if batch_length == 0: + return mean_element_size = self.producer_batch_converter.estimate_byte_size( windowed_batch.values) / batch_length diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py index 83b345f47c9c..aef66a725f95 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters_test.py +++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py @@ -184,6 +184,22 @@ def test_update_batch(self): self.verify_counters(opcounts, 200, size_per_element) + def test_update_empty_batch(self): + opcounts = OperationCounters( + CounterFactory(), + 'some-name', + coders.FastPrimitivesCoder(), + 0, + producer_batch_converter=typehints.batch.BatchConverter.from_typehints( + element_type=typehints.Any, + batch_type=typehints.List[typehints.Any])) + + self.verify_counters(opcounts, 0, math.nan) + + opcounts.update_from_batch(GlobalWindows.windowed_batch([])) + + self.verify_counters(opcounts, 0, math.nan) + def test_should_sample(self): # Order of magnitude more buckets than highest constant in code under test. buckets = [0] * 300