Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
81067f1
fix(webhooks): remove bytes(data) copy and cap asyncio.gather concurr…
beastoin Apr 17, 2026
aa1ab04
fix(webhooks): remove bytes(data) copy in send_audio_bytes_developer_…
beastoin Apr 17, 2026
68fc153
fix(http_client): add TTL eviction for circuit breakers and latest-wins
beastoin Apr 17, 2026
8035189
fix(pusher): increase storage_executor from 4 to 16 workers
beastoin Apr 17, 2026
620df48
fix(pusher): cap private_cloud_queue with deque(maxlen=10) to prevent…
beastoin Apr 17, 2026
42e4fb3
test: update storage_executor assertion from 4 to 16 workers
beastoin Apr 17, 2026
1e8ff43
docs(backend): update storage_executor to 16 workers and queue cap go…
beastoin Apr 17, 2026
b2c4597
fix(pusher): add overflow warning to disconnect flush enqueue path
beastoin Apr 17, 2026
99cf227
fix(http_client): evict open/half_open circuit breakers after idle TTL
beastoin Apr 17, 2026
020018d
test: update webhook test to verify bytearray passed directly (no copy)
beastoin Apr 17, 2026
f090f12
fix(webhooks): restore bytes() conversion for httpx 0.28 compatibility
beastoin Apr 17, 2026
ea9f062
fix(webhooks): restore bytes() conversion in send_audio_bytes_develop…
beastoin Apr 17, 2026
762c28a
test: verify bytes() conversion at httpx call site for audio webhook
beastoin Apr 17, 2026
878b47b
fix(http_client): use last-access time for circuit breaker eviction
beastoin Apr 17, 2026
1cd82e2
test: add queue cap, deque drop-oldest, and circuit breaker access tests
beastoin Apr 17, 2026
747acbe
test: add 12-app chunked fan-out test for audio bytes
beastoin Apr 17, 2026
a4f085d
test: fix webhook test import by stubbing database.folders and databa…
beastoin Apr 17, 2026
e0299ca
test: fix cross-test contamination in app_integrations mock stubs
beastoin Apr 17, 2026
41d99fa
fix(pusher): raise private_cloud_queue cap from 10 to 20
beastoin Apr 17, 2026
dd18a5b
test: update queue cap test to match new maxlen=20
beastoin Apr 17, 2026
4be536e
docs(backend): update queue cap to maxlen=20 in gotcha
beastoin Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Never block the event loop — it freezes health checks, HPA scaling, and all co
- Semaphores: always wrap calls — `async with get_webhook_semaphore(): await client.post(...)`
- Circuit breakers: `get_webhook_circuit_breaker(url)` for external targets — call `cb.record_success()`/`cb.record_failure()`
- Lifecycle: lazy singletons, closed at shutdown via `close_all_clients()`
- **Lane 2 — Executors** (`utils/executors.py`): `critical_executor` (8 workers) and `storage_executor` (4 workers). Never ad-hoc `Thread`/`ThreadPoolExecutor`. Use `loop.run_in_executor(critical_executor, fn)`.
- **Lane 2 — Executors** (`utils/executors.py`): `critical_executor` (8 workers) and `storage_executor` (16 workers). Never ad-hoc `Thread`/`ThreadPoolExecutor`. Use `loop.run_in_executor(critical_executor, fn)`.
- Deadlock rule: coordinators that fan out to `critical_executor` must run in default executor (`None`)
- **Lane 3 — Lint**: `python scripts/lint_async_blockers.py` catches `requests.*`, `time.sleep()`, `Thread().start()` in async code. Run before committing.
- **Shutdown**: `close_all_clients()` + `shutdown_executors()` wired in `main.py` and `pusher/main.py`.
Expand All @@ -160,6 +160,6 @@ Never block the event loop — it freezes health checks, HPA scaling, and all co
7. **Firestore collection group queries** need explicit indexes — 500 with no useful error
8. **Mutable WebSocket state races** — snapshot `nonlocal` variables before spawning async work
9. **Silent fire-and-forget drops** — functions gating on connection state must log when dropping work
10. **Unbounded queues for user data** — `deque(maxlen=N)` silently drops audio; data-safety queues must stay unbounded
10. **Queue caps for user data** — `private_cloud_queue` uses `deque(maxlen=20)` to prevent OOM kills (sized for 30 conns/pod); dropping oldest chunk is better than killing the pod and losing ALL data for ALL users
11. **`langdetect` unreliable on short text** — don't use on <20 chars or gate paid API calls on interim streaming text
12. **DG keepalive vs response timeout** — `keep_alive()` prevents DG's 10s idle timeout but NOT 1011 response timeout after all audio is processed. Post-session 1011 is benign.
32 changes: 24 additions & 8 deletions backend/routers/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from collections import deque
from datetime import datetime, timezone
from typing import Dict, List, Set
from typing import Dict, Set

from fastapi import APIRouter
from fastapi.websockets import WebSocketDisconnect, WebSocket
Expand Down Expand Up @@ -50,8 +50,8 @@
PRIVATE_CLOUD_BATCH_MAX_AGE = 60.0 # seconds — flush batch if oldest chunk exceeds this age
PRIVATE_CLOUD_SYNC_MAX_RETRIES = 3

# Queue warning thresholds
PRIVATE_CLOUD_QUEUE_WARN_SIZE = 50
# Queue size limits
PRIVATE_CLOUD_QUEUE_MAX_SIZE = 20 # ~18MB/connection max (30 conns × 18MB = 540MB) — prevents OOM with headroom
SPEAKER_SAMPLE_QUEUE_WARN_SIZE = 100

# Constants for transcript queue batching
Expand Down Expand Up @@ -169,9 +169,10 @@ def on_done(t):
transcript_queue: deque = deque(maxlen=TRANSCRIPT_QUEUE_WARN_SIZE)
audio_bytes_queue: deque = deque(maxlen=AUDIO_BYTES_QUEUE_WARN_SIZE)

# private_cloud_queue stays unbounded — it carries irreplaceable user audio.
# Silent drops (via deque maxlen) would cause permanent data loss.
private_cloud_queue: List[dict] = []
# private_cloud_queue caps at PRIVATE_CLOUD_QUEUE_MAX_SIZE to prevent OOM kills.
# An OOM kill loses ALL queued data for ALL users on the pod — dropping the oldest
# chunk for one user is strictly better than killing the pod.
private_cloud_queue: deque = deque(maxlen=PRIVATE_CLOUD_QUEUE_MAX_SIZE)
audio_bytes_event = asyncio.Event() # Signals when items are added for instant wake

async def process_private_cloud_queue():
Expand Down Expand Up @@ -206,6 +207,7 @@ async def _flush_batch(conv_id: str):
if not batch or len(batch['data']) == 0:
return
chunk_data = bytes(batch['data'])
del batch['data'] # free bytearray immediately — chunk_data holds the bytes copy
timestamp = batch['timestamp']
retries = batch.get('retries', 0)
try:
Expand All @@ -214,6 +216,7 @@ async def _flush_batch(conv_id: str):
await loop.run_in_executor(
storage_executor, upload_audio_chunks_batch, chunks_to_upload, uid, conv_id, cached_protection_level
)
del chunks_to_upload
try:
audio_files = await loop.run_in_executor(
storage_executor, conversations_db.create_audio_files_from_chunks, uid, conv_id
Expand Down Expand Up @@ -397,6 +400,11 @@ async def receive_tasks():
and current_conversation_id != new_conversation_id
and len(private_cloud_sync_buffer) > 0
):
if len(private_cloud_queue) >= PRIVATE_CLOUD_QUEUE_MAX_SIZE:
logger.warning(
f"private_cloud_queue full ({len(private_cloud_queue)}/{PRIVATE_CLOUD_QUEUE_MAX_SIZE}), "
f"dropping oldest chunk to prevent OOM {uid}"
)
private_cloud_queue.append(
{
'data': bytes(private_cloud_sync_buffer),
Expand Down Expand Up @@ -483,8 +491,11 @@ async def receive_tasks():
private_cloud_sync_buffer.extend(audio_data)
# Queue chunk every PRIVATE_CLOUD_CHUNK_DURATION seconds
if len(private_cloud_sync_buffer) >= sample_rate * 2 * PRIVATE_CLOUD_CHUNK_DURATION:
if len(private_cloud_queue) >= PRIVATE_CLOUD_QUEUE_WARN_SIZE:
logger.warning(f"Warning: private_cloud_queue size {len(private_cloud_queue)} {uid}")
if len(private_cloud_queue) >= PRIVATE_CLOUD_QUEUE_MAX_SIZE:
logger.warning(
f"private_cloud_queue full ({len(private_cloud_queue)}/{PRIVATE_CLOUD_QUEUE_MAX_SIZE}), "
f"dropping oldest chunk to prevent OOM {uid}"
)
private_cloud_queue.append(
{
'data': bytes(private_cloud_sync_buffer),
Expand Down Expand Up @@ -537,6 +548,11 @@ async def receive_tasks():
finally:
# Flush any remaining private cloud sync buffer before shutdown
if private_cloud_sync_enabled and current_conversation_id and len(private_cloud_sync_buffer) > 0:
if len(private_cloud_queue) >= PRIVATE_CLOUD_QUEUE_MAX_SIZE:
logger.warning(
f"private_cloud_queue full ({len(private_cloud_queue)}/{PRIVATE_CLOUD_QUEUE_MAX_SIZE}), "
f"dropping oldest chunk to prevent OOM {uid}"
)
private_cloud_queue.append(
{
'data': bytes(private_cloud_sync_buffer),
Expand Down
54 changes: 43 additions & 11 deletions backend/tests/unit/test_async_app_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,22 @@ def _noop_track(uid, feature):
# Stub llms.memory
sys.modules["utils.llms.memory"].get_prompt_memories = MagicMock(return_value=[])

# Stub http_client
sys.modules["utils.http_client"].get_webhook_client = MagicMock()
sys.modules["utils.http_client"].get_maps_client = MagicMock()
_mock_cb = MagicMock()
_mock_cb.allow_request = MagicMock(return_value=True)
_mock_cb.record_success = MagicMock()
_mock_cb.record_failure = MagicMock()
sys.modules["utils.http_client"].get_webhook_circuit_breaker = MagicMock(return_value=_mock_cb)
# Stub http_client — only set mock attributes on stub modules (not the real module)
import asyncio as _asyncio

sys.modules["utils.http_client"].get_webhook_semaphore = MagicMock(return_value=_asyncio.Semaphore(64))
sys.modules["utils.http_client"].latest_wins_start = MagicMock(return_value=1)
sys.modules["utils.http_client"].latest_wins_check = MagicMock(return_value=True)
_http_mod = sys.modules.get("utils.http_client")
if _http_mod is not None and not hasattr(_http_mod, '__file__'):
# Stub module — safe to add mock attributes for import resolution
_http_mod.get_webhook_client = MagicMock()
_http_mod.get_maps_client = MagicMock()
_mock_cb = MagicMock()
_mock_cb.allow_request = MagicMock(return_value=True)
_mock_cb.record_success = MagicMock()
_mock_cb.record_failure = MagicMock()
_http_mod.get_webhook_circuit_breaker = MagicMock(return_value=_mock_cb)
_http_mod.get_webhook_semaphore = MagicMock(return_value=_asyncio.Semaphore(64))
_http_mod.latest_wins_start = MagicMock(return_value=1)
_http_mod.latest_wins_check = MagicMock(return_value=True)

# Stub executors — must use real ThreadPoolExecutor because asyncio's
# run_in_executor calls executor.submit() and wraps the returned Future.
Expand Down Expand Up @@ -235,6 +238,35 @@ async def test_no_threading_used(self):
mock_threading.Thread.assert_not_called()


class TestAudioBytesChunkedFanOut:
"""Test >8 apps are sent in chunked batches."""

@pytest.mark.asyncio
async def test_12_apps_sent_in_two_chunks(self):
"""12 apps should be sent in chunks of 8 + 4."""
apps = []
for i in range(12):
app = MagicMock()
app.id = f"app-{i}"
app.triggers_realtime_audio_bytes.return_value = True
app.enabled = True
app.external_integration.webhook_url = f"https://app{i}.test/audio"
apps.append(app)

mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)

with patch.object(app_integrations, "get_available_apps", return_value=apps), patch(
"utils.app_integrations.get_webhook_client", return_value=mock_client
):
await app_integrations.trigger_realtime_audio_bytes("uid-1", 8000, bytearray(b'\x00' * 100))

# All 12 apps should have received the audio
assert mock_client.post.call_count == 12


class TestAsyncTriggerRealtimeIntegrations:
"""Test async realtime integration fan-out."""

Expand Down
122 changes: 119 additions & 3 deletions backend/tests/unit/test_async_http_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ def test_critical_executor_has_8_workers(self):
"""critical_executor documented as 8 workers for latency-sensitive work."""
assert critical_executor._max_workers == 8

def test_storage_executor_has_4_workers(self):
"""storage_executor documented as 4 workers for batch I/O."""
assert storage_executor._max_workers == 4
def test_storage_executor_has_16_workers(self):
"""storage_executor sized for 16 workers to handle concurrent private cloud uploads."""
assert storage_executor._max_workers == 16


class TestNotificationWebhookWiring:
Expand All @@ -408,3 +408,119 @@ def test_send_summary_calls_storage_executor_with_asyncio_run(self):
# Verify the exact wiring pattern
assert 'storage_executor.submit(asyncio.run, day_summary_webhook(' in src
assert 'critical_executor' not in src


class TestPrivateCloudQueueCap:
"""Verify private_cloud_queue uses bounded deque to prevent OOM."""

def test_pusher_uses_deque_with_maxlen(self):
"""private_cloud_queue must be deque(maxlen=PRIVATE_CLOUD_QUEUE_MAX_SIZE)."""
import ast
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
src = f.read()

assert 'deque(maxlen=PRIVATE_CLOUD_QUEUE_MAX_SIZE)' in src
assert 'private_cloud_queue: List[dict] = []' not in src

def test_queue_max_size_is_20(self):
"""Queue cap should be 20 items (~18MB max per connection, safe for 30-conn pods)."""
import ast
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
src = f.read()

tree = ast.parse(src)
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == 'PRIVATE_CLOUD_QUEUE_MAX_SIZE':
assert isinstance(node.value, ast.Constant)
assert node.value.value == 20
return
pytest.fail("PRIVATE_CLOUD_QUEUE_MAX_SIZE constant not found")

def test_overflow_warning_at_all_enqueue_points(self):
"""All 3 enqueue points must log overflow warning before deque drops oldest."""
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
src = f.read()

# Count occurrences of the overflow warning pattern
warning_count = src.count('private_cloud_queue full')
assert warning_count == 3, f"Expected 3 overflow warnings, found {warning_count}"

def test_deque_maxlen_drops_oldest(self):
"""Verify deque(maxlen=N) drops oldest item when full."""
from collections import deque

q = deque(maxlen=3)
q.append({'id': 1})
q.append({'id': 2})
q.append({'id': 3})
assert len(q) == 3
q.append({'id': 4}) # oldest (id=1) should be dropped
assert len(q) == 3
assert q[0]['id'] == 2
assert q[-1]['id'] == 4


class TestCircuitBreakerAccessTracking:
"""Verify circuit breaker eviction uses last-access time."""

def test_active_breaker_not_evicted(self):
"""Actively used breaker should not be evicted even with 0 failures."""
import time
from utils.http_client import (
_webhook_circuit_breakers,
get_webhook_circuit_breaker,
_evict_stale_circuit_breakers,
_CIRCUIT_BREAKER_IDLE_TTL,
)

_webhook_circuit_breakers.clear()
cb = get_webhook_circuit_breaker('https://active.test/hook')
cb.allow_request() # Updates _last_access_time to now
assert cb._last_failure_time == 0.0 # Never failed

_evict_stale_circuit_breakers()
assert 'https://active.test/hook' in _webhook_circuit_breakers
_webhook_circuit_breakers.clear()

def test_stale_breaker_evicted(self):
"""Breaker not accessed for > TTL should be evicted."""
import time
from utils.http_client import (
_webhook_circuit_breakers,
get_webhook_circuit_breaker,
_evict_stale_circuit_breakers,
_CIRCUIT_BREAKER_IDLE_TTL,
)

_webhook_circuit_breakers.clear()
cb = get_webhook_circuit_breaker('https://stale.test/hook')
# Backdate access time to exceed TTL
cb._last_access_time = time.monotonic() - _CIRCUIT_BREAKER_IDLE_TTL - 1

_evict_stale_circuit_breakers()
assert 'https://stale.test/hook' not in _webhook_circuit_breakers
_webhook_circuit_breakers.clear()

def test_allow_request_updates_access_time(self):
"""allow_request() must update _last_access_time."""
import time
from utils.http_client import _webhook_circuit_breakers, get_webhook_circuit_breaker

_webhook_circuit_breakers.clear()
cb = get_webhook_circuit_breaker('https://test.test/hook')
old_access = cb._last_access_time
time.sleep(0.01)
cb.allow_request()
assert cb._last_access_time > old_access
_webhook_circuit_breakers.clear()
9 changes: 6 additions & 3 deletions backend/tests/unit/test_async_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
_db_redis.enable_user_webhook_db = MagicMock()
_db_redis.set_user_webhook_db = MagicMock()

for mod_name in ["database", "database.notifications", "database.users"]:
for mod_name in ["database", "database.notifications", "database.users", "database.folders", "database.conversations"]:
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
if mod_name == "database":
Expand All @@ -37,6 +37,8 @@
sys.modules["database.notifications"].get_token_only = MagicMock(return_value=None)
sys.modules["database.users"].get_user_profile = MagicMock(return_value={"name": "Test"})
sys.modules["database.users"].get_people_by_ids = MagicMock(return_value=[])
sys.modules["database.folders"].get_folders = MagicMock(return_value=[])
sys.modules["database.conversations"].get_conversations = MagicMock(return_value=[])

if "utils.notifications" not in sys.modules:
sys.modules["utils.notifications"] = types.ModuleType("utils.notifications")
Expand Down Expand Up @@ -141,8 +143,8 @@ async def test_success_sends_via_httpx(self):
assert call_args.kwargs.get("headers", {}).get("Content-Type") == "application/octet-stream"

@pytest.mark.asyncio
async def test_bytearray_converted_to_bytes(self):
"""Verify bytearray is converted to immutable bytes before sending."""
async def test_bytearray_converted_to_bytes_at_call_site(self):
"""Verify bytearray is converted to bytes inline at httpx call (required by httpx 0.28)."""
mock_response = MagicMock()
mock_response.status_code = 200

Expand All @@ -155,6 +157,7 @@ async def test_bytearray_converted_to_bytes(self):
call_args = mock_client.post.call_args
sent_content = call_args.kwargs.get("content")
assert isinstance(sent_content, bytes)
assert sent_content == b'\xab\xcd'

@pytest.mark.asyncio
async def test_url_comma_parsing(self):
Expand Down
13 changes: 9 additions & 4 deletions backend/utils/app_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,6 @@ async def _async_trigger_realtime_audio_bytes(uid: str, sample_rate: int, data:
if not filtered_apps:
return {}

audio_data = bytes(data)

version = latest_wins_start(uid)

async def _single(app: App):
Expand All @@ -522,15 +520,22 @@ async def _single(app: App):
return # Check again after acquiring semaphore
client = get_webhook_client()
response = await client.post(
url, content=audio_data, headers={'Content-Type': 'application/octet-stream'}
url, content=bytes(data), headers={'Content-Type': 'application/octet-stream'}
)
logger.info(f'trigger_realtime_audio_bytes {app.id} status: {response.status_code}')
cb.record_success()
except Exception as e:
cb.record_failure()
logger.error(f"Plugin integration error: {e}")

await asyncio.gather(*[_single(app) for app in filtered_apps], return_exceptions=True)
# Cap per-call concurrency: only fan out to 8 apps at a time to limit memory pressure
# from concurrent webhook calls holding references to the audio data
chunk_size = 8
for i in range(0, len(filtered_apps), chunk_size):
chunk = filtered_apps[i : i + chunk_size]
await asyncio.gather(*[_single(app) for app in chunk], return_exceptions=True)
if not latest_wins_check(uid, version):
break # Newer data arrived, stop sending stale chunks
return {}


Expand Down
2 changes: 1 addition & 1 deletion backend/utils/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
logger = logging.getLogger(__name__)

critical_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="critical")
storage_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="storage")
storage_executor = ThreadPoolExecutor(max_workers=16, thread_name_prefix="storage")


def shutdown_executors():
Expand Down
Loading
Loading