Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/sentry/search/eap/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,33 @@ def proto_definition(self) -> AttributeAggregation:


@dataclass(frozen=True, kw_only=True)
class ResolvedTraceMetricAggregate(ResolvedAggregate):
class ResolvedTraceMetricAggregate(ResolvedFunction):
# The internal rpc alias for this column
internal_name: Function.ValueType
extrapolation_mode: ExtrapolationMode.ValueType
# The attribute to conditionally aggregate on
key: AttributeKey

is_aggregate: bool = field(default=True, init=False)
trace_metric: TraceMetric | None

@property
def proto_definition(self) -> AttributeAggregation | AttributeConditionalAggregation:
    """Build the RPC aggregation proto for this resolved aggregate.

    When no specific trace metric is attached (``trace_metric is None``),
    emit a plain AttributeAggregation over ``key``; otherwise emit an
    AttributeConditionalAggregation restricted by the metric's filter.
    """
    if self.trace_metric is None:
        # Aggregating across all metrics: no per-metric filter is applied.
        return AttributeAggregation(
            aggregate=self.internal_name,
            key=self.key,
            label=self.public_alias,
            extrapolation_mode=self.extrapolation_mode,
        )
    # Restrict the aggregation to rows matching this metric's filter.
    return AttributeConditionalAggregation(
        aggregate=self.internal_name,
        key=self.key,
        filter=self.trace_metric.get_filter(),
        label=self.public_alias,
        extrapolation_mode=self.extrapolation_mode,
    )


@dataclass(frozen=True, kw_only=True)
class ResolvedConditionalAggregate(ResolvedFunction):
Expand Down Expand Up @@ -399,7 +423,7 @@ def resolve(
extrapolation_mode=resolve_extrapolation_mode(
search_config, self.extrapolation_mode_override
),
argument=resolved_attribute,
key=resolved_attribute,
trace_metric=trace_metric,
)

Expand Down
15 changes: 14 additions & 1 deletion src/sentry/search/eap/rpc_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, TraceItemFilter
from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, OrFilter, TraceItemFilter


def and_trace_item_filters(
Expand All @@ -12,3 +12,16 @@ def and_trace_item_filters(
return filters[0]

return TraceItemFilter(and_filter=AndFilter(filters=filters))


def or_trace_item_filters(
    *trace_item_filters: TraceItemFilter | None,
) -> TraceItemFilter | None:
    """Combine the given filters with a logical OR, skipping ``None`` entries.

    Returns ``None`` when nothing remains after dropping ``None``s, the lone
    filter unchanged when exactly one remains, and otherwise a single
    ``TraceItemFilter`` wrapping them all in an ``OrFilter``.
    """
    remaining = [item for item in trace_item_filters if item is not None]
    if len(remaining) == 0:
        return None
    if len(remaining) == 1:
        # Avoid a needless single-child OrFilter wrapper.
        return remaining[0]
    return TraceItemFilter(or_filter=OrFilter(filters=remaining))
24 changes: 18 additions & 6 deletions src/sentry/search/eap/trace_metrics/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sentry.exceptions import InvalidSearchQuery
from sentry.search.eap.columns import ResolvedTraceMetricAggregate, ResolvedTraceMetricFormula
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.rpc_utils import or_trace_item_filters
from sentry.search.eap.trace_metrics.types import TraceMetric, TraceMetricType
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events import fields
Expand Down Expand Up @@ -40,6 +41,7 @@ def _extra_conditions_from_columns(
selected_columns: list[str] | None,
equations: list[str] | None,
) -> TraceItemFilter | None:
aggregate_all_metrics = False
selected_metrics: set[TraceMetric] = set()

if selected_columns:
Expand All @@ -56,20 +58,30 @@ def _extra_conditions_from_columns(
) and not isinstance(resolved_function, ResolvedTraceMetricFormula):
continue

if not resolved_function.trace_metric:
if resolved_function.trace_metric is None:
# found an aggregation across all metrics, not just 1
aggregate_all_metrics = True
continue

selected_metrics.add(resolved_function.trace_metric)

if equations:
raise InvalidSearchQuery("Cannot support equations on trace metrics yet")

# no selected metrics, no filter needed
if not selected_metrics:
return None

if len(selected_metrics) > 1:
raise InvalidSearchQuery("Cannot aggregate multiple metrics in 1 query.")

selected_metric = selected_metrics.pop()
# check if there are any aggregations across all metrics mixed with
# aggregations for a single metric as this is not permitted
if aggregate_all_metrics and selected_metrics:
raise InvalidSearchQuery(
"Cannot aggregate all metrics and singlular metrics in the same query."
)

return selected_metric.get_filter()
# at this point we only have selected metrics remaining
filters = [metric.get_filter() for metric in selected_metrics]
return or_trace_item_filters(*filters)

def _extra_conditions_from_metric(
self,
Expand Down
64 changes: 33 additions & 31 deletions src/sentry/search/eap/trace_metrics/formulas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import cast

from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import (
AttributeConditionalAggregation,
)
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column
from sentry_protos.snuba.v1.formula_pb2 import Literal as LiteralValue
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
Expand All @@ -16,13 +17,14 @@
ResolverSettings,
TraceMetricFormulaDefinition,
ValueArgumentDefinition,
extract_trace_metric_aggregate_arguments,
)
from sentry.search.eap.trace_metrics.config import TraceMetricsSearchResolverConfig
from sentry.search.eap.validator import literal_validator


def _rate_internal(
divisor: int, metric_type: str, settings: ResolverSettings
divisor: int, args: ResolvedArguments, settings: ResolverSettings
) -> Column.BinaryFormula:
"""
Calculate rate per X for trace metrics using the value attribute.
Expand All @@ -40,35 +42,37 @@ def _rate_internal(
else settings["snuba_params"].interval
)

metric_type = (
search_config.metric.metric_type
if search_config.metric and search_config.metric.metric_type
else metric_type
)

if metric_type == "counter":
return Column.BinaryFormula(
left=Column(
aggregation=AttributeAggregation(
aggregate=Function.FUNCTION_SUM,
key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.value"),
extrapolation_mode=extrapolation_mode,
),
),
op=Column.BinaryFormula.OP_DIVIDE,
right=Column(
literal=LiteralValue(val_double=time_interval / divisor),
),
)
trace_metric = search_config.metric or extract_trace_metric_aggregate_arguments(args)

return Column.BinaryFormula(
left=Column(
if trace_metric is None:
left = Column(
aggregation=AttributeAggregation(
aggregate=Function.FUNCTION_COUNT,
key=AttributeKey(name="sentry.project_id", type=AttributeKey.Type.TYPE_INT),
extrapolation_mode=extrapolation_mode,
),
),
)
)
elif trace_metric.metric_type == "counter":
left = Column(
conditional_aggregation=AttributeConditionalAggregation(
aggregate=Function.FUNCTION_SUM,
key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.value"),
filter=trace_metric.get_filter(),
extrapolation_mode=extrapolation_mode,
)
)
else:
left = Column(
conditional_aggregation=AttributeConditionalAggregation(
aggregate=Function.FUNCTION_COUNT,
key=AttributeKey(name="sentry.project_id", type=AttributeKey.Type.TYPE_INT),
filter=trace_metric.get_filter(),
extrapolation_mode=extrapolation_mode,
)
)

return Column.BinaryFormula(
left=left,
op=Column.BinaryFormula.OP_DIVIDE,
right=Column(
literal=LiteralValue(val_double=time_interval / divisor),
Expand All @@ -80,16 +84,14 @@ def per_second(args: ResolvedArguments, settings: ResolverSettings) -> Column.Bi
"""
Calculate rate per second for trace metrics using the value attribute.
"""
metric_type = cast(str, args[2]) if len(args) >= 3 and args[2] else "counter"
return _rate_internal(1, metric_type, settings)
return _rate_internal(1, args, settings)


def per_minute(args: ResolvedArguments, settings: ResolverSettings) -> Column.BinaryFormula:
"""
Calculate rate per minute for trace metrics using the value attribute.
"""
metric_type = cast(str, args[2]) if len(args) >= 3 and args[2] else "counter"
return _rate_internal(60, metric_type, settings)
return _rate_internal(60, args, settings)


TRACE_METRICS_FORMULA_DEFINITIONS: dict[str, FormulaDefinition] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,16 @@ def test_list_metrics(self):
# this query does not filter on any metrics, so scan all metrics
response = self.do_request(
{
"field": ["metric.name", "metric.type", "metric.unit", "count(metric.name)"],
"field": [
"metric.name",
"metric.type",
"metric.unit",
"count(metric.name)",
"per_second(metric.name)",
],
"orderby": "metric.name",
"dataset": self.dataset,
"statsPeriod": "10m",
}
)
assert response.status_code == 200, response.content
Expand All @@ -326,24 +333,28 @@ def test_list_metrics(self):
"metric.type": "gauge",
"metric.unit": None,
"count(metric.name)": 2,
"per_second(metric.name)": pytest.approx(2 / 600, abs=0.001),
},
{
"metric.name": "baz",
"metric.type": "distribution",
"metric.unit": None,
"count(metric.name)": 3,
"per_second(metric.name)": pytest.approx(3 / 600, abs=0.001),
},
{
"metric.name": "foo",
"metric.type": "counter",
"metric.unit": None,
"count(metric.name)": 1,
"per_second(metric.name)": pytest.approx(1 / 600, abs=0.001),
},
{
"metric.name": "qux",
"metric.type": "distribution",
"metric.unit": "millisecond",
"count(metric.name)": 4,
"per_second(metric.name)": pytest.approx(4 / 600, abs=0.001),
},
]

Expand Down Expand Up @@ -412,11 +423,47 @@ def test_aggregation_multiple_embedded_same_metric_name(self):
]

def test_aggregation_multiple_embedded_different_metric_name(self):
trace_metrics = [
self.create_trace_metric("foo", 1, "counter"),
self.create_trace_metric("foo", 2, "counter"),
self.create_trace_metric("bar", 4, "counter"),
self.create_trace_metric("baz", 8, "gauge"),
]
self.store_trace_metrics(trace_metrics)

response = self.do_request(
{
"field": [
"count(value,foo,counter,-)",
"count(value,bar,counter,-)",
"count(value,baz,gauge,-)",
"per_second(value,foo,counter,-)",
"per_second(value,bar,counter,-)",
"per_second(value,baz,gauge,-)",
],
"dataset": self.dataset,
"project": self.project.id,
"statsPeriod": "10m",
}
)
assert response.status_code == 200, response.content
assert response.data["data"] == [
{
"count(value,foo,counter,-)": 2,
"count(value,bar,counter,-)": 1,
"count(value,baz,gauge,-)": 1,
"per_second(value,foo,counter,-)": pytest.approx(3 / 600, abs=0.001),
"per_second(value,bar,counter,-)": pytest.approx(4 / 600, abs=0.001),
"per_second(value,baz,gauge,-)": pytest.approx(1 / 600, abs=0.001),
},
]

def test_mixing_all_metrics_and_one_metric(self):
response = self.do_request(
{
"field": [
"count(value,foo,counter,-)",
"per_second(value)",
],
"dataset": self.dataset,
"project": self.project.id,
Expand All @@ -425,6 +472,7 @@ def test_aggregation_multiple_embedded_different_metric_name(self):
assert response.status_code == 400, response.content
assert response.data == {
"detail": ErrorDetail(
"Cannot aggregate multiple metrics in 1 query.", code="parse_error"
"Cannot aggregate all metrics and singlular metrics in the same query.",
code="parse_error",
)
}
Loading