Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/sentry/search/eap/columns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Literal, TypeAlias, TypedDict
from typing import Any, Literal, TypeAlias, TypedDict, cast

from dateutil.tz import tz
from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import (
Expand All @@ -22,7 +22,7 @@
from sentry.api.event_search import SearchFilter
from sentry.exceptions import InvalidSearchQuery
from sentry.search.eap import constants
from sentry.search.eap.types import EAPResponse, SearchResolverConfig
from sentry.search.eap.types import EAPResponse, MetricType, SearchResolverConfig
from sentry.search.events.types import SnubaParams

ResolvedArgument: TypeAlias = AttributeKey | str | int | float
Expand Down Expand Up @@ -221,6 +221,13 @@ def proto_definition(self) -> AttributeAggregation:
)


@dataclass(frozen=True, kw_only=True)
class ResolvedMetricAggregate(ResolvedAggregate):
metric_name: str | None
metric_type: MetricType | None
metric_unit: str | None


@dataclass(frozen=True, kw_only=True)
class ResolvedConditionalAggregate(ResolvedFunction):
# The internal rpc alias for this column
Expand Down Expand Up @@ -389,10 +396,18 @@ def resolve(
if self.attribute_resolver is not None:
resolved_attribute = self.attribute_resolver(resolved_attribute)

if all(resolved_argument != "" for resolved_argument in resolved_arguments[1:]):
metric_name = None
metric_type = None
metric_unit = None

if all(
isinstance(resolved_argument, str) and resolved_argument != ""
for resolved_argument in resolved_arguments[1:]
):
# a metric was passed
# TODO: we need to put it into the top level query conditions
pass
metric_name = cast(str, resolved_arguments[1])
metric_type = cast(MetricType, resolved_arguments[2])
metric_unit = None if resolved_arguments[3] == "-" else cast(str, resolved_arguments[3])
elif all(resolved_argument == "" for resolved_argument in resolved_arguments[1:]):
# no metrics were specified, assume we query all metrics
pass
Expand All @@ -401,7 +416,7 @@ def resolve(
f"Trace metric aggregates expect the full metric to be specified, got name:{resolved_arguments[1]} type:{resolved_arguments[2]} unit:{resolved_arguments[3]}"
)

return ResolvedAggregate(
return ResolvedMetricAggregate(
public_alias=alias,
internal_name=self.internal_function,
search_type=search_type,
Expand All @@ -411,6 +426,9 @@ def resolve(
self.extrapolation if not search_config.disable_aggregate_extrapolation else False
),
argument=resolved_attribute,
metric_name=metric_name,
metric_type=metric_type,
metric_unit=metric_unit,
)


Expand Down
57 changes: 35 additions & 22 deletions src/sentry/search/eap/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
ValueArgumentDefinition,
VirtualColumnDefinition,
)
from sentry.search.eap.rpc_utils import and_trace_item_filters
from sentry.search.eap.sampling import validate_sampling
from sentry.search.eap.spans.attributes import SPANS_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS
from sentry.search.eap.types import EAPResponse, SearchResolverConfig
Expand Down Expand Up @@ -139,31 +140,34 @@ def resolve_query(self, querystring: str | None) -> tuple[
span.set_tag("SearchResolver.resolved_query", where)
span.set_tag("SearchResolver.environment_query", environment_query)

# The RPC request meta does not contain the environment.
# So we have to inject it as a query condition.
#
# To do so, we want to AND it with the query.
# So if either one is not defined, we just use the other.
# But if both are defined, we AND them together.
where = and_trace_item_filters(
where,
# The RPC request meta does not contain the environment.
# So we have to inject it as a query condition.
environment_query,
)

if not environment_query:
return where, having, contexts
return where, having, contexts

if not where:
return environment_query, having, []
@sentry_sdk.trace
def resolve_query_with_columns(
self,
querystring: str | None,
selected_columns: list[str] | None,
equations: list[str] | None,
) -> tuple[
TraceItemFilter | None,
AggregationFilter | None,
list[VirtualColumnDefinition | None],
]:
where, having, contexts = self.resolve_query(querystring)

return (
TraceItemFilter(
and_filter=AndFilter(
filters=[
environment_query,
where,
]
)
),
having,
contexts,
)
# Some datasets like trace metrics require we inject additional
# conditions in the top level.
dataset_conditions = self.resolve_dataset_conditions(selected_columns, equations)
where = and_trace_item_filters(where, dataset_conditions)

return where, having, contexts

def __resolve_environment_query(self) -> TraceItemFilter | None:
resolved_column, _ = self.resolve_column("environment")
Expand Down Expand Up @@ -1174,3 +1178,12 @@ def _resolve_operation(self, operation: arithmetic.OperandType) -> tuple[
return Column(conditional_aggregation=col.proto_definition), contexts
elif isinstance(col, ResolvedFormula):
return Column(formula=col.proto_definition), contexts

def resolve_dataset_conditions(
self,
selected_columns: list[str] | None,
equations: list[str] | None,
) -> TraceItemFilter | None:
extra_conditions = self.config.extra_conditions(self, selected_columns, equations)

return and_trace_item_filters(extra_conditions)
14 changes: 14 additions & 0 deletions src/sentry/search/eap/rpc_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, TraceItemFilter


def and_trace_item_filters(
*trace_item_filters: TraceItemFilter | None,
) -> TraceItemFilter | None:
filters: list[TraceItemFilter] = [f for f in trace_item_filters if f is not None]
if not filters:
return None

if len(filters) == 1:
return filters[0]

return TraceItemFilter(and_filter=AndFilter(filters=filters))
155 changes: 118 additions & 37 deletions src/sentry/search/eap/trace_metrics/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Literal, cast
from typing import cast

from rest_framework.request import Request
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
Expand All @@ -9,10 +9,18 @@
TraceItemFilter,
)

from sentry.exceptions import InvalidSearchQuery
from sentry.search.eap.columns import ResolvedMetricAggregate
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.eap.types import MetricType, SearchResolverConfig
from sentry.search.events import fields

MetricType = Literal["counter", "gauge", "distribution"]

@dataclass(frozen=True, kw_only=True)
class Metric:
metric_name: str
metric_type: MetricType
metric_unit: str | None


@dataclass(frozen=True, kw_only=True)
Expand All @@ -21,50 +29,123 @@ class TraceMetricsSearchResolverConfig(SearchResolverConfig):
metric_type: MetricType | None
metric_unit: str | None

def extra_conditions(self, search_resolver: SearchResolver) -> TraceItemFilter | None:
if not self.metric_name or not self.metric_type:
def extra_conditions(
self,
search_resolver: SearchResolver,
selected_columns: list[str] | None,
equations: list[str] | None,
) -> TraceItemFilter | None:
# use the metric from the config first if it exists
if extra_conditions := self._extra_conditions_from_metric(search_resolver):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config is populated from the extra request params right? Is the long term plan to eventually stop supporting it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Trace metrics uses it own TraceMetricsSearchResolverConfig that inherits from SearchResolverConfig. This currently has top level configs to specify 1 metric. Long term, we'll remove that and only rely on the metric specified in the aggregation.

return extra_conditions

# then try to parse metric from the aggregations
if extra_conditions := self._extra_conditions_from_columns(
search_resolver, selected_columns, equations
):
return extra_conditions

return None

def _extra_conditions_from_columns(
self,
search_resolver: SearchResolver,
selected_columns: list[str] | None,
equations: list[str] | None,
) -> TraceItemFilter | None:
selected_metrics: set[Metric] = set()

if selected_columns:
stripped_columns = [column.strip() for column in selected_columns]
for column in stripped_columns:
match = fields.is_function(column)
if not match:
continue

resolved_function, _ = search_resolver.resolve_function(column)
if not isinstance(resolved_function, ResolvedMetricAggregate):
continue

if not resolved_function.metric_name or not resolved_function.metric_type:
continue

metric = Metric(
metric_name=resolved_function.metric_name,
metric_type=resolved_function.metric_type,
metric_unit=resolved_function.metric_unit,
)
selected_metrics.add(metric)

if not selected_metrics:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Equations Ignore Metric Aggregations

The _extra_conditions_from_columns method accepts an equations parameter but never processes it. When metric aggregates like sum(value,foo,counter,-) are used inside equations, the metric information won't be extracted, causing the query to miss the required metric filter conditions. This breaks the feature's core functionality of parsing metrics from aggregations when they appear in equations.

Fix in Cursor Fix in Web

return None

metric_name, _ = search_resolver.resolve_column("metric.name")
if not isinstance(metric_name.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.name")
if len(selected_metrics) > 1:
raise InvalidSearchQuery("Cannot aggregate multiple metrics in 1 query.")
Comment on lines +82 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two questions

  1. if you can't select multiple metrics in one query why do we need it in the aggregates?
  2. shouldn't we dedupe the selected_metrics list if there's multiple aggregates on the same metric?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We cant select multiple metrics today because EAP does not have support to allow us to express it yet. See EAP-313
  2. selected_metrics is a set of tuples so it already dedupes, meaning it lets you select multiple aggregates on the same metric


metric_type, _ = search_resolver.resolve_column("metric.type")
if not isinstance(metric_type.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.type")
selected_metric = selected_metrics.pop()

filters = [
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=metric_name.proto_definition,
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=self.metric_name),
)
),
return get_metric_filter(search_resolver, selected_metric)

def _extra_conditions_from_metric(
self,
search_resolver: SearchResolver,
) -> TraceItemFilter | None:
if not self.metric_name or not self.metric_type:
return None

metric = Metric(
metric_name=self.metric_name,
metric_type=self.metric_type,
metric_unit=self.metric_unit,
)

return get_metric_filter(search_resolver, metric)


def get_metric_filter(
search_resolver: SearchResolver,
metric: Metric,
) -> TraceItemFilter:
metric_name, _ = search_resolver.resolve_column("metric.name")
if not isinstance(metric_name.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.name")

metric_type, _ = search_resolver.resolve_column("metric.type")
if not isinstance(metric_type.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.type")

filters = [
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=metric_name.proto_definition,
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=metric.metric_name),
)
),
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=metric_type.proto_definition,
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=metric.metric_type),
)
),
]

if metric.metric_unit:
metric_unit, _ = search_resolver.resolve_column("metric.unit")
if not isinstance(metric_unit.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.unit")
filters.append(
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=metric_type.proto_definition,
key=metric_unit.proto_definition,
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=self.metric_type),
)
),
]

if self.metric_unit:
metric_unit, _ = search_resolver.resolve_column("metric.unit")
if not isinstance(metric_unit.proto_definition, AttributeKey):
raise ValueError("Unable to resolve metric.unit")
filters.append(
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=metric_unit.proto_definition,
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=self.metric_unit),
)
value=AttributeValue(val_str=metric.metric_unit),
)
)
)

return TraceItemFilter(and_filter=AndFilter(filters=filters))
return TraceItemFilter(and_filter=AndFilter(filters=filters))


ALLOWED_METRIC_TYPES: list[MetricType] = ["counter", "gauge", "distribution"]
Expand Down
10 changes: 9 additions & 1 deletion src/sentry/search/eap/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ class SearchResolverConfig:
# Whether to set the timestamp granularities to stable buckets
stable_timestamp_quantization: bool = True

def extra_conditions(self, search_resolver: "SearchResolver") -> TraceItemFilter | None:
def extra_conditions(
self,
search_resolver: "SearchResolver",
selected_columns: list[str] | None,
equations: list[str] | None,
) -> TraceItemFilter | None:
return None


Expand Down Expand Up @@ -81,3 +86,6 @@ class AdditionalQueries:
span: list[str] | None
log: list[str] | None
metric: list[str] | None


MetricType = Literal["counter", "gauge", "distribution"]
Loading
Loading