Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions echo/server/dembrane/api/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,9 @@ async def assemblyai_webhook_callback(

if normalized_status == "error":
from dembrane.tasks import _on_chunk_transcription_done
from dembrane.transcribe import _save_chunk_error, fetch_assemblyai_result
from dembrane.transcribe import _save_chunk_error

error_detail = f"AssemblyAI error for transcript {payload.transcript_id}"
try:
fetch_assemblyai_result(payload.transcript_id)
except Exception as fetch_exc:
error_detail = str(fetch_exc)

_save_chunk_error(chunk_id, error_detail)
_save_chunk_error(chunk_id, f"AssemblyAI error for transcript {payload.transcript_id}")
_on_chunk_transcription_done(conversation_id, chunk_id, logger)
delete_assemblyai_webhook_metadata(payload.transcript_id)
return {"status": "error_handled"}
Expand Down
30 changes: 22 additions & 8 deletions echo/server/dembrane/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,31 @@ def _get_thread_event_loop() -> asyncio.AbstractEventLoop:

def run_async_in_new_loop(coro: Coroutine[Any, Any, T]) -> T:
    """
    Execute an async coroutine in a fresh, isolated event loop.

    Use from synchronous contexts such as Dramatiq actors or CLI scripts to
    invoke async FastAPI handlers.

    A fresh loop is created per call rather than reusing a cached thread loop.
    This prevents "Future attached to a different loop" errors when multiple
    concurrent Dramatiq greenlets (dramatiq-gevent uses one OS thread with many
    greenlets) share the same thread ID and would otherwise share the same loop.
    The coroutines invoked here (summarize_conversation, get_conversation_content)
    use only stateless async operations so fresh loops per call is safe.

    Args:
        coro: The coroutine (or Future) to drive to completion.

    Returns:
        The value produced by ``coro``.

    Raises:
        TypeError: If ``coro`` is neither a coroutine nor a Future.
    """
    if not asyncio.iscoroutine(coro) and not asyncio.isfuture(coro):
        raise TypeError("run_async_in_new_loop expects a coroutine or Future.")

    # Imported lazily so the dependency is only needed when this helper runs.
    import nest_asyncio

    loop = asyncio.new_event_loop()
    # Apply nest_asyncio in case dramatiq-gevent has patched asyncio's running
    # loop detection on this thread.
    nest_asyncio.apply(loop)
    logger.debug("Running async coroutine in fresh event loop: %s", coro)
    try:
        result = loop.run_until_complete(coro)
        logger.debug("Completed async coroutine: %s", coro)
        return result
    finally:
        # The loop is discarded after this call, so finalize any async
        # generators it still tracks before closing; close() must run even if
        # that finalization fails.
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
2 changes: 1 addition & 1 deletion echo/server/dembrane/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def transcribe_audio_assemblyai(

data: dict[str, Any] = {
"audio_url": audio_file_uri,
"speech_models": ["universal-3-pro", "universal-2"],
"speech_models": ["universal-3-pro"],
"language_detection": True,
"language_detection_options": {
"expected_languages": list(set(get_allowed_languages()) | {"pt"}),
Expand Down
123 changes: 123 additions & 0 deletions echo/server/tests/test_async_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Tests for run_async_in_new_loop — specifically the concurrent-greenlet scenario
that caused "Future attached to a different loop" errors under load.

Regression test for: multiple concurrent callers sharing the same OS thread
(as dramatiq-gevent greenlets do) must not share an event loop.
"""

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import pytest

from dembrane.async_helpers import run_async_in_new_loop


async def _simple_coro(value: int) -> int:
    """Return ``value * 2`` after a round-trip through the loop's default executor.

    Stands in for a ``run_in_thread_pool``-style helper: the (trivial) blocking
    work is submitted to the running loop's default executor and awaited.
    """
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor.
    return await loop.run_in_executor(None, lambda: value * 2)


async def _gather_coro(value: int) -> int:
    """Return ``(value + 1) + (value + 2)`` using two gathered executor jobs.

    Exercises ``asyncio.gather`` over futures bound to the running loop, which
    the original author notes is the pattern summarize_conversation relies on.
    """
    loop = asyncio.get_running_loop()
    plus_one, plus_two = await asyncio.gather(
        loop.run_in_executor(None, lambda: value + 1),
        loop.run_in_executor(None, lambda: value + 2),
    )
    return plus_one + plus_two


def test_run_async_in_new_loop_basic() -> None:
    """A single call drives the coroutine to completion and returns its value."""
    assert run_async_in_new_loop(_simple_coro(5)) == 10


def test_run_async_in_new_loop_with_gather() -> None:
    """A coroutine that uses ``asyncio.gather`` internally completes correctly."""
    # (3 + 1) + (3 + 2) == 9
    assert run_async_in_new_loop(_gather_coro(3)) == 9


def test_run_async_in_new_loop_concurrent_threads() -> None:
    """
    N worker threads call run_async_in_new_loop at the same time.

    Regression guard for the "Future attached to a different loop" errors seen
    under concurrent load when callers shared a cached loop. Every call must
    succeed and return its own result.
    """
    # Ten concurrent callers; the original note says this matches or exceeds
    # the Stage 3 load-test concurrency.
    caller_count = 10
    errors: list[str] = []
    results: list[int] = []

    def worker(value: int) -> None:
        # Collect failures instead of raising so every caller's outcome is
        # reported together; repr() keeps message-less exceptions readable.
        try:
            results.append(run_async_in_new_loop(_gather_coro(value)))
        except Exception as exc:
            errors.append(repr(exc))

    with ThreadPoolExecutor(max_workers=caller_count) as pool:
        futures = [pool.submit(worker, i) for i in range(caller_count)]
        for future in as_completed(futures):
            future.result()  # re-raises if the thread itself crashed

    assert errors == [], f"Concurrent run_async_in_new_loop raised errors: {errors}"
    # _gather_coro(i) == (i + 1) + (i + 2); comparing values (not just the
    # count) also catches results leaking between concurrent calls.
    assert sorted(results) == [2 * i + 3 for i in range(caller_count)]


def test_run_async_in_new_loop_same_thread_sequential() -> None:
    """
    Back-to-back calls from one thread all succeed.

    Each call must work even though the previous call's loop has been closed
    (i.e. no "loop is closed" error leaks into the next call).
    """
    for value in range(5):
        assert run_async_in_new_loop(_simple_coro(value)) == value * 2


def test_run_async_in_new_loop_same_thread_id_concurrent() -> None:
    """
    Concurrent callers that all report the same thread ID stay isolated.

    ``threading.get_ident`` is patched to a constant so every worker thread
    appears to share one ID, approximating many greenlets on a single OS
    thread. With a persistent loop keyed by thread ID the callers would share
    one loop ("Future attached to a different loop"); with a fresh loop per
    call each one is independent.
    """
    from unittest import mock

    caller_count = 5
    errors: list[str] = []
    results: list[int] = []

    def worker(value: int) -> None:
        try:
            results.append(run_async_in_new_loop(_gather_coro(value)))
        except Exception as exc:
            errors.append(repr(exc))

    # patch.object restores the real get_ident on exit, even on failure.
    # NOTE(review): this only affects code that looks up threading.get_ident
    # at call time, not code that imported the function object directly.
    with mock.patch.object(threading, "get_ident", new=lambda: 99999):
        with ThreadPoolExecutor(max_workers=caller_count) as pool:
            futures = [pool.submit(worker, i) for i in range(caller_count)]
            for future in as_completed(futures):
                future.result()

    assert errors == [], f"Same-thread-ID concurrent calls raised errors: {errors}"
    # _gather_coro(i) == (i + 1) + (i + 2)
    assert sorted(results) == [2 * i + 3 for i in range(caller_count)]


def test_run_async_in_new_loop_rejects_non_coroutine() -> None:
    """Passing something that is neither a coroutine nor a Future raises TypeError."""
    with pytest.raises(TypeError, match="expects a coroutine or Future"):
        run_async_in_new_loop(42)  # type: ignore
4 changes: 2 additions & 2 deletions echo/server/tests/test_transcribe_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _fake_post(url: str, **kwargs: Any) -> _FakeResponse:
assert transcript is None
assert payload == {"transcript_id": "tx-1"}
assert captured["url"].endswith("/v2/transcript")
assert captured["json"]["speech_models"] == ["universal-3-pro", "universal-2"]
assert captured["json"]["speech_models"] == ["universal-3-pro"]
assert "speech_model" not in captured["json"]
assert "prompt" not in captured["json"]
assert captured["json"]["keyterms_prompt"] == ["Dembrane"]
Expand Down Expand Up @@ -92,7 +92,7 @@ def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse:
assert response["status"] == "completed"
assert payloads["polls"] == 2
post_payload = payloads["posts"][0]
assert post_payload["speech_models"] == ["universal-3-pro", "universal-2"]
assert post_payload["speech_models"] == ["universal-3-pro"]
assert "speech_model" not in post_payload
assert "webhook_url" not in post_payload

Expand Down