Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 101 additions & 39 deletions src/sentry/search/eap/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ def proto_definition(self) -> Column.BinaryFormula:
return self.formula


@dataclass(frozen=True, kw_only=True)
class ResolvedTraceMetricFormula(ResolvedFormula):
metric_name: str | None
metric_type: MetricType | None
metric_unit: str | None


@dataclass(frozen=True, kw_only=True)
class ResolvedAggregate(ResolvedFunction):
"""
Expand Down Expand Up @@ -222,7 +229,7 @@ def proto_definition(self) -> AttributeAggregation:


@dataclass(frozen=True, kw_only=True)
class ResolvedMetricAggregate(ResolvedAggregate):
class ResolvedTraceMetricAggregate(ResolvedAggregate):
metric_name: str | None
metric_type: MetricType | None
metric_unit: str | None
Expand Down Expand Up @@ -358,25 +365,8 @@ def resolve(

@dataclass(kw_only=True)
class TraceMetricAggregateDefinition(AggregateDefinition):
internal_function: Function.ValueType
attribute_resolver: Callable[[ResolvedArgument], AttributeKey] | None = None

def __post_init__(self) -> None:
if len(self.arguments) != 4:
raise InvalidSearchQuery(
f"Trace metric aggregates expects exactly 4 arguments to be defined, got {len(self.arguments)}"
)

if not isinstance(self.arguments[0], AttributeArgumentDefinition):
raise InvalidSearchQuery(
"Trace metric aggregates expect argument 0 to be of type AttributeArgumentDefinition"
)

for i in range(1, 4):
if not isinstance(self.arguments[i], ValueArgumentDefinition):
raise InvalidSearchQuery(
f"Trace metric aggregates expects argument {i} to be of type ValueArgumentDefinition"
)
validate_trace_metric_aggregate_arguments(self.arguments)

def resolve(
self,
Expand All @@ -396,27 +386,11 @@ def resolve(
if self.attribute_resolver is not None:
resolved_attribute = self.attribute_resolver(resolved_attribute)

metric_name = None
metric_type = None
metric_unit = None

if all(
isinstance(resolved_argument, str) and resolved_argument != ""
for resolved_argument in resolved_arguments[1:]
):
# a metric was passed
metric_name = cast(str, resolved_arguments[1])
metric_type = cast(MetricType, resolved_arguments[2])
metric_unit = None if resolved_arguments[3] == "-" else cast(str, resolved_arguments[3])
elif all(resolved_argument == "" for resolved_argument in resolved_arguments[1:]):
# no metrics were specified, assume we query all metrics
pass
else:
raise InvalidSearchQuery(
f"Trace metric aggregates expect the full metric to be specified, got name:{resolved_arguments[1]} type:{resolved_arguments[2]} unit:{resolved_arguments[3]}"
)
metric_name, metric_type, metric_unit = extract_trace_metric_aggregate_arguments(
resolved_arguments
)

return ResolvedMetricAggregate(
return ResolvedTraceMetricAggregate(
public_alias=alias,
internal_name=self.internal_function,
search_type=search_type,
Expand Down Expand Up @@ -512,6 +486,48 @@ def resolve(
)


@dataclass(kw_only=True)
class TraceMetricFormulaDefinition(FormulaDefinition):
def __post_init__(self) -> None:
validate_trace_metric_aggregate_arguments(self.arguments)

def resolve(
self,
alias: str,
search_type: constants.SearchType,
resolved_arguments: list[AttributeKey | Any],
snuba_params: SnubaParams,
query_result_cache: dict[str, EAPResponse],
search_config: SearchResolverConfig,
) -> ResolvedFormula:
resolver_settings = ResolverSettings(
extrapolation_mode=(
ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
if self.extrapolation and not search_config.disable_aggregate_extrapolation
else ExtrapolationMode.EXTRAPOLATION_MODE_NONE
),
snuba_params=snuba_params,
query_result_cache=query_result_cache,
search_config=search_config,
)

metric_name, metric_type, metric_unit = extract_trace_metric_aggregate_arguments(
resolved_arguments
)

return ResolvedTraceMetricFormula(
public_alias=alias,
search_type=search_type,
formula=self.formula_resolver(resolved_arguments, resolver_settings),
is_aggregate=self.is_aggregate,
internal_type=self.internal_type,
processor=self.processor,
metric_name=metric_name,
metric_type=metric_type,
metric_unit=metric_unit,
)


def simple_sentry_field(
field: str,
search_type: constants.SearchType = "string",
Expand Down Expand Up @@ -617,3 +633,49 @@ def count_argument_resolver(resolved_argument: ResolvedArgument) -> AttributeKey
return resolved_argument

return count_argument_resolver


def validate_trace_metric_aggregate_arguments(
arguments: list[ValueArgumentDefinition | AttributeArgumentDefinition],
) -> None:
if len(arguments) != 4:
raise InvalidSearchQuery(
f"Trace metric aggregates expects exactly 4 arguments to be defined, got {len(arguments)}"
)

if not isinstance(arguments[0], AttributeArgumentDefinition):
raise InvalidSearchQuery(
"Trace metric aggregates expect argument 0 to be of type AttributeArgumentDefinition"
)

for i in range(1, 4):
if not isinstance(arguments[i], ValueArgumentDefinition):
raise InvalidSearchQuery(
f"Trace metric aggregates expects argument {i} to be of type ValueArgumentDefinition"
)


def extract_trace_metric_aggregate_arguments(
resolved_arguments: ResolvedArguments,
) -> tuple[str | None, MetricType | None, str | None]:
metric_name = None
metric_type = None
metric_unit = None

if all(
isinstance(resolved_argument, str) and resolved_argument != ""
for resolved_argument in resolved_arguments[1:]
):
# a metric was passed
metric_name = cast(str, resolved_arguments[1])
metric_type = cast(MetricType, resolved_arguments[2])
metric_unit = None if resolved_arguments[3] == "-" else cast(str, resolved_arguments[3])
elif all(resolved_argument == "" for resolved_argument in resolved_arguments[1:]):
# no metrics were specified, assume we query all metrics
pass
else:
raise InvalidSearchQuery(
f"Trace metric aggregates expect the full metric to be specified, got name:{resolved_arguments[1]} type:{resolved_arguments[2]} unit:{resolved_arguments[3]}"
)

return metric_name, metric_type, metric_unit
7 changes: 5 additions & 2 deletions src/sentry/search/eap/trace_metrics/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)

from sentry.exceptions import InvalidSearchQuery
from sentry.search.eap.columns import ResolvedMetricAggregate
from sentry.search.eap.columns import ResolvedTraceMetricAggregate, ResolvedTraceMetricFormula
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.types import MetricType, SearchResolverConfig
from sentry.search.events import fields
Expand Down Expand Up @@ -63,7 +63,10 @@ def _extra_conditions_from_columns(
continue

resolved_function, _ = search_resolver.resolve_function(column)
if not isinstance(resolved_function, ResolvedMetricAggregate):

if not isinstance(
resolved_function, ResolvedTraceMetricAggregate
) and not isinstance(resolved_function, ResolvedTraceMetricFormula):
continue

if not resolved_function.metric_name or not resolved_function.metric_type:
Expand Down
7 changes: 4 additions & 3 deletions src/sentry/search/eap/trace_metrics/formulas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
FormulaDefinition,
ResolvedArguments,
ResolverSettings,
TraceMetricFormulaDefinition,
ValueArgumentDefinition,
)
from sentry.search.eap.trace_metrics.config import TraceMetricsSearchResolverConfig
Expand Down Expand Up @@ -87,8 +88,8 @@ def per_minute(args: ResolvedArguments, settings: ResolverSettings) -> Column.Bi
return _rate_internal(60, metric_type, settings)


TRACE_METRICS_FORMULA_DEFINITIONS = {
"per_second": FormulaDefinition(
TRACE_METRICS_FORMULA_DEFINITIONS: dict[str, FormulaDefinition] = {
"per_second": TraceMetricFormulaDefinition(
default_search_type="rate",
arguments=[
AttributeArgumentDefinition(
Expand Down Expand Up @@ -117,7 +118,7 @@ def per_minute(args: ResolvedArguments, settings: ResolverSettings) -> Column.Bi
is_aggregate=True,
infer_search_type_from_arguments=False,
),
"per_minute": FormulaDefinition(
"per_minute": TraceMetricFormulaDefinition(
default_search_type="rate",
arguments=[
AttributeArgumentDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_per_second_function(self) -> None:
"start": self.start,
"end": self.end,
"interval": "1h",
"yAxis": "per_second(test_metric, counter)",
"yAxis": "per_second(value, test_metric, counter, -)",
"project": self.project.id,
"dataset": self.dataset,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,26 @@ def test_aggregation_embedded_metric_name(self):
{"count(value,foo,counter,-)": 2},
]

def test_aggregation_embedded_metric_name_formula(self):
trace_metrics = [
*[self.create_trace_metric("foo", 1, "counter") for _ in range(6)],
self.create_trace_metric("bar", 594, "counter"),
]
self.store_trace_metrics(trace_metrics)

response = self.do_request(
{
"field": ["per_second(value,foo,counter,-)"],
"dataset": self.dataset,
"statsPeriod": "10m",
}
)
assert response.status_code == 200, response.content
assert response.data["data"] == [
# Over ten minute period, 6 events / 600 seconds = 0.01 events per second
{"per_second(value,foo,counter,-)": 0.01},
]

def test_aggregation_multiple_embedded_same_metric_name(self):
trace_metrics = [
self.create_trace_metric("foo", 1, "distribution"),
Expand Down
Loading