From 843a35bf5c08c50598d002d371a7694f9b1dbb03 Mon Sep 17 00:00:00 2001
From: Francesco Vigliaturo
Date: Thu, 20 Nov 2025 09:10:57 +0100
Subject: [PATCH] fix(eu_data_export): fixes sts transfer job schedule

Fixes data export TransferJob configuration to ensure jobs are one-shot.

* Removed misleading job_duration parameter from create_transfer_job and
  related export functions, as the GCS Transfer Service uses
  schedule_end_date == schedule_start_date to define a single-run job.
---
 src/sentry/profiles/data_export.py            |  5 -----
 src/sentry/replays/data_export.py             | 18 +++++-------------
 tests/sentry/replays/unit/test_data_export.py | 10 +++-------
 3 files changed, 8 insertions(+), 25 deletions(-)

diff --git a/src/sentry/profiles/data_export.py b/src/sentry/profiles/data_export.py
index 791c51c67b9d28..19fae6409f368a 100644
--- a/src/sentry/profiles/data_export.py
+++ b/src/sentry/profiles/data_export.py
@@ -1,5 +1,4 @@
 import logging
-from datetime import timedelta
 
 from google.cloud.storage_transfer_v1 import TransferJob
 
@@ -8,7 +7,6 @@
 
 logger = logging.getLogger(__name__)
 
-EXPORT_JOB_DURATION_DEFAULT = timedelta(days=5)
 EXPORT_JOB_SOURCE_BUCKET = "sentryio-profiles"
 
 
@@ -17,7 +15,6 @@ def export_profiles_data(
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
     pubsub_topic_name: str | None = None,
 ) -> TransferJob:
@@ -27,7 +24,6 @@
             "organization_id": organization_id,
             "gcp_project_id": gcp_project_id,
             "destination_bucket": destination_bucket,
-            "blob_export_job_duration": str(blob_export_job_duration),
             "source_bucket": source_bucket,
             "pubsub_topic_name": pubsub_topic_name,
         },
@@ -49,7 +45,6 @@
         destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Profiles EU Compliance Export",
-        job_duration=blob_export_job_duration,
         do_create_transfer_job=request_create_transfer_job,
     )
     logger.info("Successfully scheduled recording export job.")
diff --git a/src/sentry/replays/data_export.py b/src/sentry/replays/data_export.py
index 1e3dff441e63e4..accc446eaddab6 100644
--- a/src/sentry/replays/data_export.py
+++ b/src/sentry/replays/data_export.py
@@ -45,7 +45,6 @@
 
 logger = logging.getLogger(__name__)
 
-EXPORT_JOB_DURATION_DEFAULT = timedelta(days=5)
 EXPORT_JOB_SOURCE_BUCKET = "sentry-replays"
 EXPORT_QUERY_ROWS_PER_PAGE = 1000
 EXPORT_QUERY_PAGES_PER_TASK = 10
@@ -140,7 +139,6 @@ def create_transfer_job[T](
     destination_bucket: str,
     destination_prefix: str,
     job_description: str,
-    job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     notification_topic: str | None = None,
     get_current_datetime: Callable[[], datetime] = lambda: datetime.now(tz=timezone.utc),
@@ -149,9 +147,9 @@
     Create a transfer-job which copies a bucket by prefix to another bucket.
 
     Transfer jobs are templates for transfer-job-runs. Transfer jobs are run based on a schedule.
-    They start immediately and run until the job_duration value. Automatic run creation based on
-    the schedule is one-time only. If it fails or you want to run the transfer-job twice you will
-    need to manually create a transfer-job-run on the second attempt.
+    To run a job once, the schedule start and end dates are set to the same day.
+    Automatic run creation based on the schedule is one-time only. If it fails or you want to run
+    the transfer-job twice you will need to manually create a transfer-job-run on the second attempt.
 
     Failure notifications are handled by pubsub. When the transfer service fails it will send a
     notification to the specified topic. That topic should be configured to propagate the failure
@@ -162,15 +160,14 @@
     :param source_bucket:
     :param source_prefix:
     :param destination_bucket:
-    :param job_duration: The amount of time the job should take to complete. Longer runs put less
-        pressure on our buckets.
     :param destination_prefix:
     :param notification_topic: topic to which we'll notify the success or failure of the transfer.
     :param do_create_transfer_job: Injected function which creates the transfer-job.
     :param get_current_datetime: Injected function which computes the current datetime.
     """
     date_job_starts = get_current_datetime()
-    date_job_ends = date_job_starts + job_duration
+    # To make this a one-shot job, the start and end dates must be the same.
+    date_job_ends = date_job_starts
 
     transfer_job = TransferJob(
         description=job_description,
@@ -576,7 +573,6 @@ def export_replay_blob_data[T](
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     pubsub_topic_name: str | None = None,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
@@ -598,7 +594,6 @@
         destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Session Replay EU Compliance Export",
-        job_duration=job_duration,
         do_create_transfer_job=do_create_transfer_job,
     )
 
@@ -608,7 +603,6 @@ def export_replay_data(
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     database_rows_per_page: int = EXPORT_QUERY_ROWS_PER_PAGE,
     database_pages_per_task: int = EXPORT_QUERY_PAGES_PER_TASK,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
@@ -620,7 +614,6 @@
             "organization_id": organization_id,
             "gcp_project_id": gcp_project_id,
             "destination_bucket": destination_bucket,
-            "blob_export_job_duration": str(blob_export_job_duration),
             "database_rows_per_page": database_rows_per_page,
             "database_pages_per_task": database_pages_per_task,
             "source_bucket": source_bucket,
@@ -658,7 +651,6 @@
         destination_prefix=destination_prefix,
        pubsub_topic_name=pubsub_topic_name,
         source_bucket=source_bucket,
-        job_duration=blob_export_job_duration,
         do_create_transfer_job=request_create_transfer_job,
     )
     logger.info("Successfully scheduled recording export job.")
diff --git a/tests/sentry/replays/unit/test_data_export.py b/tests/sentry/replays/unit/test_data_export.py
index 5f9dfa8949796b..c546ed175637fd 100644
--- a/tests/sentry/replays/unit/test_data_export.py
+++ b/tests/sentry/replays/unit/test_data_export.py
@@ -1,5 +1,5 @@
 import base64
-from datetime import datetime, timedelta
+from datetime import datetime
 
 from google.cloud import storage_transfer_v1
 from google.cloud.storage_transfer_v1 import (
@@ -30,8 +30,7 @@ def test_export_blob_data() -> None:
     bucket_prefix = "PREFIX"
     start_date = datetime(year=2025, month=1, day=31)
     job_description = "something"
-    job_duration = timedelta(days=5)
-    end_date = start_date + job_duration
+    end_date = start_date
 
     result = create_transfer_job(
         gcp_project_id=gcs_project_id,
@@ -41,7 +40,6 @@
         destination_prefix="destination_prefix/",
         notification_topic=notification_topic,
         job_description=job_description,
-        job_duration=job_duration,
         transfer_job_name=None,
         do_create_transfer_job=lambda event: event,
         get_current_datetime=lambda: start_date,
@@ -103,9 +101,7 @@ def test_retry_export_blob_data() -> None:
 
 def test_export_replay_blob_data() -> None:
     jobs = []
-    export_replay_blob_data(
-        1, "1", "test", "dest_prefix/", timedelta(days=1), lambda job: jobs.append(job)
-    )
+    export_replay_blob_data(1, "1", "test", "dest_prefix/", lambda job: jobs.append(job))
 
     # Assert a job is created for each retention-period.
     assert len(jobs) == 3
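
Note: a minimal sketch of the one-shot schedule described above, using the same
google-cloud-storage-transfer client the patched modules already import. The
project id and bucket names below are placeholders and the snippet is
illustrative rather than part of the patch: with schedule_end_date equal to
schedule_start_date the Storage Transfer Service creates exactly one run, which
is what the patched create_transfer_job achieves via date_job_ends = date_job_starts.

    # Illustrative sketch only; placeholder project and bucket names.
    from datetime import datetime, timezone

    from google.cloud.storage_transfer_v1 import GcsData, Schedule, TransferJob, TransferSpec
    from google.type.date_pb2 import Date

    today = datetime.now(tz=timezone.utc)
    run_date = Date(year=today.year, month=today.month, day=today.day)

    one_shot_job = TransferJob(
        description="Example one-shot export",
        project_id="example-project",
        status=TransferJob.Status.ENABLED,
        transfer_spec=TransferSpec(
            gcs_data_source=GcsData(bucket_name="example-source", path="prefix/"),
            gcs_data_sink=GcsData(bucket_name="example-destination", path="prefix/"),
        ),
        # schedule_start_date == schedule_end_date: the Transfer Service creates a
        # single run on that day and never schedules the job again.
        schedule=Schedule(schedule_start_date=run_date, schedule_end_date=run_date),
    )

Submitting the job is unchanged by the patch: the TransferJob is wrapped in a
CreateTransferJobRequest and handed to the injected do_create_transfer_job
callable, as the existing code already does.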