feat(aci): Delayed workflows cohort sharding #100038
Conversation
TODO: Make cohort size an option, spell out the transition, use precise cleanup, add a test for `num_cohorts = 1`.
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```diff
@@            Coverage Diff            @@
##           master   #100038     +/-   ##
===========================================
+ Coverage   78.80%    81.03%   +2.22%
===========================================
  Files        8699      8701       +2
  Lines      385940    386024      +84
  Branches    24413     24413
===========================================
+ Hits       304159    312799    +8640
+ Misses      81430     72874    -8556
  Partials      351       351
```
This is true, but an artifact of `use_conditional_delete` being required for partial clean-up. Next week the fallback will be removed.
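For readers outside this thread, a rough sketch of the distinction being discussed: one plausible reading is that a conditional delete drops the sorted-set key only when nothing newer than the processed cutoff remains, while the fallback clears a score range. The names below (`cleanup_processed_entries`, the `use_conditional_delete` parameter) are illustrative assumptions, not Sentry's actual cleanup code.

```python
from redis import Redis

# Lua: delete the key only if no members score strictly above the cutoff
# (exclusive-min ZCOUNT); otherwise keep newer entries for the next run.
CONDITIONAL_DELETE = """
if redis.call('ZCOUNT', KEYS[1], '(' .. ARGV[1], '+inf') == 0 then
    return redis.call('DEL', KEYS[1])
end
return 0
"""

def cleanup_processed_entries(
    client: Redis, key: str, max_processed: float, use_conditional_delete: bool
) -> None:
    # Hypothetical helper, for illustration only.
    if use_conditional_delete:
        client.eval(CONDITIONAL_DELETE, 1, key, max_processed)
    else:
        # Fallback: clear only the already-processed score range.
        client.zremrangebyscore(key, "-inf", max_processed)
```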
```python
value = self._execute_redis_operation(key, "get")
if value is None:
    return None
return model.parse_raw(value)
```
This is cool!
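For context, the hunk quoted above is the read half of a pydantic-in-Redis round trip. Here is a self-contained sketch of the same pattern, with a hypothetical model shape and key name; the real `CohortUpdates` fields and the `get_parsed_key`/`put_parsed_key` wiring live in the PR itself.

```python
from pydantic import BaseModel
from redis import Redis

class CohortUpdates(BaseModel):
    # cohort index -> unix timestamp of that cohort's last run (assumed shape)
    runs: dict[int, float] = {}

client = Redis()
key = "delayed_workflow:cohort_updates"  # illustrative key name

# Write: serialize the model to JSON (pydantic v1 API, matching parse_raw above).
client.set(key, CohortUpdates(runs={0: 1700000000.0}).json())

# Read: mirrors the quoted hunk -- return None on a missing key, otherwise parse.
raw = client.get(key)
updates = CohortUpdates.parse_raw(raw) if raw is not None else None
```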
Delayed workflows tasks are currently all scheduled every minute. This frequency (60s) is what we want, but running them all at once makes our load bursty and causes some rate limits to be frequently hit in Snuba querying. This change establishes "cohorts" and schedules one cohort at a time, allowing us to smoothly transition to N groups of `delayed_workflow` tasks per minute, one every 1/Nth of a minute, smoothing out our workload (a rough sketch of the cohort selection follows this description).

---

> [!NOTE]
> Introduce cohort-sharded scheduling for delayed workflows, persisting cohort run timestamps in Redis and refactoring processing/cleanup; add pydantic-based Redis helpers and a new num_cohorts option.
>
> - **Workflow Engine (Scheduling & Processing)**
>   - Introduce cohort-sharded scheduling via `ProjectChooser` and `chosen_projects`, selecting projects per run based on `workflow_engine.num_cohorts`.
>   - Persist cohort run timestamps in Redis as `CohortUpdates` using `DelayedWorkflowClient.fetch_updates/persist_updates`.
>   - Refactor cleanup into `mark_projects_processed`; compute `max_project_timestamp` and use conditional delete or range clear accordingly.
> - **Buffer Layer**
>   - Add `RedisHashSortedSetBuffer.get_parsed_key/put_parsed_key` for pydantic models; extend supported ops and expiry handling.
>   - Extend `DelayedWorkflowClient` with `_COHORT_UPDATES_KEY` and cohort update helpers.
> - **Options**
>   - Add `workflow_engine.num_cohorts` (Int, default `1`).
> - **Tests**
>   - Convert to pytest-style and add coverage for cohort selection, Redis parsed key helpers, and new scheduling/cleanup flows.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 87a09d0. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>

---

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
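The sketch referenced above: a minimal illustration of cohort selection under the described scheme, assuming projects are bucketed by `project_id % num_cohorts` and the active cohort advances every 60/N seconds. The function names and bucketing rule here are assumptions for illustration, not the PR's actual `ProjectChooser` implementation.

```python
import time

def current_cohort(num_cohorts: int, now: float | None = None) -> int:
    # The active cohort advances every 60 / num_cohorts seconds.
    now = time.time() if now is None else now
    return int(now // (60 / num_cohorts)) % num_cohorts

def chosen_projects(project_ids: list[int], num_cohorts: int, now: float | None = None) -> list[int]:
    # Only projects in the active cohort are scheduled on this run.
    cohort = current_cohort(num_cohorts, now)
    return [pid for pid in project_ids if pid % num_cohorts == cohort]

# With num_cohorts=1 every project is chosen on every run (today's behavior);
# with num_cohorts=4 roughly a quarter of the projects is scheduled every 15 seconds.
```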