Add batch upload for private cloud audio chunks (#5418 Phase 2)#5556
Conversation
Adds a batch upload function for #5418 Phase 2 that concatenates multiple audio chunks into a single GCS write operation (~12x fewer Class A ops). Supports standard and encrypted paths. The feature flag defaults to false for safe rollout.
11 tests covering: multi-chunk batch, single chunk, encrypted batch, flag disabled fallback, empty batch, DB lookup count, ordering, and timestamp-order preservation.
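The batch path described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: the `build_batch` name, the chunk dict shape (`data`/`timestamp` keys), and the `lookup_level` stand-in for the `users_db` read are all assumptions.

```python
def build_batch(chunks, uid, protection_level=None, lookup_level=None):
    """Return (object_name, payload) for one batched GCS write, or None.

    chunks: list of {'data': bytes, 'timestamp': int} dicts (hypothetical shape).
    lookup_level: callable standing in for users_db.get_data_protection_level.
    """
    if not chunks:
        return None
    sorted_chunks = sorted(chunks, key=lambda c: c['timestamp'])
    if protection_level is None:
        protection_level = lookup_level(uid)  # single DB read per batch
    first_ts = sorted_chunks[0]['timestamp']
    last_ts = sorted_chunks[-1]['timestamp']
    # Per-chunk encryption for the 'enhanced' level is omitted in this sketch;
    # only the object naming follows the .batch.enc convention.
    ext = 'batch.enc' if protection_level == 'enhanced' else 'batch.bin'
    payload = b''.join(c['data'] for c in sorted_chunks)
    return f"{first_ts}-{last_ts}.{ext}", payload
```

Writing the result as one object is what turns N Class A write operations into one.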
Greptile Summary: Confidence Score 2/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant upload_audio_chunks_batch
    participant upload_audio_chunk
    participant users_db
    participant GCS
    Caller->>upload_audio_chunks_batch: chunks, uid, conv_id, protection_level
    alt chunks is empty
        upload_audio_chunks_batch-->>Caller: []
    else PRIVATE_CLOUD_BATCH_ENABLED = false (fallback)
        upload_audio_chunks_batch->>upload_audio_chunks_batch: sort chunks by timestamp
        loop each sorted chunk
            upload_audio_chunks_batch->>upload_audio_chunk: chunk_data, uid, conv_id, ts, level
            alt protection_level is None
                upload_audio_chunk->>users_db: get_data_protection_level(uid) [once per chunk!]
                users_db-->>upload_audio_chunk: level
            end
            upload_audio_chunk->>GCS: upload {ts}.bin or {ts}.enc
            GCS-->>upload_audio_chunk: ok
            upload_audio_chunk-->>upload_audio_chunks_batch: path
        end
        upload_audio_chunks_batch-->>Caller: [path1, path2, ...]
    else PRIVATE_CLOUD_BATCH_ENABLED = true (batch)
        upload_audio_chunks_batch->>upload_audio_chunks_batch: sort chunks by timestamp
        alt protection_level is None
            upload_audio_chunks_batch->>users_db: get_data_protection_level(uid) [once per batch]
            users_db-->>upload_audio_chunks_batch: level
        end
        alt level == enhanced
            loop each chunk
                upload_audio_chunks_batch->>upload_audio_chunks_batch: encrypt_audio_chunk(data, uid)
            end
            upload_audio_chunks_batch->>GCS: upload {first_ts}-{last_ts}.batch.enc
        else standard
            upload_audio_chunks_batch->>upload_audio_chunks_batch: concatenate raw PCM
            upload_audio_chunks_batch->>GCS: upload {first_ts}-{last_ts}.batch.bin
        end
        GCS-->>upload_audio_chunks_batch: ok
        upload_audio_chunks_batch->>upload_audio_chunks_batch: del batch_data
        upload_audio_chunks_batch-->>Caller: [batch_path]
    end
```
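The `PRIVATE_CLOUD_BATCH_ENABLED` gate that splits the diagram is described in the PR as an env var defaulting to false. One way such a gate is commonly read (the parsing details here are an assumption, not the PR's code):

```python
import os

def flag_enabled(name='PRIVATE_CLOUD_BATCH_ENABLED'):
    # Unset, or anything other than a truthy string, keeps batch mode off,
    # matching the "default false for safe rollout" behavior.
    return os.getenv(name, 'false').strip().lower() in ('1', 'true', 'yes')
```

Because the default is the string `'false'`, deploying the code changes nothing until the variable is explicitly set.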
```python
if not PRIVATE_CLOUD_BATCH_ENABLED:
    # Flag disabled — fall back to per-chunk uploads
    paths = []
    for chunk in sorted_chunks:
        path = upload_audio_chunk(
            chunk_data=chunk['data'],
            uid=uid,
            conversation_id=conversation_id,
            timestamp=chunk['timestamp'],
            data_protection_level=data_protection_level,
        )
        paths.append(path)
    return paths
```
Fallback path still does one DB read per chunk when data_protection_level is None
When the flag is disabled, the fallback delegates directly to upload_audio_chunk, which calls users_db.get_data_protection_level(uid) for each chunk individually. The "Single DB lookup per batch" benefit described in the PR description only applies to the batch-enabled code path.
If the feature flag is expected to be toggled back off (e.g., for a gradual rollout), this means callers that omit data_protection_level will incur N Firestore reads again. Consider resolving the protection level once before the loop in the fallback path, the same way the batch path does on line 399–401:
```python
if not PRIVATE_CLOUD_BATCH_ENABLED:
    protection_level = (
        data_protection_level if data_protection_level is not None
        else users_db.get_data_protection_level(uid)
    )
    paths = []
    for chunk in sorted_chunks:
        path = upload_audio_chunk(
            chunk_data=chunk['data'],
            uid=uid,
            conversation_id=conversation_id,
            timestamp=chunk['timestamp'],
            data_protection_level=protection_level,
        )
        paths.append(path)
    return paths
```

Reviewer feedback: `blob.open('wb')` streams chunk-by-chunk instead of accumulating a full bytearray plus a `bytes()` copy in memory. Tests now verify `blob.open()` + `write()` calls instead of `upload_from_string` for batch mode.
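The streaming change can be sketched like this. The `blob` is assumed to expose google-cloud-storage's `Blob.open('wb')` file-like writer; `_FakeBlob` below is purely illustrative so the sketch runs without GCS credentials.

```python
import io
from contextlib import contextmanager

def upload_batch_streaming(blob, chunk_payloads):
    """Write each chunk directly into the blob's writer instead of
    accumulating a full bytearray and copying it with bytes()."""
    total = 0
    with blob.open('wb') as writer:
        for payload in chunk_payloads:
            writer.write(payload)  # at most one chunk held in memory at a time
            total += len(payload)
    return total

class _FakeBlob:
    """Stand-in for a GCS blob; real code would pass bucket.blob(path)."""
    def __init__(self):
        self.buffer = io.BytesIO()

    @contextmanager
    def open(self, mode):
        assert mode == 'wb'
        yield self.buffer

fake = _FakeBlob()
written = upload_batch_streaming(fake, [b'chunk1', b'chunk2'])
```

Peak memory is bounded by the largest single chunk rather than the whole batch, which is the memory-spike concern the reviewer raised.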
4 new tests covering: 50-chunk large batch streaming, identical-timestamp filename stability, blob.open() call args + no upload_from_string assertion, flag-disabled level=None DB delegation.
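A test along these lines might assert the streaming calls with `unittest.mock`; this is a sketch of the assertion pattern, not the PR's actual test file, and the code under test is inlined for brevity.

```python
import io
from unittest.mock import MagicMock

# Mock blob whose open('wb') context manager yields an in-memory sink.
blob = MagicMock()
sink = io.BytesIO()
blob.open.return_value.__enter__.return_value = sink

# Stand-in for the code under test, which streams via blob.open('wb'):
with blob.open('wb') as f:
    f.write(b'chunk1')
    f.write(b'chunk2')

# Assertions in the style the commit describes:
blob.open.assert_called_once_with('wb')
blob.upload_from_string.assert_not_called()
assert sink.getvalue() == b'chunk1chunk2'
```

The `assert_not_called()` check is what pins the implementation to streaming: a regression back to `upload_from_string` fails the test even if the bytes written are identical.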
All checkpoints passed (CP0-CP8). 15 tests, streaming writes, flag defaults to false. Ready for merge when manager approves. Known follow-up: list/download/delete paths need batch-aware updates before PRIVATE_CLOUD_BATCH_ENABLED can be flipped to true. by AI for @beastoin
CP4 Codex analysis summary: approach chosen: streaming. by AI for @beastoin

Independent Verification Result (kelvin)

Verdict: PASS ✅
Tests: 15/15 pass on combined branch

Mechanism Review

Codex Audit Findings

Combined PR
Hey @beastoin 👋 Thank you so much for taking the time to contribute to Omi! We truly appreciate you putting in the effort to submit this pull request. After careful review, we've decided not to merge this particular PR. Please don't take this personally — we genuinely try to merge as many contributions as possible, but sometimes we have to make tough calls based on a number of factors.

Your contribution is still valuable to us, and we'd love to see you contribute again in the future! If you'd like feedback on how to improve this PR or want to discuss alternative approaches, please don't hesitate to reach out. Thank you for being part of the Omi community! 💜
Handle .batch.bin/.batch.enc from upload_audio_chunks_batch (PR #5556):

- PRIVATE_CLOUD_EXTENSIONS: add batch extensions (longest-first order)
- _strip_extension(): handle batch filenames with timestamp ranges
- _get_extension_for_path(): recognize batch.bin/batch.enc
- list_audio_chunks(): parse range timestamps, add is_batch field
- delete_audio_chunks(): scan for range-named batch blobs
- download_audio_chunks_and_merge(): resolve actual GCS paths via list_audio_chunks() for batch-aware download with deduplication

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cherry-pick from hiro's PR #5556 (commit 44117f9).

- list_audio_chunks: parse .batch. prefix before stripping extension, handle timestamp ranges, add is_batch field
- delete_audio_chunks: try .batch.enc/.batch.bin extensions, scan range-named batch blobs by start timestamp
- download_audio_chunks_and_merge: resolve actual GCS paths via list_audio_chunks to handle batch blobs, deduplicate downloads

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cherry-pick from hiro's PR #5556 (commit 88bf2df), conflict resolved to remove flag-disabled tests (flag already removed in prior commit).

- TestListAudioChunksBatchAware: 7 tests for .batch.bin/.batch.enc parsing
- TestDeleteAudioChunksBatchAware: 3 tests for batch file deletion
- TestDownloadAudioChunksMergeBatchAware: 4 tests for batch blob resolution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
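The batch-aware parsing these commits describe can be illustrated with a simplified sketch. The extension tuple and returned field names here are assumptions, apart from `is_batch`, which the commits name; the longest-first ordering matters because every `.batch.bin` name also ends with `.bin`.

```python
# Longest-first so '.batch.bin' matches before the plain '.bin' suffix.
PRIVATE_CLOUD_EXTENSIONS = ('.batch.enc', '.batch.bin', '.enc', '.bin')

def parse_chunk_name(name):
    """Parse a chunk blob name into timestamps and flags, or None."""
    for ext in PRIVATE_CLOUD_EXTENSIONS:
        if name.endswith(ext):
            stem = name[:-len(ext)]
            is_batch = '.batch' in ext
            encrypted = ext.endswith('.enc')
            if is_batch and '-' in stem:
                # Batch names carry a '{first_ts}-{last_ts}' range.
                first, last = stem.split('-', 1)
            else:
                first = last = stem
            return {'first_ts': int(first), 'last_ts': int(last),
                    'is_batch': is_batch, 'encrypted': encrypted}
    return None
```

Splitting on the batch suffix before stripping the extension is what lets the same function handle both `{ts}.bin` and `{first_ts}-{last_ts}.batch.bin` names.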
Summary
Adds `upload_audio_chunks_batch()` to `backend/utils/other/storage.py` for #5418 Phase 2 — reduces GCS Class A write ops by ~12x by streaming multiple audio chunks into a single GCS upload.

Changes:

- New `upload_audio_chunks_batch(chunks, uid, conversation_id, data_protection_level)` function
- `PRIVATE_CLOUD_BATCH_ENABLED` feature flag (env var, default `false`) for safe rollout
- Streams via `blob.open('wb')` into a single `.batch.bin` or `.batch.enc` object
- Falls back to per-chunk `upload_audio_chunk()` calls when the flag is disabled

Batch extension fixes (kelvin audit):

- `list_audio_chunks()` — batch-aware parsing: splits on `.batch.` prefix, handles timestamp ranges (first-last), returns `is_batch` field
- `delete_audio_chunks()` — tries `.batch.enc`/`.batch.bin` extensions, scans range-named batch blobs by start timestamp
- `download_audio_chunks_and_merge()` — resolves actual GCS paths via `list_audio_chunks()` instead of constructing from timestamps, deduplicates batch blob downloads, handles encrypted batch decryption

Files changed:

- `backend/utils/other/storage.py` — batch upload + batch-aware list/delete/download
- `backend/tests/unit/test_batch_upload_storage.py` — 29 unit tests
- `backend/test.sh` — registered new tests
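The download deduplication mentioned for `download_audio_chunks_and_merge()` boils down to collapsing many chunk entries onto shared batch blob paths before fetching; a minimal sketch, assuming entries shaped like the batch-aware listing output (the `path` key is an assumption):

```python
def resolve_download_paths(listed_chunks):
    """Collapse per-chunk entries to unique GCS blob paths, in order.

    Several chunks inside one batch share the same blob path, so a naive
    per-chunk download would fetch the same batch object repeatedly.
    """
    seen = set()
    paths = []
    for entry in listed_chunks:
        if entry['path'] not in seen:
            seen.add(entry['path'])
            paths.append(entry['path'])
    return paths
```

Order is preserved so the merged audio keeps its timestamp ordering even when batch and single-chunk blobs are interleaved.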
29 unit tests covering all code paths.

kelvin re-verification: FULL PASS (4/4 steps, 80/80 unit tests)
Review cycle changes

- Switched from `bytearray` concatenation to streaming `blob.open('wb')` writes (reviewer feedback on memory spike)

Risks / Edge Cases

- Feature flag defaults to `false` — zero production impact until explicitly enabled

Closes phase 2 scope of #5418.
by AI for @beastoin