Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions src/sentry/api/endpoints/organization_trace_item_stats.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from collections import defaultdict
from concurrent.futures import as_completed
from dataclasses import dataclass
from typing import Literal

from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.response import Response
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey

from sentry import options
Expand All @@ -18,14 +19,22 @@
from sentry.api.serializers.base import serialize
from sentry.api.utils import handle_query_errors
from sentry.models.organization import Organization
from sentry.search.eap.columns import ColumnDefinitions
from sentry.search.eap.constants import SUPPORTED_STATS_TYPES
from sentry.search.eap.occurrences.attributes import (
OCCURRENCE_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS,
OCCURRENCE_STATS_EXCLUDED_ATTRIBUTES_PUBLIC_ALIAS,
)
from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.spans.attributes import (
SPANS_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS,
SPANS_STATS_EXCLUDED_ATTRIBUTES_PUBLIC_ALIAS,
)
from sentry.search.eap.spans.definitions import SPAN_DEFINITIONS
from sentry.search.eap.types import SearchResolverConfig
from sentry.snuba import rpc_dataset_common
from sentry.snuba.occurrences_rpc import Occurrences
from sentry.snuba.referrer import Referrer
from sentry.snuba.spans_rpc import Spans
from sentry.utils.concurrent import ContextPropagatingThreadPoolExecutor
Expand All @@ -36,13 +45,45 @@

MAX_THREADS = 4

SUPPORTED_ITEM_TYPES = ["spans", "occurrences"]


@dataclass(frozen=True)
class TraceItemStatsConfig:
    """Dataset-specific wiring for the trace item stats endpoint.

    Bundles everything that differs between item types (spans vs.
    occurrences) so the endpoint logic itself can stay item-type agnostic.
    Instances are built by ``get_trace_item_stats_config``.
    """

    # RPC dataset class used to run the table and stats queries.
    rpc_class: type[rpc_dataset_common.RPCBase]
    # Column definitions handed to the SearchResolver for this dataset.
    definitions: ColumnDefinitions
    # Per-value-type mapping of internal attribute names to public aliases.
    alias_mappings: dict[Literal["string", "number", "boolean"], dict[str, str]]
    # Public aliases that must be dropped from stats results (internal IDs etc.).
    excluded_attributes: set[str]
    # Referrer tag attached to every snuba query issued for this item type.
    referrer: Referrer
    # Name of the per-item ID column in table results ("span_id" or "id").
    id_column: str


def get_trace_item_stats_config(item_type: str) -> TraceItemStatsConfig:
    """Return the dataset config matching ``item_type``.

    Only ``"occurrences"`` selects the occurrences dataset; every other
    value (including the default ``"spans"``) falls back to the spans
    configuration.
    """
    if item_type != "occurrences":
        return TraceItemStatsConfig(
            rpc_class=Spans,
            definitions=SPAN_DEFINITIONS,
            alias_mappings=SPANS_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS,
            excluded_attributes=SPANS_STATS_EXCLUDED_ATTRIBUTES_PUBLIC_ALIAS,
            referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC,
            id_column="span_id",
        )
    return TraceItemStatsConfig(
        rpc_class=Occurrences,
        definitions=OCCURRENCE_DEFINITIONS,
        alias_mappings=OCCURRENCE_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS,
        excluded_attributes=OCCURRENCE_STATS_EXCLUDED_ATTRIBUTES_PUBLIC_ALIAS,
        referrer=Referrer.API_OCCURRENCES_FREQUENCY_STATS_RPC,
        id_column="id",
    )


class TraceItemStatsPaginator:
"""
Custom paginator for trace item stats that properly handles pagination
based on the number of attributes fetched, not the number of stats types.
Similar to TraceItemAttributesNamesPaginator, with some extra code to make
it work for stats results/
it work for stats results.
"""

def __init__(self, data_fn):
Expand Down Expand Up @@ -76,7 +117,12 @@ class OrganizationTraceItemsStatsSerializer(serializers.Serializer):
limit = serializers.IntegerField(
required=False,
)
spansLimit = serializers.IntegerField(required=False, default=1000, max_value=1000)
traceItemsLimit = serializers.IntegerField(required=False, default=1000, max_value=1000)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think renaming spansLimit -> traceItemsLimit is safe since the frontend doesn't currently pass the query param at all

itemType = serializers.ChoiceField(
choices=SUPPORTED_ITEM_TYPES,
required=False,
default="spans",
)


@cell_silo_endpoint
Expand All @@ -97,9 +143,11 @@ def get(self, request: Request, organization: Organization) -> Response:
return Response(serializer.errors, status=400)
serialized = serializer.validated_data

stats_config = get_trace_item_stats_config(serialized.get("itemType", "spans"))

resolver_config = SearchResolverConfig()
resolver = SearchResolver(
params=snuba_params, config=resolver_config, definitions=SPAN_DEFINITIONS
params=snuba_params, config=resolver_config, definitions=stats_config.definitions
)

substring_match = serialized.get("substringMatch", "")
Expand All @@ -113,36 +161,33 @@ def get(self, request: Request, organization: Organization) -> Response:
attrs_snuba_params.start = adjusted_start_date
attrs_snuba_params.end = adjusted_end_date
attrs_resolver = SearchResolver(
params=attrs_snuba_params, config=resolver_config, definitions=SPAN_DEFINITIONS
)
attrs_meta = attrs_resolver.resolve_meta(
referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value
params=attrs_snuba_params, config=resolver_config, definitions=stats_config.definitions
)
attrs_meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_SPAN
attrs_resolver.resolve_meta(referrer=stats_config.referrer.value)

max_attributes = options.get("explore.trace-items.keys.max")

def get_table_results():
with handle_query_errors():
return Spans.run_table_query(
return stats_config.rpc_class.run_table_query(
params=snuba_params,
config=SearchResolverConfig(),
offset=0,
limit=serialized.get("spansLimit", 1000),
limit=serialized.get("traceItemsLimit", 1000),
sampling_mode=snuba_params.sampling_mode,
query_string=serialized.get("query", ""),
orderby=["-timestamp"],
referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value,
selected_columns=["span_id", "timestamp"],
referrer=stats_config.referrer.value,
selected_columns=[stats_config.id_column, "timestamp"],
)

def run_stats_query_with_span_ids(span_id_filter):
def run_stats_query_with_item_ids(item_id_filter):
with handle_query_errors():
return Spans.run_stats_query(
return stats_config.rpc_class.run_stats_query(
params=snuba_params,
stats_types=serialized.get("statsType"),
query_string=span_id_filter,
referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value,
query_string=item_id_filter,
referrer=stats_config.referrer.value,
config=resolver_config,
search_resolver=resolver,
max_buckets=1,
Expand All @@ -151,11 +196,11 @@ def run_stats_query_with_span_ids(span_id_filter):

def run_stats_query_with_error_handling(attributes):
with handle_query_errors():
return Spans.run_stats_query(
return stats_config.rpc_class.run_stats_query(
params=snuba_params,
stats_types=serialized.get("statsType"),
query_string=serialized.get("query", ""),
referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value,
referrer=stats_config.referrer.value,
config=resolver_config,
search_resolver=resolver,
attributes=attributes,
Expand All @@ -166,10 +211,10 @@ def data_fn(offset: int, limit: int):
if not table_results["data"]:
return {"data": []}, 0

span_ids = [row["span_id"] for row in table_results["data"]]
span_id_list = ",".join(span_ids)
item_ids = [row[stats_config.id_column] for row in table_results["data"]]
item_id_list = ",".join(item_ids)

preflight_stats = run_stats_query_with_span_ids(f"id:[{span_id_list}]")
preflight_stats = run_stats_query_with_item_ids(f"id:[{item_id_list}]")
try:
internal_alias_attr_keys = list(
preflight_stats[0]["attribute_distributions"]["data"].keys()
Expand All @@ -179,11 +224,11 @@ def data_fn(offset: int, limit: int):

sanitized_keys = []
for internal_name in internal_alias_attr_keys:
public_alias = SPANS_INTERNAL_TO_PUBLIC_ALIAS_MAPPINGS.get("string", {}).get(
public_alias = stats_config.alias_mappings.get("string", {}).get(
internal_name, internal_name
)

if public_alias in SPANS_STATS_EXCLUDED_ATTRIBUTES_PUBLIC_ALIAS:
if public_alias in stats_config.excluded_attributes:
continue

if value_substring_match:
Expand Down
3 changes: 3 additions & 0 deletions src/sentry/snuba/rpc_dataset_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,9 @@ def run_stats_query(
referrer: str,
config: SearchResolverConfig,
search_resolver: SearchResolver | None = None,
attributes: list[AttributeKey] | None = None,
max_buckets: int = 75,
skip_translate_internal_to_public_alias: bool = False,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diff here because the RPC base class was missing some arguments from its interface

) -> list[dict[str, Any]]:
raise NotImplementedError()

Expand Down
83 changes: 81 additions & 2 deletions tests/snuba/api/endpoints/test_organization_trace_item_stats.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from django.urls import reverse

from sentry.testutils.cases import APITransactionTestCase, SnubaTestCase, SpanTestCase
from sentry.testutils.cases import (
APITransactionTestCase,
OccurrenceTestCase,
SnubaTestCase,
SpanTestCase,
)
from sentry.testutils.helpers import parse_link_header
from sentry.testutils.helpers.datetime import before_now
from sentry.testutils.helpers.options import override_options
Expand All @@ -10,6 +15,7 @@ class OrganizationTraceItemsStatsEndpointTest(
APITransactionTestCase,
SnubaTestCase,
SpanTestCase,
OccurrenceTestCase,
):
view = "sentry-api-0-organization-trace-item-stats"

Expand Down Expand Up @@ -47,6 +53,18 @@ def _store_span(self, description=None, tags=None, duration=None):
),
)

def _store_occurrence(self, level="error", title="test error", attributes=None):
    """Create a group, build an EAP occurrence for it, and write it to snuba.

    Returns the stored occurrence so callers can assert against it.
    """
    occurrence = self.create_eap_occurrence(
        group_id=self.create_group(project=self.project).id,
        level=level,
        title=title,
        timestamp=self.ten_mins_ago,
        attributes=attributes,
    )
    self.store_eap_items([occurrence])
    return occurrence

def test_no_project(self) -> None:
response = self.do_request()
assert response.status_code == 200, response.data
Expand All @@ -63,6 +81,16 @@ def test_invalid_stats_type(self) -> None:
response = self.do_request(query={"statsType": ["invalid_type"]})
assert response.status_code == 400, response.data

def test_invalid_item_type(self) -> None:
    """An unsupported ``itemType`` value must be rejected with a 400."""
    self._store_span()
    params = {
        "statsType": ["attributeDistributions"],
        "itemType": "invalid",
    }
    response = self.do_request(query=params)
    assert response.status_code == 400, response.data

def test_distribution_values(self) -> None:
tags = [
({"browser": "chrome", "device": "desktop"}, 500),
Expand All @@ -78,7 +106,11 @@ def test_distribution_values(self) -> None:
self._store_span(tags=tag, duration=duration)

response = self.do_request(
query={"query": "span.duration:<=100", "statsType": ["attributeDistributions"]}
query={
"query": "span.duration:<=100",
"statsType": ["attributeDistributions"],
"itemType": "spans",
}
)
assert response.status_code == 200, response.data
assert len(response.data["data"]) == 1
Expand Down Expand Up @@ -237,3 +269,50 @@ def test_custom_limit_parameter(self) -> None:
links[attrs["rel"]] = attrs

assert links["previous"]["results"] == "false"

def test_occurrence_distribution_values(self) -> None:
    """Occurrence stats should bucket the ``level`` attribute by value."""
    for stored_level in ("error", "error", "warning"):
        self._store_occurrence(level=stored_level)

    response = self.do_request(
        query={"statsType": ["attributeDistributions"], "itemType": "occurrences"}
    )
    assert response.status_code == 200, response.data
    rows = response.data["data"]
    assert len(rows) == 1
    distributions = rows[0]["attribute_distributions"]["data"]
    assert "level" in distributions
    observed_labels = {bucket["label"] for bucket in distributions["level"]}
    assert "error" in observed_labels
    assert "warning" in observed_labels

def test_occurrence_excluded_attributes(self) -> None:
    """Internal identifier attributes must not appear in the distributions."""
    self._store_occurrence()

    response = self.do_request(
        query={"statsType": ["attributeDistributions"], "itemType": "occurrences"}
    )
    assert response.status_code == 200, response.data
    rows = response.data["data"]
    assert len(rows) == 1
    distributions = rows[0]["attribute_distributions"]["data"]
    excluded_attributes = (
        "id",
        "trace",
        "group_id",
        "issue_occurrence_id",
        "primary_hash",
    )
    for attribute in excluded_attributes:
        assert attribute not in distributions

def test_occurrence_query_filter(self) -> None:
    """A ``query`` filter should restrict stats to matching occurrences."""
    self._store_occurrence(level="error")
    self._store_occurrence(level="warning")

    params = {
        "query": "level:error",
        "statsType": ["attributeDistributions"],
        "itemType": "occurrences",
    }
    response = self.do_request(query=params)
    assert response.status_code == 200, response.data
    rows = response.data["data"]
    assert len(rows) == 1
    distributions = rows[0]["attribute_distributions"]["data"]
    assert "level" in distributions
    observed_labels = {bucket["label"] for bucket in distributions["level"]}
    # Only the filtered-in level survives; the warning occurrence is excluded.
    assert "error" in observed_labels
    assert "warning" not in observed_labels
Loading