Sync: fair-use tracking with lock-on-exhaustion and soft cap gates#5863
Conversation
Greptile Summary

This PR closes a significant abuse vector. Key changes:
Issues found:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant sync_local_files
    participant has_transcription_credits
    participant is_hard_restricted
    participant rate_limit_dependency
    participant VAD as retrieve_vad_segments
    participant FairUse as fair_use
    participant Deepgram as process_segment
    participant Analytics as record_usage
    Client->>sync_local_files: POST /v1/sync-local-files (files, uid)
    sync_local_files->>rate_limit_dependency: check IP rate limit (20 req/hr)
    rate_limit_dependency-->>sync_local_files: 429 if exceeded
    sync_local_files->>has_transcription_credits: check subscription credits(uid)
    has_transcription_credits-->>sync_local_files: 402 if no credits
    sync_local_files->>is_hard_restricted: check fair-use stage(uid)
    is_hard_restricted-->>sync_local_files: 429 if restricted
    sync_local_files->>VAD: retrieve_vad_segments(path, segmented_paths, errors, speech_durations)
    Note over VAD: Merges segments within 120s gap<br/>Appends merged-segment durations<br/>(includes silence gaps ⚠️)
    VAD-->>sync_local_files: segmented_paths, speech_durations
    sync_local_files->>FairUse: record_speech_ms(uid, total_speech_ms, source='sync')
    sync_local_files->>FairUse: get_rolling_speech_ms(uid)
    sync_local_files->>FairUse: check_soft_caps(uid, speech_totals)
    FairUse-->>sync_local_files: triggered_caps
    opt triggered_caps non-empty
        sync_local_files--)FairUse: asyncio.create_task(trigger_classifier_if_needed)
    end
    sync_local_files->>Deepgram: process_segment(path, uid, response, source)
    Deepgram-->>sync_local_files: updated response
    sync_local_files->>Analytics: record_usage(uid, transcription_seconds, speech_seconds)
    sync_local_files-->>Client: {updated_memories, new_memories}
```
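The gate ordering in the diagram can be sketched as a plain function. This is a toy simulation for illustration only: the parameter names, signature, and integer status codes are assumptions standing in for the real FastAPI dependencies and responses.

```python
# Toy simulation of the request gates in the order shown above.
# All names and the boolean inputs are stand-ins, not the real handlers.
def sync_local_files(uid: str, *, rate_limited: bool, has_credits: bool,
                     hard_restricted: bool) -> int:
    if rate_limited:          # rate_limit_dependency: 20 req/hr per IP
        return 429
    if not has_credits:       # has_transcription_credits
        return 402
    if hard_restricted:       # is_hard_restricted fair-use stage
        return 429
    return 200                # proceed to VAD + fair-use recording

print(sync_local_files("u1", rate_limited=False, has_credits=True,
                       hard_restricted=False))  # 200
```

Note that later commits in this PR change this ordering: the rate limit is split into a separate PR and the 402 block becomes a lock flag.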
Last reviewed commit: "Add test_sync_fair_u..."
backend/routers/sync.py
Outdated
```python
for i, segment in enumerate(segments):
    if (segment['end'] - segment['start']) < 1:
        continue
    # Accumulate speech duration for fair-use tracking (#5854)
    if speech_durations is not None:
        speech_durations.append(segment['end'] - segment['start'])
```
Speech duration overcount: merged segments include silence gaps
speech_durations.append(segment['end'] - segment['start']) is called on the post-merge segments list, not the pre-merge voice_segments list. The merge step above combines any two adjacent voice segments whose gap is < 120 seconds — meaning the merged segment's duration includes up to ~119 seconds of silence per merge.
Example:
- Voice activity: 0–5 s and 15–20 s (10 s of actual speech, 10 s silence gap)
- Gap = 10 s < 120 s → merged to `{start: 0, end: 20}` → 20 s is recorded as speech
- Fair-use credit deducted: 20 s instead of 10 s
In the worst case a user with two short phrases separated by 119 s of silence has their fair-use counter inflated by ~60×. The PR description says "Computes total speech duration from VAD segment boundaries" — that is only true if the pre-merge voice_segments are used. The fix is to accumulate from voice_segments before the merge loop runs:
```python
# Collect actual voice-only durations from pre-merge VAD output
if speech_durations is not None:
    for vs in voice_segments:
        dur = vs['end'] - vs['start']
        if dur >= 1:
            speech_durations.append(dur)
```

and remove the `speech_durations.append(...)` call from inside the post-merge loop.
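The inflation is easy to reproduce with a self-contained sketch. The merge function below is an illustration of the gap-merge behavior described in this comment, not the actual backend code; the segment dicts mirror the review's example.

```python
# Sketch reproducing the overcount: merging VAD segments whose gap is
# under 120 s folds the silence between them into the recorded duration.
MERGE_GAP_S = 120

def merge_segments(voice_segments):
    merged = []
    for seg in voice_segments:
        if merged and seg['start'] - merged[-1]['end'] < MERGE_GAP_S:
            merged[-1]['end'] = seg['end']   # absorb the silence gap
        else:
            merged.append(dict(seg))
    return merged

voice_segments = [{'start': 0, 'end': 5}, {'start': 15, 'end': 20}]
pre_merge = sum(s['end'] - s['start'] for s in voice_segments)
post_merge = sum(s['end'] - s['start'] for s in merge_segments(voice_segments))
print(pre_merge, post_merge)  # 10 20
```

Summing before the merge yields the 10 s of actual speech; summing after yields 20 s, exactly the overcount described above.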
```python
total_speech_seconds = int(sum(speech_durations))
total_speech_ms = total_speech_seconds * 1000
```
Sub-second precision silently truncated
int(sum(speech_durations)) applies floor truncation. For a total of, say, 29.9 s the user gets billed for only 29 s. The conversion to milliseconds then amplifies the rounding to ±999 ms. Using round() gives a fairer result and is consistent with typical billing conventions:
```diff
-total_speech_seconds = int(sum(speech_durations))
+total_speech_seconds = round(sum(speech_durations))
 total_speech_ms = total_speech_seconds * 1000
```
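A quick check of the difference, using the 29.9 s figure from above:

```python
# Floor truncation vs rounding on a fractional speech total.
durations = [10.0, 9.95, 9.95]   # 29.9 s total speech

total = sum(durations)
floor_seconds = int(total)       # floor truncation: 29
fair_seconds = round(total)      # nearest integer: 30

# The millisecond conversion amplifies the one-second gap to 1000 ms.
print(floor_seconds * 1000, fair_seconds * 1000)  # 29000 30000
```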
```python
triggered_caps = check_soft_caps(uid, speech_totals=speech_totals)
if triggered_caps:
    logger.info(f'fair_use: sync soft cap triggered uid={uid} caps={triggered_caps}')
    asyncio.create_task(trigger_classifier_if_needed(uid, triggered_caps, f'sync-{uid}'))
```
Unhandled task exceptions silently dropped
asyncio.create_task(trigger_classifier_if_needed(...)) fires and forgets the coroutine. In Python 3.8+ an unhandled exception inside the task is only printed to stderr as a warning (Task exception was never retrieved) and is otherwise silently lost — no log entry in Cloud Logging, no retry. Given that this is an enforcement pathway, silent failures could let abusive accounts avoid escalation.
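A minimal repro of the silent-drop behavior and the callback remedy; `failing_classifier` is a hypothetical stand-in for `trigger_classifier_if_needed`:

```python
import asyncio

caught = []

async def failing_classifier(uid: str):
    # Stand-in for trigger_classifier_if_needed (hypothetical)
    raise RuntimeError(f"classifier failed for {uid}")

async def main():
    task = asyncio.create_task(failing_classifier("user-1"))
    task.add_done_callback(
        lambda t: caught.append(t.exception())
        if not t.cancelled() and t.exception() else None
    )
    # The exception never propagates to the caller; only the callback sees it.
    await asyncio.sleep(0)  # let the task run and fail
    await asyncio.sleep(0)  # let the done callback fire

asyncio.run(main())
print(type(caught[0]).__name__)  # RuntimeError
```

Without the callback, the `RuntimeError` would surface only as a "Task exception was never retrieved" warning on stderr at garbage collection.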
Consider adding a done callback to log the exception, or wrapping in a small helper used elsewhere in the codebase:
```python
task = asyncio.create_task(trigger_classifier_if_needed(uid, triggered_caps, f'sync-{uid}'))
task.add_done_callback(
    lambda t: logger.error(f'fair_use: classifier task failed uid={uid}: {t.exception()}')
    if not t.cancelled() and t.exception() else None
)
```

```python
[t.join() for t in threads[i : i + chunk_size]]
```

```python
vad_errors = []
speech_durations = []  # Thread-safe: list.append is atomic in CPython
```
Thread-safety relies on CPython implementation detail
The comment # Thread-safe: list.append is atomic in CPython is accurate for CPython but is not guaranteed by the Python language specification. If the backend ever runs on PyPy, Jython, or a future Python implementation that drops the GIL (e.g., the nogil/free-threading builds landing in 3.13+), concurrent list.append calls from multiple threads become unsafe and could result in a corrupted list or lost entries.
Consider using threading.Lock (already used elsewhere in the router) or a pre-allocated list with index-based writes to make the intent explicit and portable.
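A portable version of the accumulation is a one-line change; this sketch guards the shared list with an explicit lock (names are illustrative, not the router's actual variables):

```python
import threading

# Explicit lock instead of relying on CPython's atomic list.append.
speech_durations = []
durations_lock = threading.Lock()

def record_duration(dur: float):
    with durations_lock:
        speech_durations.append(dur)

# Concurrent appends from several worker threads remain safe and lossless.
threads = [threading.Thread(target=record_duration, args=(0.5,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(speech_durations))  # 8
```

Under the GIL this behaves identically to the bare append; on free-threaded builds it documents and enforces the intended exclusion.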
All checkpoints passed:
Local dev backend evidence: 6 live Redis integration tests confirm sync speech recording uses same pool as realtime, accumulates correctly, and triggers soft caps. PR is ready for merge. 4 files changed: sync.py (+52/-6), fair_use.py (+7/-1), test_sync_fair_use_gate.py (+172, new), test_sync_fair_use_live.py (+172, new), test.sh (+1). by AI for @beastoin
Local Dev Backend Test Evidence — Sync Fair-Use Gates

Environment:

- Test 1: Upload real audio → fair-use speech tracking ✅. VAD detected 12s of speech from the espeak-ng audio.
- Test 2: Redis accumulation verified ✅. Speech data accumulated correctly across minute buckets in Redis Cloud.
- Test 3: Upload accumulation ✅
- Test 4: Rate limit (20 req/hour) ✅. All 20 successful uploads logged.
- Test 5: Pre-check gates verified ✅. Code path confirmed in:

```python
# Line 733-736: Pre-check gates
if not has_transcription_credits(uid):
    raise HTTPException(status_code=402, detail="Monthly transcription limit reached")
if is_hard_restricted(uid):
    raise HTTPException(status_code=429, detail="Account temporarily restricted")

# Line 782: Speech recording from VAD segments
record_speech_ms(uid, total_speech_ms, source='sync')

# Line 786: Soft cap check with precomputed totals
triggered_caps = check_soft_caps(uid, speech_totals=speech_totals)
```

Summary
by AI for @beastoin
PR Ready for Merge — All Checkpoints Passed ✅

All omi-pr-workflow checkpoints completed:
Live backend test summary (CP9):
Awaiting manager merge approval.

by AI for @beastoin
lgtm
Adds source='realtime' default param and info log line showing source. Same Redis pool regardless of source — logging only. Ref: #5854 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…oint

- Pre-check: has_transcription_credits -> 402, is_hard_restricted -> 429
- Rate limit: 20 requests per hour per IP
- After VAD: record speech_ms with source='sync' to shared fair-use pool
- After processing: record_usage for subscription budget tracking
- Speech duration computed from VAD segment boundaries before Deepgram

Ref: #5854 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
12 tests covering: source param (3), precomputed totals (1), speech duration computation (4), is_hard_restricted (3), import availability (1). Ref: #5854 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… inflation) Moved speech_durations accumulation from merged segment loop to raw voice_segments, before the 120s gap merge. Merged segments include up to 120s of silence between speech spans, inflating the duration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
6 tests with real Redis verifying:

- source='sync' writes to same Redis pool as realtime
- Mixed source speech accumulates correctly
- Soft caps work with sync-sourced speech
- Full sync flow simulation (VAD segments -> record -> check caps)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Match the listen API pattern: when transcription credits are exhausted, continue processing but set is_locked=True on created/updated conversations. User can pay to unlock (payment webhook calls unlock_all_*). Also removes rate_limit_dependency (20 req/hr) — will be in a separate PR. Closes part of #5854 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Allows sync endpoint to set is_locked=True before process_conversation runs, so derived objects (memories, action items) inherit the lock state. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Set is_locked on CreateConversation instead of post-update, so process_conversation propagates lock to memories and action items. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- TestCreateConversationLockPropagation: 4 tests verifying is_locked flows from CreateConversation through dict() to Conversation
- TestSyncEndpointCodeStructure: 6 tests verifying no rate_limit, no 402, should_lock flag, is_locked param, hard restriction gate
- TestSoftCapBoundary: 3 tests for exact-cap, 1ms-over, zero-speech

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed b74cc9b to 879cd0e
Ready for merge ✅

All omi-pr-workflow checkpoints passed:
Post-rebase verification: Rebased onto latest main, resolved test.sh conflict (kept both …). Merge conflict resolved — PR is now mergeable. Awaiting manager merge approval.

by AI for @beastoin
lgtm
## Summary

- Add fair-use gates to `/v1/sync-local-files` endpoint
- Block hard-restricted users (429)
- Lock conversations (not block) when credits exhausted — user pays to unlock
- Track VAD speech duration to Redis for rolling window caps
- Trigger soft cap check and LLM classifier after speech recording
- Add `is_locked` field to `CreateConversation` model for lock propagation through `process_conversation`
- Add `source` parameter to `record_speech_ms` for sync vs realtime traceability

## Files changed (no transcribe.py)

- `models/conversation.py` — `is_locked: bool = False` on `CreateConversation`
- `routers/sync.py` — fair-use imports, `should_lock` flag, `is_locked` propagation
- `utils/fair_use.py` — `source` param on `record_speech_ms`
- `tests/unit/test_sync_fair_use_gate.py` — 25 unit tests
- `test.sh` — add test to runner

## How locking works

1. `should_lock = not has_transcription_credits(uid)` — checks subscription credits
2. If soft caps triggered, `should_lock = True`
3. `is_locked` passed to `CreateConversation` → propagates to `Conversation`, memories, action items via `process_conversation`
4. Existing conversations updated with `is_locked=True` via `update_conversation`
5. User pays → `payment.py` calls `unlock_all_conversations/memories/action_items`

## Verification

- 25/25 unit tests pass
- All 43 router files pass syntax check (including transcribe.py)
- Clean branch from main — zero changes to transcribe.py

## Test plan

- [x] All 25 unit tests pass
- [x] All router files syntax-verified via `py_compile`
- [x] `is_locked` propagation tested through `CreateConversation` → `Conversation`
- [x] Boundary tests: exactly-at-cap, 1ms-over, zero-speech
- [x] Code structure tests: no 402, has should_lock, has is_hard_restricted

Replaces reverted PR #5863 with clean implementation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
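The lock-propagation flow in "How locking works" can be sketched with stand-in models. The real models are Pydantic and the real `process_conversation` does far more; the dataclasses, field names, and split-on-words derivation below are illustrative assumptions only.

```python
from dataclasses import dataclass

@dataclass
class CreateConversation:
    # Stand-in for the real Pydantic model; only the lock flag matters here.
    text: str
    is_locked: bool = False

@dataclass
class Memory:
    content: str
    is_locked: bool = False

def process_conversation(cc: CreateConversation) -> list[Memory]:
    # Derived objects inherit the conversation's lock state, so setting
    # is_locked before processing locks memories/action items in one pass.
    return [Memory(content=w, is_locked=cc.is_locked) for w in cc.text.split()]

mems = process_conversation(CreateConversation("hello world", is_locked=True))
print([m.is_locked for m in mems])  # [True, True]
```

Setting the flag on the create model, rather than patching objects after the fact, is what lets a single payment-webhook `unlock_all_*` sweep find everything in a consistent state.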
Summary

Adds fair-use gates to the sync (`/v1/sync-local-files`) endpoint to match the listen API pattern (#5854).

Gates

- `is_locked=True` on conversations via `CreateConversation` model — derived memories/action_items inherit lock. User can pay to unlock (payment webhook calls `unlock_all_*`)
- `record_speech_ms(uid, ms, source='sync')` into the shared Redis rolling window

What changed

- `models/conversation.py`: Added `is_locked: bool = False` to `CreateConversation` model
- `routers/sync.py`: Removed rate limit (separate PR). Replaced 402 credit block with `should_lock` flag. `process_segment` passes `is_locked` through `CreateConversation` so `process_conversation` propagates lock to memories and action items. For existing conversations, sets `is_locked` before reprocessing
- `tests/unit/test_sync_fair_use_gate.py`: 26 tests — lock propagation (4), code structure (6), boundary caps (3), plus existing (13)
- `test.sh`: Added `test_sync_fair_use_live.py` to integration test section

Pattern match with listen API

The listen API (`transcribe.py`) does NOT disconnect or block users who hit transcription limits — it continues transcribing and processing. The `is_locked` field on conversations/memories/action_items gates visibility. When the user upgrades, the `payment.py` webhook calls `unlock_all_conversations`, `unlock_all_memories`, `unlock_all_action_items`.

Test plan

- Unit tests (`test_sync_fair_use_gate.py`)
- Live Redis integration tests (`test_sync_fair_use_live.py`)
- Lock propagation: `CreateConversation(is_locked=True)` → `Conversation.is_locked` → memories/action_items

Closes #5854
🤖 Generated with Claude Code
by AI for @beastoin