feat(replay-vision): add SweepScannerWorkflow for Phase 2 schedule fires #60772
Migration SQL Changes
Hey 👋, we've detected some migrations on this PR. Here's the SQL output for each migration, make sure they make sense:
🔍 Migration Risk Analysis
We've analyzed your migrations for potential risks.
Summary: 1 Safe | 0 Needs Review | 0 Blocked
✅ Safe: Brief or no lock, backwards compatible
Last updated: 2026-06-01 20:38 UTC (bc5dcea)
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
products/replay_vision/backend/tests/test_sweep.py:339-342
Redundant duplicate assertion — the same list-comprehension check appears twice in a row. The inline assert on line 339 and the `advance_calls` assertion on lines 341-342 are identical; one should be removed.
```suggestion
advance_calls = [call for fn, call in mocks.activity_calls if fn == advance_scanner_watermark_activity]
assert advance_calls == []
```
### Issue 2 of 2
products/replay_vision/backend/temporal/sweep_workflow.py:56-59
`asyncio.gather` with `return_exceptions=False` (the default) propagates on the **first** non-`WorkflowAlreadyStartedError` failure, not only when every dispatch fails. The comment "if every dispatch fails" misrepresents the actual semantics — even a single unexpected failure in a 100-candidate batch will skip the watermark advance. The behaviour itself is intentional and safe (the next sweep retries, already-started children get `WorkflowAlreadyStartedError`), but the comment makes it sound like partial failures are tolerated.
```suggestion
# `return_exceptions=False`: if *any* dispatch fails with an error other
# than WorkflowAlreadyStartedError, the first such exception propagates
# and the watermark advance is skipped — next sweep retries the same
# window. Already-started children are deduplicated by Temporal's
# workflow_id and by `UNIQUE(scanner_id, session_id)` on the row.
```
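The semantics described in Issue 2 can be reproduced without Temporal. The sketch below is a stand-alone illustration, not the PR's code: `AlreadyStarted`, `start_child`, `sweep`, and `run` are hypothetical names, and `AlreadyStarted` only stands in for `WorkflowAlreadyStartedError`. It shows that with the default `return_exceptions=False`, a single unexpected failure propagates out of `asyncio.gather`, so the step after the gather (where the watermark advance would sit) never runs.

```python
import asyncio


class AlreadyStarted(Exception):
    """Hypothetical stand-in for Temporal's WorkflowAlreadyStartedError."""


async def start_child(session_id: str, fail: set, started: set) -> str:
    # An already-started child is treated as success; any other error propagates.
    try:
        if session_id in started:
            raise AlreadyStarted(session_id)
        if session_id in fail:
            raise RuntimeError(f"dispatch failed for {session_id}")
    except AlreadyStarted:
        pass
    return session_id


async def sweep(candidates, fail: set, started: set) -> bool:
    # Default return_exceptions=False: the first unexpected error is raised here.
    await asyncio.gather(*(start_child(c, fail, started) for c in candidates))
    # Reached only when no dispatch raised; the watermark advance would go here.
    return True


def run(candidates, fail=(), started=()) -> bool:
    """Return True if the sweep got as far as the (simulated) watermark advance."""
    try:
        return asyncio.run(sweep(candidates, set(fail), set(started)))
    except RuntimeError:
        return False
```

With three candidates, `run(["a", "b", "c"], fail=["b"])` returns `False` even though two of the three dispatches succeeded, which is the "any failure", not "every failure", behaviour the corrected comment describes.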
Reviews (1): Last reviewed commit: "refactor(replay-vision): simplify SweepS..."
Lays the foundation for per-scanner Temporal schedules. Wraps SessionRecordingListFromQuery with a session_end-based filter: a session is eligible when it's had no activity in the last 35 minutes and its end time is past the scanner's watermark. The wrap delegates all RecordingsQuery filter compilation to the recordings list, so a scanner's saved filters resolve identically to the UI.
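As a rough illustration of the eligibility rule in this commit message, the predicate can be written in plain Python. This is a sketch only: the real filter is compiled into the recordings query, and `is_eligible`, `QUIET_PERIOD`, and the inclusive/exclusive handling of the boundaries are assumptions made for the example.

```python
import datetime as dt

# 35 minutes of inactivity, as stated in the commit message.
QUIET_PERIOD = dt.timedelta(minutes=35)


def is_eligible(session_end: dt.datetime, watermark: dt.datetime, now: dt.datetime) -> bool:
    """A session qualifies when it has been quiet long enough and ended after the watermark."""
    quiet_long_enough = session_end <= now - QUIET_PERIOD  # boundary choice is an assumption
    past_watermark = session_end > watermark
    return quiet_long_enough and past_watermark
```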
- Bump _PARTITION_LOOKBACK from 6h to 26h, anchored to posthog-js's 24h session_id rotation cap + 2h skew/lag headroom. Adds regression test for long-running sessions whose start is older than 6h.
- Add keyset pagination via last_seen_session_id kwarg + lexicographic tuple comparison. Lets the schedule resume past a saturated batch without skipping sessions tied at the boundary microsecond.
- Drop the now kwarg; use datetime.now(dt.UTC) directly so inner and outer clocks always agree under @freeze_time.
- Push sampling into the inner HAVING via extra_having_predicates so un-sampled sessions are dropped before being aggregated by the outer.
- Validate max_execution_time_seconds > 0.
- Comment on inner.order_by mutation noting get_query() re-parses each call.
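The keyset pagination bullet can be illustrated with a small in-memory sketch. It assumes rows are `(session_end, session_id)` tuples and uses integer timestamps for brevity; `page_after` and `naive_page_after` are hypothetical helpers, not functions from the PR. Python compares tuples lexicographically, which is the same ordering a `(session_end, session_id) > (watermark, last_seen_session_id)` predicate would give.

```python
def page_after(rows, cursor, limit):
    """Return up to `limit` rows strictly after `cursor` in (session_end, session_id) order."""
    return sorted(r for r in rows if r > cursor)[:limit]


def naive_page_after(rows, watermark, limit):
    """Timestamp-only cursor: rows tied with the watermark are never returned."""
    return sorted(r for r in rows if r[0] > watermark)[:limit]


rows = [(10, "a"), (10, "b"), (10, "c"), (11, "d")]

first = page_after(rows, (9, ""), limit=2)    # saturated batch: two rows tied at t=10
second = page_after(rows, first[-1], limit=2)  # resumes at (10, "c"), nothing skipped
skipped = naive_page_after(rows, first[-1][0], limit=2)  # loses (10, "c")
```

The second page picks up `(10, "c")` because the session id breaks the tie, whereas the timestamp-only cursor jumps straight to `(11, "d")`.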
Drop multiline blocks, keep at most one sentence per comment, remove plan/phase prose.
93698c0 to f7c0345
c898cda to a30b8c4
PR overview
This pull request adds the SweepScannerWorkflow for replay-vision Phase 2 scheduled scanner runs, dispatching candidate sweep work through Temporal child workflows. The touched workflow code coordinates scanner-enabled batch execution for observation creation. There is one open security concern around quota enforcement during sweep fan-out: a user with scanner configuration access could trigger concurrent child workflows that each see available quota and collectively exceed the intended monthly observation limit. Two prior issues have already been addressed, so the remaining risk is focused on bounding or atomically reserving quota before dispatch. The impact appears limited to quota/resource abuse rather than direct data exposure or authorization bypass.
Open issues (1)
Fixed/addressed: 2 · PR risk: 5/10
4d2e011 to 80f2b38
Closes a DoS vector flagged in review: a client sending events with session_ids longer than 128 chars (the MAX_SESSION_ID_LENGTH used by the ApplyScannerWorkflow wire payload) would wedge the sweep on Pydantic validation. Filtering at the query layer keeps over-length sessions invisible to the scanner so the watermark always advances.
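A minimal sketch of why the length filter matters, under stated assumptions: the commit applies the filter in the candidate query, while this example filters a Python list, and `ApplyPayload`, `dispatchable`, and `build_payloads` are hypothetical stand-ins (the dataclass check imitates the Pydantic validation on the wire payload). Only the 128-character limit comes from the commit message.

```python
from dataclasses import dataclass

MAX_SESSION_ID_LENGTH = 128  # limit named in the commit message


@dataclass(frozen=True)
class ApplyPayload:
    """Stand-in for the validated ApplyScannerWorkflow input."""

    session_id: str

    def __post_init__(self):
        if len(self.session_id) > MAX_SESSION_ID_LENGTH:
            raise ValueError("session_id exceeds MAX_SESSION_ID_LENGTH")


def dispatchable(session_ids):
    """Drop over-length ids up front so payload construction cannot raise."""
    return [s for s in session_ids if len(s) <= MAX_SESSION_ID_LENGTH]


def build_payloads(session_ids):
    return [ApplyPayload(s) for s in dispatchable(session_ids)]
```

Because over-length ids never reach payload construction, a single hostile session cannot block the rest of the batch or hold back the watermark.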
Adds the Temporal workflow that fires every 5 minutes per scanner, runs ScannerCandidateQuery, dispatches ABANDONed ApplyScannerWorkflow children, and advances the watermark. Per-scanner schedules and the reconciler land in a later PR.
- Migration 0010: add last_seen_session_id to ReplayScanner (keyset tiebreaker for resuming saturated batches without re-emitting).
- find_scanner_candidates_activity: reads scanner row, runs the candidate query, returns candidates + a saturated flag. Filters enabled=True to short-circuit disabled scanners. Verifies the creator still has session_recording read on the team as a defence-in-depth check.
- advance_scanner_watermark_activity: bumps last_swept_at + last_seen_session_id via .update(), no scanner_version bump.
- SweepScannerWorkflow: find -> asyncio.gather over _start_child -> advance. First non-WorkflowAlreadyStartedError failure aborts the gather and skips the watermark advance; UNIQUE(scanner_id, session_id) on ReplayObservation dedups retries.
- ReplayScannerViewSet: dangerously_get_required_scopes adds session_recording:read to create/update/partial_update and initial() enforces the matching user_access_control check, matching the /observe/ authorization boundary.
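The watermark advance described above writes absolute values rather than increments, which is what makes a retried activity harmless. The following in-memory sketch illustrates that property; it is not the Django `.update()` call itself. `ScannerRow`, `advance_watermark`, and `is_saturated` are hypothetical, and the value `100` for `DEFAULT_CANDIDATE_LIMIT` is assumed for illustration.

```python
import datetime as dt
from dataclasses import dataclass, replace
from typing import Optional

DEFAULT_CANDIDATE_LIMIT = 100  # assumed value for this example


@dataclass(frozen=True)
class ScannerRow:
    last_swept_at: Optional[dt.datetime]
    last_seen_session_id: Optional[str]
    scanner_version: int


def is_saturated(candidates) -> bool:
    # A full batch suggests more candidates may remain in the same window.
    return len(candidates) == DEFAULT_CANDIDATE_LIMIT


def advance_watermark(row: ScannerRow, swept_at: dt.datetime, last_session_id: str) -> ScannerRow:
    # Sets absolute values and leaves scanner_version alone, so replays are no-ops.
    return replace(row, last_swept_at=swept_at, last_seen_session_id=last_session_id)
```

Applying `advance_watermark` twice with the same arguments yields the same row, so an activity retry after a timeout cannot move the watermark further than intended.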
80f2b38 to abd878f
retry_policy=common.RetryPolicy(maximum_attempts=1),
)
if not find_result.candidates:
    return
Medium: Observation quota bypass
An authenticated user who can configure a broad enabled scanner can cause a sweep to start up to DEFAULT_CANDIDATE_LIMIT child workflows at once. Each child checks compute_quota_snapshot() independently in create_observation_activity, so concurrent children can all observe quota headroom before any pending rows are visible and create more observations than the monthly quota allows. Reserve quota atomically before dispatching, or cap the dispatch batch to the current remaining quota using a DB-side lock/claim so the sweep cannot fan out past the organization’s remaining allowance.
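One way to read the "reserve quota atomically before dispatching" suggestion is sketched below. It is an in-memory analogy, not a proposed patch: `QuotaLedger` and `cap_batch` are hypothetical names, and the `threading.Lock` stands in for the DB-side lock or claim the comment asks for. The point is that the check and the decrement happen in one critical section, so concurrent sweeps cannot all observe the same headroom.

```python
import threading


class QuotaLedger:
    """Tracks used observations against a monthly limit (illustrative only)."""

    def __init__(self, monthly_limit: int, used: int = 0):
        self._limit = monthly_limit
        self._used = used
        self._lock = threading.Lock()

    def reserve(self, requested: int) -> int:
        # Check and claim in one critical section; returns how many were granted.
        with self._lock:
            granted = max(0, min(requested, self._limit - self._used))
            self._used += granted
            return granted


def cap_batch(candidates, ledger: QuotaLedger):
    """Dispatch only as many candidates as the reservation covers."""
    granted = ledger.reserve(len(candidates))
    return candidates[:granted]
```

With a limit of 10 and 7 already used, a five-candidate batch is cut to three, and any later batch in the same month is cut to zero.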
Stacked on #60617.
Problem
Phase 2 needs a Temporal workflow that fires every 5 minutes per scanner, runs `ScannerCandidateQuery`, dispatches one `ApplyScannerWorkflow` per candidate, and advances the watermark. The per-scanner schedule lands in the next stacked PR; this is just the workflow.
Changes
- `SweepScannerWorkflow`: find candidates → ABANDONed children → advance watermark. On full-batch failure raises `AllChildStartsFailed` and skips the watermark advance so the next fire retries the window.
- `find_scanner_candidates_activity`: returns candidates + `saturated` flag (`len == DEFAULT_CANDIDATE_LIMIT`). Non-retryable on malformed saved `query`.
- `advance_scanner_watermark_activity`: bumps `last_swept_at` and `last_seen_session_id` via `.update()`; no `scanner_version` bump, idempotent.
- Migration: adds `last_seen_session_id` to `ReplayScanner`.
How did you test this code?
I'm an agent. 15 new tests pass; 78 existing tests still pass. No manual testing.
Automatic notifications
🤖 Agent context
Agent: Claude (Claude Code). Used `ALLOW_DUPLICATE` for the child reuse policy to match the existing `rasterize-recording` dispatch; `UNIQUE(scanner_id, session_id)` is the durable dedup.