diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index a9a4944d73c758..529fe0828e25a7 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2691,7 +2691,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: "shared-resources-usage": "default", "buffered-segments": "default", "buffered-segments-dlq": "default", - "preprod-artifact-events": "default", # Taskworker topics "taskworker": "default", "taskworker-dlq": "default", @@ -3026,8 +3025,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000 SENTRY_PROFILE_EAP_FUTURES_MAX_LIMIT = 10000 -SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000 - # How long we should wait for a gateway proxy request to return before giving up GATEWAY_PROXY_TIMEOUT: int | None = ( int(os.environ["SENTRY_APIGW_PROXY_TIMEOUT"]) diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index f6268e9bf891ce..34d8d77f44a6bc 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -53,7 +53,6 @@ class Topic(Enum): INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings" INGEST_OCCURRENCES = "ingest-occurrences" INGEST_MONITORS = "ingest-monitors" - PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events" MONITORS_CLOCK_TICK = "monitors-clock-tick" MONITORS_CLOCK_TASKS = "monitors-clock-tasks" MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences" diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py index 5a771c7d662412..78cdf5c1b53c2d 100644 --- a/src/sentry/features/temporary.py +++ b/src/sentry/features/temporary.py @@ -157,8 +157,6 @@ def register_temporary_features(manager: FeatureManager) -> None: manager.add("organizations:invite-members", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=True) # Enable rate limits for inviting members. manager.add("organizations:invite-members-rate-limits", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=False) - # Enable rollout of launchpad taskbroker shadowing/usage - manager.add("organizations:launchpad-taskbroker-rollout", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) manager.add("organizations:mep-use-default-tags", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) # Enable flamegraph visualization for MetricKit hang diagnostic stack traces manager.add("organizations:metrickit-flamegraph", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True) diff --git a/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py b/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py index c532dc73e02f3c..014114cf4db543 100644 --- a/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py +++ b/src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py @@ -8,7 +8,7 @@ from rest_framework.request import Request from rest_framework.response import Response -from sentry import analytics, features +from sentry import analytics from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import Endpoint, cell_silo_endpoint, internal_cell_silo_endpoint @@ -22,8 +22,7 @@ PreprodArtifactSizeComparison, PreprodArtifactSizeMetrics, ) -from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka -from sentry.preprod.quotas import should_run_distribution, should_run_size +from sentry.preprod.quotas import should_run_size from sentry.preprod.tasks import dispatch_taskbroker logger = logging.getLogger(__name__) @@ -65,47 +64,12 @@ def post( organization = head_artifact.project.organization - # Empty list is valid - triggers default processing behavior - requested_features: list[PreprodFeature] = [] - run_size, _ = should_run_size(head_artifact, actor=request.user) if run_size: - requested_features.append(PreprodFeature.SIZE_ANALYSIS) - - run_distribution, _ = should_run_distribution(head_artifact, actor=request.user) - if run_distribution: - requested_features.append(PreprodFeature.BUILD_DISTRIBUTION) - - if PreprodFeature.SIZE_ANALYSIS in requested_features: cleanup_old_metrics(head_artifact) reset_artifact_data(head_artifact) - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker( - head_artifact.project.id, organization.id, head_artifact_id - ) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=head_artifact.project.id, - organization_id=organization.id, - artifact_id=head_artifact_id, - requested_features=requested_features, - ) - dispatched = True - except Exception: - logger.exception( - "preprod_artifact.rerun_analysis.dispatch_error", - extra={ - "artifact_id": head_artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": head_artifact.project.id, - }, - ) - dispatched = False - - if not dispatched: + if not dispatch_taskbroker(head_artifact.project.id, organization.id, head_artifact_id): return Response( { "detail": f"Failed to queue analysis for artifact {head_artifact_id}", @@ -182,36 +146,9 @@ def post(self, request: Request) -> Response: reset_artifact_data(preprod_artifact) organization = preprod_artifact.project.organization - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker( - preprod_artifact.project.id, organization.id, preprod_artifact_id - ) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=preprod_artifact.project.id, - organization_id=organization.id, - artifact_id=preprod_artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - dispatched = True - except Exception as e: - logger.exception( - "preprod_artifact.admin_rerun_analysis.dispatch_error", - extra={ - "artifact_id": preprod_artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": preprod_artifact.project.id, - "error": str(e), - }, - ) - dispatched = False - - if not dispatched: + if not dispatch_taskbroker( + preprod_artifact.project.id, organization.id, preprod_artifact_id + ): return Response( { "detail": f"Failed to queue analysis for artifact {preprod_artifact_id}", @@ -303,32 +240,7 @@ def post(self, request: Request) -> Response: cleanup_stats = cleanup_old_metrics(artifact) reset_artifact_data(artifact) - if features.has("organizations:launchpad-taskbroker-rollout", organization): - dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id) - else: - try: - produce_preprod_artifact_to_kafka( - project_id=artifact.project.id, - organization_id=organization.id, - artifact_id=artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - dispatched = True - except Exception: - logger.exception( - "preprod_artifact.admin_batch_rerun_analysis.dispatch_error", - extra={ - "artifact_id": artifact_id, - "user_id": request.user.id, - "organization_id": organization.id, - "project_id": artifact.project.id, - }, - ) - dispatched = False - + dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id) if not dispatched: artifact.refresh_from_db() @@ -367,14 +279,10 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats: PreprodArtifactSizeMetrics.objects.filter(preprod_artifact=preprod_artifact) ) - file_ids_to_delete = [] - if size_metrics: size_metric_ids = [sm.id for sm in size_metrics] - for size_metric in size_metrics: - if size_metric.analysis_file_id: - file_ids_to_delete.append(size_metric.analysis_file_id) + file_ids_to_delete = [sm.analysis_file_id for sm in size_metrics if sm.analysis_file_id] comparisons = PreprodArtifactSizeComparison.objects.filter( head_size_analysis_id__in=size_metric_ids @@ -392,10 +300,9 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats: id__in=size_metric_ids ).delete() - if file_ids_to_delete: - for file in File.objects.filter(id__in=file_ids_to_delete): - file.delete() - stats.files_total_deleted += 1 + for file in File.objects.filter(id__in=file_ids_to_delete): + file.delete() + stats.files_total_deleted += 1 PreprodArtifactSizeMetrics.objects.create( preprod_artifact=preprod_artifact, diff --git a/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py b/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py index eb12b45f4b46e4..1fda7dde0c79a1 100644 --- a/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py +++ b/src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py @@ -26,8 +26,7 @@ PreprodArtifactMobileAppInfo, PreprodArtifactSizeMetrics, ) -from sentry.preprod.producer import PreprodFeature -from sentry.preprod.quotas import should_run_distribution, should_run_size +from sentry.preprod.quotas import PreprodFeature, should_run_distribution, should_run_size from sentry.preprod.vcs.status_checks.size.tasks import create_preprod_status_check_task logger = logging.getLogger(__name__) diff --git a/src/sentry/preprod/producer.py b/src/sentry/preprod/producer.py deleted file mode 100644 index a02e66f3470ac9..00000000000000 --- a/src/sentry/preprod/producer.py +++ /dev/null @@ -1,63 +0,0 @@ -from __future__ import annotations - -import logging -from enum import Enum - -from arroyo import Topic as ArroyoTopic -from arroyo.backends.kafka import KafkaPayload, KafkaProducer -from confluent_kafka import KafkaException -from django.conf import settings - -from sentry.conf.types.kafka_definition import Topic -from sentry.utils import json -from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer -from sentry.utils.kafka_config import get_topic_definition - -logger = logging.getLogger(__name__) - - -class PreprodFeature(Enum): - SIZE_ANALYSIS = "size_analysis" - BUILD_DISTRIBUTION = "build_distribution" - - -def _get_preprod_producer() -> KafkaProducer: - return get_arroyo_producer( - "sentry.preprod.producer", - Topic.PREPROD_ARTIFACT_EVENTS, - exclude_config_keys=["compression.type", "message.max.bytes"], - ) - - -_preprod_producer = SingletonProducer( - _get_preprod_producer, max_futures=settings.SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT -) - - -def produce_preprod_artifact_to_kafka( - project_id: int, - organization_id: int, - artifact_id: int, - requested_features: list[PreprodFeature] | None = None, -) -> None: - if requested_features is None: - requested_features = [] - payload_data = { - "artifact_id": str(artifact_id), - "project_id": str(project_id), - "organization_id": str(organization_id), - "requested_features": [feature.value for feature in requested_features], - } - - partition_key = f"{project_id}-{artifact_id}".encode() - payload = KafkaPayload(partition_key, json.dumps(payload_data).encode("utf-8"), []) - - try: - topic = get_topic_definition(Topic.PREPROD_ARTIFACT_EVENTS)["real_topic_name"] - _preprod_producer.produce(ArroyoTopic(topic), payload) - except KafkaException: - logger.exception( - "Failed to send preprod artifact message to Kafka", - extra={"artifact_id": artifact_id, "project_id": project_id}, - ) - raise # Re-raise to trigger task retry diff --git a/src/sentry/preprod/quotas.py b/src/sentry/preprod/quotas.py index 7a18bd716af179..a19c29f65e9cd8 100644 --- a/src/sentry/preprod/quotas.py +++ b/src/sentry/preprod/quotas.py @@ -3,6 +3,7 @@ import logging from collections.abc import Callable from datetime import datetime, timedelta +from enum import Enum from typing import Any import sentry_sdk @@ -15,11 +16,16 @@ from sentry.models.organization import Organization from sentry.preprod.artifact_search import artifact_matches_query from sentry.preprod.models import PreprodArtifact -from sentry.preprod.producer import PreprodFeature from sentry.users.models.user import User logger = logging.getLogger(__name__) + +class PreprodFeature(Enum): + SIZE_ANALYSIS = "size_analysis" + BUILD_DISTRIBUTION = "build_distribution" + + DEFAULT_SIZE_RETENTION_DAYS = 90 diff --git a/src/sentry/preprod/tasks.py b/src/sentry/preprod/tasks.py index 653355ca9a2c1a..6306e10210f732 100644 --- a/src/sentry/preprod/tasks.py +++ b/src/sentry/preprod/tasks.py @@ -11,7 +11,6 @@ from django.utils import timezone from taskbroker_client.retry import Retry -from sentry import features from sentry.constants import DataCategory from sentry.models.commitcomparison import CommitComparison from sentry.models.organization import Organization @@ -28,7 +27,6 @@ PreprodArtifactSizeMetrics, PreprodBuildConfiguration, ) -from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka from sentry.preprod.quotas import ( has_installable_quota, has_size_quota, @@ -165,14 +163,9 @@ def assemble_preprod_artifact( except Exception: pass - if features.has("organizations:launchpad-taskbroker-rollout", organization): - taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id) - if not taskbroker_dispatched: - return - else: - kafka_dispatched = _dispatch_kafka(project_id, org_id, artifact_id, checksum) - if not kafka_dispatched: - return + taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id) + if not taskbroker_dispatched: + return logger.info( "Finished preprod artifact dispatch", @@ -976,47 +969,6 @@ def detect_expired_preprod_artifacts() -> None: ) -def _dispatch_kafka(project_id: int, org_id: int, artifact_id: int, checksum: str) -> bool: - # Note: requested_features is no longer used for filtering - all features are - # requested here, and the actual quota/filter checks happen in the update endpoint - # (project_preprod_artifact_update.py) after preprocessing completes. - try: - produce_preprod_artifact_to_kafka( - project_id=project_id, - organization_id=org_id, - artifact_id=artifact_id, - requested_features=[ - PreprodFeature.SIZE_ANALYSIS, - PreprodFeature.BUILD_DISTRIBUTION, - ], - ) - return True - except Exception as e: - user_friendly_error_message = "Failed to dispatch preprod artifact event for analysis" - sentry_sdk.capture_exception(e) - logger.exception( - user_friendly_error_message, - extra={ - "project_id": project_id, - "organization_id": org_id, - "checksum": checksum, - "preprod_artifact_id": artifact_id, - }, - ) - PreprodArtifact.objects.filter(id=artifact_id).update( - state=PreprodArtifact.ArtifactState.FAILED, - error_code=PreprodArtifact.ErrorCode.ARTIFACT_PROCESSING_ERROR, - error_message=user_friendly_error_message, - ) - create_preprod_status_check_task.apply_async( - kwargs={ - "preprod_artifact_id": artifact_id, - "caller": "assemble_dispatch_error", - } - ) - return False - - def dispatch_taskbroker(project_id: int, org_id: int, artifact_id: int) -> bool: try: logger.info( diff --git a/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py b/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py index 557edd1f2524a2..72b825b96a9029 100644 --- a/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py +++ b/tests/sentry/preprod/api/endpoints/test_preprod_artifact_rerun_analysis.py @@ -167,12 +167,8 @@ def test_rerun_analysis_clears_distribution_error_fields(self) -> None: self.assert_artifact_reset(artifact) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_filters_size_analysis_by_query(self, mock_produce_to_kafka): - """Test that rerun analysis filters SIZE_ANALYSIS based on project query setting""" - from sentry.preprod.producer import PreprodFeature + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_skips_cleanup_when_size_query_excludes_artifact(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -184,22 +180,14 @@ def test_rerun_analysis_filters_size_analysis_by_query(self, mock_produce_to_kaf state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that should NOT match the artifact self.project.update_option(SIZE_ENABLED_QUERY_KEY, "app_id:com.other.app") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called without SIZE_ANALYSIS - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS not in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_size_analysis_when_query_matches(self, mock_produce_to_kafka): - """Test that rerun analysis includes SIZE_ANALYSIS when query matches""" - from sentry.preprod.producer import PreprodFeature + mock_cleanup.assert_not_called() + + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_when_size_query_matches(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -211,24 +199,14 @@ def test_rerun_analysis_includes_size_analysis_when_query_matches(self, mock_pro state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that SHOULD match the artifact self.project.update_option(SIZE_ENABLED_QUERY_KEY, "app_id:com.my.app") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_filters_distribution_by_query(self, mock_produce_to_kafka): - """Test that rerun analysis filters BUILD_DISTRIBUTION based on project query setting""" - from sentry.preprod.producer import PreprodFeature - from sentry.preprod.quotas import DISTRIBUTION_ENABLED_QUERY_KEY + mock_cleanup.assert_called_once() + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_when_no_query_set(self, mock_cleanup): artifact = self.create_preprod_artifact( project=self.project, app_name="MyApp", @@ -238,73 +216,12 @@ def test_rerun_analysis_filters_distribution_by_query(self, mock_produce_to_kafk state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up a query filter that should NOT match the artifact - self.project.update_option(DISTRIBUTION_ENABLED_QUERY_KEY, "app_id:com.other.app") - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called without BUILD_DISTRIBUTION - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.BUILD_DISTRIBUTION not in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_all_features_when_no_query(self, mock_produce_to_kafka): - """Test that rerun analysis includes all features when no query is set""" - from sentry.preprod.producer import PreprodFeature - - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - # Don't set any query filters - should include all features - - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - - # Verify produce_preprod_artifact_to_kafka was called with both features - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - assert PreprodFeature.BUILD_DISTRIBUTION in call_kwargs["requested_features"] - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.dispatch_taskbroker", - return_value=True, - ) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_dispatches_via_taskbroker_when_flag_enabled( - self, mock_produce_to_kafka, mockdispatch_taskbroker - ): - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - with self.feature("organizations:launchpad-taskbroker-rollout"): - self.get_success_response(self.organization.slug, artifact.id, status_code=200) - - mock_produce_to_kafka.assert_not_called() - mockdispatch_taskbroker.assert_called_once() + mock_cleanup.assert_called_once() - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_analysis_includes_feature_on_invalid_query(self, mock_produce_to_kafka): - """Test that rerun analysis includes features when query is invalid""" - from sentry.preprod.producer import PreprodFeature + @patch("sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.cleanup_old_metrics") + def test_rerun_analysis_runs_cleanup_on_invalid_query(self, mock_cleanup): from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY artifact = self.create_preprod_artifact( @@ -316,16 +233,11 @@ def test_rerun_analysis_includes_feature_on_invalid_query(self, mock_produce_to_ state=PreprodArtifact.ArtifactState.PROCESSED, ) - # Set up an invalid query filter self.project.update_option(SIZE_ENABLED_QUERY_KEY, "invalid_field:value") self.get_success_response(self.organization.slug, artifact.id, status_code=200) - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - # (invalid query should be skipped, allowing the feature) - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] + mock_cleanup.assert_called_once() class PreprodArtifactAdminRerunAnalysisTest(BaseRerunAnalysisTest): @@ -424,28 +336,3 @@ def test_rerun_analysis_invalid_id(self) -> None: assert ( "preprod_artifact_id is required and must be a valid integer" in response.data["detail"] ) - - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.dispatch_taskbroker", - return_value=True, - ) - @patch( - "sentry.preprod.api.endpoints.preprod_artifact_rerun_analysis.produce_preprod_artifact_to_kafka" - ) - def test_rerun_dispatches_via_taskbroker_when_flag_enabled( - self, mock_produce_to_kafka, mockdispatch_taskbroker - ): - artifact = self.create_preprod_artifact( - project=self.project, - app_name="MyApp", - app_id="com.my.app", - build_version="1.0.0", - build_number=1, - state=PreprodArtifact.ArtifactState.PROCESSED, - ) - - with self.feature("organizations:launchpad-taskbroker-rollout"): - self.get_success_response(preprod_artifact_id=artifact.id, status_code=200) - - mock_produce_to_kafka.assert_not_called() - mockdispatch_taskbroker.assert_called_once() diff --git a/tests/sentry/preprod/test_tasks.py b/tests/sentry/preprod/test_tasks.py index 61424fcb9376d9..05897fa890ebd7 100644 --- a/tests/sentry/preprod/test_tasks.py +++ b/tests/sentry/preprod/test_tasks.py @@ -391,119 +391,9 @@ def test_assemble_preprod_artifact_nonexistent_project(self) -> None: # Note: Tests currently expect ERROR state because the task tries to access # assemble_result.build_configuration which doesn't exist - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_assemble_preprod_artifact_includes_all_features_when_no_query( - self, mock_produce_to_kafka - ) -> None: - """Test that assemble_preprod_artifact includes all features when no query is set""" - from sentry.preprod.producer import PreprodFeature - - content = b"test preprod artifact content no query" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - # Don't set any query filters - should include all features - - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - # Verify produce_preprod_artifact_to_kafka was called with both features - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - assert PreprodFeature.BUILD_DISTRIBUTION in call_kwargs["requested_features"] - - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_assemble_preprod_artifact_includes_feature_on_invalid_query( - self, mock_produce_to_kafka - ) -> None: - """Test that assemble_preprod_artifact includes features when query is invalid""" - from sentry.preprod.producer import PreprodFeature - from sentry.preprod.quotas import SIZE_ENABLED_QUERY_KEY - - content = b"test preprod artifact content invalid query" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - # Set up an invalid query filter - self.project.update_option(SIZE_ENABLED_QUERY_KEY, "invalid_field:value") - - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - # Verify produce_preprod_artifact_to_kafka was called with SIZE_ANALYSIS - # (invalid query should be skipped, allowing the feature) - mock_produce_to_kafka.assert_called_once() - call_kwargs = mock_produce_to_kafka.call_args[1] - assert PreprodFeature.SIZE_ANALYSIS in call_kwargs["requested_features"] - @patch("sentry.preprod.tasks.dispatch_taskbroker") - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_only_taskbroker_dispatched_when_flag_enabled( - self, mock_produce_to_kafka, mock_shadow - ) -> None: - content = b"test shadow taskbroker dispatch" - fileobj = ContentFile(content) - total_checksum = sha1(content).hexdigest() - - blob = FileBlob.from_file_with_organization(fileobj, self.organization) - - artifact = create_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - build_configuration_name="release", - ) - assert artifact is not None - - with self.feature("organizations:launchpad-taskbroker-rollout"): - assemble_preprod_artifact( - org_id=self.organization.id, - project_id=self.project.id, - checksum=total_checksum, - chunks=[blob.checksum], - artifact_id=artifact.id, - ) - - mock_produce_to_kafka.assert_not_called() - mock_shadow.assert_called_once_with(self.project.id, self.organization.id, artifact.id) - - @patch("sentry.preprod.tasks.dispatch_taskbroker") - @patch("sentry.preprod.tasks.produce_preprod_artifact_to_kafka") - def test_only_kafka_dispatched_when_flag_disabled( - self, mock_produce_to_kafka, mock_shadow - ) -> None: - content = b"test only kafka dispatch when flag disabled" + def test_assemble_preprod_artifact_dispatches_to_taskbroker(self, mock_dispatch) -> None: + content = b"test preprod artifact content no query" fileobj = ContentFile(content) total_checksum = sha1(content).hexdigest() @@ -525,10 +415,7 @@ def test_only_kafka_dispatched_when_flag_disabled( artifact_id=artifact.id, ) - mock_produce_to_kafka.assert_called_once() - mock_shadow.assert_not_called() - artifact.refresh_from_db() - assert artifact.state != PreprodArtifact.ArtifactState.FAILED + mock_dispatch.assert_called_once_with(self.project.id, self.organization.id, artifact.id) class CreatePreprodArtifactTest(TestCase):