5 changes: 0 additions & 5 deletions src/sentry/profiles/data_export.py
@@ -1,5 +1,4 @@
 import logging
-from datetime import timedelta
 
 from google.cloud.storage_transfer_v1 import TransferJob
 
@@ -8,7 +7,6 @@
 
 logger = logging.getLogger(__name__)
 
-EXPORT_JOB_DURATION_DEFAULT = timedelta(days=5)
 EXPORT_JOB_SOURCE_BUCKET = "sentryio-profiles"
 
 
@@ -17,7 +15,6 @@ def export_profiles_data(
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
     pubsub_topic_name: str | None = None,
 ) -> TransferJob:
@@ -27,7 +24,6 @@ def export_profiles_data(
             "organization_id": organization_id,
             "gcp_project_id": gcp_project_id,
             "destination_bucket": destination_bucket,
-            "blob_export_job_duration": str(blob_export_job_duration),
             "source_bucket": source_bucket,
             "pubsub_topic_name": pubsub_topic_name,
         },
@@ -49,7 +45,6 @@ def export_profiles_data(
         destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Profiles EU Compliance Export",
-        job_duration=blob_export_job_duration,
         do_create_transfer_job=request_create_transfer_job,
     )
     logger.info("Successfully scheduled recording export job.")
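
For reference, a hypothetical call site after this change (all argument values below are made up, not taken from this diff). The duration knob simply disappears from the profiles exporter:

```python
# Hypothetical usage after this change -- argument values are examples only.
from sentry.profiles.data_export import export_profiles_data

export_profiles_data(
    organization_id=1,
    gcp_project_id="example-gcp-project",     # assumed value
    destination_bucket="customer-eu-bucket",  # assumed value
    destination_prefix="profiles/",           # assumed value
)
# There is no blob_export_job_duration to tune anymore: the transfer job is one-shot.
```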
18 changes: 5 additions & 13 deletions src/sentry/replays/data_export.py
@@ -45,7 +45,6 @@
 
 logger = logging.getLogger(__name__)
 
-EXPORT_JOB_DURATION_DEFAULT = timedelta(days=5)
 EXPORT_JOB_SOURCE_BUCKET = "sentry-replays"
 EXPORT_QUERY_ROWS_PER_PAGE = 1000
 EXPORT_QUERY_PAGES_PER_TASK = 10
@@ -140,7 +139,6 @@ def create_transfer_job[T](
     destination_bucket: str,
     destination_prefix: str,
     job_description: str,
-    job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     notification_topic: str | None = None,
     get_current_datetime: Callable[[], datetime] = lambda: datetime.now(tz=timezone.utc),
@@ -149,9 +147,9 @@
     Create a transfer-job which copies a bucket by prefix to another bucket.
 
     Transfer jobs are templates for transfer-job-runs. Transfer jobs are run based on a schedule.
-    They start immediately and run until the job_duration value. Automatic run creation based on
-    the schedule is one-time only. If it fails or you want to run the transfer-job twice you will
-    need to manually create a transfer-job-run on the second attempt.
+    To run a job once, the schedule start and end dates are set to the same day.
+    Automatic run creation based on the schedule is one-time only. If it fails or you want to run
+    the transfer-job twice you will need to manually create a transfer-job-run on the second attempt.
 
     Failure notifications are handled by pubsub. When the transfer service fails it will send a
     notification to the specified topic. That topic should be configured to propagate the failure
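
A minimal sketch of the one-shot scheduling trick the new docstring describes, using the public `google.cloud.storage_transfer_v1` types (the field names come from the GCP client library; the helper itself is illustrative, not part of this diff):

```python
from datetime import datetime, timezone

from google.cloud.storage_transfer_v1 import Schedule
from google.type.date_pb2 import Date


def one_shot_schedule(now: datetime) -> Schedule:
    """Build a schedule the transfer service will run exactly once."""
    today = Date(year=now.year, month=now.month, day=now.day)
    # Identical start and end dates mean the service creates a single
    # transfer-job-run and never schedules another one.
    return Schedule(schedule_start_date=today, schedule_end_date=today)


schedule = one_shot_schedule(datetime.now(tz=timezone.utc))
```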
@@ -162,15 +160,14 @@
     :param source_bucket:
     :param source_prefix:
     :param destination_bucket:
-    :param job_duration: The amount of time the job should take to complete. Longer runs put less
-        pressure on our buckets.
     :param destination_prefix:
     :param notification_topic: topic to which we'll notify the success or failure of the transfer.
     :param do_create_transfer_job: Injected function which creates the transfer-job.
     :param get_current_datetime: Injected function which computes the current datetime.
     """
     date_job_starts = get_current_datetime()
-    date_job_ends = date_job_starts + job_duration
+    # To make this a one-shot job, the start and end dates must be the same.
+    date_job_ends = date_job_starts
 
     transfer_job = TransferJob(
         description=job_description,
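
The hunk cuts off inside the `TransferJob` constructor, so here is a guess at how the full job, including the pubsub failure notification the docstring mentions, could be wired together. This is a sketch of the library API, not the file's actual continuation, and every literal value is an assumption:

```python
from google.cloud.storage_transfer_v1 import (
    CreateTransferJobRequest,
    GcsData,
    NotificationConfig,
    Schedule,
    TransferJob,
    TransferSpec,
)
from google.type.date_pb2 import Date

today = Date(year=2025, month=1, day=31)  # example date
transfer_job = TransferJob(
    description="Session Replay EU Compliance Export",
    project_id="example-gcp-project",  # assumed value
    status=TransferJob.Status.ENABLED,
    # One-shot behavior: start and end dates are the same day.
    schedule=Schedule(schedule_start_date=today, schedule_end_date=today),
    transfer_spec=TransferSpec(
        gcs_data_source=GcsData(bucket_name="sentry-replays", path="source_prefix/"),
        gcs_data_sink=GcsData(bucket_name="customer-eu-bucket", path="dest_prefix/"),
    ),
    notification_config=NotificationConfig(
        pubsub_topic="projects/example-gcp-project/topics/transfer-failures",  # assumed
        event_types=[NotificationConfig.EventType.TRANSFER_OPERATION_FAILED],
        payload_format=NotificationConfig.PayloadFormat.JSON,
    ),
)
request = CreateTransferJobRequest(transfer_job=transfer_job)
```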
@@ -576,7 +573,6 @@ def export_replay_blob_data[T](
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     pubsub_topic_name: str | None = None,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
@@ -598,7 +594,6 @@ def export_replay_blob_data[T](
         destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Session Replay EU Compliance Export",
-        job_duration=job_duration,
         do_create_transfer_job=do_create_transfer_job,
     )
 
@@ -608,7 +603,6 @@ def export_replay_data(
     gcp_project_id: str,
     destination_bucket: str,
     destination_prefix: str,
-    blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     database_rows_per_page: int = EXPORT_QUERY_ROWS_PER_PAGE,
     database_pages_per_task: int = EXPORT_QUERY_PAGES_PER_TASK,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
@@ -620,7 +614,6 @@ def export_replay_data(
             "organization_id": organization_id,
             "gcp_project_id": gcp_project_id,
             "destination_bucket": destination_bucket,
-            "blob_export_job_duration": str(blob_export_job_duration),
             "database_rows_per_page": database_rows_per_page,
             "database_pages_per_task": database_pages_per_task,
             "source_bucket": source_bucket,
@@ -658,7 +651,6 @@ def export_replay_data(
         destination_prefix=destination_prefix,
         pubsub_topic_name=pubsub_topic_name,
         source_bucket=source_bucket,
-        job_duration=blob_export_job_duration,
         do_create_transfer_job=request_create_transfer_job,
     )
     logger.info("Successfully scheduled recording export job.")
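
Similarly, a hypothetical call to the replay exporter after the change (values are placeholders): the database paging knobs remain, but the duration knob is gone:

```python
# Hypothetical usage -- argument values are examples only.
from sentry.replays.data_export import export_replay_data

export_replay_data(
    organization_id=1,
    gcp_project_id="example-gcp-project",     # assumed value
    destination_bucket="customer-eu-bucket",  # assumed value
    destination_prefix="replays/",            # assumed value
    database_rows_per_page=1000,              # EXPORT_QUERY_ROWS_PER_PAGE default
    database_pages_per_task=10,               # EXPORT_QUERY_PAGES_PER_TASK default
)
```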
10 changes: 3 additions & 7 deletions tests/sentry/replays/unit/test_data_export.py
@@ -1,5 +1,5 @@
 import base64
-from datetime import datetime, timedelta
+from datetime import datetime
 
 from google.cloud import storage_transfer_v1
 from google.cloud.storage_transfer_v1 import (
@@ -30,8 +30,7 @@ def test_export_blob_data() -> None:
     bucket_prefix = "PREFIX"
     start_date = datetime(year=2025, month=1, day=31)
     job_description = "something"
-    job_duration = timedelta(days=5)
-    end_date = start_date + job_duration
+    end_date = start_date
 
     result = create_transfer_job(
         gcp_project_id=gcs_project_id,
@@ -41,7 +40,6 @@ def test_export_blob_data() -> None:
         destination_prefix="destination_prefix/",
         notification_topic=notification_topic,
         job_description=job_description,
-        job_duration=job_duration,
         transfer_job_name=None,
         do_create_transfer_job=lambda event: event,
         get_current_datetime=lambda: start_date,
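
A hypothetical follow-up assertion that would pin down the one-shot behavior, assuming `result` is the `CreateTransferJobRequest` echoed back by the injected `lambda event: event` (the attribute path is a guess from the types involved, not taken from this diff):

```python
# Hypothetical assertions; `result` comes from the create_transfer_job call above.
from google.type.date_pb2 import Date

expected = Date(year=2025, month=1, day=31)
assert result.transfer_job.schedule.schedule_start_date == expected
# One-shot behavior: the end date equals the start date.
assert result.transfer_job.schedule.schedule_end_date == expected
```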
@@ -103,9 +101,7 @@ def test_retry_export_blob_data() -> None:
 
 def test_export_replay_blob_data() -> None:
     jobs = []
-    export_replay_blob_data(
-        1, "1", "test", "dest_prefix/", timedelta(days=1), lambda job: jobs.append(job)
-    )
+    export_replay_blob_data(1, "1", "test", "dest_prefix/", lambda job: jobs.append(job))
 
     # Assert a job is created for each retention-period.
     assert len(jobs) == 3