-
-
Notifications
You must be signed in to change notification settings - Fork 61
feat(logs) : Update export items endpoint to respect sampling & routing decision #7907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,13 +2,15 @@ | |
| from datetime import datetime | ||
| from typing import Any, Dict, Iterable, NamedTuple, Type, cast | ||
|
|
||
| import sentry_sdk | ||
| from google.protobuf.json_format import MessageToDict | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
| from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig | ||
| from sentry_protos.snuba.v1.endpoint_trace_items_pb2 import ( | ||
| ExportTraceItemsRequest, | ||
| ExportTraceItemsResponse, | ||
| ) | ||
| from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType | ||
| from sentry_protos.snuba.v1.request_common_pb2 import PageToken, RequestMeta, TraceItemType | ||
| from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, TraceItem | ||
|
|
||
| from snuba import state | ||
|
|
@@ -36,6 +38,7 @@ | |
| ) | ||
| from snuba.web.rpc.common.debug_info import setup_trace_query_settings | ||
| from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import RoutingDecision | ||
|
|
||
| _DEFAULT_PAGE_SIZE = 10_000 | ||
|
|
||
|
|
@@ -48,76 +51,164 @@ | |
| TraceItemFilter, | ||
| ) | ||
|
|
||
| FLEX_WIN_START = "sentry__time_window.start_timestamp" | ||
| FLEX_WIN_END = "sentry__time_window.end_timestamp" | ||
|
|
||
|
|
||
| def _flex_time_window_filters(start_sec: int, end_sec: int) -> list[TraceItemFilter]: | ||
| return [ | ||
| TraceItemFilter( | ||
| comparison_filter=ComparisonFilter( | ||
| key=AttributeKey(name=FLEX_WIN_START), | ||
| op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, | ||
| value=AttributeValue(val_int=start_sec), | ||
| ) | ||
| ), | ||
| TraceItemFilter( | ||
| comparison_filter=ComparisonFilter( | ||
| key=AttributeKey(name=FLEX_WIN_END), | ||
| op=ComparisonFilter.OP_LESS_THAN, | ||
| value=AttributeValue(val_int=end_sec), | ||
| ) | ||
| ), | ||
| ] | ||
|
|
||
|
|
||
| def _parse_flex_window_from_filters(filters: list[TraceItemFilter]) -> tuple[int, int] | None: | ||
| start_sec: int | None = None | ||
| end_sec: int | None = None | ||
| for filt in filters: | ||
| if not filt.HasField("comparison_filter"): | ||
| continue | ||
| k = filt.comparison_filter.key.name | ||
| if k == FLEX_WIN_START and filt.comparison_filter.value.HasField("val_int"): | ||
| start_sec = filt.comparison_filter.value.val_int | ||
| elif k == FLEX_WIN_END and filt.comparison_filter.value.HasField("val_int"): | ||
| end_sec = filt.comparison_filter.value.val_int | ||
| if start_sec is not None and end_sec is not None: | ||
| return (start_sec, end_sec) | ||
| if start_sec is not None or end_sec is not None: | ||
| raise ValueError("Invalid flex time window in page token") | ||
| return None | ||
|
|
||
|
|
||
| def _parse_last_seen_tuple( | ||
| filters: list[TraceItemFilter], | ||
| ) -> tuple[int, TraceItemType.ValueType, float, str, str]: | ||
| """Parse the 5 'last seen' key equality filters""" | ||
| if len(filters) != 5: | ||
| raise ValueError("Invalid last_seen in page token") | ||
| if not ( | ||
| filters[0].comparison_filter.key.name == "last_seen_project_id" | ||
| and filters[0].comparison_filter.key.type == AttributeKey.Type.TYPE_INT | ||
| ): | ||
| raise ValueError("Invalid project id") | ||
| last_seen_project_id = filters[0].comparison_filter.value.val_int | ||
| if not ( | ||
| filters[1].comparison_filter.key.name == "last_seen_item_type" | ||
| and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_INT | ||
| ): | ||
| raise ValueError("Invalid item type") | ||
| last_seen_item_type = filters[1].comparison_filter.value.val_int | ||
| if not ( | ||
| filters[2].comparison_filter.key.name == "last_seen_timestamp" | ||
| and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE | ||
| ): | ||
| raise ValueError("Invalid timestamp") | ||
| last_seen_timestamp = filters[2].comparison_filter.value.val_double | ||
| if not ( | ||
| filters[3].comparison_filter.key.name == "last_seen_trace_id" | ||
| and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING | ||
| ): | ||
| raise ValueError("Invalid trace id") | ||
| last_seen_trace_id = filters[3].comparison_filter.value.val_str | ||
| if not ( | ||
| filters[4].comparison_filter.key.name == "last_seen_item_id" | ||
| and filters[4].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING | ||
| ): | ||
| raise ValueError("Invalid item id") | ||
| last_seen_item_id = filters[4].comparison_filter.value.val_str | ||
| return ( | ||
| last_seen_project_id, | ||
| cast(TraceItemType.ValueType, last_seen_item_type), | ||
| last_seen_timestamp, | ||
| last_seen_trace_id, | ||
| last_seen_item_id, | ||
| ) | ||
|
|
||
|
|
||
| class ExportTraceItemsPageToken: | ||
| """Page token: always encodes the active [start,end) in unix seconds, shared with flex routing. | ||
|
|
||
| 2 filters: window only; move to the next time slice (flex) with no keyset cursor. | ||
| 7 filters: same 2 time-window fields plus 5 equality fields; continue after the | ||
| last row within that window (keyset / tuple seek). | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| last_seen_project_id: int, | ||
| last_seen_item_type: TraceItemType.ValueType, | ||
| last_seen_timestamp: float, | ||
| last_seen_trace_id: str, | ||
| last_seen_item_id: str, | ||
| *, | ||
| window_start_sec: int, | ||
| window_end_sec: int, | ||
| last_seen_project_id: int = 0, | ||
| last_seen_item_type: TraceItemType.ValueType = TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED, | ||
| last_seen_timestamp: float = 0.0, | ||
| last_seen_trace_id: str = "", | ||
| last_seen_item_id: str = "", | ||
| ): | ||
| self.window_start_sec = window_start_sec | ||
| self.window_end_sec = window_end_sec | ||
| self.last_seen_project_id = last_seen_project_id | ||
| self.last_seen_item_type = last_seen_item_type | ||
| self.last_seen_timestamp = last_seen_timestamp | ||
| self.last_seen_trace_id = last_seen_trace_id | ||
| self.last_seen_item_id = last_seen_item_id | ||
|
|
||
| @property | ||
| def has_last_seen(self) -> bool: | ||
|
manessaraj marked this conversation as resolved.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I feel like what
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, name can be better |
||
| return self.last_seen_item_id != "" | ||
|
|
||
| @classmethod | ||
| def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageToken"]: | ||
| if page_token == PageToken(): | ||
| return None | ||
| filters = page_token.filter_offset.and_filter.filters | ||
| if len(filters) != 5: | ||
| if not page_token.filter_offset.HasField("and_filter"): | ||
| raise ValueError("Invalid page token") | ||
|
|
||
| if not ( | ||
| filters[0].comparison_filter.key.name == "last_seen_project_id" | ||
| and filters[0].comparison_filter.key.type == AttributeKey.Type.TYPE_INT | ||
| ): | ||
| raise ValueError("Invalid project id") | ||
| last_seen_project_id = filters[0].comparison_filter.value.val_int | ||
| if not ( | ||
| filters[1].comparison_filter.key.name == "last_seen_item_type" | ||
| and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_INT | ||
| ): | ||
| raise ValueError("Invalid item type") | ||
| last_seen_item_type = filters[1].comparison_filter.value.val_int | ||
|
|
||
| if not ( | ||
| filters[2].comparison_filter.key.name == "last_seen_timestamp" | ||
| and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE | ||
| ): | ||
| raise ValueError("Invalid timestamp") | ||
| last_seen_timestamp = filters[2].comparison_filter.value.val_double | ||
|
|
||
| if not ( | ||
| filters[3].comparison_filter.key.name == "last_seen_trace_id" | ||
| and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING | ||
| ): | ||
| raise ValueError("Invalid trace id") | ||
| last_seen_trace_id = filters[3].comparison_filter.value.val_str | ||
|
|
||
| if not ( | ||
| filters[4].comparison_filter.key.name == "last_seen_item_id" | ||
| and filters[4].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING | ||
| ): | ||
| raise ValueError("Invalid item id") | ||
| last_seen_item_id = filters[4].comparison_filter.value.val_str | ||
|
|
||
| return cls( | ||
| last_seen_project_id, | ||
| cast(TraceItemType.ValueType, last_seen_item_type), | ||
| last_seen_timestamp, | ||
| last_seen_trace_id, | ||
| last_seen_item_id, | ||
| ) | ||
| filters = list(page_token.filter_offset.and_filter.filters) | ||
| n = len(filters) | ||
| if n == 2: | ||
| win = _parse_flex_window_from_filters(filters) | ||
| if win is None: | ||
| raise ValueError("Invalid page token") | ||
| w0, w1 = win | ||
| return cls( | ||
| window_start_sec=w0, | ||
| window_end_sec=w1, | ||
| ) | ||
| if n == 7: | ||
| win = _parse_flex_window_from_filters(filters[:2]) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would If not, this part seems like it could be consolidated to just be written once: |
||
| if win is None: | ||
| raise ValueError("Invalid page token") | ||
| w0, w1 = win | ||
| pid, ity, lsts, ltr, lid = _parse_last_seen_tuple(filters[2:]) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: just spell the variables out |
||
| return cls( | ||
| window_start_sec=w0, | ||
| window_end_sec=w1, | ||
| last_seen_project_id=pid, | ||
| last_seen_item_type=ity, | ||
| last_seen_timestamp=lsts, | ||
| last_seen_trace_id=ltr, | ||
| last_seen_item_id=lid, | ||
| ) | ||
| raise ValueError("Invalid page token: expected 2 or 7 filter clauses") | ||
|
|
||
| def to_protobuf(self) -> PageToken: | ||
| filters = TraceItemFilter( | ||
| and_filter=AndFilter( | ||
| filters=[ | ||
| and_filters: list[TraceItemFilter] = list( | ||
| _flex_time_window_filters(self.window_start_sec, self.window_end_sec) | ||
| ) | ||
| if self.has_last_seen: | ||
| and_filters.extend( | ||
| [ | ||
| TraceItemFilter( | ||
| comparison_filter=ComparisonFilter( | ||
| key=AttributeKey( | ||
|
|
@@ -165,12 +256,41 @@ def to_protobuf(self) -> PageToken: | |
| ), | ||
| ] | ||
| ) | ||
| if not and_filters: | ||
| raise ValueError("empty export page token") | ||
| return PageToken( | ||
| filter_offset=TraceItemFilter( | ||
| and_filter=AndFilter(filters=and_filters), | ||
| ) | ||
| ) | ||
| return PageToken(filter_offset=filters) | ||
|
|
||
|
|
||
| def _is_flextime_export(in_msg: ExportTraceItemsRequest) -> bool: | ||
| if not in_msg.meta.HasField("downsampled_storage_config"): | ||
| return False | ||
| return ( | ||
| in_msg.meta.downsampled_storage_config.mode | ||
| == DownsampledStorageConfig.Mode.MODE_HIGHEST_ACCURACY_FLEXTIME | ||
| ) | ||
|
|
||
|
|
||
| def _export_query_meta( | ||
| in_msg: ExportTraceItemsRequest, routing_decision: RoutingDecision | ||
| ) -> RequestMeta: | ||
| if routing_decision.time_window is None: | ||
| return in_msg.meta | ||
| meta = RequestMeta() | ||
| meta.CopyFrom(in_msg.meta) | ||
| meta.start_timestamp.CopyFrom(routing_decision.time_window.start_timestamp) | ||
| meta.end_timestamp.CopyFrom(routing_decision.time_window.end_timestamp) | ||
| return meta | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Page token's time window is never applied to queryHigh Severity The Additional Locations (1)Reviewed by Cursor Bugbot for commit b46663e. Configure here. |
||
|
|
||
|
|
||
| def _build_query( | ||
| in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None | ||
| in_msg: ExportTraceItemsRequest, | ||
| limit: int, | ||
| page_token: ExportTraceItemsPageToken | None = None, | ||
| query_meta: RequestMeta | None = None, | ||
| ) -> Query: | ||
| selected_columns = [ | ||
| SelectedExpression("timestamp", f.toUnixTimestamp(column("timestamp"), alias="timestamp")), | ||
|
|
@@ -244,13 +364,14 @@ def _build_query( | |
| ), | ||
| ) | ||
| ] | ||
| if page_token is not None | ||
| if page_token is not None and page_token.has_last_seen | ||
| else [] | ||
| ) | ||
| meta = query_meta if query_meta is not None else in_msg.meta | ||
| query = Query( | ||
| from_clause=entity, | ||
| selected_columns=selected_columns, | ||
| condition=base_conditions_and(in_msg.meta, *(page_token_filter)), | ||
| condition=base_conditions_and(meta, *(page_token_filter)), | ||
| order_by=[ | ||
| # we add organization_id and project_id to the order by to optimize data reading | ||
| # https://clickhouse.com/docs/sql-reference/statements/select/order-by#optimization-of-data-reading | ||
|
|
@@ -269,14 +390,28 @@ def _build_query( | |
|
|
||
|
|
||
| def _build_snuba_request( | ||
| in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None | ||
| in_msg: ExportTraceItemsRequest, | ||
| routing_decision: RoutingDecision, | ||
| limit: int, | ||
| page_token: ExportTraceItemsPageToken | None = None, | ||
| ) -> SnubaRequest: | ||
| query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings() | ||
| try: | ||
| routing_decision.strategy.merge_clickhouse_settings(routing_decision, query_settings) | ||
| query_settings.set_sampling_tier(routing_decision.tier) | ||
| except Exception as e: | ||
| sentry_sdk.capture_message(f"Error merging clickhouse settings: {e}") | ||
|
|
||
| query_settings.set_skip_transform_order_by(True) | ||
| return SnubaRequest( | ||
| id=uuid.UUID(in_msg.meta.request_id), | ||
| original_body=MessageToDict(in_msg), | ||
| query=_build_query(in_msg, limit, page_token), | ||
| query=_build_query( | ||
| in_msg, | ||
| limit, | ||
| page_token, | ||
| query_meta=_export_query_meta(in_msg, routing_decision), | ||
| ), | ||
| query_settings=query_settings, | ||
| attribution_info=AttributionInfo( | ||
| referrer=in_msg.meta.referrer, | ||
|
|
@@ -425,22 +560,37 @@ def _execute(self, in_msg: ExportTraceItemsRequest) -> ExportTraceItemsResponse: | |
| page_token = ExportTraceItemsPageToken.from_protobuf(in_msg.page_token) | ||
| results = run_query( | ||
| dataset=PluggableDataset(name="eap", all_entities=[]), | ||
| request=_build_snuba_request(in_msg, limit, page_token), | ||
| request=_build_snuba_request(in_msg, self.routing_decision, limit, page_token), | ||
| timer=self._timer, | ||
| ) | ||
|
|
||
| rows = results.result.get("data", []) | ||
| processed_results = _convert_rows(rows) | ||
| is_flex = _is_flextime_export(in_msg) | ||
| orig_start = in_msg.meta.start_timestamp.seconds | ||
| routed = self.routing_decision.time_window | ||
|
|
||
| w_start = in_msg.meta.start_timestamp.seconds | ||
| w_end = in_msg.meta.end_timestamp.seconds | ||
| if routed is not None: | ||
| w_start, w_end = routed.start_timestamp.seconds, routed.end_timestamp.seconds | ||
|
|
||
| next_token: PageToken | None = None | ||
| if len(processed_results.items) >= limit: | ||
| next_token = ExportTraceItemsPageToken( | ||
| window_start_sec=w_start, | ||
| window_end_sec=w_end, | ||
| last_seen_project_id=processed_results.last_seen_project_id, | ||
| last_seen_item_type=processed_results.last_seen_item_type, | ||
| last_seen_trace_id=processed_results.last_seen_trace_id, | ||
| last_seen_timestamp=processed_results.last_seen_timestamp, | ||
| last_seen_item_id=processed_results.last_seen_item_id, | ||
| ).to_protobuf() | ||
| elif is_flex and routed is not None and routed.start_timestamp.seconds > orig_start: | ||
| next_token = ExportTraceItemsPageToken( | ||
| window_start_sec=orig_start, | ||
| window_end_sec=routed.start_timestamp.seconds, | ||
| ).to_protobuf() | ||
|
Comment on lines
+589
to
+593
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: The pagination token's time window ( Suggested FixWhen a Prompt for AI AgentDid we get this right? 👍 / 👎 to inform future reviews. |
||
| else: | ||
| next_token = PageToken(end_pagination=True) | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I lean towards not having hard-coded integers like 2, 5, or 7 - could we think of a better way? One idea I thought of (and please feel free to propose better ways) would be to have
_WINDOW_FIELDS = [FLEX_WIN_START, FLEX_WIN_END]and then_KEYSET_CURSOR_FIELDS = [last seen_project_id, last_seen_item_type, etc]and then it would allow you to do iflen(filters) != len(_KEYSET_CURSOR_FIELDS), which is more readable. Feel free to use better variable names or another approachThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, previously you added this; I just moved the code. I can think if there's better way. I am not much familiar with patterns in this codebase.