Replace COUNT(*) with incremental trigger deltas #314
Updates to Preview Branch (optimise/incremental-trigger-counters)
Tasks are run on every commit but only new migration files are pushed.
View logs for this Workflow Run.
📝 Walkthrough
Replaces COUNT-based job progress updates with incremental, per-row trigger logic and refactors the worker promotion flow to perform a single batched promotion call per job; adds early-exit guards, per-status O(1) deltas, timestamp handling, and narrows trigger firing conditions.
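The per-status O(1) deltas mentioned in the walkthrough can be illustrated with a small Go model. This is only a sketch of the idea, not the trigger's actual PL/pgSQL: the `counterDelta` type and `statusDelta` function are hypothetical names, and the status strings are assumed to match the ones quoted elsewhere in this PR.

```go
package main

// counterDelta holds the change to apply to a job's terminal counters
// for one task status transition. Hypothetical type for illustration.
type counterDelta struct {
	Completed, Failed, Skipped int
}

// statusDelta derives the counter changes from the old and new task
// status alone, the way a row-level trigger could use OLD.status and
// NEW.status instead of re-counting the tasks table.
func statusDelta(oldStatus, newStatus string) counterDelta {
	var d counterDelta
	if oldStatus == newStatus {
		return d // no transition, nothing to adjust
	}
	adjust := func(status string, by int) {
		switch status {
		case "completed":
			d.Completed += by
		case "failed":
			d.Failed += by
		case "skipped":
			d.Skipped += by
		}
	}
	adjust(oldStatus, -1) // leaving a terminal status decrements it
	adjust(newStatus, +1) // entering a terminal status increments it
	return d
}
```

A transition such as pending to running yields an all-zero delta, which is what makes an early exit possible before touching the jobs row.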
Sequence Diagram(s)
sequenceDiagram
participant Worker as Worker (Go)
participant DB as Database
participant Tasks as Tasks table (rows)
participant Trigger as update_job_counters()
participant Jobs as Jobs table (rows)
Worker->>DB: SELECT candidate jobs with quota, concurrency, counters
Worker->>DB: CALL promote_waiting_tasks_for_job(job_id, slots)
DB->>Tasks: atomically claim and update N waiting tasks
Tasks->>Trigger: AFTER UPDATE OF status OR DELETE (per-row)
Trigger->>Jobs: UPDATE jobs SET counters, progress, started_at/completed_at (O(1) deltas)
Jobs-->>Trigger: RETURN updated job row
DB-->>Worker: RETURN number of promoted tasks
Worker->>Worker: accumulate promoted count, continue to next job
3e14ec9 to f29426b
🐝 Review App Deployed. Homepage: https://hover-pr-314.fly.dev
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@supabase/migrations/20260409111417_incremental_update_job_counters.sql`:
- Around line 45-50: The early-exit condition currently only skips when no
terminal counters change and NEW.status != 'running', which still allows
repeated transitions into 'running' to rewrite the jobs row; modify the
condition around IF v_completed_delta = 0 AND v_failed_delta = 0 AND
v_skipped_delta = 0 AND NEW.status != 'running' so it also returns early when
started_at is already set (i.e., when NEW.started_at IS NOT NULL or
OLD.started_at IS NOT NULL as appropriate) to avoid updating the jobs row on
subsequent pending/waiting → running promotions; update the check in the trigger
function that uses v_completed_delta, v_failed_delta, v_skipped_delta,
NEW.status and started_at before performing the UPDATE/RETURN NEW.
- Around line 112-117: The DELETE branch currently computes progress using
pre-delete counters; change it to compute post-delete values by applying the
same decrements used earlier in the DELETE block (the deltas you computed for
total_tasks, completed_tasks, failed_tasks, skipped_tasks) and then set progress
= ((post_completed_tasks + post_failed_tasks)::REAL / (post_total_tasks -
post_skipped_tasks)::REAL) * 100.0 with the same zero-guard as the UPDATE
branch—i.e., derive post_* values by subtracting the exact deletion deltas used
above and use those post_* expressions in the CASE to avoid reading original row
values.
Actionable comments posted: 1
♻️ Duplicate comments (2)
supabase/migrations/20260409111417_incremental_update_job_counters.sql (2)
45-50: ⚠️ Potential issue | 🟠 Major
Return early after the first → running transition.
Lines 47-48 still send every later pending/waiting → running promotion through UPDATE jobs, even when jobs.started_at is already set. That keeps the hot path taking the same row lock this PR is trying to remove.
♻️ Proposed fix
- IF v_completed_delta = 0 AND v_failed_delta = 0 AND v_skipped_delta = 0
-    AND NEW.status != 'running'
- THEN
-     RETURN NEW;
- END IF;
+ IF v_completed_delta = 0 AND v_failed_delta = 0 AND v_skipped_delta = 0 THEN
+     IF NEW.status != 'running' THEN
+         RETURN NEW;
+     END IF;
+
+     UPDATE jobs
+     SET started_at = NOW()
+     WHERE id = NEW.job_id
+       AND started_at IS NULL;
+
+     RETURN NEW;
+ END IF;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@supabase/migrations/20260409111417_incremental_update_job_counters.sql` around lines 45 - 50, The early-return condition should also skip updates for later pending/waiting→running promotions when the job already had started_at set; change the IF so it returns when v_completed_delta = 0 AND v_failed_delta = 0 AND v_skipped_delta = 0 AND (NEW.status != 'running' OR OLD.started_at IS NOT NULL). In other words, keep the v_* checks but treat a transition to 'running' as noteworthy only if OLD.started_at IS NULL (the first transition); reference v_completed_delta, v_failed_delta, v_skipped_delta, NEW.status and OLD.started_at when making this change.
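The early-return rule described above can be written as a small predicate. The Go function below is a sketch for reasoning about the truth table only; `shouldSkipJobUpdate` and the `jobStarted` flag (standing in for "the job's started_at is already set") are hypothetical, and the real check lives in the PL/pgSQL trigger.

```go
package main

// shouldSkipJobUpdate reports whether the jobs row can be left untouched
// for a task transition. jobStarted is an illustrative stand-in for
// "started_at is already set on the job".
func shouldSkipJobUpdate(completedDelta, failedDelta, skippedDelta int, newStatus string, jobStarted bool) bool {
	// Any change to a terminal counter must reach the jobs row.
	if completedDelta != 0 || failedDelta != 0 || skippedDelta != 0 {
		return false
	}
	// With no counter change, only the first transition into 'running'
	// matters, because it is the one that stamps started_at.
	return newStatus != "running" || jobStarted
}
```

Under this rule, every promotion into running after the first one skips the job update, which is the hot path the review comment is concerned with.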
112-117: ⚠️ Potential issue | 🟠 Major
Recompute DELETE progress from post-delete values, not the old row.
In PostgreSQL, each SET expression reads the pre-update row. Lines 112-115 therefore divide by the old total_tasks/skipped_tasks values, so deleting a terminal or skipped task leaves progress stale.
♻️ Proposed fix
  progress = CASE
-     WHEN total_tasks > 0 AND (total_tasks - skipped_tasks) > 0 THEN
-         ((completed_tasks + failed_tasks)::REAL
-          / (total_tasks - skipped_tasks)::REAL) * 100.0
+     WHEN GREATEST(0, total_tasks - 1) > 0
+          AND (
+              GREATEST(0, total_tasks - 1)
+              - CASE
+                  WHEN OLD.status = 'skipped' THEN GREATEST(0, skipped_tasks - 1)
+                  ELSE skipped_tasks
+              END
+          ) > 0
+     THEN (
+         (
+             CASE
+                 WHEN OLD.status = 'completed' THEN GREATEST(0, completed_tasks - 1)
+                 ELSE completed_tasks
+             END
+             + CASE
+                 WHEN OLD.status = 'failed' THEN GREATEST(0, failed_tasks - 1)
+                 ELSE failed_tasks
+             END
+         )::REAL
+         / (
+             GREATEST(0, total_tasks - 1)
+             - CASE
+                 WHEN OLD.status = 'skipped' THEN GREATEST(0, skipped_tasks - 1)
+                 ELSE skipped_tasks
+             END
+         )::REAL
+     ) * 100.0
      ELSE 0.0
  END
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@supabase/migrations/20260409111417_incremental_update_job_counters.sql` around lines 112 - 117, The progress calculation is using pre-update column values (Postgres reads the original row for SET expressions), so change the UPDATE to compute progress from the new/assigned values instead of the old ones: build the new values in a derived row (e.g. a FROM (VALUES ...) or a JOINed subquery that yields v.total_tasks, v.skipped_tasks, v.completed_tasks, v.failed_tasks) and then set progress = CASE WHEN v.total_tasks > 0 AND (v.total_tasks - v.skipped_tasks) > 0 THEN ((v.completed_tasks + v.failed_tasks)::REAL / (v.total_tasks - v.skipped_tasks)::REAL) * 100.0 ELSE 0.0 END, referring to v.* rather than the table columns so progress reflects post-delete/post-update counts.
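To make the stale-progress problem concrete, here is a Go model of the arithmetic: counters are decremented first, and progress is computed from the resulting values. It is a sketch under assumptions; `jobCounters`, `afterDelete` and `progress` are hypothetical names, and `float64` is used here where the SQL casts to REAL.

```go
package main

// jobCounters mirrors the counter columns discussed in the review.
type jobCounters struct {
	Total, Completed, Failed, Skipped int
}

// afterDelete returns the counters as they should look once a task with
// the given status has been deleted, clamping at zero like GREATEST(0, ...).
func afterDelete(c jobCounters, deletedStatus string) jobCounters {
	dec := func(n int) int {
		if n > 0 {
			return n - 1
		}
		return 0
	}
	c.Total = dec(c.Total)
	switch deletedStatus {
	case "completed":
		c.Completed = dec(c.Completed)
	case "failed":
		c.Failed = dec(c.Failed)
	case "skipped":
		c.Skipped = dec(c.Skipped)
	}
	return c
}

// progress applies the same zero-guard as the UPDATE branch: skipped
// tasks are excluded from the denominator, and an empty job reports 0.
func progress(c jobCounters) float64 {
	denom := c.Total - c.Skipped
	if c.Total <= 0 || denom <= 0 {
		return 0
	}
	return float64(c.Completed+c.Failed) / float64(denom) * 100
}
```

With 5 tasks of which 2 are completed, progress before any delete is 40. Deleting a pending task should move it to 50, which is the value obtained from `progress(afterDelete(...))`; computing from the pre-delete row would leave it at 40.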
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@supabase/migrations/20260409111417_incremental_update_job_counters.sql`:
- Around line 74-81: The completed_at update logic only sets the timestamp when
NEW.status is 'completed' or 'failed', but it should also treat 'skipped' as a
terminal status; update the conditional on NEW.status in the completed_at CASE
(the same branch that checks completed_tasks, failed_tasks, skipped_tasks with
v_completed_delta/v_failed_delta/v_skipped_delta against total_tasks) to include
'skipped' so that when the last outstanding task becomes skipped the
completed_at branch runs and the timestamp is set.
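The point about treating 'skipped' as terminal can be sketched as a completion check in Go. The counter arguments are assumed to be the values after the current transition's deltas have been applied; `isTerminal` and `jobFinished` are hypothetical helpers, not functions from the migration.

```go
package main

// isTerminal reports whether a task status counts as finished.
// 'skipped' is included alongside 'completed' and 'failed'.
func isTerminal(status string) bool {
	switch status {
	case "completed", "failed", "skipped":
		return true
	}
	return false
}

// jobFinished reports whether this transition closes out the job:
// the new status is terminal and every task is now accounted for.
func jobFinished(newStatus string, completed, failed, skipped, total int) bool {
	if !isTerminal(newStatus) || total <= 0 {
		return false
	}
	return completed+failed+skipped >= total
}
```

If the last outstanding task of a five-task job becomes skipped, `jobFinished("skipped", 3, 1, 1, 5)` is true, so the completed_at branch would run in that case as well.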
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/jobs/worker.go (1)
3023-3055: ⚠️ Potential issue | 🟠 Major
Quota can be over-allocated across jobs in the same organisation.
get_daily_quota_remaining() is organisation-scoped, but this query snapshots it once per job before any promotion happens. If one organisation has multiple waiting jobs, each loop iteration can spend the full remaining quota, so a single monitor tick can promote well past the org's real budget. The later concurrency clamp is job-local, so it does not prevent this.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/jobs/worker.go` around lines 3023 - 3055, The query calls get_daily_quota_remaining() per job which snapshots the same organisation quota multiple times, allowing overallocation when an org has multiple waiting jobs; change the logic to compute or fetch quota once per organisation inside the transaction (e.g., SELECT DISTINCT organisation_id, get_daily_quota_remaining(organisation_id) ...) and then, when iterating over job rows (jobInfo, jobs slice) allocate/promote tasks while decrementing that organisation's remaining quota (or skip jobs when the org's remaining quota is exhausted); update the tx.QueryContext call and subsequent loop to reference the per-organisation quota map instead of relying on per-job quota_remaining so promotions cannot exceed the org's real budget.
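One way to picture the per-organisation accounting is the in-memory allocation below. It is a simplified sketch: `waitingJob`, its `Wanted` field (the slots a job could use after its own concurrency clamp) and `allocateByOrg` are hypothetical, and the real loop would call the promotion function and deduct the number of tasks actually promoted.

```go
package main

// waitingJob is an illustrative stand-in for a scanned job row.
type waitingJob struct {
	ID             string
	OrganisationID string
	QuotaRemaining int // organisation-scoped snapshot, identical for sibling jobs
	Wanted         int // slots this job could use on its own
}

// allocateByOrg grants slots job by job while carrying one remaining
// budget per organisation, so sibling jobs cannot each spend the full quota.
func allocateByOrg(jobs []waitingJob) map[string]int {
	remainingByOrg := make(map[string]int)
	grants := make(map[string]int, len(jobs))
	for _, job := range jobs {
		remaining, seen := remainingByOrg[job.OrganisationID]
		if !seen {
			// First job seen for this organisation seeds the budget.
			remaining = job.QuotaRemaining
		}
		grant := job.Wanted
		if remaining < grant {
			grant = remaining
		}
		if grant < 0 {
			grant = 0
		}
		grants[job.ID] = grant
		remainingByOrg[job.OrganisationID] = remaining - grant
	}
	return grants
}
```

With two jobs in an organisation that has 10 slots left, the first job is granted up to 10 and the second receives whatever is left, rather than both receiving 10.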
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/jobs/worker.go`:
- Around line 3023-3040: The SQL currently uses "JOIN tasks t ... SELECT
DISTINCT" which fans out per waiting task then de-duplicates; change the query
string passed to tx.QueryContext in the code around the QueryContext call so it
selects from jobs j and uses WHERE EXISTS (SELECT 1 FROM tasks t WHERE t.job_id
= j.id AND t.status = 'waiting') instead of the JOIN and DISTINCT, preserving
the CROSS JOIN LATERAL quota call, the same selected columns (j.id, j.status,
q.quota_remaining, COALESCE(j.concurrency,0) AS concurrency, j.running_tasks,
j.pending_tasks) and other WHERE predicates (j.status IN ('running','pending'),
j.organisation_id IS NOT NULL, q.quota_remaining > 0).
- Around line 3081-3093: The code that computes slots (variable slots, using
job.QuotaRemaining) ignores the pendingUnlimitedCap when job.Concurrency <= 0,
allowing unlimited tasks to be promoted; change the logic in the slots
calculation (the block that inspects job.Concurrency, job.RunningTasks and
job.PendingTasks) to apply the existing pendingUnlimitedCap fallback: when
job.Concurrency <= 0, set slots = min(job.QuotaRemaining, pendingUnlimitedCap -
job.PendingTasks) (and clamp to >=0), and keep the current behavior for
job.Concurrency > 0 (min of quota and available concurrency) so that
pendingUnlimitedCap is honored for null/zero concurrency jobs.
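For the first item, the query shape being asked for might look like the constant below. This is a sketch assembled from the columns and predicates named in the comment, not the query in worker.go; in particular, wrapping get_daily_quota_remaining() in a lateral subselect that yields `quota_remaining` is an assumption about how that function is called.

```go
package main

// candidateJobsQuery selects jobs that have at least one waiting task.
// EXISTS stops at the first matching task, so the query does not fan out
// one row per waiting task and needs no DISTINCT.
// Assumption: get_daily_quota_remaining(organisation_id) returns a scalar.
const candidateJobsQuery = `
SELECT
	j.id,
	j.status,
	q.quota_remaining,
	COALESCE(j.concurrency, 0) AS concurrency,
	j.running_tasks,
	j.pending_tasks
FROM jobs j
CROSS JOIN LATERAL (
	SELECT get_daily_quota_remaining(j.organisation_id) AS quota_remaining
) q
WHERE j.status IN ('running', 'pending')
	AND j.organisation_id IS NOT NULL
	AND q.quota_remaining > 0
	AND EXISTS (
		SELECT 1
		FROM tasks t
		WHERE t.job_id = j.id
			AND t.status = 'waiting'
	)
`
```

The string would be passed to tx.QueryContext in place of the JOIN and DISTINCT version, with the same scan targets.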
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/jobs/worker.go`:
- Around line 3090-3096: The slot calculation for uncapped concurrency omits
running tasks and should match availablePendingSlots; change the cap computation
in the branch that sets slots to use both job.PendingTasks and job.RunningTasks
(i.e., compute capAvail := pendingUnlimitedCap - (job.PendingTasks +
job.RunningTasks)) and then clamp slots to capAvail (ensuring capAvail can't
drive slots negative). Update the code that currently uses pendingUnlimitedCap -
job.PendingTasks to subtract RunningTasks as well so promotion respects both
running and pending counts.
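The slot calculation with both fixes applied can be sketched as follows. The value of `pendingUnlimitedCap` is a placeholder and the `jobInfo` fields are limited to the ones named in the review; treat `promotableSlots` as a hypothetical helper rather than the code in worker.go.

```go
package main

// pendingUnlimitedCap is a placeholder value; the real constant is
// defined elsewhere in the worker and may differ.
const pendingUnlimitedCap = 100

// jobInfo carries only the fields needed for the slot calculation.
type jobInfo struct {
	QuotaRemaining int
	Concurrency    int
	RunningTasks   int
	PendingTasks   int
}

// promotableSlots returns how many waiting tasks may be promoted:
// the quota, clamped by the job's concurrency headroom, or by the
// fallback cap when the job has no concurrency limit.
func promotableSlots(job jobInfo) int {
	inFlight := job.RunningTasks + job.PendingTasks
	var capAvail int
	if job.Concurrency > 0 {
		capAvail = job.Concurrency - inFlight
	} else {
		// Uncapped jobs still honour the fallback cap, counting both
		// running and pending tasks against it.
		capAvail = pendingUnlimitedCap - inFlight
	}
	slots := job.QuotaRemaining
	if capAvail < slots {
		slots = capAvail
	}
	if slots < 0 {
		slots = 0 // never request a negative number of promotions
	}
	return slots
}
```

With the placeholder cap of 100, an uncapped job with 30 running and 60 pending tasks gets at most 10 slots regardless of quota, and a job already over the cap gets 0.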
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/jobs/worker.go (1)
3008-3048: ⚠️ Potential issue | 🟠 Major
Keep quota accounting per organisation, not per job.
get_daily_quota_remaining() is organisation-scoped, but Line 3028 snapshots that same remaining budget onto every eligible job. The loop then spends it independently per job, so one organisation with 10 slots left and two waiting jobs can promote up to 20 tasks in one sweep. Carry a per-organisation remaining budget through the loop, or move slot allocation into a single SQL path that assigns quota once per organisation.
♻️ Minimal Go-side fix
 type jobInfo struct {
 	ID string
+	OrganisationID string
 	Status string
 	QuotaRemaining int
 	Concurrency int
 	RunningTasks int
 	PendingTasks int
@@
 	SELECT j.id,
+		j.organisation_id,
 		j.status,
 		q.quota_remaining,
 		COALESCE(j.concurrency, 0) AS concurrency,
 		j.running_tasks,
 		j.pending_tasks
@@
-	if err := rows.Scan(&j.ID, &j.Status, &j.QuotaRemaining, &j.Concurrency, &j.RunningTasks, &j.PendingTasks); err != nil {
+	if err := rows.Scan(&j.ID, &j.OrganisationID, &j.Status, &j.QuotaRemaining, &j.Concurrency, &j.RunningTasks, &j.PendingTasks); err != nil {
 		return fmt.Errorf("failed to scan job: %w", err)
 	}
 	jobs = append(jobs, j)
 }
@@
 totalPromoted := 0
+remainingByOrg := make(map[string]int, len(jobs))
 for _, job := range jobs {
+	remaining, ok := remainingByOrg[job.OrganisationID]
+	if !ok {
+		remaining = job.QuotaRemaining
+	}
+
-	slots := job.QuotaRemaining
+	slots := remaining
 	if job.Concurrency > 0 {
 		if available := job.Concurrency - job.RunningTasks - job.PendingTasks; available < slots {
 			slots = available
 		}
 	} else {
@@
 	if err != nil {
 		log.Warn().Err(err).Str("job_id", job.ID).Msg("Failed to promote waiting tasks for job")
 		continue
 	}
+	remainingByOrg[job.OrganisationID] = max(0, remaining-promoted)
 	if promoted > 0 {
 		log.Debug().
 			Str("job_id", job.ID).
Also applies to: 3081-3118
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/jobs/worker.go` around lines 3008 - 3048, The code snapshots organisation-scoped quota into each job (jobInfo.QuotaRemaining) and then spends it per-job causing overallocation across jobs in the same organisation; update the logic inside the wp.dbQueue.ExecuteMaintenance block (the SELECT that fills jobInfo and the loop that promotes tasks) to treat quota per-organisation: include organisation_id in the SELECT, maintain an in-memory map[organisation_id]int for remaining_quota initialized from get_daily_quota_remaining (or change the SQL to allocate slots per-organisation once), and when computing promotable slots for a job (using jobInfo.Concurrency, RunningTasks, PendingTasks) deduct from that organisation's remaining_quota (use min(available_job_slots, orgRemaining)) and only promote that many tasks; ensure the map is updated as you spend quota and persist any DB-side counters/locks as before to avoid races.
…ounters Replace COUNT(*) with incremental trigger deltas
Summary
- Replaces the three COUNT(*) WHERE status = '...' subqueries in the update_job_counters() UPDATE path with O(1) incremental deltas computed from OLD/NEW.status
- When a transition changes no terminal counter (completed/failed/skipped) and isn't the first running transition, the UPDATE jobs is skipped entirely; this covers the common pending↔running and waiting↔pending paths that previously triggered all three full-table scans
Why this matters
pg_stat_statements after a fresh reset showed INSERT INTO tasks at 34.4% of total DB time with a 2,233ms average. A large portion of that cost comes from ON CONFLICT DO UPDATE rows firing the UPDATE trigger path (with 3× COUNT(*)) for every duplicate URL re-enqueue. Every other status transition (claim task, complete task, fail task) also pays the 3× scan cost on the tasks table.
Test plan
- update_job_counters no longer appears in pg_stat_statements with high avg latency
- completed_tasks, failed_tasks, skipped_tasks, progress, started_at, completed_at remain correct across a full job run
Summary by CodeRabbit
Performance
Behaviour
Documentation