fix(pusher): cap unbounded private_cloud_queue and increase storage workers to prevent OOM #6762

Merged
beastoin merged 21 commits into main from fix/pusher-memory-copies-6022
Apr 17, 2026

Conversation

@beastoin
Collaborator

@beastoin beastoin commented Apr 17, 2026

Summary

  • Root cause: the unbounded private_cloud_queue (List[dict]) in pusher.py grew to 150+ items per user (~147MB per user) whenever storage_executor (4 workers) was saturated. 96 users × 90 avg items × 960KB ≈ 8.3GB of queued audio across pods, which led to OOM kills (173 kills/24h)
  • Cap private_cloud_queue with deque(maxlen=20) — drops oldest chunk when full instead of OOM-killing the entire pod (which loses ALL data for ALL users). Sized for 30 connections/pod (activeConnectionsPerPod: 30), safe cap = floor((4096-2048)/(30×2×0.92)) = 37, using 20 for 1.85× safety margin
  • Increase storage_executor from 4 to 16 workers — 30 concurrent connections need > 4 workers (each flush requires 3 sequential executor calls). With 16 workers: capacity = 192 flushes/min vs 30 peak arrival = 6.4× headroom
  • Add circuit breaker access-time tracking for proper eviction (active endpoints never evicted)
  • Convert bytearray to bytes inline at httpx call sites (httpx 0.28 treats bytearray as iterable)
  • Add TTL eviction to circuit breaker and latest-wins registries
  • Free batch bytearray immediately after bytes() copy in _flush_batch
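
The drop-oldest behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the code in pusher.py: `enqueue_chunk` and the logger name are hypothetical, and only `PRIVATE_CLOUD_QUEUE_MAX_SIZE` and `deque(maxlen=...)` come from this PR.

```python
import logging
from collections import deque

logger = logging.getLogger("pusher_sketch")  # hypothetical logger name

PRIVATE_CLOUD_QUEUE_MAX_SIZE = 20  # cap chosen in this PR


def enqueue_chunk(queue: deque, chunk: dict, uid: str) -> bool:
    """Append a chunk; return True if the oldest entry was dropped to make room.

    Hypothetical helper: deque(maxlen=N) silently discards from the left on
    append, so the warning has to be logged before the append happens.
    """
    will_drop = queue.maxlen is not None and len(queue) == queue.maxlen
    if will_drop:
        logger.warning("private_cloud_queue full for uid=%s, dropping oldest chunk", uid)
    queue.append(chunk)
    return will_drop


# 25 enqueues into a queue capped at 20: the 5 oldest chunks are dropped.
queue = deque(maxlen=PRIVATE_CLOUD_QUEUE_MAX_SIZE)
drops = sum(enqueue_chunk(queue, {"seq": i}, uid="example") for i in range(25))
```

After the loop, `queue` holds chunks 5 through 24 and `drops` is 5.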

Sizing math (Codex-verified)

Queue cap (pod limit 4096 MiB, activeConnectionsPerPod: 30):

chunk_size = 8000 Hz × 2 bytes × 60s = 960,000 bytes ≈ 0.92 MiB
effective_cost = 2× (queue item + pending batch copy during flush)
safe_cap = floor((4096 - 2048 reserve) / (30 conns × 2 × 0.92 MiB)) = 37
chosen_cap = 20 (1.85× safety margin)
max_memory = 30 × 20 × 0.92 MiB ≈ 552 MiB (within 2 GiB budget)
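
The queue-cap arithmetic above can be reproduced in a few lines. This sketch only restates the numbers in this section; the function and constant names are illustrative and do not appear in the codebase.

```python
import math

MIB = 1024 * 1024

# One queued chunk: 8 kHz, 16-bit samples, 60 seconds of audio.
chunk_bytes = 8000 * 2 * 60
chunk_mib = round(chunk_bytes / MIB, 2)  # rounded to 0.92, as in the PR text


def safe_queue_cap(pod_limit_mib: float, reserve_mib: float,
                   connections: int, copies: int, chunk_mib: float) -> int:
    """Largest per-connection queue length that fits in the non-reserved memory."""
    budget_mib = pod_limit_mib - reserve_mib
    return math.floor(budget_mib / (connections * copies * chunk_mib))


safe_cap = safe_queue_cap(4096, 2048, connections=30, copies=2, chunk_mib=chunk_mib)
chosen_cap = 20
margin = safe_cap / chosen_cap                 # safety margin over the chosen cap
max_queue_mib = 30 * chosen_cap * chunk_mib    # queued audio only, one copy per chunk
```

Note that `max_queue_mib` counts a single copy per chunk. With the 2× effective cost applied, the worst case would be roughly twice that figure, which still sits under the 2048 MiB budget.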

Worker count (each flush = 3 sequential executor calls, 2-5s each):

peak_workers_needed = ceil(30 conns × 5s / 60s) = 3 (minimum)
with shared workload = 8 (comfortable minimum)
chosen = 16 (6.4× headroom, handles GCS slowdowns)
capacity = 16 × 60/5 = 192 flushes/min vs 30 arrival = 6.4× margin
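
The worker-count figures can be checked the same way. The sketch below treats the 5 s worst case as the cost of one whole flush, which is how the formulas above read. As a labeled assumption, it also evaluates the alternative reading in which each of the 3 sequential executor calls takes 5 s, because the headroom differs noticeably. All names are illustrative.

```python
import math


def workers_needed(connections: int, seconds_per_flush: float, interval_s: float = 60) -> int:
    """Minimum workers so that one flush per connection per interval keeps up."""
    return math.ceil(connections * seconds_per_flush / interval_s)


def flushes_per_minute(workers: int, seconds_per_flush: float) -> float:
    """Throughput of a pool in which each worker handles one flush at a time."""
    return workers * 60 / seconds_per_flush


# Reading 1 (matches the formulas above): 5 s per flush.
min_workers = workers_needed(30, 5)
capacity = flushes_per_minute(16, 5)
headroom = capacity / 30

# Reading 2 (assumption): 3 sequential calls at 5 s each, so 15 s per flush.
min_workers_alt = workers_needed(30, 15)
capacity_alt = flushes_per_minute(16, 15)
headroom_alt = capacity_alt / 30
```

Under the first reading this gives 3 workers minimum, 192 flushes/min, and 6.4× headroom. Under the second it gives 8 workers minimum, 64 flushes/min, and about 2.1× headroom, so 16 workers still clears peak arrival in both cases.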

Live profiling evidence (prod pod gqxxz, 905 MB RSS)

=== VARIABLE-LEVEL BREAKDOWN ===
private_cloud_queue:       322 items,  528 MB  (58% of RSS)
private_cloud_sync_buffer:              18 MB
pending batches:                         1 MB
audiobuffer / trigger_audiobuffer:       0 KB

storage_executor: workers=4, queue=35  ← BOTTLENECK
critical_executor: workers=8, queue=0
active asyncio tasks: 305
active WebSocket connections: 34

Per-connection queue sizes (sampled):

uid=w2dmLiIN: pcq=12 items, 18634 KB
uid=h8FqZBnt: pcq=12 items, 17113 KB
uid=h4marMt0: pcq=10 items, 17821 KB
uid=ysYXmkJ0: pcq=9  items, 15180 KB
uid=pVs6EWV3: pcq=9  items, 17009 KB
uid=mLn3xpaw: pcq=9  items, 16913 KB
uid=i4ZjmXWe: pcq=9  items, 13325 KB
uid=v3Fvmjpn: pcq=7  items, 11365 KB
uid=nxPUo6cK: pcq=7  items, 11026 KB
uid=xJ7bU0R5: pcq=6  items, 11391 KB

Memory growth trajectory (pod w5p69, 2-hour capture):

07:26  0.9 GB → 09:24  4.7 GB  (1.9 GB/hour, perfectly linear)

Kill timeline: 173 kills in 24h, pods hit 4608Mi limit in ~2 hours

Root cause chain:

  1. 34 connections with private_cloud_sync → each queues ~960KB chunks every 60s
  2. storage_executor has only 4 workers, queue=35 → drain rate < ingest rate
  3. private_cloud_queue is unbounded List[dict] → grows at ~1.9 GB/hour
  4. Pod hits 4608Mi limit → OOM killed → restart cycle
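
As a rough cross-check of the chain above, the ingest rate alone puts an upper bound on queue growth. The sketch assumes 34 connections each producing one 960,000-byte chunk per minute. `drained_fraction` is a hypothetical knob for how much of that the executor manages to upload.

```python
CHUNK_BYTES = 8000 * 2 * 60  # 960,000 bytes per 60 s chunk
CHUNKS_PER_HOUR = 60


def queue_growth_bytes_per_hour(connections: int, drained_fraction: float = 0.0) -> float:
    """Net queue growth per hour if only `drained_fraction` of chunks get uploaded."""
    ingest = connections * CHUNK_BYTES * CHUNKS_PER_HOUR
    return ingest * (1.0 - drained_fraction)


upper_bound_gb_per_hour = queue_growth_bytes_per_hour(34) / 1e9
```

This comes to about 1.96 GB/hour with no draining at all. The observed 1.9 GB/hour is close to that bound, which would be consistent with the drain rate being near zero during the capture, though the two figures come from different pods.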

How this PR fixes each link:

| Root cause | Fix | Impact |
| --- | --- | --- |
| 528 MB in unbounded queue (58% RSS) | deque(maxlen=20) (sized for 30 conns/pod) | Max ~18 MB/connection, 552 MB pod-wide cap vs 528 MB observed across 34 connections |
| storage_executor queue=35 (4 workers) | 4→16 workers (6.4× headroom for 30 conns) | Eliminates drain bottleneck, queue should stay near 0 |
| bytes(data) copies | Inline bytes() at call site only | Prevents extra pinned copies during webhook delivery |
| Unbounded circuit breaker/latest-wins dicts | TTL eviction + access-time tracking | Bounds secondary memory growth |

Changes

| File | Change |
| --- | --- |
| backend/routers/pusher.py | Cap private_cloud_queue with deque(maxlen=20), add drop-oldest logging at all 3 enqueue points, free batch data in _flush_batch |
| backend/utils/executors.py | Increase storage_executor from 4 to 16 workers |
| backend/utils/http_client.py | Add _last_access_time tracking for circuit breaker eviction, TTL eviction for all states, latest-wins TTL eviction |
| backend/utils/app_integrations.py | bytes() inline at httpx call, cap asyncio.gather to 8 apps/batch |
| backend/utils/webhooks.py | bytes() inline at httpx call |
| backend/CLAUDE.md | Update storage_executor to 16 workers, update queue cap gotcha |
| backend/tests/unit/test_async_http_infrastructure.py | Add queue cap, deque drop, circuit breaker access tracking tests |
| backend/tests/unit/test_async_app_integrations.py | Add 12-app chunked fanout test, fix cross-test contamination |
| backend/tests/unit/test_async_webhooks.py | Fix import stubs, update bytes conversion test |
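
Capping asyncio.gather to 8 apps per batch, as listed for app_integrations.py, could look roughly like the sketch below. `fan_out` and `fake_send` are hypothetical stand-ins. The real function also does circuit-breaker and latest-wins checks, which are omitted here.

```python
import asyncio
from typing import Awaitable, Callable, List

APPS_PER_BATCH = 8  # batch size named in this PR


async def fan_out(apps: List[str],
                  send: Callable[[str, bytes], Awaitable[object]],
                  payload: bytes,
                  batch_size: int = APPS_PER_BATCH) -> list:
    """Deliver payload to every app, at most `batch_size` requests in flight."""
    results: list = []
    for start in range(0, len(apps), batch_size):
        batch = apps[start:start + batch_size]
        # return_exceptions=True keeps one failing app from cancelling the batch.
        results.extend(await asyncio.gather(
            *(send(app, payload) for app in batch), return_exceptions=True))
    return results


async def _demo():
    in_flight = 0
    peak = 0

    async def fake_send(app: str, payload: bytes) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0)  # yield so the rest of the batch can start
        in_flight -= 1
        return app

    delivered = await fan_out([f"app-{i}" for i in range(12)], fake_send, b"\x00\x01")
    return delivered, peak


delivered, peak = asyncio.run(_demo())
```

With 12 apps, all 12 are delivered in order and the peak number of concurrent sends is 8, mirroring the 12-app chunked fanout test mentioned in the table.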

Test plan

  • 73 unit tests pass (infra + circuit breaker + app integrations + webhooks)
  • 46/46 E2E code verification tests pass (executor, deque, circuit breaker, httpx, source structural)
  • wscat WebSocket handshake + concurrent connection tests pass
  • Async lint check passes
  • Format check passes
  • Pusher Dockerfile verified — all changed paths included in Docker image

Expected impact

  • Queue memory: Max ~18 MB/connection (20 × 0.92MB) vs 528 MB observed across 34 connections
  • Throughput: 16 storage workers → queue depth near 0 vs queue=35 observed
  • Pod stability: Queue cap prevents OOM; worst case drops oldest 60s chunk per user instead of killing pod
  • Growth rate: Should drop from 1.9 GB/hour to near-zero steady state
  • Data loss risk: Cap=20 (2× old cap) means queue only fills during sustained GCS outage, not normal operation

Closes #6022

by AI for @beastoin

beastoin and others added 3 commits April 17, 2026 08:49
…ency

Remove intermediate bytes(data) copy in _async_trigger_realtime_audio_bytes
that pinned an extra ~64KB per audio chunk for the full duration of the
slowest webhook call. Pass the bytearray directly to httpx (it accepts
bytes-like objects). Also cap per-call concurrency to 8 apps at a time
to limit memory pressure from concurrent webhook calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…webhook

Pass bytearray directly to httpx instead of creating an intermediate
bytes copy. Eliminates one ~64KB allocation per audio chunk webhook call.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Both _webhook_circuit_breakers and _latest_wins_versions grew unbounded.
Add TTL-based eviction:
- Circuit breakers: evict idle entries after 1 hour (max 500 entries)
- Latest-wins: evict UIDs not seen in 10 minutes (max 10000 entries)
Also clear both registries on shutdown.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@greptile-apps
Contributor

greptile-apps Bot commented Apr 17, 2026

Greptile Summary

This PR reduces pusher memory pressure by removing redundant bytes(data) copies in the audio webhook fan-out path, capping asyncio.gather concurrency to 8 apps at a time, and adding idle-TTL eviction to the previously unbounded _webhook_circuit_breakers and _latest_wins_versions registries.

  • P1: test_async_webhooks.py::test_bytearray_converted_to_bytes (line 157) asserts isinstance(sent_content, bytes), which fails now that content=data passes a bytearray directly. The test file was not in the PR's listed test runs. It must be updated to assert isinstance(sent_content, (bytes, bytearray)) before merging.
  • P2: _evict_stale_circuit_breakers only considers 'closed' entries; permanently-failing webhook targets remain in 'open' state and are never evicted until pod restart, even after the 1 h idle TTL.

Confidence Score: 4/5

Safe to merge after updating the one broken test assertion in test_async_webhooks.py.

One P1 finding: the existing test test_bytearray_converted_to_bytes explicitly asserts isinstance(sent_content, bytes), which will fail with the new bytearray passthrough. The PR did not run test_async_webhooks.py. All other changes are strictly additive or remove unnecessary copies, and the P2 eviction gap is bounded by the 500-entry soft cap.

backend/utils/webhooks.py (broken companion test) and backend/utils/http_client.py (open-CB eviction gap).

Important Files Changed

| Filename | Overview |
| --- | --- |
| backend/utils/webhooks.py | Removes bytes(data) copy — valid memory optimization, but the companion test test_bytearray_converted_to_bytes asserts isinstance(content, bytes) which will fail with bytearray input. |
| backend/utils/http_client.py | Adds TTL eviction for circuit breakers and latest-wins tracking, and clears registries on shutdown; eviction logic has a gap where 'open' circuit breakers are never evicted, and caps are soft rather than hard. |
| backend/utils/app_integrations.py | Removes audio_data = bytes(data) copy and adds per-uid chunked fan-out (8 apps/batch) with inter-chunk latest-wins pruning — logic is correct and strictly reduces memory. |

Sequence Diagram

sequenceDiagram
    participant P as pusher.py
    participant AI as app_integrations.py
    participant WH as webhooks.py
    participant CB as CircuitBreaker
    participant LW as latest_wins
    participant HX as httpx.AsyncClient

    P->>AI: trigger_realtime_audio_bytes(uid, sample_rate, data: bytearray)
    Note over AI: chunk_size = 8
    loop for each chunk of 8 apps
        AI->>LW: latest_wins_start(uid) → version
        AI->>CB: get_webhook_circuit_breaker(url)
        CB-->>AI: allow_request()?
        AI->>LW: latest_wins_check(uid, version)
        AI->>HX: client.post(url, content=data ← bytearray, no copy)
        HX-->>AI: response
        AI->>CB: record_success() / record_failure()
        AI->>LW: latest_wins_check → break if stale
    end

    P->>WH: send_audio_bytes_developer_webhook(uid, sample_rate, data: bytearray)
    WH->>CB: get_webhook_circuit_breaker(webhook_url)
    WH->>HX: client.post(webhook_url, content=data ← bytearray, no copy)
    HX-->>WH: response

Reviews (1): Last reviewed commit: "fix(http_client): add TTL eviction for c..."

Comment thread backend/utils/webhooks.py Outdated
  client = get_webhook_client()
  response = await client.post(
-     webhook_url, content=bytes(data), headers={'Content-Type': 'application/octet-stream'}
+     webhook_url, content=data, headers={'Content-Type': 'application/octet-stream'}
P1 Broken existing test assertion

test_async_webhooks.py::TestSendAudioBytesDeveloperWebhook::test_bytearray_converted_to_bytes (line 157) asserts isinstance(sent_content, bytes) — i.e., the content passed to client.post must be a bytes object. The test module stubs user_webhook_status_db to return True, so the client.post() call is always reached. Now that content=bytes(data) has been replaced with content=data, the argument type is bytearray, and isinstance(bytearray(b'\xab\xcd'), bytes) returns False in Python 3. This test will fail.

The PR lists only test_async_http_infrastructure and test_pusher_circuit_breaker as having been run — test_async_webhooks.py was not covered. The test should be updated to reflect the new intended type:

Suggested change
webhook_url, content=data, headers={'Content-Type': 'application/octet-stream'}
webhook_url, content=data, headers={'Content-Type': 'application/octet-stream'}

And in test_async_webhooks.py, update line 157:

assert isinstance(sent_content, (bytes, bytearray))
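
The type distinction this comment relies on, and the call-site conversion the PR summary describes as the final approach, can be illustrated without httpx. In this sketch `post_audio` and `fake_send` are hypothetical; `fake_send` only records what it receives and stands in for an HTTP client's post method.

```python
import asyncio
from typing import Awaitable, Callable, Union

Payload = Union[bytes, bytearray]


async def post_audio(send: Callable[..., Awaitable[None]], url: str, data: Payload) -> None:
    """Convert to bytes at the call site so the immutable copy lives only for the request."""
    await send(url, content=bytes(data))


sent = {}


async def fake_send(url: str, *, content: bytes) -> None:
    # Stand-in for a real client call; just captures the arguments.
    sent["url"] = url
    sent["content"] = content


buffer = bytearray(b"\xab\xcd")
is_bytes_before = isinstance(buffer, bytes)  # bytearray is not a subclass of bytes
asyncio.run(post_audio(fake_send, "https://example.invalid/hook", buffer))
```

Here `is_bytes_before` is False, while the content seen by the sender is a `bytes` object with the same value, so an `isinstance(sent_content, bytes)` assertion holds only when the conversion is in place.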

Comment on lines +113 to +125
def _evict_stale_circuit_breakers():
    """Remove circuit breaker entries that have been idle (closed, no recent failures)."""
    now = time.monotonic()
    stale_keys = [
        k
        for k, cb in _webhook_circuit_breakers.items()
        if cb._state == 'closed'
        and (now - cb._last_failure_time > _CIRCUIT_BREAKER_IDLE_TTL or cb._last_failure_time == 0.0)
    ]
    for k in stale_keys:
        del _webhook_circuit_breakers[k]
    if stale_keys:
        logger.info(f'Evicted {len(stale_keys)} stale circuit breakers, {len(_webhook_circuit_breakers)} remaining')
P2 Open circuit breakers are never evicted

_evict_stale_circuit_breakers only removes entries whose _state == 'closed'. An entry that has been in 'open' state for hours (a permanently-failing webhook URL that never gets a successful probe) will never be picked up by this eviction, even though it is functionally idle. In a deployment with many transient plugin webhook URLs that go offline permanently, the 500-entry soft cap still allows up to ~500 stale-open entries to accumulate until the next pod restart.

A simple fix would extend the stale criteria to also evict entries that are open/half_open and idle beyond a larger TTL:

stale_keys = [
    k
    for k, cb in _webhook_circuit_breakers.items()
    if cb._state == 'closed'
    and (now - cb._last_failure_time > _CIRCUIT_BREAKER_IDLE_TTL or cb._last_failure_time == 0.0)
    or (cb._state in ('open', 'half_open')
        and now - cb._last_failure_time > _CIRCUIT_BREAKER_IDLE_TTL * 24)  # 24 h for failed targets
]

Comment on lines 106 to 109
if key not in _webhook_circuit_breakers:
    if len(_webhook_circuit_breakers) > _CIRCUIT_BREAKER_MAX_ENTRIES:
        _evict_stale_circuit_breakers()
    _webhook_circuit_breakers[key] = WebhookCircuitBreaker(key)
P2 Soft cap silently exceeded when eviction yields no candidates

The new entry is inserted unconditionally after _evict_stale_circuit_breakers(), regardless of whether any entries were actually freed. If all 500+ existing entries are 'open' or 'half_open' (e.g., during a widespread webhook outage), eviction is a no-op and the dict silently grows past _CIRCUIT_BREAKER_MAX_ENTRIES. The same pattern applies to _latest_wins_versions. This is fine as a best-effort cap but it's worth documenting as a soft upper bound in the constant name or docstring to avoid future confusion.
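
One way to combine a best-effort cap with idle eviction keyed on last access time (the direction the PR description says was taken) is sketched below. `Breaker` and `BreakerRegistry` are hypothetical simplifications, not the classes in http_client.py. Failure counting and half-open probing are left out, and the clock is injectable only so the example is deterministic.

```python
import time
from typing import Callable, Dict, Set

IDLE_TTL_SECONDS = 3600.0  # 1 hour, as in the commit message
MAX_ENTRIES = 500          # soft cap: eviction may free nothing


class Breaker:
    """Minimal stand-in for a circuit breaker; tracks state and last access only."""

    __slots__ = ("state", "last_access", "_clock")

    def __init__(self, clock: Callable[[], float]) -> None:
        self.state = "closed"
        self._clock = clock
        self.last_access = clock()

    def allow_request(self) -> bool:
        self.last_access = self._clock()  # every use counts as activity
        return self.state != "open"


class BreakerRegistry:
    def __init__(self, max_entries: int = MAX_ENTRIES,
                 idle_ttl: float = IDLE_TTL_SECONDS,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._breakers: Dict[str, Breaker] = {}
        self._max_entries = max_entries
        self._idle_ttl = idle_ttl
        self._clock = clock

    def get(self, key: str) -> Breaker:
        breaker = self._breakers.get(key)
        if breaker is None:
            if len(self._breakers) >= self._max_entries:
                self.evict_idle()  # best effort; the insert below is unconditional
            breaker = self._breakers[key] = Breaker(self._clock)
        return breaker

    def evict_idle(self) -> int:
        """Drop entries idle past the TTL, whatever their state; return the count."""
        now = self._clock()
        stale = [k for k, b in self._breakers.items() if now - b.last_access > self._idle_ttl]
        for k in stale:
            del self._breakers[k]
        return len(stale)

    def keys(self) -> Set[str]:
        return set(self._breakers)


fake_now = [0.0]
registry = BreakerRegistry(max_entries=2, idle_ttl=10.0, clock=lambda: fake_now[0])
registry.get("https://a.example").state = "open"  # failing and then abandoned
busy = registry.get("https://b.example")
fake_now[0] = 11.0
busy.allow_request()               # refreshes the access time
registry.get("https://c.example")  # registry is full, so idle entries are evicted
```

In this run the abandoned open breaker is evicted while the recently used one survives, even though it never failed. If every entry were still active, the registry would grow past `max_entries`, which is why the cap is best read as a soft upper bound.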

beastoin and others added 3 commits April 17, 2026 09:10
96+ concurrent private cloud upload coroutines compete for 4 workers,
each requiring 3 sequential executor calls per flush. With 4 workers,
only ~1.3 concurrent flushes are possible — far below the 96 uploads/min
needed, causing private_cloud_queue to grow unboundedly (8.3GB observed).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… OOM

Root cause of 4.5GB memory / 173 OOM kills per 24h: unbounded
private_cloud_queue (List[dict]) grew to 150+ items per user (~147MB each)
when storage_executor was saturated. 96 users × 90 avg items × 960KB =
~8.3GB queued audio across pods.

Fix: deque(maxlen=10) drops oldest chunk when queue is full. An OOM kill
loses ALL queued data for ALL users on the pod — dropping the oldest 60s
chunk for one user is strictly better. Adds explicit warning logging
before drops at both enqueue points.

Also frees batch bytearray immediately after bytes() copy in _flush_batch
to reduce peak memory during upload.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin beastoin changed the title from "fix(pusher): reduce memory pressure from webhook audio byte copies" to "fix(pusher): cap unbounded private_cloud_queue and increase storage workers to prevent OOM" Apr 17, 2026
beastoin and others added 12 commits April 17, 2026 09:12
…tcha

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codex review caught the third private_cloud_queue enqueue point
(disconnect flush) was missing the drop-oldest warning log.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously only closed breakers were evicted. Open/half_open breakers
for abandoned webhook URLs never aged out, allowing monotonic growth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
httpx 0.28 treats bytearray as iterable, creating an IteratorByteStream
that fails with AsyncClient. Convert bytearray to bytes inline at the
httpx call site (not at function entry) to minimize copy lifetime.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…er_webhook

Same httpx 0.28 bytearray compatibility fix as app_integrations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Track _last_access_time on every allow_request() call so actively used
breakers are never evicted, even if they never failed. Fixes issue where
healthy endpoints with _last_failure_time=0.0 were incorrectly treated
as stale on registry overflow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- TestPrivateCloudQueueCap: verifies deque(maxlen=10), PRIVATE_CLOUD_QUEUE_MAX_SIZE=10,
  3 overflow warning sites, and deque drop-oldest behavior
- TestCircuitBreakerAccessTracking: verifies active breakers not evicted,
  stale breakers evicted, and allow_request() updates _last_access_time

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Verifies all 12 apps receive audio when sent in chunks of 8.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…se.conversations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Guard http_client stub attributes with __file__ check to avoid
overwriting the real module when tests are collected together.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin
Collaborator Author

E2E Live Test Evidence — curl/wscat/pytest

1. Service Startup

Backend: uvicorn main:app --host 127.0.0.1 --port 10240 (PR branch code)
Pusher:  uvicorn pusher.main:app --host 127.0.0.1 --port 10241 (PR branch code)
Both started successfully with Application startup complete.

2. wscat WebSocket Tests

wscat Test 1: WS handshake to pusher → PASS (connects, server accepts)
wscat Test 2: 5 concurrent WS connections → all 5 completed
Pusher log: 25 connections open, 25 closed, 75 trigger events logged

Server closes with 1006 after Redis auth fails for test UIDs (expected in local dev without prod Redis creds). Key point: the WS handshake and connection lifecycle work correctly.

3. curl HTTP Tests

GET /docs (backend) → 200 (0.8ms)
GET /docs (pusher)  → 200 (0.5ms)

4. Code Verification (46/46 PASS)

| Category | Tests | Result |
| --- | --- | --- |
| storage_executor = 16 workers | max_workers check, 16 concurrent task execution (peak=16) | 3/3 PASS |
| deque(maxlen=10) queue cap | 25→10 items, oldest dropped, newest kept, overflow count=15, memory bounded (10KB) | 5/5 PASS |
| deque concurrent safety | 8 threads × 100 appends, no errors, len=10 | 1/1 PASS |
| Circuit breaker access tracking | initial state, access time update, open after 5 failures, blocks when open, closes on success, registry key dedup, stale evicted, active kept | 8/8 PASS |
| httpx bytes() conversion | bytes() from bytearray, httpx.Request works, bytearray→IteratorByteStream confirmed (the bug) | 3/3 PASS |
| Pusher source verification | deque import, PRIVATE_CLOUD_QUEUE_MAX_SIZE=10, deque(maxlen=...), 3 overflow warnings, del batch['data'], del chunks_to_upload, storage_executor import, no List import | 8/8 PASS |
| Webhooks source verification | no import requests, uses get_webhook_client, bytes() conversion, 4 async functions verified | 7/7 PASS |
| HTTP client source verification | _last_access_time in slots, allow_request updates access time, eviction uses access time, eviction does NOT use failure time | 4/4 PASS |
| Latest-wins tracking | versions increment, old stale, current valid | 3/3 PASS |
| Semaphore isolation | webhook=64, maps=8, acquire/release | 4/4 PASS |

5. Unit Tests (73/73 PASS)

tests/unit/test_async_http_infrastructure.py  — 38 passed
tests/unit/test_async_app_integrations.py     — 9 passed
tests/unit/test_async_webhooks.py             — 26 passed
============================== 73 passed in 0.36s ==============================

Key Evidence Summary

  • storage_executor confirmed at 16 workers — ran 16 concurrent tasks with peak_concurrent=16
  • deque(maxlen=10) confirmed — 25 enqueues → 10 items kept, items 0-14 dropped, 15-24 retained, memory bounded at 10KB
  • httpx 0.28 bytearray bug confirmed and fixed: bytearray → IteratorByteStream (fails on AsyncClient.send()); bytes() conversion at call site fixes it
  • Circuit breaker eviction fixed — uses _last_access_time (not _last_failure_time); active healthy endpoints no longer evicted

by AI for @beastoin

beastoin and others added 3 commits April 17, 2026 11:35
With activeConnectionsPerPod=30 (not 62), the safe cap is much higher.
Math: floor((4096-2048)/(30*2*0.92)) = 37 theoretical max.
20 provides 1.85x safety margin while halving data loss risk.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@beastoin
Collaborator Author

lgtm

@beastoin beastoin merged commit 5011187 into main Apr 17, 2026
2 checks passed
@beastoin beastoin deleted the fix/pusher-memory-copies-6022 branch April 17, 2026 11:38
Glucksberg pushed a commit to Glucksberg/omi-local that referenced this pull request Apr 28, 2026

Development

Successfully merging this pull request may close these issues.

P0: Pusher retry exhaustion cascade — circuit breaker + graceful degradation