Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def _default_config() -> ModelConfig:
{"gemini-free", "groq-free", "grok-free"}
)

# BUG-029 Part B: number of consecutive empty-prose responses from a local
# provider (when chain-drained) before raising AllProvidersExhaustedError.
# BUG-029/BUG-036: legacy constructor default retained for callers that still
# pass chain_drain_empty_threshold; blank provider text now fails immediately.
_CHAIN_DRAIN_EMPTY_THRESHOLD: int = 2


Expand All @@ -90,9 +90,9 @@ class ProviderRouter:
quota : QuotaTracker | None
Shared quota tracker. A default is created if not supplied.
chain_drain_empty_threshold : int
Consecutive empty-prose responses from a local provider (when all
API providers are in cooldown) before raising
AllProvidersExhaustedError. Default: 2.
Deprecated compatibility parameter. Blank provider text is treated
as provider failure immediately so graph nodes never receive empty
LLM output.
"""

def __init__(
Expand All @@ -103,6 +103,7 @@ def __init__(
) -> None:
self._providers: dict[str, BaseProvider] = providers or {}
self._quota = quota or QuotaTracker()
# Kept for constructor compatibility with BUG-029-era callers.
self._chain_drain_empty_threshold = chain_drain_empty_threshold
# {provider_name: consecutive_empty_count} — reset on non-empty response.
self._consecutive_empty: dict[str, int] = {}
Expand Down Expand Up @@ -320,7 +321,6 @@ async def call(
logger.info("Trying provider %s for role=%s", provider_name, role)
try:
resp = await provider.complete(prompt, system, cfg)
self._quota.record_success(provider_name)
except ProviderUnavailableError as exc:
self._quota.cooldown(provider_name, COOLDOWN_UNAVAILABLE)
logger.warning(
Expand Down Expand Up @@ -367,29 +367,28 @@ async def call(
))
continue

# Successful call — apply BUG-029 Part B: track consecutive empty
# responses from local providers when chain-drained.
is_local = provider_name in _LOCAL_PROVIDERS
response_empty = not (resp.text or "").strip()
if is_local and response_empty:
if response_empty:
count = self._consecutive_empty.get(provider_name, 0) + 1
self._consecutive_empty[provider_name] = count
drained = self._quota.all_api_providers_in_cooldown(
chain, local_providers=_LOCAL_PROVIDERS
self._quota.cooldown(provider_name, COOLDOWN_OTHER)
logger.warning(
"Provider %s returned empty response x%d; trying fallback",
provider_name, count,
)
if drained and count >= self._chain_drain_empty_threshold:
logger.warning(
"CHAIN_DRAINED + %s empty x%d: raising "
"AllProvidersExhaustedError to force backoff (BUG-029)",
provider_name, count,
)
raise AllProvidersExhaustedError(
f"Chain drained (all API providers in cooldown) and "
f"{provider_name!r} returned empty prose {count} consecutive "
f"time(s). Daemon should back off rather than commit empty output."
)
attempts.append(ProviderAttemptDiagnostic(
provider=provider_name,
status="failed",
skip_class="empty_response",
detail=(
f"provider returned empty response {count} consecutive "
"time(s)"
),
))
continue
else:
self._consecutive_empty.pop(provider_name, None)
self._quota.record_success(provider_name)
return resp

# All providers exhausted.
Expand Down Expand Up @@ -430,8 +429,16 @@ async def call(
pinned_writer=pin_writer if is_pinned_writer else None,
allowlist=allowlist,
)
empty_providers = [
attempt.provider for attempt in attempts
if attempt.skip_class == "empty_response"
]
empty_context = (
f" empty response from provider(s): {', '.join(empty_providers)}."
if empty_providers else ""
)
raise AllProvidersExhaustedError(
f"All providers exhausted for role={role}. "
f"All providers exhausted for role={role}.{empty_context} "
"Daemon should retry with backoff.",
attempts=attempts,
chain_state=chain_state,
Expand Down
88 changes: 39 additions & 49 deletions tests/test_provider_router_bug029.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Tests for BUG-029: thundering-herd chain-drain detection + backoff.

Part A: QuotaTracker.all_api_providers_in_cooldown + get_status exposure.
Part B: ProviderRouter raises AllProvidersExhaustedError when chain-drained
and local provider returns empty prose N consecutive times.
Part B: ProviderRouter raises AllProvidersExhaustedError or falls through when
any provider returns empty prose, so blank text never reaches graph
node state as a successful LLM response.
"""
from __future__ import annotations

Expand Down Expand Up @@ -117,79 +118,68 @@ def test_normal_local_response_returned_without_raise(self):
resp = _run(router.call("writer", "p", "s"))
assert resp.text == "real content"

def test_first_empty_local_does_not_raise(self):
"""First empty response: threshold=2, no raise yet."""
def test_first_empty_local_raises(self):
"""A blank provider response is not a successful completion."""
router, _, _ = _router_with_local(local_text="", threshold=2)
resp = _run(router.call("writer", "p", "s"))
assert resp.text == ""
with pytest.raises(AllProvidersExhaustedError, match="empty response"):
_run(router.call("writer", "p", "s"))

def test_second_empty_local_raises_when_chain_drained(self):
"""Second consecutive empty from local when all APIs in cooldown raises."""
def test_empty_local_raises_when_chain_drained(self):
"""Empty local output when all APIs are in cooldown forces backoff."""
router, _, _ = _router_with_local(local_text="", threshold=2)
_run(router.call("writer", "p", "s")) # first empty — no raise
with pytest.raises(AllProvidersExhaustedError, match="empty prose"):
with pytest.raises(AllProvidersExhaustedError, match="empty response"):
_run(router.call("writer", "p", "s"))

def test_raise_message_includes_provider_name_and_count(self):
router, _, _ = _router_with_local(local_text="", threshold=2)
_run(router.call("writer", "p", "s"))
with pytest.raises(AllProvidersExhaustedError) as exc_info:
_run(router.call("writer", "p", "s"))
msg = str(exc_info.value)
assert "ollama-local" in msg
assert "2" in msg
assert "empty response" in msg

def test_empty_counter_resets_on_non_empty_response(self):
"""After a non-empty response, the counter resets; no raise on next empty."""
quota = QuotaTracker()
call_num = 0
def test_empty_response_puts_provider_in_cooldown(self):
"""A blank response is treated as provider failure for the next call."""
router, quota, _ = _router_with_local(local_text="", threshold=2)

class _AlternatingProvider(BaseProvider):
name = "ollama-local"
family = "fake"
with pytest.raises(AllProvidersExhaustedError):
_run(router.call("writer", "p", "s"))

async def complete(self, prompt, system, config):
nonlocal call_num
call_num += 1
text = "" if call_num % 2 == 1 else "content"
return ProviderResponse(
text=text, provider="ollama-local",
model="fake", family="fake", latency_ms=0.0,
)
assert quota.cooldown_remaining("ollama-local") > 0

api_chain = [p for p in FALLBACK_CHAINS["writer"] if p != "ollama-local"]
for p in api_chain:
quota.cooldown(p, 120)
def test_threshold_1_raises_on_first_empty(self):
router, _, _ = _router_with_local(local_text="", threshold=1)
with pytest.raises(AllProvidersExhaustedError):
_run(router.call("writer", "p", "s"))

def test_empty_api_provider_falls_through_to_next_provider(self):
"""Empty API output is a provider failure, not graph node output."""
quota = QuotaTracker()
local = _FakeProvider("ollama-local", text="local content")
api = _FakeProvider("claude-code", text="")
router = ProviderRouter(
providers={"ollama-local": _AlternatingProvider()},
providers={"claude-code": api, "ollama-local": local},
quota=quota,
chain_drain_empty_threshold=2,
)
_run(router.call("writer", "p", "s")) # empty (count=1)
_run(router.call("writer", "p", "s")) # content (reset)
# Next empty is count=1 again — no raise
# claude-code is available and tried first, but an empty response must
# fall through instead of becoming an EmptyResponseError in graph state.
resp = _run(router.call("writer", "p", "s"))
assert resp.text == ""
assert resp.text == "local content"
assert api.call_count == 1
assert local.call_count == 1

def test_threshold_1_raises_on_first_empty(self):
router, _, _ = _router_with_local(local_text="", threshold=1)
with pytest.raises(AllProvidersExhaustedError):
_run(router.call("writer", "p", "s"))

def test_no_raise_when_api_provider_available(self):
"""Empty local response does NOT raise when an API provider is available."""
def test_all_empty_providers_raise_with_attempt_diagnostic(self):
quota = QuotaTracker()
local = _FakeProvider("ollama-local", text="")
api = _FakeProvider("claude-code", text="")
local = _FakeProvider("ollama-local", text="")
router = ProviderRouter(
providers={"claude-code": api, "ollama-local": local},
quota=quota,
chain_drain_empty_threshold=2,
)
# claude-code is available (not in cooldown). It should be tried first.
# Even if it returns empty, chain is NOT drained, so no raise.
resp = _run(router.call("writer", "p", "s"))
assert resp.text == ""
resp = _run(router.call("writer", "p", "s"))
assert resp.text == ""

with pytest.raises(AllProvidersExhaustedError) as exc_info:
_run(router.call("writer", "p", "s"))

assert "empty response" in str(exc_info.value)
55 changes: 31 additions & 24 deletions workflow/providers/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def _default_config() -> ModelConfig:
{"gemini-free", "groq-free", "grok-free"}
)

# BUG-029 Part B: number of consecutive empty-prose responses from a local
# provider (when chain-drained) before raising AllProvidersExhaustedError.
# BUG-029/BUG-036: legacy constructor default retained for callers that still
# pass chain_drain_empty_threshold; blank provider text now fails immediately.
_CHAIN_DRAIN_EMPTY_THRESHOLD: int = 2


Expand All @@ -90,9 +90,9 @@ class ProviderRouter:
quota : QuotaTracker | None
Shared quota tracker. A default is created if not supplied.
chain_drain_empty_threshold : int
Consecutive empty-prose responses from a local provider (when all
API providers are in cooldown) before raising
AllProvidersExhaustedError. Default: 2.
Deprecated compatibility parameter. Blank provider text is treated
as provider failure immediately so graph nodes never receive empty
LLM output.
"""

def __init__(
Expand All @@ -103,6 +103,7 @@ def __init__(
) -> None:
self._providers: dict[str, BaseProvider] = providers or {}
self._quota = quota or QuotaTracker()
# Kept for constructor compatibility with BUG-029-era callers.
self._chain_drain_empty_threshold = chain_drain_empty_threshold
# {provider_name: consecutive_empty_count} — reset on non-empty response.
self._consecutive_empty: dict[str, int] = {}
Expand Down Expand Up @@ -320,7 +321,6 @@ async def call(
logger.info("Trying provider %s for role=%s", provider_name, role)
try:
resp = await provider.complete(prompt, system, cfg)
self._quota.record_success(provider_name)
except ProviderUnavailableError as exc:
self._quota.cooldown(provider_name, COOLDOWN_UNAVAILABLE)
logger.warning(
Expand Down Expand Up @@ -367,29 +367,28 @@ async def call(
))
continue

# Successful call — apply BUG-029 Part B: track consecutive empty
# responses from local providers when chain-drained.
is_local = provider_name in _LOCAL_PROVIDERS
response_empty = not (resp.text or "").strip()
if is_local and response_empty:
if response_empty:
count = self._consecutive_empty.get(provider_name, 0) + 1
self._consecutive_empty[provider_name] = count
drained = self._quota.all_api_providers_in_cooldown(
chain, local_providers=_LOCAL_PROVIDERS
self._quota.cooldown(provider_name, COOLDOWN_OTHER)
logger.warning(
"Provider %s returned empty response x%d; trying fallback",
provider_name, count,
)
if drained and count >= self._chain_drain_empty_threshold:
logger.warning(
"CHAIN_DRAINED + %s empty x%d: raising "
"AllProvidersExhaustedError to force backoff (BUG-029)",
provider_name, count,
)
raise AllProvidersExhaustedError(
f"Chain drained (all API providers in cooldown) and "
f"{provider_name!r} returned empty prose {count} consecutive "
f"time(s). Daemon should back off rather than commit empty output."
)
attempts.append(ProviderAttemptDiagnostic(
provider=provider_name,
status="failed",
skip_class="empty_response",
detail=(
f"provider returned empty response {count} consecutive "
"time(s)"
),
))
continue
else:
self._consecutive_empty.pop(provider_name, None)
self._quota.record_success(provider_name)
return resp

# All providers exhausted.
Expand Down Expand Up @@ -430,8 +429,16 @@ async def call(
pinned_writer=pin_writer if is_pinned_writer else None,
allowlist=allowlist,
)
empty_providers = [
attempt.provider for attempt in attempts
if attempt.skip_class == "empty_response"
]
empty_context = (
f" empty response from provider(s): {', '.join(empty_providers)}."
if empty_providers else ""
)
raise AllProvidersExhaustedError(
f"All providers exhausted for role={role}. "
f"All providers exhausted for role={role}.{empty_context} "
"Daemon should retry with backoff.",
attempts=attempts,
chain_state=chain_state,
Expand Down
Loading