Skip to content

Batch DG usage Redis writes every 60s instead of per-chunk#5868

Merged
beastoin merged 5 commits intomainfrom
fix/redis-dg-usage-batch-5854
Mar 21, 2026
Merged

Batch DG usage Redis writes every 60s instead of per-chunk#5868
beastoin merged 5 commits intomainfrom
fix/redis-dg-usage-batch-5854

Conversation

@beastoin
Copy link
Copy Markdown
Collaborator

@beastoin beastoin commented Mar 20, 2026

Summary

Fixes Redis ops/sec spike caused by record_dg_usage_ms being called on every audio chunk in the realtime transcription pipeline.

Root cause

record_dg_usage_ms (2 Redis ops: INCRBY + EXPIRE) was called per audio chunk (~50 chunks/sec/session). With ~100 concurrent sessions across 22 pods, this produced 8,500-12,500 Redis ops/sec on Redis Cloud — up from ~0 before FAIR_USE_ENABLED=true was deployed.

Fix

Added local accumulator dg_usage_ms_pending that batches DG usage locally and flushes to Redis every 60s via the existing _record_usage_periodically loop, plus on session end.

  • 4 per-chunk record_dg_usage_ms(uid, chunk_ms) calls → dg_usage_ms_pending += chunk_ms
  • 2 flush points: periodic (every 60s) and session-end cleanup
  • Proper nonlocal declarations for all 3 nested functions (_record_usage_periodically, receive_data, flush_stt_buffer)
  • DG flush moved above use_custom_stt guard for consistency

Impact

  • Redis ops: ~100/sec/session → ~0.03/sec/session (~3000x reduction)
  • Expected total Redis ops: 8,500-12,500 → ~10-15 ops/sec
  • DG budget enforcement delay: up to 60s (acceptable — budget is daily, not per-second)

Review cycle

  • Greptile: found 2 issues (missing nonlocal in receive_data, flush ordering) — both fixed
  • CODEx reviewer: PR_APPROVED_LGTM (2 rounds)
  • CODEx tester: TESTS_APPROVED

Tests

  • 11 new unit tests (test_dg_usage_batch.py): structure (5), behavior (4), math (2)
  • Updated test_fair_use_api.py: budget accounting test matches batched pattern
  • test.sh updated with new test file
  • 81 existing fair-use tests pass
tests/unit/test_dg_usage_batch.py      — 11 passed
tests/integration/test_fair_use_api.py — 22 passed
tests/unit/test_fair_use_engine.py     — 56 passed
tests/unit/test_fair_use_models.py     — 11 passed
tests/unit/test_fair_use_async.py      — 14 passed

Local dev backend evidence

  • Backend starts cleanly on PR branch
  • WebSocket /v4/listen reachable (403 for unauthenticated — correct)
  • 6/6 batching verification tests pass

Risks

  • DG budget enforcement has up to 60s delay — acceptable for a daily budget
  • Session-end flush ensures no usage lost on disconnect

by AI for @beastoin

record_dg_usage_ms was called on every audio chunk (~50/sec/session),
causing ~100 Redis ops/sec/session. With ~100 concurrent sessions this
produced 8.5-12.5k ops/sec on Redis Cloud.

Fix: accumulate DG usage ms locally in dg_usage_ms_pending and flush
to Redis every 60s in _record_usage_periodically, plus on session end.
Reduces Redis ops from ~100/sec/session to ~0.03/sec/session (~3000x).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Mar 20, 2026

Greptile Summary

This PR reduces Redis write pressure by replacing per-chunk record_dg_usage_ms calls (previously ~50/sec/session) with a local integer accumulator (dg_usage_ms_pending) that is flushed to Redis every 60 s via the existing _record_usage_periodically loop and once more at session teardown — a sound approach for a daily-budget metric.

Key issues found:

  • Critical (UnboundLocalError in multi-channel path): The augmented assignment dg_usage_ms_pending += mc_chunk_ms at line 2427 lives directly inside receive_data, but that function's nonlocal declarations do not include dg_usage_ms_pending. Python will treat it as a local variable and raise UnboundLocalError for every multi-channel fair-use session. The nonlocal added to the nested flush_stt_buffer does not cover this.
  • DG flush positioned after use_custom_stt: continue: In _record_usage_periodically, the new flush block (lines 451-454) is placed after the guard that calls continue for custom-STT sessions. While harmless today (custom-STT sessions never open DG/Soniox/Speechmatics sockets), moving the flush above the guard would be more defensive and consistent with how speech-ms flushes are handled.
  • Otherwise correct: Periodic flush + session-end flush in finally cover all normal and short-session cases; nonlocal dg_usage_ms_pending is properly declared in both _record_usage_periodically and flush_stt_buffer; accumulation conditions match flush conditions.

Confidence Score: 2/5

  • Not safe to merge as-is: a missing nonlocal declaration causes UnboundLocalError for all multi-channel fair-use sessions.
  • The optimization logic is correct and the single-channel path works, but the multi-channel code path in receive_data is broken due to a missing nonlocal dg_usage_ms_pending declaration — a one-line fix, but a runtime crash for affected users.
  • backend/routers/transcribe.py — specifically the receive_data function's nonlocal declarations and the ordering of the DG flush within _record_usage_periodically.

Important Files Changed

Filename Overview
backend/routers/transcribe.py Batches per-chunk Redis writes into a 60s accumulator (dg_usage_ms_pending). Critical bug: missing nonlocal dg_usage_ms_pending in receive_data causes UnboundLocalError for all multi-channel sessions; secondary concern about the DG flush being placed after use_custom_stt: continue.

Sequence Diagram

sequenceDiagram
    participant AC as Audio Chunk (per 20ms)
    participant RD as receive_data / flush_stt_buffer
    participant ACC as dg_usage_ms_pending (local int)
    participant PL as _record_usage_periodically (60s)
    participant FIN as finally block (session end)
    participant RD2 as Redis (record_dg_usage_ms)

    AC->>RD: audio arrives (~50x/sec)
    RD->>ACC: dg_usage_ms_pending += chunk_ms
    Note over RD,ACC: No Redis write here (was 2 ops/chunk before)

    loop Every 60 seconds
        PL->>ACC: read dg_usage_ms_pending
        PL->>RD2: record_dg_usage_ms(uid, pending)
        PL->>ACC: dg_usage_ms_pending = 0
    end

    Note over FIN: Session disconnect / timeout
    FIN->>ACC: read dg_usage_ms_pending (remaining)
    FIN->>RD2: record_dg_usage_ms(uid, pending)
    FIN->>ACC: dg_usage_ms_pending = 0
Loading

Comments Outside Diff (2)

  1. backend/routers/transcribe.py, line 2279-2282 (link)

    P0 Missing nonlocal dg_usage_ms_pending in receive_data

    The augmented assignment dg_usage_ms_pending += mc_chunk_ms at line 2427 is directly inside receive_data, not inside the nested flush_stt_buffer. Because Python treats any function that contains an assignment (including +=) to a name as if that name is a local variable, this will raise an UnboundLocalError: local variable 'dg_usage_ms_pending' referenced before assignment at runtime for every multi-channel session where FAIR_USE_ENABLED and FAIR_USE_RESTRICT_DAILY_DG_MS > 0.

    The nonlocal dg_usage_ms_pending added to flush_stt_buffer (line 2292) does NOT help here — each function scope is analyzed independently. receive_data itself needs the declaration.

  2. backend/routers/transcribe.py, line 436-454 (link)

    P1 DG usage flush skipped entirely for non-custom-STT sessions with Soniox/Speechmatics

    The if use_custom_stt: continue on line 437 causes every loop iteration to skip the DG usage flush (lines 451-454) when use_custom_stt is True. For pure custom-STT sessions this is harmless because none of the STT sockets (DG / Soniox / Speechmatics) are opened, so dg_usage_ms_pending stays at 0.

    However, there is a subtler concern: the check appears before the DG flush, meaning any future code path that sets use_custom_stt = True while still using a Soniox or Speechmatics socket would silently leak accumulated usage and never flush to Redis. The session-end finally flush is also guarded by if not use_custom_stt, so those sessions would lose all pending DG usage at teardown too.

    As a defensive measure, consider moving the DG flush above the use_custom_stt guard (or outside it), mirroring how the speech-profile flush is handled unconditionally:

    # Flush batched DG usage to Redis — always, regardless of use_custom_stt
    if FAIR_USE_ENABLED and FAIR_USE_RESTRICT_DAILY_DG_MS > 0 and dg_usage_ms_pending > 0:
        record_dg_usage_ms(uid, dg_usage_ms_pending)
        dg_usage_ms_pending = 0
    
    if use_custom_stt:
        continue

Last reviewed commit: "Batch DG usage Redis..."

@beastoin
Copy link
Copy Markdown
Collaborator Author

Local Dev Backend Test Evidence

1. Backend startup — OK

INFO:     Loading environment from '.env'
Using cache found in pretrained_models/snakers4_silero-vad_master
INFO:     Started server process [4000907]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:10260 (Press CTRL+C to quit)

2. WebSocket endpoint reachable — OK

/v4/listen → HTTP 403 (correctly rejects unauthenticated)

3. DG Usage Batching Verification — 6/6 PASSED

TEST 1: Verify per-chunk paths use local accumulator, not Redis
  Total record_dg_usage_ms() calls in transcribe.py: 2
    Line 453: record_dg_usage_ms(uid, dg_usage_ms_pending)   ← periodic flush
    Line 2674: record_dg_usage_ms(uid, dg_usage_ms_pending)  ← session-end flush
  PASS: Only 2 record_dg_usage_ms calls (periodic + session-end flush)

TEST 2: Verify hot path uses dg_usage_ms_pending accumulation
  dg_usage_ms_pending += occurrences: 4
    Line 2315: dg_usage_ms_pending += chunk_ms    ← Deepgram
    Line 2345: dg_usage_ms_pending += chunk_ms    ← Soniox
    Line 2365: dg_usage_ms_pending += chunk_ms    ← Speechmatics
    Line 2427: dg_usage_ms_pending += mc_chunk_ms ← multi-channel
  PASS: 4 accumulation points

TEST 3: Verify nonlocal declarations for dg_usage_ms_pending
  PASS: 2 nonlocal declarations (periodic loop + flush_stt_buffer)

TEST 4: Verify flush resets accumulator to 0
  PASS: 2 reset points (periodic flush + session-end flush)

TEST 5: Verify record_dg_usage_ms function is importable and callable
  PASS: record_dg_usage_ms works correctly with batched input

TEST 6: Redis ops/sec reduction calculation
  Before: 100 Redis ops/sec/session (50 chunks × 2 ops)
  After:  0.0333 Redis ops/sec/session (1 flush / 60s × 2 ops)
  Reduction: 3000x
  At 100 concurrent sessions:
    Before: 10,000 ops/sec
    After:  3.3 ops/sec
  PASS: 3000x reduction verified

ALL 6 TESTS PASSED

4. Existing fair-use tests — 81 passed

tests/unit/test_fair_use_engine.py   — 56 passed
tests/unit/test_fair_use_models.py   — 11 passed
tests/unit/test_fair_use_async.py    — 14 passed

by AI for @beastoin

Copy link
Copy Markdown
Collaborator Author

@beastoin beastoin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Greptile review fixes:
1. Add nonlocal dg_usage_ms_pending to receive_data() — without it,
   multi-channel sessions would hit UnboundLocalError on the += line.
2. Move DG usage flush above use_custom_stt:continue guard so all
   STT paths get flushed consistently.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin
Copy link
Copy Markdown
Collaborator Author

Greptile Review Fixes Applied

Both issues from Greptile's review have been fixed in commit d8d7a17:

Fix 1 (Critical): nonlocal in receive_data()

  • Added nonlocal dg_usage_ms_pending to receive_data() function
  • Without this, multi-channel sessions would hit UnboundLocalError on dg_usage_ms_pending += mc_chunk_ms
  • Now 3 nonlocal declarations cover all nested functions: _record_usage_periodically, receive_data, flush_stt_buffer

Fix 2: Move DG flush before use_custom_stt guard

  • Moved the record_dg_usage_ms flush block above the if use_custom_stt: continue guard
  • Ensures all STT paths get flushed consistently

Re-verified

  • Backend starts cleanly with fixes
  • 6/6 batching verification tests pass (updated to expect 3 nonlocal declarations)
  • 81 existing fair-use unit tests pass

by AI for @beastoin

beastoin and others added 3 commits March 20, 2026 23:55
11 tests covering:
- Structure: no per-chunk Redis calls, 4 accumulation points, 3 nonlocal
  declarations, 2 flush resets, flush before custom-STT guard
- Behavior: batched 60s single Redis write, large accumulation no overflow,
  disabled skips Redis, zero ms skips Redis
- Math: 3000x reduction factor, 100 sessions ops calculation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
test_budget_accounting_across_providers now checks for 4 accumulation
points (dg_usage_ms_pending +=) and 2 flush calls instead of 4 direct
record_dg_usage_ms calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin
Copy link
Copy Markdown
Collaborator Author

All checkpoints passed — PR ready for merge

Checkpoint Status
CP0-CP6 Implementation + PR created
CP7 CODEx reviewer: PR_APPROVED_LGTM (2 rounds, Greptile fixes included)
CP8 CODEx tester: TESTS_APPROVED (11 new tests + updated integration test)
CP9 Local dev backend validation: startup OK, WebSocket reachable, batching verified

Awaiting explicit merge approval.

by AI for @beastoin

@beastoin beastoin merged commit 6ef55ad into main Mar 21, 2026
2 checks passed
@beastoin beastoin deleted the fix/redis-dg-usage-batch-5854 branch March 21, 2026 00:02
@beastoin
Copy link
Copy Markdown
Collaborator Author

Post-deploy verification — SUCCESS

Deployed by @mon, 25/25 pods running.

Metric Before After Change
Redis ops/sec 8,500-12,500 313 -97%
Redis errors (5min) periodic bursts 0 Resolved
fair_use keys active 129+ bucket keys Normal
fair_use logs flowing flowing Normal

The remaining ~313 ops/sec is the expected baseline from record_speech_ms (every 60s/session) + cap checks (every 5min) — not the per-chunk DG tracking that was causing the spike.

by AI for @beastoin

Glucksberg pushed a commit to Glucksberg/omi-local that referenced this pull request Apr 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant