From 54ba3b96eac973c2d78c7097966e60cd726b624e Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 4 Sep 2025 10:01:41 -0400 Subject: [PATCH 1/5] feat: add trace tags --- .../manifest_server/dependencies/__init__.py | 0 .../{ => dependencies}/auth.py | 0 .../manifest_server/dependencies/tracing.py | 44 +++++++++++++++++++ .../manifest_server/routers/manifest.py | 4 +- .../manifest_server/dependencies/__init__.py | 0 .../{ => dependencies}/test_auth.py | 2 +- 6 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 airbyte_cdk/manifest_server/dependencies/__init__.py rename airbyte_cdk/manifest_server/{ => dependencies}/auth.py (100%) create mode 100644 airbyte_cdk/manifest_server/dependencies/tracing.py create mode 100644 unit_tests/manifest_server/dependencies/__init__.py rename unit_tests/manifest_server/{ => dependencies}/test_auth.py (98%) diff --git a/airbyte_cdk/manifest_server/dependencies/__init__.py b/airbyte_cdk/manifest_server/dependencies/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/manifest_server/auth.py b/airbyte_cdk/manifest_server/dependencies/auth.py similarity index 100% rename from airbyte_cdk/manifest_server/auth.py rename to airbyte_cdk/manifest_server/dependencies/auth.py diff --git a/airbyte_cdk/manifest_server/dependencies/tracing.py b/airbyte_cdk/manifest_server/dependencies/tracing.py new file mode 100644 index 000000000..8649df100 --- /dev/null +++ b/airbyte_cdk/manifest_server/dependencies/tracing.py @@ -0,0 +1,44 @@ +""" +FastAPI dependencies for the Manifest Server. + +This module contains reusable FastAPI dependencies that can be used across +different routers in the manifest server. 
+""" + +import logging +from typing import Optional + +import ddtrace +from fastapi import Header + +logger = logging.getLogger(__name__) + + +def apply_trace_tags( + workspace_id: Optional[str] = Header(None, alias="x-workspace-id"), + project_id: Optional[str] = Header(None, alias="x-project-id"), +) -> None: + """FastAPI dependency to apply trace tags from headers to the current span.""" + if not workspace_id and not project_id: + return + + # Log the trace IDs for observability + log_parts = [] + if workspace_id: + log_parts.append(f"workspace_id={workspace_id}") + if project_id: + log_parts.append(f"project_id={project_id}") + + if log_parts: + logger.info(f"Processing request with trace tags: {', '.join(log_parts)}") + + try: + span = ddtrace.tracer.current_span() + if span: + if workspace_id: + span.set_tag("workspace_id", workspace_id) + if project_id: + span.set_tag("project_id", project_id) + except Exception: + # Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run) + pass diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 035058ec1..2c0243dce 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -27,9 +27,9 @@ StreamReadResponse, StreamTestReadRequest, ) -from ..auth import verify_jwt_token from ..command_processor.processor import ManifestCommandProcessor from ..command_processor.utils import build_catalog, build_source +from ..dependencies import apply_trace_tags, verify_jwt_token def safe_build_source( @@ -59,7 +59,7 @@ def safe_build_source( router = APIRouter( prefix="/manifest", tags=["manifest"], - dependencies=[Depends(verify_jwt_token)], + dependencies=[Depends(verify_jwt_token), Depends(apply_trace_tags)], ) diff --git a/unit_tests/manifest_server/dependencies/__init__.py b/unit_tests/manifest_server/dependencies/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/unit_tests/manifest_server/test_auth.py b/unit_tests/manifest_server/dependencies/test_auth.py similarity index 98% rename from unit_tests/manifest_server/test_auth.py rename to unit_tests/manifest_server/dependencies/test_auth.py index d9a439522..b92109600 100644 --- a/unit_tests/manifest_server/test_auth.py +++ b/unit_tests/manifest_server/dependencies/test_auth.py @@ -7,7 +7,7 @@ from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials -from airbyte_cdk.manifest_server.auth import verify_jwt_token +from airbyte_cdk.manifest_server.dependencies import verify_jwt_token class TestVerifyJwtToken: From ef774a6f54c26b5bc9bcec1f346dca60ae08eadf Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 4 Sep 2025 10:07:45 -0400 Subject: [PATCH 2/5] full module --- airbyte_cdk/manifest_server/routers/manifest.py | 3 ++- unit_tests/manifest_server/dependencies/test_auth.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 2c0243dce..059155238 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -29,7 +29,8 @@ ) from ..command_processor.processor import ManifestCommandProcessor from ..command_processor.utils import build_catalog, build_source -from ..dependencies import apply_trace_tags, verify_jwt_token +from ..dependencies.auth import verify_jwt_token +from ..dependencies.tracing import apply_trace_tags def safe_build_source( diff --git a/unit_tests/manifest_server/dependencies/test_auth.py b/unit_tests/manifest_server/dependencies/test_auth.py index b92109600..d97999284 100644 --- a/unit_tests/manifest_server/dependencies/test_auth.py +++ b/unit_tests/manifest_server/dependencies/test_auth.py @@ -7,7 +7,7 @@ from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials -from airbyte_cdk.manifest_server.dependencies 
import verify_jwt_token +from airbyte_cdk.manifest_server.dependencies.auth import verify_jwt_token class TestVerifyJwtToken: From 727f297feba940dbc2ef66e8cb8fc255d02da995 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 4 Sep 2025 10:30:40 -0400 Subject: [PATCH 3/5] use body attrs rather than headers --- .../manifest_server/api_models/__init__.py | 2 + .../manifest_server/api_models/manifest.py | 12 ++++++ .../manifest_server/dependencies/tracing.py | 9 ++-- .../{dependencies => helpers}/__init__.py | 0 .../{dependencies => helpers}/auth.py | 0 .../manifest_server/helpers/tracing.py | 43 +++++++++++++++++++ .../manifest_server/routers/manifest.py | 41 ++++++++++++++++-- .../{dependencies => helpers}/__init__.py | 0 .../{dependencies => helpers}/test_auth.py | 2 +- 9 files changed, 100 insertions(+), 9 deletions(-) rename airbyte_cdk/manifest_server/{dependencies => helpers}/__init__.py (100%) rename airbyte_cdk/manifest_server/{dependencies => helpers}/auth.py (100%) create mode 100644 airbyte_cdk/manifest_server/helpers/tracing.py rename unit_tests/manifest_server/{dependencies => helpers}/__init__.py (100%) rename unit_tests/manifest_server/{dependencies => helpers}/test_auth.py (98%) diff --git a/airbyte_cdk/manifest_server/api_models/__init__.py b/airbyte_cdk/manifest_server/api_models/__init__.py index 3cd942dc4..3469fd7e3 100644 --- a/airbyte_cdk/manifest_server/api_models/__init__.py +++ b/airbyte_cdk/manifest_server/api_models/__init__.py @@ -12,6 +12,7 @@ DiscoverResponse, FullResolveRequest, ManifestResponse, + RequestContext, ResolveRequest, StreamTestReadRequest, ) @@ -30,6 +31,7 @@ "ConnectorConfig", "Manifest", # Manifest request/response models + "RequestContext", "FullResolveRequest", "ManifestResponse", "StreamTestReadRequest", diff --git a/airbyte_cdk/manifest_server/api_models/manifest.py b/airbyte_cdk/manifest_server/api_models/manifest.py index a13189763..a17ac9c63 100644 --- a/airbyte_cdk/manifest_server/api_models/manifest.py +++
b/airbyte_cdk/manifest_server/api_models/manifest.py @@ -13,6 +13,13 @@ from .dicts import ConnectorConfig, Manifest +class RequestContext(BaseModel): + """Optional context information for tracing and observability.""" + + workspace_id: Optional[str] = None + project_id: Optional[str] = None + + class StreamTestReadRequest(BaseModel): """Request to test read from a specific stream.""" @@ -24,6 +31,7 @@ class StreamTestReadRequest(BaseModel): record_limit: int = Field(default=100, ge=1, le=5000) page_limit: int = Field(default=5, ge=1, le=20) slice_limit: int = Field(default=5, ge=1, le=20) + context: Optional[RequestContext] = None class CheckRequest(BaseModel): @@ -31,6 +39,7 @@ class CheckRequest(BaseModel): manifest: Manifest config: ConnectorConfig + context: Optional[RequestContext] = None class CheckResponse(BaseModel): @@ -45,6 +54,7 @@ class DiscoverRequest(BaseModel): manifest: Manifest config: ConnectorConfig + context: Optional[RequestContext] = None class DiscoverResponse(BaseModel): @@ -57,6 +67,7 @@ class ResolveRequest(BaseModel): """Request to resolve a manifest.""" manifest: Manifest + context: Optional[RequestContext] = None class ManifestResponse(BaseModel): @@ -71,3 +82,4 @@ class FullResolveRequest(BaseModel): manifest: Manifest config: ConnectorConfig stream_limit: int = Field(default=100, ge=1, le=100) + context: Optional[RequestContext] = None diff --git a/airbyte_cdk/manifest_server/dependencies/tracing.py b/airbyte_cdk/manifest_server/dependencies/tracing.py index 8649df100..9c72ba968 100644 --- a/airbyte_cdk/manifest_server/dependencies/tracing.py +++ b/airbyte_cdk/manifest_server/dependencies/tracing.py @@ -9,16 +9,15 @@ from typing import Optional import ddtrace -from fastapi import Header logger = logging.getLogger(__name__) -def apply_trace_tags( - workspace_id: Optional[str] = Header(None, alias="x-workspace-id"), - project_id: Optional[str] = Header(None, alias="x-project-id"), +def apply_trace_tags_from_context( + workspace_id: 
Optional[str] = None, + project_id: Optional[str] = None, ) -> None: - """FastAPI dependency to apply trace tags from headers to the current span.""" + """Apply trace tags from context to the current span.""" if not workspace_id and not project_id: return diff --git a/airbyte_cdk/manifest_server/dependencies/__init__.py b/airbyte_cdk/manifest_server/helpers/__init__.py similarity index 100% rename from airbyte_cdk/manifest_server/dependencies/__init__.py rename to airbyte_cdk/manifest_server/helpers/__init__.py diff --git a/airbyte_cdk/manifest_server/dependencies/auth.py b/airbyte_cdk/manifest_server/helpers/auth.py similarity index 100% rename from airbyte_cdk/manifest_server/dependencies/auth.py rename to airbyte_cdk/manifest_server/helpers/auth.py diff --git a/airbyte_cdk/manifest_server/helpers/tracing.py b/airbyte_cdk/manifest_server/helpers/tracing.py new file mode 100644 index 000000000..9c72ba968 --- /dev/null +++ b/airbyte_cdk/manifest_server/helpers/tracing.py @@ -0,0 +1,43 @@ +""" +FastAPI dependencies for the Manifest Server. + +This module contains reusable FastAPI dependencies that can be used across +different routers in the manifest server. 
+""" + +import logging +from typing import Optional + +import ddtrace + +logger = logging.getLogger(__name__) + + +def apply_trace_tags_from_context( + workspace_id: Optional[str] = None, + project_id: Optional[str] = None, +) -> None: + """Apply trace tags from context to the current span.""" + if not workspace_id and not project_id: + return + + # Log the trace IDs for observability + log_parts = [] + if workspace_id: + log_parts.append(f"workspace_id={workspace_id}") + if project_id: + log_parts.append(f"project_id={project_id}") + + if log_parts: + logger.info(f"Processing request with trace tags: {', '.join(log_parts)}") + + try: + span = ddtrace.tracer.current_span() + if span: + if workspace_id: + span.set_tag("workspace_id", workspace_id) + if project_id: + span.set_tag("project_id", project_id) + except Exception: + # Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run) + pass diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 059155238..4fefb2129 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -29,8 +29,8 @@ ) from ..command_processor.processor import ManifestCommandProcessor from ..command_processor.utils import build_catalog, build_source -from ..dependencies.auth import verify_jwt_token -from ..dependencies.tracing import apply_trace_tags +from ..helpers.auth import verify_jwt_token +from ..helpers.tracing import apply_trace_tags_from_context def safe_build_source( @@ -60,7 +60,7 @@ def safe_build_source( router = APIRouter( prefix="/manifest", tags=["manifest"], - dependencies=[Depends(verify_jwt_token), Depends(apply_trace_tags)], + dependencies=[Depends(verify_jwt_token)], ) @@ -69,6 +69,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: """ Test reading from a specific stream in the manifest. 
""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + config_dict = request.config.model_dump() catalog = build_catalog(request.stream_name) @@ -105,6 +112,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: @router.post("/check", operation_id="check") def check(request: CheckRequest) -> CheckResponse: """Check configuration against a manifest""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) runner = ManifestCommandProcessor(source) success, message = runner.check_connection(request.config.model_dump()) @@ -114,6 +128,13 @@ def check(request: CheckRequest) -> CheckResponse: @router.post("/discover", operation_id="discover") def discover(request: DiscoverRequest) -> DiscoverResponse: """Discover streams from a manifest""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) runner = ManifestCommandProcessor(source) catalog = runner.discover(request.config.model_dump()) @@ -125,6 +146,13 @@ def discover(request: DiscoverRequest) -> DiscoverResponse: @router.post("/resolve", operation_id="resolve") def resolve(request: ResolveRequest) -> ManifestResponse: """Resolve a manifest to its final configuration.""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), 
{}) return ManifestResponse(manifest=Manifest(**source.resolved_manifest)) @@ -136,6 +164,13 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse: This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI """ + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) manifest = {**source.resolved_manifest} streams = manifest.get("streams", []) diff --git a/unit_tests/manifest_server/dependencies/__init__.py b/unit_tests/manifest_server/helpers/__init__.py similarity index 100% rename from unit_tests/manifest_server/dependencies/__init__.py rename to unit_tests/manifest_server/helpers/__init__.py diff --git a/unit_tests/manifest_server/dependencies/test_auth.py b/unit_tests/manifest_server/helpers/test_auth.py similarity index 98% rename from unit_tests/manifest_server/dependencies/test_auth.py rename to unit_tests/manifest_server/helpers/test_auth.py index d97999284..4d48a070f 100644 --- a/unit_tests/manifest_server/dependencies/test_auth.py +++ b/unit_tests/manifest_server/helpers/test_auth.py @@ -7,7 +7,7 @@ from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials -from airbyte_cdk.manifest_server.dependencies.auth import verify_jwt_token +from airbyte_cdk.manifest_server.helpers.auth import verify_jwt_token class TestVerifyJwtToken: From 7bc84f119d73327a53ac3de04d3b968f3011bbdd Mon Sep 17 00:00:00 2001 From: "Pedro S. 
Lopez" Date: Thu, 4 Sep 2025 10:33:29 -0400 Subject: [PATCH 4/5] rm old string --- airbyte_cdk/manifest_server/helpers/tracing.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/airbyte_cdk/manifest_server/helpers/tracing.py b/airbyte_cdk/manifest_server/helpers/tracing.py index 9c72ba968..ef36f2f1c 100644 --- a/airbyte_cdk/manifest_server/helpers/tracing.py +++ b/airbyte_cdk/manifest_server/helpers/tracing.py @@ -1,10 +1,3 @@ -""" -FastAPI dependencies for the Manifest Server. - -This module contains reusable FastAPI dependencies that can be used across -different routers in the manifest server. -""" - import logging from typing import Optional From 73dffd023c55c20b432f4386a6ab91712dd0201b Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 4 Sep 2025 11:06:31 -0400 Subject: [PATCH 5/5] rm moved file --- .../manifest_server/dependencies/tracing.py | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 airbyte_cdk/manifest_server/dependencies/tracing.py diff --git a/airbyte_cdk/manifest_server/dependencies/tracing.py b/airbyte_cdk/manifest_server/dependencies/tracing.py deleted file mode 100644 index 9c72ba968..000000000 --- a/airbyte_cdk/manifest_server/dependencies/tracing.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -FastAPI dependencies for the Manifest Server. - -This module contains reusable FastAPI dependencies that can be used across -different routers in the manifest server. 
-""" - -import logging -from typing import Optional - -import ddtrace - -logger = logging.getLogger(__name__) - - -def apply_trace_tags_from_context( - workspace_id: Optional[str] = None, - project_id: Optional[str] = None, -) -> None: - """Apply trace tags from context to the current span.""" - if not workspace_id and not project_id: - return - - # Log the trace IDs for observability - log_parts = [] - if workspace_id: - log_parts.append(f"workspace_id={workspace_id}") - if project_id: - log_parts.append(f"project_id={project_id}") - - if log_parts: - logger.info(f"Processing request with trace tags: {', '.join(log_parts)}") - - try: - span = ddtrace.tracer.current_span() - if span: - if workspace_id: - span.set_tag("workspace_id", workspace_id) - if project_id: - span.set_tag("project_id", project_id) - except Exception: - # Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run) - pass