Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2691,7 +2691,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
"shared-resources-usage": "default",
"buffered-segments": "default",
"buffered-segments-dlq": "default",
"preprod-artifact-events": "default",
# Taskworker topics
"taskworker": "default",
"taskworker-dlq": "default",
Expand Down Expand Up @@ -3026,8 +3025,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000
SENTRY_PROFILE_EAP_FUTURES_MAX_LIMIT = 10000

SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000

# How long we should wait for a gateway proxy request to return before giving up
GATEWAY_PROXY_TIMEOUT: int | None = (
int(os.environ["SENTRY_APIGW_PROXY_TIMEOUT"])
Expand Down
1 change: 0 additions & 1 deletion src/sentry/conf/types/kafka_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class Topic(Enum):
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
INGEST_OCCURRENCES = "ingest-occurrences"
INGEST_MONITORS = "ingest-monitors"
PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events"
MONITORS_CLOCK_TICK = "monitors-clock-tick"
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"
Expand Down
2 changes: 0 additions & 2 deletions src/sentry/features/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ def register_temporary_features(manager: FeatureManager) -> None:
manager.add("organizations:invite-members", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=True)
# Enable rate limits for inviting members.
manager.add("organizations:invite-members-rate-limits", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=False)
# Enable rollout of launchpad taskbroker shadowing/usage
manager.add("organizations:launchpad-taskbroker-rollout", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
manager.add("organizations:mep-use-default-tags", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable flamegraph visualization for MetricKit hang diagnostic stack traces
manager.add("organizations:metrickit-flamegraph", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
Expand Down
115 changes: 11 additions & 104 deletions src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from rest_framework.request import Request
from rest_framework.response import Response

from sentry import analytics, features
from sentry import analytics
from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import Endpoint, cell_silo_endpoint, internal_cell_silo_endpoint
Expand All @@ -22,8 +22,7 @@
PreprodArtifactSizeComparison,
PreprodArtifactSizeMetrics,
)
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
from sentry.preprod.quotas import should_run_distribution, should_run_size
from sentry.preprod.quotas import should_run_size
from sentry.preprod.tasks import dispatch_taskbroker

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,47 +64,12 @@ def post(

organization = head_artifact.project.organization

# Empty list is valid - triggers default processing behavior
requested_features: list[PreprodFeature] = []

run_size, _ = should_run_size(head_artifact, actor=request.user)
if run_size:
requested_features.append(PreprodFeature.SIZE_ANALYSIS)

run_distribution, _ = should_run_distribution(head_artifact, actor=request.user)
if run_distribution:
requested_features.append(PreprodFeature.BUILD_DISTRIBUTION)

if PreprodFeature.SIZE_ANALYSIS in requested_features:
cleanup_old_metrics(head_artifact)
reset_artifact_data(head_artifact)

if features.has("organizations:launchpad-taskbroker-rollout", organization):
dispatched = dispatch_taskbroker(
head_artifact.project.id, organization.id, head_artifact_id
)
else:
try:
produce_preprod_artifact_to_kafka(
project_id=head_artifact.project.id,
organization_id=organization.id,
artifact_id=head_artifact_id,
requested_features=requested_features,
)
dispatched = True
except Exception:
logger.exception(
"preprod_artifact.rerun_analysis.dispatch_error",
extra={
"artifact_id": head_artifact_id,
"user_id": request.user.id,
"organization_id": organization.id,
"project_id": head_artifact.project.id,
},
)
dispatched = False

if not dispatched:
if not dispatch_taskbroker(head_artifact.project.id, organization.id, head_artifact_id):
return Response(
{
"detail": f"Failed to queue analysis for artifact {head_artifact_id}",
Expand Down Expand Up @@ -182,36 +146,9 @@ def post(self, request: Request) -> Response:
reset_artifact_data(preprod_artifact)

organization = preprod_artifact.project.organization
if features.has("organizations:launchpad-taskbroker-rollout", organization):
dispatched = dispatch_taskbroker(
preprod_artifact.project.id, organization.id, preprod_artifact_id
)
else:
try:
produce_preprod_artifact_to_kafka(
project_id=preprod_artifact.project.id,
organization_id=organization.id,
artifact_id=preprod_artifact_id,
requested_features=[
PreprodFeature.SIZE_ANALYSIS,
PreprodFeature.BUILD_DISTRIBUTION,
],
)
dispatched = True
except Exception as e:
logger.exception(
"preprod_artifact.admin_rerun_analysis.dispatch_error",
extra={
"artifact_id": preprod_artifact_id,
"user_id": request.user.id,
"organization_id": organization.id,
"project_id": preprod_artifact.project.id,
"error": str(e),
},
)
dispatched = False

if not dispatched:
if not dispatch_taskbroker(
preprod_artifact.project.id, organization.id, preprod_artifact_id
):
return Response(
{
"detail": f"Failed to queue analysis for artifact {preprod_artifact_id}",
Expand Down Expand Up @@ -303,32 +240,7 @@ def post(self, request: Request) -> Response:
cleanup_stats = cleanup_old_metrics(artifact)
reset_artifact_data(artifact)

if features.has("organizations:launchpad-taskbroker-rollout", organization):
dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
else:
try:
produce_preprod_artifact_to_kafka(
project_id=artifact.project.id,
organization_id=organization.id,
artifact_id=artifact_id,
requested_features=[
PreprodFeature.SIZE_ANALYSIS,
PreprodFeature.BUILD_DISTRIBUTION,
],
)
dispatched = True
except Exception:
logger.exception(
"preprod_artifact.admin_batch_rerun_analysis.dispatch_error",
extra={
"artifact_id": artifact_id,
"user_id": request.user.id,
"organization_id": organization.id,
"project_id": artifact.project.id,
},
)
dispatched = False

dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
if not dispatched:
artifact.refresh_from_db()

Expand Down Expand Up @@ -367,14 +279,10 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
PreprodArtifactSizeMetrics.objects.filter(preprod_artifact=preprod_artifact)
)

file_ids_to_delete = []

if size_metrics:
size_metric_ids = [sm.id for sm in size_metrics]

for size_metric in size_metrics:
if size_metric.analysis_file_id:
file_ids_to_delete.append(size_metric.analysis_file_id)
file_ids_to_delete = [sm.analysis_file_id for sm in size_metrics if sm.analysis_file_id]

comparisons = PreprodArtifactSizeComparison.objects.filter(
head_size_analysis_id__in=size_metric_ids
Expand All @@ -392,10 +300,9 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
id__in=size_metric_ids
).delete()

if file_ids_to_delete:
for file in File.objects.filter(id__in=file_ids_to_delete):
file.delete()
stats.files_total_deleted += 1
for file in File.objects.filter(id__in=file_ids_to_delete):
file.delete()
stats.files_total_deleted += 1

PreprodArtifactSizeMetrics.objects.create(
preprod_artifact=preprod_artifact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
PreprodArtifactMobileAppInfo,
PreprodArtifactSizeMetrics,
)
from sentry.preprod.producer import PreprodFeature
from sentry.preprod.quotas import should_run_distribution, should_run_size
from sentry.preprod.quotas import PreprodFeature, should_run_distribution, should_run_size
from sentry.preprod.vcs.status_checks.size.tasks import create_preprod_status_check_task

logger = logging.getLogger(__name__)
Expand Down
63 changes: 0 additions & 63 deletions src/sentry/preprod/producer.py

This file was deleted.

8 changes: 7 additions & 1 deletion src/sentry/preprod/quotas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from collections.abc import Callable
from datetime import datetime, timedelta
from enum import Enum
from typing import Any

import sentry_sdk
Expand All @@ -15,11 +16,16 @@
from sentry.models.organization import Organization
from sentry.preprod.artifact_search import artifact_matches_query
from sentry.preprod.models import PreprodArtifact
from sentry.preprod.producer import PreprodFeature
from sentry.users.models.user import User

logger = logging.getLogger(__name__)


class PreprodFeature(Enum):
    """Kinds of analysis that can be requested for a preprod artifact.

    Members are passed in ``requested_features`` lists when dispatching an
    artifact for processing; the string values are the wire identifiers.
    """

    # Size analysis of the uploaded artifact.
    SIZE_ANALYSIS = "size_analysis"
    # Build distribution processing for the artifact.
    BUILD_DISTRIBUTION = "build_distribution"


DEFAULT_SIZE_RETENTION_DAYS = 90


Expand Down
54 changes: 3 additions & 51 deletions src/sentry/preprod/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from django.utils import timezone
from taskbroker_client.retry import Retry

from sentry import features
from sentry.constants import DataCategory
from sentry.models.commitcomparison import CommitComparison
from sentry.models.organization import Organization
Expand All @@ -28,7 +27,6 @@
PreprodArtifactSizeMetrics,
PreprodBuildConfiguration,
)
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
from sentry.preprod.quotas import (
has_installable_quota,
has_size_quota,
Expand Down Expand Up @@ -165,14 +163,9 @@ def assemble_preprod_artifact(
except Exception:
pass

if features.has("organizations:launchpad-taskbroker-rollout", organization):
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
if not taskbroker_dispatched:
return
else:
kafka_dispatched = _dispatch_kafka(project_id, org_id, artifact_id, checksum)
if not kafka_dispatched:
return
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
if not taskbroker_dispatched:
return

logger.info(
"Finished preprod artifact dispatch",
Expand Down Expand Up @@ -976,47 +969,6 @@ def detect_expired_preprod_artifacts() -> None:
)


def _dispatch_kafka(project_id: int, org_id: int, artifact_id: int, checksum: str) -> bool:
    """Produce a preprod-artifact analysis event to Kafka.

    Returns ``True`` when the produce call succeeds. On any exception it
    reports the error (Sentry + structured log), marks the artifact row as
    FAILED with a user-facing error message, enqueues a status-check task,
    and returns ``False``.
    """
    # Note: requested_features is no longer used for filtering - all features are
    # requested here, and the actual quota/filter checks happen in the update endpoint
    # (project_preprod_artifact_update.py) after preprocessing completes.
    try:
        produce_preprod_artifact_to_kafka(
            project_id=project_id,
            organization_id=org_id,
            artifact_id=artifact_id,
            requested_features=[
                PreprodFeature.SIZE_ANALYSIS,
                PreprodFeature.BUILD_DISTRIBUTION,
            ],
        )
        return True
    except Exception as e:
        # This exact message is stored on the artifact row below, so it is
        # deliberately phrased for end users rather than operators.
        user_friendly_error_message = "Failed to dispatch preprod artifact event for analysis"
        sentry_sdk.capture_exception(e)
        logger.exception(
            user_friendly_error_message,
            extra={
                "project_id": project_id,
                "organization_id": org_id,
                "checksum": checksum,
                "preprod_artifact_id": artifact_id,
            },
        )
        # Persist the failure state so the artifact is not left appearing
        # in-progress after a dispatch error.
        PreprodArtifact.objects.filter(id=artifact_id).update(
            state=PreprodArtifact.ArtifactState.FAILED,
            error_code=PreprodArtifact.ErrorCode.ARTIFACT_PROCESSING_ERROR,
            error_message=user_friendly_error_message,
        )
        # The "caller" kwarg tags which code path triggered the status check.
        create_preprod_status_check_task.apply_async(
            kwargs={
                "preprod_artifact_id": artifact_id,
                "caller": "assemble_dispatch_error",
            }
        )
        return False


def dispatch_taskbroker(project_id: int, org_id: int, artifact_id: int) -> bool:
try:
logger.info(
Expand Down
Loading
Loading