Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ WORKFLOW_API_KEY=
HUME_API_KEY=
HUME_CALLBACK_URL=

# Required for conversation processing. Without this, conversations stay in_progress permanently.
HOSTED_PUSHER_API_URL=

TYPESENSE_HOST=
Expand Down
124 changes: 32 additions & 92 deletions backend/routers/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@
import database.users as user_db
from database.users import get_user_transcription_preferences
from database import redis_db
from database.redis_db import (
check_credits_invalidation,
get_cached_user_geolocation,
)
from database.redis_db import check_credits_invalidation
from models.conversation import (
Conversation,
ConversationPhoto,
ConversationSource,
ConversationStatus,
Geolocation,
Structured,
TranscriptSegment,
)
Expand All @@ -67,10 +63,9 @@
from models.transcript_segment import Translation
from models.users import PlanType
from utils.analytics import record_usage
from utils.app_integrations import trigger_external_integrations, trigger_realtime_integrations
from utils.app_integrations import trigger_realtime_integrations
from utils.apps import is_audio_bytes_app_enabled
from utils.conversations.location import get_google_maps_location
from utils.conversations.process_conversation import process_conversation, retrieve_in_progress_conversation
from utils.conversations.process_conversation import retrieve_in_progress_conversation
from utils.notifications import send_credit_limit_notification, send_silent_user_notification
from utils.other import endpoints as auth
from utils.other.storage import get_profile_audio_if_exists, get_user_has_speech_profile
Expand Down Expand Up @@ -706,47 +701,21 @@ def on_conversation_processing_started(conversation_id: str):
conversation = Conversation(**conversation_data)
_send_message_event(ConversationEvent(event_type="memory_processing_started", memory=conversation))

# Fallback for when pusher is not available
async def _create_conversation_fallback(conversation_data: dict):
conversation = Conversation(**conversation_data)
if conversation.status != ConversationStatus.processing:
_send_message_event(ConversationEvent(event_type="memory_processing_started", memory=conversation))
conversations_db.update_conversation_status(uid, conversation.id, ConversationStatus.processing)
conversation.status = ConversationStatus.processing

try:
# Geolocation
geolocation = get_cached_user_geolocation(uid)
if geolocation:
geolocation = Geolocation(**geolocation)
conversation.geolocation = get_google_maps_location(geolocation.latitude, geolocation.longitude)

conversation = process_conversation(uid, language, conversation)
messages = trigger_external_integrations(uid, conversation)
except Exception as e:
logger.error(f"Error processing conversation: {e} {uid} {session_id}")
conversations_db.set_conversation_as_discarded(uid, conversation.id)
conversation.discarded = True
messages = []

_send_message_event(ConversationEvent(event_type="memory_created", memory=conversation, messages=messages))

async def cleanup_processing_conversations():
processing = conversations_db.get_processing_conversations(uid)
if not processing:
logger.info(f'finalize_processing_conversations len(processing): 0 {uid} {session_id}')
return
logger.info(f'finalize_processing_conversations len(processing): {len(processing)} {uid} {session_id}')
if not processing or len(processing) == 0:
if len(processing) == 0:
return
if not request_conversation_processing:
logger.warning(f"Pusher not enabled, cannot reprocess {len(processing)} conversations {uid} {session_id}")
return

for conversation in processing:
if PUSHER_ENABLED and pusher_is_connected is not None and pusher_is_connected():
await request_conversation_processing(conversation['id'])
elif PUSHER_ENABLED and pusher_is_degraded is not None and pusher_is_degraded():
await _create_conversation_fallback(conversation)
elif not PUSHER_ENABLED:
await _create_conversation_fallback(conversation)
else:
# PUSHER_ENABLED but handler not yet initialized — try pusher
await request_conversation_processing(conversation['id'])
# Route to pusher — buffer if disconnected, send when connected (#6061)
await request_conversation_processing(conversation['id'])

async def process_pending_conversations(timed_out_id: Optional[str]):
await asyncio.sleep(7.0)
Expand Down Expand Up @@ -839,21 +808,15 @@ async def _process_conversation(conversation_id: str):
if conversation:
has_content = conversation.get('transcript_segments') or conversation.get('photos')
if has_content:
# Use pusher if enabled AND connected, otherwise fallback
if PUSHER_ENABLED and pusher_is_connected is not None and pusher_is_connected():
on_conversation_processing_started(conversation_id)
await request_conversation_processing(conversation_id)
elif PUSHER_ENABLED and pusher_is_degraded is not None and pusher_is_degraded():
logger.info(
f"Pusher degraded, using fallback for conversation {conversation_id} {uid} {session_id}"
if not request_conversation_processing:
logger.warning(
f"Pusher not enabled, skipping conversation {conversation_id} (stays in_progress) {uid} {session_id}"
)
await _create_conversation_fallback(conversation)
elif not PUSHER_ENABLED:
await _create_conversation_fallback(conversation)
else:
# PUSHER_ENABLED but handler not yet initialized — try pusher
on_conversation_processing_started(conversation_id)
await request_conversation_processing(conversation_id)
return
# Mark processing + buffer for pusher — never process locally (#6061)
conversations_db.update_conversation_status(uid, conversation_id, ConversationStatus.processing)
on_conversation_processing_started(conversation_id)
Comment on lines +811 to +818
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 PUSHER_ENABLED=false conversations accumulate as in_progress with no recovery path

When request_conversation_processing is None (i.e. PUSHER_ENABLED=false), _process_conversation() now returns early without marking the conversation as processing and without deleting it. The conversation stays in Firestore as in_progress indefinitely.

cleanup_processing_conversations() also bails out early when the handler is None, and it only queries get_processing_conversations (status = processing) anyway — so in_progress conversations from a disabled-pusher run are invisible to that path entirely.

While the PR description says "Check pusher availability BEFORE marking processing to avoid stranding conversations", the outcome is that conversations are stranded as in_progress rather than processing — which is arguably harder to detect and clean up. At a minimum the log level should be raised to error or the conversation should be explicitly discarded so the user isn't left waiting for a result that will never arrive.

await request_conversation_processing(conversation_id)
else:
logger.info(f'Clean up the conversation {conversation_id}, reason: no content {uid} {session_id}')
conversations_db.delete_conversation(uid, conversation_id)
Expand Down Expand Up @@ -1205,7 +1168,6 @@ def create_pusher_task_handler():
reconnect_attempts = 0
reconnect_task = None # single task per session
degraded_since: float = 0.0
fallback_processed_ids: set = set() # guard against duplicate processing

# Transcript (bounded to prevent memory growth when pusher is down)
segment_buffers: deque = deque(maxlen=MAX_SEGMENT_BUFFER_SIZE)
Expand Down Expand Up @@ -1434,13 +1396,12 @@ async def pusher_receive():
if not info:
continue
if info['retries'] >= MAX_RETRIES_PER_REQUEST:
logger.error(
f"Conversation {cid} failed after {MAX_RETRIES_PER_REQUEST} retries, giving up {uid} {session_id}"
logger.warning(
f"Conversation {cid} retry limit reached, keeping buffered for pusher recovery {uid} {session_id}"
)
# Route to fallback if in degraded mode
if reconnect_state == PusherReconnectState.DEGRADED:
await _fallback_process_conversation(cid)
pending_conversation_requests.pop(cid, None)
# Don't drop — conversation is marked processing in Firestore,
# cleanup_processing_conversations() will pick it up on next session (#6061)
info['sent_at'] = now # Reset timeout to avoid tight retry loop
continue
info['retries'] += 1
logger.warning(
Expand All @@ -1465,20 +1426,6 @@ def _mark_disconnected():
if reconnect_task is None or reconnect_task.done():
reconnect_task = asyncio.create_task(_pusher_reconnect_loop())

async def _fallback_process_conversation(conversation_id: str):
"""Route conversation to fallback processing (degraded mode)."""
nonlocal fallback_processed_ids
if conversation_id in fallback_processed_ids:
logger.info(
f"Conversation {conversation_id} already processed via fallback, skipping {uid} {session_id}"
)
return
conversation_data = conversations_db.get_conversation(uid, conversation_id)
if conversation_data:
fallback_processed_ids.add(conversation_id)
logger.info(f"Routing conversation {conversation_id} to fallback (degraded mode) {uid} {session_id}")
await _create_conversation_fallback(conversation_data)

async def _pusher_reconnect_loop():
"""Single reconnect loop per session — replaces 3 scattered auto-reconnect calls.

Expand All @@ -1500,10 +1447,11 @@ async def _pusher_reconnect_loop():
f"Pusher reconnect exhausted ({PUSHER_MAX_RECONNECT_ATTEMPTS} attempts), "
f"entering DEGRADED mode {uid} {session_id}"
)
# Route pending conversations to fallback (pop each to avoid race)
for cid in list(pending_conversation_requests.keys()):
pending_conversation_requests.pop(cid, None)
await _fallback_process_conversation(cid)
# Keep pending conversations buffered — will resend when pusher recovers (#6061)
if pending_conversation_requests:
logger.info(
f"Keeping {len(pending_conversation_requests)} conversations buffered for pusher recovery {uid} {session_id}"
)
continue

# Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s (capped at 60s)
Expand Down Expand Up @@ -1535,9 +1483,7 @@ async def _pusher_reconnect_loop():
degraded_since = time.monotonic()
reconnect_attempts = 0
logger.warning(f"Circuit breaker open, skipping to DEGRADED {uid} {session_id}")
for cid in list(pending_conversation_requests.keys()):
pending_conversation_requests.pop(cid, None)
await _fallback_process_conversation(cid)
# Keep pending conversations buffered — will resend when pusher recovers (#6061)
continue
except Exception:
pass # _connect already logged the error
Expand Down Expand Up @@ -1601,7 +1547,7 @@ async def _connect():
nonlocal pusher_ws
nonlocal pusher_connected
nonlocal current_conversation_id
nonlocal reconnect_state, reconnect_attempts, fallback_processed_ids
nonlocal reconnect_state, reconnect_attempts

try:
pusher_sample_rate = TARGET_SAMPLE_RATE if is_multi_channel else sample_rate
Expand All @@ -1615,19 +1561,13 @@ async def _connect():
reconnect_state = PusherReconnectState.CONNECTED
reconnect_attempts = 0
# Re-send any pending conversation requests after reconnect
# (skip conversations already processed via fallback to avoid duplication)
if pending_conversation_requests:
logger.info(
f"Reconnected to pusher, re-sending {len(pending_conversation_requests)} pending requests {uid} {session_id}"
)
for cid in list(pending_conversation_requests.keys()):
if cid in fallback_processed_ids:
pending_conversation_requests.pop(cid, None)
continue
pending_conversation_requests[cid]['sent_at'] = time.time()
await request_conversation_processing(cid)
Comment on lines 1568 to 1570
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Retry counter not reset on reconnect

When _connect() resends buffered conversations after a pusher reconnect, it only resets sent_at but preserves the retries counter. Meanwhile, request_conversation_processing() explicitly preserves retries when re-adding to the buffer:

pending_conversation_requests[conversation_id] = {
    'sent_at': time.time(),
    'retries': pending_conversation_requests.get(conversation_id, {}).get('retries', 0),  # preserved
}

A conversation that hit MAX_RETRIES_PER_REQUEST = 3 before the disconnect is resent once on reconnect (correct). But if that single attempt fails silently (no 201 response within 120 s), pusher_receive() sees retries (3) >= MAX_RETRIES_PER_REQUEST (3), resets only sent_at, and never issues another pusher send for the rest of this session. Recovery must wait for the next session's cleanup_processing_conversations(). This undermines the point of reconnect-flush, since a previously-exhausted conversation gets only one attempt after reconnect, not a fresh retry budget.

Suggested fix — reset the retry counter when re-sending on reconnect:

Suggested change
for cid in list(pending_conversation_requests.keys()):
if cid in fallback_processed_ids:
pending_conversation_requests.pop(cid, None)
continue
pending_conversation_requests[cid]['sent_at'] = time.time()
await request_conversation_processing(cid)
for cid in list(pending_conversation_requests.keys()):
pending_conversation_requests[cid]['sent_at'] = time.time()
pending_conversation_requests[cid]['retries'] = 0 # give fresh retry budget after reconnect
await request_conversation_processing(cid)

# Clear fallback tracking after processing pending queue
fallback_processed_ids.clear()
except PusherCircuitBreakerOpen:
raise # Let caller handle circuit breaker
except Exception as e:
Expand Down
1 change: 1 addition & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pytest tests/unit/test_vad_gate.py -v
pytest tests/unit/test_log_sanitizer.py -v
pytest tests/unit/test_pusher_heartbeat.py -v
pytest tests/unit/test_pusher_conversation_retry.py -v
pytest tests/unit/test_listen_fallback_removal.py -v
pytest tests/unit/test_desktop_updates.py -v
pytest tests/unit/test_translation_optimization.py -v
pytest tests/unit/test_conversation_source_unknown.py -v
Expand Down
Loading
Loading