diff --git a/src/sentry/profiles/flamegraph.py b/src/sentry/profiles/flamegraph.py
index 483b7f36df3ff6..fc93aa432278c5 100644
--- a/src/sentry/profiles/flamegraph.py
+++ b/src/sentry/profiles/flamegraph.py
@@ -20,7 +20,7 @@
     Storage,
 )
 
-from sentry import features, options
+from sentry import options
 from sentry.search.eap.types import EAPResponse, SearchResolverConfig
 from sentry.search.events.builder.discover import DiscoverQueryBuilder
 from sentry.search.events.builder.profile_functions import ProfileFunctionsQueryBuilder
@@ -102,18 +102,8 @@ def get_profile_candidates(self) -> ProfileCandidates:
         if self.data_source == "functions":
             return self.get_profile_candidates_from_functions()
         elif self.data_source == "transactions":
-            if features.has(
-                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
-                self.snuba_params.organization,
-            ):
-                return self.get_profile_candidates_from_transactions_v2()
             return self.get_profile_candidates_from_transactions()
         elif self.data_source == "profiles":
-            if features.has(
-                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
-                self.snuba_params.organization,
-            ):
-                return self.get_profile_candidates_from_profiles_v2()
             return self.get_profile_candidates_from_profiles()
         elif self.data_source == "spans":
             return self.get_profile_candidates_from_spans()
@@ -190,50 +180,6 @@ def get_profile_candidates_from_functions(self) -> ProfileCandidates:
 
     def get_profile_candidates_from_transactions(self) -> ProfileCandidates:
         max_profiles = options.get("profiling.flamegraph.profile-set.size")
-
-        builder = self.get_transactions_based_candidate_query(query=self.query, limit=max_profiles)
-
-        results = builder.run_query(
-            Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_TRANSACTION_CANDIDATES.value,
-        )
-        results = builder.process_results(results)
-
-        transaction_profile_candidates: list[TransactionProfileCandidate] = [
-            {
-                "project_id": row["project.id"],
-                "profile_id": row["profile.id"],
-            }
-            for row in results["data"]
-            if row["profile.id"] is not None
-        ]
-
-        max_continuous_profile_candidates = max(
-            max_profiles - len(transaction_profile_candidates), 0
-        )
-
-        continuous_profile_candidates, _ = self.get_chunks_for_profilers(
-            [
-                ProfilerMeta(
-                    project_id=row["project.id"],
-                    profiler_id=row["profiler.id"],
-                    thread_id=row["thread.id"],
-                    start=row["precise.start_ts"],
-                    end=row["precise.finish_ts"],
-                    transaction_id=row["id"],
-                )
-                for row in results["data"]
-                if row["profiler.id"] is not None and row["thread.id"]
-            ],
-            max_continuous_profile_candidates,
-        )
-
-        return {
-            "transaction": transaction_profile_candidates,
-            "continuous": continuous_profile_candidates,
-        }
-
-    def get_profile_candidates_from_transactions_v2(self) -> ProfileCandidates:
-        max_profiles = options.get("profiling.flamegraph.profile-set.size")
         initial_chunk_delta_hours = options.get(
             "profiling.flamegraph.query.initial_chunk_delta.hours"
         )
@@ -503,150 +449,6 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
         if self.snuba_params.organization is None:
             raise ValueError("`organization` is required and cannot be `None`")
 
-        max_profiles = options.get("profiling.flamegraph.profile-set.size")
-
-        referrer = Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES.value
-
-        transaction_profiles_builder = self.get_transactions_based_candidate_query(
-            query=None, limit=max_profiles
-        )
-
-        conditions = []
-
-        conditions.append(Condition(Column("project_id"), Op.IN, self.snuba_params.project_ids))
-
-        conditions.append(
-            Condition(Column("start_timestamp"), Op.LT, resolve_datetime64(self.snuba_params.end))
-        )
-
-        conditions.append(
-            Condition(Column("end_timestamp"), Op.GTE, resolve_datetime64(self.snuba_params.start))
-        )
-
-        environments = self.snuba_params.environment_names
-        if environments:
-            conditions.append(Condition(Column("environment"), Op.IN, environments))
-
-        continuous_profiles_query = Query(
-            match=Storage(StorageKey.ProfileChunks.value),
-            select=[
-                Column("project_id"),
-                Column("profiler_id"),
-                Column("chunk_id"),
-                Column("start_timestamp"),
-                Column("end_timestamp"),
-            ],
-            where=conditions,
-            orderby=[OrderBy(Column("start_timestamp"), Direction.DESC)],
-            limit=Limit(max_profiles),
-        )
-
-        all_results = bulk_snuba_queries(
-            [
-                transaction_profiles_builder.get_snql_query(),
-                Request(
-                    dataset=Dataset.Profiles.value,
-                    app_id="default",
-                    query=continuous_profiles_query,
-                    tenant_ids={
-                        "referrer": referrer,
-                        "organization_id": self.snuba_params.organization.id,
-                    },
-                ),
-            ],
-            referrer,
-        )
-
-        transaction_profile_results = transaction_profiles_builder.process_results(all_results[0])
-        continuous_profile_results = all_results[1]
-
-        transaction_profile_candidates: list[TransactionProfileCandidate] = [
-            {
-                "project_id": row["project.id"],
-                "profile_id": row["profile.id"],
-            }
-            for row in transaction_profile_results["data"]
-            if row["profile.id"] is not None
-        ]
-
-        max_continuous_profile_candidates = max(
-            max_profiles - len(transaction_profile_candidates), 0
-        )
-
-        profiler_metas = [
-            ProfilerMeta(
-                project_id=row["project.id"],
-                profiler_id=row["profiler.id"],
-                thread_id=row["thread.id"],
-                start=row["precise.start_ts"],
-                end=row["precise.finish_ts"],
-                transaction_id=row["id"],
-            )
-            for row in transaction_profile_results["data"]
-            if row["profiler.id"] is not None and row["thread.id"]
-        ]
-
-        continuous_profile_candidates: list[ContinuousProfileCandidate] = []
-        continuous_duration = 0.0
-
-        # If there are continuous profiles attached to transactions, we prefer those as
-        # the active thread id gives us more user friendly flamegraphs than without.
-        if profiler_metas:
-            continuous_profile_candidates, continuous_duration = self.get_chunks_for_profilers(
-                profiler_metas, max_continuous_profile_candidates
-            )
-
-        seen_chunks = {
-            (candidate["profiler_id"], candidate["chunk_id"])
-            for candidate in continuous_profile_candidates
-        }
-
-        always_use_direct_chunks = features.has(
-            "organizations:profiling-flamegraph-always-use-direct-chunks",
-            self.snuba_params.organization,
-            actor=self.request.user,
-        )
-
-        # If we still don't have any continuous profile candidates, we'll fall back to
-        # directly using the continuous profiling data
-        if not continuous_profile_candidates or always_use_direct_chunks:
-            total_duration = continuous_duration if always_use_direct_chunks else 0.0
-            max_duration = options.get("profiling.continuous-profiling.flamegraph.max-seconds")
-
-            for row in continuous_profile_results["data"]:
-
-                # Make sure to dedupe profile chunks so we don't reuse chunks
-                if (row["profiler_id"], row["chunk_id"]) in seen_chunks:
-                    continue
-
-                start = datetime.fromisoformat(row["start_timestamp"]).timestamp()
-                end = datetime.fromisoformat(row["end_timestamp"]).timestamp()
-
-                candidate: ContinuousProfileCandidate = {
-                    "project_id": row["project_id"],
-                    "profiler_id": row["profiler_id"],
-                    "chunk_id": row["chunk_id"],
-                    "start": str(int(start * 1e9)),
-                    "end": str(int(end * 1e9)),
-                }
-
-                continuous_profile_candidates.append(candidate)
-
-                total_duration += end - start
-
-                # can set max duration to negative to skip this check
-                if max_duration >= 0 and total_duration >= max_duration:
-                    break
-
-        return {
-            "transaction": transaction_profile_candidates,
-            "continuous": continuous_profile_candidates,
-        }
-
-    def get_profile_candidates_from_profiles_v2(self) -> ProfileCandidates:
-        if self.snuba_params.organization is None:
-            raise ValueError("`organization` is required and cannot be `None`")
-
         max_profiles = options.get("profiling.flamegraph.profile-set.size")
         initial_chunk_delta_hours = options.get(
             "profiling.flamegraph.query.initial_chunk_delta.hours"
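The `_v2` variants removed above are now the default code path: both candidate queries read `profiling.flamegraph.profile-set.size` and `profiling.flamegraph.query.initial_chunk_delta.hours`, then walk the time range in widening chunks rather than issuing a single query. The chunked loop itself sits outside the hunk context shown in this diff, so the following is only a minimal sketch of such a strategy, assuming a window that doubles on each pass and a hypothetical `run_query` callback:

```python
from datetime import datetime, timedelta
from typing import Any, Callable


def collect_candidates_in_chunks(
    start: datetime,
    end: datetime,
    initial_delta: timedelta,  # e.g. derived from the initial_chunk_delta.hours option
    run_query: Callable[[datetime, datetime], list[dict[str, Any]]],  # hypothetical callback
    max_profiles: int,
) -> list[dict[str, Any]]:
    # Walk backwards from the end of the window so the newest profiles are
    # found first, doubling the chunk size until enough candidates are
    # collected or the whole range has been covered.
    candidates: list[dict[str, Any]] = []
    delta = initial_delta
    chunk_end = end
    while chunk_end > start and len(candidates) < max_profiles:
        chunk_start = max(chunk_end - delta, start)
        candidates.extend(run_query(chunk_start, chunk_end))
        chunk_end = chunk_start
        delta *= 2
    return candidates[:max_profiles]
```

Because the number of chunks visited depends on the data, the updated tests below assert `call_count > 0` and inspect the first call instead of expecting exactly one Snuba query.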
"trace.thread_id"], - ), - Op.EQ, - 1, - ), - ], - ), - ], - ) - in transactions_snql_request.query.where - ) + [profiles_snql_request] = call_args[0] assert profiles_snql_request.dataset == Dataset.Profiles.value + where_conditions = profiles_snql_request.query.where + assert len(where_conditions) == 3 + assert Condition(Column("project_id"), Op.IN, [self.project.id]) in where_conditions + condition_signatures = { + (c.lhs.name, c.op) for c in where_conditions if isinstance(c, Condition) + } + assert condition_signatures == { + ("project_id", Op.IN), + ("start_timestamp", Op.LT), + ("end_timestamp", Op.GTE), + } + @patch("sentry.profiles.flamegraph.bulk_snuba_queries", wraps=bulk_snuba_queries) @patch("sentry.search.events.builder.base.raw_snql_query", wraps=raw_snql_query) @patch("sentry.api.endpoints.organization_profiling_profiles.proxy_profiling_service") @@ -421,14 +412,14 @@ def test_queries_profile_candidates_from_functions_with_data( }, ) - @patch("sentry.profiles.flamegraph.bulk_snuba_queries") + @patch("sentry.profiles.flamegraph.FlamegraphExecutor._query_chunks_for_profilers") @patch("sentry.search.events.builder.base.raw_snql_query", wraps=raw_snql_query) @patch("sentry.api.endpoints.organization_profiling_profiles.proxy_profiling_service") def test_queries_profile_candidates_from_transactions_with_data( self, mock_proxy_profiling_service, mock_raw_snql_query, - mock_bulk_snuba_queries, + mock_query_chunks_for_profilers, ): # this transaction has no transaction profile or continuous profile self.store_transaction(transaction="foo", project=self.project) @@ -466,7 +457,7 @@ def test_queries_profile_candidates_from_transactions_with_data( ) # not able to write profile chunks to the table yet so mock it's response here - # so that the profiler transaction 1 looks like it has a profile chunk within + # so that the profiler transaction looks like it has a profile chunk within # the specified time range chunk = { # single chunk aligned to the transaction "project_id": self.project.id, @@ -475,7 +466,7 @@ def test_queries_profile_candidates_from_transactions_with_data( "start_timestamp": (start_timestamp - buffer).isoformat(), "end_timestamp": (finish_timestamp + buffer).isoformat(), } - mock_bulk_snuba_queries.return_value = [{"data": [chunk]}] + mock_query_chunks_for_profilers.return_value = [{"data": [chunk]}] mock_proxy_profiling_service.return_value = HttpResponse(status=200) @@ -487,13 +478,12 @@ def test_queries_profile_candidates_from_transactions_with_data( ) assert response.status_code == 200, response.content - # In practice, this should be called twice. But the second call is - # mocked in this test due to the inability to write to the profile - # chunks table. 
-        mock_raw_snql_query.assert_called_once()
+        # The new implementation uses exponential time chunking, so it may make multiple calls
+        assert mock_raw_snql_query.call_count > 0
 
-        call_args = mock_raw_snql_query.call_args.args
-        snql_request = call_args[0]
+        # Check the first call to verify it queries transactions correctly
+        first_call_args = mock_raw_snql_query.call_args_list[0][0]
+        snql_request = first_call_args[0]
 
         assert snql_request.dataset == Dataset.Discover.value
         assert (
@@ -545,39 +535,28 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_witho
     ):
         # this transaction has transaction profile
        profile_id = uuid4().hex
-        profile_transaction = self.store_transaction(
+        self.store_transaction(
             transaction="foo",
             profile_id=profile_id,
             project=self.project,
         )
-        transaction = {
-            "project.id": self.project.id,
-            "profile.id": profile_id,
-            "timestamp": datetime.fromtimestamp(profile_transaction["timestamp"]).isoformat(),
-            "profiler.id": None,
-            "thread.id": None,
-            "precise.start_ts": datetime.fromtimestamp(
-                profile_transaction["start_timestamp"]
-            ).timestamp(),
-            "precise.finish_ts": datetime.fromtimestamp(
-                profile_transaction["timestamp"]
-            ).timestamp(),
-        }
 
         # this transaction has continuous profile with a matching chunk (to be mocked below)
         profiler_id = uuid4().hex
         thread_id = "12345"
+        profiler_transaction_id = uuid4().hex
         profiler_transaction = self.store_transaction(
             transaction="foo",
             profiler_id=profiler_id,
             thread_id=thread_id,
+            transaction_id=profiler_transaction_id,
             project=self.project,
         )
         start_timestamp = datetime.fromtimestamp(profiler_transaction["start_timestamp"])
         finish_timestamp = datetime.fromtimestamp(profiler_transaction["timestamp"])
         buffer = timedelta(seconds=3)
 
         # not able to write profile chunks to the table yet so mock its response here
-        # so that the profiler transaction 1 looks like it has a profile chunk within
+        # so that the profiler transaction looks like it has a profile chunk within
         # the specified time range
         chunk = {
             "project_id": self.project.id,
@@ -589,17 +568,14 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_witho
 
         with (
             patch(
-                "sentry.profiles.flamegraph.bulk_snuba_queries",
-                wraps=bulk_snuba_queries,
-            ) as mock_bulk_snuba_queries,
+                "sentry.profiles.flamegraph.FlamegraphExecutor._query_chunks_for_profilers"
+            ) as mock_query_chunks_for_profilers,
             patch(
                 "sentry.api.endpoints.organization_profiling_profiles.proxy_profiling_service"
             ) as mock_proxy_profiling_service,
         ):
-            mock_bulk_snuba_queries.return_value = [
-                {"data": [transaction]},
-                {"data": [chunk]},
-            ]
+            # Mock the chunks query for the profiler_meta
+            mock_query_chunks_for_profilers.return_value = [{"data": [chunk]}]
 
             mock_proxy_profiling_service.return_value = HttpResponse(status=200)
 
             response = self.do_request(
@@ -611,35 +587,8 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_witho
 
         assert response.status_code == 200, response.content
 
-        mock_bulk_snuba_queries.assert_called_once()
-
-        call_args = mock_bulk_snuba_queries.call_args.args
-        [transactions_snql_request, profiles_snql_request] = call_args[0]
-
-        assert transactions_snql_request.dataset == Dataset.Discover.value
-        assert (
-            Or(
-                conditions=[
-                    Condition(Column("profile_id"), Op.IS_NOT_NULL),
-                    And(
-                        conditions=[
-                            Condition(Column("profiler_id"), Op.IS_NOT_NULL),
-                            Condition(
-                                Function(
-                                    "has",
-                                    [Column("contexts.key"), "trace.thread_id"],
-                                ),
-                                Op.EQ,
-                                1,
-                            ),
-                        ],
-                    ),
-                ],
-            )
-            in transactions_snql_request.query.where
-        )
-
-        assert profiles_snql_request.dataset == Dataset.Profiles.value
+        # Verify that chunks were queried for the profiler
+        mock_query_chunks_for_profilers.assert_called_once()
 
         mock_proxy_profiling_service.assert_called_once_with(
             method="POST",
@@ -656,8 +605,10 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_witho
                         "project_id": self.project.id,
                         "profiler_id": profiler_id,
                         "chunk_id": chunk["chunk_id"],
-                        "start": str(int((start_timestamp - buffer).timestamp() * 1e9)),
-                        "end": str(int((finish_timestamp + buffer).timestamp() * 1e9)),
+                        "thread_id": thread_id,
+                        "start": str(int(profiler_transaction["start_timestamp"] * 1e9)),
+                        "end": str(int(profiler_transaction["timestamp"] * 1e9)),
+                        "transaction_id": profiler_transaction_id,
                     },
                 ],
             },
@@ -669,26 +620,12 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
         # this transaction has transaction profile
         profile_id = uuid4().hex
         profile_transaction_id = uuid4().hex
-        profile_transaction = self.store_transaction(
+        self.store_transaction(
             transaction="foo",
             profile_id=profile_id,
             transaction_id=profile_transaction_id,
             project=self.project,
         )
-        transaction_1 = {
-            "id": profile_transaction_id,
-            "project.id": self.project.id,
-            "profile.id": profile_id,
-            "timestamp": datetime.fromtimestamp(profile_transaction["timestamp"]).isoformat(),
-            "profiler.id": None,
-            "thread.id": None,
-            "precise.start_ts": datetime.fromtimestamp(
-                profile_transaction["start_timestamp"]
-            ).timestamp(),
-            "precise.finish_ts": datetime.fromtimestamp(
-                profile_transaction["timestamp"]
-            ).timestamp(),
-        }
 
         # this transaction has continuous profile with a matching chunk (to be mocked below)
         profiler_id = uuid4().hex
@@ -696,31 +633,17 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
         profiler_transaction_id = uuid4().hex
         profiler_transaction = self.store_transaction(
             transaction="foo",
-            profile_id=profiler_id,
+            profiler_id=profiler_id,
             thread_id=thread_id,
             transaction_id=profiler_transaction_id,
             project=self.project,
         )
-        transaction_2 = {
-            "id": profiler_transaction_id,
-            "project.id": self.project.id,
-            "profile.id": None,
-            "timestamp": datetime.fromtimestamp(profile_transaction["timestamp"]).isoformat(),
-            "profiler.id": profiler_id,
-            "thread.id": thread_id,
-            "precise.start_ts": datetime.fromtimestamp(
-                profile_transaction["start_timestamp"]
-            ).timestamp(),
-            "precise.finish_ts": datetime.fromtimestamp(
-                profile_transaction["timestamp"]
-            ).timestamp(),
-        }
-        start_timestamp = datetime.fromtimestamp(profile_transaction["start_timestamp"])
-        finish_timestamp = datetime.fromtimestamp(profile_transaction["timestamp"])
+        start_timestamp = datetime.fromtimestamp(profiler_transaction["start_timestamp"])
+        finish_timestamp = datetime.fromtimestamp(profiler_transaction["timestamp"])
         buffer = timedelta(seconds=3)
 
         # not able to write profile chunks to the table yet so mock its response here
-        # so that the profiler transaction 1 looks like it has a profile chunk within
+        # so that the profiler transaction looks like it has a profile chunk within
         # the specified time range
         chunk_1 = {
             "project_id": self.project.id,
@@ -730,29 +653,16 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
             "end_timestamp": (finish_timestamp + buffer).isoformat(),
         }
 
-        # a random chunk that could be chosen but will not because we have a chunk
-        # associated to a profile
-        chunk_2 = {
-            "project_id": self.project.id,
-            "profiler_id": uuid4().hex,
-            "chunk_id": uuid4().hex,
-            "start_timestamp": (start_timestamp - buffer).isoformat(),
-            "end_timestamp": (finish_timestamp + buffer).isoformat(),
-        }
         with (
-            patch("sentry.profiles.flamegraph.bulk_snuba_queries") as mock_bulk_snuba_queries,
+            patch(
+                "sentry.profiles.flamegraph.FlamegraphExecutor._query_chunks_for_profilers"
+            ) as mock_query_chunks_for_profilers,
             patch(
                 "sentry.api.endpoints.organization_profiling_profiles.proxy_profiling_service"
             ) as mock_proxy_profiling_service,
         ):
-            mock_bulk_snuba_queries.side_effect = [
-                [
-                    {"data": [transaction_1, transaction_2]},
-                    {"data": [chunk_1, chunk_2]},
-                ],
-                [{"data": [chunk_1]}],
-            ]
+            # Mock the chunks query for the profiler_meta
+            mock_query_chunks_for_profilers.return_value = [{"data": [chunk_1]}]
 
             mock_proxy_profiling_service.return_value = HttpResponse(status=200)
 
             response = self.do_request(
@@ -764,39 +674,8 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
 
         assert response.status_code == 200, response.content
 
-        assert mock_bulk_snuba_queries.call_count == 2
-
-        first_call_args = mock_bulk_snuba_queries.call_args_list[0][0]
-        [transactions_snql_request, profiles_snql_request] = first_call_args[0]
-
-        assert transactions_snql_request.dataset == Dataset.Discover.value
-        assert (
-            Or(
-                conditions=[
-                    Condition(Column("profile_id"), Op.IS_NOT_NULL),
-                    And(
-                        conditions=[
-                            Condition(Column("profiler_id"), Op.IS_NOT_NULL),
-                            Condition(
-                                Function(
-                                    "has",
-                                    [Column("contexts.key"), "trace.thread_id"],
-                                ),
-                                Op.EQ,
-                                1,
-                            ),
-                        ],
-                    ),
-                ],
-            )
-            in transactions_snql_request.query.where
-        )
-
-        assert profiles_snql_request.dataset == Dataset.Profiles.value
-
-        second_call_args = mock_bulk_snuba_queries.call_args_list[1][0]
-        [profiles_snql_request] = second_call_args[0]
-        assert profiles_snql_request.dataset == Dataset.Profiles.value
+        # Verify that chunks were queried for the profiler
+        mock_query_chunks_for_profilers.assert_called_once()
 
         mock_proxy_profiling_service.assert_called_once_with(
             method="POST",
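A pattern repeated throughout these tests: profile chunks still cannot be written in test, so the Snuba lookup is stubbed at the executor boundary (`FlamegraphExecutor._query_chunks_for_profilers`) rather than at `bulk_snuba_queries`. A condensed sketch of that pattern, with the helper name and its arguments purely illustrative:

```python
from unittest.mock import patch


def with_stubbed_profile_chunks(chunks, do_request):
    # Replace the chunk query at the executor boundary so the rest of the
    # flamegraph code path (candidate selection, proxying) runs unmodified.
    with patch(
        "sentry.profiles.flamegraph.FlamegraphExecutor._query_chunks_for_profilers"
    ) as mock_query_chunks_for_profilers:
        # Same shape the real query returns: one result set per batched query.
        mock_query_chunks_for_profilers.return_value = [{"data": chunks}]
        response = do_request()
    mock_query_chunks_for_profilers.assert_called_once()
    return response
```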