prerelease: Opus encoding + batch upload (#5418) #5588
Merged
Conversation
- Add encode_pcm_to_opus() and decode_opus_to_pcm() helpers
- Add PRIVATE_CLOUD_OPUS_ENABLED feature flag (default: false)
- Modify upload_audio_chunk() to encode PCM→Opus before upload
- Update download_audio_chunks_and_merge() to decode Opus→PCM
- Update delete_audio_chunks() and list_audio_chunks() for .opus/.opus.enc
- Add _get_extension_for_path() and _strip_extension() helpers
- 11.7x compression ratio (160KB PCM → 13KB Opus per 5s chunk)
Change PRIVATE_CLOUD_CHUNK_DURATION from 5s to 60s and refactor process_private_cloud_queue() to batch chunks by conversation_id before uploading. Reduces GCS Class A write ops by ~12x. Flush triggers: batch >= 60s of audio, oldest chunk > 60s age, websocket disconnect. Conversation switch flushes old buffer. Closes #5418 Phase 2.
Tests batch accumulation, size-based flush, max-age flush, shutdown flush, and conversation switch flush.
Adds batch upload function for #5418 Phase 2 that concatenates multiple audio chunks into a single GCS write operation (~12x fewer Class A ops). Supports standard and encrypted paths. Feature flag defaults to false for safe rollout.
11 tests covering: multi-chunk batch, single chunk, encrypted batch, flag disabled fallback, empty batch, DB lookup count, ordering, and timestamp-order preservation.
Update gap threshold from 30s to 90s to accommodate 60s batch chunks. Compute last chunk duration from blob size instead of hardcoded 5s.
Reviewer feedback: blob.open('wb') streams chunk-by-chunk instead of accumulating a full bytearray plus a bytes() copy in memory.
Tests now verify blob.open() + write() calls instead of upload_from_string for batch mode.
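The streaming pattern the reviewer suggested looks roughly like this. `Blob.open` is a real google-cloud-storage API; the surrounding names (`upload_batch_streaming`, `batch_path`) are illustrative, and peak memory stays at one chunk rather than the whole batch.

```python
# Stream each chunk into the GCS blob rather than concatenating everything
# into one bytearray and copying it with bytes() before upload.
def upload_batch_streaming(bucket, batch_path: str, chunks: list[bytes]) -> None:
    blob = bucket.blob(batch_path)
    with blob.open("wb") as writer:
        for chunk in chunks:        # one write per chunk; no full-batch copy
            writer.write(chunk)
```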
Add size boundary (threshold-1), age boundary (exact 60s, 59.9s), conversation switch guard combinations (empty buffer, no current conv), gap threshold tests (90s boundary, 60s and 5s chunk patterns), and duration-from-blob-size tests (60s, 5s, fallback, multi-chunk).
… args, flag-disabled DB delegation

4 new tests covering: 50-chunk large batch streaming, identical timestamp filename stability, blob.open() call args + no upload_from_string assertion, flag-disabled level=None DB delegation.
…d fallback (#5418)
- Add original PCM length to container header to trim padding after decode
- Add bounds checking in decode_opus_to_pcm for truncated/corrupt data
- Make download_single_chunk fall back to next extension on decode/decrypt failure

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
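A hedged sketch of a length-prefixed container like the one described: a header carrying the original PCM byte length (so decode can trim codec padding), followed by `[u32 length][packet bytes]` records, with bounds checks for truncated data. The actual on-disk format in storage.py may differ; this only illustrates the error handling.

```python
import struct

_HEADER = struct.Struct("<I")    # original PCM byte length
_PKT_LEN = struct.Struct("<I")   # per-packet length prefix

def pack_container(original_pcm_len: int, packets: list[bytes]) -> bytes:
    """Serialize Opus packets with the original PCM length up front."""
    out = bytearray(_HEADER.pack(original_pcm_len))
    for pkt in packets:
        out += _PKT_LEN.pack(len(pkt)) + pkt
    return bytes(out)

def unpack_container(data: bytes) -> tuple[int, list[bytes]]:
    """Parse the container, raising ValueError on truncated/corrupt input."""
    if len(data) < _HEADER.size:
        raise ValueError("truncated header")
    (original_pcm_len,) = _HEADER.unpack_from(data, 0)
    offset, packets = _HEADER.size, []
    while offset < len(data):
        if offset + _PKT_LEN.size > len(data):
            raise ValueError("truncated packet length")
        (pkt_len,) = _PKT_LEN.unpack_from(data, offset)
        offset += _PKT_LEN.size
        if offset + pkt_len > len(data):
            raise ValueError("truncated packet body")
        packets.append(data[offset:offset + pkt_len])
        offset += pkt_len
    return original_pcm_len, packets
```

After decoding, the PCM stream would be sliced to `original_pcm_len` bytes to drop the padding the codec adds to fill its final frame.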
- Add TestOpusDecodeErrorHandling: truncated header, packet length, packet body, zero input
- Update existing tests for new container header format (original PCM length field)
- Update small_input_padded test to verify trimming to original length

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents rapid-fire retries when batch upload fails — each retry now waits ~60s (PRIVATE_CLOUD_BATCH_MAX_AGE) before re-flushing.
Test that failed upload resets queued_at for natural 60s backoff, and that retry preserves data and increments count.
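The backoff trick above can be sketched like this: on a failed upload, the buffer's `queued_at` is reset to "now", so the max-age flush trigger naturally waits another ~60s before retrying. The dict shape and function names are assumptions, not the actual pusher.py code.

```python
# Failed uploads keep their data and restart the 60s age clock instead of
# retrying in a tight loop.
BATCH_MAX_AGE = 60.0   # mirrors PRIVATE_CLOUD_BATCH_MAX_AGE

def flush_batch(buffer: dict, upload, now: float) -> bool:
    try:
        upload(buffer["chunks"])
    except Exception:
        buffer["queued_at"] = now           # restart the 60s age clock
        buffer["retry_count"] = buffer.get("retry_count", 0) + 1
        return False                        # data preserved for the next attempt
    buffer["chunks"] = []
    buffer["queued_at"] = None
    return True

def due_for_flush(buffer: dict, now: float) -> bool:
    qa = buffer.get("queued_at")
    return qa is not None and now - qa >= BATCH_MAX_AGE
```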
- TestDownloadFallbackPath: 4 tests covering extension priority fallback, all-not-found raises FileNotFoundError, successful opus skips .bin, malformed opus falls back to legacy .bin
- Uses _FakeNotFound to properly patch mocked NotFound exception

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nto fix/batch-upload-5418-phase2
… active Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Batch upload is now always-active default behavior per manager directive. Remove os import, flag constant, and conditional logic. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Hardcode gap_threshold=90 as default behavior. Remove os import. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Batch streaming upload is now the only code path. Remove flag constant, os import, and per-chunk fallback branch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace TestFeatureFlagDefaults with TestConstants. Remove flag-disabled test paths and legacy 30s threshold tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove PRIVATE_CLOUD_BATCH_ENABLED patches from all tests. Delete TestBatchUploadFlagDisabled class (5 tests for removed fallback). Rename to TestBatchUpload. 10 tests remain. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cherry-pick from hiro's PR #5556 (commit 44117f9).
- list_audio_chunks: parse .batch. prefix before stripping extension, handle timestamp ranges, add is_batch field
- delete_audio_chunks: try .batch.enc/.batch.bin extensions, scan range-named batch blobs by start timestamp
- download_audio_chunks_and_merge: resolve actual GCS paths via list_audio_chunks to handle batch blobs, deduplicate downloads

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cherry-pick from hiro's PR #5556 (commit 88bf2df), conflict resolved to remove flag-disabled tests (flag already removed in prior commit).
- TestListAudioChunksBatchAware: 7 tests for .batch.bin/.batch.enc parsing
- TestDeleteAudioChunksBatchAware: 3 tests for batch file deletion
- TestDownloadAudioChunksMergeBatchAware: 4 tests for batch blob resolution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Previously renamed batch blobs (e.g., 1000.000-1060.000.batch.bin) to single-timestamp names (1000.000.bin) during conversation merge, losing the batch structure. Now preserves original filenames. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3 tests verifying conversation merge preserves batch blob filenames:
- Batch blob filename preserved (not renamed to single-timestamp)
- Single-chunk filename preserved (backwards compatible)
- Mixed single + batch blobs all copied with original names
Handle .batch.bin/.batch.enc from upload_audio_chunks_batch (PR #5556):
- PRIVATE_CLOUD_EXTENSIONS: add batch extensions (longest-first order)
- _strip_extension(): handle batch filenames with timestamp ranges
- _get_extension_for_path(): recognize batch.bin/batch.enc
- list_audio_chunks(): parse range timestamps, add is_batch field
- delete_audio_chunks(): scan for range-named batch blobs
- download_audio_chunks_and_merge(): resolve actual GCS paths via list_audio_chunks() for batch-aware download with deduplication

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
_copy_audio_chunks_for_merge() now copies batch blobs with their original filename (including timestamp range) instead of constructing a single-timestamp path that would lose the batch format. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
17 new tests covering:
- Batch extension helpers (_strip_extension, _get_extension_for_path)
- list_audio_chunks: batch.bin, batch.enc, range timestamps, mixed blobs
- delete_audio_chunks: single-timestamp batch, range-named batch scan
- download_audio_chunks_and_merge: batch blob deduplication, mixed download

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… verify/combined-5554-5555-v2
…to verify/combined-5554-5555-v2

# Conflicts:
#	backend/test.sh
#	backend/utils/conversations/merge_conversations.py
#	backend/utils/other/storage.py
…+ batch handling Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tibility

NotFound from google.cloud.exceptions is mocked at module level, making except NotFound: handlers fail. Adds _FakeNotFound (a real Exception subclass) and patches storage_mod.NotFound in download tests, matching kenji's pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
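The pattern works because Python evaluates the `except` expression when an exception propagates, and catching a non-exception (a MagicMock) raises TypeError. A self-contained sketch, with `storage_mod` and `download_single_chunk` as toy stand-ins for the real module:

```python
import types
from unittest import mock

class _FakeNotFound(Exception):
    """Real Exception subclass standing in for google.cloud.exceptions.NotFound."""

# Toy stand-in for the storage module after module-level mocking:
# NotFound has been replaced by a MagicMock.
storage_mod = types.SimpleNamespace(NotFound=mock.MagicMock())

def download_single_chunk(blob):
    try:
        return blob.download_as_bytes()
    except storage_mod.NotFound:   # only works if NotFound is a real exception class
        return None

missing_blob = mock.Mock()
missing_blob.download_as_bytes.side_effect = _FakeNotFound()

# Unpatched: the except clause itself raises TypeError
# ("catching classes that do not inherit from BaseException").
try:
    download_single_chunk(missing_blob)
    unpatched_ok = True
except TypeError:
    unpatched_ok = False

# Patched with the real subclass, the fallback path works as intended.
with mock.patch.object(storage_mod, "NotFound", _FakeNotFound):
    result = download_single_chunk(missing_blob)
```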
lgtm
Glucksberg pushed a commit to Glucksberg/omi-local that referenced this pull request on Apr 28, 2026.
Summary
Combines PRs #5554 (Opus encoding) and #5555 (batch upload phase 2) for issue #5418.
Sub-PRs
Verification
Deploy Plan
Services to Deploy
Deploy Sequence
gh workflow run gcp_backend.yml -f environment=prod -f branch=main

Env Vars / Feature Flags
- PRIVATE_CLOUD_OPUS_ENABLED flag removed — always active
- PRIVATE_CLOUD_BATCH_ENABLED and PRIVATE_CLOUD_BATCH_PUSHER_ENABLED flags removed — always active
- PRIVATE_CLOUD_BATCH_MAX_AGE = 60.0 hardcoded in routers/pusher.py
- opuslib==3.0.1 already in prod requirements.txt, no new deps

Rollout Behavior
- Existing .bin/.enc chunks continue to download correctly
- New uploads use Opus (.opus/.opus.enc, ~13x compression) + batch naming ({start}-{end}.batch.bin)
- Download path handles all extensions (.bin, .enc, .opus, .opus.enc, .batch.bin, .batch.enc)

Monitoring — What to Watch
First 30 minutes:
1. No ImportError on opuslib
2. New uploads show .opus/.batch.bin extensions in Cloud Logging

First hour:
4. Download path — Opus-decoded audio works for recent conversations
5. Private cloud sync full roundtrip (upload -> download)
6. Pusher WS connections stable, no flush failures
First 24 hours:
7. Merge conversations with mixed-format chunks
8. GCS storage reduction (Opus blobs ~8% the size of PCM)
9. Pod memory stays flat (del upload_data cleanup)

Error signatures to grep:
Cloud Logging queries:
Monitoring cadence: T+30m, T+1h, T+2h, T+4h, then every 4h for 24h
Rollback Plan
Files Changed
- backend/utils/other/storage.py — Opus encode/decode, batch-aware list/delete/download
- backend/utils/conversations/merge_conversations.py — Batch blob filename preservation
- backend/routers/pusher.py — Batch upload logic, 60s flush, conversation-switch flush
- backend/database/conversations.py — Flag removal, gap threshold adjustment
- backend/tests/unit/test_storage_opus_encoding.py — 47 tests (NEW)
- backend/tests/unit/test_batch_upload_storage.py — 27 tests (NEW)
- backend/test.sh — Added test files

Test plan
- .opus/.opus.enc

Generated with Claude Code