From 2b337ce4d85ab97c4af97f839a9e6797b4a6e871 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Thu, 9 Apr 2026 10:20:57 -0700 Subject: [PATCH] ref(preprod): Remove Kafka producer and taskbroker feature flag Taskbroker is now the only dispatch path for preprod artifacts. Remove the Kafka producer module, the launchpad-taskbroker-rollout feature flag, and all associated dead code including the orphaned Kafka topic definition and settings constant. Move PreprodFeature enum from the deleted producer module to quotas where it is actually used. Simplify the rerun analysis endpoint to check run_size directly instead of building an intermediate list. Co-Authored-By: Claude --- src/sentry/conf/server.py | 3 - src/sentry/conf/types/kafka_definition.py | 1 - src/sentry/features/temporary.py | 2 - .../preprod_artifact_rerun_analysis.py | 115 ++------------- .../project_preprod_artifact_update.py | 3 +- src/sentry/preprod/producer.py | 63 -------- src/sentry/preprod/quotas.py | 8 +- src/sentry/preprod/tasks.py | 54 +------ .../test_preprod_artifact_rerun_analysis.py | 139 ++---------------- tests/sentry/preprod/test_tasks.py | 119 +-------------- 10 files changed, 38 insertions(+), 469 deletions(-) delete mode 100644 src/sentry/preprod/producer.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index a9a4944d73c758..529fe0828e25a7 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2691,7 +2691,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: "shared-resources-usage": "default", "buffered-segments": "default", "buffered-segments-dlq": "default", - "preprod-artifact-events": "default", # Taskworker topics "taskworker": "default", "taskworker-dlq": "default", @@ -3026,8 +3025,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000 SENTRY_PROFILE_EAP_FUTURES_MAX_LIMIT = 10000 -SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000 - # How long we should wait for a gateway proxy request to return before giving up GATEWAY_PROXY_TIMEOUT: int | None = ( int(os.environ["SENTRY_APIGW_PROXY_TIMEOUT"]) diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index f6268e9bf891ce..34d8d77f44a6bc 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -53,7 +53,6 @@ class Topic(Enum): INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" INGEST_OCCURRENCES = "ingest-occurrences" INGEST_MONITORS = "ingest-monitors" - PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events" MONITORS_CLOCK_TICK = "monitors-clock-tick" MONITORS_CLOCK_TASKS = "monitors-clock-tasks" MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences" diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py index 5a771c7d662412..78cdf5c1b53c2d 100644 --- a/src/sentry/features/temporary.py +++ b/src/sentry/features/temporary.py @@ -157,8 +157,6 @@ def register_temporary_features(manager: FeatureManager) -> None: manager.add("organizations:invite-members", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=True) # Enable rate limits for inviting members. manager.add("organizations:invite-members-rate-limits", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=False) - # Enable rollout of launchpad taskbroker shadowing/usage - manager.add("organizations:launchpad-taskbroker-rollout", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) manager.add("organizations:mep-use-default-tags", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) # Enable flamegraph visualization for MetricKit hang diagnostic stack traces manager.add("organizations:metrickit-flamegraph", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) diff --git a/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py b/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py index c532dc73e02f3c..014114cf4db543 100644 --- a/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py +++ b/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py @@ -8,7 +8,7 @@ from rest_framework.request import Request from rest_framework.response import Response -from sentry import analytics, features +from sentry import analytics from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import Endpoint, cell_silo_endpoint, internal_cell_silo_endpoint @@ -22,8 +22,7 @@ PreprodArtifactSizeComparison, PreprodArtifactSizeMetrics, ) -from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka -from sentry.preprod.quotas import should_run_distribution, should_run_size +from sentry.preprod.quotas import should_run_size from sentry.preprod.tasks import dispatch_taskbroker logger = logging.getLogger(__name__) @@ -65,47 +64,12 @@ def post( organization = head_artifact.project.organization - # Empty list is valid - triggers default processing behavior - requested_features: list[PreprodFeature] = [] - run_size, _ = should_run_size(head_artifact, actor=request.user) if run_size: - requested_features.append(PreprodFeature.SIZE_ANALYSIS) - - run_distribution, _ = should_run_distribution(head_artifact, actor=request.user) - if run_distribution: - requested_features.append(PreprodFeature.BUILD_DISTRIBUTION) - - if PreprodFeature.SIZE_ANALYSIS in requested_features: cleanup_old_metrics(head_artifact) reset_artifact_data(head_artifact) - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker( - head_artifact.project.id, organization.id, head_artifact_id - ) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=head_artifact.project.id, - organization_id=organization.id, - artifact_id=head_artifact_id, - requested_features=requested_features, - ) - dispatched = True - except Exception: - logger.exception( - "preprod_artifact.rerun_analysis.dispatch_error", - extra={ - "artifact_id": head_artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": head_artifact.project.id, - }, - ) - dispatched = False - - if not dispatched: + if not dispatch_taskbroker(head_artifact.project.id, organization.id, head_artifact_id): return Response( { "detail": f"Failed to queue analysis for artifact {head_artifact_id}", @@ -182,36 +146,9 @@ def post(self, request: Request) -> Response: reset_artifact_data(preprod_artifact) organization = preprod_artifact.project.organization - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker( - preprod_artifact.project.id, organization.id, preprod_artifact_id - ) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=preprod_artifact.project.id, - organization_id=organization.id, - artifact_id=preprod_artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - dispatched = True - except Exception as e: - logger.exception( - "preprod_artifact.admin_rerun_analysis.dispatch_error", - extra={ - "artifact_id": preprod_artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": preprod_artifact.project.id, - "error": str(e), - }, - ) - dispatched = False - - if not dispatched: + if not dispatch_taskbroker( + preprod_artifact.project.id, organization.id, preprod_artifact_id + ): return Response( { "detail": f"Failed to queue analysis for artifact {preprod_artifact_id}", @@ -303,32 +240,7 @@ def post(self, request: Request) -> Response: cleanup_stats = cleanup_old_metrics(artifact) reset_artifact_data(artifact) - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=artifact.project.id, - organization_id=organization.id, - artifact_id=artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - dispatched = True - except Exception: - logger.exception( - "preprod_artifact.admin_batch_rerun_analysis.dispatch_error", - extra={ - "artifact_id": artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": artifact.project.id, - }, - ) - dispatched = False - + dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id) if not dispatched: artifact.refresh_from_db() @@ -367,14 +279,10 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats: PreprodArtifactSizeMetrics.objects.filter(preprod_artifact=preprod_artifact) ) - file_ids_to_delete = [] - if size_metrics: size_metric_ids = [sm.id for sm in size_metrics] - for size_metric in size_metrics: - if size_metric.analysis_file_id: - file_ids_to_delete.append(size_metric.analysis_file_id) + file_ids_to_delete = [sm.analysis_file_id for sm in size_metrics if sm.analysis_file_id] comparisons = PreprodArtifactSizeComparison.objects.filter( head_size_analysis_id__in=size_metric_ids @@ -392,10 +300,9 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats: id__in=size_metric_ids ).delete() - if file_ids_to_delete: - for file in File.objects.filter(id__in=file_ids_to_delete): - file.delete() - stats.files_total_deleted += 1 + for file in File.objects.filter(id__in=file_ids_to_delete): + file.delete() + stats.files_total_deleted += 1 PreprodArtifactSizeMetrics.objects.create( preprod_artifact=preprod_artifact, diff --git a/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py b/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py index eb12b45f4b46e4..1fda7dde0c79a1 100644 --- a/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py +++ b/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py @@ -26,8 +26,7 @@ PreprodArtifactMobileAppInfo, PreprodArtifactSizeMetrics, ) -from sentry.preprod.producer import PreprodFeature -from sentry.preprod.quotas import should_run_distribution, should_run_size +from sentry.preprod.quotas import PreprodFeature, should_run_distribution, should_run_size from sentry.preprod.vcs.status_checks.size.tasks import create_preprod_status_check_task logger = logging.getLogger(__name__) diff --git a/src/sentry/preprod/producer.py b/src/sentry/preprod/producer.py deleted file mode 100644 index a02e66f3470ac9..00000000000000 --- a/src/sentry/preprod/producer.py +++ /dev/null @@ -1,63 +0,0 @@ -from __future__ import annotations - -import logging -from enum import Enum - -from arroyo import Topic as ArroyoTopic -from arroyo.backends.kafka import KafkaPayload, KafkaProducer -from confluent_kafka import KafkaException -from django.conf import settings - -from sentry.conf.types.kafka_definition import Topic -from sentry.utils import json -from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer -from sentry.utils.kafka_config import get_topic_definition - -logger = logging.getLogger(__name__) - - -class PreprodFeature(Enum): - SIZE_ANALYSIS = "size_analysis" - BUILD_DISTRIBUTION = "build_distribution" - - -def _get_preprod_producer() -> KafkaProducer: - return get_arroyo_producer( - "sentry.preprod.producer", - Topic.PREPROD_ARTIFACT_EVENTS, - exclude_config_keys=["compression.type", "message.max.bytes"], - ) - - -_preprod_producer = SingletonProducer( - _get_preprod_producer, max_futures=settings.SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT -) - - -def produce_preprod_artifact_to_kafka( - project_id: int, - organization_id: int, - artifact_id: int, - requested_features: list[PreprodFeature] | None = None, -) -> None: - if requested_features is None: - requested_features = [] - payload_data = { - "artifact_id": str(artifact_id), - "project_id": str(project_id), - "organization_id": str(organization_id), - "requested_features": [feature.value for feature in requested_features], - } - - partition_key = f"{project_id}-{artifact_id}".encode() - payload = KafkaPayload(partition_key, json.dumps(payload_data).encode("utf-8"), []) - - try: - topic = get_topic_definition(Topic.PREPROD_ARTIFACT_EVENTS)["real_topic_name"] - _preprod_producer.produce(ArroyoTopic(topic), payload) - except KafkaException: - logger.exception( - "Failed to send preprod artifact message to Kafka", - extra={"artifact_id": artifact_id, "project_id": project_id}, - ) - raise # Re-raise to trigger task retry diff --git a/src/sentry/preprod/quotas.py b/src/sentry/preprod/quotas.py index 7a18bd716af179..a19c29f65e9cd8 100644 --- a/src/sentry/preprod/quotas.py +++ b/src/sentry/preprod/quotas.py @@ -3,6 +3,7 @@ import logging from collections.abc import Callable from datetime import datetime, timedelta +from enum import Enum from typing import Any import sentry_sdk @@ -15,11 +16,16 @@ from sentry.models.organization import Organization from sentry.preprod.artifact_search import artifact_matches_query from sentry.preprod.models import PreprodArtifact -from sentry.preprod.producer import PreprodFeature from sentry.users.models.user import User logger = logging.getLogger(__name__) + +class PreprodFeature(Enum): + SIZE_ANALYSIS = "size_analysis" + BUILD_DISTRIBUTION = "build_distribution" + + DEFAULT_SIZE_RETENTION_DAYS = 90 diff --git a/src/sentry/preprod/tasks.py b/src/sentry/preprod/tasks.py index 653355ca9a2c1a..6306e10210f732 100644 --- a/src/sentry/preprod/tasks.py +++ b/src/sentry/preprod/tasks.py @@ -11,7 +11,6 @@ from django.utils import timezone from taskbroker_client.retry import Retry -from sentry import features from sentry.constants import DataCategory from sentry.models.commitcomparison import CommitComparison from sentry.models.organization import Organization @@ -28,7 +27,6 @@ PreprodArtifactSizeMetrics, PreprodBuildConfiguration, ) -from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka from sentry.preprod.quotas import ( has_installable_quota, has_size_quota, @@ -165,14 +163,9 @@ def assemble_preprod_artifact( except Exception: pass - if features.has("organizations:launchpad-taskbroker-rollout", organization): - taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id) - if not taskbroker_dispatched: - return - else: - kafka_dispatched = _dispatch_kafka(project_id, org_id, artifact_id, checksum) - if not kafka_dispatched: - return + taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id) + if not taskbroker_dispatched: + return logger.info( "Finished preprod artifact dispatch", @@ -976,47 +969,6 @@ def detect_expired_preprod_artifacts() -> None: ) -def _dispatch_kafka(project_id: int, org_id: int, artifact_id: int, checksum: str) -> bool: - # Note: requested_features is no longer used for filtering - all features are - # requested here, and the actual quota/filter checks happen in the update endpoint - # (project_preprod_artifact_update.py) after preprocessing completes. - try: - produce_preprod_artifact_to_kafka( - project_id=project_id, - organization_id=org_id, - artifact_id=artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - return True - except Exception as e: - user_friendly_error_message = "Failed to dispatch preprod artifact event for analysis" - sentry_sdk.capture_exception(e) - logger.exception( - user_friendly_error_message, - extra={ - "project_id": project_id, - "organization_id": org_id, - "checksum": checksum, - "preprod_artifact_id": artifact_id, - }, - ) - PreprodArtifact.objects.filter(id=artifact_id).update( - state=PreprodArtifact.ArtifactState.FAILED, - error_code=PreprodArtifact.ErrorCode.ARTIFACT_PROCESSING_ERROR, - error_message=user_friendly_error_message, - ) - create_preprod_status_check_task.apply_async( - kwargs={ - "preprod_artifact_id": artifact_id, - "caller": "assemble_dispatch_error", - } - ) - return False - - def dispatch_taskbroker(project_id: int, org_id: int, artifact_id: int) -> bool: try: logger.info( diff --git a/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py b/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py index 557edd1f2524a2..72b825b96a9029 100644 --- a/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py +++ b/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py @@ -167,12 +167,8 @@ def test_rerun_analysis_clears_distribution_error_fields(self) -> None: self.assert_artifact_reset(artifact) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_filters_size_analysis_by_query(self, mock_produce_to_kafka): - """Test that rerun analysis filters SIZE_ANALYSIS based on project query setting""" - from sentry.preprod.producer import PreprodFeature + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_skips_cleanup_when_size_query_excludes_artifact(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -184,22 +180,14 @@ def test_rerun_analysis_filters_size_analysis_by_query(self, mock_produce_to_kaf state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that should NOT match the artifact self.project.update_option(SIZE_ENABLED_QUERY_KEY, "app_id:com.other.app") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called without SIZE_ANALYSIS - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS not in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_size_analysis_when_query_matches(self, mock_produce_to_kafka): - """Test that rerun analysis includes SIZE_ANALYSIS when query matches""" - from sentry.preprod.producer import PreprodFeature + mock_cleanup.assert_not_called() + + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_when_size_query_matches(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -211,24 +199,14 @@ def test_rerun_analysis_includes_size_analysis_when_query_matches(self, mock_pro state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that SHOULD match the artifact self.project.update_option(SIZE_ENABLED_QUERY_KEY, "app_id:com.my.app") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_filters_distribution_by_query(self, mock_produce_to_kafka): - """Test that rerun analysis filters BUILD_DISTRIBUTION based on project query setting""" - from sentry.preprod.producer import PreprodFeature - from sentry.preprod.quotas import DISTRIBUTION_ENABLED_QUERY_KEY + mock_cleanup.assert_called_once() + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_when_no_query_set(self, mock_cleanup): artifact = self.create_preprod_artifact( project=self.project, app_name="MyApp", @@ -238,73 +216,12 @@ def test_rerun_analysis_filters_distribution_by_query(self, mock_produce_to_kafk state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that should NOT match the artifact - self.project.update_option(DISTRIBUTION_ENABLED_QUERY_KEY, "app_id:com.other.app") - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called without BUILD_DISTRIBUTION - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.BUILD_DISTRIBUTION not in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_all_features_when_no_query(self, mock_produce_to_kafka): - """Test that rerun analysis includes all features when no query is set""" - from sentry.preprod.producer import PreprodFeature - - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - # Don't set any query filters - should include all features - - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - - # Verify produce_preprod_artifact_to_kafka was called with both features - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - assert PreprodFeature.BUILD_DISTRIBUTION in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.dispatch_taskbroker", - return_value=True, - ) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_dispatches_via_taskbroker_when_flag_enabled( - self, mock_produce_to_kafka, mockdispatch_taskbroker - ): - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - with self.feature("organizations:launchpad-taskbroker-rollout"): - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - - mock_produce_to_kafka.assert_not_called() - mockdispatch_taskbroker.assert_called_once() + mock_cleanup.assert_called_once() - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_feature_on_invalid_query(self, mock_produce_to_kafka): - """Test that rerun analysis includes features when query is invalid""" - from sentry.preprod.producer import PreprodFeature + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_on_invalid_query(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -316,16 +233,11 @@ def test_rerun_analysis_includes_feature_on_invalid_query(self, mock_produce_to_ state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up an invalid query filter self.project.update_option(SIZE_ENABLED_QUERY_KEY, "invalid_field:value") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - # (invalid query should be skipped, allowing the feature) - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] + mock_cleanup.assert_called_once() class PreprodArtifactAdminRerunAnalysisTest(BaseRerunAnalysisTest): @@ -424,28 +336,3 @@ def test_rerun_analysis_invalid_id(self) -> None: assert ( "preprod_artifact_id is required and must be a valid integer" in response.data["detail"] ) - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.dispatch_taskbroker", - return_value=True, - ) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_dispatches_via_taskbroker_when_flag_enabled( - self, mock_produce_to_kafka, mockdispatch_taskbroker - ): - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - with self.feature("organizations:launchpad-taskbroker-rollout"): - self.get_success_response(preprod_artifact_id=artifact.id, status_code=200) - - mock_produce_to_kafka.assert_not_called() - mockdispatch_taskbroker.assert_called_once() diff --git a/tests/sentry/preprod/test_tasks.py b/tests/sentry/preprod/test_tasks.py index 61424fcb9376d9..05897fa890ebd7 100644 --- a/tests/sentry/preprod/test_tasks.py +++ b/tests/sentry/preprod/test_tasks.py @@ -391,119 +391,9 @@ def test_assemble_preprod_artifact_nonexistent_project(self) -> None: # Note: Tests currently expect ERROR state because the task tries to access # assemble_result.build_configuration which doesn't exist - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_assemble_preprod_artifact_includes_all_features_when_no_query( - self, mock_produce_to_kafka - ) -> None: - """Test that assemble_preprod_artifact includes all features when no query is set""" - from sentry.preprod.producer import PreprodFeature - - content = b"test preprod artifact content no query" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - # Don't set any query filters - should include all features - - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - # Verify produce_preprod_artifact_to_kafka was called with both features - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - assert PreprodFeature.BUILD_DISTRIBUTION in call_kwargs["requested_features"] - - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_assemble_preprod_artifact_includes_feature_on_invalid_query( - self, mock_produce_to_kafka - ) -> None: - """Test that assemble_preprod_artifact includes features when query is invalid""" - from sentry.preprod.producer import PreprodFeature - from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY - - content = b"test preprod artifact content invalid query" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - # Set up an invalid query filter - self.project.update_option(SIZE_ENABLED_QUERY_KEY, "invalid_field:value") - - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - # (invalid query should be skipped, allowing the feature) - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - @patch("sentry.preprod.tasks.dispatch_taskbroker") - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_only_taskbroker_dispatched_when_flag_enabled( - self, mock_produce_to_kafka, mock_shadow - ) -> None: - content = b"test shadow taskbroker dispatch" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - with self.feature("organizations:launchpad-taskbroker-rollout"): - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - mock_produce_to_kafka.assert_not_called() - mock_shadow.assert_called_once_with(self.project.id, self.organization.id, artifact.id) - - @patch("sentry.preprod.tasks.dispatch_taskbroker") - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_only_kafka_dispatched_when_flag_disabled( - self, mock_produce_to_kafka, mock_shadow - ) -> None: - content = b"test only kafka dispatch when flag disabled" + def test_assemble_preprod_artifact_dispatches_to_taskbroker(self, mock_dispatch) -> None: + content = b"test preprod artifact content no query" fileobj = ContentFile(content) total_checksum = sha1(content).hexdigest() @@ -525,10 +415,7 @@ def test_only_kafka_dispatched_when_flag_disabled( artifact_id=artifact.id, ) - mock_produce_to_kafka.assert_called_once() - mock_shadow.assert_not_called() - artifact.refresh_from_db() - assert artifact.state != PreprodArtifact.ArtifactState.FAILED + mock_dispatch.assert_called_once_with(self.project.id, self.organization.id, artifact.id) class CreatePreprodArtifactTest(TestCase):