diff --git a/packaging/claude-plugin/plugins/workflow-universe-server/runtime/workflow/providers/router.py b/packaging/claude-plugin/plugins/workflow-universe-server/runtime/workflow/providers/router.py index 21acac17..df096d20 100644 --- a/packaging/claude-plugin/plugins/workflow-universe-server/runtime/workflow/providers/router.py +++ b/packaging/claude-plugin/plugins/workflow-universe-server/runtime/workflow/providers/router.py @@ -74,8 +74,8 @@ def _default_config() -> ModelConfig: {"gemini-free", "groq-free", "grok-free"} ) -# BUG-029 Part B: number of consecutive empty-prose responses from a local -# provider (when chain-drained) before raising AllProvidersExhaustedError. +# BUG-029/BUG-036: legacy constructor default retained for callers that still +# pass chain_drain_empty_threshold; blank provider text now fails immediately. _CHAIN_DRAIN_EMPTY_THRESHOLD: int = 2 @@ -90,9 +90,9 @@ class ProviderRouter: quota : QuotaTracker | None Shared quota tracker. A default is created if not supplied. chain_drain_empty_threshold : int - Consecutive empty-prose responses from a local provider (when all - API providers are in cooldown) before raising - AllProvidersExhaustedError. Default: 2. + Deprecated compatibility parameter. Blank provider text is treated + as provider failure immediately so graph nodes never receive empty + LLM output. """ def __init__( @@ -103,6 +103,7 @@ def __init__( ) -> None: self._providers: dict[str, BaseProvider] = providers or {} self._quota = quota or QuotaTracker() + # Kept for constructor compatibility with BUG-029-era callers. self._chain_drain_empty_threshold = chain_drain_empty_threshold # {provider_name: consecutive_empty_count} — reset on non-empty response. self._consecutive_empty: dict[str, int] = {} @@ -320,7 +321,6 @@ async def call( logger.info("Trying provider %s for role=%s", provider_name, role) try: resp = await provider.complete(prompt, system, cfg) - self._quota.record_success(provider_name) except ProviderUnavailableError as exc: self._quota.cooldown(provider_name, COOLDOWN_UNAVAILABLE) logger.warning( @@ -367,29 +367,28 @@ async def call( )) continue - # Successful call — apply BUG-029 Part B: track consecutive empty - # responses from local providers when chain-drained. - is_local = provider_name in _LOCAL_PROVIDERS response_empty = not (resp.text or "").strip() - if is_local and response_empty: + if response_empty: count = self._consecutive_empty.get(provider_name, 0) + 1 self._consecutive_empty[provider_name] = count - drained = self._quota.all_api_providers_in_cooldown( - chain, local_providers=_LOCAL_PROVIDERS + self._quota.cooldown(provider_name, COOLDOWN_OTHER) + logger.warning( + "Provider %s returned empty response x%d; trying fallback", + provider_name, count, ) - if drained and count >= self._chain_drain_empty_threshold: - logger.warning( - "CHAIN_DRAINED + %s empty x%d: raising " - "AllProvidersExhaustedError to force backoff (BUG-029)", - provider_name, count, - ) - raise AllProvidersExhaustedError( - f"Chain drained (all API providers in cooldown) and " - f"{provider_name!r} returned empty prose {count} consecutive " - f"time(s). Daemon should back off rather than commit empty output." - ) + attempts.append(ProviderAttemptDiagnostic( + provider=provider_name, + status="failed", + skip_class="empty_response", + detail=( + f"provider returned empty response {count} consecutive " + "time(s)" + ), + )) + continue else: self._consecutive_empty.pop(provider_name, None) + self._quota.record_success(provider_name) return resp # All providers exhausted. @@ -430,8 +429,16 @@ async def call( pinned_writer=pin_writer if is_pinned_writer else None, allowlist=allowlist, ) + empty_providers = [ + attempt.provider for attempt in attempts + if attempt.skip_class == "empty_response" + ] + empty_context = ( + f" empty response from provider(s): {', '.join(empty_providers)}." + if empty_providers else "" + ) raise AllProvidersExhaustedError( - f"All providers exhausted for role={role}. " + f"All providers exhausted for role={role}.{empty_context} " "Daemon should retry with backoff.", attempts=attempts, chain_state=chain_state, diff --git a/tests/test_provider_router_bug029.py b/tests/test_provider_router_bug029.py index 6a671d27..822be173 100644 --- a/tests/test_provider_router_bug029.py +++ b/tests/test_provider_router_bug029.py @@ -1,8 +1,9 @@ """Tests for BUG-029: thundering-herd chain-drain detection + backoff. Part A: QuotaTracker.all_api_providers_in_cooldown + get_status exposure. -Part B: ProviderRouter raises AllProvidersExhaustedError when chain-drained - and local provider returns empty prose N consecutive times. +Part B: ProviderRouter raises AllProvidersExhaustedError or falls through when + any provider returns empty prose, so blank text never reaches graph + node state as a successful LLM response. """ from __future__ import annotations @@ -117,79 +118,68 @@ def test_normal_local_response_returned_without_raise(self): resp = _run(router.call("writer", "p", "s")) assert resp.text == "real content" - def test_first_empty_local_does_not_raise(self): - """First empty response: threshold=2, no raise yet.""" + def test_first_empty_local_raises(self): + """A blank provider response is not a successful completion.""" router, _, _ = _router_with_local(local_text="", threshold=2) - resp = _run(router.call("writer", "p", "s")) - assert resp.text == "" + with pytest.raises(AllProvidersExhaustedError, match="empty response"): + _run(router.call("writer", "p", "s")) - def test_second_empty_local_raises_when_chain_drained(self): - """Second consecutive empty from local when all APIs in cooldown raises.""" + def test_empty_local_raises_when_chain_drained(self): + """Empty local output when all APIs are in cooldown forces backoff.""" router, _, _ = _router_with_local(local_text="", threshold=2) - _run(router.call("writer", "p", "s")) # first empty — no raise - with pytest.raises(AllProvidersExhaustedError, match="empty prose"): + with pytest.raises(AllProvidersExhaustedError, match="empty response"): _run(router.call("writer", "p", "s")) def test_raise_message_includes_provider_name_and_count(self): router, _, _ = _router_with_local(local_text="", threshold=2) - _run(router.call("writer", "p", "s")) with pytest.raises(AllProvidersExhaustedError) as exc_info: _run(router.call("writer", "p", "s")) msg = str(exc_info.value) assert "ollama-local" in msg - assert "2" in msg + assert "empty response" in msg - def test_empty_counter_resets_on_non_empty_response(self): - """After a non-empty response, the counter resets; no raise on next empty.""" - quota = QuotaTracker() - call_num = 0 + def test_empty_response_puts_provider_in_cooldown(self): + """A blank response is treated as provider failure for the next call.""" + router, quota, _ = _router_with_local(local_text="", threshold=2) - class _AlternatingProvider(BaseProvider): - name = "ollama-local" - family = "fake" + with pytest.raises(AllProvidersExhaustedError): + _run(router.call("writer", "p", "s")) - async def complete(self, prompt, system, config): - nonlocal call_num - call_num += 1 - text = "" if call_num % 2 == 1 else "content" - return ProviderResponse( - text=text, provider="ollama-local", - model="fake", family="fake", latency_ms=0.0, - ) + assert quota.cooldown_remaining("ollama-local") > 0 - api_chain = [p for p in FALLBACK_CHAINS["writer"] if p != "ollama-local"] - for p in api_chain: - quota.cooldown(p, 120) + def test_threshold_1_raises_on_first_empty(self): + router, _, _ = _router_with_local(local_text="", threshold=1) + with pytest.raises(AllProvidersExhaustedError): + _run(router.call("writer", "p", "s")) + def test_empty_api_provider_falls_through_to_next_provider(self): + """Empty API output is a provider failure, not graph node output.""" + quota = QuotaTracker() + local = _FakeProvider("ollama-local", text="local content") + api = _FakeProvider("claude-code", text="") router = ProviderRouter( - providers={"ollama-local": _AlternatingProvider()}, + providers={"claude-code": api, "ollama-local": local}, quota=quota, chain_drain_empty_threshold=2, ) - _run(router.call("writer", "p", "s")) # empty (count=1) - _run(router.call("writer", "p", "s")) # content (reset) - # Next empty is count=1 again — no raise + # claude-code is available and tried first, but an empty response must + # fall through instead of becoming an EmptyResponseError in graph state. resp = _run(router.call("writer", "p", "s")) - assert resp.text == "" + assert resp.text == "local content" + assert api.call_count == 1 + assert local.call_count == 1 - def test_threshold_1_raises_on_first_empty(self): - router, _, _ = _router_with_local(local_text="", threshold=1) - with pytest.raises(AllProvidersExhaustedError): - _run(router.call("writer", "p", "s")) - - def test_no_raise_when_api_provider_available(self): - """Empty local response does NOT raise when an API provider is available.""" + def test_all_empty_providers_raise_with_attempt_diagnostic(self): quota = QuotaTracker() - local = _FakeProvider("ollama-local", text="") api = _FakeProvider("claude-code", text="") + local = _FakeProvider("ollama-local", text="") router = ProviderRouter( providers={"claude-code": api, "ollama-local": local}, quota=quota, chain_drain_empty_threshold=2, ) - # claude-code is available (not in cooldown). It should be tried first. - # Even if it returns empty, chain is NOT drained, so no raise. - resp = _run(router.call("writer", "p", "s")) - assert resp.text == "" - resp = _run(router.call("writer", "p", "s")) - assert resp.text == "" + + with pytest.raises(AllProvidersExhaustedError) as exc_info: + _run(router.call("writer", "p", "s")) + + assert "empty response" in str(exc_info.value) diff --git a/workflow/providers/router.py b/workflow/providers/router.py index 21acac17..df096d20 100644 --- a/workflow/providers/router.py +++ b/workflow/providers/router.py @@ -74,8 +74,8 @@ def _default_config() -> ModelConfig: {"gemini-free", "groq-free", "grok-free"} ) -# BUG-029 Part B: number of consecutive empty-prose responses from a local -# provider (when chain-drained) before raising AllProvidersExhaustedError. +# BUG-029/BUG-036: legacy constructor default retained for callers that still +# pass chain_drain_empty_threshold; blank provider text now fails immediately. _CHAIN_DRAIN_EMPTY_THRESHOLD: int = 2 @@ -90,9 +90,9 @@ class ProviderRouter: quota : QuotaTracker | None Shared quota tracker. A default is created if not supplied. chain_drain_empty_threshold : int - Consecutive empty-prose responses from a local provider (when all - API providers are in cooldown) before raising - AllProvidersExhaustedError. Default: 2. + Deprecated compatibility parameter. Blank provider text is treated + as provider failure immediately so graph nodes never receive empty + LLM output. """ def __init__( @@ -103,6 +103,7 @@ def __init__( ) -> None: self._providers: dict[str, BaseProvider] = providers or {} self._quota = quota or QuotaTracker() + # Kept for constructor compatibility with BUG-029-era callers. self._chain_drain_empty_threshold = chain_drain_empty_threshold # {provider_name: consecutive_empty_count} — reset on non-empty response. self._consecutive_empty: dict[str, int] = {} @@ -320,7 +321,6 @@ async def call( logger.info("Trying provider %s for role=%s", provider_name, role) try: resp = await provider.complete(prompt, system, cfg) - self._quota.record_success(provider_name) except ProviderUnavailableError as exc: self._quota.cooldown(provider_name, COOLDOWN_UNAVAILABLE) logger.warning( @@ -367,29 +367,28 @@ async def call( )) continue - # Successful call — apply BUG-029 Part B: track consecutive empty - # responses from local providers when chain-drained. - is_local = provider_name in _LOCAL_PROVIDERS response_empty = not (resp.text or "").strip() - if is_local and response_empty: + if response_empty: count = self._consecutive_empty.get(provider_name, 0) + 1 self._consecutive_empty[provider_name] = count - drained = self._quota.all_api_providers_in_cooldown( - chain, local_providers=_LOCAL_PROVIDERS + self._quota.cooldown(provider_name, COOLDOWN_OTHER) + logger.warning( + "Provider %s returned empty response x%d; trying fallback", + provider_name, count, ) - if drained and count >= self._chain_drain_empty_threshold: - logger.warning( - "CHAIN_DRAINED + %s empty x%d: raising " - "AllProvidersExhaustedError to force backoff (BUG-029)", - provider_name, count, - ) - raise AllProvidersExhaustedError( - f"Chain drained (all API providers in cooldown) and " - f"{provider_name!r} returned empty prose {count} consecutive " - f"time(s). Daemon should back off rather than commit empty output." - ) + attempts.append(ProviderAttemptDiagnostic( + provider=provider_name, + status="failed", + skip_class="empty_response", + detail=( + f"provider returned empty response {count} consecutive " + "time(s)" + ), + )) + continue else: self._consecutive_empty.pop(provider_name, None) + self._quota.record_success(provider_name) return resp # All providers exhausted. @@ -430,8 +429,16 @@ async def call( pinned_writer=pin_writer if is_pinned_writer else None, allowlist=allowlist, ) + empty_providers = [ + attempt.provider for attempt in attempts + if attempt.skip_class == "empty_response" + ] + empty_context = ( + f" empty response from provider(s): {', '.join(empty_providers)}." + if empty_providers else "" + ) raise AllProvidersExhaustedError( - f"All providers exhausted for role={role}. " + f"All providers exhausted for role={role}.{empty_context} " "Daemon should retry with backoff.", attempts=attempts, chain_state=chain_state,