diff --git a/amber/src/main/python/core/architecture/managers/statistics_manager.py b/amber/src/main/python/core/architecture/managers/statistics_manager.py index 6b36b78e577..8151ca3bf1c 100644 --- a/amber/src/main/python/core/architecture/managers/statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/statistics_manager.py @@ -18,6 +18,8 @@ from collections import defaultdict from typing import DefaultDict +from loguru import logger + from proto.org.apache.texera.amber.core import PortIdentity from proto.org.apache.texera.amber.engine.architecture.worker import ( WorkerStatistics, @@ -53,9 +55,12 @@ def get_statistics(self) -> WorkerStatistics: ], self._data_processing_time, self._control_processing_time, - self._total_execution_time - - self._data_processing_time - - self._control_processing_time, + max( + 0, + self._total_execution_time + - self._data_processing_time + - self._control_processing_time, + ), ) def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None: @@ -85,7 +90,23 @@ def update_total_execution_time(self, time: int) -> None: raise ValueError( "Current time must be greater than or equal to worker start time" ) - self._total_execution_time = time - self._worker_start_time + new_total = time - self._worker_start_time + if new_total < self._total_execution_time: + logger.warning( + f"update_total_execution_time called with non-monotonic time: " + f"new total {new_total}ns < current total {self._total_execution_time}ns. " + "Clock skew or out-of-order call detected." + ) + processing_total = self._data_processing_time + self._control_processing_time + if new_total < processing_total: + logger.warning( + f"idle_time drift: total_execution_time ({new_total}ns) < " + f"data ({self._data_processing_time}ns) + control " + f"({self._control_processing_time}ns). " + "update_total_execution_time should be called after increase_*_processing_time " + "with the same end timestamp. idle_time will be clamped to 0." + ) + self._total_execution_time = new_total def initialize_worker_start_time(self, time: int) -> None: # Set the worker start time diff --git a/amber/src/main/python/core/architecture/managers/test_statistics_manager.py b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py index 26d1a3ec648..5abf7a36b45 100644 --- a/amber/src/main/python/core/architecture/managers/test_statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py @@ -135,31 +135,13 @@ def test_total_execution_time_before_start_raises(self): ): mgr.update_total_execution_time(999) - def test_idle_time_can_go_negative_when_processing_exceeds_total(self): - # Pin a real-but-questionable behavior: get_statistics computes - # idle_time = total_execution - data - control with NO clamp. If - # instrumentation overcounts (or update_total_execution_time was - # called early), idle goes negative. Filed as a Bug — see the - # accompanying issue. A future fix that floors at 0 must also - # update this test deliberately. + def test_idle_time_clamped_to_zero_when_processing_overshoots(self): + # When data+control exceed total_execution_time (e.g. update_total was + # called before all increase_* calls for that interval), idle_time is + # clamped to 0 and a warning is logged. It must never be negative. mgr = StatisticsManager() mgr.initialize_worker_start_time(1_000) mgr.update_total_execution_time(1_100) # 100ns total mgr.increase_data_processing_time(80) - mgr.increase_control_processing_time(50) # 130 > 100 total - stats = mgr.get_statistics() - assert stats.idle_time == -30 - - @pytest.mark.xfail( - strict=True, - reason="Bug: idle_time goes negative when data+control processing time " - "overshoots total_execution_time. The fix should floor at 0 (or surface " - "the inconsistency); flips to XPASS when corrected.", - ) - def test_idle_time_should_never_be_negative(self): - mgr = StatisticsManager() - mgr.initialize_worker_start_time(1_000) - mgr.update_total_execution_time(1_100) - mgr.increase_data_processing_time(80) - mgr.increase_control_processing_time(50) - assert mgr.get_statistics().idle_time >= 0 + mgr.increase_control_processing_time(50) # 130 > 100 + assert mgr.get_statistics().idle_time == 0