
Backend async architecture: migrate blocking I/O to async-native patterns (30+ call sites) #6369

@beastoin

Description


Summary

Full-backend migration from blocking synchronous I/O to async-native patterns. Production async code paths currently contain 30+ blocking HTTP call sites, 12 Thread+join sites, and 6 blocking file I/O operations, starving the event loop under load.

This is the architecture-level parent issue. See #6368 for the pusher-specific subset.

Current State

Blocking HTTP (requests.*) in async context — 12 direct event loop blockers

| Priority | File:Line | Context | Timeout | Frequency |
|---|---|---|---|---|
| P0 | `utils/webhooks.py:120` | realtime transcript webhook | 15s | Per transcript flush per WS |
| P0 | `utils/webhooks.py:168` | audio bytes developer webhook | 15s | Every ~1s per WS |
| P0 | `utils/app_integrations.py:185→557` | realtime app integration webhook | 10s | Per transcript flush per WS |
| P0 | `utils/app_integrations.py:191→504` | realtime audio bytes app webhook | 15s | Every ~1s per WS |
| P0 | `utils/conversations/location.py:32` | Google Maps geocoding (via `pusher.py:85`) | none | Per conversation |
| P1 | `routers/auth.py:286` | Google OAuth token exchange | none | Every login |
| P1 | `routers/auth.py:343` | Apple OAuth token exchange | none | Every login |
| P1 | `routers/auth.py:410` | Firebase signInWithIdp | none | Every login |
| P1 | `routers/auth.py:478` | Apple JWKS fetch | none | Every Apple login |
| P1 | `routers/oauth.py:142` | App setup verification | none | Per OAuth callback |
| P2 | `utils/social.py:92,126` | Sync httpx.get() in async Twitter flows | default | Per Twitter API call |
| P2 | `routers/apps.py:1746` | App setup check | none | Per app enable |

Blocking HTTP (requests.*) in sync context — called from async via Thread/to_thread

| Priority | File:Line | Context | Timeout |
|---|---|---|---|
| P2 | `utils/stt/speaker_embedding.py:54,93` | Speaker embedding RPC | 300s |
| P2 | `utils/stt/vad.py:34` | Hosted VAD RPC | 300s |
| P2 | `utils/stt/speech_profile.py:25` | Speech profile matching | default |
| P2 | `routers/sync.py:746` | Audio download for speaker ID | 60s |
| P3 | `utils/retrieval/tools/perplexity_tools.py:64` | Perplexity search | 30s |
| P3 | `utils/retrieval/tools/calendar_tools.py:52,93,109` | Google Contacts | default |
| P3 | `utils/retrieval/tools/google_utils.py:38,74` | Google OAuth refresh + shared wrapper | default |
| P3 | `routers/calendar_onboarding.py:154` | Calendar token exchange | default |
| P3 | `routers/custom_auth.py:33` | Firebase email sign-in | default |
| P3 | `utils/other/hume.py:150` | Hume batch submit | default |
| P3 | `utils/apps.py:1367` | App manifest fetch | default |
| P3 | `utils/app_integrations.py:75,89` | GitHub docs crawl | default |

Thread+join blocking — 12 sites in production

| Priority | File:Line | Context |
|---|---|---|
| P0 | `utils/app_integrations.py:514,598` | Realtime webhook fan-out (called from async pusher) |
| P1 | `routers/sync.py:1104,1318,1432` | Chunked parallel processing in async sync endpoints |
| P1 | `utils/app_integrations.py:175` | Conversation-created webhook fan-out |
| P2 | `utils/conversations/process_conversation.py:358,608` | LLM processing fan-out |
| P2 | `utils/retrieval/rag.py:40,49,110` | RAG parallel retrieval |
| P2 | `utils/apps.py:728` | App processing fan-out |
| P3 | `utils/stt/safe_socket.py:164` | Deepgram socket teardown (2s timeout) |

Other blocking patterns

| Priority | File:Line | Pattern |
|---|---|---|
| P2 | `utils/social.py:81` | time.sleep() in retry helper called from async Twitter flows |
| P3 | `routers/apps.py:562,602,1896` | Blocking file I/O in async handlers |
| P3 | `routers/chat.py:376` | Blocking file I/O in async handler |
| P3 | `routers/imports.py:59` | Blocking file I/O in async handler |
| P3 | `routers/transcribe.py:1733` | Blocking file I/O in async handler |

Existing Async Patterns (already correct)

  • asyncio.to_thread: pusher.py, transcribe.py, speaker_identification.py, speaker_sample.py, streaming.py, notifications.py, onboarding.py
  • run_in_executor: translation_coordinator.py, other/notifications.py
  • httpx.AsyncClient: agent_tools.py, firmware.py, integrations.py, task_integrations.py, task_sync.py, mcp_client.py, app_generator.py, app_tools.py
  • Client reuse exists only in integrations.py and task_integrations.py — most other httpx usage creates per-call clients (no connection pooling)

Target Architecture

3 lanes with strict separation

```
┌─────────────────────────────────────────────────────────────┐
│  ASYNC HTTP LANE — httpx.AsyncClient (shared, lifecycle)    │
│  • All outbound HTTP: webhooks, auth, geocoding, APIs       │
│  • Connection pooling per service                           │
│  • Bounded concurrency via asyncio.Semaphore                │
│  • Per-target circuit breakers for webhooks                 │
│  • Latest-wins dropping for audio-byte-level calls          │
├─────────────────────────────────────────────────────────────┤
│  CRITICAL EXECUTOR — dedicated ThreadPoolExecutor (2-4)     │
│  • process_conversation, audio file creation                │
│  • DB-critical sync operations                              │
│  • Never shared with best-effort work                       │
├─────────────────────────────────────────────────────────────┤
│  TASK QUEUE LANE — Cloud Tasks / Pub/Sub                    │
│  • Fire-and-forget: developer webhooks, app integrations    │
│  • Durable: survives pod restarts                           │
│  • Built-in retries and dead-letter handling                │
└─────────────────────────────────────────────────────────────┘
```
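The "latest-wins dropping" behavior in the HTTP lane can be sketched as a single-slot mailbox. The `LatestWins` class below is a hypothetical helper illustrating the idea, not existing code:

```python
import asyncio

class LatestWins:
    """Single-slot mailbox: a newer payload overwrites an undelivered one,
    so a slow downstream webhook can never back up the audio hot path."""

    def __init__(self) -> None:
        self._item = None
        self._ready = asyncio.Event()

    def put(self, item) -> None:
        self._item = item      # an older, undelivered payload is dropped here
        self._ready.set()

    async def get(self):
        await self._ready.wait()
        self._ready.clear()
        return self._item
```

A producer calls `put()` at audio-frame rate; a single consumer task awaits `get()` and posts whatever is freshest, skipping stale frames instead of queueing them.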

Shared HTTP infrastructure

```python
# backend/utils/http_client.py — created at app startup, closed at shutdown
import httpx
from httpx import Limits, Timeout

clients = {
    'default':  httpx.AsyncClient(timeout=15, limits=Limits(max_connections=100)),
    'webhooks': httpx.AsyncClient(timeout=Timeout(5, connect=1), limits=Limits(max_connections=64)),
    'auth':     httpx.AsyncClient(timeout=10, limits=Limits(max_connections=20)),
    'stt':      httpx.AsyncClient(timeout=300, limits=Limits(max_connections=8)),
}
```

Dedicated executors

```python
# backend/utils/executors.py — no use of the default asyncio executor
from concurrent.futures import ThreadPoolExecutor

critical_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="critical")
storage_executor  = ThreadPoolExecutor(max_workers=2, thread_name_prefix="storage")
```
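Pinning critical work to its own pool would look roughly like this (the `process_conversation_blocking` stand-in is illustrative; the real pipeline lives in `utils/conversations/process_conversation.py`):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

critical_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="critical")

def process_conversation_blocking(conversation_id: str) -> str:
    # Stand-in for the real blocking conversation pipeline
    return f"processed:{conversation_id}"

async def handle(conversation_id: str) -> str:
    loop = asyncio.get_running_loop()
    # Pinning critical work to its own pool means best-effort asyncio.to_thread
    # work (which shares the loop's default executor) can never starve it.
    return await loop.run_in_executor(
        critical_executor, process_conversation_blocking, conversation_id
    )
```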

Migration Plan

Phase 1: Stop the bleeding (P0 — pusher hot path)

  • Replace requests.post in webhooks.py with httpx.AsyncClient
  • Replace Thread+join fan-out in app_integrations.py realtime functions with asyncio.gather + httpx
  • Add asyncio.to_thread for get_google_maps_location in pusher.py (or migrate to httpx)
  • Split executors: critical_executor for process_conversation, default pool untouched
  • Scope: 6 files, ~50 lines changed
  • Risk: Low — webhook delivery semantics unchanged
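The `asyncio.to_thread` step above can be sketched as follows; `get_google_maps_location` here is a simulated stand-in for the blocking call in `utils/conversations/location.py`, not the real function:

```python
import asyncio
import time

def get_google_maps_location(lat: float, lng: float) -> dict:
    # Stand-in for the blocking geocoding call in utils/conversations/location.py
    time.sleep(0.05)  # simulated network latency
    return {"lat": lat, "lng": lng}

async def resolve_location(lat: float, lng: float) -> dict:
    # asyncio.to_thread moves the blocking call off the pusher's event loop;
    # the loop keeps serving websockets while a worker thread waits on I/O.
    return await asyncio.to_thread(get_google_maps_location, lat, lng)
```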

Phase 2: Auth and user-facing paths (P1)

  • Replace requests.post/get in auth.py (4 calls) with shared httpx auth client
  • Replace requests.get in oauth.py, apps.py setup checks
  • Replace sync httpx in social.py with async
  • Scope: 4 files, ~30 lines changed
  • Risk: Low — same HTTP calls, just async

Phase 3: Thread+join elimination (P1-P2)

  • Replace Thread+join in sync.py sync_local_files with asyncio.gather + run_in_executor
  • Replace Thread+join in process_conversation.py with async fan-out or dedicated executor
  • Replace Thread+join in rag.py, apps.py with async equivalents
  • Scope: 5 files, ~100 lines changed
  • Risk: Medium — changes concurrency model
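The Thread+join replacement can be sketched as below; `process_chunk` is a stand-in for the blocking per-chunk work, not the real sync.py code:

```python
import asyncio

def process_chunk(chunk: list) -> int:
    # Stand-in for blocking per-chunk work previously run in a Thread
    return sum(chunk)

async def process_all(chunks: list) -> list:
    # Same parallelism as Thread(target=...)+join, but the event loop
    # stays responsive while worker threads run, and results come back
    # in submission order without manual result plumbing.
    return await asyncio.gather(*(asyncio.to_thread(process_chunk, c) for c in chunks))
```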

Phase 4: Long-timeout STT services (P2)

  • Create async STT client for speaker_embedding, vad, speech_profile
  • Dedicated bounded executor as interim
  • Scope: 3 files
  • Risk: Medium — 300s timeout operations
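Bounding those long-timeout RPCs could look like this sketch; `stt_semaphore` and `call_stt` are illustrative names, and `do_rpc` stands in for the actual async STT call:

```python
import asyncio

# Matches max_connections=8 on the proposed 'stt' client; name is illustrative.
stt_semaphore = asyncio.Semaphore(8)

async def call_stt(payload, do_rpc):
    # Bound the number of in-flight 300s RPCs so a slow STT backend
    # cannot absorb every pooled connection or pile up tasks.
    async with stt_semaphore:
        return await do_rpc(payload)
```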

Phase 5: Fire-and-forget to Cloud Tasks (P2-P3)

  • Move developer webhooks to Cloud Tasks
  • Move app integration webhooks to Cloud Tasks
  • Move non-critical notifications to queue
  • Scope: Architecture change
  • Risk: Higher — changes delivery model, needs dead-letter handling
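The delivery semantics the queue must provide (retries, then dead-letter instead of failing the caller) can be illustrated with an in-process stand-in; this is not the Cloud Tasks API, just the behavior contract:

```python
import asyncio

DEAD_LETTERS: list = []  # stand-in for a real dead-letter queue

async def deliver_with_retries(send, payload, attempts: int = 3):
    # In-process stand-in for queue delivery semantics: retry with
    # exponential backoff, then dead-letter rather than failing the caller.
    for attempt in range(attempts):
        try:
            return await send(payload)
        except Exception:
            await asyncio.sleep(0.01 * 2 ** attempt)
    DEAD_LETTERS.append(payload)
    return None
```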

Phase 6: Regression prevention

  • CI lint rule: ban requests.*, sync httpx.*, time.sleep, Thread.join() in async context
  • Event loop lag metrics (Prometheus histogram)
  • Slow-downstream load test in CI
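The proposed lint rule could start as a small AST check; this is a sketch covering only the `requests.*`-in-async-def case, with `find_blocking_calls` as a hypothetical name:

```python
import ast

BANNED_MODULES = {"requests"}

def find_blocking_calls(source: str) -> list:
    """Return line numbers of banned blocking calls (e.g. requests.post)
    that appear inside `async def` bodies: a sketch of the proposed lint."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.AsyncFunctionDef):
            for call in ast.walk(node):
                if (
                    isinstance(call, ast.Call)
                    and isinstance(call.func, ast.Attribute)
                    and isinstance(call.func.value, ast.Name)
                    and call.func.value.id in BANNED_MODULES
                ):
                    hits.append(call.lineno)
    return sorted(hits)
```

A real rule would also cover sync `httpx.*`, `time.sleep`, and `Thread.join()`, and could ship as a custom flake8/ruff plugin or a standalone CI script.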

by AI for @beastoin
