
fix(workflow): close loop-bound llm clients after calls #138

Merged

duguwanglong merged 4 commits into main from fix/workflow-httpx-loop-cleanup on Apr 17, 2026

Conversation

@xiami762
Contributor

Ensure workflow LLM requests close cached AsyncOpenAI clients before asyncio.run tears down the temporary loop. This prevents loop-closed cleanup noise without changing provider chat behavior for long-lived model usage.

Made-with: Cursor
xiami762 requested a review from duguwanglong April 17, 2026 06:11
xiami762 added 3 commits April 17, 2026 14:41
Revert commit 13aeb20. That fix wrapped LLMClient.ask()._call() with a
try/finally that closed provider._client after every call, but Provider
instances are global singletons: in concurrent workflow execution two
parallel ask() invocations against the same provider could close the
shared AsyncOpenAI/httpx client mid-flight for the other caller, causing
stream interruptions and random request failures.
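
As a rough reconstruction of the reverted shape (illustrative only; apart from provider.chat and provider._client, which the message above names, the helper signature here is hypothetical):

```python
# Reconstructed sketch of the reverted pattern from commit 13aeb20 (not the real code).
async def _call(provider, messages):
    try:
        return await provider.chat(messages)
    finally:
        # provider is a process-wide singleton, so closing its cached
        # AsyncOpenAI/httpx client here also tears down the connection pool
        # under any other ask() currently streaming through the same Provider,
        # which is exactly the race described above.
        await provider._client.close()
```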

This commit restores the prior behavior verbatim (no helper on
OpenAIBaseProvider, no finally block, no close_calls assertions in tests)
so the follow-up commit can fix the original "Event loop is closed"
noise in a way that does not mutate shared provider state.

Made-with: Cursor
Workflow code is synchronous at the edges but the provider SDKs it drives
(openai + httpx + anyio) are async and hold loop-bound resources such as
httpx.AsyncClient connection pools. The previous _run_coro_sync spun up an
ephemeral asyncio.run loop per call; when an upstream 504 or a TTFT
(time-to-first-token) timeout cut the stream short, a late GC-triggered
aclose() on the dangling OpenAI stream hit the
already-closed loop and dumped a noisy but harmless "RuntimeError: Event
loop is closed" traceback to stderr.
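
In sketch form, the previous helper behaved like this (a simplified illustration, not the exact code):

```python
import asyncio

def _run_coro_sync(coro):
    # One throwaway event loop per call. asyncio.run() closes the loop on exit,
    # so a stream interrupted mid-flight can leave loop-bound resources behind;
    # their aclose(), triggered later by GC, then finds the loop already closed
    # and logs "RuntimeError: Event loop is closed" to stderr.
    return asyncio.run(coro)
```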

Fix the problem at its structural root by introducing a single persistent
asyncio event loop running on a dedicated daemon thread
(flocks/workflow/_async_runtime.py). All workflow-triggered coroutines
(Config.get, resolve_default_llm, Provider.apply_config, provider.chat)
are submitted to this shared loop via run_coroutine_threadsafe, so their
loop-bound resources always have a live loop available for cleanup.
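
A minimal sketch of such a shared-loop runtime (the real module is
flocks/workflow/_async_runtime.py; only the run_sync name and the
self-deadlock guard come from this PR, the rest is illustrative):

```python
import asyncio
import threading

_loop: asyncio.AbstractEventLoop | None = None
_lock = threading.Lock()

def _get_loop() -> asyncio.AbstractEventLoop:
    """Start the persistent loop on a daemon thread on first use."""
    global _loop
    with _lock:
        if _loop is None or _loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(target=loop.run_forever, daemon=True,
                             name="workflow-async-runtime").start()
            _loop = loop
        return _loop

def run_sync(coro, timeout: float | None = None):
    loop = _get_loop()
    # Self-deadlock guard: blocking on the result from inside the loop thread
    # would wait forever on work that can never be scheduled.
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None
    if running is loop:
        raise RuntimeError("run_sync() called from inside the workflow loop")
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout)
```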

Why this replaces the earlier attempt (commit 13aeb20, reverted in the
preceding commit):

  * No shared-state mutation on Provider singletons. The earlier attempt
    closed provider._client in a finally block, which raced concurrent
    workflow calls that shared the same Provider instance.
  * Identical external semantics for LLMClient.ask(): still synchronous,
    still blocks on the result, still raises ValueError / TimeoutError.
  * Scope is limited to flocks/workflow/llm.py::_run_coro_sync. Other
    _run_coro_sync implementations in runner/engine/tools_adapter are
    untouched - they only drive short-lived coroutines without loop-bound
    resources and do not need this treatment.

Tests:

  * New tests/workflow/test_async_runtime.py covers loop persistence,
    concurrent submissions (64 tasks from 8 threads), and the
    self-deadlock guard for calls originating from inside the loop.
  * Existing tests/workflow/test_workflow_llm.py (9 cases) and
    tests/provider/test_openai_base_provider.py (25 cases) all pass.
  * The 13 pre-existing failures under tests/provider/ and
    tests/workflow/test_workflow_center.py reproduce at HEAD~1, so they
    are environmental / unrelated to this change.
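
As a hypothetical illustration of the concurrent-submission case from the
first bullet, reusing the run_sync sketch above (the real assertions live in
tests/workflow/test_async_runtime.py):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def test_concurrent_submissions_share_one_loop():
    seen_loops = set()

    async def probe(i: int) -> int:
        # Every coroutine should observe the same persistent loop.
        seen_loops.add(id(asyncio.get_running_loop()))
        await asyncio.sleep(0)
        return i

    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(lambda i: run_sync(probe(i)), range(64)))

    assert sorted(results) == list(range(64))
    assert len(seen_loops) == 1
```
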
Made-with: Cursor
The previous commit routed LLMClient sync calls through
asyncio.run_coroutine_threadsafe(...).result(), but that silently changed
cancellation semantics: asyncio.run(coro) used to surface an
asyncio.CancelledError (which inherits from BaseException in Python 3.12, so it
flows past `except Exception`), while concurrent.futures.Future.result()
raises concurrent.futures.CancelledError, which still inherits from
Exception on CPython 3.12.
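
A quick way to check the difference described above (expected values per the
commit's claim for CPython 3.12):

```python
import asyncio
import concurrent.futures

# Per the paragraph above: expected False (BaseException-derived) for asyncio,
# expected True (Exception-derived) for concurrent.futures.
print(issubclass(asyncio.CancelledError, Exception))
print(issubclass(concurrent.futures.CancelledError, Exception))
```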

As a result, LLMClient.ask()'s retry loop (`except Exception as exc:
last_exc = exc`) would catch a real cancellation, retry it up to
max_retries times, and finally rewrap it as
`ValueError("Workflow 默认模型不可用...")`. That broke clean
cancellation propagation for FastAPI request teardown and any future
asyncio.timeout / outer-cancel usage.
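
A hypothetical mock-up of that retry shape (not the real LLMClient.ask body),
showing how an Exception-derived cancellation gets retried and rewrapped:

```python
def retry_like_ask(call, max_retries: int = 3):
    # Mirrors the `except Exception as exc: last_exc = exc` loop described above.
    last_exc = None
    for _ in range(max_retries):
        try:
            return call()
        except Exception as exc:   # catches concurrent.futures.CancelledError,
            last_exc = exc         # but an asyncio.CancelledError would fly past it
    raise ValueError("Workflow 默认模型不可用...") from last_exc
```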

Map concurrent.futures.CancelledError back to asyncio.CancelledError at
the run_sync boundary so callers observe the same behavior as before the
shared-loop refactor. Two regression tests added:

  * run_sync must raise asyncio.CancelledError (not
    concurrent.futures.CancelledError) when the coroutine is cancelled.
  * A try/except Exception wrapper mirroring LLMClient.ask's retry loop
    must NOT swallow the cancellation.
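
In sketch form, the boundary mapping looks roughly like this (illustrative,
building on the run_sync sketch from the previous commit message; _get_loop is
the same hypothetical helper):

```python
import asyncio
import concurrent.futures

def run_sync(coro, timeout: float | None = None):
    future = asyncio.run_coroutine_threadsafe(coro, _get_loop())
    try:
        return future.result(timeout)
    except concurrent.futures.CancelledError as exc:
        # Re-raise as asyncio.CancelledError (a BaseException) so callers keep
        # the pre-refactor semantics and `except Exception` retry loops do not
        # swallow the cancellation.
        raise asyncio.CancelledError() from exc
```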

Made-with: Cursor
duguwanglong merged commit 3eb27ca into main Apr 17, 2026
2 checks passed