Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6e8104a
Add Opus encoding for private cloud sync chunks (#5418)
beastoin Mar 10, 2026
efee00f
Update merge_conversations extension handling for Opus (#5418)
beastoin Mar 10, 2026
72dfd02
Add unit tests for Opus encoding in private cloud sync (#5418)
beastoin Mar 10, 2026
c36f856
Add opus encoding tests to test.sh (#5418)
beastoin Mar 10, 2026
8010ac3
Batch private cloud uploads by conversation_id (60s window)
beastoin Mar 11, 2026
d5ff4ef
Add unit tests for private cloud batch upload logic
beastoin Mar 11, 2026
368c13a
Add batch upload tests to test.sh
beastoin Mar 11, 2026
b904060
Add upload_audio_chunks_batch with PRIVATE_CLOUD_BATCH_ENABLED flag
beastoin Mar 11, 2026
9a34e31
Add unit tests for upload_audio_chunks_batch (#5418 Phase 2)
beastoin Mar 11, 2026
adefec2
Register batch upload tests in test.sh
beastoin Mar 11, 2026
7ecfa22
Fix audio file duration and gap threshold for 60s chunks
beastoin Mar 11, 2026
00a7064
Update comment to reference PRIVATE_CLOUD_CHUNK_DURATION constant
beastoin Mar 11, 2026
8f42889
Use streaming writes for batch upload to avoid memory spike
beastoin Mar 11, 2026
4e8d2c9
Update batch upload tests for streaming write API
beastoin Mar 11, 2026
ecec7c3
Add boundary tests and conversations.py coverage per tester feedback
beastoin Mar 11, 2026
ac0a825
Add tester-requested coverage: large batch, identical timestamps, API…
beastoin Mar 11, 2026
eb1201a
Harden Opus decode: bounds checking, PCM length preservation, download fallback
beastoin Mar 11, 2026
5f9ff3a
Add decode error handling tests and update header format tests (#5418)
beastoin Mar 11, 2026
c52d45c
Add retry backoff by resetting queued_at on upload failure
beastoin Mar 11, 2026
1b2a860
Add retry backoff tests per tester feedback
beastoin Mar 11, 2026
8336629
Add download fallback path tests for Opus decode/decrypt failure (#5418)
beastoin Mar 11, 2026
5a96db9
Wrap batch upload behind PRIVATE_CLOUD_BATCH_PUSHER_ENABLED env var (…
beastoin Mar 11, 2026
145815a
Feature-flag gap_threshold behind PRIVATE_CLOUD_BATCH_PUSHER_ENABLED
beastoin Mar 11, 2026
73a676e
Add feature flag tests and update existing tests for both flag states
beastoin Mar 11, 2026
4504275
Merge remote-tracking branch 'origin/fix/batch-upload-storage-5418' i…
beastoin Mar 11, 2026
e82475c
Integrate hiro's upload_audio_chunks_batch into pusher _flush_batch
beastoin Mar 11, 2026
f17a65f
Remove PRIVATE_CLOUD_OPUS_ENABLED feature flag — Opus encoding always enabled
beastoin Mar 11, 2026
f9806e6
Update tests: remove feature flag patches, drop disabled-flag test
beastoin Mar 11, 2026
ba6b865
Remove PRIVATE_CLOUD_BATCH_PUSHER_ENABLED flag from pusher.py
beastoin Mar 11, 2026
9b66ce6
Remove PRIVATE_CLOUD_BATCH_PUSHER_ENABLED flag from conversations.py
beastoin Mar 11, 2026
2c5eb6c
Remove PRIVATE_CLOUD_BATCH_ENABLED flag from storage.py
beastoin Mar 11, 2026
53f9365
Remove feature flag refs from test_pusher_batch_upload.py
beastoin Mar 11, 2026
5520a1f
Remove feature flag refs from test_batch_upload_storage.py
beastoin Mar 11, 2026
46ddf72
Fix batch-aware list/delete/download for .batch.bin/.batch.enc files
beastoin Mar 12, 2026
d5fc7f2
Add tests for batch-aware list, delete, and download functions
beastoin Mar 12, 2026
d2d5cfc
Fix _copy_audio_chunks_for_merge to preserve batch blob filenames
beastoin Mar 12, 2026
cf30785
Add tests for _copy_audio_chunks_for_merge batch blob preservation
beastoin Mar 12, 2026
8968f26
Add batch extension support to private cloud sync functions (#5418)
beastoin Mar 12, 2026
d0c55c6
Preserve batch blob filenames in conversation merge (#5418)
beastoin Mar 12, 2026
60601dd
Add batch extension tests for private cloud sync (#5418)
beastoin Mar 12, 2026
103d33e
Merge remote-tracking branch 'origin/feat/opus-encoding-5418-v2' into…
beastoin Mar 13, 2026
46118f1
Merge remote-tracking branch 'origin/fix/batch-upload-5418-phase2' in…
beastoin Mar 13, 2026
7f48757
fix: resolve merge conflicts in storage.py — use Opus-aware download …
beastoin Mar 13, 2026
ee81652
fix: patch NotFound in batch upload download tests for cross-PR compatibility
beastoin Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions backend/database/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,19 @@ def create_audio_files_from_chunks(
if not chunks:
return []

# Group chunks based on 30-second gap rule
# Group chunks based on gap rule (90s threshold accommodates both 5s and 60s chunk durations)
audio_files = []
current_group = []
gap_threshold = 90 # seconds — must exceed max chunk duration (60s) to avoid false splits

for i, chunk in enumerate(chunks):
if not current_group:
current_group.append(chunk)
else:
# Check if there's a gap > 30 seconds between chunks
# Check if there's a gap between chunks exceeding the threshold
prev_chunk = current_group[-1]
time_gap = chunk['timestamp'] - prev_chunk['timestamp']
if time_gap > 30:
if time_gap > gap_threshold:
# Gap detected, finalize current group
audio_file = _finalize_audio_file_group(uid, conversation_id, current_group, audio_files)
if audio_file:
Expand Down Expand Up @@ -372,11 +373,13 @@ def _finalize_audio_file_group(
# Extract timestamps
timestamps = [chunk['timestamp'] for chunk in chunk_group]

# Calculate started_at and duration from timestamps
# Calculate started_at and duration from timestamps and blob sizes
started_at = datetime.fromtimestamp(chunk_group[0]['timestamp'], tz=timezone.utc)
last_chunk_start = datetime.fromtimestamp(chunk_group[-1]['timestamp'], tz=timezone.utc)
# Add 5 seconds for the last chunk's duration
duration = (last_chunk_start - started_at).total_seconds() + 5.0
# Estimate last chunk duration from blob size (PCM16 mono at 8kHz = 16000 bytes/sec)
last_chunk_size = chunk_group[-1].get('size', 0)
last_chunk_duration = last_chunk_size / 16000.0 if last_chunk_size > 0 else 5.0
duration = (last_chunk_start - started_at).total_seconds() + last_chunk_duration

return AudioFile(
id=file_id,
Expand Down
146 changes: 104 additions & 42 deletions backend/routers/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from collections import deque
from datetime import datetime, timezone
from typing import List, Set
from typing import Dict, List, Set

from fastapi import APIRouter
from fastapi.websockets import WebSocketDisconnect, WebSocket
Expand All @@ -27,7 +27,7 @@
realtime_transcript_webhook,
get_audio_bytes_webhook_seconds,
)
from utils.other.storage import upload_audio_chunk
from utils.other.storage import upload_audio_chunk, upload_audio_chunks_batch
from utils.speaker_identification import extract_speaker_samples
import logging

Expand All @@ -41,7 +41,8 @@

# Constants for private cloud sync
PRIVATE_CLOUD_SYNC_PROCESS_INTERVAL = 1.0
PRIVATE_CLOUD_CHUNK_DURATION = 5.0
PRIVATE_CLOUD_CHUNK_DURATION = 60.0
PRIVATE_CLOUD_BATCH_MAX_AGE = 60.0 # seconds — flush batch if oldest chunk exceeds this age
PRIVATE_CLOUD_SYNC_MAX_RETRIES = 3

# Queue warning thresholds
Expand Down Expand Up @@ -164,45 +165,44 @@ def on_done(t):
audio_bytes_event = asyncio.Event() # Signals when items are added for instant wake

async def process_private_cloud_queue():
"""Background task that processes private cloud sync uploads with retry logic."""
nonlocal websocket_active

while websocket_active or len(private_cloud_queue) > 0:
await asyncio.sleep(PRIVATE_CLOUD_SYNC_PROCESS_INTERVAL)

if not private_cloud_queue:
continue
"""Background task that batches private cloud sync uploads by conversation_id.

# Process all pending chunks
chunks_to_process = private_cloud_queue.copy()
private_cloud_queue.clear()

successful_conversation_ids = set() # Track conversations with successful uploads

for chunk_info in chunks_to_process:
chunk_data = chunk_info['data']
conv_id = chunk_info['conversation_id']
timestamp = chunk_info['timestamp']
retries = chunk_info.get('retries', 0)

try:
await asyncio.to_thread(
upload_audio_chunk, chunk_data, uid, conv_id, timestamp, cached_protection_level
)
successful_conversation_ids.add(conv_id)
except Exception as e:
if retries < PRIVATE_CLOUD_SYNC_MAX_RETRIES:
# Re-queue with incremented retry count
chunk_info['retries'] = retries + 1
private_cloud_queue.append(chunk_info)
logger.error(f"Private cloud upload failed (retry {retries + 1}): {e} {uid} {conv_id}")
else:
logger.info(
f"Private cloud upload failed after {PRIVATE_CLOUD_SYNC_MAX_RETRIES} retries, dropping chunk: {e} {uid} {conv_id}"
)
Chunks are accumulated per conversation and flushed when:
- The batch reaches 60s of audio data, or
- The oldest chunk in the batch exceeds PRIVATE_CLOUD_BATCH_MAX_AGE, or
- The websocket disconnects (shutdown flush).
"""
nonlocal websocket_active

# Update audio_files for conversations with successful uploads
for conv_id in successful_conversation_ids:
# Pending batches keyed by conversation_id
pending: Dict[str, dict] = {}

def _add_to_batch(chunk_info: dict):
conv_id = chunk_info['conversation_id']
if conv_id not in pending:
pending[conv_id] = {
'data': bytearray(),
'conversation_id': conv_id,
'timestamp': chunk_info['timestamp'], # oldest chunk timestamp
'queued_at': time.monotonic(),
'retries': 0,
}
batch = pending[conv_id]
batch['data'].extend(chunk_info['data'])

async def _flush_batch(conv_id: str):
"""Upload a batched chunk and update audio files."""
batch = pending.pop(conv_id, None)
if not batch or len(batch['data']) == 0:
return
chunk_data = bytes(batch['data'])
timestamp = batch['timestamp']
retries = batch.get('retries', 0)
try:
chunks_to_upload = [{'data': chunk_data, 'timestamp': timestamp}]
await asyncio.to_thread(
upload_audio_chunks_batch, chunks_to_upload, uid, conv_id, cached_protection_level
)
try:
audio_files = await asyncio.to_thread(conversations_db.create_audio_files_from_chunks, uid, conv_id)
if audio_files:
Expand All @@ -214,6 +214,47 @@ async def process_private_cloud_queue():
)
except Exception as e:
logger.error(f"Error updating audio files: {e} {uid} {conv_id}")
except Exception as e:
if retries < PRIVATE_CLOUD_SYNC_MAX_RETRIES:
batch['retries'] = retries + 1
batch['data'] = bytearray(chunk_data)
batch['queued_at'] = time.monotonic() # reset age so next retry waits ~60s
pending[conv_id] = batch
logger.error(f"Private cloud batch upload failed (retry {retries + 1}): {e} {uid} {conv_id}")
else:
logger.info(
f"Private cloud batch upload failed after {PRIVATE_CLOUD_SYNC_MAX_RETRIES} retries, dropping: {e} {uid} {conv_id}"
)
del chunk_data

while websocket_active or len(private_cloud_queue) > 0 or len(pending) > 0:
await asyncio.sleep(PRIVATE_CLOUD_SYNC_PROCESS_INTERVAL)

# Drain queue into pending batches
if private_cloud_queue:
chunks_to_process = private_cloud_queue.copy()
private_cloud_queue.clear()
for chunk_info in chunks_to_process:
_add_to_batch(chunk_info)

if not pending:
continue

now = time.monotonic()
batch_size_threshold = sample_rate * 2 * PRIVATE_CLOUD_CHUNK_DURATION

# Determine which conversations to flush
conv_ids_to_flush = []
for conv_id, batch in pending.items():
batch_age = now - batch['queued_at']
is_shutdown = not websocket_active
is_size_ready = len(batch['data']) >= batch_size_threshold
is_age_ready = batch_age >= PRIVATE_CLOUD_BATCH_MAX_AGE
if is_shutdown or is_size_ready or is_age_ready:
conv_ids_to_flush.append(conv_id)

for conv_id in conv_ids_to_flush:
await _flush_batch(conv_id)

async def process_speaker_sample_queue():
"""Background task that processes speaker sample extraction requests."""
Expand Down Expand Up @@ -334,7 +375,28 @@ async def receive_tasks():

# Conversation ID
if header_type == 103:
current_conversation_id = bytes(data[4:]).decode("utf-8")
new_conversation_id = bytes(data[4:]).decode("utf-8")
# Flush private cloud buffer for the old conversation before switching
if (
private_cloud_sync_enabled
and current_conversation_id
and current_conversation_id != new_conversation_id
and len(private_cloud_sync_buffer) > 0
):
private_cloud_queue.append(
{
'data': bytes(private_cloud_sync_buffer),
'conversation_id': current_conversation_id,
'timestamp': private_cloud_chunk_start_time or time.time(),
'retries': 0,
}
)
logger.info(
f"Flushed private cloud buffer on conversation switch: {len(private_cloud_sync_buffer)} bytes {uid}"
)
private_cloud_sync_buffer = bytearray()
private_cloud_chunk_start_time = None
current_conversation_id = new_conversation_id
logger.info(f"Pusher received conversation_id: {current_conversation_id} {uid}")
continue

Expand Down Expand Up @@ -405,7 +467,7 @@ async def receive_tasks():
private_cloud_chunk_start_time = buffer_start_timestamp

private_cloud_sync_buffer.extend(audio_data)
# Queue chunk every 5 seconds (sample_rate * 2 bytes per sample * 5 seconds)
# Queue chunk every PRIVATE_CLOUD_CHUNK_DURATION seconds
if len(private_cloud_sync_buffer) >= sample_rate * 2 * PRIVATE_CLOUD_CHUNK_DURATION:
if len(private_cloud_queue) >= PRIVATE_CLOUD_QUEUE_WARN_SIZE:
logger.warning(f"Warning: private_cloud_queue size {len(private_cloud_queue)} {uid}")
Expand Down
3 changes: 3 additions & 0 deletions backend/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ pytest tests/unit/test_translation_optimization.py -v
pytest tests/unit/test_conversation_source_unknown.py -v
pytest tests/unit/test_transcribe_conversation_cache.py -v
pytest tests/unit/test_pusher_private_cloud_data_protection.py -v
pytest tests/unit/test_pusher_batch_upload.py -v
pytest tests/unit/test_storage_upload_audio_chunk_data_protection.py -v
pytest tests/unit/test_storage_opus_encoding.py -v
pytest tests/unit/test_people_conversations_500s.py -v
pytest tests/unit/test_firestore_read_ops_cache.py -v
pytest tests/unit/test_ws_auth_handshake.py -v
pytest tests/unit/test_streaming_deepgram_backoff.py -v
pytest tests/unit/test_batch_upload_storage.py -v
Loading