Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from collections import defaultdict
from typing import DefaultDict

from loguru import logger

from proto.org.apache.texera.amber.core import PortIdentity
from proto.org.apache.texera.amber.engine.architecture.worker import (
WorkerStatistics,
Expand Down Expand Up @@ -53,9 +55,12 @@ def get_statistics(self) -> WorkerStatistics:
],
self._data_processing_time,
self._control_processing_time,
self._total_execution_time
- self._data_processing_time
- self._control_processing_time,
max(
0,
self._total_execution_time
- self._data_processing_time
- self._control_processing_time,
),
)

def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None:
Expand Down Expand Up @@ -85,7 +90,23 @@ def update_total_execution_time(self, time: int) -> None:
raise ValueError(
"Current time must be greater than or equal to worker start time"
)
self._total_execution_time = time - self._worker_start_time
new_total = time - self._worker_start_time
if new_total < self._total_execution_time:
logger.warning(
f"update_total_execution_time called with non-monotonic time: "
f"new total {new_total}ns < current total {self._total_execution_time}ns. "
"Clock skew or out-of-order call detected."
)
processing_total = self._data_processing_time + self._control_processing_time
if new_total < processing_total:
logger.warning(
f"idle_time drift: total_execution_time ({new_total}ns) < "
f"data ({self._data_processing_time}ns) + control "
f"({self._control_processing_time}ns). "
"update_total_execution_time should be called after increase_*_processing_time "
"with the same end timestamp. idle_time will be clamped to 0."
)
self._total_execution_time = new_total

def initialize_worker_start_time(self, time: int) -> None:
# Set the worker start time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,31 +135,13 @@ def test_total_execution_time_before_start_raises(self):
):
mgr.update_total_execution_time(999)

def test_idle_time_can_go_negative_when_processing_exceeds_total(self):
# Pin a real-but-questionable behavior: get_statistics computes
# idle_time = total_execution - data - control with NO clamp. If
# instrumentation overcounts (or update_total_execution_time was
# called early), idle goes negative. Filed as a Bug — see the
# accompanying issue. A future fix that floors at 0 must also
# update this test deliberately.
def test_idle_time_clamped_to_zero_when_processing_overshoots(self):
# When data+control exceed total_execution_time (e.g. update_total was
# called before all increase_* calls for that interval), idle_time is
# clamped to 0 and a warning is logged. It must never be negative.
mgr = StatisticsManager()
mgr.initialize_worker_start_time(1_000)
mgr.update_total_execution_time(1_100) # 100ns total
mgr.increase_data_processing_time(80)
mgr.increase_control_processing_time(50) # 130 > 100 total
stats = mgr.get_statistics()
assert stats.idle_time == -30

@pytest.mark.xfail(
strict=True,
reason="Bug: idle_time goes negative when data+control processing time "
"overshoots total_execution_time. The fix should floor at 0 (or surface "
"the inconsistency); flips to XPASS when corrected.",
)
def test_idle_time_should_never_be_negative(self):
mgr = StatisticsManager()
mgr.initialize_worker_start_time(1_000)
mgr.update_total_execution_time(1_100)
mgr.increase_data_processing_time(80)
mgr.increase_control_processing_time(50)
assert mgr.get_statistics().idle_time >= 0
mgr.increase_control_processing_time(50) # 130 > 100
assert mgr.get_statistics().idle_time == 0
Loading