Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions src/sentry/data_export/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ def _sentry_metric_attrs(
return attrs


def _page_token_b64_from_processor(
def _page_token_from_processor(
processor: IssuesByTagProcessor | DiscoverProcessor | ExploreProcessor,
) -> str | None:
) -> bytes | None:
if isinstance(processor, TraceItemFullExportProcessor) and processor.page_token is not None:
return base64.b64encode(processor.page_token).decode("ascii")
return processor.page_token
return None


Expand Down Expand Up @@ -162,7 +162,7 @@ def export_chunk_to_stored_blobs(
export_limit: int,
environment_id: int | None,
first_page: bool = True,
page_token: str | None = None,
page_token: bytes | str | None = None,
offset: int = 0,
bytes_written: int = 0,
batch_size: int = SNUBA_MAX_RESULTS,
Expand All @@ -174,7 +174,7 @@ def export_chunk_to_stored_blobs(
data_export,
environment_id,
output_mode,
page_token_b64=page_token,
page_token=page_token,
)

with tempfile.TemporaryFile(mode="w+b") as tf:
Expand Down Expand Up @@ -240,7 +240,7 @@ def _schedule_retry(
base_bytes_written: int,
environment_id: int | None,
export_retries: int,
page_token: str | None,
page_token: bytes | str | None,
delay_retry: bool = False,
) -> None:
assemble_download.apply_async(
Expand Down Expand Up @@ -280,7 +280,7 @@ def _schedule_next_task(
"bytes_written": bytes_written,
"environment_id": environment_id,
"export_retries": export_retries,
"page_token": _page_token_b64_from_processor(processor),
"page_token": _page_token_from_processor(processor),
}
should_continue = next_offset < export_limit and (
(isinstance(processor, TraceItemFullExportProcessor) and processor.page_token is not None)
Expand Down Expand Up @@ -325,7 +325,7 @@ def assemble_download(
environment_id: int | None = None,
export_retries: int = DEFAULT_EXPORT_RETRIES,
*,
page_token: str | None = None,
page_token: bytes | str | None = None,
**kwargs: Any,
) -> None:
# The API response to export the data contains the ID which you can use
Expand Down Expand Up @@ -573,7 +573,7 @@ def get_processor(
environment_id: int | None,
output_mode: OutputMode,
*,
page_token_b64: str | None = None,
page_token: bytes | str | None = None,
) -> IssuesByTagProcessor | DiscoverProcessor | ExploreProcessor | TraceItemFullExportProcessor:
try:
if data_export.query_type == ExportQueryType.ISSUES_BY_TAG:
Expand All @@ -597,17 +597,21 @@ def get_processor(
output_mode=output_mode,
)
elif data_export.query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT:
page_token: bytes | None = None
if page_token_b64:
try:
page_token = base64.b64decode(page_token_b64)
except (ValueError, TypeError) as e:
raise ExportError("Invalid export trace item pagination state.") from e
page_token_bytes: bytes | None = None
if page_token is not None:
# Handle both bytes (new) and base64 string (legacy) page tokens
if isinstance(page_token, str):
try:
page_token_bytes = base64.b64decode(page_token)
except (ValueError, TypeError) as e:
raise ExportError("Invalid export trace item pagination state.") from e
else:
page_token_bytes = page_token
return TraceItemFullExportProcessor(
explore_query=data_export.query_info,
organization=data_export.organization,
output_mode=output_mode,
page_token=page_token,
page_token=page_token_bytes,
)

else:
Expand Down
3 changes: 1 addition & 2 deletions src/sentry/relocation/services/relocation_export/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# in modules such as this one where hybrid cloud data models or service classes are
# defined, because we want to reflect on type annotations and avoid forward references.

import base64
import logging
from datetime import UTC, datetime
from io import BytesIO
Expand Down Expand Up @@ -64,7 +63,7 @@ def request_new_export(
requesting_region_name,
replying_region_name,
org_slug,
base64.b64encode(encrypt_with_public_key).decode("utf8"),
encrypt_with_public_key,
int(round(datetime.now(tz=UTC).timestamp())),
]
)
Expand Down
8 changes: 6 additions & 2 deletions src/sentry/relocation/tasks/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def fulfill_cross_region_export_request(
requesting_cell_name: str,
replying_cell_name: str,
org_slug: str,
encrypt_with_public_key: str,
encrypt_with_public_key: bytes | str,
# Unix timestamp, in seconds.
scheduled_at: int,
) -> None:
Expand All @@ -334,7 +334,11 @@ def fulfill_cross_region_export_request(
"""
from sentry.relocation.tasks.transfer import process_relocation_transfer_region

encrypt_with_public_key_bytes = base64.b64decode(encrypt_with_public_key.encode("utf8"))
# Handle both bytes (new) and base64 string (legacy)
if isinstance(encrypt_with_public_key, str):
encrypt_with_public_key_bytes = base64.b64decode(encrypt_with_public_key.encode("utf8"))
else:
encrypt_with_public_key_bytes = encrypt_with_public_key

logger_data = {
"uuid": uuid_str,
Expand Down
Loading