diff --git a/src/sentry/seer/assisted_query/traces_tools.py b/src/sentry/seer/assisted_query/traces_tools.py new file mode 100644 index 00000000000000..820cdd5159dc2d --- /dev/null +++ b/src/sentry/seer/assisted_query/traces_tools.py @@ -0,0 +1,132 @@ +import logging + +from sentry.api import client +from sentry.constants import ALL_ACCESS_PROJECT_ID +from sentry.models.apikey import ApiKey +from sentry.models.organization import Organization + +logger = logging.getLogger(__name__) + +API_KEY_SCOPES = ["org:read", "project:read", "event:read"] + + +def get_attribute_names( + *, org_id: int, project_ids: list[int], stats_period: str, item_type: str = "spans" +) -> dict: + """ + Get attribute names for trace items by calling the public API endpoint. + + This ensures all queryable built-in fields (like span.op, span.description, etc.) + are included in the response, unlike the Snuba RPC which may exclude certain + standard fields. + + Args: + org_id: Organization ID + project_ids: List of project IDs to query + stats_period: Time period string (e.g., "7d", "24h", "30d") + item_type: Type of trace item (default: "spans", can be "spans", "logs", etc.) + + Returns: + Dictionary with attributes: + { + "fields": { + "string": ["span.op", "span.description", ...], + "number": ["span.duration", ...] + } + } + """ + organization = Organization.objects.get(id=org_id) + api_key = ApiKey(organization_id=org_id, scope_list=API_KEY_SCOPES) + + fields: dict[str, list[str]] = {"string": [], "number": []} + + # Fetch both string and number attributes from the public API + for attr_type in ["string", "number"]: + query_params = { + "attributeType": attr_type, + "itemType": item_type, + "statsPeriod": stats_period, + "project": project_ids or [ALL_ACCESS_PROJECT_ID], + } + + # API returns: [{"key": "...", "name": "span.op", "attributeSource": {...}}, ...] + resp = client.get( + auth=api_key, + user=None, + path=f"/organizations/{organization.slug}/trace-items/attributes/", + params=query_params, + ) + + fields[attr_type] = [item["name"] for item in resp.data] + + return {"fields": fields} + + +def get_attribute_values_with_substring( + *, + org_id: int, + project_ids: list[int], + fields_with_substrings: list[dict[str, str]], + stats_period: str = "7d", + limit: int = 100, + item_type: str = "spans", +) -> dict: + """ + Get attribute values for specific fields, optionally filtered by substring. Only string attributes are supported. + + Args: + org_id: Organization ID + project_ids: List of project IDs to query + fields_with_substrings: List of dicts with "field" and optional "substring" keys + Example: [{"field": "span.status", "substring": "error"}] + stats_period: Time period string (e.g., "7d", "24h", "30d") + limit: Maximum number of values to return per field (API default is 1000) + item_type: Type of trace item (default: "spans") + + Returns: + Dictionary with values: + { + "values": { + "span.status": ["ok", "error", ...], + "transaction": ["checkout", ...] + } + } + """ + if not fields_with_substrings: + return {"values": {}} + + organization = Organization.objects.get(id=org_id) + api_key = ApiKey(organization_id=org_id, scope_list=API_KEY_SCOPES) + + values: dict[str, set[str]] = {} + + for field_with_substring in fields_with_substrings: + field = field_with_substring["field"] + substring = field_with_substring.get("substring", "") + + query_params = { + "itemType": item_type, + "attributeType": "string", + "statsPeriod": stats_period, + "project": project_ids or [ALL_ACCESS_PROJECT_ID], + } + if substring: + query_params["substringMatch"] = substring + + # API returns: [{"value": "ok", "count": 123, ...}, ...] + resp = client.get( + auth=api_key, + user=None, + path=f"/organizations/{organization.slug}/trace-items/attributes/{field}/values/", + params=query_params, + ) + + # Extract "value" from each item, filter out None/empty, and respect limit + field_values_list = [item["value"] for item in resp.data if item.get("value")] + # Merge with existing values if field already exists (multiple substrings for same field) + values.setdefault(field, set()).update(field_values_list[:limit]) + + # Convert sets to sorted lists for JSON serialization + return { + "values": {field: sorted(field_values)[:limit] for field, field_values in values.items()} + } diff --git a/src/sentry/seer/endpoints/seer_rpc.py b/src/sentry/seer/endpoints/seer_rpc.py index b3c6aab07cced4..796a6dfecae3da 100644 --- a/src/sentry/seer/endpoints/seer_rpc.py +++ b/src/sentry/seer/endpoints/seer_rpc.py @@ -4,7 +4,6 @@ import logging import uuid from collections.abc import Callable -from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, TypedDict import sentry_sdk @@ -26,10 +25,6 @@ from rest_framework.request import Request from rest_framework.response import Response from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig -from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import ( - TraceItemAttributeNamesRequest, - TraceItemAttributeValuesRequest, -) from sentry_protos.snuba.v1.endpoint_trace_item_details_pb2 import TraceItemDetailsRequest from sentry_protos.snuba.v1.endpoint_trace_item_stats_pb2 import ( AttributeDistributionsRequest, @@ -46,7 +41,6 @@ from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.authentication import AuthenticationSiloLimit, StandardAuthentication from sentry.api.base import Endpoint, region_silo_endpoint -from sentry.api.endpoints.organization_trace_item_attributes import as_attribute_key from sentry.api.endpoints.project_trace_item_details import convert_rpc_attribute_to_json from sentry.constants import ( ENABLE_PR_REVIEW_TEST_GENERATION_DEFAULT, @@ -66,7 +60,6 @@ from sentry.search.eap.resolver import SearchResolver from sentry.search.eap.spans.definitions import SPAN_DEFINITIONS from sentry.search.eap.types import SearchResolverConfig, SupportedTraceItemType -from sentry.search.eap.utils import can_expose_attribute from sentry.search.events.types import SnubaParams from sentry.seer.assisted_query.discover_tools import ( get_event_filter_key_values, @@ -78,6 +71,10 @@ get_issue_filter_keys, get_issues_stats, ) +from sentry.seer.assisted_query.traces_tools import ( + get_attribute_names, + get_attribute_values_with_substring, +) from sentry.seer.autofix.autofix_tools import get_error_event_details, get_profile_details from sentry.seer.autofix.coding_agent import launch_coding_agents_for_run from sentry.seer.autofix.utils import AutofixTriggerSource @@ -370,181 +367,6 @@ def get_organization_seer_consent_by_org_name( return {"consent": False, "consent_url": consent_url} -def get_attribute_names(*, org_id: int, project_ids: list[int], stats_period: str) -> dict: - type_mapping = { - AttributeKey.Type.TYPE_STRING: "string", - AttributeKey.Type.TYPE_DOUBLE: "number", - } - - period = parse_stats_period(stats_period) - if period is None: - period = datetime.timedelta(days=7) - - end = datetime.datetime.now() - start = end - period - - start_time_proto = ProtobufTimestamp() - start_time_proto.FromDatetime(start) - end_time_proto = ProtobufTimestamp() - end_time_proto.FromDatetime(end) - - fields: dict[str, list[str]] = {type_str: [] for type_str in type_mapping.values()} - - for attr_type, type_str in type_mapping.items(): - req = TraceItemAttributeNamesRequest( - meta=RequestMeta( - organization_id=org_id, - cogs_category="events_analytics_platform", - referrer=Referrer.SEER_RPC.value, - project_ids=project_ids, - start_timestamp=start_time_proto, - end_timestamp=end_time_proto, - trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, - ), - type=attr_type, - limit=1000, - ) - - fields_resp = snuba_rpc.attribute_names_rpc(req) - - parsed_fields = [ - as_attribute_key( - attr.name, - "string" if attr_type == AttributeKey.Type.TYPE_STRING else "number", - SupportedTraceItemType.SPANS, - )["name"] - for attr in fields_resp.attributes - if attr.name - and can_expose_attribute( - attr.name, SupportedTraceItemType.SPANS, include_internal=False - ) - ] - - fields[type_str].extend(parsed_fields) - - return {"fields": fields} - - -def get_attribute_values_with_substring( - *, - org_id: int, - project_ids: list[int], - fields_with_substrings: list[dict[str, str]], - stats_period: str = "48h", - limit: int = 100, - sampled: bool = True, -) -> dict: - """ - Get attribute values with substring. - Note: The RPC is guaranteed to not return duplicate values for the same field. - ie: if span.description is requested with both null and "payment" substrings, - the RPC will return the set of values for span.description to avoid duplicates. - - TODO: Replace with batch attribute values RPC once available - """ - values: dict[str, set[str]] = {} - - if not fields_with_substrings: - return {"values": values} - - period = parse_stats_period(stats_period) - if period is None: - period = datetime.timedelta(days=7) - - end = datetime.datetime.now() - start = end - period - - start_time_proto = ProtobufTimestamp() - start_time_proto.FromDatetime(start) - end_time_proto = ProtobufTimestamp() - end_time_proto.FromDatetime(end) - - sampling_mode = ( - DownsampledStorageConfig.MODE_NORMAL - if sampled - else DownsampledStorageConfig.MODE_HIGHEST_ACCURACY - ) - - resolver = SearchResolver( - params=SnubaParams( - start=start, - end=end, - ), - config=SearchResolverConfig(), - definitions=SPAN_DEFINITIONS, - ) - - def process_field_with_substring( - field_with_substring: dict[str, str], - ) -> tuple[str, set[str]] | None: - """Helper function to process a single field_with_substring request.""" - field = field_with_substring["field"] - substring = field_with_substring["substring"] - - resolved_field, _ = resolver.resolve_attribute(field) - if resolved_field.proto_definition.type == AttributeKey.Type.TYPE_STRING: - req = TraceItemAttributeValuesRequest( - meta=RequestMeta( - organization_id=org_id, - cogs_category="events_analytics_platform", - referrer=Referrer.SEER_RPC.value, - project_ids=project_ids, - start_timestamp=start_time_proto, - end_timestamp=end_time_proto, - trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, - downsampled_storage_config=DownsampledStorageConfig(mode=sampling_mode), - ), - key=resolved_field.proto_definition, - limit=limit, - value_substring_match=substring, - ) - - values_response = snuba_rpc.attribute_values_rpc(req) - return field, {value for value in values_response.values if value} - return None - - timeout_seconds = 1.0 - - with ThreadPoolExecutor(max_workers=min(len(fields_with_substrings), 10)) as executor: - future_to_field = { - executor.submit( - process_field_with_substring, field_with_substring - ): field_with_substring - for field_with_substring in fields_with_substrings - } - - try: - for future in as_completed(future_to_field, timeout=timeout_seconds): - field_with_substring = future_to_field[future] - - try: - result = future.result() - if result is not None: - field, field_values = result - if field in values: - values[field].update(field_values) - else: - values[field] = field_values - except TimeoutError: - logger.warning( - "RPC call timed out after %s seconds for field %s, skipping", - timeout_seconds, - field_with_substring.get("field", "unknown"), - ) - except Exception as e: - logger.warning( - "RPC call failed for field %s: %s", - field_with_substring.get("field", "unknown"), - str(e), - ) - except TimeoutError: - for future in future_to_field: - future.cancel() - logger.warning("Overall timeout exceeded, cancelled remaining RPC calls") - - return {"values": values} - - def get_attributes_and_values( *, org_id: int, diff --git a/tests/snuba/api/endpoints/test_seer_attributes.py b/tests/snuba/api/endpoints/test_seer_attributes.py index 553c33f7e88175..a6ac674971ecf8 100644 --- a/tests/snuba/api/endpoints/test_seer_attributes.py +++ b/tests/snuba/api/endpoints/test_seer_attributes.py @@ -1,13 +1,11 @@ -from concurrent.futures import TimeoutError -from unittest.mock import Mock, patch +from unittest.mock import patch from uuid import uuid4 -from sentry.seer.endpoints.seer_rpc import ( +from sentry.seer.assisted_query.traces_tools import ( get_attribute_names, get_attribute_values_with_substring, - get_attributes_and_values, - get_spans, ) +from sentry.seer.endpoints.seer_rpc import get_attributes_and_values, get_spans from sentry.testutils.cases import BaseSpansTestCase from sentry.testutils.helpers.datetime import before_now from tests.snuba.api.endpoints.test_organization_trace_item_attributes import ( @@ -33,11 +31,16 @@ def test_get_attribute_names(self) -> None: is_eap=True, ) - result = get_attribute_names( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - ) + with self.feature( + [ + "organizations:visibility-explore-view", + ] + ): + result = get_attribute_names( + org_id=self.organization.id, + project_ids=[self.project.id], + stats_period="7d", + ) assert result == { "fields": { "string": [ @@ -65,26 +68,30 @@ def test_get_attribute_values_with_substring(self) -> None: is_eap=True, ) - result = get_attribute_values_with_substring( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - fields_with_substrings=[ - { - "field": "transaction", - "substring": "ba", - }, - { - "field": "transaction", - "substring": "b", - }, - ], - sampled=False, - ) + with self.feature( + [ + "organizations:visibility-explore-view", + ] + ): + result = get_attribute_values_with_substring( + org_id=self.organization.id, + project_ids=[self.project.id], + stats_period="7d", + fields_with_substrings=[ + { + "field": "transaction", + "substring": "ba", + }, + { + "field": "transaction", + "substring": "b", + }, + ], + ) assert result == { "values": { - "transaction": {"bar", "baz"}, + "transaction": ["bar", "baz"], } } @@ -118,18 +125,23 @@ def test_get_attributes_and_values(self) -> None: is_eap=True, ) - result = get_attributes_and_values( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - sampled=False, - attributes_ignored=[ - "sentry.segment_id", - "sentry.event_id", - "sentry.raw_description", - "sentry.transaction", - ], - ) + with self.feature( + [ + "organizations:visibility-explore-view", + ] + ): + result = get_attributes_and_values( + org_id=self.organization.id, + project_ids=[self.project.id], + stats_period="7d", + sampled=False, + attributes_ignored=[ + "sentry.segment_id", + "sentry.event_id", + "sentry.raw_description", + "sentry.transaction", + ], + ) assert result == { "attributes_and_values": { @@ -156,164 +168,6 @@ def test_get_attribute_values_with_substring_empty_field_list(self) -> None: expected: dict = {"values": {}} assert result == expected - def test_get_attribute_values_with_substring_async_success_and_partial_failures( - self, - ): - """Test concurrent execution with successful results, timeouts, and exceptions""" - for transaction in ["foo", "bar"]: - self.store_segment( - self.project.id, - uuid4().hex, - uuid4().hex, - span_id=uuid4().hex[:16], - organization_id=self.organization.id, - parent_span_id=None, - timestamp=before_now(days=0, minutes=10).replace(microsecond=0), - transaction=transaction, - duration=100, - exclusive_time=100, - is_eap=True, - ) - - with patch("sentry.seer.endpoints.seer_rpc.ThreadPoolExecutor") as mock_executor: - mock_executor_instance = Mock() - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - mock_future_success = Mock() - mock_future_timeout = Mock() - mock_future_exception = Mock() - - mock_future_success.result.return_value = ("transaction", {"foo", "bar"}) - mock_future_timeout.result.side_effect = TimeoutError("Individual timeout") - mock_future_exception.result.side_effect = Exception("RPC failed") - - mock_executor_instance.submit.side_effect = [ - mock_future_success, - mock_future_timeout, - mock_future_exception, - ] - - fields_with_substrings = [ - {"field": "transaction", "substring": "fo"}, - {"field": "span.description", "substring": "timeout_field"}, - {"field": "span.status", "substring": "error_field"}, - ] - - with patch("sentry.seer.endpoints.seer_rpc.as_completed") as mock_as_completed: - - def as_completed_side_effect(future_to_field_dict, timeout): - return [ - mock_future_success, - mock_future_timeout, - mock_future_exception, - ] - - mock_as_completed.side_effect = as_completed_side_effect - - result = get_attribute_values_with_substring( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - fields_with_substrings=fields_with_substrings, - sampled=False, - ) - - assert result == { - "values": { - "transaction": {"foo", "bar"}, - } - } - - assert mock_executor_instance.submit.call_count == 3 - mock_as_completed.assert_called_once() - - def test_get_attribute_values_with_substring_overall_timeout(self) -> None: - """Test overall timeout handling with future cancellation""" - self.store_segment( - self.project.id, - uuid4().hex, - uuid4().hex, - span_id=uuid4().hex[:16], - organization_id=self.organization.id, - parent_span_id=None, - timestamp=before_now(days=0, minutes=10).replace(microsecond=0), - transaction="foo", - duration=100, - exclusive_time=100, - is_eap=True, - ) - - with patch("sentry.seer.endpoints.seer_rpc.as_completed") as mock_as_completed: - mock_as_completed.side_effect = TimeoutError("Overall timeout") - - with patch("sentry.seer.endpoints.seer_rpc.ThreadPoolExecutor") as mock_executor: - mock_executor_instance = Mock() - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - mock_future1 = Mock() - mock_future2 = Mock() - mock_executor_instance.submit.side_effect = [mock_future1, mock_future2] - - result = get_attribute_values_with_substring( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - fields_with_substrings=[ - {"field": "transaction", "substring": "fo"}, - {"field": "span.description", "substring": "desc"}, - ], - sampled=False, - ) - - assert result == {"values": {}} - - mock_future1.cancel.assert_called_once() - mock_future2.cancel.assert_called_once() - - def test_get_attribute_values_with_substring_max_workers_limit(self) -> None: - """Test that ThreadPoolExecutor is limited to max 10 workers even with more fields""" - self.store_segment( - self.project.id, - uuid4().hex, - uuid4().hex, - span_id=uuid4().hex[:16], - organization_id=self.organization.id, - parent_span_id=None, - timestamp=before_now(days=0, minutes=10).replace(microsecond=0), - transaction="foo", - duration=100, - exclusive_time=100, - is_eap=True, - ) - - fields_with_substrings = [ - {"field": "transaction", "substring": f"field_{i}"} for i in range(15) - ] - - with patch("sentry.seer.endpoints.seer_rpc.ThreadPoolExecutor") as mock_executor: - mock_executor_instance = Mock() - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - mock_futures = [Mock() for _ in range(15)] - for i, future in enumerate(mock_futures): - future.result.return_value = (f"transaction_{i}", {f"value_{i}"}) - - mock_executor_instance.submit.side_effect = mock_futures - - with patch("sentry.seer.endpoints.seer_rpc.as_completed") as mock_as_completed: - mock_as_completed.return_value = mock_futures - - get_attribute_values_with_substring( - org_id=self.organization.id, - project_ids=[self.project.id], - stats_period="7d", - fields_with_substrings=fields_with_substrings, - sampled=False, - ) - - mock_executor.assert_called_once_with(max_workers=10) - assert mock_executor_instance.submit.call_count == 15 - def test_get_spans_basic(self) -> None: """Test basic get_spans functionality""" for i, transaction in enumerate(["foo", "bar", "baz"]):