From 36e4b59cdaf9cf129847c5beca698f4506dfe60f Mon Sep 17 00:00:00 2001 From: Marcos Gaeta Date: Fri, 29 Oct 2021 15:41:46 -0700 Subject: [PATCH 1/2] Move Pipeline to a file --- src/sentry/digests/__init__.py | 12 ++-- src/sentry/digests/notifications.py | 86 +++++------------------------ src/sentry/digests/utilities.py | 3 +- src/sentry/utils/pipeline.py | 79 ++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 76 deletions(-) create mode 100644 src/sentry/utils/pipeline.py diff --git a/src/sentry/digests/__init__.py b/src/sentry/digests/__init__.py index 25d5f330d0b888..8f9f771a04e222 100644 --- a/src/sentry/digests/__init__.py +++ b/src/sentry/digests/__init__.py @@ -1,6 +1,8 @@ +from __future__ import annotations + from collections import namedtuple from datetime import datetime -from typing import TYPE_CHECKING, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, List, MutableMapping from django.conf import settings @@ -21,15 +23,17 @@ class Record(namedtuple("Record", "key value timestamp")): @property - def datetime(self) -> Optional[datetime]: - return to_datetime(self.timestamp) # type: ignore + def datetime(self) -> datetime | None: + # Explicitly typing to satisfy mypy. + dt: datetime | None = to_datetime(self.timestamp) + return dt ScheduleEntry = namedtuple("ScheduleEntry", "key timestamp") OPTIONS = frozenset(("increment_delay", "maximum_delay", "minimum_delay")) -Digest = Mapping["Rule", Mapping["Group", Sequence[Record]]] +Digest = MutableMapping[Rule, MutableMapping[Group, List[Record]]] def get_option_key(plugin: str, option: str) -> str: diff --git a/src/sentry/digests/notifications.py b/src/sentry/digests/notifications.py index 46b6474cd5562b..833f39da662757 100644 --- a/src/sentry/digests/notifications.py +++ b/src/sentry/digests/notifications.py @@ -4,15 +4,15 @@ import itertools import logging from collections import OrderedDict, defaultdict, namedtuple -from functools import reduce -from typing import Any, Callable, Mapping, MutableMapping, MutableSequence, Sequence +from typing import Any, Mapping, MutableMapping, MutableSequence, Sequence from sentry.app import tsdb -from sentry.digests import Record +from sentry.digests import Digest, Record from sentry.eventstore.models import Event from sentry.models import Group, GroupStatus, Project, Rule from sentry.notifications.types import ActionTargetType from sentry.utils.dates import to_timestamp +from sentry.utils.pipeline import Pipeline logger = logging.getLogger("sentry.digests") @@ -101,59 +101,6 @@ def attach_state( return {"project": project, "groups": groups, "rules": rules} -class Pipeline: - def __init__(self) -> None: - self.operations: MutableSequence[Callable[..., Any]] = [] - self.logs: MutableSequence[str] = [] - - def __call__(self, sequence: Sequence[Any]) -> tuple[Any, Sequence[str]]: - # Explicitly typing to satisfy mypy. - func: Callable[[Any, Callable[[Any], Any]], Any] = lambda x, operation: operation(x) - return reduce(func, self.operations, sequence), self.logs - - def _log(self, message: str) -> None: - logger.debug(message) - self.logs.append(message) - - def apply(self, function: Callable[[MutableMapping[str, Any]], Any]) -> Pipeline: - def operation(sequence: MutableMapping[str, Any]) -> Any: - result = function(sequence) - self._log(f"{function!r} applied to {len(sequence)} items.") - return result - - self.operations.append(operation) - return self - - def filter(self, function: Callable[[Record], bool]) -> Pipeline: - def operation(sequence: Sequence[Any]) -> Sequence[Any]: - result = [s for s in sequence if function(s)] - self._log(f"{function!r} filtered {len(sequence)} items to {len(result)}.") - return result - - self.operations.append(operation) - return self - - def map(self, function: Callable[[Sequence[Any]], Any]) -> Pipeline: - def operation(sequence: Sequence[Any]) -> Sequence[Any]: - result = [function(s) for s in sequence] - self._log(f"{function!r} applied to {len(sequence)} items.") - return result - - self.operations.append(operation) - return self - - def reduce( - self, function: Callable[[Any, Any], Any], initializer: Callable[[Sequence[Any]], Any] - ) -> Pipeline: - def operation(sequence: Sequence[Any]) -> Any: - result = reduce(function, sequence, initializer(sequence)) - self._log(f"{function!r} reduced {len(sequence)} items to {len(result)}.") - return result - - self.operations.append(operation) - return self - - def rewrite_record( record: Record, project: Project, @@ -217,30 +164,26 @@ def sort_rule_groups(rules: Mapping[str, Rule]) -> Mapping[str, Rule]: ) +def check_group_state(record: Record) -> bool: + # Explicitly typing to satisfy mypy. + is_unresolved: bool = record.value.event.group.get_status() == GroupStatus.UNRESOLVED + return is_unresolved + + def build_digest( project: Project, records: Sequence[Record], state: Mapping[str, Any] | None = None, -) -> tuple[Any | None, Sequence[str]]: - records = list(records) +) -> tuple[Digest | None, Sequence[str]]: if not records: return None, [] - # XXX: This is a hack to allow generating a mock digest without actually - # doing any real IO! - if state is None: - state = fetch_state(project, records) - - state = attach_state(**state) - - def check_group_state(record: Record) -> bool: - # Explicitly typing to satisfy mypy. - is_unresolved: bool = record.value.event.group.get_status() == GroupStatus.UNRESOLVED - return is_unresolved + # XXX(hack): Allow generating a mock digest without actually doing any real IO! + state = state or fetch_state(project, records) pipeline = ( Pipeline() - .map(functools.partial(rewrite_record, **state)) + .map(functools.partial(rewrite_record, **attach_state(**state))) .filter(bool) .filter(check_group_state) .reduce(group_records, lambda sequence: defaultdict(lambda: defaultdict(list))) @@ -248,4 +191,5 @@ def check_group_state(record: Record) -> bool: .apply(sort_rule_groups) ) - return pipeline(records) + digest, logs = pipeline(records) + return digest, logs diff --git a/src/sentry/digests/utilities.py b/src/sentry/digests/utilities.py index d736a371ab1f63..06dc1b7f393fe5 100644 --- a/src/sentry/digests/utilities.py +++ b/src/sentry/digests/utilities.py @@ -74,7 +74,8 @@ def get_event_from_groups_in_digest(digest: Digest) -> Iterable[Event]: def build_custom_digest(original_digest: Digest, events: Iterable[Event]) -> Digest: - user_digest = OrderedDict() + """Given a digest and a set of events, filter the digest to only records that include the events.""" + user_digest: Digest = OrderedDict() for rule, rule_groups in original_digest.items(): user_rule_groups = OrderedDict() for group, group_records in rule_groups.items(): diff --git a/src/sentry/utils/pipeline.py b/src/sentry/utils/pipeline.py new file mode 100644 index 00000000000000..0ab2e0467a6f88 --- /dev/null +++ b/src/sentry/utils/pipeline.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +from functools import reduce +from typing import Any, Callable, MutableMapping, MutableSequence, Sequence + +logger = logging.getLogger("sentry.utils.pipeline") + + +class Pipeline: + """ + A Pipeline is a way to call an ordered list of Array operations to a list of objects. + + Usage: + # Construct the pipeline with any number of pipeline steps. + pipeline = Pipeline() + .map(...) + .filter(...) + ... + .apply(...) + + # Optionally add new steps. + if _condition_: + pipeline = pipeline.filter(...) + + # Invoke the pipeline. + result = pipeline(objects) + """ + + def __init__(self) -> None: + self.operations: MutableSequence[Callable[..., Any]] = [] + self.logs: MutableSequence[str] = [] + + def __call__(self, sequence: Sequence[Any]) -> tuple[Any, Sequence[str]]: + # Explicitly typing to satisfy mypy. + func: Callable[[Any, Callable[[Any], Any]], Any] = lambda x, operation: operation(x) + return reduce(func, self.operations, sequence), self.logs + + def _log(self, message: str) -> None: + logger.debug(message) + self.logs.append(message) + + def apply(self, function: Callable[[MutableMapping[Any, Any]], Any]) -> Pipeline: + def operation(sequence: MutableMapping[Any, Any]) -> Any: + result = function(sequence) + self._log(f"{function!r} applied to {len(sequence)} items.") + return result + + self.operations.append(operation) + return self + + def filter(self, function: Callable[[Any], bool]) -> Pipeline: + def operation(sequence: Sequence[Any]) -> Sequence[Any]: + result = [s for s in sequence if function(s)] + self._log(f"{function!r} filtered {len(sequence)} items to {len(result)}.") + return result + + self.operations.append(operation) + return self + + def map(self, function: Callable[[Sequence[Any]], Any]) -> Pipeline: + def operation(sequence: Sequence[Any]) -> Sequence[Any]: + result = [function(s) for s in sequence] + self._log(f"{function!r} applied to {len(sequence)} items.") + return result + + self.operations.append(operation) + return self + + def reduce( + self, function: Callable[[Any, Any], Any], initializer: Callable[[Sequence[Any]], Any] + ) -> Pipeline: + def operation(sequence: Sequence[Any]) -> Any: + result = reduce(function, sequence, initializer(sequence)) + self._log(f"{function!r} reduced {len(sequence)} items to {len(result)}.") + return result + + self.operations.append(operation) + return self From 591507a82f0eb7eba5fd4ee33be659c45df4c7ce Mon Sep 17 00:00:00 2001 From: Marcos Gaeta Date: Fri, 29 Oct 2021 16:34:59 -0700 Subject: [PATCH 2/2] fix types --- src/sentry/digests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/digests/__init__.py b/src/sentry/digests/__init__.py index 8f9f771a04e222..e727ce3b805060 100644 --- a/src/sentry/digests/__init__.py +++ b/src/sentry/digests/__init__.py @@ -33,7 +33,7 @@ def datetime(self) -> datetime | None: OPTIONS = frozenset(("increment_delay", "maximum_delay", "minimum_delay")) -Digest = MutableMapping[Rule, MutableMapping[Group, List[Record]]] +Digest = MutableMapping["Rule", MutableMapping["Group", List[Record]]] def get_option_key(plugin: str, option: str) -> str: