Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,15 @@ def release(self, lease: RequestAdmissionLease, outcome: RequestReleaseOutcome)
self._sequence += 1
result = ReleaseResult(released=True, reason="released")
if outcome.kind == "rate_limited":
events.append(self._request_event_locked("request_rate_limited", item=lease.item, lease=lease))
events.append(self._request_event_locked("request_rate_limited", item=active.item, lease=active))
events.append(
self._request_event_locked("request_lease_released", item=lease.item, lease=lease, result=result)
self._request_event_locked(
"request_lease_released",
item=active.item,
lease=active,
result=result,
outcome=outcome,
)
)
self._admit_waiters_locked(events)
self._condition.notify_all()
Expand Down Expand Up @@ -579,10 +585,8 @@ def _apply_outcome(
state.current_limit = max(
1, math.floor(state.current_limit * self._config.multiplicative_decrease_factor)
)
observed_limit = max(1, admitted_adaptive_limit)
state.rate_limit_ceiling = (
observed_limit if state.rate_limit_ceiling == 0 else min(state.rate_limit_ceiling, observed_limit)
)
if state.rate_limit_ceiling == 0:
state.rate_limit_ceiling = max(1, admitted_adaptive_limit)
if state.current_limit != prev_limit:
events.append(
self._request_event_locked(
Expand Down Expand Up @@ -696,6 +700,7 @@ def _request_event_locked(
lease: RequestAdmissionLease | None = None,
decision: RequestAdmissionDenied | None = None,
result: ReleaseResult | None = None,
outcome: RequestReleaseOutcome | None = None,
request_resource_key: RequestResourceKey | None = None,
diagnostics: Mapping[str, object] | None = None,
) -> RequestAdmissionEvent:
Expand All @@ -706,6 +711,8 @@ def _request_event_locked(
reason_or_outcome = None
if decision is not None:
reason_or_outcome = decision.reason
elif outcome is not None:
reason_or_outcome = outcome.kind
elif result is not None:
reason_or_outcome = result.reason
return RequestAdmissionEvent.capture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import logging
import threading
import time
from dataclasses import replace

import pytest

Expand All @@ -26,6 +25,7 @@
RequestGroupSpec,
RequestResourceKey,
)
from data_designer.engine.observability import InMemoryAdmissionEventSink


def _item(domain: RequestDomain = RequestDomain.CHAT, timeout: float | None = None) -> RequestAdmissionItem:
Expand Down Expand Up @@ -94,7 +94,14 @@ def test_request_admission_stale_release_requires_exact_lease() -> None:
item = _item()
lease = controller.try_acquire(item)
assert isinstance(lease, RequestAdmissionLease)
stale = replace(lease, current_adaptive_limit=lease.current_adaptive_limit + 1)
stale = RequestAdmissionLease(
lease_id=lease.lease_id,
item=lease.item,
acquired_at=lease.acquired_at,
current_adaptive_limit=lease.current_adaptive_limit + 1,
effective_max=lease.effective_max,
controller_generation=lease.controller_generation,
)

stale_result = controller.release(stale, RequestReleaseOutcome(kind="provider_failure"))
snapshot = controller.pressure.snapshot(item.resource)
Expand Down Expand Up @@ -184,7 +191,7 @@ def test_request_admission_fresh_rate_limit_after_burst_decreases_again() -> Non

assert snapshot is not None
assert snapshot.current_limit == 2
assert snapshot.rate_limit_ceiling == 4
assert snapshot.rate_limit_ceiling == 8
assert snapshot.consecutive_rate_limits == 9


Expand Down Expand Up @@ -255,6 +262,21 @@ def test_request_admission_logs_sink_failures(caplog: pytest.LogCaptureFixture)
assert "Request admission event sink raised; dropping event." in caplog.text


def test_request_lease_released_event_records_release_outcome() -> None:
    """The ``request_lease_released`` event carries the release outcome kind.

    Registers a single-slot resource, acquires a lease, releases it with a
    ``provider_failure`` outcome, and checks that the most recent
    ``request_lease_released`` event captured by the in-memory sink reports
    ``provider_failure`` as its ``reason_or_outcome``.
    """
    event_sink = InMemoryAdmissionEventSink()
    controller = AdaptiveRequestAdmissionController(event_sink=event_sink)
    controller.register(
        provider_name="nvidia",
        model_id="nemotron",
        alias="default",
        max_parallel_requests=1,
    )
    request_item = _item()
    acquired = controller.try_acquire(request_item)
    assert isinstance(acquired, RequestAdmissionLease)

    failure_outcome = RequestReleaseOutcome(kind="provider_failure")
    controller.release(acquired, failure_outcome)

    # Keep only the release events; the last one belongs to the release above.
    released = [
        captured
        for captured in event_sink.request_events
        if captured.event_kind == "request_lease_released"
    ]
    assert released
    latest_release = released[-1]
    assert latest_release.reason_or_outcome == "provider_failure"


@pytest.mark.asyncio(loop_scope="session")
async def test_acquire_sync_rejects_running_event_loop() -> None:
controller = _controller(cap=1)
Expand Down
Loading