Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies = [
"pyyaml~=6.0.3",
"coloredlogs~=15.0.1",
"python-json-logger~=4.1.0",
"asgi-correlation-id~=4.3.4",
"asgi-correlation-id>=4.3.4,<5.1.0",
"uuid6~=2025.0.1",
"celery~=5.6.0",
"psycopg2-binary~=2.9.11",
Expand Down Expand Up @@ -70,7 +70,7 @@ server = [
"starlette<1.0",
"starlette-exporter~=0.23.0",
"python-multipart~=0.0.26",
"uvicorn>=0.44,<0.49",
"uvicorn>=0.44,<0.50",
"alembic~=1.18.4",
"pyjwt>=2.12.1,<2.14.0",
"asyncpg~=0.31.0",
Expand Down
31 changes: 28 additions & 3 deletions syncmaster/worker/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from datetime import UTC, datetime

from asgi_correlation_id import correlation_id
from asgi_correlation_id.extensions.celery import load_correlation_ids
from celery.signals import after_setup_task_logger
from celery.signals import after_setup_task_logger, before_task_publish, task_postrun, task_prerun
from celery.utils.log import get_task_logger
from jinja2 import Template
from sqlalchemy import Engine, select
from sqlalchemy.orm import Session, selectinload
from uuid6 import uuid7

from syncmaster.db.models import AuthData, Run, Status, Transfer
from syncmaster.db.repositories.utils import decrypt_auth_data
Expand All @@ -19,7 +19,32 @@
from syncmaster.worker.settings import WorkerAppSettings

logger = get_task_logger(__name__)
load_correlation_ids()

CORRELATION_ID_HEADER = "CORRELATION_ID"


@before_task_publish.connect(weak=False)
def transfer_correlation_id(headers: dict[str, str], **kwargs) -> None:
"""Transfer correlation ID from the request to the Celery task headers."""
cid = correlation_id.get()
if cid:
headers[CORRELATION_ID_HEADER] = cid


@task_prerun.connect(weak=False)
def load_correlation_id(task, **kwargs) -> None:
"""Load correlation ID from task headers, or generate a new one."""
id_value = task.request.get(CORRELATION_ID_HEADER)
if id_value:
correlation_id.set(id_value)
else:
correlation_id.set(uuid7().hex)


@task_postrun.connect(weak=False)
def cleanup_correlation_id(**kwargs) -> None:
"""Clear the correlation ID after the task completes."""
correlation_id.set(None)


@celery.task(name="run_transfer_task", bind=True, track_started=True)
Expand Down
Loading
Loading