From 8a8fa34b67016f3294068c8fb4f920fbdc92b05e Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 02:09:47 +0000 Subject: [PATCH 01/14] fix(transcribe): remove local conversation fallback, keep buffered for pusher (#6061) Remove _create_conversation_fallback() and _fallback_process_conversation() which ran process_conversation() (LLM calls, embeddings, Firestore writes) directly on the listen event loop when pusher was degraded, blocking for 10-24s and triggering pod kills via liveness probe timeout. Instead, keep conversations buffered in pending_conversation_requests through DEGRADED state. Conversations are marked as processing in Firestore before buffering, so cleanup_processing_conversations() picks them up on next session if the current session dies. Co-Authored-By: Claude Opus 4.6 --- backend/routers/transcribe.py | 111 +++++++--------------------------- 1 file changed, 21 insertions(+), 90 deletions(-) diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py index 2367ce0928..6d970479ee 100644 --- a/backend/routers/transcribe.py +++ b/backend/routers/transcribe.py @@ -38,16 +38,12 @@ import database.users as user_db from database.users import get_user_transcription_preferences from database import redis_db -from database.redis_db import ( - check_credits_invalidation, - get_cached_user_geolocation, -) +from database.redis_db import check_credits_invalidation from models.conversation import ( Conversation, ConversationPhoto, ConversationSource, ConversationStatus, - Geolocation, Structured, TranscriptSegment, ) @@ -67,10 +63,9 @@ from models.transcript_segment import Translation from models.users import PlanType from utils.analytics import record_usage -from utils.app_integrations import trigger_external_integrations, trigger_realtime_integrations +from utils.app_integrations import trigger_realtime_integrations from utils.apps import is_audio_bytes_app_enabled -from utils.conversations.location import get_google_maps_location -from utils.conversations.process_conversation import process_conversation, retrieve_in_progress_conversation +from utils.conversations.process_conversation import retrieve_in_progress_conversation from utils.notifications import send_credit_limit_notification, send_silent_user_notification from utils.other import endpoints as auth from utils.other.storage import get_profile_audio_if_exists, get_user_has_speech_profile @@ -707,30 +702,6 @@ def on_conversation_processing_started(conversation_id: str): _send_message_event(ConversationEvent(event_type="memory_processing_started", memory=conversation)) # Fallback for when pusher is not available - async def _create_conversation_fallback(conversation_data: dict): - conversation = Conversation(**conversation_data) - if conversation.status != ConversationStatus.processing: - _send_message_event(ConversationEvent(event_type="memory_processing_started", memory=conversation)) - conversations_db.update_conversation_status(uid, conversation.id, ConversationStatus.processing) - conversation.status = ConversationStatus.processing - - try: - # Geolocation - geolocation = get_cached_user_geolocation(uid) - if geolocation: - geolocation = Geolocation(**geolocation) - conversation.geolocation = get_google_maps_location(geolocation.latitude, geolocation.longitude) - - conversation = process_conversation(uid, language, conversation) - messages = trigger_external_integrations(uid, conversation) - except Exception as e: - logger.error(f"Error processing conversation: {e} {uid} {session_id}") - conversations_db.set_conversation_as_discarded(uid, conversation.id) - conversation.discarded = True - messages = [] - - _send_message_event(ConversationEvent(event_type="memory_created", memory=conversation, messages=messages)) - async def cleanup_processing_conversations(): processing = conversations_db.get_processing_conversations(uid) logger.info(f'finalize_processing_conversations len(processing): {len(processing)} {uid} {session_id}') @@ -738,15 +709,8 @@ async def cleanup_processing_conversations(): return for conversation in processing: - if PUSHER_ENABLED and pusher_is_connected is not None and pusher_is_connected(): - await request_conversation_processing(conversation['id']) - elif PUSHER_ENABLED and pusher_is_degraded is not None and pusher_is_degraded(): - await _create_conversation_fallback(conversation) - elif not PUSHER_ENABLED: - await _create_conversation_fallback(conversation) - else: - # PUSHER_ENABLED but handler not yet initialized — try pusher - await request_conversation_processing(conversation['id']) + # Always route to pusher — buffer if disconnected, send when connected (#6061) + await request_conversation_processing(conversation['id']) async def process_pending_conversations(timed_out_id: Optional[str]): await asyncio.sleep(7.0) @@ -840,20 +804,10 @@ async def _process_conversation(conversation_id: str): has_content = conversation.get('transcript_segments') or conversation.get('photos') if has_content: # Use pusher if enabled AND connected, otherwise fallback - if PUSHER_ENABLED and pusher_is_connected is not None and pusher_is_connected(): - on_conversation_processing_started(conversation_id) - await request_conversation_processing(conversation_id) - elif PUSHER_ENABLED and pusher_is_degraded is not None and pusher_is_degraded(): - logger.info( - f"Pusher degraded, using fallback for conversation {conversation_id} {uid} {session_id}" - ) - await _create_conversation_fallback(conversation) - elif not PUSHER_ENABLED: - await _create_conversation_fallback(conversation) - else: - # PUSHER_ENABLED but handler not yet initialized — try pusher - on_conversation_processing_started(conversation_id) - await request_conversation_processing(conversation_id) + # Mark processing + buffer for pusher — never process locally (#6061) + conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing) + on_conversation_processing_started(conversation_id) + await request_conversation_processing(conversation_id) else: logger.info(f'Clean up the conversation {conversation_id}, reason: no content {uid} {session_id}') conversations_db.delete_conversation(uid, conversation_id) @@ -1205,7 +1159,6 @@ def create_pusher_task_handler(): reconnect_attempts = 0 reconnect_task = None # single task per session degraded_since: float = 0.0 - fallback_processed_ids: set = set() # guard against duplicate processing # Transcript (bounded to prevent memory growth when pusher is down) segment_buffers: deque = deque(maxlen=MAX_SEGMENT_BUFFER_SIZE) @@ -1434,13 +1387,12 @@ async def pusher_receive(): if not info: continue if info['retries'] >= MAX_RETRIES_PER_REQUEST: - logger.error( - f"Conversation {cid} failed after {MAX_RETRIES_PER_REQUEST} retries, giving up {uid} {session_id}" + logger.warning( + f"Conversation {cid} retry limit reached, keeping buffered for pusher recovery {uid} {session_id}" ) - # Route to fallback if in degraded mode - if reconnect_state == PusherReconnectState.DEGRADED: - await _fallback_process_conversation(cid) - pending_conversation_requests.pop(cid, None) + # Don't drop — conversation is marked processing in Firestore, + # cleanup_processing_conversations() will pick it up on next session (#6061) + info['sent_at'] = now # Reset timeout to avoid tight retry loop continue info['retries'] += 1 logger.warning( @@ -1465,20 +1417,6 @@ def _mark_disconnected(): if reconnect_task is None or reconnect_task.done(): reconnect_task = asyncio.create_task(_pusher_reconnect_loop()) - async def _fallback_process_conversation(conversation_id: str): - """Route conversation to fallback processing (degraded mode).""" - nonlocal fallback_processed_ids - if conversation_id in fallback_processed_ids: - logger.info( - f"Conversation {conversation_id} already processed via fallback, skipping {uid} {session_id}" - ) - return - conversation_data = conversations_db.get_conversation(uid, conversation_id) - if conversation_data: - fallback_processed_ids.add(conversation_id) - logger.info(f"Routing conversation {conversation_id} to fallback (degraded mode) {uid} {session_id}") - await _create_conversation_fallback(conversation_data) - async def _pusher_reconnect_loop(): """Single reconnect loop per session — replaces 3 scattered auto-reconnect calls. @@ -1500,10 +1438,11 @@ async def _pusher_reconnect_loop(): f"Pusher reconnect exhausted ({PUSHER_MAX_RECONNECT_ATTEMPTS} attempts), " f"entering DEGRADED mode {uid} {session_id}" ) - # Route pending conversations to fallback (pop each to avoid race) - for cid in list(pending_conversation_requests.keys()): - pending_conversation_requests.pop(cid, None) - await _fallback_process_conversation(cid) + # Keep pending conversations buffered — will resend when pusher recovers (#6061) + if pending_conversation_requests: + logger.info( + f"Keeping {len(pending_conversation_requests)} conversations buffered for pusher recovery {uid} {session_id}" + ) continue # Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s (capped at 60s) @@ -1535,9 +1474,7 @@ async def _pusher_reconnect_loop(): degraded_since = time.monotonic() reconnect_attempts = 0 logger.warning(f"Circuit breaker open, skipping to DEGRADED {uid} {session_id}") - for cid in list(pending_conversation_requests.keys()): - pending_conversation_requests.pop(cid, None) - await _fallback_process_conversation(cid) + # Keep pending conversations buffered — will resend when pusher recovers (#6061) continue except Exception: pass # _connect already logged the error @@ -1601,7 +1538,7 @@ async def _connect(): nonlocal pusher_ws nonlocal pusher_connected nonlocal current_conversation_id - nonlocal reconnect_state, reconnect_attempts, fallback_processed_ids + nonlocal reconnect_state, reconnect_attempts try: pusher_sample_rate = TARGET_SAMPLE_RATE if is_multi_channel else sample_rate @@ -1615,19 +1552,13 @@ async def _connect(): reconnect_state = PusherReconnectState.CONNECTED reconnect_attempts = 0 # Re-send any pending conversation requests after reconnect - # (skip conversations already processed via fallback to avoid duplication) if pending_conversation_requests: logger.info( f"Reconnected to pusher, re-sending {len(pending_conversation_requests)} pending requests {uid} {session_id}" ) for cid in list(pending_conversation_requests.keys()): - if cid in fallback_processed_ids: - pending_conversation_requests.pop(cid, None) - continue pending_conversation_requests[cid]['sent_at'] = time.time() await request_conversation_processing(cid) - # Clear fallback tracking after processing pending queue - fallback_processed_ids.clear() except PusherCircuitBreakerOpen: raise # Let caller handle circuit breaker except Exception as e: From 2f23521eb8539bf5103d8ad9f15a9ee2cba8f508 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 02:18:31 +0000 Subject: [PATCH 02/14] fix(transcribe): guard against PUSHER_ENABLED=false in conversation processing (#6061) Add null checks for request_conversation_processing before calling it in _process_conversation() and cleanup_processing_conversations(). When pusher is not enabled, log a warning instead of crashing with NoneType. Co-Authored-By: Claude Opus 4.6 --- backend/routers/transcribe.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py index 6d970479ee..485a07e00b 100644 --- a/backend/routers/transcribe.py +++ b/backend/routers/transcribe.py @@ -701,15 +701,17 @@ def on_conversation_processing_started(conversation_id: str): conversation = Conversation(**conversation_data) _send_message_event(ConversationEvent(event_type="memory_processing_started", memory=conversation)) - # Fallback for when pusher is not available async def cleanup_processing_conversations(): processing = conversations_db.get_processing_conversations(uid) logger.info(f'finalize_processing_conversations len(processing): {len(processing)} {uid} {session_id}') if not processing or len(processing) == 0: return + if not request_conversation_processing: + logger.warning(f"Pusher not enabled, cannot reprocess {len(processing)} conversations {uid} {session_id}") + return for conversation in processing: - # Always route to pusher — buffer if disconnected, send when connected (#6061) + # Route to pusher — buffer if disconnected, send when connected (#6061) await request_conversation_processing(conversation['id']) async def process_pending_conversations(timed_out_id: Optional[str]): @@ -807,7 +809,12 @@ async def _process_conversation(conversation_id: str): # Mark processing + buffer for pusher — never process locally (#6061) conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing) on_conversation_processing_started(conversation_id) - await request_conversation_processing(conversation_id) + if request_conversation_processing: + await request_conversation_processing(conversation_id) + else: + logger.warning( + f"Pusher not enabled, conversation {conversation_id} stuck in processing {uid} {session_id}" + ) else: logger.info(f'Clean up the conversation {conversation_id}, reason: no content {uid} {session_id}') conversations_db.delete_conversation(uid, conversation_id) From 7a452aaaca75c5251ec4746cb42eff643e792a7d Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 02:27:04 +0000 Subject: [PATCH 03/14] test(transcribe): add 15 tests for #6061 fallback removal behavioral contracts Tests cover all changed paths: - _process_conversation() always marks processing + routes to pusher - _process_conversation() null guard when pusher disabled - cleanup_processing_conversations() routes through pusher, null-safe - Retry exhaustion keeps buffered (not dropped) with sent_at reset - DEGRADED transitions preserve pending_conversation_requests - Reconnect resends all buffered without fallback_processed_ids dedup Co-Authored-By: Claude Opus 4.6 --- .../unit/test_listen_fallback_removal.py | 353 ++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 backend/tests/unit/test_listen_fallback_removal.py diff --git a/backend/tests/unit/test_listen_fallback_removal.py b/backend/tests/unit/test_listen_fallback_removal.py new file mode 100644 index 0000000000..fadd9696ff --- /dev/null +++ b/backend/tests/unit/test_listen_fallback_removal.py @@ -0,0 +1,353 @@ +"""Tests for #6061: local conversation fallback removal. + +Verifies that listen never runs process_conversation() locally. +All conversation processing is routed through pusher via +request_conversation_processing(). When pusher is unavailable, +conversations stay buffered in pending_conversation_requests. + +These tests mirror the behavioral contracts from transcribe.py's +deeply nested closures (not directly importable) and assert the +NEW behavior introduced by #6061. +""" + +import asyncio +import time +from enum import Enum +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Constants (mirrored from transcribe.py) +# --------------------------------------------------------------------------- +PENDING_REQUEST_TIMEOUT = 120 +MAX_RETRIES_PER_REQUEST = 3 +PUSHER_MAX_RECONNECT_ATTEMPTS = 6 + + +class PusherReconnectState(Enum): + CONNECTED = "connected" + RECONNECT_BACKOFF = "reconnect_backoff" + DEGRADED = "degraded" + HALF_OPEN_PROBE = "half_open_probe" + + +class ConversationStatus: + processing = "processing" + + +# --------------------------------------------------------------------------- +# Helpers: mirror the changed behavioral contracts from transcribe.py +# --------------------------------------------------------------------------- + + +async def _process_conversation( + conversation_id: str, + conversations_db, + uid: str, + request_conversation_processing, + on_conversation_processing_started, +): + """Mirrors _process_conversation() from transcribe.py after #6061. + + Key contract: NEVER calls process_conversation() locally. + Always marks processing + routes through request_conversation_processing. + """ + conversation = conversations_db.get_conversation(uid, conversation_id) + if conversation: + has_content = conversation.get('transcript_segments') or conversation.get('photos') + if has_content: + conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing) + on_conversation_processing_started(conversation_id) + if request_conversation_processing: + await request_conversation_processing(conversation_id) + else: + pass # Warning logged — pusher not enabled + else: + conversations_db.delete_conversation(uid, conversation_id) + + +async def cleanup_processing_conversations( + conversations_db, + uid: str, + request_conversation_processing, +): + """Mirrors cleanup_processing_conversations() from transcribe.py after #6061. + + Key contract: routes all processing conversations through + request_conversation_processing. Never processes locally. + """ + processing = conversations_db.get_processing_conversations(uid) + if not processing or len(processing) == 0: + return + if not request_conversation_processing: + return + + for conversation in processing: + await request_conversation_processing(conversation['id']) + + +def check_timed_out_requests_6061(pending_requests: dict, now: float): + """Mirrors the retry-exhaustion logic from pusher_receive() after #6061. + + Key contract: retry exhaustion keeps the request buffered and resets + sent_at instead of dropping it. cleanup_processing_conversations() + picks it up on next session. + """ + timed_out = [cid for cid, info in list(pending_requests.items()) if now - info['sent_at'] > PENDING_REQUEST_TIMEOUT] + actions = [] + for cid in timed_out: + info = pending_requests.get(cid) + if not info: + continue + if info['retries'] >= MAX_RETRIES_PER_REQUEST: + # #6061: Don't drop — keep buffered, reset timeout + info['sent_at'] = now + actions.append(('keep_buffered', cid)) + continue + info['retries'] += 1 + info['sent_at'] = now + actions.append(('retry', cid, info['retries'])) + return actions + + +def degraded_transition_6061(pending_conversation_requests: dict, reconnect_state: PusherReconnectState): + """Mirrors the DEGRADED transition logic from _pusher_reconnect_loop() after #6061. + + Key contract: pending conversations are KEPT buffered when entering DEGRADED. + Never popped + fallback-processed. + """ + if reconnect_state == PusherReconnectState.RECONNECT_BACKOFF: + # Transition to DEGRADED — keep pending buffered + return PusherReconnectState.DEGRADED, dict(pending_conversation_requests) + return reconnect_state, pending_conversation_requests + + +def reconnect_resend_6061(pending_conversation_requests: dict): + """Mirrors _connect() resend logic from transcribe.py after #6061. + + Key contract: all pending conversations are resent on reconnect. + No fallback_processed_ids dedup — all buffered conversations are replayed. + """ + resent = [] + for cid in list(pending_conversation_requests.keys()): + pending_conversation_requests[cid]['sent_at'] = time.time() + resent.append(cid) + return resent + + +# --------------------------------------------------------------------------- +# Tests: _process_conversation() — always marks processing + buffers +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_process_conversation_marks_processing_and_routes_to_pusher(): + """_process_conversation() marks status=processing and calls request_conversation_processing.""" + db = MagicMock() + db.get_conversation.return_value = {'transcript_segments': [{'text': 'hello'}]} + request_fn = AsyncMock() + on_started = MagicMock() + + await _process_conversation('conv-1', db, 'uid-1', request_fn, on_started) + + db.update_conversation_status.assert_called_once_with('uid-1', 'conv-1', ConversationStatus.processing) + on_started.assert_called_once_with('conv-1') + request_fn.assert_awaited_once_with('conv-1') + + +@pytest.mark.asyncio +async def test_process_conversation_never_calls_local_fallback(): + """_process_conversation() never imports or calls process_conversation locally.""" + db = MagicMock() + db.get_conversation.return_value = {'transcript_segments': [{'text': 'hello'}]} + request_fn = AsyncMock() + on_started = MagicMock() + + await _process_conversation('conv-1', db, 'uid-1', request_fn, on_started) + + # The key assertion: request_conversation_processing is the ONLY processing path + request_fn.assert_awaited_once() + # No local process_conversation, no trigger_external_integrations, no get_google_maps_location + + +@pytest.mark.asyncio +async def test_process_conversation_null_guard_no_crash(): + """When request_conversation_processing is None (pusher disabled), no crash occurs.""" + db = MagicMock() + db.get_conversation.return_value = {'transcript_segments': [{'text': 'hello'}]} + on_started = MagicMock() + + # Should not raise + await _process_conversation('conv-1', db, 'uid-1', None, on_started) + + # Still marks processing in Firestore + db.update_conversation_status.assert_called_once_with('uid-1', 'conv-1', ConversationStatus.processing) + on_started.assert_called_once_with('conv-1') + + +@pytest.mark.asyncio +async def test_process_conversation_no_content_deletes(): + """Conversation with no content is deleted, not processed.""" + db = MagicMock() + db.get_conversation.return_value = {'transcript_segments': [], 'photos': []} + request_fn = AsyncMock() + on_started = MagicMock() + + await _process_conversation('conv-1', db, 'uid-1', request_fn, on_started) + + db.delete_conversation.assert_called_once_with('uid-1', 'conv-1') + request_fn.assert_not_awaited() + on_started.assert_not_called() + + +# --------------------------------------------------------------------------- +# Tests: cleanup_processing_conversations() — routes through pusher +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_cleanup_routes_all_processing_to_pusher(): + """cleanup_processing_conversations() calls request_conversation_processing for each.""" + db = MagicMock() + db.get_processing_conversations.return_value = [{'id': 'conv-1'}, {'id': 'conv-2'}, {'id': 'conv-3'}] + request_fn = AsyncMock() + + await cleanup_processing_conversations(db, 'uid-1', request_fn) + + assert request_fn.await_count == 3 + request_fn.assert_any_await('conv-1') + request_fn.assert_any_await('conv-2') + request_fn.assert_any_await('conv-3') + + +@pytest.mark.asyncio +async def test_cleanup_null_handler_returns_safely(): + """cleanup_processing_conversations() with null handler does not crash.""" + db = MagicMock() + db.get_processing_conversations.return_value = [{'id': 'conv-1'}] + + # Should not raise + await cleanup_processing_conversations(db, 'uid-1', None) + + +@pytest.mark.asyncio +async def test_cleanup_empty_processing_is_noop(): + """cleanup_processing_conversations() with no processing conversations is a no-op.""" + db = MagicMock() + db.get_processing_conversations.return_value = [] + request_fn = AsyncMock() + + await cleanup_processing_conversations(db, 'uid-1', request_fn) + + request_fn.assert_not_awaited() + + +# --------------------------------------------------------------------------- +# Tests: retry exhaustion — keep buffered, don't drop (#6061) +# --------------------------------------------------------------------------- + + +def test_retry_exhaustion_keeps_buffered(): + """After MAX_RETRIES, conversation stays in pending with reset sent_at.""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT - 1, "retries": MAX_RETRIES_PER_REQUEST}} + + actions = check_timed_out_requests_6061(pending, now) + + assert len(actions) == 1 + assert actions[0] == ('keep_buffered', 'conv-1') + assert "conv-1" in pending, "Must NOT be removed from pending" + assert pending["conv-1"]["sent_at"] == now, "sent_at must be reset" + + +def test_retry_exhaustion_does_not_drop(): + """Retry exhaustion must never remove the conversation from pending.""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT - 1, "retries": MAX_RETRIES_PER_REQUEST + 5}} + + check_timed_out_requests_6061(pending, now) + + assert "conv-1" in pending, "Conversation must stay buffered regardless of retry count" + + +def test_normal_retry_still_increments(): + """Before MAX_RETRIES, normal retry behavior is preserved.""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT - 1, "retries": 1}} + + actions = check_timed_out_requests_6061(pending, now) + + assert actions[0] == ('retry', 'conv-1', 2) + assert pending["conv-1"]["retries"] == 2 + + +# --------------------------------------------------------------------------- +# Tests: DEGRADED transition preserves pending (#6061) +# --------------------------------------------------------------------------- + + +def test_degraded_transition_preserves_pending(): + """Entering DEGRADED from RECONNECT_BACKOFF keeps all pending conversations.""" + pending = { + "conv-1": {"sent_at": time.time(), "retries": 0}, + "conv-2": {"sent_at": time.time(), "retries": 2}, + } + + new_state, remaining = degraded_transition_6061(pending, PusherReconnectState.RECONNECT_BACKOFF) + + assert new_state == PusherReconnectState.DEGRADED + assert "conv-1" in remaining + assert "conv-2" in remaining + assert len(remaining) == 2 + + +def test_degraded_transition_never_pops_pending(): + """DEGRADED transition must not pop any conversations from pending.""" + pending = {"conv-1": {"sent_at": time.time(), "retries": MAX_RETRIES_PER_REQUEST}} + original_keys = set(pending.keys()) + + degraded_transition_6061(pending, PusherReconnectState.RECONNECT_BACKOFF) + + assert set(pending.keys()) == original_keys + + +# --------------------------------------------------------------------------- +# Tests: reconnect resend — all buffered conversations replayed (#6061) +# --------------------------------------------------------------------------- + + +def test_reconnect_resends_all_buffered(): + """After reconnect, all pending conversations are resent (no dedup filter).""" + pending = { + "conv-1": {"sent_at": time.time() - 200, "retries": 0}, + "conv-2": {"sent_at": time.time() - 100, "retries": 2}, + "conv-3": {"sent_at": time.time() - 50, "retries": MAX_RETRIES_PER_REQUEST}, + } + + resent = reconnect_resend_6061(pending) + + assert set(resent) == {"conv-1", "conv-2", "conv-3"} + assert len(resent) == 3 + + +def test_reconnect_no_fallback_dedup(): + """Reconnect resend has no fallback_processed_ids filter — all are replayed.""" + pending = { + "conv-already-processed": {"sent_at": time.time() - 300, "retries": MAX_RETRIES_PER_REQUEST}, + } + + resent = reconnect_resend_6061(pending) + + assert "conv-already-processed" in resent, "No dedup — all must be resent" + + +def test_reconnect_resets_sent_at(): + """Reconnect resend resets sent_at for each conversation.""" + old_time = time.time() - 500 + pending = {"conv-1": {"sent_at": old_time, "retries": 1}} + + reconnect_resend_6061(pending) + + assert pending["conv-1"]["sent_at"] > old_time From 654b3282e789d8cb7b782e8e74bca8b48d019bf8 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 02:27:08 +0000 Subject: [PATCH 04/14] chore(test): add test_listen_fallback_removal.py to test.sh (#6061) Co-Authored-By: Claude Opus 4.6 --- backend/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/test.sh b/backend/test.sh index 229d41f865..0bfc75508c 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -34,6 +34,7 @@ pytest tests/unit/test_vad_gate.py -v pytest tests/unit/test_log_sanitizer.py -v pytest tests/unit/test_pusher_heartbeat.py -v pytest tests/unit/test_pusher_conversation_retry.py -v +pytest tests/unit/test_listen_fallback_removal.py -v pytest tests/unit/test_desktop_updates.py -v pytest tests/unit/test_translation_optimization.py -v pytest tests/unit/test_conversation_source_unknown.py -v From 1fa380b9381e6fe86bdea986ec19b8fcdf5709ef Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 03:52:19 +0000 Subject: [PATCH 05/14] fix(transcribe): check pusher before marking processing to avoid stranding (#6061) When PUSHER_ENABLED=false, skip marking conversation as processing since nothing will process it. Conversation stays in_progress instead of getting stuck in processing state indefinitely. Co-Authored-By: Claude Opus 4.6 --- backend/routers/transcribe.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py index 485a07e00b..4adefa3216 100644 --- a/backend/routers/transcribe.py +++ b/backend/routers/transcribe.py @@ -805,16 +805,15 @@ async def _process_conversation(conversation_id: str): if conversation: has_content = conversation.get('transcript_segments') or conversation.get('photos') if has_content: - # Use pusher if enabled AND connected, otherwise fallback + if not request_conversation_processing: + logger.warning( + f"Pusher not enabled, skipping conversation {conversation_id} (stays in_progress) {uid} {session_id}" + ) + return # Mark processing + buffer for pusher — never process locally (#6061) conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing) on_conversation_processing_started(conversation_id) - if request_conversation_processing: - await request_conversation_processing(conversation_id) - else: - logger.warning( - f"Pusher not enabled, conversation {conversation_id} stuck in processing {uid} {session_id}" - ) + await request_conversation_processing(conversation_id) else: logger.info(f'Clean up the conversation {conversation_id}, reason: no content {uid} {session_id}') conversations_db.delete_conversation(uid, conversation_id) From deeaf70914783395ebcda66f72498f569e470e15 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 03:52:25 +0000 Subject: [PATCH 06/14] test(transcribe): update null guard test + remove unused imports (#6061) - Remove unused asyncio and patch imports - Update null guard test to verify conversation stays in_progress when pusher is disabled (not marked as processing) Co-Authored-By: Claude Opus 4.6 --- .../unit/test_listen_fallback_removal.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/backend/tests/unit/test_listen_fallback_removal.py b/backend/tests/unit/test_listen_fallback_removal.py index fadd9696ff..908ffc4003 100644 --- a/backend/tests/unit/test_listen_fallback_removal.py +++ b/backend/tests/unit/test_listen_fallback_removal.py @@ -10,10 +10,9 @@ NEW behavior introduced by #6061. """ -import asyncio import time from enum import Enum -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -52,18 +51,17 @@ async def _process_conversation( """Mirrors _process_conversation() from transcribe.py after #6061. Key contract: NEVER calls process_conversation() locally. - Always marks processing + routes through request_conversation_processing. + Checks pusher availability BEFORE marking processing to avoid stranding conversations. """ conversation = conversations_db.get_conversation(uid, conversation_id) if conversation: has_content = conversation.get('transcript_segments') or conversation.get('photos') if has_content: + if not request_conversation_processing: + return # Warning logged — pusher not enabled, skip (stays in_progress) conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing) on_conversation_processing_started(conversation_id) - if request_conversation_processing: - await request_conversation_processing(conversation_id) - else: - pass # Warning logged — pusher not enabled + await request_conversation_processing(conversation_id) else: conversations_db.delete_conversation(uid, conversation_id) @@ -173,8 +171,8 @@ async def test_process_conversation_never_calls_local_fallback(): @pytest.mark.asyncio -async def test_process_conversation_null_guard_no_crash(): - """When request_conversation_processing is None (pusher disabled), no crash occurs.""" +async def test_process_conversation_null_guard_skips_processing(): + """When request_conversation_processing is None (pusher disabled), conversation stays in_progress.""" db = MagicMock() db.get_conversation.return_value = {'transcript_segments': [{'text': 'hello'}]} on_started = MagicMock() @@ -182,9 +180,9 @@ async def test_process_conversation_null_guard_no_crash(): # Should not raise await _process_conversation('conv-1', db, 'uid-1', None, on_started) - # Still marks processing in Firestore - db.update_conversation_status.assert_called_once_with('uid-1', 'conv-1', ConversationStatus.processing) - on_started.assert_called_once_with('conv-1') + # Must NOT mark processing — no way to process without pusher + db.update_conversation_status.assert_not_called() + on_started.assert_not_called() @pytest.mark.asyncio From 655f27533fdeffd9fb39e1d7b37772d790f27314 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 03:57:15 +0000 Subject: [PATCH 07/14] test(transcribe): add boundary tests for reconnect cap, circuit breaker, TTL (#6061) - Add reconnect cap boundary tests (at cap, below cap) - Add circuit breaker triggered degraded tests - Add TTL boundary tests (exact, just-below, just-above threshold) - Update degraded_transition helper to include reconnect_attempts and circuit_breaker conditions Co-Authored-By: Claude Opus 4.6 --- .../unit/test_listen_fallback_removal.py | 114 +++++++++++++++++- 1 file changed, 109 insertions(+), 5 deletions(-) diff --git a/backend/tests/unit/test_listen_fallback_removal.py b/backend/tests/unit/test_listen_fallback_removal.py index 908ffc4003..332c09a3a6 100644 --- a/backend/tests/unit/test_listen_fallback_removal.py +++ b/backend/tests/unit/test_listen_fallback_removal.py @@ -110,15 +110,23 @@ def check_timed_out_requests_6061(pending_requests: dict, now: float): return actions -def degraded_transition_6061(pending_conversation_requests: dict, reconnect_state: PusherReconnectState): +def degraded_transition_6061( + pending_conversation_requests: dict, + reconnect_state: PusherReconnectState, + reconnect_attempts: int = 0, + circuit_breaker_open: bool = False, +): """Mirrors the DEGRADED transition logic from _pusher_reconnect_loop() after #6061. Key contract: pending conversations are KEPT buffered when entering DEGRADED. Never popped + fallback-processed. + Transitions to DEGRADED when: attempts >= cap OR circuit breaker open. """ if reconnect_state == PusherReconnectState.RECONNECT_BACKOFF: - # Transition to DEGRADED — keep pending buffered - return PusherReconnectState.DEGRADED, dict(pending_conversation_requests) + if circuit_breaker_open or reconnect_attempts >= PUSHER_MAX_RECONNECT_ATTEMPTS: + return PusherReconnectState.DEGRADED, dict(pending_conversation_requests) + # Still in backoff, not yet at cap + return PusherReconnectState.RECONNECT_BACKOFF, pending_conversation_requests return reconnect_state, pending_conversation_requests @@ -293,7 +301,9 @@ def test_degraded_transition_preserves_pending(): "conv-2": {"sent_at": time.time(), "retries": 2}, } - new_state, remaining = degraded_transition_6061(pending, PusherReconnectState.RECONNECT_BACKOFF) + new_state, remaining = degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=PUSHER_MAX_RECONNECT_ATTEMPTS + ) assert new_state == PusherReconnectState.DEGRADED assert "conv-1" in remaining @@ -306,7 +316,9 @@ def test_degraded_transition_never_pops_pending(): pending = {"conv-1": {"sent_at": time.time(), "retries": MAX_RETRIES_PER_REQUEST}} original_keys = set(pending.keys()) - degraded_transition_6061(pending, PusherReconnectState.RECONNECT_BACKOFF) + degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=PUSHER_MAX_RECONNECT_ATTEMPTS + ) assert set(pending.keys()) == original_keys @@ -349,3 +361,95 @@ def test_reconnect_resets_sent_at(): reconnect_resend_6061(pending) assert pending["conv-1"]["sent_at"] > old_time + + +# --------------------------------------------------------------------------- +# Tests: DEGRADED transition — reconnect cap and circuit breaker boundaries +# --------------------------------------------------------------------------- + + +def test_degraded_at_exact_reconnect_cap(): + """Transition to DEGRADED when reconnect_attempts == PUSHER_MAX_RECONNECT_ATTEMPTS.""" + pending = {"conv-1": {"sent_at": time.time(), "retries": 0}} + + new_state, remaining = degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=PUSHER_MAX_RECONNECT_ATTEMPTS + ) + + assert new_state == PusherReconnectState.DEGRADED + assert "conv-1" in remaining + + +def test_no_degraded_below_reconnect_cap(): + """Stay in RECONNECT_BACKOFF when reconnect_attempts < PUSHER_MAX_RECONNECT_ATTEMPTS.""" + pending = {"conv-1": {"sent_at": time.time(), "retries": 0}} + + new_state, remaining = degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=PUSHER_MAX_RECONNECT_ATTEMPTS - 1 + ) + + assert new_state == PusherReconnectState.RECONNECT_BACKOFF + + +def test_degraded_on_circuit_breaker_open(): + """Transition to DEGRADED immediately when circuit breaker is open, regardless of attempt count.""" + pending = {"conv-1": {"sent_at": time.time(), "retries": 0}} + + new_state, remaining = degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=0, circuit_breaker_open=True + ) + + assert new_state == PusherReconnectState.DEGRADED + assert "conv-1" in remaining + + +def test_circuit_breaker_degraded_preserves_all_pending(): + """Circuit breaker triggered DEGRADED keeps all pending conversations.""" + pending = { + "conv-1": {"sent_at": time.time(), "retries": 0}, + "conv-2": {"sent_at": time.time(), "retries": MAX_RETRIES_PER_REQUEST}, + } + + new_state, remaining = degraded_transition_6061( + pending, PusherReconnectState.RECONNECT_BACKOFF, reconnect_attempts=1, circuit_breaker_open=True + ) + + assert len(remaining) == 2 + assert "conv-1" in remaining + assert "conv-2" in remaining + + +# --------------------------------------------------------------------------- +# Tests: TTL boundary — exact and just-below timeout threshold +# --------------------------------------------------------------------------- + + +def test_timeout_exact_boundary_not_timed_out(): + """Request at exactly PENDING_REQUEST_TIMEOUT is NOT timed out (uses strict >).""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT, "retries": 0}} + + actions = check_timed_out_requests_6061(pending, now) + + assert len(actions) == 0, "Exact boundary should not trigger timeout (strict >)" + + +def test_timeout_just_below_threshold_not_timed_out(): + """Request 1 second before timeout threshold is not timed out.""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT + 1, "retries": 0}} + + actions = check_timed_out_requests_6061(pending, now) + + assert len(actions) == 0 + + +def test_timeout_just_above_threshold_triggers(): + """Request 1 second past timeout threshold triggers retry.""" + now = time.time() + pending = {"conv-1": {"sent_at": now - PENDING_REQUEST_TIMEOUT - 1, "retries": 0}} + + actions = check_timed_out_requests_6061(pending, now) + + assert len(actions) == 1 + assert actions[0] == ('retry', 'conv-1', 1) From b808a76ffec85a722539f4cfe5e70c900fc15e15 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:24:36 +0000 Subject: [PATCH 08/14] fix(transcribe): guard cleanup_processing_conversations against None before len() Move `if not processing` check before `len(processing)` to avoid TypeError when get_processing_conversations() returns None. Co-Authored-By: Claude Opus 4.6 --- backend/routers/transcribe.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py index 4adefa3216..330f3f9f9e 100644 --- a/backend/routers/transcribe.py +++ b/backend/routers/transcribe.py @@ -703,8 +703,11 @@ def on_conversation_processing_started(conversation_id: str): async def cleanup_processing_conversations(): processing = conversations_db.get_processing_conversations(uid) + if not processing: + logger.info(f'finalize_processing_conversations len(processing): 0 {uid} {session_id}') + return logger.info(f'finalize_processing_conversations len(processing): {len(processing)} {uid} {session_id}') - if not processing or len(processing) == 0: + if len(processing) == 0: return if not request_conversation_processing: logger.warning(f"Pusher not enabled, cannot reprocess {len(processing)} conversations {uid} {session_id}") From 13aac5ac25ffeaddbe348a78b2786772c250cc17 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:24:41 +0000 Subject: [PATCH 09/14] test(transcribe): add None boundary test for cleanup_processing_conversations Add test_cleanup_none_processing_no_crash to verify no TypeError when get_processing_conversations() returns None. Update test helper to match production null guard order. Co-Authored-By: Claude Opus 4.6 --- .../tests/unit/test_listen_fallback_removal.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/backend/tests/unit/test_listen_fallback_removal.py b/backend/tests/unit/test_listen_fallback_removal.py index 332c09a3a6..97b6d6091b 100644 --- a/backend/tests/unit/test_listen_fallback_removal.py +++ b/backend/tests/unit/test_listen_fallback_removal.py @@ -75,9 +75,12 @@ async def cleanup_processing_conversations( Key contract: routes all processing conversations through request_conversation_processing. Never processes locally. + Guards None before calling len() to avoid TypeError. """ processing = conversations_db.get_processing_conversations(uid) - if not processing or len(processing) == 0: + if not processing: + return + if len(processing) == 0: return if not request_conversation_processing: return @@ -250,6 +253,19 @@ async def test_cleanup_empty_processing_is_noop(): request_fn.assert_not_awaited() +@pytest.mark.asyncio +async def test_cleanup_none_processing_no_crash(): + """cleanup_processing_conversations() handles None from get_processing_conversations without TypeError.""" + db = MagicMock() + db.get_processing_conversations.return_value = None + request_fn = AsyncMock() + + # Must not raise TypeError on len(None) + await cleanup_processing_conversations(db, 'uid-1', request_fn) + + request_fn.assert_not_awaited() + + # --------------------------------------------------------------------------- # Tests: retry exhaustion — keep buffered, don't drop (#6061) # --------------------------------------------------------------------------- From 97f0a07ee11530594ecc41fc45e2f3e18682c815 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:24:47 +0000 Subject: [PATCH 10/14] docs(listen): update pipeline docs for #6061 local fallback removal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update conversation lifecycle, disconnect, and timing diagrams to reflect: - No local fallback processing — all routes through pusher - Pending conversation buffering during DEGRADED state - Reconnect flush of buffered conversations - New timing constants (PENDING_REQUEST_TIMEOUT, MAX_RETRIES, etc.) - Added pusher reconnect & pending flush sequence diagram Co-Authored-By: Claude Opus 4.6 --- .../backend/listen_pusher_pipeline.mdx | 84 +++++++++++++++---- 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/docs/doc/developer/backend/listen_pusher_pipeline.mdx b/docs/doc/developer/backend/listen_pusher_pipeline.mdx index a5df785a85..e7b48262be 100644 --- a/docs/doc/developer/backend/listen_pusher_pipeline.mdx +++ b/docs/doc/developer/backend/listen_pusher_pipeline.mdx @@ -6,7 +6,7 @@ description: "Sequence diagrams for the /v4/listen WebSocket and Pusher processi # Listen + Pusher Pipeline — Sequence Diagrams -> Last updated: 2026-03-16 (PR #5624 E2E testing) +> Last updated: 2026-03-28 (PR #6061 — remove local fallback) > > These diagrams document the real behavior observed during E2E testing with > live services (backend, pusher, Deepgram, embedding API). Update when the @@ -46,6 +46,11 @@ sequenceDiagram This is the normal path when the client stays connected but stops speaking. +**Key design rule (#6061):** Listen NEVER processes conversations locally. +All conversation processing routes through pusher via `request_conversation_processing()`. +When pusher is unavailable, conversations stay buffered in `pending_conversation_requests` +and are flushed when pusher reconnects. + ```mermaid sequenceDiagram participant Client @@ -62,19 +67,26 @@ sequenceDiagram end Backend->>Backend: _process_conversation(conversation_id) - Backend->>Firestore: Set status=processing - Backend->>Pusher: Send process_conversation request - Backend->>Firestore: Create new stub conversation - - Pusher->>OpenAI: Generate title, overview, category, emoji, action_items - OpenAI-->>Pusher: Structured data - Pusher->>Firestore: Write structured data + set status=completed - Pusher->>OpenAI: Extract memories - Pusher->>Firestore: Save embedding vector - Pusher-->>Backend: Opcode 201 (conversation processed callback) - - Backend-->>Client: {type: "memory_processing_started"} - Backend-->>Client: {type: "memory_created", memory: {...}} + alt Pusher available (request_conversation_processing != None) + Backend->>Firestore: Set status=processing + Backend->>Pusher: Send process_conversation request (opcode 104) + Backend->>Firestore: Create new stub conversation + else Pusher unavailable + Note over Backend: Skip processing — conversation stays in_progress
Will be picked up by cleanup_processing_conversations on next session + end + + alt Pusher connected + Pusher->>OpenAI: Generate title, overview, category, emoji, action_items + OpenAI-->>Pusher: Structured data + Pusher->>Firestore: Write structured data + set status=completed + Pusher->>OpenAI: Extract memories + Pusher->>Firestore: Save embedding vector + Pusher-->>Backend: Opcode 201 (conversation processed callback) + Backend-->>Client: {type: "memory_processing_started"} + Backend-->>Client: {type: "memory_created", memory: {...}} + else Pusher disconnected (RECONNECT_BACKOFF / DEGRADED) + Note over Backend: Request buffered in pending_conversation_requests
Resent automatically when pusher reconnects + end ``` ## 3. Disconnect Path @@ -91,8 +103,12 @@ sequenceDiagram Backend->>Backend: Log "Client disconnected" alt Multi-channel - Backend->>Backend: _process_conversation(current_conversation_id) ✅ - Backend->>Pusher: Send process_conversation request + Backend->>Backend: _process_conversation(current_conversation_id) + alt Pusher available + Backend->>Pusher: Send process_conversation request (opcode 104) + else Pusher unavailable + Note over Backend: Conversation stays in_progress
Picked up by cleanup_processing_conversations on next session + end else Single-channel Note over Backend: No explicit _process_conversation() call
Relies on next session to pick up as stale end @@ -101,7 +117,37 @@ sequenceDiagram Backend->>Backend: Close Pusher socket Backend->>Backend: Cleanup memory (clear caches, buffers) - Note over Backend: Single-channel: conversation stays
in_progress until next session
picks it up as stale + Note over Backend: No local fallback processing (#6061).
Conversations stay buffered or in_progress
until pusher recovers or next session starts. +``` + +## 3.1 Pusher Reconnect & Pending Flush + +When pusher reconnects after a disconnection, all buffered conversations are replayed. + +```mermaid +sequenceDiagram + participant Backend as Backend + participant Pusher + + Note over Backend: Pusher disconnects → RECONNECT_BACKOFF + + loop _pusher_reconnect_loop (exponential backoff) + Backend->>Pusher: Attempt reconnect + alt Connection fails + Note over Backend: Increment reconnect_attempts + alt attempts >= 6 OR circuit breaker open + Note over Backend: Transition to DEGRADED
Pending conversations KEPT buffered + end + else Connection succeeds + Note over Backend: Transition to CONNECTED + Backend->>Backend: Flush pending_conversation_requests + loop Each buffered conversation + Backend->>Pusher: Re-send process_conversation (opcode 104) + end + end + end + + Note over Backend: Retry exhaustion: after MAX_RETRIES (3),
conversation stays buffered with reset sent_at.
cleanup_processing_conversations picks it up next session. ``` ## 4. Speaker ID Lifecycle (2-Session Flow) @@ -208,4 +254,8 @@ sequenceDiagram | `lifecycle_manager` poll | 5s | transcribe.py:1683 | Check `finished_at` interval | | Pusher audio batch | 60s | pusher | GCS upload batch size | | Speaker match threshold | 0.45 | transcribe.py | Cosine distance cutoff | +| `PENDING_REQUEST_TIMEOUT` | 120s | transcribe.py | Timeout before retrying a pending request | +| `MAX_RETRIES_PER_REQUEST` | 3 | transcribe.py | Max retries before keeping buffered | +| `PUSHER_MAX_RECONNECT_ATTEMPTS` | 6 | transcribe.py | Reconnect attempts before DEGRADED | +| `MAX_PENDING_REQUESTS` | 100 | transcribe.py | Max buffered conversations per session | From 294c2e6bad0dd52b35ead4bbda5a5f5309ac8645 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:29:50 +0000 Subject: [PATCH 11/14] docs(listen): fix misleading recovery claim for no-pusher mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clarify that without HOSTED_PUSHER_API_URL, conversations stay in_progress permanently — cleanup_processing_conversations also requires pusher. Pusher is required for conversation processing. Co-Authored-By: Claude Opus 4.6 --- docs/doc/developer/backend/listen_pusher_pipeline.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/doc/developer/backend/listen_pusher_pipeline.mdx b/docs/doc/developer/backend/listen_pusher_pipeline.mdx index e7b48262be..d1f10d4b69 100644 --- a/docs/doc/developer/backend/listen_pusher_pipeline.mdx +++ b/docs/doc/developer/backend/listen_pusher_pipeline.mdx @@ -71,8 +71,8 @@ sequenceDiagram Backend->>Firestore: Set status=processing Backend->>Pusher: Send process_conversation request (opcode 104) Backend->>Firestore: Create new stub conversation - else Pusher unavailable - Note over Backend: Skip processing — conversation stays in_progress
Will be picked up by cleanup_processing_conversations on next session + else Pusher not configured (HOSTED_PUSHER_API_URL unset) + Note over Backend: Skip processing — conversation stays in_progress permanently.
Pusher is required for conversation processing in production. end alt Pusher connected From fc231eaa5f0bce9a991f48ac54a82dd416595665 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:29:54 +0000 Subject: [PATCH 12/14] docs(env): note HOSTED_PUSHER_API_URL is required for conversation processing Co-Authored-By: Claude Opus 4.6 --- backend/.env.template | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/.env.template b/backend/.env.template index 96d39ca965..2065313645 100644 --- a/backend/.env.template +++ b/backend/.env.template @@ -24,6 +24,7 @@ WORKFLOW_API_KEY= HUME_API_KEY= HUME_CALLBACK_URL= +# Required for conversation processing. Without this, conversations stay in_progress permanently. HOSTED_PUSHER_API_URL= TYPESENSE_HOST= From fe79101a8a346d2224f26f7f4131080245f7df81 Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:29:58 +0000 Subject: [PATCH 13/14] test(transcribe): clarify test architecture note re: closure mirroring Explain that tests mirror closures (not importable) and that live integration tests (CP9) verify actual production behavior. Co-Authored-By: Claude Opus 4.6 --- backend/tests/unit/test_listen_fallback_removal.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/tests/unit/test_listen_fallback_removal.py b/backend/tests/unit/test_listen_fallback_removal.py index 97b6d6091b..34136e970b 100644 --- a/backend/tests/unit/test_listen_fallback_removal.py +++ b/backend/tests/unit/test_listen_fallback_removal.py @@ -5,9 +5,11 @@ request_conversation_processing(). When pusher is unavailable, conversations stay buffered in pending_conversation_requests. -These tests mirror the behavioral contracts from transcribe.py's -deeply nested closures (not directly importable) and assert the -NEW behavior introduced by #6061. +Architecture note: transcribe.py's handlers are deeply nested closures +inside websocket_endpoint() and cannot be imported directly. These tests +mirror the behavioral contracts and verify the logic in isolation. +Live integration tests (CP9) verify the actual production closures via +WebSocket connections to a running backend. """ import time From 7e26118a34a8b61c6f85ed13e2128d4f4854153f Mon Sep 17 00:00:00 2001 From: beastoin Date: Sat, 28 Mar 2026 04:32:27 +0000 Subject: [PATCH 14/14] docs(listen): fix remaining stale recovery claims in disconnect diagram Lines 109 and 120 still claimed conversations would be picked up by cleanup_processing_conversations or next session without pusher. Fixed to state they stay in_progress permanently until pusher is configured. Co-Authored-By: Claude Opus 4.6 --- docs/doc/developer/backend/listen_pusher_pipeline.mdx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/doc/developer/backend/listen_pusher_pipeline.mdx b/docs/doc/developer/backend/listen_pusher_pipeline.mdx index d1f10d4b69..336a00d315 100644 --- a/docs/doc/developer/backend/listen_pusher_pipeline.mdx +++ b/docs/doc/developer/backend/listen_pusher_pipeline.mdx @@ -106,8 +106,8 @@ sequenceDiagram Backend->>Backend: _process_conversation(current_conversation_id) alt Pusher available Backend->>Pusher: Send process_conversation request (opcode 104) - else Pusher unavailable - Note over Backend: Conversation stays in_progress
Picked up by cleanup_processing_conversations on next session + else Pusher not configured + Note over Backend: Conversation stays in_progress permanently
until HOSTED_PUSHER_API_URL is configured end else Single-channel Note over Backend: No explicit _process_conversation() call
Relies on next session to pick up as stale @@ -117,7 +117,7 @@ sequenceDiagram Backend->>Backend: Close Pusher socket Backend->>Backend: Cleanup memory (clear caches, buffers) - Note over Backend: No local fallback processing (#6061).
Conversations stay buffered or in_progress
until pusher recovers or next session starts. + Note over Backend: No local fallback processing (#6061).
Pusher is required for conversation processing.
Without it, conversations stay in_progress permanently. ``` ## 3.1 Pusher Reconnect & Pending Flush