diff --git a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py index 56071ed9b..8494d7d69 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py @@ -388,9 +388,15 @@ def release(self, lease: RequestAdmissionLease, outcome: RequestReleaseOutcome) self._sequence += 1 result = ReleaseResult(released=True, reason="released") if outcome.kind == "rate_limited": - events.append(self._request_event_locked("request_rate_limited", item=lease.item, lease=lease)) + events.append(self._request_event_locked("request_rate_limited", item=active.item, lease=active)) events.append( - self._request_event_locked("request_lease_released", item=lease.item, lease=lease, result=result) + self._request_event_locked( + "request_lease_released", + item=active.item, + lease=active, + result=result, + outcome=outcome, + ) ) self._admit_waiters_locked(events) self._condition.notify_all() @@ -579,10 +585,8 @@ def _apply_outcome( state.current_limit = max( 1, math.floor(state.current_limit * self._config.multiplicative_decrease_factor) ) - observed_limit = max(1, admitted_adaptive_limit) - state.rate_limit_ceiling = ( - observed_limit if state.rate_limit_ceiling == 0 else min(state.rate_limit_ceiling, observed_limit) - ) + if state.rate_limit_ceiling == 0: + state.rate_limit_ceiling = max(1, admitted_adaptive_limit) if state.current_limit != prev_limit: events.append( self._request_event_locked( @@ -696,6 +700,7 @@ def _request_event_locked( lease: RequestAdmissionLease | None = None, decision: RequestAdmissionDenied | None = None, result: ReleaseResult | None = None, + outcome: RequestReleaseOutcome | None = None, request_resource_key: RequestResourceKey | None = None, diagnostics: Mapping[str, object] | None = None, ) -> RequestAdmissionEvent: @@ -706,6 +711,8 @@ def _request_event_locked( reason_or_outcome = None if decision is not None: reason_or_outcome = decision.reason + elif outcome is not None: + reason_or_outcome = outcome.kind elif result is not None: reason_or_outcome = result.reason return RequestAdmissionEvent.capture( diff --git a/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py b/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py index 7de678c3e..c0c9a5801 100644 --- a/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py +++ b/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py @@ -7,7 +7,6 @@ import logging import threading import time -from dataclasses import replace import pytest @@ -26,6 +25,7 @@ RequestGroupSpec, RequestResourceKey, ) +from data_designer.engine.observability import InMemoryAdmissionEventSink def _item(domain: RequestDomain = RequestDomain.CHAT, timeout: float | None = None) -> RequestAdmissionItem: @@ -94,7 +94,14 @@ def test_request_admission_stale_release_requires_exact_lease() -> None: item = _item() lease = controller.try_acquire(item) assert isinstance(lease, RequestAdmissionLease) - stale = replace(lease, current_adaptive_limit=lease.current_adaptive_limit + 1) + stale = RequestAdmissionLease( + lease_id=lease.lease_id, + item=lease.item, + acquired_at=lease.acquired_at, + current_adaptive_limit=lease.current_adaptive_limit + 1, + effective_max=lease.effective_max, + controller_generation=lease.controller_generation, + ) stale_result = controller.release(stale, RequestReleaseOutcome(kind="provider_failure")) snapshot = controller.pressure.snapshot(item.resource) @@ -184,7 +191,7 @@ def test_request_admission_fresh_rate_limit_after_burst_decreases_again() -> Non assert snapshot is not None assert snapshot.current_limit == 2 - assert snapshot.rate_limit_ceiling == 4 + assert snapshot.rate_limit_ceiling == 8 assert snapshot.consecutive_rate_limits == 9 @@ -255,6 +262,21 @@ def test_request_admission_logs_sink_failures(caplog: pytest.LogCaptureFixture) assert "Request admission event sink raised; dropping event." in caplog.text +def test_request_lease_released_event_records_release_outcome() -> None: + sink = InMemoryAdmissionEventSink() + controller = AdaptiveRequestAdmissionController(event_sink=sink) + controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) + item = _item() + lease = controller.try_acquire(item) + assert isinstance(lease, RequestAdmissionLease) + + controller.release(lease, RequestReleaseOutcome(kind="provider_failure")) + + release_events = [event for event in sink.request_events if event.event_kind == "request_lease_released"] + assert release_events + assert release_events[-1].reason_or_outcome == "provider_failure" + + @pytest.mark.asyncio(loop_scope="session") async def test_acquire_sync_rejects_running_event_loop() -> None: controller = _controller(cap=1)