chore(ws): add retry backoff and recovery grace to batch consumer#59597

Merged
estefaniarabadan merged 7 commits into
masterfrom
estefania/backoff-grace-period
May 27, 2026

@estefaniarabadan
Contributor

@estefaniarabadan estefaniarabadan commented May 22, 2026

Problem

  1. No retry backoff. A batch landing in waiting_retry was eligible to be re-picked on the very next poll cycle, so transient failures (network blips, downstream pressure) could be retried in a tight loop until max_attempts was exhausted.
  2. No recovery grace window. The orphan sweep treated any executing row whose advisory lock wasn't held as stale, including rows that had existed for only milliseconds. A worker that briefly dropped its psycopg session, or one whose advisory lock was probed between status-write and lock-acquire, could have its batch reclaimed by another worker through this race.
  3. Out-of-order batches within a run. Sibling batches for the same (team_id, schema_id) were processed independently, so a failure on batch_index=N did not stop N+1 from running. For pipelines where ordering within a run matters, this could surface downstream state inconsistencies.

Changes

  • Added retry_backoff_base_seconds to ConsumerConfig (default 15s): waiting_retry batches wait base * attempt seconds before becoming re-eligible
  • Added recovery_grace_seconds to ConsumerConfig (defaults to recovery_interval_seconds): the orphan sweep ignores executing rows younger than this threshold
  • Added head-of-line gating per run_uuid in BatchQueue.get_unprocessed_and_lock: rows with a later batch_index are excluded while earlier ones are executing or within their retry backoff
  • Changed BatchConsumer._process_single to return bool; _process_group halts sibling batches in the same group on any non-success
  • Added sb_run_uuid_bi_idx index on SourceBatch(run_uuid, batch_index) via SeparateDatabaseAndState migration 000
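
The grace window, head-of-line gating, and group halting described above can be sketched in plain Python. This is an illustration only, not the PR's implementation: the real gating lives in SQL inside BatchQueue.get_unprocessed_and_lock, and the `Batch` dataclass, the status strings other than `waiting_retry` and `executing`, and the helper names `is_blocking`, `eligible`, `is_orphan`, and `process_group` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Batch:
    # Hypothetical in-memory stand-in for a queue row plus its latest status.
    run_uuid: str
    batch_index: int
    status: str         # assumed values: "pending", "waiting_retry", "executing"
    attempt: int
    age_seconds: float  # age of the latest status row


def is_blocking(b: Batch, backoff_base: float = 15.0) -> bool:
    """A batch blocks its run while executing or still inside its retry backoff."""
    if b.status == "executing":
        return True
    if b.status == "waiting_retry":
        return b.age_seconds < backoff_base * max(b.attempt, 1)
    return False


def eligible(batches: list[Batch], backoff_base: float = 15.0) -> list[Batch]:
    """Head-of-line gating: within each run_uuid, stop at the first blocking batch."""
    picked: list[Batch] = []
    blocked_runs: set[str] = set()
    for b in sorted(batches, key=lambda b: (b.run_uuid, b.batch_index)):
        if b.run_uuid in blocked_runs:
            continue  # an earlier batch_index in this run is blocking
        if is_blocking(b, backoff_base):
            blocked_runs.add(b.run_uuid)
            continue
        picked.append(b)
    return picked


def is_orphan(b: Batch, lock_held: bool, grace_seconds: float) -> bool:
    """Orphan sweep: only reclaim executing rows that have outlived the grace window."""
    return b.status == "executing" and not lock_held and b.age_seconds >= grace_seconds


def process_group(group: list[Batch], process_single: Callable[[Batch], bool]) -> list[int]:
    """Run batches in index order and halt the group on the first non-success."""
    completed: list[int] = []
    for b in sorted(group, key=lambda b: b.batch_index):
        if not process_single(b):
            break
        completed.append(b.batch_index)
    return completed
```

In this sketch a run whose batch 0 is still backing off contributes nothing to `eligible`, even if its batch 1 is otherwise ready, which is the ordering guarantee the third problem statement asks for.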

How did you test this code?

Local run
Tests pass

Publish to changelog?

NO

Docs update

NO

@estefaniarabadan estefaniarabadan requested a review from a team May 22, 2026 10:36
@estefaniarabadan estefaniarabadan added the skip-inkeep-docs Use this label to skip an Inkeep docs PR in posthog.com label May 22, 2026
@greptile-apps
Contributor

greptile-apps Bot commented May 22, 2026

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
posthog/temporal/data_imports/pipelines/pipeline_v3/postgres_queue/jobs_db.py:186-188
The docstring says the batch is eligible when `retry_backoff_base_seconds * s.attempt` seconds have elapsed, but the SQL multiplier is `GREATEST(COALESCE(s.attempt, 1), 1)` — which floors the factor at 1 so a `waiting_retry` row with `attempt=0` never becomes immediately eligible. The description understates this: it implies `attempt=0` would produce a zero wait, which the code deliberately avoids.

```suggestion
        ``retry_backoff_base_seconds`` gates the ``waiting_retry`` branch on
        the age of the latest status row: a batch is only eligible when
        ``now() - s.created_at >= retry_backoff_base_seconds * GREATEST(s.attempt, 1)``
        (attempt is floored at 1 so that a zero-attempt row still waits at least one
        base period).
```
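
As a rough illustration of the floor semantics discussed above, the following sketch mirrors `GREATEST(COALESCE(s.attempt, 1), 1)` in Python. The function names `backoff_seconds` and `is_retry_eligible` are hypothetical, and the default of 15 seconds is taken from the PR description; the actual check is performed in SQL.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def backoff_seconds(attempt: Optional[int], base_seconds: float = 15.0) -> float:
    # Mirrors GREATEST(COALESCE(attempt, 1), 1): a NULL attempt becomes 1,
    # and any value below 1 is raised to 1, so the wait is never zero.
    factor = max(attempt if attempt is not None else 1, 1)
    return base_seconds * factor


def is_retry_eligible(
    created_at: datetime,
    attempt: Optional[int],
    now: datetime,
    base_seconds: float = 15.0,
) -> bool:
    # Eligible once the latest status row is at least one backoff period old.
    return now - created_at >= timedelta(seconds=backoff_seconds(attempt, base_seconds))
```

With the default base, `attempt=0` and a missing attempt both wait one full base period, the same as `attempt=1`.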

Reviews (1): Last reviewed commit: "add backoff and grace period" | Re-trigger Greptile

Comment thread posthog/temporal/data_imports/pipelines/pipeline_v3/postgres_queue/jobs_db.py Outdated
@estefaniarabadan estefaniarabadan changed the title from "chore(ws): add grace period and backoff to pipelines V3 queue" to "chore(ws): add retry backoff and recovery grace to batch consumer" May 26, 2026
@estefaniarabadan estefaniarabadan enabled auto-merge (squash) May 26, 2026 15:15
@github-actions
Contributor

github-actions Bot commented May 26, 2026

Migration SQL Changes

Hey 👋, we've detected some migrations on this PR. Here's the SQL output for each migration, make sure they make sense:

products/warehouse_sources_queue/backend/migrations/0002_sourcebatch_run_uuid_batch_index_idx.py

BEGIN;
--
-- Custom state/database change combination
--
-- (no-op)
COMMIT;

Last updated: 2026-05-27 14:01 UTC (4f34bf0)

@github-actions
Contributor

github-actions Bot commented May 26, 2026

🔍 Migration Risk Analysis

We've analyzed your migrations for potential risks.

Summary: 0 Safe | 1 Needs Review | 0 Blocked

⚠️ Needs Review

May have performance impact

warehouse_sources_queue.0002_sourcebatch_run_uuid_batch_index_idx
  └─ #1 ✅ SeparateDatabaseAndState
     Wrapper operation - see nested operations for risk: RunPython
     database_operations: RunPython
     └─ #2 ⚠️ RunPython: RunPython data migration needs review for performance

📚 How to Deploy These Changes Safely

RunPython:

Use batching for large data migrations:

  • Use .iterator() to avoid loading all rows into memory
  • Use .bulk_update() instead of saving individual objects
  • Batch size: 1,000-10,000 rows per batch
  • Add pauses between batches
  • Consider background jobs for very large updates (millions of rows)

See the migration safety guide

Last updated: 2026-05-27 14:01 UTC (4f34bf0)

@estefaniarabadan estefaniarabadan merged commit 5b6c584 into master May 27, 2026
298 of 300 checks passed
@estefaniarabadan estefaniarabadan deleted the estefania/backoff-grace-period branch May 27, 2026 15:28
@deployment-status-posthog

deployment-status-posthog Bot commented May 27, 2026

Deploy status

| Environment | Status | Deployed At | Workflow |
| --- | --- | --- | --- |
| dev | ✅ Deployed | 2026-05-27 16:01 UTC | Run |
| prod-us | ✅ Deployed | 2026-05-27 16:16 UTC | Run |
| prod-eu | ✅ Deployed | 2026-05-27 16:16 UTC | Run |

4 participants