Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/sentry/digests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

from collections import namedtuple
from datetime import datetime
from typing import TYPE_CHECKING, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, List, MutableMapping

from django.conf import settings

Expand All @@ -21,15 +23,17 @@

class Record(namedtuple("Record", "key value timestamp")):
@property
def datetime(self) -> Optional[datetime]:
return to_datetime(self.timestamp) # type: ignore
def datetime(self) -> datetime | None:
# Explicitly typing to satisfy mypy.
dt: datetime | None = to_datetime(self.timestamp)
return dt


ScheduleEntry = namedtuple("ScheduleEntry", "key timestamp")

OPTIONS = frozenset(("increment_delay", "maximum_delay", "minimum_delay"))

Digest = Mapping["Rule", Mapping["Group", Sequence[Record]]]
Digest = MutableMapping["Rule", MutableMapping["Group", List[Record]]]


def get_option_key(plugin: str, option: str) -> str:
Expand Down
86 changes: 15 additions & 71 deletions src/sentry/digests/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import itertools
import logging
from collections import OrderedDict, defaultdict, namedtuple
from functools import reduce
from typing import Any, Callable, Mapping, MutableMapping, MutableSequence, Sequence
from typing import Any, Mapping, MutableMapping, MutableSequence, Sequence

from sentry.app import tsdb
from sentry.digests import Record
from sentry.digests import Digest, Record
from sentry.eventstore.models import Event
from sentry.models import Group, GroupStatus, Project, Rule
from sentry.notifications.types import ActionTargetType
from sentry.utils.dates import to_timestamp
from sentry.utils.pipeline import Pipeline

logger = logging.getLogger("sentry.digests")

Expand Down Expand Up @@ -101,59 +101,6 @@ def attach_state(
return {"project": project, "groups": groups, "rules": rules}


class Pipeline:
def __init__(self) -> None:
self.operations: MutableSequence[Callable[..., Any]] = []
self.logs: MutableSequence[str] = []

def __call__(self, sequence: Sequence[Any]) -> tuple[Any, Sequence[str]]:
# Explicitly typing to satisfy mypy.
func: Callable[[Any, Callable[[Any], Any]], Any] = lambda x, operation: operation(x)
return reduce(func, self.operations, sequence), self.logs

def _log(self, message: str) -> None:
logger.debug(message)
self.logs.append(message)

def apply(self, function: Callable[[MutableMapping[str, Any]], Any]) -> Pipeline:
def operation(sequence: MutableMapping[str, Any]) -> Any:
result = function(sequence)
self._log(f"{function!r} applied to {len(sequence)} items.")
return result

self.operations.append(operation)
return self

def filter(self, function: Callable[[Record], bool]) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Sequence[Any]:
result = [s for s in sequence if function(s)]
self._log(f"{function!r} filtered {len(sequence)} items to {len(result)}.")
return result

self.operations.append(operation)
return self

def map(self, function: Callable[[Sequence[Any]], Any]) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Sequence[Any]:
result = [function(s) for s in sequence]
self._log(f"{function!r} applied to {len(sequence)} items.")
return result

self.operations.append(operation)
return self

def reduce(
self, function: Callable[[Any, Any], Any], initializer: Callable[[Sequence[Any]], Any]
) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Any:
result = reduce(function, sequence, initializer(sequence))
self._log(f"{function!r} reduced {len(sequence)} items to {len(result)}.")
return result

self.operations.append(operation)
return self


def rewrite_record(
record: Record,
project: Project,
Expand Down Expand Up @@ -217,35 +164,32 @@ def sort_rule_groups(rules: Mapping[str, Rule]) -> Mapping[str, Rule]:
)


def check_group_state(record: Record) -> bool:
# Explicitly typing to satisfy mypy.
is_unresolved: bool = record.value.event.group.get_status() == GroupStatus.UNRESOLVED
return is_unresolved


def build_digest(
project: Project,
records: Sequence[Record],
state: Mapping[str, Any] | None = None,
) -> tuple[Any | None, Sequence[str]]:
records = list(records)
) -> tuple[Digest | None, Sequence[str]]:
if not records:
return None, []

# XXX: This is a hack to allow generating a mock digest without actually
# doing any real IO!
if state is None:
state = fetch_state(project, records)

state = attach_state(**state)

def check_group_state(record: Record) -> bool:
# Explicitly typing to satisfy mypy.
is_unresolved: bool = record.value.event.group.get_status() == GroupStatus.UNRESOLVED
return is_unresolved
# XXX(hack): Allow generating a mock digest without actually doing any real IO!
state = state or fetch_state(project, records)

pipeline = (
Pipeline()
.map(functools.partial(rewrite_record, **state))
.map(functools.partial(rewrite_record, **attach_state(**state)))
.filter(bool)
.filter(check_group_state)
.reduce(group_records, lambda sequence: defaultdict(lambda: defaultdict(list)))
.apply(sort_group_contents)
.apply(sort_rule_groups)
)

return pipeline(records)
digest, logs = pipeline(records)
return digest, logs
3 changes: 2 additions & 1 deletion src/sentry/digests/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def get_event_from_groups_in_digest(digest: Digest) -> Iterable[Event]:


def build_custom_digest(original_digest: Digest, events: Iterable[Event]) -> Digest:
user_digest = OrderedDict()
"""Given a digest and a set of events, filter the digest to only records that include the events."""
user_digest: Digest = OrderedDict()
for rule, rule_groups in original_digest.items():
user_rule_groups = OrderedDict()
for group, group_records in rule_groups.items():
Expand Down
79 changes: 79 additions & 0 deletions src/sentry/utils/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

import logging
from functools import reduce
from typing import Any, Callable, MutableMapping, MutableSequence, Sequence

logger = logging.getLogger("sentry.utils.pipeline")


class Pipeline:
"""
A Pipeline is a way to call an ordered list of Array operations to a list of objects.

Usage:
# Construct the pipeline with any number of pipeline steps.
pipeline = Pipeline()
.map(...)
.filter(...)
...
.apply(...)

# Optionally add new steps.
if _condition_:
pipeline = pipeline.filter(...)

# Invoke the pipeline.
result = pipeline(objects)
"""

def __init__(self) -> None:
self.operations: MutableSequence[Callable[..., Any]] = []
self.logs: MutableSequence[str] = []

def __call__(self, sequence: Sequence[Any]) -> tuple[Any, Sequence[str]]:
# Explicitly typing to satisfy mypy.
func: Callable[[Any, Callable[[Any], Any]], Any] = lambda x, operation: operation(x)
return reduce(func, self.operations, sequence), self.logs

def _log(self, message: str) -> None:
logger.debug(message)
self.logs.append(message)

def apply(self, function: Callable[[MutableMapping[Any, Any]], Any]) -> Pipeline:
def operation(sequence: MutableMapping[Any, Any]) -> Any:
result = function(sequence)
self._log(f"{function!r} applied to {len(sequence)} items.")
return result

self.operations.append(operation)
return self

def filter(self, function: Callable[[Any], bool]) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Sequence[Any]:
result = [s for s in sequence if function(s)]
self._log(f"{function!r} filtered {len(sequence)} items to {len(result)}.")
return result

self.operations.append(operation)
return self

def map(self, function: Callable[[Sequence[Any]], Any]) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Sequence[Any]:
result = [function(s) for s in sequence]
self._log(f"{function!r} applied to {len(sequence)} items.")
return result

self.operations.append(operation)
return self

def reduce(
self, function: Callable[[Any, Any], Any], initializer: Callable[[Sequence[Any]], Any]
) -> Pipeline:
def operation(sequence: Sequence[Any]) -> Any:
result = reduce(function, sequence, initializer(sequence))
self._log(f"{function!r} reduced {len(sequence)} items to {len(result)}.")
return result

self.operations.append(operation)
return self