diff --git a/pyproject.toml b/pyproject.toml
index efec0d273e9ba6..51ac45768d0c54 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,6 +33,7 @@ dependencies = [
     "google-cloud-pubsub>=2.23.0",
     "google-cloud-spanner>=3.49.0",
     "google-cloud-storage>=2.18.0",
+    "google-cloud-storage-transfer>=1.17.0",
     "google-crc32c>=1.6.0",
     "googleapis-common-protos>=1.63.2",
     "grpc-google-iam-v1>=0.13.1",
@@ -287,6 +288,7 @@ module = [
    "google.cloud.*",
    "google.resumable_media.common.*",
    "google.rpc.status_pb2.*",
+    "google.type.*",
    "kombu.*",
    "mistune.*",
    "onelogin.saml2.auth.*",
diff --git a/src/sentry/api/urls.py b/src/sentry/api/urls.py
index d7e1bd1fe9202b..ff5b825e271823 100644
--- a/src/sentry/api/urls.py
+++ b/src/sentry/api/urls.py
@@ -462,6 +462,7 @@
 from sentry.relocation.api.endpoints.recover import RelocationRecoverEndpoint
 from sentry.relocation.api.endpoints.retry import RelocationRetryEndpoint
 from sentry.relocation.api.endpoints.unpause import RelocationUnpauseEndpoint
+from sentry.replays.endpoints.data_export_notifications import DataExportNotificationsEndpoint
 from sentry.replays.endpoints.organization_replay_count import OrganizationReplayCountEndpoint
 from sentry.replays.endpoints.organization_replay_details import OrganizationReplayDetailsEndpoint
 from sentry.replays.endpoints.organization_replay_events_meta import (
@@ -3567,6 +3568,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
         AcceptOrganizationInvite.as_view(),
         name="sentry-api-0-organization-accept-organization-invite",
     ),
+    re_path(
+        r"^data-export/notifications/google-cloud/$",
+        DataExportNotificationsEndpoint.as_view(),
+        name="sentry-api-0-data-export-notifications",
+    ),
     re_path(
         r"^accept-invite/(?P<member_id>[^/]+)/(?P<token>[^/]+)/$",
         AcceptOrganizationInvite.as_view(),
diff --git a/src/sentry/replays/data_export.py b/src/sentry/replays/data_export.py
new file mode 100644
index 00000000000000..9536669c576529
--- /dev/null
+++ b/src/sentry/replays/data_export.py
@@ -0,0 +1,680 @@
+import base64
+import csv
+import io
+import logging
+from collections.abc import Callable, Generator
+from datetime import datetime, timedelta, timezone
+from typing import Any, Protocol
+
+from django.db.models import F
+from google.cloud import storage_transfer_v1
+from google.cloud.storage_transfer_v1 import (
+    CreateTransferJobRequest,
+    GcsData,
+    NotificationConfig,
+    RunTransferJobRequest,
+    Schedule,
+    TransferJob,
+    TransferSpec,
+)
+from google.type import date_pb2
+from snuba_sdk import (
+    Column,
+    Condition,
+    Direction,
+    Entity,
+    Function,
+    Limit,
+    Offset,
+    Op,
+    OrderBy,
+    Query,
+    Request,
+)
+
+from sentry.models.files.utils import get_storage
+from sentry.models.organization import Organization
+from sentry.models.project import Project
+from sentry.services.filestore.gcs import GoogleCloudStorage
+from sentry.snuba.referrer import Referrer
+from sentry.tasks.base import instrumented_task
+from sentry.taskworker.config import TaskworkerConfig
+from sentry.taskworker.namespaces import replays_tasks
+from sentry.taskworker.retry import Retry
+from sentry.utils import json
+from sentry.utils.snuba import raw_snql_query
+
+logger = logging.getLogger(__name__)
+
+EXPORT_JOB_DURATION_DEFAULT = timedelta(days=5)
+EXPORT_JOB_SOURCE_BUCKET = "sentry-replays"
+EXPORT_QUERY_ROWS_PER_PAGE = 1000
+EXPORT_QUERY_PAGES_PER_TASK = 10
+
+# CLICKHOUSE
+
+
+class QueryFnProtocol(Protocol):
+    def __call__(self, limit: int, offset: int) -> Request: ...
+
+
+def rows_to_csv(rows: list[dict[str, Any]]) -> str:
+    buf = io.StringIO()
+    writer = csv.writer(buf)
+
+    for i, row in enumerate(rows):
+        if i == 0:
+            writer.writerow(row.keys())
+
+        writer.writerow(row.values())
+
+    return buf.getvalue()
+
+
+def export_clickhouse_rows(
+    query_fn: QueryFnProtocol,
+    referrer: str = Referrer.EU_DATA_EXPORT.value,
+    num_pages: int = EXPORT_QUERY_PAGES_PER_TASK,
+    limit: int = EXPORT_QUERY_ROWS_PER_PAGE,
+    offset: int = 0,
+) -> Generator[dict[str, Any]]:
+    """
+    ClickHouse row export.
+
+    :param query_fn: Any function which returns a request which is paginatable by limit and offset.
+    :param referrer: A unique identifier for a given data-export query.
+    :param limit: The number of rows to limit the query by.
+    :param offset: The initial offset to start querying from.
+    :param num_pages: The maximum number of pages we'll query before exiting. The number of pages
+        we query is intentionally capped. This ensures termination and encourages appropriate
+        bounding by the calling function.
+    """
+    assert limit > 0, "limit must be a positive integer greater than zero."
+    assert num_pages > 0, "num_pages must be a positive integer greater than zero."
+    assert offset >= 0, "offset must be a positive integer greater than or equal to zero."
+
+    # Iteration is capped to a maximum number of pages. This ensures termination and encourages
+    # appropriate bounding by the calling function. Ideally this export is run in an asynchronous
+    # task. Tasks typically have a deadline so iterating forever is undesirable. Each task should
+    # process a chunk of data, commit it (and perhaps its progress), and then schedule another
+    # task to complete the remainder of the job, which itself is bounded.
+    for _ in range(num_pages):
+        results = raw_snql_query(query_fn(limit=limit, offset=offset), referrer)["data"]
+        if results:
+            yield from results
+
+        offset += len(results)
+
+        if len(results) != limit:
+            break
+
+
+# $$$$$$\ $$$$$$\ $$$$$$\
+# $$ __$$\ $$ __$$\ $$ __$$\
+# $$ / \__|$$ / \__|$$ / \__|
+# $$ |$$$$\ $$ | \$$$$$$\
+# $$ |\_$$ |$$ | \____$$\
+# $$ | $$ |$$ | $$\ $$\ $$ |
+# \$$$$$$ |\$$$$$$ |\$$$$$$ |
+# \______/ \______/ \______/
+
+
+def request_create_transfer_job(request: CreateTransferJobRequest) -> None:
+    client = storage_transfer_v1.StorageTransferServiceClient()
+    client.create_transfer_job(request)
+    return None
+
+
+def create_transfer_job[T](
+    gcp_project_id: str,
+    transfer_job_name: str | None,
+    source_bucket: str,
+    source_prefix: str,
+    destination_bucket: str,
+    job_description: str,
+    job_duration: timedelta,
+    do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
+    notification_topic: str | None = None,
+    get_current_datetime: Callable[[], datetime] = lambda: datetime.now(tz=timezone.utc),
+) -> T:
+    """
+    Create a transfer-job which copies a bucket by prefix to another bucket.
+
+    Transfer jobs are templates for transfer-job-runs. Transfer jobs are run based on a schedule.
+    They start immediately and run until the end of the window defined by job_duration. Automatic
+    run creation based on the schedule is one-time only. If the run fails, or you want to run the
+    transfer-job a second time, you will need to manually create a transfer-job-run.
+
+    Failure notifications are handled by Pub/Sub. When the transfer service fails it will send a
+    notification to the specified topic. That topic should be configured to propagate the failure
+    notice to our HTTP endpoint which will then call the appropriate retry function.
+
+    :param gcp_project_id: The GCP project_id. This can be extracted from the storage class
+        returned by the `get_storage` function.
+    :param source_bucket: The bucket to copy from.
+    :param source_prefix: The object prefix within the source bucket to copy.
+    :param destination_bucket: The bucket to copy into.
+    :param job_duration: The amount of time the job should take to complete. Longer runs put less
+        pressure on our buckets.
+    :param notification_topic: Specifying a topic will enable automatic run retries on failure.
+    :param do_create_transfer_job: Injected function which creates the transfer-job.
+    :param get_current_datetime: Injected function which computes the current datetime.
+ """ + date_job_starts = get_current_datetime() + date_job_ends = date_job_starts + job_duration + + transfer_job = TransferJob( + description=job_description, + project_id=gcp_project_id, + status=storage_transfer_v1.TransferJob.Status.ENABLED, + transfer_spec=TransferSpec( + gcs_data_source=GcsData(bucket_name=source_bucket, path=source_prefix), + gcs_data_sink=GcsData(bucket_name=destination_bucket), + ), + schedule=Schedule( + schedule_start_date=date_pb2.Date( + year=date_job_starts.year, + month=date_job_starts.month, + day=date_job_starts.day, + ), + schedule_end_date=date_pb2.Date( + year=date_job_ends.year, + month=date_job_ends.month, + day=date_job_ends.day, + ), + ), + ) + + if notification_topic: + transfer_job.notification_config = NotificationConfig( + pubsub_topic=notification_topic, + event_types=[NotificationConfig.EventType.TRANSFER_OPERATION_FAILED], + payload_format=NotificationConfig.PayloadFormat.JSON, + ) + + if transfer_job_name: + transfer_job.name = transfer_job_name + + request = CreateTransferJobRequest(transfer_job=transfer_job) + return do_create_transfer_job(request) + + +def request_run_transfer_job(request: RunTransferJobRequest) -> None: + client = storage_transfer_v1.StorageTransferServiceClient() + client.run_transfer_job(request) + return None + + +def retry_transfer_job_run[T]( + event: dict[str, Any], + do_run_transfer_job: Callable[[RunTransferJobRequest], T], +) -> T | None: + """ + Retry a failed transfer job run. + + This function expects an event structured in the Google Cloud pubsub notification format. + + :param event: + :param do_run_transfer_job: Any callback function which triggers a `run_transfer_job` action + on GCP. You should use `request_run_transfer_job` by default unless you need to manually + specify credentials or have some other divergent behavior. + """ + if "data" not in event: + return None + + # Decode the Pub/Sub message payload + message = base64.b64decode(event["data"]).decode("utf-8") + payload = json.loads(message) + + # Check for a failed transfer operation + if "transferOperation" in payload and payload["transferOperation"]["status"] == "FAILED": + job_name = payload["transferOperation"]["transferJobName"] + gcp_project_id = payload["transferOperation"]["projectId"] + + request = RunTransferJobRequest(job_name=job_name, project_id=gcp_project_id) + return do_run_transfer_job(request) + + return None + + +# $$$$$$$\ $$$$$$$$\ $$$$$$$\ $$\ $$$$$$\ $$\ $$\ +# $$ __$$\ $$ _____|$$ __$$\ $$ | $$ __$$\\$$\ $$ | +# $$ | $$ |$$ | $$ | $$ |$$ | $$ / $$ |\$$\ $$ / +# $$$$$$$ |$$$$$\ $$$$$$$ |$$ | $$$$$$$$ | \$$$$ / +# $$ __$$< $$ __| $$ ____/ $$ | $$ __$$ | \$$ / +# $$ | $$ |$$ | $$ | $$ | $$ | $$ | $$ | +# $$ | $$ |$$$$$$$$\ $$ | $$$$$$$$\ $$ | $$ | $$ | +# \__| \__|\________|\__| \________|\__| \__| \__| + + +def query_replays_dataset( + project_id: int, + start: datetime, + end: datetime, + limit: int, + offset: int, +) -> Request: + assert start < end, "Start date must be less than the ending date." + assert project_id > 0, "Project ID must be greater than zero." + assert limit > 0, "limit must be a positive integer greater than zero." + assert offset >= 0, "offset must be a positive integer greater than or equal to zero." 
+ + def hash_(value: Column | str) -> Function: + return Function("cityHash64", parameters=[value]) + + query = Query( + match=Entity("replays"), + select=[ + Column("replay_id"), + Column("debug_id"), + Column("count_info_events"), + Column("count_warning_events"), + Column("count_error_events"), + Column("info_id"), + Column("warning_id"), + Column("error_id"), + Column("fatal_id"), + Column("replay_type"), + Column("error_sample_rate"), + Column("session_sample_rate"), + Column("event_hash"), + Column("segment_id"), + Column("trace_ids"), + Column("title"), + Column("url"), + Column("urls"), + Column("is_archived"), + Column("error_ids"), + Column("project_id"), + Column("timestamp"), + Column("replay_start_timestamp"), + Column("platform"), + Column("environment"), + Column("release"), + Column("dist"), + Column("ip_address_v4"), + Column("ip_address_v6"), + Column("user"), + Column("user_id"), + Column("user_name"), + Column("user_email"), + Column("user_geo_city"), + Column("user_geo_country_code"), + Column("user_geo_region"), + Column("user_geo_subdivision"), + Column("viewed_by_id"), + Column("os_name"), + Column("os_version"), + Column("browser_name"), + Column("browser_version"), + Column("device_name"), + Column("device_brand"), + Column("device_family"), + Column("device_model"), + Column("ota_updates_channel"), + Column("ota_updates_runtime_version"), + Column("ota_updates_update_id"), + Column("sdk_name"), + Column("sdk_version"), + Column("tags.key"), + Column("tags.value"), + Column("click_node_id"), + Column("click_tag"), + Column("click_id"), + Column("click_class"), + Column("click_text"), + Column("click_role"), + Column("click_alt"), + Column("click_testid"), + Column("click_aria_label"), + Column("click_title"), + Column("click_component_name"), + Column("click_is_dead"), + Column("click_is_rage"), + Column("count_errors"), + Column("count_urls"), + Column("retention_days"), + Column("partition"), + Column("offset"), + ], + where=[ + Condition(Column("project_id"), Op.EQ, project_id), + Condition(Column("timestamp"), Op.GTE, start), + Condition(Column("timestamp"), Op.LT, end), + ], + orderby=[ + OrderBy(Column("project_id"), Direction.ASC), + OrderBy(Function("toStartOfDay", parameters=[Column("timestamp")]), Direction.ASC), + OrderBy(hash_(Column("replay_id")), Direction.ASC), + OrderBy(Column("event_hash"), Direction.ASC), + ], + limit=Limit(limit), + offset=Offset(offset), + ) + + return Request( + dataset="replays", + app_id="replay-backend-web", + query=query, + tenant_ids={}, + ) + + +def get_replay_date_query_ranges( + project_id: int, + referrer: str = Referrer.EU_DATA_EXPORT.value, +) -> Generator[tuple[datetime, datetime, int]]: + """ + SQL: + SELECT formatDateTime(toStartOfDay(timestamp), '%F'), count() + FROM replays_dist + WHERE project_id = 11276 + GROUP BY toStartOfDay(timestamp) + ORDER BY toStartOfDay(timestamp) + """ + to_start_of_day_timestamp = Function("toStartOfDay", parameters=[Column("timestamp")]) + + # Snuba requires a start and end range but we don't know the start and end yet! We specify an + # arbitrarily large range to accommodate. If you're debugging a failed export in the year 3000 + # I am very sorry for the inconvenience this has caused you. 
+    min_date = datetime(year=1970, month=1, day=1)
+    max_date = datetime(year=3000, month=1, day=1)
+
+    query = Query(
+        match=Entity("replays"),
+        select=[
+            Function("formatDateTime", parameters=[to_start_of_day_timestamp, "%F"], alias="day"),
+            Function("count", parameters=[], alias="max_rows_to_export"),
+        ],
+        where=[
+            Condition(Column("project_id"), Op.EQ, project_id),
+            Condition(Column("timestamp"), Op.GTE, min_date),
+            Condition(Column("timestamp"), Op.LT, max_date),
+        ],
+        orderby=[OrderBy(to_start_of_day_timestamp, Direction.ASC)],
+        groupby=[to_start_of_day_timestamp],
+    )
+
+    request = Request(
+        dataset="replays",
+        app_id="replay-backend-web",
+        query=query,
+        tenant_ids={},
+    )
+
+    results = raw_snql_query(request, referrer)["data"]
+    for result in results:
+        start = datetime.fromisoformat(result["day"])
+        end = start + timedelta(days=1)
+        yield start, end, result["max_rows_to_export"]
+
+
+def export_replay_row_set(
+    project_id: int,
+    start: datetime,
+    end: datetime,
+    limit: int,
+    initial_offset: int,
+    write_to_storage: Callable[[str, str], None],
+    num_pages: int = EXPORT_QUERY_PAGES_PER_TASK,
+) -> int | None:
+    rows = list(
+        export_clickhouse_rows(
+            lambda limit, offset: query_replays_dataset(project_id, start, end, limit, offset),
+            limit=limit,
+            offset=initial_offset,
+            num_pages=num_pages,
+        )
+    )
+
+    if len(rows) > 0:
+        filename = f"clickhouse/session-replay/{project_id}/{start.isoformat()}/{end.isoformat()}/{initial_offset}"
+        csv_data = rows_to_csv(rows)
+        write_to_storage(filename, csv_data)
+
+    if len(rows) == (limit * num_pages):
+        return initial_offset + len(rows)
+    else:
+        return None
+
+
+def save_to_storage(destination_bucket: str, filename: str, contents: str) -> None:
+    storage = get_storage(None)
+    assert isinstance(storage, GoogleCloudStorage)
+    storage.bucket_name = destination_bucket
+    storage.save(filename, io.BytesIO(contents.encode()))
+
+
+@instrumented_task(
+    name="sentry.replays.tasks.export_replay_row_set_async",
+    default_retry_delay=5,  # Five seconds because we want to give rate-limits some time to reset.
+    max_retries=120,  # Retry a lot because if it fails we have to start over from the beginning.
+    taskworker_config=TaskworkerConfig(
+        namespace=replays_tasks,
+        processing_deadline_duration=15 * 60,
+        retry=Retry(
+            times=120,
+            delay=5,
+        ),
+    ),
+)
+def export_replay_row_set_async(
+    project_id: int,
+    start: datetime,
+    end: datetime,
+    destination_bucket: str,
+    max_rows_to_export: int,
+    limit: int = EXPORT_QUERY_ROWS_PER_PAGE,
+    offset: int = 0,
+    num_pages: int = EXPORT_QUERY_PAGES_PER_TASK,
+):
+    """
+    Export all replay rows which belong to the project and exist within the range.
+
+    :param project_id: Sentry Project ID.
+    :param start: Inclusive, minimum date in the queried range.
+    :param end: Exclusive, maximum date in the queried range.
+    :param destination_bucket: Which bucket the resulting CSV will be uploaded to.
+    :param max_rows_to_export: The maximum number of rows which may be exported by this task
+        chain. The max_rows_to_export value should match the number of rows present in your range.
+        This value is specified to protect against malformed behavior in the code which might
+        produce infinite (or at least very long) task recursion.
+    :param limit: The maximum number of rows to query for on a given page.
+    :param offset: The offset within the query range to start querying from. Must monotonically
+        increase and never overlap with previous runs.
+    :param num_pages: The maximum number of pages to query per task.
+    """
+    assert limit > 0, "Limit must be greater than 0."
+    assert offset >= 0, "Offset must be greater than or equal to 0."
+    assert start < end, "Start must be before end date."
+    assert num_pages > 0, "num_pages must be greater than 0."
+
+    next_offset = export_replay_row_set(
+        project_id,
+        start,
+        end,
+        limit,
+        offset,
+        lambda filename, contents: save_to_storage(destination_bucket, filename, contents),
+        num_pages,
+    )
+
+    # Tasks can run for a defined length of time. The export can take an unbounded length of time
+    # to complete. For this reason we cap the amount of work we'll perform within a single task's
+    # lifetime and schedule the remainder of the work to take place on another task.
+    #
+    # The call chain is explicitly terminated by a pre-computed max_rows_to_export value. If this
+    # value is exceeded the chain exits immediately even if more rows could have been found. It's
+    # unlikely there will be more rows because in order to export your data you need to terminate
+    # your Sentry account. Under those conditions you're no longer a Sentry customer and should
+    # not be ingesting any data into Sentry.
+    if next_offset and next_offset < max_rows_to_export:
+        # We assert the call chain is making forward progress.
+        assert next_offset > offset, "next_offset was not greater than previous offset."
+        # We assert the call chain is making meaningful progress. We should not overlap.
+        assert next_offset == (offset + (limit * num_pages)), "next_offset overlapped previous run."
+
+        export_replay_row_set_async.delay(
+            project_id=project_id,
+            start=start,
+            end=end,
+            limit=limit,
+            offset=next_offset,
+            destination_bucket=destination_bucket,
+            max_rows_to_export=max_rows_to_export,
+            num_pages=num_pages,
+        )
+
+
+@instrumented_task(
+    name="sentry.replays.tasks.export_replay_project_async",
+    taskworker_config=TaskworkerConfig(namespace=replays_tasks),
+)
+def export_replay_project_async(
+    project_id: int,
+    limit: int,
+    destination_bucket: str,
+    num_pages: int = EXPORT_QUERY_PAGES_PER_TASK,
+):
+    """
+    Export every replay for a given Sentry Project ID.
+
+    A task will be spawned for each day and will export that day's rows. This means we have a
+    maximum parallelism of 90 simultaneous processes (one per day of the maximum 90-day retention
+    window). The effective value may be lower given the demand on the task broker itself. If more
+    parallelism is desired you will need to tweak the granularity of the
+    `get_replay_date_query_ranges` query.
+
+    :param project_id: Sentry Project ID.
+    :param limit: The maximum number of rows to query for on any given page.
+    :param destination_bucket: The bucket the exported CSV files are written to.
+    :param num_pages: The maximum number of pages to query for within a single task execution.
+    """
+    # Each populated day bucket is scheduled for export.
+ for start, end, max_rows_to_export in get_replay_date_query_ranges(project_id): + export_replay_row_set_async.delay( + project_id=project_id, + start=start, + end=end, + destination_bucket=destination_bucket, + max_rows_to_export=max_rows_to_export, + limit=limit, + offset=0, + num_pages=num_pages, + ) + + +def export_replay_blob_data[T]( + project_id: int, + gcp_project_id: str, + destination_bucket: str, + job_duration: timedelta, + do_create_transfer_job: Callable[[CreateTransferJobRequest], T], + pubsub_topic_name: str | None = None, + source_bucket: str = EXPORT_JOB_SOURCE_BUCKET, +): + # In the future we could set a non-unique transfer-job name. This would prevent duplicate runs + # from doing the same work over and over again. However, we'd need to catch the exception, + # look-up any active runs, and, if no active runs, schedule a new run. This is a bit much for + # now. + # + # transfer_job_name = f"{source_bucket}/{project_id}/{start_date_rounded_to_day}" + + for retention_days in (30, 60, 90): + create_transfer_job( + gcp_project_id=gcp_project_id, + transfer_job_name=None, + source_bucket=source_bucket, + source_prefix=f"{retention_days}/{project_id}", + destination_bucket=destination_bucket, + notification_topic=pubsub_topic_name, + job_description="Session Replay EU Compliance Export", + job_duration=job_duration, + do_create_transfer_job=do_create_transfer_job, + ) + + +def export_replay_data( + organization_id: int, + gcp_project_id: str, + destination_bucket: str, + blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT, + database_rows_per_page: int = EXPORT_QUERY_ROWS_PER_PAGE, + database_pages_per_task: int = EXPORT_QUERY_PAGES_PER_TASK, + source_bucket: str = EXPORT_JOB_SOURCE_BUCKET, + pubsub_topic_name: str | None = None, +): + logger.info( + "Starting replay export...", + extra={ + "organization_id": organization_id, + "gcp_project_id": gcp_project_id, + "destination_bucket": destination_bucket, + "blob_export_job_duration": str(blob_export_job_duration), + "database_rows_per_page": database_rows_per_page, + "database_pages_per_task": database_pages_per_task, + "source_bucket": source_bucket, + "pubsub_topic_name": pubsub_topic_name, + }, + ) + + try: + organization = Organization.objects.filter(id=organization_id).get() + logger.info("Found organization", extra={"organization.slug": organization.slug}) + except Organization.DoesNotExist: + logger.exception("Could not find organization", extra={"organization.id": organization_id}) + return None + + projects = list( + Project.objects.filter( + organization_id=organization_id, flags=F("flags").bitor(Project.flags.has_replays) + ) + ) + + if not projects: + logger.info("No projects with replays found.") + return None + + logger.info("Found projects with replays.", extra={"number_of_projects": len(projects)}) + + for project in projects: + logger.info( + "Starting recording export job for project", extra={"project_slug": project.slug} + ) + export_replay_blob_data( + project_id=project.id, + gcp_project_id=gcp_project_id, + destination_bucket=destination_bucket, + pubsub_topic_name=pubsub_topic_name, + source_bucket=source_bucket, + job_duration=blob_export_job_duration, + do_create_transfer_job=request_create_transfer_job, + ) + logger.info("Successfully scheduled recording export job.") + + for project in projects: + logger.info( + "Starting database export job for project", extra={"project_slug": project.slug} + ) + export_replay_project_async.delay( + project_id=project.id, + 
limit=database_rows_per_page, + destination_bucket=destination_bucket, + num_pages=database_pages_per_task, + ) + logger.info("Successfully scheduled database export job.") + + # Really need a way to signal an export has finished or failed. Probably a screen in the + # application exposed to the customer or admins. This will require database models, front-end + # engineers, API blueprints, a concept of a work group... + logger.info("Export finished! It will run in the background. No further action is required.") diff --git a/src/sentry/replays/endpoints/data_export_notifications.py b/src/sentry/replays/endpoints/data_export_notifications.py new file mode 100644 index 00000000000000..a914462ef6c7f3 --- /dev/null +++ b/src/sentry/replays/endpoints/data_export_notifications.py @@ -0,0 +1,25 @@ +import logging + +from rest_framework.request import Request +from rest_framework.response import Response + +from sentry.api.api_owners import ApiOwner +from sentry.api.api_publish_status import ApiPublishStatus +from sentry.api.base import Endpoint, control_silo_endpoint +from sentry.api.permissions import SentryIsAuthenticated +from sentry.replays.data_export import request_run_transfer_job, retry_transfer_job_run + +logger = logging.getLogger() + + +@control_silo_endpoint +class DataExportNotificationsEndpoint(Endpoint): + """PubSub notifications endpoint.""" + + owner = ApiOwner.REPLAY + publish_status = {"POST": ApiPublishStatus.PRIVATE} + permission_classes = (SentryIsAuthenticated,) + + def post(self, request: Request) -> Response: + retry_transfer_job_run(request.data, request_run_transfer_job) + return Response("", status=200) diff --git a/src/sentry/snuba/referrer.py b/src/sentry/snuba/referrer.py index 7b9238d633f5e3..550c08a30f9bd8 100644 --- a/src/sentry/snuba/referrer.py +++ b/src/sentry/snuba/referrer.py @@ -750,6 +750,7 @@ class Referrer(StrEnum): REPLAYS_FILE_REFERRER = "replays.query.download_replay_segments" REPLAYS_SCRIPTS_DELETE_REPLAYS = "replays.scripts.delete_replays" FEEDBACKS_LABEL_QUERY = "feedbacks.label_query" + EU_DATA_EXPORT = "sentry.internal.eu-compliance-data-export" REPORTS_KEY_ERRORS = "reports.key_errors" REPORTS_KEY_PERFORMANCE_ISSUES = "reports.key_performance_issues" REPORTS_KEY_TRANSACTIONS_P95 = "reports.key_transactions.p95" diff --git a/static/app/data/controlsiloUrlPatterns.ts b/static/app/data/controlsiloUrlPatterns.ts index 1d897954d4d0e5..042f2a73c5d086 100644 --- a/static/app/data/controlsiloUrlPatterns.ts +++ b/static/app/data/controlsiloUrlPatterns.ts @@ -132,6 +132,7 @@ const patterns: RegExp[] = [ new RegExp('^api/0/api-tokens/[^/]+/$'), new RegExp('^api/0/authenticators/$'), new RegExp('^api/0/accept-invite/[^/]+/[^/]+/[^/]+/$'), + new RegExp('^api/0/data-export/notifications/google-cloud/$'), new RegExp('^api/0/accept-invite/[^/]+/[^/]+/$'), new RegExp('^api/0/notification-defaults/$'), new RegExp('^api/0/sentry-apps-stats/$'), diff --git a/tests/sentry/replays/conftest.py b/tests/sentry/replays/conftest.py new file mode 100644 index 00000000000000..c404c7ef7d7946 --- /dev/null +++ b/tests/sentry/replays/conftest.py @@ -0,0 +1,20 @@ +from typing import Any + +import pytest +import requests +from django.conf import settings + + +class ReplayStore: + + def save(self, data: dict[str, Any]) -> None: + request_url = settings.SENTRY_SNUBA + "/tests/entities/replays/insert" + response = requests.post(request_url, json=[data]) + assert response.status_code == 200 + return None + + +@pytest.fixture +def replay_store() -> ReplayStore: + assert 
requests.post(settings.SENTRY_SNUBA + "/tests/replays/drop").status_code == 200
+    return ReplayStore()
diff --git a/tests/sentry/replays/endpoints/test_data_export_notifications.py b/tests/sentry/replays/endpoints/test_data_export_notifications.py
new file mode 100644
index 00000000000000..d7b304855894a1
--- /dev/null
+++ b/tests/sentry/replays/endpoints/test_data_export_notifications.py
@@ -0,0 +1,35 @@
+import base64
+from unittest.mock import patch
+
+from sentry.testutils.cases import APITestCase
+from sentry.testutils.silo import control_silo_test
+from sentry.utils import json
+
+
+@control_silo_test
+class DataExportNotificationsTestCase(APITestCase):
+    endpoint = "sentry-api-0-data-export-notifications"
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.login_as(user=self.user)
+
+    @patch("sentry.replays.endpoints.data_export_notifications.retry_transfer_job_run")
+    def test_simple(self, retry_transfer_job_run) -> None:  # type: ignore[no-untyped-def]
+        retry_transfer_job_run.return_value = None
+
+        data = {
+            "data": base64.b64encode(
+                json.dumps(
+                    {
+                        "transferOperation": {
+                            "status": "FAILED",
+                            "transferJobName": "test",
+                            "projectId": "test-project",
+                        }
+                    }
+                ).encode()
+            ).decode("utf-8")
+        }
+        self.get_success_response(method="post", **data, status_code=200)
+        assert retry_transfer_job_run.called
diff --git a/tests/sentry/replays/integration/test_data_export.py b/tests/sentry/replays/integration/test_data_export.py
new file mode 100644
index 00000000000000..44e0b5331892cb
--- /dev/null
+++ b/tests/sentry/replays/integration/test_data_export.py
@@ -0,0 +1,116 @@
+import csv
+import datetime
+import uuid
+
+import pytest
+
+from sentry.replays.data_export import (
+    export_clickhouse_rows,
+    export_replay_row_set,
+    get_replay_date_query_ranges,
+    query_replays_dataset,
+)
+from sentry.replays.testutils import mock_replay
+from sentry.testutils.skips import requires_snuba
+
+
+@pytest.mark.snuba
+@requires_snuba
+def test_export_clickhouse_rows(replay_store) -> None:  # type: ignore[no-untyped-def]
+    """
+    Assert rows are exported across multiple limit/offset pages and that the project and
+    time-range filters are respected.
+ """ + replay1_id = uuid.uuid4().hex + replay2_id = uuid.uuid4().hex + replay3_id = uuid.uuid4().hex + replay4_id = uuid.uuid4().hex + replay5_id = uuid.uuid4().hex + + t0 = datetime.datetime.now() + t1 = t0 + datetime.timedelta(minutes=1) + t2 = t0 + datetime.timedelta(minutes=2) + t3 = t0 + datetime.timedelta(minutes=3) + + replay_store.save(mock_replay(t1, 1, replay1_id, segment_id=0)) + replay_store.save(mock_replay(t1, 1, replay2_id, segment_id=0)) + replay_store.save(mock_replay(t2, 1, replay3_id, segment_id=1)) + replay_store.save(mock_replay(t3, 1, replay4_id, segment_id=0)) + replay_store.save(mock_replay(t2, 2, replay5_id, segment_id=0)) + + query_fn = lambda limit, offset: query_replays_dataset(1, t0, t3, limit, offset) + rows = list(export_clickhouse_rows(query_fn, limit=1, num_pages=10)) + assert len(rows) == 3 + + +@pytest.mark.snuba +@requires_snuba +def test_export_replay_row_set(replay_store) -> None: # type: ignore[no-untyped-def] + replay1_id = "030c5419-9e0f-46eb-ae18-bfe5fd0331b5" + replay2_id = "0dbda2b3-9286-4ecc-a409-aa32b241563d" + replay3_id = "ff08c103-a9a4-47c0-9c29-73b932c2da34" + t0 = datetime.datetime.now() + t1 = t0 + datetime.timedelta(seconds=30) + t2 = t0 + datetime.timedelta(minutes=1) + + replay_store.save(mock_replay(t0, 1, replay1_id, segment_id=0)) + replay_store.save(mock_replay(t1, 1, replay2_id, segment_id=0)) + replay_store.save(mock_replay(t2, 1, replay3_id, segment_id=0)) + + class Sink: + def __init__(self) -> None: + self.filename: str | None = None + self.contents: str | None = None + + def __call__(self, filename: str, contents: str) -> None: + self.filename = filename + self.contents = contents + + sink = Sink() + export_replay_row_set(1, t0, t2, limit=1, initial_offset=0, write_to_storage=sink) + + assert sink.filename is not None + assert sink.contents is not None + + csvfile = csv.reader(sink.contents.splitlines()) + rows = [r for r in csvfile] + assert len(rows) == 3 + assert rows[1][0] == replay1_id + assert rows[2][0] == replay2_id + + +@pytest.mark.snuba +@requires_snuba +def test_get_replay_date_query_ranges(replay_store) -> None: # type: ignore[no-untyped-def] + replay1_id = str(uuid.uuid4()) + replay2_id = str(uuid.uuid4()) + replay3_id = str(uuid.uuid4()) + replay4_id = str(uuid.uuid4()) + replay5_id = str(uuid.uuid4()) + + t0 = datetime.datetime.now() + t1 = t0 + datetime.timedelta(days=10) + t2 = t0 + datetime.timedelta(days=20) + + replay_store.save(mock_replay(t0, 1, replay1_id, segment_id=0)) + replay_store.save(mock_replay(t1, 1, replay2_id, segment_id=0)) + replay_store.save(mock_replay(t2, 1, replay3_id, segment_id=0)) + replay_store.save(mock_replay(t2, 1, replay4_id, segment_id=0)) + replay_store.save(mock_replay(t2, 2, replay5_id, segment_id=0)) + + results = list(get_replay_date_query_ranges(1)) + assert len(results) == 3 + assert results[0][0] == datetime.datetime(year=t0.year, month=t0.month, day=t0.day) + assert results[0][1] == datetime.datetime( + year=t0.year, month=t0.month, day=t0.day + ) + datetime.timedelta(days=1) + assert results[0][2] == 1 + assert results[1][0] == datetime.datetime(year=t1.year, month=t1.month, day=t1.day) + assert results[1][1] == datetime.datetime( + year=t1.year, month=t1.month, day=t1.day + ) + datetime.timedelta(days=1) + assert results[1][2] == 1 + assert results[2][0] == datetime.datetime(year=t2.year, month=t2.month, day=t2.day) + assert results[2][1] == datetime.datetime( + year=t2.year, month=t2.month, day=t2.day + ) + datetime.timedelta(days=1) + assert results[2][2] == 2 
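Reviewer note (not part of the patch): the task-level tests in tests/sentry/replays/test_data_export.py below assert specific run counts for the chained export. The bookkeeping is easy to check by hand with a small standalone sketch that mirrors the `next_offset = offset + limit * num_pages` logic of `export_replay_row_set_async`; `simulated_run_count` is a hypothetical helper used only for illustration.

# Standalone sketch mirroring the chaining arithmetic; not part of the change.
def simulated_run_count(total_rows: int, limit: int, num_pages: int, max_rows_to_export: int) -> int:
    runs, offset = 0, 0
    while True:
        runs += 1
        exported = min(limit * num_pages, total_rows - offset)
        offset += exported
        # A follow-up task is only scheduled when a full page-set was returned and
        # the pre-computed cap has not yet been reached.
        if exported < limit * num_pages or offset >= max_rows_to_export:
            return runs

# Three rows in the queried range, as in test_export_replay_row_set_async:
assert simulated_run_count(3, limit=1, num_pages=1, max_rows_to_export=3) == 3
assert simulated_run_count(3, limit=1, num_pages=3, max_rows_to_export=3) == 1
assert simulated_run_count(3, limit=2, num_pages=1, max_rows_to_export=3) == 2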
diff --git a/tests/sentry/replays/test_data_export.py b/tests/sentry/replays/test_data_export.py new file mode 100644 index 00000000000000..ec127d6d3bb012 --- /dev/null +++ b/tests/sentry/replays/test_data_export.py @@ -0,0 +1,308 @@ +import csv +import datetime +import io +import uuid +from unittest.mock import patch + +import pytest +from django.db.models import F + +from sentry.models.project import Project +from sentry.replays.data_export import ( + export_replay_data, + export_replay_project_async, + export_replay_row_set_async, +) +from sentry.replays.testutils import mock_replay +from sentry.testutils.helpers import TaskRunner +from sentry.testutils.pytest.fixtures import django_db_all +from sentry.testutils.skips import requires_snuba + + +@django_db_all +@pytest.mark.snuba +@requires_snuba +def test_replay_data_export(default_organization, default_project, replay_store) -> None: # type: ignore[no-untyped-def] + replay_id = str(uuid.uuid4()) + t0 = datetime.datetime(year=2025, month=1, day=1) + t1 = t0 + datetime.timedelta(days=1) + replay_store.save(mock_replay(t0, default_project.id, replay_id, segment_id=0)) + + # Setting has_replays flag because the export will skip projects it assumes do not have + # replays. + default_project.flags.has_replays = True + default_project.update(flags=F("flags").bitor(getattr(Project.flags, "has_replays"))) + + with ( + TaskRunner(), + patch("sentry.replays.data_export.request_create_transfer_job") as create_job, + patch("sentry.replays.data_export.save_to_storage") as store_rows, + ): + export_replay_data( + organization_id=default_organization.id, + gcp_project_id="1", + destination_bucket="destination", + database_rows_per_page=1, + ) + assert create_job.called + assert store_rows.called + assert store_rows.call_count == 1 + assert store_rows.call_args[0][0] == "destination" + assert ( + store_rows.call_args[0][1] + == f"clickhouse/session-replay/{default_project.id}/{t0.isoformat()}/{t1.isoformat()}/0" + ) + assert replay_id in store_rows.call_args[0][2] + + +@django_db_all +@pytest.mark.snuba +@requires_snuba +def test_replay_data_export_invalid_organization(default_project, replay_store) -> None: # type: ignore[no-untyped-def] + replay_id = str(uuid.uuid4()) + t0 = datetime.datetime.now() + replay_store.save(mock_replay(t0, default_project.id, replay_id, segment_id=0)) + + # Setting has_replays flag because the export will skip projects it assumes do not have + # replays. 
+    default_project.flags.has_replays = True
+    default_project.update(flags=F("flags").bitor(getattr(Project.flags, "has_replays")))
+
+    with (
+        TaskRunner(),
+        patch("sentry.replays.data_export.request_create_transfer_job") as create_job,
+        patch("sentry.replays.data_export.save_to_storage") as store_rows,
+    ):
+        export_replay_data(
+            organization_id=1,
+            gcp_project_id="1",
+            destination_bucket="destination",
+            database_rows_per_page=1,
+        )
+        assert not create_job.called
+        assert not store_rows.called
+
+
+@django_db_all
+@pytest.mark.snuba
+@requires_snuba
+def test_replay_data_export_no_replay_projects(  # type: ignore[no-untyped-def]
+    default_organization, default_project, replay_store
+) -> None:
+    replay_id = str(uuid.uuid4())
+    t0 = datetime.datetime.now()
+    replay_store.save(mock_replay(t0, default_project.id, replay_id, segment_id=0))
+
+    with (
+        TaskRunner(),
+        patch("sentry.replays.data_export.request_create_transfer_job") as create_job,
+        patch("sentry.replays.data_export.save_to_storage") as store_rows,
+    ):
+        export_replay_data(
+            organization_id=default_organization.id,
+            gcp_project_id="1",
+            destination_bucket="destination",
+            database_rows_per_page=1,
+        )
+        assert not create_job.called
+        assert not store_rows.called
+
+
+@django_db_all
+@pytest.mark.snuba
+@requires_snuba
+def test_replay_data_export_no_replay_data(  # type: ignore[no-untyped-def]
+    default_organization, default_project
+) -> None:
+    # Setting has_replays flag because the export will skip projects it assumes do not have
+    # replays.
+    default_project.flags.has_replays = True
+    default_project.update(flags=F("flags").bitor(getattr(Project.flags, "has_replays")))
+
+    with (
+        TaskRunner(),
+        patch("sentry.replays.data_export.request_create_transfer_job") as create_job,
+        patch("sentry.replays.data_export.save_to_storage") as store_rows,
+    ):
+        export_replay_data(
+            organization_id=default_organization.id,
+            gcp_project_id="1",
+            destination_bucket="destination",
+            database_rows_per_page=1,
+        )
+
+    # Blob data is scheduled for export but no database rows are found, so we export nothing.
+    assert create_job.called
+    assert not store_rows.called
+
+
+@django_db_all
+@pytest.mark.snuba
+@requires_snuba
+def test_export_replay_row_set_async(replay_store) -> None:  # type: ignore[no-untyped-def]
+    replay1_id = "030c5419-9e0f-46eb-ae18-bfe5fd0331b5"
+    replay2_id = "0dbda2b3-9286-4ecc-a409-aa32b241563d"
+    replay3_id = "ff08c103-a9a4-47c0-9c29-73b932c2da34"
+
+    t0 = datetime.datetime.now()
+    t1 = t0 + datetime.timedelta(days=1)
+    t2 = t0 + datetime.timedelta(days=2)
+    t3 = t0 + datetime.timedelta(days=3)
+
+    replay_store.save(mock_replay(t0, 1, replay1_id, segment_id=0))
+    replay_store.save(mock_replay(t1, 1, replay2_id, segment_id=0))
+    replay_store.save(mock_replay(t2, 1, replay3_id, segment_id=0))
+
+    # Assert the number of runs required to export the database given a set of parameters.
+ with TaskRunner(): + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=3, + limit=1, + num_pages=1, + ) + assert store_rows.call_count == 3 + + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=3, + limit=1, + num_pages=3, + ) + assert store_rows.call_count == 1 + + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=3, + limit=3, + num_pages=1, + ) + assert store_rows.call_count == 1 + + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=3, + limit=2, + num_pages=1, + ) + assert store_rows.call_count == 2 + + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=3, + limit=2, + num_pages=2, + ) + assert store_rows.call_count == 1 + + # Assert export has a maximum call depth. + with TaskRunner(): + # We would have exported three but we hit the max call depth. + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=1, + limit=1, + num_pages=1, + ) + reader = csv.reader(io.StringIO(store_rows.call_args[0][2])) + assert sum(1 for _ in reader) == 2 # Includes headers. + + # We get more than the max call depth because it was within the bounds of the task. + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_row_set_async.delay( + project_id=1, + start=t0, + end=t3, + destination_bucket="test", + max_rows_to_export=1, + limit=3, + num_pages=1, + ) + reader = csv.reader(io.StringIO(store_rows.call_args[0][2])) + assert sum(1 for _ in reader) == 4 # Includes headers. + + +@django_db_all +@pytest.mark.snuba +@requires_snuba +def test_export_replay_project_async(replay_store) -> None: # type: ignore[no-untyped-def] + replay1_id = str(uuid.uuid4()) + replay2_id = str(uuid.uuid4()) + replay3_id = str(uuid.uuid4()) + replay4_id = str(uuid.uuid4()) + replay5_id = str(uuid.uuid4()) + + t0 = datetime.datetime.now() + t1 = t0 + datetime.timedelta(days=1) + t2 = t0 + datetime.timedelta(days=2) + + replay_store.save(mock_replay(t0, 1, replay1_id, segment_id=0)) + replay_store.save(mock_replay(t0, 1, replay2_id, segment_id=0)) + replay_store.save(mock_replay(t1, 1, replay3_id, segment_id=0)) + replay_store.save(mock_replay(t1, 1, replay4_id, segment_id=0)) + replay_store.save(mock_replay(t2, 1, replay5_id, segment_id=0)) + + with TaskRunner(): + # Assert we need five runs to export the row set. + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_project_async.delay( + project_id=1, + destination_bucket="test", + limit=1, + num_pages=1, + ) + assert store_rows.call_count == 5 + + # Assert we can reduce the run count by modifying the limit. 
+ with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_project_async.delay( + project_id=1, + destination_bucket="test", + limit=2, + num_pages=1, + ) + assert store_rows.call_count == 3 + + # Assert we can reduce the run count by modifying the number of pages per run. + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_project_async.delay( + project_id=1, + destination_bucket="test", + limit=1, + num_pages=2, + ) + assert store_rows.call_count == 3 + + # Assert we need three runs because date bucketing is the limiting factor. + with patch("sentry.replays.data_export.save_to_storage") as store_rows: + export_replay_project_async.delay( + project_id=1, + destination_bucket="test", + limit=1000, + num_pages=1000, + ) + assert store_rows.call_count == 3 diff --git a/tests/sentry/replays/unit/test_data_export.py b/tests/sentry/replays/unit/test_data_export.py new file mode 100644 index 00000000000000..5c769e3aabf163 --- /dev/null +++ b/tests/sentry/replays/unit/test_data_export.py @@ -0,0 +1,106 @@ +import base64 +from datetime import datetime, timedelta + +from google.cloud import storage_transfer_v1 +from google.cloud.storage_transfer_v1 import ( + CreateTransferJobRequest, + GcsData, + NotificationConfig, + RunTransferJobRequest, + Schedule, + TransferJob, + TransferSpec, +) +from google.type import date_pb2 + +from sentry.replays.data_export import ( + create_transfer_job, + export_replay_blob_data, + retry_transfer_job_run, +) +from sentry.utils import json + + +def test_export_blob_data() -> None: + # Function parameters which could be abstracted to test multiple variations of this behavior. + gcs_project_id = "1" + pubsub_topic = "PUBSUB_TOPIC" + bucket_name = "BUCKET" + bucket_prefix = "PREFIX" + start_date = datetime(year=2025, month=1, day=31) + job_description = "something" + job_duration = timedelta(days=5) + end_date = start_date + job_duration + + result = create_transfer_job( + gcp_project_id=gcs_project_id, + source_bucket=bucket_name, + source_prefix=bucket_prefix, + destination_bucket="b", + notification_topic=pubsub_topic, + job_description=job_description, + job_duration=job_duration, + transfer_job_name=None, + do_create_transfer_job=lambda event: event, + get_current_datetime=lambda: start_date, + ) + + assert result == CreateTransferJobRequest( + transfer_job=TransferJob( + description=job_description, + project_id=gcs_project_id, + status=storage_transfer_v1.TransferJob.Status.ENABLED, + transfer_spec=TransferSpec( + gcs_data_source=GcsData(bucket_name=bucket_name, path=bucket_prefix), + gcs_data_sink=GcsData(bucket_name="b"), + ), + schedule=Schedule( + schedule_start_date=date_pb2.Date( + year=start_date.year, + month=start_date.month, + day=start_date.day, + ), + schedule_end_date=date_pb2.Date( + year=end_date.year, + month=end_date.month, + day=end_date.day, + ), + ), + notification_config=NotificationConfig( + pubsub_topic=pubsub_topic, + event_types=[NotificationConfig.EventType.TRANSFER_OPERATION_FAILED], + payload_format=NotificationConfig.PayloadFormat.JSON, + ), + ) + ) + + +def test_retry_export_blob_data() -> None: + job_name = "job-name" + job_project_id = "project-name" + + transfer_operation = { + "transferOperation": { + "status": "FAILED", + "transferJobName": job_name, + "projectId": job_project_id, + } + } + + result = retry_transfer_job_run( + {"data": base64.b64encode(json.dumps(transfer_operation).encode()).decode("utf-8")}, + lambda request: request, + ) + + assert result == 
RunTransferJobRequest(job_name=job_name, project_id=job_project_id) + + +def test_export_replay_blob_data() -> None: + jobs = [] + export_replay_blob_data(1, "1", "test", timedelta(days=1), lambda job: jobs.append(job)) + + # Assert a job is created for each retention-period. + assert len(jobs) == 3 + assert jobs[0].transfer_job.transfer_spec.gcs_data_source.path == "30/1" + assert jobs[1].transfer_job.transfer_spec.gcs_data_source.path == "60/1" + assert jobs[2].transfer_job.transfer_spec.gcs_data_source.path == "90/1" diff --git a/uv.lock b/uv.lock index 5b35bfe8d56a24..47eb9c8f11f800 100644 --- a/uv.lock +++ b/uv.lock @@ -703,6 +703,20 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/google_cloud_storage-2.18.0-py2.py3-none-any.whl", hash = "sha256:e8e1a9577952143c3fca8163005ecfadd2d70ec080fa158a8b305000e2c22fbb" }, ] +[[package]] +name = "google-cloud-storage-transfer" +version = "1.17.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "google-api-core", extra = ["grpc"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "google-auth", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "proto-plus", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/google_cloud_storage_transfer-1.17.0-py3-none-any.whl", hash = "sha256:19dee4cddfee35a36208f9a2a08487bc78aaf7a6695935a6333b4ae9a6e17360" }, +] + [[package]] name = "google-crc32c" version = "1.6.0" @@ -1961,6 +1975,7 @@ dependencies = [ { name = "google-cloud-pubsub", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-cloud-spanner", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-cloud-storage", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "google-cloud-storage-transfer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-crc32c", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "googleapis-common-protos", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpc-google-iam-v1", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2123,6 +2138,7 @@ requires-dist = [ { name = "google-cloud-pubsub", specifier = ">=2.23.0" }, { name = "google-cloud-spanner", specifier = ">=3.49.0" }, { name = "google-cloud-storage", specifier = ">=2.18.0" }, + { name = "google-cloud-storage-transfer", specifier = ">=1.17.0" }, { name = "google-crc32c", specifier = ">=1.6.0" }, { name = "googleapis-common-protos", specifier = ">=1.63.2" }, { name = "grpc-google-iam-v1", specifier = ">=0.13.1" },
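Reviewer note (not part of the patch): a minimal sketch of how the new entrypoint in src/sentry/replays/data_export.py would be invoked once deployed, e.g. from a Django shell. The organization id, GCP project, bucket, and topic names are placeholders; calling this schedules real transfer jobs and export tasks.

from datetime import timedelta

from sentry.replays.data_export import export_replay_data

# All identifiers below are placeholders for illustration only.
export_replay_data(
    organization_id=1234,
    gcp_project_id="example-gcp-project",
    destination_bucket="example-destination-bucket",
    blob_export_job_duration=timedelta(days=5),
    pubsub_topic_name="projects/example-gcp-project/topics/transfer-operation-failed",
)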