feat(webhooks): Add push-based drain trigger to eliminate scheduler wait#109214
trent-sentry wants to merge 13 commits into master from
Conversation
When a webhook arrives, immediately trigger a drain of its mailbox instead of waiting for the scheduler's next cycle. A Redis SETNX lock (15 s TTL) deduplicates concurrent triggers so only one drain task is enqueued per active mailbox.

The drain always starts from the head of the mailbox (minimum ID), not from the newly created payload's ID. Starting from the new ID would skip older undelivered payloads in the same mailbox that are waiting in a retry backoff window; those payloads would be stuck until the lock expired and the scheduler eventually picked them up.

The scheduler now skips mailboxes that already have an active push-triggered drain (lock key present), avoiding duplicate drain tasks.

Controlled by the `hybridcloud.webhookpayload.push_drain_trigger` feature option, defaulting to False.

Refs CW-847
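A minimal sketch of the push-trigger path described above. `FakeCache`, the `enqueued` list, and the exact signature of `maybe_trigger_drain` are illustrative stand-ins: the real code uses the shared Redis-backed Django cache (where `cache.add` has SETNX semantics) and enqueues `drain_mailbox.delay(...)`.

```python
import time

# Illustrative in-memory stand-in for the shared Django cache.
# cache.add succeeds only if the key is absent or expired (SETNX semantics).
class FakeCache:
    def __init__(self):
        self._store = {}

    def add(self, key, value, timeout=None):
        now = time.monotonic()
        entry = self._store.get(key)
        if entry is not None and (entry[1] is None or entry[1] > now):
            return False  # another caller already holds the key
        self._store[key] = (value, None if timeout is None else now + timeout)
        return True

    def delete(self, key):
        self._store.pop(key, None)


cache = FakeCache()
enqueued = []  # stand-in for drain_mailbox.delay(...)


def _drain_lock_key(mailbox_name: str) -> str:
    return f"wh:drain_active:{mailbox_name}"


def maybe_trigger_drain(mailbox_name: str) -> bool:
    """Enqueue at most one drain per active mailbox per 15 s TTL window.

    Losers of the cache.add race do nothing: the already-running drain
    starts from the head of the mailbox, so it picks up their payloads too.
    """
    if cache.add(_drain_lock_key(mailbox_name), 1, timeout=15):
        enqueued.append(mailbox_name)
        return True
    return False
```

Only the first webhook for a mailbox within the TTL window enqueues a drain; later ones fall through to the running drain.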
…er-to-eliminate-scheduler-wait
The wh:drain_active:{mailbox} lock was refreshed on each loop iteration
but never explicitly deleted when a drain finished. After the drain
returned — due to empty mailbox, deadline, or delivery failure — the
lock persisted for up to 15s. During this window, maybe_trigger_drain
could not acquire a new lock and the scheduler skipped the mailbox,
leaving new webhooks with no delivery path until the TTL expired.
Wrap the while loop in both drain_mailbox and drain_mailbox_parallel
with try/finally, calling cache.delete in the finally block so the lock
is always released regardless of exit path.
Update test_drain_clears_lock_on_completion to assert the lock is
cleared after completion, and add a new test confirming maybe_trigger_drain
can fire immediately after a drain finishes.
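The try/finally shape this commit describes can be sketched as follows. `fetch_batch`, `deliver`, and `FakeCache` are hypothetical stand-ins for the real ORM query, delivery call, and Django cache; the point is only that the lock is released on every exit path.

```python
class FakeCache:
    """Illustrative in-memory stand-in for the Django cache."""
    def __init__(self):
        self.store = {}

    def set(self, key, value, timeout=None):
        self.store[key] = value

    def delete(self, key):
        self.store.pop(key, None)


cache = FakeCache()


def drain_mailbox(mailbox_name, fetch_batch, deliver):
    """Sketch of the fixed loop: the lock is refreshed each iteration and
    released in finally on every exit path (empty mailbox, deadline, or
    delivery failure), so maybe_trigger_drain can fire again immediately."""
    lock_key = f"wh:drain_active:{mailbox_name}"
    try:
        while True:
            cache.set(lock_key, 1, timeout=15)  # refresh while still draining
            batch = fetch_batch()
            if not batch:
                return  # empty mailbox: finally still clears the lock
            for payload in batch:
                deliver(payload)
    finally:
        cache.delete(lock_key)  # always release, regardless of exit path


# Usage: a delivery failure propagates, but the lock is still cleared.
batches = [["p1"]]

def fetch_batch():
    return batches.pop(0) if batches else []

def deliver(payload):
    raise RuntimeError("delivery failed")

try:
    drain_mailbox("github:123", fetch_batch, deliver)
except RuntimeError:
    pass
```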
When maybe_trigger_drain acquires the wh:drain_active:{mailbox} lock
and enqueues drain_mailbox, replication lag on a freshly written payload
can cause DoesNotExist on the replica. The early return exits before the
try/finally that clears the lock, orphaning it for up to 15s. During
that window, push triggers cannot acquire the lock and the scheduler
skips the mailbox, leaving new webhooks with no delivery path.
Pass mailbox_name as an optional parameter to drain_mailbox and
drain_mailbox_parallel. In the DoesNotExist handler, delete the lock
immediately when mailbox_name is known. The scheduler's existing call
sites pass no mailbox_name (default None) so the deletion is skipped,
which is correct since the scheduler never sets the lock.
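The DoesNotExist handling above might look roughly like this sketch. `LookupError` stands in for Django's `Model.DoesNotExist`, and `fetch_payload` and `FakeCache` are hypothetical stand-ins for the real ORM lookup and cache.

```python
class FakeCache:
    def __init__(self):
        self.store = {}

    def delete(self, key):
        self.store.pop(key, None)


cache = FakeCache()


def _drain_lock_key(mailbox_name):
    return f"wh:drain_active:{mailbox_name}"


# LookupError stands in for Django's Model.DoesNotExist here.
def fetch_payload(payload_id):
    raise LookupError("replica has not caught up yet")


def drain_mailbox(payload_id, mailbox_name=None):
    """Sketch: the early replication-lag return now cleans up the push lock."""
    try:
        payload = fetch_payload(payload_id)
    except LookupError:
        # Push-triggered calls pass mailbox_name and hold the lock; release
        # it instead of orphaning it for the 15s TTL. Scheduler calls pass
        # None and never set the lock, so there is nothing to delete.
        if mailbox_name is not None:
            cache.delete(_drain_lock_key(mailbox_name))
        return
    ...  # the normal drain loop would follow here


# Push-triggered call: the orphaned lock is released immediately.
cache.store["wh:drain_active:github:123"] = 1
drain_mailbox(42, mailbox_name="github:123")
```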
joseph-sentry
left a comment
lgtm in general
i'm going to read through the code and understand how we sync at the db level to prevent double deliveries to convince myself there are no concurrency issues
    integration_id=integration_id,
    request=self.request,
)
if payload.mailbox_name not in seen_mailboxes:
it seems the mailbox_name is determined by the provider and shard_identifier
i'm assuming this code is here because we might expect a different mailbox_name for different payload objects, but it seems to me like it's constant with respect to the identifier + self.provider, which are constant within this function
Fix lock orphaned when enqueue fails: if the DB query for head_id or drain_mailbox.delay raises inside maybe_trigger_drain after the lock is acquired, delete the lock before the outer exception handler records the error metric. Without this, a broker outage leaves the mailbox blocked for 15s.

Simplify parser.py: seen_mailboxes was redundant because mailbox_name is provider:identifier, which is constant for all regions in the loop. Build the payload list first, then call maybe_trigger_drain once.

Remove the mailbox_name parameter from drain_mailbox and drain_mailbox_parallel as requested; use payload.mailbox_name from the fetched payload inside each function instead. The DoesNotExist early return relies on the 15s TTL.

Extract a _drain_lock_key(mailbox_name) helper to avoid repeating the lock key format string.

Rename the push_trigger metric to push_trigger.success for consistency with the .skipped and .error suffixes.
When maybe_trigger_drain acquires the lock, filter the head-of-mailbox query by schedule_for__lte=now() before enqueuing a drain. If no payload is ready (the head is still in backoff), release the lock immediately so the scheduler can handle delivery once the backoff expires.

Without this guard, push-triggered drains could call schedule_next_attempt on a still-backing-off payload, exhausting all MAX_ATTEMPTS in seconds instead of over the intended backoff period.

Also replace the inline lock key string in schedule_webhook_delivery with the _drain_lock_key() helper to avoid silent divergence if the key format changes.

Refs CW-847
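The schedule_for guard can be sketched against an in-memory payload list; `find_ready_head` is a hypothetical stand-in for the ORM query the commit describes (roughly `WebhookPayload.objects.filter(mailbox_name=..., schedule_for__lte=now).order_by("id").first()`).

```python
from datetime import datetime, timedelta, timezone


def find_ready_head(payloads, mailbox_name, now):
    """Return the lowest-ID payload in the mailbox that is ready to send,
    or None if the head of the mailbox is still inside its backoff window."""
    ready = [
        p for p in payloads
        if p["mailbox_name"] == mailbox_name and p["schedule_for"] <= now
    ]
    return min(ready, key=lambda p: p["id"]) if ready else None


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
payloads = [
    # both payloads are still in their retry backoff window
    {"id": 1, "mailbox_name": "github:123",
     "schedule_for": now + timedelta(seconds=30)},
    {"id": 2, "mailbox_name": "github:123",
     "schedule_for": now + timedelta(seconds=30)},
]
```

When `find_ready_head` returns None, the push path releases the lock immediately and leaves delivery to the scheduler; without the guard, a push-triggered drain would re-attempt the backing-off head and burn through MAX_ATTEMPTS.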
not sure if i'm entirely correct here but i went back and read through the code, i think you might have to update the "schedule_for" of the payload in the mailbox you're consuming from, like what's done in
try:
    cache.set(_drain_lock_key(payload.mailbox_name), 1, timeout=15)
except Exception:
    pass
Should we have a context manager that can encapsulate the lock set/cleanup and error handling? I think you could have a cleaner solution both here and in the task scheduling with a context manager.
Good call, this was a bit messy. I went with _refresh_drain_lock / _release_drain_lock helpers rather than a full context manager. The main reason is that lock ownership is split across the web and celery. I added the helper functions based on this feedback though, as that cleans it up a bit.
When maybe_trigger_drain acquires the lock and enqueues drain_mailbox, a replica replication lag race can cause drain_mailbox to hit DoesNotExist and return early — before the try/finally block that deletes the lock. The lock then stays set for the full 15s TTL, during which both push triggers and the scheduler skip the mailbox. Pass mailbox_name as an optional kwarg to drain_mailbox so the DoesNotExist handler can release the lock when it knows which mailbox was being drained. The scheduler path passes no mailbox_name and is unaffected. Refs CW-847
The parameter was never referenced after the drain logic was changed to always find the mailbox head independently via a DB query. Remove it from the signature and all call sites. Also update the maybe_trigger_drain docstring and test fixture message to use cache-backend-agnostic language instead of Redis-specific terms. Refs CW-847
lock_key = _drain_lock_key(mailbox_name)
try:
    if cache.add(lock_key, 1, timeout=15):
Should the timeout be greater than BATCH_SCHEDULE_OFFSET? Possibly not.
From AI analysis:

Relevant details:
- Lock TTL: 15 seconds
- Scheduler: runs every 10 seconds (timedelta(seconds=10) in conf/server.py)
- Push: cache.add(lock_key, 1, timeout=15); only one request can "win" per key until the TTL expires
- Lock is set when: (1) a push wins cache.add, or (2) a drain task runs and does cache.set(...) at the top of its loop
- Lock is cleared when: the drain task's finally runs cache.delete(...), or the key expires after 15s

So the lock is only set after someone has already scheduled a drain (a push added the key, or a drain task has started and refreshed it). Until then, the scheduler doesn't see the key and can schedule again.

Worst case in a 15-second window (same mailbox):
- From push: at most 1 (only one cache.add can succeed per key per 15s).
- From scheduler: it runs at 0s and 10s. Each run can schedule that mailbox only if cache.get(lock_key) is falsy. If no drain has started yet (tasks still in the queue), the lock is never set, so:
  - Run at 0s → enqueue task A (lock not set).
  - Run at 10s → task A may still not have run → lock still not set → enqueue task B.

So in 15 seconds you can get:
- Up to 3 schedules for the same mailbox: 1 from push + 2 from scheduler (scheduler at 0s and 10s, with neither enqueued task having started and set the lock yet).
- Up to 2 tasks running at once for that mailbox if both get enqueued before either runs and does cache.set.

So: with a 15s timeout, a drain task for a given mailbox can be scheduled up to 3 times in 15 seconds in the worst case (1 push + 2 scheduler runs). The 15s TTL limits push to one schedule per mailbox per window; the main risk is the scheduler running every 10s while the lock isn't set yet because enqueued tasks haven't started.
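The worst case above can be checked with a small event simulation. It encodes exactly the stated assumption (no enqueued drain task starts running inside the window, so only a push's cache.add ever sets the lock key); the function and event shapes are illustrative, not code from the PR.

```python
def count_schedules(events, ttl=15):
    """Count how many drain tasks get enqueued for one mailbox.

    events: (time, kind) pairs, kind in {"push", "scheduler"}.
    Assumes none of the enqueued drain tasks starts running inside the
    window, so the only thing that ever sets the lock key is a push's
    cache.add.
    """
    lock_expires = None
    schedules = 0
    for t, kind in sorted(events):
        locked = lock_expires is not None and t < lock_expires
        if kind == "push":
            if not locked:       # cache.add wins only when the key is absent
                lock_expires = t + ttl
                schedules += 1
        elif not locked:         # scheduler skips only when the key is present
            schedules += 1
    return schedules
```

Scheduler runs at 0s and 10s before any task starts, plus a push at 11s, gives the worst-case 3 schedules; a push arriving first sets the key, so later scheduler runs in the window skip and the total is 1.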
the main risk is the scheduler running every 10s while the lock isn’t set yet because enqueued tasks haven’t started.
In that scenario, wouldn't the locks be obtained/failed as the tasks are executed? We'll have scheduled some waste work but the racing tasks will complete quickly and we won't double deliver.
Move the lock-release logic from an inner try/except (scoped only to drain_mailbox.delay) to the outer except block so it covers any failure in the function. No change to the success path.
Add _refresh_drain_lock and _release_drain_lock helpers that encapsulate refreshing and releasing the drain lock, and update maybe_trigger_drain and drain_mailbox to use them. This keeps lock handling consistent across exit paths and removes duplicated key logic. Refs CW-847
Update maybe_trigger_drain to release the drain lock only if it successfully acquired it. This prevents deleting a lock held by another process, keeping lock ownership correct across execution paths in webhook delivery. Refs CW-847
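The acquire-then-release-on-failure shape can be sketched as below. `FakeCache` and `enqueue` are hypothetical stand-ins for the Django cache and `drain_mailbox.delay`; the key point is that the lock is only deleted on the path where this call acquired it.

```python
class FakeCache:
    """Illustrative stand-in: add has SETNX semantics, TTL ignored."""
    def __init__(self):
        self.store = {}

    def add(self, key, value, timeout=None):
        if key in self.store:
            return False
        self.store[key] = value
        return True

    def delete(self, key):
        self.store.pop(key, None)


cache = FakeCache()


def maybe_trigger_drain(mailbox_name, enqueue):
    lock_key = f"wh:drain_active:{mailbox_name}"
    if not cache.add(lock_key, 1, timeout=15):
        return  # another process holds the lock; its drain will deliver
    try:
        enqueue(mailbox_name)
    except Exception:
        # We acquired the lock above, so we may safely delete it. Deleting
        # a lock another process acquired would unblock competing drains.
        cache.delete(lock_key)
        raise


# Usage: a broker outage during enqueue does not orphan the lock.
def broken_enqueue(name):
    raise ConnectionError("broker down")

try:
    maybe_trigger_drain("github:123", broken_enqueue)
except ConnectionError:
    pass
```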
        **payload.as_dict(),
        "delivered": delivered,
    },
)
Lock can expire during inner batch processing
Medium Severity
The drain lock (15s TTL) is refreshed only at the top of the outer while True loop, but the inner for record in query[:100] loop sequentially delivers up to 100 records, each involving an HTTP request with up to a 30s timeout. A full batch at even 150ms/request takes 15s — exactly the TTL — and any slower causes the lock to expire mid-batch. Since the push-triggered path doesn't update schedule_for (unlike the scheduler path), the lock is the sole mechanism preventing competing drains. When it expires, the scheduler sees the mailbox as ready and can enqueue a duplicate drain, risking double delivery.
    return
finally:
    if options.get("hybridcloud.webhookpayload.push_drain_trigger"):
        _release_drain_lock(payload.mailbox_name)
Scheduler-triggered drain unconditionally releases push-triggered locks
Low Severity
Both drain_mailbox and drain_mailbox_parallel unconditionally release the drain lock in their finally blocks when the option is enabled — regardless of whether they acquired the lock or a push trigger did. A TOCTOU race exists: the scheduler checks cache.get (no lock), then a push trigger acquires the lock and enqueues its own drain, then the scheduler-triggered drain starts, overwrites the lock via _refresh_drain_lock, and eventually releases it in finally, leaving the push-triggered drain running without lock protection.
…in tasks

Give drain_mailbox_parallel the same mailbox_name: str | None = None contract as drain_mailbox. Scheduler-triggered calls omit the param (None → no lock release); push-triggered calls pass it (→ refresh and release on completion or race).

Also move the drain lock refresh from the outer while-loop into the inner for-record loop in drain_mailbox, so a slow HTTP response batch (up to 30s × 100 records) cannot outlast the 15s TTL and let the key expire mid-batch.
The inner-loop lock refresh must only run for push-triggered drains (mailbox_name is not None). Without this gate, scheduler-triggered calls were setting the cache lock via _refresh_drain_lock even though they never acquired it, then leaving it set because the finally block correctly skips release when mailbox_name is None.
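A sketch of the gated inner-loop refresh these two commits describe. `fetch_batch`, `deliver`, `FakeCache`, and the `refreshes` counter are illustrative stand-ins; the shape shows the per-record refresh for push-triggered drains and the untouched lock for scheduler-triggered ones.

```python
class FakeCache:
    def __init__(self):
        self.store = {}

    def set(self, key, value, timeout=None):
        self.store[key] = value

    def delete(self, key):
        self.store.pop(key, None)


cache = FakeCache()
refreshes = []  # records each lock refresh, for illustration


def _drain_lock_key(mailbox_name):
    return f"wh:drain_active:{mailbox_name}"


def _refresh_drain_lock(mailbox_name):
    refreshes.append(mailbox_name)
    cache.set(_drain_lock_key(mailbox_name), 1, timeout=15)


def _release_drain_lock(mailbox_name):
    cache.delete(_drain_lock_key(mailbox_name))


def drain_mailbox(fetch_batch, deliver, mailbox_name=None):
    """Per-record refresh so slow deliveries can't outlast the 15s TTL,
    gated on mailbox_name so scheduler-triggered calls (None) never touch
    a lock they don't own."""
    try:
        while True:
            batch = fetch_batch()
            if not batch:
                return
            for payload in batch:
                if mailbox_name is not None:
                    _refresh_drain_lock(mailbox_name)
                deliver(payload)
    finally:
        if mailbox_name is not None:
            _release_drain_lock(mailbox_name)


# Push-triggered: refreshes once per record, releases on completion.
batches = [["p1", "p2"]]
drain_mailbox(lambda: batches.pop(0) if batches else [], lambda p: None,
              mailbox_name="github:123")

# Scheduler-triggered (mailbox_name omitted): lock never touched.
batches = [["p3"]]
drain_mailbox(lambda: batches.pop(0) if batches else [], lambda p: None)
```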
Cursor Bugbot has reviewed your changes and found 1 potential issue.
# inner loop (up to 30s timeout × 100 records) cannot outlast the
# 15s TTL and let the key expire mid-batch.
if mailbox_name and options.get("hybridcloud.webhookpayload.push_drain_trigger"):
    _refresh_drain_lock(payload.mailbox_name)
Lock refresh uses different variable than lock release
Medium Severity
_refresh_drain_lock uses payload.mailbox_name (from the DB record) while the guard condition and _release_drain_lock in the finally block use the mailbox_name parameter. The same inconsistency appears in drain_mailbox_parallel. If these values ever diverge, the lock refresh would operate on a different cache key than the one actually acquired, allowing the real lock to expire mid-batch while a stale key accumulates. Using mailbox_name consistently for all lock operations would be safer.


When a webhook arrives, immediately trigger a drain of its mailbox instead of waiting for the next scheduler cycle. A Redis SETNX lock (15 s TTL) deduplicates concurrent triggers so only one drain task is enqueued per active mailbox; subsequent webhooks within the TTL window are picked up by the already-running drain.

The drain is always started from the head of the mailbox (minimum ID in the (mailbox_name, id) index), not from the newly created payload's ID. Starting from the new ID would skip older undelivered payloads waiting in a retry backoff window; those payloads would be stuck until the lock expired and the scheduler eventually caught up. This matches the scheduler's own head-of-line behaviour.

The scheduler now skips mailboxes that have an active push-triggered drain (lock key present in Redis), so the two delivery paths don't enqueue competing tasks.

Controlled by the hybridcloud.webhookpayload.push_drain_trigger option flag (default False).

Refs CW-847