From f1903808c7065dde340009d60071e80a31d7d622 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 22 Sep 2025 10:19:45 -0700 Subject: [PATCH 01/14] initial --- .../buffer/redis_hash_sorted_set_buffer.py | 8 ++ .../workflow_engine/processors/schedule.py | 89 ++++++++++++++++--- 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py index 4cd3802a8f64bd..523b1526c08fa9 100644 --- a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py +++ b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py @@ -7,6 +7,7 @@ from typing import Any, TypeAlias, TypeVar import rb +import pydantic from redis.client import Pipeline ClusterPipeline: TypeAlias = Any @@ -418,3 +419,10 @@ def _conditional_delete_rb_fallback( converted_results.update(host_parsed) return converted_results + + def get_parsed_key[T: pydantic.BaseModel](self, key: str, model: type[T]) -> T: + value = self._execute_redis_operation(key, "get") + return model.parse_raw(value) + + def put_parsed_key[T: pydantic.BaseModel](self, key: str, value: T) -> None: + self._execute_redis_operation(key, "set", value.json()) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 8713ce62488421..e51f881decd21e 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -1,9 +1,17 @@ +import hashlib import logging import math import uuid from datetime import datetime, timezone +from itertools import chain, islice +from collections.abc import Generator +from contextlib import contextmanager +from dataclasses import asdict +from datetime import datetime, timedelta, timezone from itertools import islice +import pydantic + from sentry import options from sentry.utils import metrics from sentry.utils.iterators import chunked @@ -79,6 +87,67 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: ) +class CohortUpdates(pydantic.BaseModel): + values: dict[int, float] + + def get_last_cohort_run(self, cohort_id: int) -> float: + return self.values.get(cohort_id, 0) + + +NUM_COHORTS = 6 + + +class ProjectChooser: + def __init__(self, buffer: RedisHashSortedSetBuffer, num_cohorts: int = NUM_COHORTS): + self.buffer = buffer + self.num_cohorts = num_cohorts + + def fetch_updates(self) -> CohortUpdates: + return self.buffer.get_parsed_key("WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates) + + def persist_updates(self, cohort_updates: CohortUpdates) -> None: + self.buffer.put_parsed_key("WORKFLOW_ENGINE_COHORT_UPDATES", cohort_updates) + + def project_id_to_cohort(self, project_id: int) -> int: + return hashlib.sha256(project_id.to_bytes(8)).digest()[0] % self.num_cohorts + + def project_ids_to_process( + self, fetch_time: float, cohort_updates: CohortUpdates, all_project_ids: list[int] + ) -> list[int]: + must_process = set[int]() + may_process = set[int]() + now = fetch_time + for co in range(self.num_cohorts): + last_run = cohort_updates.get_last_cohort_run(co) + elapsed = timedelta(seconds=now - last_run) + if elapsed > timedelta(minutes=1): + must_process.add(co) + elif elapsed > timedelta(seconds=60 / self.num_cohorts): + may_process.add(co) + if may_process and not must_process: + choice = min(may_process, key=lambda c: (cohort_updates.get_last_cohort_run(c), c)) + must_process.add(choice) + cohort_updates.values.update({cohort_id: fetch_time for cohort_id in must_process}) + return [ + project_id + for project_id in all_project_ids + if self.project_id_to_cohort(project_id) in must_process + ] + + +@contextmanager +def chosen_projects( + buffer: RedisHashSortedSetBuffer, fetch_time: float, all_project_ids: list[int] +) -> Generator[list[int]]: + project_chooser = ProjectChooser(buffer) + cohort_updates = project_chooser.fetch_updates() + project_ids_to_process = project_chooser.project_ids_to_process( + fetch_time, cohort_updates, all_project_ids + ) + yield project_ids_to_process + project_chooser.persist_updates(cohort_updates) + + def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: option_name = buffer_client.option if option_name and not options.get(option_name): @@ -92,17 +161,17 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: max=fetch_time, ) - metrics.distribution( - "workflow_engine.schedule.projects", len(all_project_ids_and_timestamps) - ) - logger.info( - "delayed_workflow.project_id_list", - extra={"project_ids": sorted(all_project_ids_and_timestamps.keys())}, - ) + with chosen_projects( + buffer_client, fetch_time, list(all_project_ids_and_timestamps.keys()) + ) as project_ids_to_process: + metrics.distribution("workflow_engine.schedule.projects", len(project_ids_to_process)) + logger.info( + "delayed_workflow.project_id_list", + extra={"project_ids": sorted(project_ids_to_process)}, + ) - project_ids = list(all_project_ids_and_timestamps.keys()) - for project_id in project_ids: - process_in_batches(buffer_client.for_project(project_id)) + for project_id in project_ids_to_process: + process_in_batches(buffer_client.for_project(project_id)) mark_projects_processed(buffer_client, all_project_ids_and_timestamps) From 58089e7ee8dd8b92b33dcf53e911b5f5f2210348 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 22 Sep 2025 11:11:33 -0700 Subject: [PATCH 02/14] tests --- .../buffer/redis_hash_sorted_set_buffer.py | 6 +- .../workflow_engine/processors/schedule.py | 1 - .../test_redis_hash_sorted_set_buffer.py | 52 +++ .../processors/test_schedule.py | 303 +++++++++++++++++- 4 files changed, 359 insertions(+), 3 deletions(-) diff --git a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py index 523b1526c08fa9..a92a7bbdb29990 100644 --- a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py +++ b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py @@ -51,6 +51,8 @@ def _by_pairs(seq: list[T]) -> Iterable[tuple[T, T]]: "zrangebyscore": False, "zrem": True, "zremrangebyscore": True, + "set": True, + "get": False, } @@ -420,8 +422,10 @@ def _conditional_delete_rb_fallback( return converted_results - def get_parsed_key[T: pydantic.BaseModel](self, key: str, model: type[T]) -> T: + def get_parsed_key[T: pydantic.BaseModel](self, key: str, model: type[T]) -> T | None: value = self._execute_redis_operation(key, "get") + if value is None: + return None return model.parse_raw(value) def put_parsed_key[T: pydantic.BaseModel](self, key: str, value: T) -> None: diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index e51f881decd21e..d182074788a41c 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -6,7 +6,6 @@ from itertools import chain, islice from collections.abc import Generator from contextlib import contextmanager -from dataclasses import asdict from datetime import datetime, timedelta, timezone from itertools import islice diff --git a/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py b/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py index 07afd19fc0d934..93d70357df8032 100644 --- a/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py +++ b/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py @@ -474,3 +474,55 @@ def test_conditional_delete_rb_host_batching(self): for key in keys: remaining = self.buf.get_sorted_set(key, 0, time.time() + 10) assert len(remaining) == 0 + + def test_get_parsed_key_put_parsed_key(self): + """Test storing and retrieving pydantic models using get_parsed_key/put_parsed_key.""" + from pydantic import BaseModel + + class TestModel(BaseModel): + name: str + value: int + enabled: bool + + # Test putting and getting a parsed model + test_data = TestModel(name="test", value=42, enabled=True) + self.buf.put_parsed_key("test_key", test_data) + + retrieved_data = self.buf.get_parsed_key("test_key", TestModel) + + assert retrieved_data is not None + assert retrieved_data.name == "test" + assert retrieved_data.value == 42 + assert retrieved_data.enabled is True + assert isinstance(retrieved_data, TestModel) + + def test_get_parsed_key_put_parsed_key_complex_model(self): + """Test with more complex pydantic model containing nested data.""" + from pydantic import BaseModel + + class NestedModel(BaseModel): + items: list[int] + metadata: dict[str, str] + + test_data = NestedModel( + items=[1, 2, 3, 4, 5], metadata={"source": "test", "version": "1.0"} + ) + + self.buf.put_parsed_key("complex_key", test_data) + retrieved_data = self.buf.get_parsed_key("complex_key", NestedModel) + + assert retrieved_data is not None + assert retrieved_data.items == [1, 2, 3, 4, 5] + assert retrieved_data.metadata == {"source": "test", "version": "1.0"} + assert isinstance(retrieved_data, NestedModel) + + def test_get_parsed_key_missing_key(self): + """Test get_parsed_key returns None for missing key.""" + from pydantic import BaseModel + + class TestModel(BaseModel): + name: str + + # Try to get a key that doesn't exist + retrieved_data = self.buf.get_parsed_key("nonexistent_key", TestModel) + assert retrieved_data is None diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index a9adf48ef17db2..bd6d007afdd6bd 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -1,14 +1,20 @@ from datetime import datetime -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch from uuid import uuid4 +import pytest + from sentry.testutils.cases import TestCase from sentry.testutils.helpers.datetime import before_now, freeze_time from sentry.testutils.helpers.options import override_options from sentry.utils import json from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.processors.schedule import ( + NUM_COHORTS, + CohortUpdates, + ProjectChooser, bucket_num_groups, + chosen_projects, process_buffered_workflows, process_in_batches, ) @@ -174,3 +180,298 @@ def test_get_hash_data_with_batch_key(self) -> None: assert f"{self.rule.id}:{self.group.id}" in result data = json.loads(result[f"{self.rule.id}:{self.group.id}"]) assert data["event_id"] == "event-456" + + +class TestCohortUpdates: + def test_get_last_cohort_run_existing(self): + """Test getting last run time for existing cohort.""" + updates = CohortUpdates(values={1: 100.5, 2: 200.3}) + assert updates.get_last_cohort_run(1) == 100.5 + assert updates.get_last_cohort_run(2) == 200.3 + + def test_get_last_cohort_run_missing(self): + """Test getting last run time for non-existent cohort returns 0.""" + updates = CohortUpdates(values={1: 100.5}) + assert updates.get_last_cohort_run(999) == 0 + + +class TestProjectChooser: + @pytest.fixture + def mock_buffer(self): + """Create a mock buffer for testing.""" + mock_buffer = Mock() + return mock_buffer + + @pytest.fixture + def project_chooser(self, mock_buffer): + """Create a ProjectChooser with mocked buffer.""" + return ProjectChooser(mock_buffer, num_cohorts=6) + + def test_init_default_cohorts(self, mock_buffer): + chooser = ProjectChooser(mock_buffer) + assert chooser.num_cohorts == NUM_COHORTS + + def test_fetch_updates(self, project_chooser, mock_buffer): + """Test fetching cohort updates from buffer.""" + expected_updates = CohortUpdates(values={1: 100.0}) + mock_buffer.get_parsed_key.return_value = expected_updates + + result = project_chooser.fetch_updates() + + mock_buffer.get_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates + ) + assert result == expected_updates + + def test_persist_updates(self, project_chooser, mock_buffer): + """Test persisting cohort updates to buffer.""" + updates = CohortUpdates(values={1: 100.0, 2: 200.0}) + + project_chooser.persist_updates(updates) + + mock_buffer.put_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", updates + ) + + def test_fetch_updates_missing_key(self, project_chooser, mock_buffer): + """Test fetching cohort updates when key doesn't exist (returns None).""" + mock_buffer.get_parsed_key.return_value = None + + result = project_chooser.fetch_updates() + + mock_buffer.get_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates + ) + assert isinstance(result, CohortUpdates) + assert result.values == {} # Should be default empty dict + + def test_project_id_to_cohort_distribution(self, project_chooser): + """Test that project IDs are distributed across cohorts.""" + project_ids = list(range(1, 1001)) # 1000 project IDs + cohorts = [project_chooser.project_id_to_cohort(pid) for pid in project_ids] + + # Check all cohorts are used + assert set(cohorts) == set(range(6)) + + # Check distribution is reasonably even (each cohort gets some projects) + cohort_counts = [cohorts.count(i) for i in range(6)] + assert all(count > 0 for count in cohort_counts) + assert all(count < 1000 for count in cohort_counts) + + def test_project_id_to_cohort_consistent(self, project_chooser): + """Test that same project ID always maps to same cohort.""" + for project_id in [123, 999, 4, 193848493]: + cohort1 = project_chooser.project_id_to_cohort(project_id) + cohort2 = project_chooser.project_id_to_cohort(project_id) + cohort3 = project_chooser.project_id_to_cohort(project_id) + + assert cohort1 == cohort2 == cohort3 + assert 0 <= cohort1 < 6 + + def test_project_ids_to_process_must_process_over_minute(self, project_chooser): + """Test that cohorts not run for over a minute are marked as must_process.""" + fetch_time = 1000.0 + cohort_updates = CohortUpdates( + values={ + 0: 900.0, # 100 seconds ago - must process + 1: 950.0, # 50 seconds ago - may process + 2: 990.0, # 10 seconds ago - no process + } + ) + all_project_ids = [10, 11, 12, 13, 14, 15] # Projects mapping to cohorts 0-5 + + result = project_chooser.project_ids_to_process(fetch_time, cohort_updates, all_project_ids) + + # Should include projects from cohort 0 (over 1 minute old) + expected_cohort = project_chooser.project_id_to_cohort(10) + if expected_cohort == 0: + assert 10 in result + + # cohort_updates should be updated with fetch_time for processed cohorts + assert 0 in cohort_updates.values + + def test_project_ids_to_process_may_process_fallback(self, project_chooser): + """Test that when no must_process cohorts, oldest may_process is chosen.""" + fetch_time = 1000.0 + cohort_updates = CohortUpdates( + values={ + 0: 995.0, # 5 seconds ago - may process (older) + 1: 998.0, # 2 seconds ago - may process (newer) + 2: 999.0, # 1 second ago - no process + } + ) + all_project_ids = [10, 11, 12] + + result = project_chooser.project_ids_to_process(fetch_time, cohort_updates, all_project_ids) + + # Should choose the oldest from may_process cohorts (cohort 0) + # and update cohort_updates accordingly + assert len(result) > 0 # Should process something + processed_cohorts = {project_chooser.project_id_to_cohort(pid) for pid in result} + + # The processed cohorts should be updated in cohort_updates + for cohort in processed_cohorts: + assert cohort_updates.values[cohort] == fetch_time + + def test_project_ids_to_process_no_processing_needed(self, project_chooser): + """Test when no processing is needed (all cohorts recently processed).""" + fetch_time = 1000.0 + cohort_updates = CohortUpdates( + values={ + 0: 999.0, # 1 second ago + 1: 998.0, # 2 seconds ago + 2: 997.0, # 3 seconds ago + 3: 996.0, # 4 seconds ago + 4: 995.0, # 5 seconds ago + 5: 994.0, # 6 seconds ago + } + ) + all_project_ids = [10, 11, 12, 13, 14, 15] + + result = project_chooser.project_ids_to_process(fetch_time, cohort_updates, all_project_ids) + + # No cohorts are old enough for must_process or may_process + assert len(result) == 0 + + def test_scenario_once_per_minute_6_cohorts(self, project_chooser): + """ + Scenario test: Running once per minute with 6 cohorts. + Should converge to stable cycle where each cohort gets processed every 6 minutes. + """ + # Find project IDs that map to each cohort to ensure even distribution + all_project_ids = [] + used_cohorts: set[int] = set() + project_id = 1 + while len(used_cohorts) < 6: + cohort = project_chooser.project_id_to_cohort(project_id) + if cohort not in used_cohorts: + all_project_ids.append(project_id) + used_cohorts.add(cohort) + project_id += 1 + + cohort_updates = CohortUpdates(values={}) + + # Track which cohorts get processed over time + processed_cohorts_over_time = [] + + # Simulate 20 minutes of processing (20 runs, once per minute) + for minute in range(20): + fetch_time = float(minute * 60) # Every 60 seconds + + processed_projects = project_chooser.project_ids_to_process( + fetch_time, cohort_updates, all_project_ids + ) + processed_cohorts = { + project_chooser.project_id_to_cohort(pid) for pid in processed_projects + } + processed_cohorts_over_time.append(processed_cohorts) + + # After initial ramp-up, should settle into stable cycle + # Check the last 12 runs (2 full cycles) for stability + stable_period = processed_cohorts_over_time[-12:] + + # Each cohort should be processed at least once in 12 minutes + cohort_counts = {i: 0 for i in range(6)} + for processed_cohorts in stable_period: + for cohort in processed_cohorts: + cohort_counts[cohort] += 1 + + # Verify that all cohorts get processed (at least once in stable period) + for cohort in range(6): + assert cohort_counts[cohort] >= 1, f"Cohort {cohort} never processed in stable period" + + def test_scenario_six_times_per_minute(self, project_chooser): + """ + Scenario test: Running 6 times per minute (every 10 seconds). + Should process one cohort per run in stable cycle. + """ + # Find project IDs that map to each cohort to ensure even distribution + all_project_ids = [] + used_cohorts: set[int] = set() + project_id = 1 + while len(used_cohorts) < 6: + cohort = project_chooser.project_id_to_cohort(project_id) + if cohort not in used_cohorts: + all_project_ids.append(project_id) + used_cohorts.add(cohort) + project_id += 1 + + cohort_updates = CohortUpdates(values={}) + + processed_cohorts_over_time = [] + + # Simulate 2 minutes of processing (12 runs, every 10 seconds) + for run in range(12): + fetch_time = float(run * 10) # Every 10 seconds + + processed_projects = project_chooser.project_ids_to_process( + fetch_time, cohort_updates, all_project_ids + ) + processed_cohorts = { + project_chooser.project_id_to_cohort(pid) for pid in processed_projects + } + processed_cohorts_over_time.append(processed_cohorts) + + # After initial period, check stable cycle behavior + # In the last 6 runs, each cohort should be processed at least once + stable_period = processed_cohorts_over_time[-6:] + + cohort_counts = {i: 0 for i in range(6)} + for processed_cohorts in stable_period: + for cohort in processed_cohorts: + cohort_counts[cohort] += 1 + + # Each cohort should be processed at least once in the last 6 runs + for cohort in range(6): + assert cohort_counts[cohort] >= 1, f"Cohort {cohort} never processed in stable period" + + +class TestChosenProjects: + @pytest.fixture + def mock_project_chooser(self): + """Create a mock ProjectChooser.""" + return Mock() + + def test_chosen_projects_context_manager(self, mock_project_chooser): + """Test chosen_projects as a context manager.""" + # Setup mocks + mock_cohort_updates = Mock() + mock_project_chooser.fetch_updates.return_value = mock_cohort_updates + mock_project_chooser.project_ids_to_process.return_value = [1, 2, 3] + + fetch_time = 1000.0 + all_project_ids = [1, 2, 3, 4, 5] + + # Use context manager + with chosen_projects(mock_project_chooser, fetch_time, all_project_ids) as result: + project_ids_to_process = result + + # Verify fetch_updates was called + mock_project_chooser.fetch_updates.assert_called_once() + + # Verify project_ids_to_process was called with correct args + mock_project_chooser.project_ids_to_process.assert_called_once_with( + fetch_time, mock_cohort_updates, all_project_ids + ) + + # Verify the result + assert project_ids_to_process == [1, 2, 3] + + # Verify persist_updates was called after context exit + mock_project_chooser.persist_updates.assert_called_once_with(mock_cohort_updates) + + def test_chosen_projects_fetch_updates_exception(self, mock_project_chooser): + """Test that exception during fetch_updates is properly handled.""" + # Make fetch_updates raise an exception (e.g. key doesn't exist) + mock_project_chooser.fetch_updates.side_effect = Exception("Key not found") + + fetch_time = 1000.0 + all_project_ids = [1, 2, 3, 4, 5] + + # Should raise the exception from fetch_updates + with pytest.raises(Exception, match="Key not found"): + with chosen_projects(mock_project_chooser, fetch_time, all_project_ids): + pass + + # persist_updates should not be called if fetch_updates fails + mock_project_chooser.persist_updates.assert_not_called() From a314467eaeaf2bad4fbfc549ba90d7699465fbb9 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 22 Sep 2025 16:15:58 -0700 Subject: [PATCH 03/14] fix --- .../workflow_engine/buffer/batch_client.py | 19 +++++++++++++ .../workflow_engine/processors/schedule.py | 28 +++++-------------- .../processors/test_schedule.py | 3 +- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/sentry/workflow_engine/buffer/batch_client.py b/src/sentry/workflow_engine/buffer/batch_client.py index 360184ae0bd3a4..a901613a0aa526 100644 --- a/src/sentry/workflow_engine/buffer/batch_client.py +++ b/src/sentry/workflow_engine/buffer/batch_client.py @@ -4,6 +4,8 @@ from collections.abc import Mapping from typing import TYPE_CHECKING +import pydantic + import sentry.workflow_engine.buffer as buffer from sentry.workflow_engine.models import Workflow @@ -11,6 +13,13 @@ from sentry.workflow_engine.buffer.redis_hash_sorted_set_buffer import RedisHashSortedSetBuffer +class CohortUpdates(pydantic.BaseModel): + values: dict[int, float] + + def get_last_cohort_run(self, cohort_id: int) -> float: + return self.values.get(cohort_id, 0) + + class DelayedWorkflowClient: """ Client for interacting with batch processing of delayed workflows. @@ -69,6 +78,16 @@ def _get_buffer_keys(cls) -> list[str]: for shard in range(cls._BUFFER_SHARDS) ] + _COHORT_UPDATES_KEY = "WORKFLOW_ENGINE_COHORT_UPDATES" + + def fetch_updates(self) -> CohortUpdates: + return self._buffer.get_parsed_key( + self._COHORT_UPDATES_KEY, CohortUpdates + ) or CohortUpdates(values={}) + + def persist_updates(self, cohort_updates: CohortUpdates) -> None: + self._buffer.put_parsed_key(self._COHORT_UPDATES_KEY, cohort_updates) + def for_project(self, project_id: int) -> ProjectDelayedWorkflowClient: """Create a project-specific client for workflow operations.""" return ProjectDelayedWorkflowClient(project_id, self._buffer) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index d182074788a41c..6a342567348bd6 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -9,12 +9,11 @@ from datetime import datetime, timedelta, timezone from itertools import islice -import pydantic - from sentry import options from sentry.utils import metrics from sentry.utils.iterators import chunked from sentry.workflow_engine.buffer.batch_client import ( + CohortUpdates, DelayedWorkflowClient, ProjectDelayedWorkflowClient, ) @@ -86,27 +85,14 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: ) -class CohortUpdates(pydantic.BaseModel): - values: dict[int, float] - - def get_last_cohort_run(self, cohort_id: int) -> float: - return self.values.get(cohort_id, 0) - - NUM_COHORTS = 6 class ProjectChooser: - def __init__(self, buffer: RedisHashSortedSetBuffer, num_cohorts: int = NUM_COHORTS): - self.buffer = buffer + def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int = NUM_COHORTS): + self.client = buffer_client self.num_cohorts = num_cohorts - def fetch_updates(self) -> CohortUpdates: - return self.buffer.get_parsed_key("WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates) - - def persist_updates(self, cohort_updates: CohortUpdates) -> None: - self.buffer.put_parsed_key("WORKFLOW_ENGINE_COHORT_UPDATES", cohort_updates) - def project_id_to_cohort(self, project_id: int) -> int: return hashlib.sha256(project_id.to_bytes(8)).digest()[0] % self.num_cohorts @@ -136,15 +122,15 @@ def project_ids_to_process( @contextmanager def chosen_projects( - buffer: RedisHashSortedSetBuffer, fetch_time: float, all_project_ids: list[int] + buffer_client: DelayedWorkflowClient, fetch_time: float, all_project_ids: list[int] ) -> Generator[list[int]]: - project_chooser = ProjectChooser(buffer) - cohort_updates = project_chooser.fetch_updates() + project_chooser = ProjectChooser(buffer_client) + cohort_updates = buffer_client.fetch_updates() project_ids_to_process = project_chooser.project_ids_to_process( fetch_time, cohort_updates, all_project_ids ) yield project_ids_to_process - project_chooser.persist_updates(cohort_updates) + buffer_client.persist_updates(cohort_updates) def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index bd6d007afdd6bd..18dc7963d50f27 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -8,10 +8,9 @@ from sentry.testutils.helpers.datetime import before_now, freeze_time from sentry.testutils.helpers.options import override_options from sentry.utils import json -from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient +from sentry.workflow_engine.buffer.batch_client import CohortUpdates, DelayedWorkflowClient from sentry.workflow_engine.processors.schedule import ( NUM_COHORTS, - CohortUpdates, ProjectChooser, bucket_num_groups, chosen_projects, From ca3d8bc990534185755b9dfd98193e1d431c980d Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 22 Sep 2025 16:35:02 -0700 Subject: [PATCH 04/14] some fixes --- .../workflow_engine/processors/schedule.py | 10 +- .../buffer/test_batch_client.py | 194 +++++++++++++++++- .../processors/test_schedule.py | 76 ++----- 3 files changed, 216 insertions(+), 64 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 6a342567348bd6..53d5eb1f4c005b 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -122,15 +122,14 @@ def project_ids_to_process( @contextmanager def chosen_projects( - buffer_client: DelayedWorkflowClient, fetch_time: float, all_project_ids: list[int] + project_chooser: ProjectChooser, fetch_time: float, all_project_ids: list[int] ) -> Generator[list[int]]: - project_chooser = ProjectChooser(buffer_client) - cohort_updates = buffer_client.fetch_updates() + cohort_updates = project_chooser.client.fetch_updates() project_ids_to_process = project_chooser.project_ids_to_process( fetch_time, cohort_updates, all_project_ids ) yield project_ids_to_process - buffer_client.persist_updates(cohort_updates) + project_chooser.client.persist_updates(cohort_updates) def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: @@ -146,8 +145,9 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: max=fetch_time, ) + project_chooser = ProjectChooser(buffer_client) with chosen_projects( - buffer_client, fetch_time, list(all_project_ids_and_timestamps.keys()) + project_chooser, fetch_time, list(all_project_ids_and_timestamps.keys()) ) as project_ids_to_process: metrics.distribution("workflow_engine.schedule.projects", len(project_ids_to_process)) logger.info( diff --git a/tests/sentry/workflow_engine/buffer/test_batch_client.py b/tests/sentry/workflow_engine/buffer/test_batch_client.py index d63c554bb08100..35a6a379fc8844 100644 --- a/tests/sentry/workflow_engine/buffer/test_batch_client.py +++ b/tests/sentry/workflow_engine/buffer/test_batch_client.py @@ -1,7 +1,9 @@ from unittest.mock import Mock +import pytest from sentry.testutils.cases import TestCase -from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient +from sentry.workflow_engine.buffer.batch_client import CohortUpdates, DelayedWorkflowClient +from sentry.workflow_engine.buffer.redis_hash_sorted_set_buffer import RedisHashSortedSetBuffer class TestDelayedWorkflowClient(TestCase): @@ -115,3 +117,193 @@ def test_mark_project_ids_as_processed_deduplicates_results(self) -> None: expected_result = [123, 456, 789] assert sorted(result) == sorted(expected_result) assert len(result) == 3 # Should have exactly 3 unique project IDs + + + +class TestCohortUpdates: + def test_get_last_cohort_run_existing(self): + """Test getting last run time for existing cohort.""" + updates = CohortUpdates(values={1: 100.5, 2: 200.3}) + assert updates.get_last_cohort_run(1) == 100.5 + assert updates.get_last_cohort_run(2) == 200.3 + + def test_get_last_cohort_run_missing(self): + """Test getting last run time for non-existent cohort returns 0.""" + updates = CohortUpdates(values={1: 100.5}) + assert updates.get_last_cohort_run(999) == 0 + + +class TestDelayedWorkflowClient: + @pytest.fixture + def mock_buffer(self): + """Create a mock buffer for testing.""" + return Mock(spec=RedisHashSortedSetBuffer) + + @pytest.fixture + def delayed_workflow_client(self, mock_buffer): + """Create a DelayedWorkflowClient with mocked buffer.""" + return DelayedWorkflowClient(buf=mock_buffer) + + def test_fetch_updates(self, delayed_workflow_client, mock_buffer): + """Test fetching cohort updates from buffer.""" + expected_updates = CohortUpdates(values={1: 100.0}) + mock_buffer.get_parsed_key.return_value = expected_updates + + result = delayed_workflow_client.fetch_updates() + + mock_buffer.get_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates + ) + assert result == expected_updates + + def test_persist_updates(self, delayed_workflow_client, mock_buffer): + """Test persisting cohort updates to buffer.""" + updates = CohortUpdates(values={1: 100.0, 2: 200.0}) + + delayed_workflow_client.persist_updates(updates) + + mock_buffer.put_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", updates + ) + + def test_fetch_updates_missing_key(self, delayed_workflow_client, mock_buffer): + """Test fetching cohort updates when key doesn't exist (returns None).""" + mock_buffer.get_parsed_key.return_value = None + + result = delayed_workflow_client.fetch_updates() + + mock_buffer.get_parsed_key.assert_called_once_with( + "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates + ) + assert isinstance(result, CohortUpdates) + assert result.values == {} # Should be default empty dict + + def test_add_project_ids(self, delayed_workflow_client, mock_buffer): + """Test adding project IDs to a random shard.""" + project_ids = [1, 2, 3] + + delayed_workflow_client.add_project_ids(project_ids) + + # Should call push_to_sorted_set with one of the buffer keys + assert mock_buffer.push_to_sorted_set.call_count == 1 + call_args = mock_buffer.push_to_sorted_set.call_args + assert call_args[1]["value"] == project_ids + # Key should be one of the expected buffer keys + called_key = call_args[1]["key"] + expected_keys = DelayedWorkflowClient._get_buffer_keys() + assert called_key in expected_keys + + def test_get_project_ids(self, delayed_workflow_client, mock_buffer): + """Test getting project IDs within score range.""" + expected_result = {1: [100.0], 2: [200.0]} + mock_buffer.bulk_get_sorted_set.return_value = expected_result + + result = delayed_workflow_client.get_project_ids(min=0.0, max=300.0) + + mock_buffer.bulk_get_sorted_set.assert_called_once_with( + DelayedWorkflowClient._get_buffer_keys(), + min=0.0, + max=300.0, + ) + assert result == expected_result + + def test_clear_project_ids(self, delayed_workflow_client, mock_buffer): + """Test clearing project IDs within score range.""" + delayed_workflow_client.clear_project_ids(min=0.0, max=300.0) + + mock_buffer.delete_keys.assert_called_once_with( + DelayedWorkflowClient._get_buffer_keys(), + min=0.0, + max=300.0, + ) + + def test_get_buffer_keys(self): + """Test that buffer keys are generated correctly.""" + keys = DelayedWorkflowClient._get_buffer_keys() + + assert len(keys) == 8 # _BUFFER_SHARDS + assert keys[0] == "workflow_engine_delayed_processing_buffer" # shard 0 + assert keys[1] == "workflow_engine_delayed_processing_buffer:1" # shard 1 + assert keys[7] == "workflow_engine_delayed_processing_buffer:7" # shard 7 + + def test_for_project(self, delayed_workflow_client, mock_buffer): + """Test creating a project-specific client.""" + project_id = 123 + + project_client = delayed_workflow_client.for_project(project_id) + + assert project_client.project_id == project_id + assert project_client._buffer == mock_buffer + + +class TestProjectDelayedWorkflowClient: + @pytest.fixture + def mock_buffer(self): + """Create a mock buffer for testing.""" + return Mock(spec=RedisHashSortedSetBuffer) + + @pytest.fixture + def project_client(self, mock_buffer): + """Create a ProjectDelayedWorkflowClient with mocked buffer.""" + return DelayedWorkflowClient(buf=mock_buffer).for_project(123) + + def test_filters_without_batch_key(self, project_client): + """Test filters generation without batch key.""" + filters = project_client._filters(batch_key=None) + assert filters == {"project_id": 123} + + def test_filters_with_batch_key(self, project_client): + """Test filters generation with batch key.""" + filters = project_client._filters(batch_key="test-batch") + assert filters == {"project_id": 123, "batch_key": "test-batch"} + + def test_delete_hash_fields(self, project_client, mock_buffer): + """Test deleting specific fields from workflow hash.""" + fields = ["field1", "field2"] + + project_client.delete_hash_fields(batch_key=None, fields=fields) + + from sentry.workflow_engine.models import Workflow + + mock_buffer.delete_hash.assert_called_once_with( + model=Workflow, filters={"project_id": 123}, fields=fields + ) + + def test_get_hash_length(self, project_client, mock_buffer): + """Test getting hash length.""" + mock_buffer.get_hash_length.return_value = 5 + + result = project_client.get_hash_length(batch_key=None) + + from sentry.workflow_engine.models import Workflow + + mock_buffer.get_hash_length.assert_called_once_with( + model=Workflow, filters={"project_id": 123} + ) + assert result == 5 + + def test_get_hash_data(self, project_client, mock_buffer): + """Test fetching hash data.""" + expected_data = {"key1": "value1", "key2": "value2"} + mock_buffer.get_hash.return_value = expected_data + + result = project_client.get_hash_data(batch_key="test-batch") + + from sentry.workflow_engine.models import Workflow + + mock_buffer.get_hash.assert_called_once_with( + model=Workflow, filters={"project_id": 123, "batch_key": "test-batch"} + ) + assert result == expected_data + + def test_push_to_hash(self, project_client, mock_buffer): + """Test pushing data to hash in bulk.""" + data = {"key1": "value1", "key2": "value2"} + + project_client.push_to_hash(batch_key="test-batch", data=data) + + from sentry.workflow_engine.models import Workflow + + mock_buffer.push_to_hash_bulk.assert_called_once_with( + model=Workflow, filters={"project_id": 123, "batch_key": "test-batch"}, data=data + ) diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 18dc7963d50f27..dccf7f0031c321 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -181,24 +181,11 @@ def test_get_hash_data_with_batch_key(self) -> None: assert data["event_id"] == "event-456" -class TestCohortUpdates: - def test_get_last_cohort_run_existing(self): - """Test getting last run time for existing cohort.""" - updates = CohortUpdates(values={1: 100.5, 2: 200.3}) - assert updates.get_last_cohort_run(1) == 100.5 - assert updates.get_last_cohort_run(2) == 200.3 - - def test_get_last_cohort_run_missing(self): - """Test getting last run time for non-existent cohort returns 0.""" - updates = CohortUpdates(values={1: 100.5}) - assert updates.get_last_cohort_run(999) == 0 - - class TestProjectChooser: @pytest.fixture def mock_buffer(self): """Create a mock buffer for testing.""" - mock_buffer = Mock() + mock_buffer = Mock(spec=DelayedWorkflowClient) return mock_buffer @pytest.fixture @@ -210,40 +197,6 @@ def test_init_default_cohorts(self, mock_buffer): chooser = ProjectChooser(mock_buffer) assert chooser.num_cohorts == NUM_COHORTS - def test_fetch_updates(self, project_chooser, mock_buffer): - """Test fetching cohort updates from buffer.""" - expected_updates = CohortUpdates(values={1: 100.0}) - mock_buffer.get_parsed_key.return_value = expected_updates - - result = project_chooser.fetch_updates() - - mock_buffer.get_parsed_key.assert_called_once_with( - "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates - ) - assert result == expected_updates - - def test_persist_updates(self, project_chooser, mock_buffer): - """Test persisting cohort updates to buffer.""" - updates = CohortUpdates(values={1: 100.0, 2: 200.0}) - - project_chooser.persist_updates(updates) - - mock_buffer.put_parsed_key.assert_called_once_with( - "WORKFLOW_ENGINE_COHORT_UPDATES", updates - ) - - def test_fetch_updates_missing_key(self, project_chooser, mock_buffer): - """Test fetching cohort updates when key doesn't exist (returns None).""" - mock_buffer.get_parsed_key.return_value = None - - result = project_chooser.fetch_updates() - - mock_buffer.get_parsed_key.assert_called_once_with( - "WORKFLOW_ENGINE_COHORT_UPDATES", CohortUpdates - ) - assert isinstance(result, CohortUpdates) - assert result.values == {} # Should be default empty dict - def test_project_id_to_cohort_distribution(self, project_chooser): """Test that project IDs are distributed across cohorts.""" project_ids = list(range(1, 1001)) # 1000 project IDs @@ -429,24 +382,28 @@ class TestChosenProjects: @pytest.fixture def mock_project_chooser(self): """Create a mock ProjectChooser.""" - return Mock() + return Mock(spec=ProjectChooser) def test_chosen_projects_context_manager(self, mock_project_chooser): """Test chosen_projects as a context manager.""" # Setup mocks - mock_cohort_updates = Mock() - mock_project_chooser.fetch_updates.return_value = mock_cohort_updates - mock_project_chooser.project_ids_to_process.return_value = [1, 2, 3] + mock_cohort_updates = Mock(spec=CohortUpdates) + mock_buffer_client = Mock(spec=DelayedWorkflowClient) + mock_project_chooser.client = mock_buffer_client + mock_buffer_client.fetch_updates.return_value = mock_cohort_updates fetch_time = 1000.0 all_project_ids = [1, 2, 3, 4, 5] + expected_result = [1, 2, 3] + + mock_project_chooser.project_ids_to_process.return_value = expected_result # Use context manager with chosen_projects(mock_project_chooser, fetch_time, all_project_ids) as result: project_ids_to_process = result - # Verify fetch_updates was called - mock_project_chooser.fetch_updates.assert_called_once() + # Verify fetch_updates was called on project_chooser.client + mock_buffer_client.fetch_updates.assert_called_once() # Verify project_ids_to_process was called with correct args mock_project_chooser.project_ids_to_process.assert_called_once_with( @@ -454,15 +411,18 @@ def test_chosen_projects_context_manager(self, mock_project_chooser): ) # Verify the result - assert project_ids_to_process == [1, 2, 3] + assert project_ids_to_process == expected_result # Verify persist_updates was called after context exit - mock_project_chooser.persist_updates.assert_called_once_with(mock_cohort_updates) + mock_buffer_client.persist_updates.assert_called_once_with(mock_cohort_updates) def test_chosen_projects_fetch_updates_exception(self, mock_project_chooser): """Test that exception during fetch_updates is properly handled.""" + # Setup mocks + mock_buffer_client = Mock(spec=DelayedWorkflowClient) + mock_project_chooser.client = mock_buffer_client # Make fetch_updates raise an exception (e.g. key doesn't exist) - mock_project_chooser.fetch_updates.side_effect = Exception("Key not found") + mock_buffer_client.fetch_updates.side_effect = Exception("Key not found") fetch_time = 1000.0 all_project_ids = [1, 2, 3, 4, 5] @@ -473,4 +433,4 @@ def test_chosen_projects_fetch_updates_exception(self, mock_project_chooser): pass # persist_updates should not be called if fetch_updates fails - mock_project_chooser.persist_updates.assert_not_called() + mock_buffer_client.persist_updates.assert_not_called() From df23413ee91894331060928c264ad7bfab0ff0b1 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 22 Sep 2025 16:49:02 -0700 Subject: [PATCH 05/14] more --- src/sentry/options/defaults.py | 7 ++ .../workflow_engine/processors/schedule.py | 8 +- .../processors/test_schedule.py | 81 ++++++++++--------- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index a8ddd368033de0..181bf40201a408 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3151,6 +3151,13 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "workflow_engine.num_cohorts", + type=Int, + default=1, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) + # Restrict uptime issue creation for specific host provider identifiers. Items # in this list map to the `host_provider_id` column in the UptimeSubscription # table. diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 53d5eb1f4c005b..d14f51904951c5 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -105,9 +105,9 @@ def project_ids_to_process( for co in range(self.num_cohorts): last_run = cohort_updates.get_last_cohort_run(co) elapsed = timedelta(seconds=now - last_run) - if elapsed > timedelta(minutes=1): + if elapsed >= timedelta(minutes=1): must_process.add(co) - elif elapsed > timedelta(seconds=60 / self.num_cohorts): + elif elapsed >= timedelta(seconds=60 / self.num_cohorts): may_process.add(co) if may_process and not must_process: choice = min(may_process, key=lambda c: (cohort_updates.get_last_cohort_run(c), c)) @@ -145,7 +145,9 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: max=fetch_time, ) - project_chooser = ProjectChooser(buffer_client) + project_chooser = ProjectChooser( + buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", NUM_COHORTS) + ) with chosen_projects( project_chooser, fetch_time, list(all_project_ids_and_timestamps.keys()) ) as project_ids_to_process: diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index dccf7f0031c321..995ddda9fe5cc4 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -184,13 +184,11 @@ def test_get_hash_data_with_batch_key(self) -> None: class TestProjectChooser: @pytest.fixture def mock_buffer(self): - """Create a mock buffer for testing.""" mock_buffer = Mock(spec=DelayedWorkflowClient) return mock_buffer @pytest.fixture def project_chooser(self, mock_buffer): - """Create a ProjectChooser with mocked buffer.""" return ProjectChooser(mock_buffer, num_cohorts=6) def test_init_default_cohorts(self, mock_buffer): @@ -198,7 +196,6 @@ def test_init_default_cohorts(self, mock_buffer): assert chooser.num_cohorts == NUM_COHORTS def test_project_id_to_cohort_distribution(self, project_chooser): - """Test that project IDs are distributed across cohorts.""" project_ids = list(range(1, 1001)) # 1000 project IDs cohorts = [project_chooser.project_id_to_cohort(pid) for pid in project_ids] @@ -211,7 +208,6 @@ def test_project_id_to_cohort_distribution(self, project_chooser): assert all(count < 1000 for count in cohort_counts) def test_project_id_to_cohort_consistent(self, project_chooser): - """Test that same project ID always maps to same cohort.""" for project_id in [123, 999, 4, 193848493]: cohort1 = project_chooser.project_id_to_cohort(project_id) cohort2 = project_chooser.project_id_to_cohort(project_id) @@ -221,7 +217,6 @@ def test_project_id_to_cohort_consistent(self, project_chooser): assert 0 <= cohort1 < 6 def test_project_ids_to_process_must_process_over_minute(self, project_chooser): - """Test that cohorts not run for over a minute are marked as must_process.""" fetch_time = 1000.0 cohort_updates = CohortUpdates( values={ @@ -243,7 +238,6 @@ def test_project_ids_to_process_must_process_over_minute(self, project_chooser): assert 0 in cohort_updates.values def test_project_ids_to_process_may_process_fallback(self, project_chooser): - """Test that when no must_process cohorts, oldest may_process is chosen.""" fetch_time = 1000.0 cohort_updates = CohortUpdates( values={ @@ -266,7 +260,6 @@ def test_project_ids_to_process_may_process_fallback(self, project_chooser): assert cohort_updates.values[cohort] == fetch_time def test_project_ids_to_process_no_processing_needed(self, project_chooser): - """Test when no processing is needed (all cohorts recently processed).""" fetch_time = 1000.0 cohort_updates = CohortUpdates( values={ @@ -285,10 +278,11 @@ def test_project_ids_to_process_no_processing_needed(self, project_chooser): # No cohorts are old enough for must_process or may_process assert len(result) == 0 - def test_scenario_once_per_minute_6_cohorts(self, project_chooser): + def test_scenario_once_per_minute_6_cohorts(self, project_chooser: ProjectChooser) -> None: """ Scenario test: Running once per minute with 6 cohorts. - Should converge to stable cycle where each cohort gets processed every 6 minutes. + In steady state, all cohorts should be processed on every run since + may_process threshold is 10 seconds and we run every 60 seconds. """ # Find project IDs that map to each cohort to ensure even distribution all_project_ids = [] @@ -306,8 +300,8 @@ def test_scenario_once_per_minute_6_cohorts(self, project_chooser): # Track which cohorts get processed over time processed_cohorts_over_time = [] - # Simulate 20 minutes of processing (20 runs, once per minute) - for minute in range(20): + # Simulate 5 minutes of processing (5 runs, once per minute) + for minute in range(5): fetch_time = float(minute * 60) # Every 60 seconds processed_projects = project_chooser.project_ids_to_process( @@ -318,24 +312,27 @@ def test_scenario_once_per_minute_6_cohorts(self, project_chooser): } processed_cohorts_over_time.append(processed_cohorts) - # After initial ramp-up, should settle into stable cycle - # Check the last 12 runs (2 full cycles) for stability - stable_period = processed_cohorts_over_time[-12:] - - # Each cohort should be processed at least once in 12 minutes - cohort_counts = {i: 0 for i in range(6)} - for processed_cohorts in stable_period: - for cohort in processed_cohorts: - cohort_counts[cohort] += 1 - - # Verify that all cohorts get processed (at least once in stable period) - for cohort in range(6): - assert cohort_counts[cohort] >= 1, f"Cohort {cohort} never processed in stable period" - - def test_scenario_six_times_per_minute(self, project_chooser): + # After the first run (ramp-up), every run should process all 6 cohorts + # because may_process threshold (10s) < run interval (60s) + stable_period = processed_cohorts_over_time[1:] # Skip first run + + for run_index, processed_cohorts in enumerate(stable_period): + assert ( + len(processed_cohorts) == 6 + ), f"Run {run_index + 1} processed {len(processed_cohorts)} cohorts, expected 6" + assert processed_cohorts == { + 0, + 1, + 2, + 3, + 4, + 5, + }, f"Run {run_index + 1} didn't process all cohorts: {processed_cohorts}" + + def test_scenario_six_times_per_minute(self, project_chooser: ProjectChooser) -> None: """ Scenario test: Running 6 times per minute (every 10 seconds). - Should process one cohort per run in stable cycle. + Should process exactly one cohort per run in stable cycle, cycling through all. """ # Find project IDs that map to each cohort to ensure even distribution all_project_ids = [] @@ -364,18 +361,28 @@ def test_scenario_six_times_per_minute(self, project_chooser): } processed_cohorts_over_time.append(processed_cohorts) - # After initial period, check stable cycle behavior - # In the last 6 runs, each cohort should be processed at least once - stable_period = processed_cohorts_over_time[-6:] + # After initial ramp-up, should process exactly 1 cohort per run + # and cycle through all cohorts over 6 runs + stable_period = processed_cohorts_over_time[6:] # Skip first 6 runs for ramp-up - cohort_counts = {i: 0 for i in range(6)} - for processed_cohorts in stable_period: - for cohort in processed_cohorts: - cohort_counts[cohort] += 1 + for run_index, processed_cohorts in enumerate(stable_period): + assert ( + len(processed_cohorts) == 1 + ), f"Run {run_index + 6} processed {len(processed_cohorts)} cohorts, expected 1" - # Each cohort should be processed at least once in the last 6 runs - for cohort in range(6): - assert cohort_counts[cohort] >= 1, f"Cohort {cohort} never processed in stable period" + # Over any 6 consecutive runs in stable period, all cohorts should be processed + all_processed_cohorts = set() + for processed_cohorts in stable_period: + all_processed_cohorts.update(processed_cohorts) + + assert all_processed_cohorts == { + 0, + 1, + 2, + 3, + 4, + 5, + }, f"Not all cohorts processed in stable period: {all_processed_cohorts}" class TestChosenProjects: From 34505ecb7e3057eb458668df13cd6e4bee24ebc7 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Tue, 23 Sep 2025 15:30:42 -0700 Subject: [PATCH 06/14] more tests --- .../workflow_engine/buffer/batch_client.py | 3 - .../workflow_engine/processors/schedule.py | 6 +- .../processors/test_schedule.py | 129 ++++++++++-------- 3 files changed, 76 insertions(+), 62 deletions(-) diff --git a/src/sentry/workflow_engine/buffer/batch_client.py b/src/sentry/workflow_engine/buffer/batch_client.py index a901613a0aa526..5f5f02f2ead2a7 100644 --- a/src/sentry/workflow_engine/buffer/batch_client.py +++ b/src/sentry/workflow_engine/buffer/batch_client.py @@ -16,9 +16,6 @@ class CohortUpdates(pydantic.BaseModel): values: dict[int, float] - def get_last_cohort_run(self, cohort_id: int) -> float: - return self.values.get(cohort_id, 0) - class DelayedWorkflowClient: """ diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index d14f51904951c5..3ba71572311dbe 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -91,6 +91,7 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: class ProjectChooser: def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int = NUM_COHORTS): self.client = buffer_client + assert num_cohorts > 0 and num_cohorts <= 255 self.num_cohorts = num_cohorts def project_id_to_cohort(self, project_id: int) -> int: @@ -102,15 +103,16 @@ def project_ids_to_process( must_process = set[int]() may_process = set[int]() now = fetch_time + long_ago = now - 1000 for co in range(self.num_cohorts): - last_run = cohort_updates.get_last_cohort_run(co) + last_run = cohort_updates.values.get(co, long_ago) elapsed = timedelta(seconds=now - last_run) if elapsed >= timedelta(minutes=1): must_process.add(co) elif elapsed >= timedelta(seconds=60 / self.num_cohorts): may_process.add(co) if may_process and not must_process: - choice = min(may_process, key=lambda c: (cohort_updates.get_last_cohort_run(c), c)) + choice = min(may_process, key=lambda c: (cohort_updates.values.get(c, long_ago), c)) must_process.add(choice) cohort_updates.values.update({cohort_id: fetch_time for cohort_id in must_process}) return [ diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 995ddda9fe5cc4..0ce1c5f73a6aa7 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -191,6 +191,19 @@ def mock_buffer(self): def project_chooser(self, mock_buffer): return ProjectChooser(mock_buffer, num_cohorts=6) + def _find_projects_for_cohorts(self, chooser: ProjectChooser, num_cohorts: int) -> list[int]: + """Helper method to find project IDs that map to each cohort to ensure even distribution.""" + all_project_ids = [] + used_cohorts: set[int] = set() + project_id = 1 + while len(used_cohorts) < num_cohorts: + cohort = chooser.project_id_to_cohort(project_id) + if cohort not in used_cohorts: + all_project_ids.append(project_id) + used_cohorts.add(cohort) + project_id += 1 + return all_project_ids + def test_init_default_cohorts(self, mock_buffer): chooser = ProjectChooser(mock_buffer) assert chooser.num_cohorts == NUM_COHORTS @@ -281,25 +294,13 @@ def test_project_ids_to_process_no_processing_needed(self, project_chooser): def test_scenario_once_per_minute_6_cohorts(self, project_chooser: ProjectChooser) -> None: """ Scenario test: Running once per minute with 6 cohorts. - In steady state, all cohorts should be processed on every run since - may_process threshold is 10 seconds and we run every 60 seconds. + Since run interval (60s) equals must_process threshold (60s), + all cohorts should be processed on every single run. """ - # Find project IDs that map to each cohort to ensure even distribution - all_project_ids = [] - used_cohorts: set[int] = set() - project_id = 1 - while len(used_cohorts) < 6: - cohort = project_chooser.project_id_to_cohort(project_id) - if cohort not in used_cohorts: - all_project_ids.append(project_id) - used_cohorts.add(cohort) - project_id += 1 + all_project_ids = self._find_projects_for_cohorts(project_chooser, 6) cohort_updates = CohortUpdates(values={}) - # Track which cohorts get processed over time - processed_cohorts_over_time = [] - # Simulate 5 minutes of processing (5 runs, once per minute) for minute in range(5): fetch_time = float(minute * 60) # Every 60 seconds @@ -310,16 +311,8 @@ def test_scenario_once_per_minute_6_cohorts(self, project_chooser: ProjectChoose processed_cohorts = { project_chooser.project_id_to_cohort(pid) for pid in processed_projects } - processed_cohorts_over_time.append(processed_cohorts) - - # After the first run (ramp-up), every run should process all 6 cohorts - # because may_process threshold (10s) < run interval (60s) - stable_period = processed_cohorts_over_time[1:] # Skip first run - for run_index, processed_cohorts in enumerate(stable_period): - assert ( - len(processed_cohorts) == 6 - ), f"Run {run_index + 1} processed {len(processed_cohorts)} cohorts, expected 6" + # Every run should process all 6 cohorts. assert processed_cohorts == { 0, 1, @@ -327,29 +320,22 @@ def test_scenario_once_per_minute_6_cohorts(self, project_chooser: ProjectChoose 3, 4, 5, - }, f"Run {run_index + 1} didn't process all cohorts: {processed_cohorts}" + }, f"Run {minute} didn't process all cohorts: {processed_cohorts}" def test_scenario_six_times_per_minute(self, project_chooser: ProjectChooser) -> None: """ Scenario test: Running 6 times per minute (every 10 seconds). Should process exactly one cohort per run in stable cycle, cycling through all. """ - # Find project IDs that map to each cohort to ensure even distribution - all_project_ids = [] - used_cohorts: set[int] = set() - project_id = 1 - while len(used_cohorts) < 6: - cohort = project_chooser.project_id_to_cohort(project_id) - if cohort not in used_cohorts: - all_project_ids.append(project_id) - used_cohorts.add(cohort) - project_id += 1 + all_project_ids = self._find_projects_for_cohorts(project_chooser, 6) cohort_updates = CohortUpdates(values={}) + all_cohorts = set(range(6)) processed_cohorts_over_time = [] # Simulate 2 minutes of processing (12 runs, every 10 seconds) + previous_cohorts = [] for run in range(12): fetch_time = float(run * 10) # Every 10 seconds @@ -359,30 +345,59 @@ def test_scenario_six_times_per_minute(self, project_chooser: ProjectChooser) -> processed_cohorts = { project_chooser.project_id_to_cohort(pid) for pid in processed_projects } + if run == 0: + assert ( + processed_cohorts == all_cohorts + ), f"First run should process all cohorts, got {processed_cohorts}" + previous_cohorts.append(processed_cohorts) + if len(previous_cohorts) > 6: + previous_cohorts.pop(0) + elif len(previous_cohorts) == 6: + processed_in_last_cycle = set().union(*previous_cohorts) + assert ( + processed_in_last_cycle == all_cohorts + ), f"Run {run} should process all cohorts, got {processed_in_last_cycle}" processed_cohorts_over_time.append(processed_cohorts) - # After initial ramp-up, should process exactly 1 cohort per run - # and cycle through all cohorts over 6 runs - stable_period = processed_cohorts_over_time[6:] # Skip first 6 runs for ramp-up - - for run_index, processed_cohorts in enumerate(stable_period): - assert ( - len(processed_cohorts) == 1 - ), f"Run {run_index + 6} processed {len(processed_cohorts)} cohorts, expected 1" - - # Over any 6 consecutive runs in stable period, all cohorts should be processed - all_processed_cohorts = set() - for processed_cohorts in stable_period: - all_processed_cohorts.update(processed_cohorts) - - assert all_processed_cohorts == { - 0, - 1, - 2, - 3, - 4, - 5, - }, f"Not all cohorts processed in stable period: {all_processed_cohorts}" + def test_scenario_once_per_minute_cohort_count_1(self, mock_buffer) -> None: + """ + Scenario test: Running once per minute with cohort count of 1 (production default). + This demonstrates that all projects are processed together every minute. + """ + # Create ProjectChooser with cohort count = 1 (production default) + chooser = ProjectChooser(mock_buffer, num_cohorts=1) + all_project_ids = self._find_projects_for_cohorts(chooser, 1) + + # Add more projects to demonstrate they all map to cohort 0 + additional_projects = [10, 25, 50, 100, 999, 1001, 5000] + all_project_ids.extend(additional_projects) + + # Verify all projects map to cohort 0 + for project_id in all_project_ids: + cohort = chooser.project_id_to_cohort(project_id) + assert cohort == 0, f"Project {project_id} should map to cohort 0, got {cohort}" + + cohort_updates = CohortUpdates(values={}) + + # Simulate 5 minutes of processing (5 runs, once per minute) + for minute in range(5): + fetch_time = float(minute * 60) # Every 60 seconds + + processed_projects = chooser.project_ids_to_process( + fetch_time, cohort_updates, all_project_ids + ) + processed_cohorts = {chooser.project_id_to_cohort(pid) for pid in processed_projects} + + # With cohort count = 1, should always process cohort 0 + assert processed_cohorts == { + 0 + }, f"Run {minute} should process cohort 0, got {processed_cohorts}" + + # Since all projects are in cohort 0, processing cohort 0 means ALL projects + assert set(processed_projects) == set(all_project_ids), ( + f"Run {minute}: Expected all {len(all_project_ids)} projects to be processed, " + f"but got {len(processed_projects)}: {sorted(processed_projects)}" + ) class TestChosenProjects: From 6d8d970fbbe7645e6b0b67f84bccbf436e32c920 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Fri, 26 Sep 2025 15:07:00 -0700 Subject: [PATCH 07/14] More merge --- .../workflow_engine/processors/schedule.py | 40 +++++-- .../buffer/test_batch_client.py | 103 +++++++++--------- .../test_redis_hash_sorted_set_buffer.py | 2 +- .../processors/test_schedule.py | 5 - 4 files changed, 78 insertions(+), 72 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 3ba71572311dbe..bcb09f473cae86 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -1,4 +1,5 @@ import hashlib +import itertools import logging import math import uuid @@ -85,11 +86,8 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: ) -NUM_COHORTS = 6 - - class ProjectChooser: - def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int = NUM_COHORTS): + def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts): self.client = buffer_client assert num_cohorts > 0 and num_cohorts <= 255 self.num_cohorts = num_cohorts @@ -148,7 +146,7 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: ) project_chooser = ProjectChooser( - buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", NUM_COHORTS) + buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", 1) ) with chosen_projects( project_chooser, fetch_time, list(all_project_ids_and_timestamps.keys()) @@ -162,11 +160,14 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: for project_id in project_ids_to_process: process_in_batches(buffer_client.for_project(project_id)) - mark_projects_processed(buffer_client, all_project_ids_and_timestamps) + mark_projects_processed( + buffer_client, project_ids_to_process, all_project_ids_and_timestamps + ) def mark_projects_processed( buffer_client: DelayedWorkflowClient, + project_ids_to_process: list[int], all_project_ids_and_timestamps: dict[int, list[float]], ) -> None: if not all_project_ids_and_timestamps: @@ -186,9 +187,24 @@ def mark_projects_processed( deleted = buffer_client.mark_project_ids_as_processed(dict(chunk)) deleted_project_ids.update(deleted) - logger.info( - "process_buffered_workflows.project_ids_deleted", - extra={ - "deleted_project_ids": sorted(deleted_project_ids), - }, - ) + logger.info( + "process_buffered_workflows.project_ids_deleted", + extra={ + "deleted_project_ids": sorted(deleted_project_ids), + "processed_project_ids": sorted(project_ids_to_process), + }, + ) + except Exception: + logger.exception( + "process_buffered_workflows.conditional_delete_from_sorted_sets_error" + ) + # Fallback. + buffer_client.clear_project_ids( + min=0, + max=max_project_timestamp, + ) + else: + buffer_client.clear_project_ids( + min=0, + max=max_project_timestamp, + ) diff --git a/tests/sentry/workflow_engine/buffer/test_batch_client.py b/tests/sentry/workflow_engine/buffer/test_batch_client.py index 35a6a379fc8844..62ee1e25e531c9 100644 --- a/tests/sentry/workflow_engine/buffer/test_batch_client.py +++ b/tests/sentry/workflow_engine/buffer/test_batch_client.py @@ -1,20 +1,35 @@ from unittest.mock import Mock import pytest -from sentry.testutils.cases import TestCase + from sentry.workflow_engine.buffer.batch_client import CohortUpdates, DelayedWorkflowClient from sentry.workflow_engine.buffer.redis_hash_sorted_set_buffer import RedisHashSortedSetBuffer -class TestDelayedWorkflowClient(TestCase): - def setUp(self) -> None: - self.mock_buffer = Mock() - self.buffer_keys = ["test_key_1", "test_key_2"] - self.workflow_client = DelayedWorkflowClient( - buf=self.mock_buffer, buffer_keys=self.buffer_keys - ) +class TestDelayedWorkflowClient: + @pytest.fixture + def mock_buffer(self): + """Create a mock buffer for testing.""" + return Mock(spec=RedisHashSortedSetBuffer) + + @pytest.fixture + def buffer_keys(self): + """Create test buffer keys.""" + return ["test_key_1", "test_key_2"] + + @pytest.fixture + def delayed_workflow_client(self, mock_buffer): + """Create a DelayedWorkflowClient with mocked buffer.""" + return DelayedWorkflowClient(buf=mock_buffer) + + @pytest.fixture + def workflow_client_with_keys(self, mock_buffer, buffer_keys): + """Create a DelayedWorkflowClient with mocked buffer and specific keys.""" + return DelayedWorkflowClient(buf=mock_buffer, buffer_keys=buffer_keys) - def test_mark_project_ids_as_processed(self) -> None: + def test_mark_project_ids_as_processed( + self, workflow_client_with_keys, mock_buffer, buffer_keys + ): """Test mark_project_ids_as_processed with mocked RedisHashSortedSetBuffer.""" # Mock the conditional_delete_from_sorted_sets return value # Return value is dict[str, list[int]] where keys are buffer keys and values are deleted project IDs @@ -22,7 +37,7 @@ def test_mark_project_ids_as_processed(self) -> None: "test_key_1": [123, 456], "test_key_2": [789], } - self.mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value + mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value # Input data: project_id -> max_timestamp mapping project_id_max_timestamps = { @@ -32,11 +47,11 @@ def test_mark_project_ids_as_processed(self) -> None: } # Call the method - result = self.workflow_client.mark_project_ids_as_processed(project_id_max_timestamps) + result = workflow_client_with_keys.mark_project_ids_as_processed(project_id_max_timestamps) # Verify the mock was called with the correct arguments - self.mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( - tuple(self.buffer_keys), # DelayedWorkflowClient stores keys as tuple + mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( + tuple(buffer_keys), # DelayedWorkflowClient stores keys as tuple [(123, 1000.5), (456, 2000.0), (789, 1500.75)], ) @@ -44,10 +59,12 @@ def test_mark_project_ids_as_processed(self) -> None: expected_result = [123, 456, 789] assert sorted(result) == sorted(expected_result) - def test_mark_project_ids_as_processed_empty_input(self) -> None: + def test_mark_project_ids_as_processed_empty_input( + self, workflow_client_with_keys, mock_buffer, buffer_keys + ): """Test mark_project_ids_as_processed with empty input.""" # Mock return value for empty input - self.mock_buffer.conditional_delete_from_sorted_sets.return_value = { + mock_buffer.conditional_delete_from_sorted_sets.return_value = { "test_key_1": [], "test_key_2": [], } @@ -56,25 +73,27 @@ def test_mark_project_ids_as_processed_empty_input(self) -> None: project_id_max_timestamps: dict[int, float] = {} # Call the method - result = self.workflow_client.mark_project_ids_as_processed(project_id_max_timestamps) + result = workflow_client_with_keys.mark_project_ids_as_processed(project_id_max_timestamps) # Verify the mock was called with empty member list - self.mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( - tuple(self.buffer_keys), + mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( + tuple(buffer_keys), [], ) # Result should be empty assert result == [] - def test_mark_project_ids_as_processed_partial_deletion(self) -> None: + def test_mark_project_ids_as_processed_partial_deletion( + self, workflow_client_with_keys, mock_buffer, buffer_keys + ): """Test mark_project_ids_as_processed when only some project IDs are deleted.""" # Mock return value where only some project IDs are actually deleted mock_return_value = { "test_key_1": [123], # Only project 123 was deleted from this key "test_key_2": [], # No projects deleted from this key } - self.mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value + mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value # Input with multiple project IDs project_id_max_timestamps = { @@ -83,25 +102,27 @@ def test_mark_project_ids_as_processed_partial_deletion(self) -> None: } # Call the method - result = self.workflow_client.mark_project_ids_as_processed(project_id_max_timestamps) + result = workflow_client_with_keys.mark_project_ids_as_processed(project_id_max_timestamps) # Verify the mock was called with all input project IDs - self.mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( - tuple(self.buffer_keys), + mock_buffer.conditional_delete_from_sorted_sets.assert_called_once_with( + tuple(buffer_keys), [(123, 1000.5), (456, 2000.0)], ) # Result should only contain the actually deleted project IDs assert result == [123] - def test_mark_project_ids_as_processed_deduplicates_results(self) -> None: + def test_mark_project_ids_as_processed_deduplicates_results( + self, workflow_client_with_keys, mock_buffer, buffer_keys + ): """Test that mark_project_ids_as_processed deduplicates project IDs from multiple keys.""" # Mock return value where the same project ID appears in multiple keys mock_return_value = { "test_key_1": [123, 456], "test_key_2": [456, 789], # 456 appears in both keys } - self.mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value + mock_buffer.conditional_delete_from_sorted_sets.return_value = mock_return_value # Input data project_id_max_timestamps = { @@ -111,39 +132,13 @@ def test_mark_project_ids_as_processed_deduplicates_results(self) -> None: } # Call the method - result = self.workflow_client.mark_project_ids_as_processed(project_id_max_timestamps) + result = workflow_client_with_keys.mark_project_ids_as_processed(project_id_max_timestamps) # Verify the result deduplicates project ID 456 expected_result = [123, 456, 789] assert sorted(result) == sorted(expected_result) assert len(result) == 3 # Should have exactly 3 unique project IDs - - -class TestCohortUpdates: - def test_get_last_cohort_run_existing(self): - """Test getting last run time for existing cohort.""" - updates = CohortUpdates(values={1: 100.5, 2: 200.3}) - assert updates.get_last_cohort_run(1) == 100.5 - assert updates.get_last_cohort_run(2) == 200.3 - - def test_get_last_cohort_run_missing(self): - """Test getting last run time for non-existent cohort returns 0.""" - updates = CohortUpdates(values={1: 100.5}) - assert updates.get_last_cohort_run(999) == 0 - - -class TestDelayedWorkflowClient: - @pytest.fixture - def mock_buffer(self): - """Create a mock buffer for testing.""" - return Mock(spec=RedisHashSortedSetBuffer) - - @pytest.fixture - def delayed_workflow_client(self, mock_buffer): - """Create a DelayedWorkflowClient with mocked buffer.""" - return DelayedWorkflowClient(buf=mock_buffer) - def test_fetch_updates(self, delayed_workflow_client, mock_buffer): """Test fetching cohort updates from buffer.""" expected_updates = CohortUpdates(values={1: 100.0}) @@ -201,7 +196,7 @@ def test_get_project_ids(self, delayed_workflow_client, mock_buffer): result = delayed_workflow_client.get_project_ids(min=0.0, max=300.0) mock_buffer.bulk_get_sorted_set.assert_called_once_with( - DelayedWorkflowClient._get_buffer_keys(), + tuple(DelayedWorkflowClient._get_buffer_keys()), min=0.0, max=300.0, ) @@ -212,7 +207,7 @@ def test_clear_project_ids(self, delayed_workflow_client, mock_buffer): delayed_workflow_client.clear_project_ids(min=0.0, max=300.0) mock_buffer.delete_keys.assert_called_once_with( - DelayedWorkflowClient._get_buffer_keys(), + tuple(DelayedWorkflowClient._get_buffer_keys()), min=0.0, max=300.0, ) diff --git a/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py b/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py index 93d70357df8032..5d336b336daefe 100644 --- a/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py +++ b/tests/sentry/workflow_engine/buffer/test_redis_hash_sorted_set_buffer.py @@ -65,7 +65,7 @@ def buffer(self, set_sentry_option, request, mock_time_provider): @pytest.fixture(autouse=True) def setup_buffer(self, buffer, mock_time_provider): - self.buf = buffer + self.buf: RedisHashSortedSetBuffer = buffer self.mock_time = mock_time_provider def test_push_to_hash(self): diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 0ce1c5f73a6aa7..12126bb7b291e3 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -10,7 +10,6 @@ from sentry.utils import json from sentry.workflow_engine.buffer.batch_client import CohortUpdates, DelayedWorkflowClient from sentry.workflow_engine.processors.schedule import ( - NUM_COHORTS, ProjectChooser, bucket_num_groups, chosen_projects, @@ -204,10 +203,6 @@ def _find_projects_for_cohorts(self, chooser: ProjectChooser, num_cohorts: int) project_id += 1 return all_project_ids - def test_init_default_cohorts(self, mock_buffer): - chooser = ProjectChooser(mock_buffer) - assert chooser.num_cohorts == NUM_COHORTS - def test_project_id_to_cohort_distribution(self, project_chooser): project_ids = list(range(1, 1001)) # 1000 project IDs cohorts = [project_chooser.project_id_to_cohort(pid) for pid in project_ids] From bfb12c8ce3fda30e3821386a3480e32d35688907 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Mon, 29 Sep 2025 11:01:26 -0700 Subject: [PATCH 08/14] fix typing --- src/sentry/workflow_engine/processors/schedule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index bcb09f473cae86..b2e6b0b7996834 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -87,7 +87,7 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: class ProjectChooser: - def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts): + def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int): self.client = buffer_client assert num_cohorts > 0 and num_cohorts <= 255 self.num_cohorts = num_cohorts From 8594ae32b5f90b2a2f34da99ab699a97dae926c8 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Fri, 3 Oct 2025 13:53:20 -0700 Subject: [PATCH 09/14] fix cleanup --- .../workflow_engine/processors/schedule.py | 9 ++--- .../processors/test_schedule.py | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index b2e6b0b7996834..585e1fce53f092 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -1,14 +1,11 @@ import hashlib -import itertools import logging import math import uuid -from datetime import datetime, timezone -from itertools import chain, islice from collections.abc import Generator from contextlib import contextmanager from datetime import datetime, timedelta, timezone -from itertools import islice +from itertools import chain, islice from sentry import options from sentry.utils import metrics @@ -167,7 +164,7 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: def mark_projects_processed( buffer_client: DelayedWorkflowClient, - project_ids_to_process: list[int], + processed_project_ids: list[int], all_project_ids_and_timestamps: dict[int, list[float]], ) -> None: if not all_project_ids_and_timestamps: @@ -191,7 +188,7 @@ def mark_projects_processed( "process_buffered_workflows.project_ids_deleted", extra={ "deleted_project_ids": sorted(deleted_project_ids), - "processed_project_ids": sorted(project_ids_to_process), + "processed_project_ids": sorted(processed_project_ids), }, ) except Exception: diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 12126bb7b291e3..81969ea2511573 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -9,10 +9,12 @@ from sentry.testutils.helpers.options import override_options from sentry.utils import json from sentry.workflow_engine.buffer.batch_client import CohortUpdates, DelayedWorkflowClient +from sentry.workflow_engine.buffer.redis_hash_sorted_set_buffer import RedisHashSortedSetBuffer from sentry.workflow_engine.processors.schedule import ( ProjectChooser, bucket_num_groups, chosen_projects, + mark_projects_processed, process_buffered_workflows, process_in_batches, ) @@ -451,3 +453,39 @@ def test_chosen_projects_fetch_updates_exception(self, mock_project_chooser): # persist_updates should not be called if fetch_updates fails mock_buffer_client.persist_updates.assert_not_called() + + +@override_options({"workflow_engine.scheduler.use_conditional_delete": True}) +def test_mark_projects_processed_only_cleans_up_processed_projects() -> None: + """Test that mark_projects_processed only cleans up processed projects, not all projects.""" + processed_project_id = 5000 + unprocessed_project_id = 5001 + + current_time = 1000.0 + + def get_fake_time() -> float: + return current_time + + all_project_ids_and_timestamps = { + processed_project_id: [1000.0], + unprocessed_project_id: [2000.0], + } + + client = DelayedWorkflowClient(RedisHashSortedSetBuffer(now_fn=get_fake_time)) + + # Add both projects to buffer + for project_id, [timestamp] in all_project_ids_and_timestamps.items(): + current_time = timestamp + client.add_project_ids([project_id]) + + # Only mark one project as processed + mark_projects_processed( + client, + [processed_project_id], # Only this one was processed + all_project_ids_and_timestamps, + ) + + # The unprocessed project should still be in buffer + remaining_project_ids = client.get_project_ids(min=0, max=3000.0) + assert unprocessed_project_id in remaining_project_ids + assert processed_project_id not in remaining_project_ids From 65aa4ac23f8786273fc72cf18821d838934a65b4 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Thu, 9 Oct 2025 10:20:16 -0700 Subject: [PATCH 10/14] Merge --- .../workflow_engine/processors/schedule.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 585e1fce53f092..f55ac8824bdd9e 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -5,7 +5,7 @@ from collections.abc import Generator from contextlib import contextmanager from datetime import datetime, timedelta, timezone -from itertools import chain, islice +from itertools import islice from sentry import options from sentry.utils import metrics @@ -191,17 +191,3 @@ def mark_projects_processed( "processed_project_ids": sorted(processed_project_ids), }, ) - except Exception: - logger.exception( - "process_buffered_workflows.conditional_delete_from_sorted_sets_error" - ) - # Fallback. - buffer_client.clear_project_ids( - min=0, - max=max_project_timestamp, - ) - else: - buffer_client.clear_project_ids( - min=0, - max=max_project_timestamp, - ) From 7a581df4e3104ee3a2bb1ef0824162735622e990 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Thu, 9 Oct 2025 10:26:37 -0700 Subject: [PATCH 11/14] dox --- .../workflow_engine/processors/schedule.py | 16 ++++++++++++-- .../processors/test_schedule.py | 22 +++++++++---------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index f55ac8824bdd9e..e0a07bdbb7fcb8 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -84,17 +84,25 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None: class ProjectChooser: + """ + ProjectChooser assists in determining which projects to process based on the cohort updates. + """ + def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int): self.client = buffer_client assert num_cohorts > 0 and num_cohorts <= 255 self.num_cohorts = num_cohorts - def project_id_to_cohort(self, project_id: int) -> int: + def _project_id_to_cohort(self, project_id: int) -> int: return hashlib.sha256(project_id.to_bytes(8)).digest()[0] % self.num_cohorts def project_ids_to_process( self, fetch_time: float, cohort_updates: CohortUpdates, all_project_ids: list[int] ) -> list[int]: + """ + Given the time, the cohort update history, and the list of project ids in need of processing, + determine which project ids should be processed. + """ must_process = set[int]() may_process = set[int]() now = fetch_time @@ -113,7 +121,7 @@ def project_ids_to_process( return [ project_id for project_id in all_project_ids - if self.project_id_to_cohort(project_id) in must_process + if self._project_id_to_cohort(project_id) in must_process ] @@ -121,6 +129,10 @@ def project_ids_to_process( def chosen_projects( project_chooser: ProjectChooser, fetch_time: float, all_project_ids: list[int] ) -> Generator[list[int]]: + """ + Context manager that yields the project ids to be processed, and manages the + cohort state after the processing is complete. + """ cohort_updates = project_chooser.client.fetch_updates() project_ids_to_process = project_chooser.project_ids_to_process( fetch_time, cohort_updates, all_project_ids diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 81969ea2511573..5c3b791db7708d 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -198,7 +198,7 @@ def _find_projects_for_cohorts(self, chooser: ProjectChooser, num_cohorts: int) used_cohorts: set[int] = set() project_id = 1 while len(used_cohorts) < num_cohorts: - cohort = chooser.project_id_to_cohort(project_id) + cohort = chooser._project_id_to_cohort(project_id) if cohort not in used_cohorts: all_project_ids.append(project_id) used_cohorts.add(cohort) @@ -207,7 +207,7 @@ def _find_projects_for_cohorts(self, chooser: ProjectChooser, num_cohorts: int) def test_project_id_to_cohort_distribution(self, project_chooser): project_ids = list(range(1, 1001)) # 1000 project IDs - cohorts = [project_chooser.project_id_to_cohort(pid) for pid in project_ids] + cohorts = [project_chooser._project_id_to_cohort(pid) for pid in project_ids] # Check all cohorts are used assert set(cohorts) == set(range(6)) @@ -219,9 +219,9 @@ def test_project_id_to_cohort_distribution(self, project_chooser): def test_project_id_to_cohort_consistent(self, project_chooser): for project_id in [123, 999, 4, 193848493]: - cohort1 = project_chooser.project_id_to_cohort(project_id) - cohort2 = project_chooser.project_id_to_cohort(project_id) - cohort3 = project_chooser.project_id_to_cohort(project_id) + cohort1 = project_chooser._project_id_to_cohort(project_id) + cohort2 = project_chooser._project_id_to_cohort(project_id) + cohort3 = project_chooser._project_id_to_cohort(project_id) assert cohort1 == cohort2 == cohort3 assert 0 <= cohort1 < 6 @@ -240,7 +240,7 @@ def test_project_ids_to_process_must_process_over_minute(self, project_chooser): result = project_chooser.project_ids_to_process(fetch_time, cohort_updates, all_project_ids) # Should include projects from cohort 0 (over 1 minute old) - expected_cohort = project_chooser.project_id_to_cohort(10) + expected_cohort = project_chooser._project_id_to_cohort(10) if expected_cohort == 0: assert 10 in result @@ -263,7 +263,7 @@ def test_project_ids_to_process_may_process_fallback(self, project_chooser): # Should choose the oldest from may_process cohorts (cohort 0) # and update cohort_updates accordingly assert len(result) > 0 # Should process something - processed_cohorts = {project_chooser.project_id_to_cohort(pid) for pid in result} + processed_cohorts = {project_chooser._project_id_to_cohort(pid) for pid in result} # The processed cohorts should be updated in cohort_updates for cohort in processed_cohorts: @@ -306,7 +306,7 @@ def test_scenario_once_per_minute_6_cohorts(self, project_chooser: ProjectChoose fetch_time, cohort_updates, all_project_ids ) processed_cohorts = { - project_chooser.project_id_to_cohort(pid) for pid in processed_projects + project_chooser._project_id_to_cohort(pid) for pid in processed_projects } # Every run should process all 6 cohorts. @@ -340,7 +340,7 @@ def test_scenario_six_times_per_minute(self, project_chooser: ProjectChooser) -> fetch_time, cohort_updates, all_project_ids ) processed_cohorts = { - project_chooser.project_id_to_cohort(pid) for pid in processed_projects + project_chooser._project_id_to_cohort(pid) for pid in processed_projects } if run == 0: assert ( @@ -371,7 +371,7 @@ def test_scenario_once_per_minute_cohort_count_1(self, mock_buffer) -> None: # Verify all projects map to cohort 0 for project_id in all_project_ids: - cohort = chooser.project_id_to_cohort(project_id) + cohort = chooser._project_id_to_cohort(project_id) assert cohort == 0, f"Project {project_id} should map to cohort 0, got {cohort}" cohort_updates = CohortUpdates(values={}) @@ -383,7 +383,7 @@ def test_scenario_once_per_minute_cohort_count_1(self, mock_buffer) -> None: processed_projects = chooser.project_ids_to_process( fetch_time, cohort_updates, all_project_ids ) - processed_cohorts = {chooser.project_id_to_cohort(pid) for pid in processed_projects} + processed_cohorts = {chooser._project_id_to_cohort(pid) for pid in processed_projects} # With cohort count = 1, should always process cohort 0 assert processed_cohorts == { From cd8375462a1bd12b9c128b145f5524b10288cbd3 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Fri, 10 Oct 2025 13:33:48 -0700 Subject: [PATCH 12/14] fix merge error, add test for exception behavior --- src/sentry/workflow_engine/processors/schedule.py | 5 +++-- .../workflow_engine/processors/test_schedule.py | 12 ++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index e0a07bdbb7fcb8..02680e0557e17d 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -182,14 +182,15 @@ def mark_projects_processed( if not all_project_ids_and_timestamps: return with metrics.timer("workflow_engine.scheduler.mark_projects_processed"): - member_maxes = [ + processed_member_maxes = [ (project_id, max(timestamps)) for project_id, timestamps in all_project_ids_and_timestamps.items() + if project_id in processed_project_ids ] deleted_project_ids = set[int]() # The conditional delete can be slow, so we break it into chunks that probably # aren't big enough to hold onto the main redis thread for too long. - for chunk in chunked(member_maxes, 500): + for chunk in chunked(processed_member_maxes, 500): with metrics.timer( "workflow_engine.conditional_delete_from_sorted_sets.chunk_duration" ): diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index 5c3b791db7708d..cf4f11872efb70 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -454,6 +454,18 @@ def test_chosen_projects_fetch_updates_exception(self, mock_project_chooser): # persist_updates should not be called if fetch_updates fails mock_buffer_client.persist_updates.assert_not_called() + def test_chosen_projects_exception_during_processing(self, mock_project_chooser): + mock_buffer_client = Mock(spec=DelayedWorkflowClient) + mock_project_chooser.client = mock_buffer_client + mock_buffer_client.fetch_updates.return_value = Mock(spec=CohortUpdates) + mock_project_chooser.project_ids_to_process.return_value = [1, 2, 3] + + with pytest.raises(RuntimeError, match="Processing failed"): + with chosen_projects(mock_project_chooser, 1000.0, [1, 2, 3, 4, 5]): + raise RuntimeError("Processing failed") + + mock_buffer_client.persist_updates.assert_not_called() + @override_options({"workflow_engine.scheduler.use_conditional_delete": True}) def test_mark_projects_processed_only_cleans_up_processed_projects() -> None: From 6100a8d6c86d95b40e987ac2e34627f001980f19 Mon Sep 17 00:00:00 2001 From: Kyle Consalus Date: Tue, 14 Oct 2025 10:41:59 -0700 Subject: [PATCH 13/14] killswitch --- src/sentry/options/defaults.py | 7 +++ .../workflow_engine/processors/schedule.py | 20 +++++++-- .../processors/test_schedule.py | 45 +++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 181bf40201a408..d159cbf34905ac 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3158,6 +3158,13 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "workflow_engine.use_cohort_selection", + type=Bool, + default=True, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) + # Restrict uptime issue creation for specific host provider identifiers. Items # in this list map to the `host_provider_id` column in the UptimeSubscription # table. diff --git a/src/sentry/workflow_engine/processors/schedule.py b/src/sentry/workflow_engine/processors/schedule.py index 02680e0557e17d..3d1b0192c65b21 100644 --- a/src/sentry/workflow_engine/processors/schedule.py +++ b/src/sentry/workflow_engine/processors/schedule.py @@ -127,12 +127,21 @@ def project_ids_to_process( @contextmanager def chosen_projects( - project_chooser: ProjectChooser, fetch_time: float, all_project_ids: list[int] + project_chooser: ProjectChooser | None, + fetch_time: float, + all_project_ids: list[int], ) -> Generator[list[int]]: """ Context manager that yields the project ids to be processed, and manages the cohort state after the processing is complete. + + If project_chooser is None, all projects are yielded without cohort-based selection. """ + if project_chooser is None: + # No cohort selection - process all projects + yield all_project_ids + return + cohort_updates = project_chooser.client.fetch_updates() project_ids_to_process = project_chooser.project_ids_to_process( fetch_time, cohort_updates, all_project_ids @@ -154,9 +163,14 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None: max=fetch_time, ) - project_chooser = ProjectChooser( - buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", 1) + # Check if cohort-based selection is enabled (defaults to True for safety) + use_cohort_selection = options.get("workflow_engine.use_cohort_selection", True) + project_chooser = ( + ProjectChooser(buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", 1)) + if use_cohort_selection + else None ) + with chosen_projects( project_chooser, fetch_time, list(all_project_ids_and_timestamps.keys()) ) as project_ids_to_process: diff --git a/tests/sentry/workflow_engine/processors/test_schedule.py b/tests/sentry/workflow_engine/processors/test_schedule.py index cf4f11872efb70..9d98e53f66ac85 100644 --- a/tests/sentry/workflow_engine/processors/test_schedule.py +++ b/tests/sentry/workflow_engine/processors/test_schedule.py @@ -94,6 +94,42 @@ def test_skips_processing_with_option(self, mock_process_in_batches) -> None: # Should still contain our project assert project.id in all_project_ids + @override_options( + {"delayed_workflow.rollout": True, "workflow_engine.use_cohort_selection": False} + ) + @patch("sentry.workflow_engine.processors.schedule.process_in_batches") + def test_processes_all_projects_without_cohort_selection( + self, mock_process_in_batches: MagicMock + ) -> None: + """Test that all projects are processed when cohort selection is disabled.""" + project = self.create_project() + project_two = self.create_project() + group = self.create_group(project) + group_two = self.create_group(project_two) + + # Push data to buffer + self.batch_client.for_project(project.id).push_to_hash( + batch_key=None, + data={f"345:{group.id}": json.dumps({"event_id": "event-1"})}, + ) + self.batch_client.for_project(project_two.id).push_to_hash( + batch_key=None, + data={f"345:{group_two.id}": json.dumps({"event_id": "event-2"})}, + ) + + # Add projects to sorted set + self.batch_client.add_project_ids([project.id, project_two.id]) + + process_buffered_workflows(self.batch_client) + + # All projects should be processed (no cohort filtering) + assert mock_process_in_batches.call_count == 2 + + # Verify that the buffer keys are cleaned up + fetch_time = datetime.now().timestamp() + all_project_ids = self.batch_client.get_project_ids(min=0, max=fetch_time) + assert all_project_ids == {} + class ProcessInBatchesTest(CreateEventTestCase): def setUp(self) -> None: @@ -466,6 +502,15 @@ def test_chosen_projects_exception_during_processing(self, mock_project_chooser) mock_buffer_client.persist_updates.assert_not_called() + def test_chosen_projects_without_cohort_selection(self): + """Test chosen_projects when project_chooser is None (cohort selection disabled).""" + fetch_time = 1000.0 + all_project_ids = [1, 2, 3, 4, 5] + + # When project_chooser is None, all projects should be yielded without redis interaction + with chosen_projects(None, fetch_time, all_project_ids) as result: + assert result == all_project_ids + @override_options({"workflow_engine.scheduler.use_conditional_delete": True}) def test_mark_projects_processed_only_cleans_up_processed_projects() -> None: From 95cd1e6f9fa3418e437d1b1428a03a942878538e Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Tue, 14 Oct 2025 17:44:40 +0000 Subject: [PATCH 14/14] :hammer_and_wrench: apply pre-commit fixes --- .../workflow_engine/buffer/redis_hash_sorted_set_buffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py index a92a7bbdb29990..33f986f2494fd7 100644 --- a/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py +++ b/src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py @@ -6,8 +6,8 @@ from collections.abc import Callable, Iterable, Mapping, Sequence from typing import Any, TypeAlias, TypeVar -import rb import pydantic +import rb from redis.client import Pipeline ClusterPipeline: TypeAlias = Any