SafeDeepgramSocket auto-keepalive architecture (#5870) v2#5944
Conversation
Greptile SummaryThis PR re-applies the
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant TC as transcribe.py
participant GDS as GatedDeepgramSocket
participant SDS as SafeDeepgramSocket
participant DG as Deepgram LiveConnection
participant KAT as keepalive thread
TC->>+SDS: new SafeDeepgramSocket(dg_conn)
SDS->>KAT: start daemon thread
Note over KAT: checks idle every check_period_sec
TC->>+GDS: new GatedDeepgramSocket(safe_conn, gate)
TC-->>TC: dg_socket = GDS
loop audio chunk arrives
TC->>GDS: send(chunk)
GDS->>GDS: VAD gate decision
alt audio_to_send
GDS->>SDS: send(audio)
SDS->>SDS: reset _last_activity
SDS->>DG: send(audio)
end
alt should_finalize
GDS->>SDS: finalize()
SDS->>DG: finalize()
end
end
loop background (idle > 5s)
KAT->>SDS: acquire lock
SDS->>DG: keep_alive()
alt keep_alive returns False / raises
SDS->>SDS: _dg_dead = True
else success
SDS->>SDS: reset _last_activity
end
end
TC->>GDS: is_connection_dead?
GDS->>SDS: is_connection_dead?
SDS-->>GDS: _dg_dead bool
GDS-->>TC: bool
alt connection dead
TC->>TC: dg_socket = None
TC->>GDS: fallback: deepgram_profile_socket.send(chunk)
end
TC->>GDS: finish()
GDS->>SDS: finish()
SDS->>KAT: stop_event.set()
SDS->>KAT: thread.join(2s)
SDS->>DG: finish()
|
| def finalize(self) -> None: | ||
| """Flush pending transcript.""" | ||
| with self._lock: | ||
| if self._closed: | ||
| return | ||
| self._conn.finalize() | ||
| self._last_activity = self._clock() |
There was a problem hiding this comment.
finalize() skips dead-connection guard
send() short-circuits with if self._dg_dead or self._closed: return, but finalize() only checks _closed. When the connection is dead, finalize() still forwards to self._conn.finalize(). If the underlying DG WebSocket has been dropped, that call may raise — and the exception propagates to callers like GatedDeepgramSocket.send(), where it is caught and incorrectly counted as gate._finalize_errors. This makes it impossible to distinguish "finalize failed because the connection was already dead" from "finalize failed for some other reason".
| def finalize(self) -> None: | |
| """Flush pending transcript.""" | |
| with self._lock: | |
| if self._closed: | |
| return | |
| self._conn.finalize() | |
| self._last_activity = self._clock() | |
| def finalize(self) -> None: | |
| """Flush pending transcript.""" | |
| with self._lock: | |
| if self._dg_dead or self._closed: | |
| return | |
| self._conn.finalize() | |
| self._last_activity = self._clock() |
| self._thread = threading.Thread(target=self._keepalive_loop, daemon=True, name='dg-keepalive') | ||
| self._thread.start() |
There was a problem hiding this comment.
Thread name is not unique per session
Every SafeDeepgramSocket instance starts a daemon thread named 'dg-keepalive'. With many concurrent sessions (e.g. 100 users), all 100 keepalive threads will have the same name in thread dumps and log lines, making it impossible to correlate a specific keepalive thread to the session or user it belongs to.
Consider including session-identifying info:
self._thread = threading.Thread(
target=self._keepalive_loop,
daemon=True,
name=f'dg-keepalive', # optionally append uid or session_id if passed to constructor
)Alternatively, pass an optional name_suffix parameter so callers like process_audio_dg can supply the session ID.
L1 + L2: 5-Minute Audio Test ResultsL1 — SafeDeepgramSocket standalone (5 min)Pattern: 30s speech → 8s silence → 20s speech → 12s silence (repeating). Keepalive fires only during silence gaps >5s, resets on send(). Connection alive throughout. L2 — GatedDeepgramSocket → SafeDeepgramSocket integrated (5 min)Hardest scenario: VAD gate dropped ALL audio (synthetic tone ≠ speech), so DG received zero data for 5 full minutes. Auto-keepalive was the SOLE thing keeping the connection alive — exactly the scenario #5870 fixes. 60 keepalives = one every 5s, perfectly matching the configured interval. Summary
by AI for @beastoin |
L2 Happy-Path: 5-Minute Real Transcription Test — PASS ✅Full integrated test: WebSocket client → local backend (branch Results
Test Setup
Transcript EvidenceFirst segment (72.4s):
Last segment (298.2s):
DG Metadata (from backend log){
"type": "Metadata",
"request_id": "0e8b2451-f238-4997-b09f-0cc394ec35a7",
"sha256": "59719ef4d4b6e35fb2fd5c60828296ced2960bec0747cddb5abb0e8fb4bfd039",
"created": "2026-03-23T11:58:54.596Z",
"duration": 300.0,
"channels": 1,
"model_info": {
"421ebff2-e130-4867-8461-7567efe5bc91": {
"name": "general-nova-3",
"version": "2026-01-27.9249",
"arch": "nova-3"
}
}
}Backend Summary LogWhat This Proves
by AI for @beastoin |
L2 VAD Gate Test: 5-min Real Transcription (VAD_GATE_MODE=active)Test: Stream 5 minutes of real speech audio through Purpose: Confirm VAD gate doesn't break transcription after SafeDeepgramSocket keepalive changes (#5870). Results
VAD Gate Metrics (session end)
Comparison with Non-VAD Test (same audio, same session)
VAD gate produces more segments (78 finalize calls flush DG earlier), but total transcript quality is equivalent. Verdict: PASS
Test setup
by AI for @beastoin |
|
lgtm //kenji |
## Summary When a Deepgram WebSocket connection dies, we now capture and log **why** — not just that it died. This enables operators to distinguish DG server closures, network drops, idle timeouts, and billing limits from Cloud Logging. ### Problem SafeDeepgramSocket (PR #5944) detects dead connections but swallows the close reason. The `except Exception` blocks log generic "connection dead" without the exception type or message. Current prod error rate (~60/hr at peak) cannot be triaged because all disconnects look the same. ### Changes | File | Change | |------|--------| | `utils/stt/safe_socket.py` | `death_reason` property + `set_close_reason()` method. First-reason-wins latch on ALL write paths (send, keepalive, external close). | | `utils/stt/streaming.py` | Register `on_close` / `on_error` handlers on DG connection that feed into `safe_conn.set_close_reason()` | | `utils/stt/vad_gate.py` | `GatedDeepgramSocket.death_reason` delegates to SafeDeepgramSocket | | `routers/transcribe.py` | "DG connection died mid-session" log now includes `reason=` field | | `tests/unit/test_streaming_deepgram_backoff.py` | 14 new tests (37 total) for death_reason, set_close_reason, callback wiring, delegation | ### How it works 1. **SafeDeepgramSocket** captures exception details when `send()` or `keep_alive()` fails: - `send ConnectionResetError: Connection reset by peer` - `keep_alive TimeoutError: timed out` - `send returned False` 2. **DG SDK callbacks** feed close events into SafeDeepgramSocket via `set_close_reason()`: - `DG close event: CloseResponse(type='Close')` - `DG error event: ErrorResponse(...)` 3. **First reason wins** — ALL write paths guard with `if self._death_reason is None`. The first close event is the root cause; subsequent failures are no-ops. 4. **transcribe.py** includes the reason in the operator-facing log: ``` DG connection died mid-session uid=abc session=xyz reason=send ConnectionResetError: Connection reset by peer ``` ### Example Cloud Logging queries after deploy ``` # See all disconnect reasons "DG connection died mid-session" "reason=" # Network issues (code 1006 = abnormal) "reason=" "ConnectionResetError" # DG server errors "reason=" "DG error event" # Keepalive failures specifically "reason=" "keep_alive" ``` ### Review cycle - **CODEx review (R1)**: Found first-reason-wins bug — `send()`/`_send_keepalive_locked()` unconditionally overwrote `_death_reason`. Fixed with `if None` guard on all write paths. - **CODEx review (R2)**: Approved (`PR_APPROVED_LGTM`). Thread safety confirmed, PII risk low. - **Tester (T1)**: Found 3 coverage gaps: (1) DG callback wiring untested, (2) gated delegation untested, (3) exception permutations incomplete. - **Tester (T2)**: Approved (`TESTS_APPROVED`). All 3 gaps closed with 5 new tests. ### Testing - 37 tests passing (23 existing + 14 new) - Coverage: death_reason lifecycle, exception capture, first-reason-wins matrix (all orderings), DG callback wiring, gated delegation, non-safe socket fallback ## Deployment **Services affected**: `backend-listen` **No env vars needed** **Risk**: Low — logging-only change, no behavior change Relates to #5870 _by AI for @beastoin_
…5870) (BasedHardware#6036) ## Summary When a Deepgram WebSocket connection dies, we now capture and log **why** — not just that it died. This enables operators to distinguish DG server closures, network drops, idle timeouts, and billing limits from Cloud Logging. ### Problem SafeDeepgramSocket (PR BasedHardware#5944) detects dead connections but swallows the close reason. The `except Exception` blocks log generic "connection dead" without the exception type or message. Current prod error rate (~60/hr at peak) cannot be triaged because all disconnects look the same. ### Changes | File | Change | |------|--------| | `utils/stt/safe_socket.py` | `death_reason` property + `set_close_reason()` method. First-reason-wins latch on ALL write paths (send, keepalive, external close). | | `utils/stt/streaming.py` | Register `on_close` / `on_error` handlers on DG connection that feed into `safe_conn.set_close_reason()` | | `utils/stt/vad_gate.py` | `GatedDeepgramSocket.death_reason` delegates to SafeDeepgramSocket | | `routers/transcribe.py` | "DG connection died mid-session" log now includes `reason=` field | | `tests/unit/test_streaming_deepgram_backoff.py` | 14 new tests (37 total) for death_reason, set_close_reason, callback wiring, delegation | ### How it works 1. **SafeDeepgramSocket** captures exception details when `send()` or `keep_alive()` fails: - `send ConnectionResetError: Connection reset by peer` - `keep_alive TimeoutError: timed out` - `send returned False` 2. **DG SDK callbacks** feed close events into SafeDeepgramSocket via `set_close_reason()`: - `DG close event: CloseResponse(type='Close')` - `DG error event: ErrorResponse(...)` 3. **First reason wins** — ALL write paths guard with `if self._death_reason is None`. The first close event is the root cause; subsequent failures are no-ops. 4. **transcribe.py** includes the reason in the operator-facing log: ``` DG connection died mid-session uid=abc session=xyz reason=send ConnectionResetError: Connection reset by peer ``` ### Example Cloud Logging queries after deploy ``` # See all disconnect reasons "DG connection died mid-session" "reason=" # Network issues (code 1006 = abnormal) "reason=" "ConnectionResetError" # DG server errors "reason=" "DG error event" # Keepalive failures specifically "reason=" "keep_alive" ``` ### Review cycle - **CODEx review (R1)**: Found first-reason-wins bug — `send()`/`_send_keepalive_locked()` unconditionally overwrote `_death_reason`. Fixed with `if None` guard on all write paths. - **CODEx review (R2)**: Approved (`PR_APPROVED_LGTM`). Thread safety confirmed, PII risk low. - **Tester (T1)**: Found 3 coverage gaps: (1) DG callback wiring untested, (2) gated delegation untested, (3) exception permutations incomplete. - **Tester (T2)**: Approved (`TESTS_APPROVED`). All 3 gaps closed with 5 new tests. ### Testing - 37 tests passing (23 existing + 14 new) - Coverage: death_reason lifecycle, exception capture, first-reason-wins matrix (all orderings), DG callback wiring, gated delegation, non-safe socket fallback ## Deployment **Services affected**: `backend-listen` **No env vars needed** **Risk**: Low — logging-only change, no behavior change Relates to BasedHardware#5870 _by AI for @beastoin_
Summary
SafeDeepgramSocket auto-keepalive architecture — background daemon thread sends keepalive when DG connection idle > 5s. Fixes silent DG connection death during idle periods (e.g., speech profile phase, VAD silence gaps).
Re-applies PR #5871 (reverted in ab24914) with improved architecture.
Problem
Deepgram WebSocket connections silently die after 10s of inactivity. The previous keepalive logic was scattered across
transcribe.pyandvad_gate.py, making it hard to reason about ownership and causing race conditions.Architecture
utils/stt/safe_socket.py): Lightweight wrapper — SOLE keepalive owner for a DG connectionsend()orkeep_alive()returning False/exception marks connection permanently deadthreading.Lockserializes all operationstime.monotonicby default, injectable for deterministic testingfinish(): Second call is a no-opChanges
utils/stt/safe_socket.pyutils/stt/vad_gate.pykeep_alive()from GatedDeepgramSocket, delegates to SafeDeepgramSocketutils/stt/streaming.pyrouters/transcribe.pytests/unit/test_streaming_deepgram_backoff.pytests/unit/test_vad_gate.pyTesting
Unit tests: 138 passing (thread safety, boundaries, dead detection, profile routing fallback)
L2 Live test — without VAD gate (evidence):
L2 Live test — with VAD gate active (evidence):
Codex review: Approved (4 rounds)
Codex tester: Approved (3 rounds)
Deployment
Services affected:
backend-listen(WebSocket/v4/listenendpoint)No env vars needed — SafeDeepgramSocket uses hardcoded defaults (5s keepalive interval, 1s check period). KeepaliveConfig is constructor-injectable for future tuning.
Deployment steps:
maingcp_backend_auto_dev.yml(backend/** changes)dg-keepalivethread activity, confirm noDG connection diederrorsresource.type="k8s_container" resource.labels.container_name="backend-listen" "keepalive"— should see keepalive activity during idle periods"DG connection died"— should be zero or significantly reduced vs baselineRollback: Revert the merge commit. SafeDeepgramSocket is self-contained — removing it restores previous behavior where DG connections timeout after 10s idle.
Risk: Low. SafeDeepgramSocket wraps the existing DG connection transparently. The only behavior change is: keepalive messages are now sent during idle periods instead of letting the connection die.
Test plan
Closes #5870
by AI for @beastoin