Merged
14 changes: 14 additions & 0 deletions src/sentry/options/defaults.py
@@ -3151,6 +3151,20 @@
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

register(
    "workflow_engine.num_cohorts",
    type=Int,
    default=1,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

register(
    "workflow_engine.use_cohort_selection",
    type=Bool,
    default=True,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Restrict uptime issue creation for specific host provider identifiers. Items
# in this list map to the `host_provider_id` column in the UptimeSubscription
# table.
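The two options registered above gate the cohort scheduling introduced later in this PR: workflow_engine.use_cohort_selection toggles the feature (default True) and workflow_engine.num_cohorts controls how many buckets projects are split into (default 1, i.e. every project on every pass). A minimal sketch of reading them at runtime; options.get is the existing Sentry options API, while the cohort_config helper is purely illustrative:

# Illustrative sketch (not part of this diff): reading the new options at runtime.
from sentry import options

def cohort_config() -> tuple[bool, int]:
    # Defaults mirror the registrations above: selection on, a single cohort.
    use_cohorts: bool = options.get("workflow_engine.use_cohort_selection")
    num_cohorts: int = options.get("workflow_engine.num_cohorts")
    return use_cohorts, num_cohorts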
16 changes: 16 additions & 0 deletions src/sentry/workflow_engine/buffer/batch_client.py
@@ -4,13 +4,19 @@
from collections.abc import Mapping
from typing import TYPE_CHECKING

import pydantic

import sentry.workflow_engine.buffer as buffer
from sentry.workflow_engine.models import Workflow

if TYPE_CHECKING:
    from sentry.workflow_engine.buffer.redis_hash_sorted_set_buffer import RedisHashSortedSetBuffer


class CohortUpdates(pydantic.BaseModel):
    values: dict[int, float]


class DelayedWorkflowClient:
    """
    Client for interacting with batch processing of delayed workflows.
@@ -69,6 +75,16 @@ def _get_buffer_keys(cls) -> list[str]:
            for shard in range(cls._BUFFER_SHARDS)
        ]

    _COHORT_UPDATES_KEY = "WORKFLOW_ENGINE_COHORT_UPDATES"

    def fetch_updates(self) -> CohortUpdates:
        return self._buffer.get_parsed_key(
            self._COHORT_UPDATES_KEY, CohortUpdates
        ) or CohortUpdates(values={})

    def persist_updates(self, cohort_updates: CohortUpdates) -> None:
        self._buffer.put_parsed_key(self._COHORT_UPDATES_KEY, cohort_updates)

    def for_project(self, project_id: int) -> ProjectDelayedWorkflowClient:
        """Create a project-specific client for workflow operations."""
        return ProjectDelayedWorkflowClient(project_id, self._buffer)
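fetch_updates and persist_updates round-trip the per-cohort "last scheduled" timestamps through a single Redis key, falling back to an empty CohortUpdates when the key is absent. A sketch of the read-modify-write cycle a scheduler pass performs; touch_cohort is a hypothetical helper and client is assumed to be an already-constructed DelayedWorkflowClient:

# Illustrative sketch (not part of this diff) of the read-modify-write cycle.
import time

from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient

def touch_cohort(client: DelayedWorkflowClient, cohort_id: int) -> None:
    updates = client.fetch_updates()          # CohortUpdates(values={}) if the key is absent
    updates.values[cohort_id] = time.time()   # record when this cohort was last scheduled
    client.persist_updates(updates)           # written back to the shared Redis key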
src/sentry/workflow_engine/buffer/redis_hash_sorted_set_buffer.py
@@ -6,6 +6,7 @@
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import Any, TypeAlias, TypeVar

import pydantic
import rb
from redis.client import Pipeline

@@ -50,6 +51,8 @@ def _by_pairs(seq: list[T]) -> Iterable[tuple[T, T]]:
"zrangebyscore": False,
"zrem": True,
"zremrangebyscore": True,
"set": True,
"get": False,
}


@@ -418,3 +421,12 @@ def _conditional_delete_rb_fallback(
            converted_results.update(host_parsed)

        return converted_results

    def get_parsed_key[T: pydantic.BaseModel](self, key: str, model: type[T]) -> T | None:
        value = self._execute_redis_operation(key, "get")
        if value is None:
            return None
        return model.parse_raw(value)
Reviewer comment (Member): This is cool!

    def put_parsed_key[T: pydantic.BaseModel](self, key: str, value: T) -> None:
        self._execute_redis_operation(key, "set", value.json())
122 changes: 103 additions & 19 deletions src/sentry/workflow_engine/processors/schedule.py
@@ -1,13 +1,17 @@
import hashlib
import logging
import math
import uuid
-from datetime import datetime, timezone
from collections.abc import Generator
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from itertools import islice

from sentry import options
from sentry.utils import metrics
from sentry.utils.iterators import chunked
from sentry.workflow_engine.buffer.batch_client import (
    CohortUpdates,
    DelayedWorkflowClient,
    ProjectDelayedWorkflowClient,
)
@@ -79,6 +83,73 @@ def process_in_batches(client: ProjectDelayedWorkflowClient) -> None:
)


class ProjectChooser:
    """
    ProjectChooser assists in determining which projects to process based on the cohort updates.
    """

    def __init__(self, buffer_client: DelayedWorkflowClient, num_cohorts: int):
        self.client = buffer_client
        assert num_cohorts > 0 and num_cohorts <= 255
        self.num_cohorts = num_cohorts

    def _project_id_to_cohort(self, project_id: int) -> int:
        return hashlib.sha256(project_id.to_bytes(8)).digest()[0] % self.num_cohorts

    def project_ids_to_process(
        self, fetch_time: float, cohort_updates: CohortUpdates, all_project_ids: list[int]
    ) -> list[int]:
        """
        Given the time, the cohort update history, and the list of project ids in need of processing,
        determine which project ids should be processed.
        """
        must_process = set[int]()
        may_process = set[int]()
        now = fetch_time
        long_ago = now - 1000
        for co in range(self.num_cohorts):
            last_run = cohort_updates.values.get(co, long_ago)
            elapsed = timedelta(seconds=now - last_run)
            if elapsed >= timedelta(minutes=1):
                must_process.add(co)
            elif elapsed >= timedelta(seconds=60 / self.num_cohorts):
                may_process.add(co)
        if may_process and not must_process:
            choice = min(may_process, key=lambda c: (cohort_updates.values.get(c, long_ago), c))
            must_process.add(choice)
        cohort_updates.values.update({cohort_id: fetch_time for cohort_id in must_process})
        return [
            project_id
            for project_id in all_project_ids
            if self._project_id_to_cohort(project_id) in must_process
        ]


@contextmanager
def chosen_projects(
    project_chooser: ProjectChooser | None,
    fetch_time: float,
    all_project_ids: list[int],
) -> Generator[list[int]]:
    """
    Context manager that yields the project ids to be processed, and manages the
    cohort state after the processing is complete.

    If project_chooser is None, all projects are yielded without cohort-based selection.
    """
    if project_chooser is None:
        # No cohort selection - process all projects
        yield all_project_ids
        return

    cohort_updates = project_chooser.client.fetch_updates()
    project_ids_to_process = project_chooser.project_ids_to_process(
        fetch_time, cohort_updates, all_project_ids
    )
    yield project_ids_to_process
    project_chooser.client.persist_updates(cohort_updates)


def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None:
    option_name = buffer_client.option
    if option_name and not options.get(option_name):
@@ -92,45 +163,58 @@ def process_buffered_workflows(buffer_client: DelayedWorkflowClient) -> None:
        max=fetch_time,
    )

-    metrics.distribution(
-        "workflow_engine.schedule.projects", len(all_project_ids_and_timestamps)
-    )
-    logger.info(
-        "delayed_workflow.project_id_list",
-        extra={"project_ids": sorted(all_project_ids_and_timestamps.keys())},
    # Check if cohort-based selection is enabled (defaults to True for safety)
    use_cohort_selection = options.get("workflow_engine.use_cohort_selection", True)
    project_chooser = (
        ProjectChooser(buffer_client, num_cohorts=options.get("workflow_engine.num_cohorts", 1))
        if use_cohort_selection
        else None
    )

-    project_ids = list(all_project_ids_and_timestamps.keys())
-    for project_id in project_ids:
-        process_in_batches(buffer_client.for_project(project_id))
    with chosen_projects(
        project_chooser, fetch_time, list(all_project_ids_and_timestamps.keys())
    ) as project_ids_to_process:
        metrics.distribution("workflow_engine.schedule.projects", len(project_ids_to_process))
        logger.info(
            "delayed_workflow.project_id_list",
            extra={"project_ids": sorted(project_ids_to_process)},
        )

        for project_id in project_ids_to_process:
            process_in_batches(buffer_client.for_project(project_id))

-    mark_projects_processed(buffer_client, all_project_ids_and_timestamps)
        mark_projects_processed(
            buffer_client, project_ids_to_process, all_project_ids_and_timestamps
        )


def mark_projects_processed(
    buffer_client: DelayedWorkflowClient,
    processed_project_ids: list[int],
    all_project_ids_and_timestamps: dict[int, list[float]],
) -> None:
    if not all_project_ids_and_timestamps:
        return
    with metrics.timer("workflow_engine.scheduler.mark_projects_processed"):
-        member_maxes = [
        processed_member_maxes = [
            (project_id, max(timestamps))
            for project_id, timestamps in all_project_ids_and_timestamps.items()
            if project_id in processed_project_ids
        ]
        deleted_project_ids = set[int]()
        # The conditional delete can be slow, so we break it into chunks that probably
        # aren't big enough to hold onto the main redis thread for too long.
-        for chunk in chunked(member_maxes, 500):
        for chunk in chunked(processed_member_maxes, 500):
            with metrics.timer(
                "workflow_engine.conditional_delete_from_sorted_sets.chunk_duration"
            ):
                deleted = buffer_client.mark_project_ids_as_processed(dict(chunk))
                deleted_project_ids.update(deleted)

-        logger.info(
-            "process_buffered_workflows.project_ids_deleted",
-            extra={
-                "deleted_project_ids": sorted(deleted_project_ids),
-            },
-        )
        logger.info(
            "process_buffered_workflows.project_ids_deleted",
            extra={
                "deleted_project_ids": sorted(deleted_project_ids),
                "processed_project_ids": sorted(processed_project_ids),
            },
        )
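
Putting the pieces together: ProjectChooser hashes each project id into one of num_cohorts buckets with sha256; a bucket must be processed once 60 seconds have passed since its last run, and it merely becomes eligible after 60 / num_cohorts seconds; when nothing is overdue, the least recently run eligible bucket is promoted so each pass still does some work. A self-contained sketch of that cohort math, assuming a hypothetical NUM_COHORTS of 4:

# Standalone sketch (not part of this diff) of the cohort math used above:
# a stable hash maps each project to one of NUM_COHORTS buckets, and a bucket
# is "due" after 60s or merely "eligible" after 60 / NUM_COHORTS seconds.
import hashlib
import time

NUM_COHORTS = 4  # hypothetical value of the workflow_engine.num_cohorts option

def project_to_cohort(project_id: int) -> int:
    # First byte of sha256 over the 8-byte big-endian project id, mod the cohort count.
    return hashlib.sha256(project_id.to_bytes(8, "big")).digest()[0] % NUM_COHORTS

def cohort_state(last_run: float, now: float) -> str:
    elapsed = now - last_run
    if elapsed >= 60:
        return "must_process"
    if elapsed >= 60 / NUM_COHORTS:
        return "may_process"  # promoted only when no cohort is overdue, least-recent first
    return "skip"

now = time.time()
print(project_to_cohort(42), cohort_state(now - 90, now))   # some cohort, "must_process"
print(cohort_state(now - 20, now))                          # "may_process" (20 >= 60 / 4)
print(cohort_state(now - 5, now))                           # "skip"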
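One design point worth noting: chosen_projects persists the updated cohort timestamps only after the with-body finishes, so if processing raises, the previously stored timestamps remain and the same cohorts come due again on the next pass. A minimal standalone sketch of that persist-after-yield pattern; FakeStore and select_and_record are illustrative stand-ins, not Sentry code:

# Minimal standalone sketch (not part of this diff) of the persist-after-yield
# pattern chosen_projects uses; FakeStore stands in for the Redis-backed client.
from collections.abc import Generator
from contextlib import contextmanager


class FakeStore:
    def __init__(self) -> None:
        self.values: dict[int, float] = {}

    def fetch(self) -> dict[int, float]:
        return dict(self.values)

    def persist(self, values: dict[int, float]) -> None:
        self.values = values


@contextmanager
def select_and_record(
    store: FakeStore, fetch_time: float, due: list[int]
) -> Generator[list[int], None, None]:
    state = store.fetch()
    state.update({cohort: fetch_time for cohort in due})
    yield due                 # processing happens in the with-body
    store.persist(state)      # only reached if the body did not raise


store = FakeStore()
try:
    with select_and_record(store, 1000.0, [0, 1]) as cohorts:
        raise RuntimeError("simulated crash while processing")
except RuntimeError:
    pass
assert store.values == {}  # nothing persisted, so these cohorts stay due next pass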