diff --git a/src/sentry/search/eap/columns.py b/src/sentry/search/eap/columns.py index 346c195519331a..4aaa0f319c810d 100644 --- a/src/sentry/search/eap/columns.py +++ b/src/sentry/search/eap/columns.py @@ -238,9 +238,33 @@ def proto_definition(self) -> AttributeAggregation: @dataclass(frozen=True, kw_only=True) -class ResolvedTraceMetricAggregate(ResolvedAggregate): +class ResolvedTraceMetricAggregate(ResolvedFunction): + # The internal rpc alias for this column + internal_name: Function.ValueType + extrapolation_mode: ExtrapolationMode.ValueType + # The attribute to conditionally aggregate on + key: AttributeKey + + is_aggregate: bool = field(default=True, init=False) trace_metric: TraceMetric | None + @property + def proto_definition(self) -> AttributeAggregation | AttributeConditionalAggregation: + if self.trace_metric is None: + return AttributeAggregation( + aggregate=self.internal_name, + key=self.key, + label=self.public_alias, + extrapolation_mode=self.extrapolation_mode, + ) + return AttributeConditionalAggregation( + aggregate=self.internal_name, + key=self.key, + filter=self.trace_metric.get_filter(), + label=self.public_alias, + extrapolation_mode=self.extrapolation_mode, + ) + @dataclass(frozen=True, kw_only=True) class ResolvedConditionalAggregate(ResolvedFunction): @@ -399,7 +423,7 @@ def resolve( extrapolation_mode=resolve_extrapolation_mode( search_config, self.extrapolation_mode_override ), - argument=resolved_attribute, + key=resolved_attribute, trace_metric=trace_metric, ) diff --git a/src/sentry/search/eap/rpc_utils.py b/src/sentry/search/eap/rpc_utils.py index 2fb2465d41fc4e..acd604d5f4ea89 100644 --- a/src/sentry/search/eap/rpc_utils.py +++ b/src/sentry/search/eap/rpc_utils.py @@ -1,4 +1,4 @@ -from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, TraceItemFilter +from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, OrFilter, TraceItemFilter def and_trace_item_filters( @@ -12,3 +12,16 @@ def and_trace_item_filters( return filters[0] return TraceItemFilter(and_filter=AndFilter(filters=filters)) + + +def or_trace_item_filters( + *trace_item_filters: TraceItemFilter | None, +) -> TraceItemFilter | None: + filters: list[TraceItemFilter] = [f for f in trace_item_filters if f is not None] + if not filters: + return None + + if len(filters) == 1: + return filters[0] + + return TraceItemFilter(or_filter=OrFilter(filters=filters)) diff --git a/src/sentry/search/eap/trace_metrics/config.py b/src/sentry/search/eap/trace_metrics/config.py index a9550deb072104..e96cf39a7c34f9 100644 --- a/src/sentry/search/eap/trace_metrics/config.py +++ b/src/sentry/search/eap/trace_metrics/config.py @@ -7,6 +7,7 @@ from sentry.exceptions import InvalidSearchQuery from sentry.search.eap.columns import ResolvedTraceMetricAggregate, ResolvedTraceMetricFormula from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.rpc_utils import or_trace_item_filters from sentry.search.eap.trace_metrics.types import TraceMetric, TraceMetricType from sentry.search.eap.types import SearchResolverConfig from sentry.search.events import fields @@ -40,6 +41,7 @@ def _extra_conditions_from_columns( selected_columns: list[str] | None, equations: list[str] | None, ) -> TraceItemFilter | None: + aggregate_all_metrics = False selected_metrics: set[TraceMetric] = set() if selected_columns: @@ -56,20 +58,30 @@ def _extra_conditions_from_columns( ) and not isinstance(resolved_function, ResolvedTraceMetricFormula): continue - if not resolved_function.trace_metric: + if resolved_function.trace_metric is None: + # found an aggregation across all metrics, not just 1 + aggregate_all_metrics = True continue selected_metrics.add(resolved_function.trace_metric) + if equations: + raise InvalidSearchQuery("Cannot support equations on trace metrics yet") + + # no selected metrics, no filter needed if not selected_metrics: return None - if len(selected_metrics) > 1: - raise InvalidSearchQuery("Cannot aggregate multiple metrics in 1 query.") - - selected_metric = selected_metrics.pop() + # check if there are any aggregations across all metrics mixed with + # aggregations for a single metric as this is not permitted + if aggregate_all_metrics and selected_metrics: + raise InvalidSearchQuery( + "Cannot aggregate all metrics and singlular metrics in the same query." + ) - return selected_metric.get_filter() + # at this point we only have selected metrics remaining + filters = [metric.get_filter() for metric in selected_metrics] + return or_trace_item_filters(*filters) def _extra_conditions_from_metric( self, diff --git a/src/sentry/search/eap/trace_metrics/formulas.py b/src/sentry/search/eap/trace_metrics/formulas.py index 3513e93fdcb15a..8de72543fb1075 100644 --- a/src/sentry/search/eap/trace_metrics/formulas.py +++ b/src/sentry/search/eap/trace_metrics/formulas.py @@ -1,5 +1,6 @@ -from typing import cast - +from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import ( + AttributeConditionalAggregation, +) from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column from sentry_protos.snuba.v1.formula_pb2 import Literal as LiteralValue from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( @@ -16,13 +17,14 @@ ResolverSettings, TraceMetricFormulaDefinition, ValueArgumentDefinition, + extract_trace_metric_aggregate_arguments, ) from sentry.search.eap.trace_metrics.config import TraceMetricsSearchResolverConfig from sentry.search.eap.validator import literal_validator def _rate_internal( - divisor: int, metric_type: str, settings: ResolverSettings + divisor: int, args: ResolvedArguments, settings: ResolverSettings ) -> Column.BinaryFormula: """ Calculate rate per X for trace metrics using the value attribute. @@ -40,35 +42,37 @@ def _rate_internal( else settings["snuba_params"].interval ) - metric_type = ( - search_config.metric.metric_type - if search_config.metric and search_config.metric.metric_type - else metric_type - ) - - if metric_type == "counter": - return Column.BinaryFormula( - left=Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_SUM, - key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.value"), - extrapolation_mode=extrapolation_mode, - ), - ), - op=Column.BinaryFormula.OP_DIVIDE, - right=Column( - literal=LiteralValue(val_double=time_interval / divisor), - ), - ) + trace_metric = search_config.metric or extract_trace_metric_aggregate_arguments(args) - return Column.BinaryFormula( - left=Column( + if trace_metric is None: + left = Column( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_COUNT, key=AttributeKey(name="sentry.project_id", type=AttributeKey.Type.TYPE_INT), extrapolation_mode=extrapolation_mode, - ), - ), + ) + ) + elif trace_metric.metric_type == "counter": + left = Column( + conditional_aggregation=AttributeConditionalAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.value"), + filter=trace_metric.get_filter(), + extrapolation_mode=extrapolation_mode, + ) + ) + else: + left = Column( + conditional_aggregation=AttributeConditionalAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(name="sentry.project_id", type=AttributeKey.Type.TYPE_INT), + filter=trace_metric.get_filter(), + extrapolation_mode=extrapolation_mode, + ) + ) + + return Column.BinaryFormula( + left=left, op=Column.BinaryFormula.OP_DIVIDE, right=Column( literal=LiteralValue(val_double=time_interval / divisor), @@ -80,16 +84,14 @@ def per_second(args: ResolvedArguments, settings: ResolverSettings) -> Column.Bi """ Calculate rate per second for trace metrics using the value attribute. """ - metric_type = cast(str, args[2]) if len(args) >= 3 and args[2] else "counter" - return _rate_internal(1, metric_type, settings) + return _rate_internal(1, args, settings) def per_minute(args: ResolvedArguments, settings: ResolverSettings) -> Column.BinaryFormula: """ Calculate rate per minute for trace metrics using the value attribute. """ - metric_type = cast(str, args[2]) if len(args) >= 3 and args[2] else "counter" - return _rate_internal(60, metric_type, settings) + return _rate_internal(60, args, settings) TRACE_METRICS_FORMULA_DEFINITIONS: dict[str, FormulaDefinition] = { diff --git a/tests/snuba/api/endpoints/test_organization_events_trace_metrics.py b/tests/snuba/api/endpoints/test_organization_events_trace_metrics.py index 56d7a83405b223..2de4850479c90b 100644 --- a/tests/snuba/api/endpoints/test_organization_events_trace_metrics.py +++ b/tests/snuba/api/endpoints/test_organization_events_trace_metrics.py @@ -314,9 +314,16 @@ def test_list_metrics(self): # this query does not filter on any metrics, so scan all metrics response = self.do_request( { - "field": ["metric.name", "metric.type", "metric.unit", "count(metric.name)"], + "field": [ + "metric.name", + "metric.type", + "metric.unit", + "count(metric.name)", + "per_second(metric.name)", + ], "orderby": "metric.name", "dataset": self.dataset, + "statsPeriod": "10m", } ) assert response.status_code == 200, response.content @@ -326,24 +333,28 @@ def test_list_metrics(self): "metric.type": "gauge", "metric.unit": None, "count(metric.name)": 2, + "per_second(metric.name)": pytest.approx(2 / 600, abs=0.001), }, { "metric.name": "baz", "metric.type": "distribution", "metric.unit": None, "count(metric.name)": 3, + "per_second(metric.name)": pytest.approx(3 / 600, abs=0.001), }, { "metric.name": "foo", "metric.type": "counter", "metric.unit": None, "count(metric.name)": 1, + "per_second(metric.name)": pytest.approx(1 / 600, abs=0.001), }, { "metric.name": "qux", "metric.type": "distribution", "metric.unit": "millisecond", "count(metric.name)": 4, + "per_second(metric.name)": pytest.approx(4 / 600, abs=0.001), }, ] @@ -412,11 +423,47 @@ def test_aggregation_multiple_embedded_same_metric_name(self): ] def test_aggregation_multiple_embedded_different_metric_name(self): + trace_metrics = [ + self.create_trace_metric("foo", 1, "counter"), + self.create_trace_metric("foo", 2, "counter"), + self.create_trace_metric("bar", 4, "counter"), + self.create_trace_metric("baz", 8, "gauge"), + ] + self.store_trace_metrics(trace_metrics) + response = self.do_request( { "field": [ "count(value,foo,counter,-)", "count(value,bar,counter,-)", + "count(value,baz,gauge,-)", + "per_second(value,foo,counter,-)", + "per_second(value,bar,counter,-)", + "per_second(value,baz,gauge,-)", + ], + "dataset": self.dataset, + "project": self.project.id, + "statsPeriod": "10m", + } + ) + assert response.status_code == 200, response.content + assert response.data["data"] == [ + { + "count(value,foo,counter,-)": 2, + "count(value,bar,counter,-)": 1, + "count(value,baz,gauge,-)": 1, + "per_second(value,foo,counter,-)": pytest.approx(3 / 600, abs=0.001), + "per_second(value,bar,counter,-)": pytest.approx(4 / 600, abs=0.001), + "per_second(value,baz,gauge,-)": pytest.approx(1 / 600, abs=0.001), + }, + ] + + def test_mixing_all_metrics_and_one_metric(self): + response = self.do_request( + { + "field": [ + "count(value,foo,counter,-)", + "per_second(value)", ], "dataset": self.dataset, "project": self.project.id, @@ -425,6 +472,7 @@ def test_aggregation_multiple_embedded_different_metric_name(self): assert response.status_code == 400, response.content assert response.data == { "detail": ErrorDetail( - "Cannot aggregate multiple metrics in 1 query.", code="parse_error" + "Cannot aggregate all metrics and singlular metrics in the same query.", + code="parse_error", ) }