From 3c01687c981eacf9a14a26d9e63e36543bf453f0 Mon Sep 17 00:00:00 2001 From: goutamadwant Date: Fri, 29 May 2026 19:54:08 -0700 Subject: [PATCH 1/2] Handle empty batches in Python worker counters --- .../apache_beam/runners/worker/opcounters.py | 2 ++ .../runners/worker/opcounters_test.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py index 66a369ba3bbb..6f5bb60762cf 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters.py +++ b/sdks/python/apache_beam/runners/worker/opcounters.py @@ -217,6 +217,8 @@ def update_from_batch(self, windowed_batch): batch_length = self.producer_batch_converter.get_length( windowed_batch.values) self.element_counter.update(batch_length) + if batch_length == 0: + return mean_element_size = self.producer_batch_converter.estimate_byte_size( windowed_batch.values) / batch_length diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py index 83b345f47c9c..dca59e39905b 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters_test.py +++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py @@ -184,6 +184,22 @@ def test_update_batch(self): self.verify_counters(opcounts, 200, size_per_element) + def test_update_empty_batch(self): + opcounts = OperationCounters( + CounterFactory(), + 'some-name', + coders.FastPrimitivesCoder(), + 0, + producer_batch_converter=typehints.batch.BatchConverter.from_typehints( + element_type=typehints.Any, + batch_type=typehints.List[typehints.Any])) + + self.verify_counters(opcounts, 0, float('nan')) + + opcounts.update_from_batch(GlobalWindows.windowed_batch([])) + + self.verify_counters(opcounts, 0, float('nan')) + def test_should_sample(self): # Order of magnitude more buckets than highest constant in code under test. buckets = [0] * 300 From 8132e0cb8ea853e90d2d460a3a861ac0a08213cb Mon Sep 17 00:00:00 2001 From: Goutam Adwant <8672451+goutamadwant@users.noreply.github.com> Date: Fri, 29 May 2026 20:09:18 -0700 Subject: [PATCH 2/2] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- sdks/python/apache_beam/runners/worker/opcounters_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py index dca59e39905b..aef66a725f95 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters_test.py +++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py @@ -194,11 +194,11 @@ def test_update_empty_batch(self): element_type=typehints.Any, batch_type=typehints.List[typehints.Any])) - self.verify_counters(opcounts, 0, float('nan')) + self.verify_counters(opcounts, 0, math.nan) opcounts.update_from_batch(GlobalWindows.windowed_batch([])) - self.verify_counters(opcounts, 0, float('nan')) + self.verify_counters(opcounts, 0, math.nan) def test_should_sample(self): # Order of magnitude more buckets than highest constant in code under test.