Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte_cdk/manifest_server/api_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DiscoverResponse,
FullResolveRequest,
ManifestResponse,
RequestContext,
ResolveRequest,
StreamTestReadRequest,
)
Expand All @@ -30,6 +31,7 @@
"ConnectorConfig",
"Manifest",
# Manifest request/response models
"RequestContext",
"FullResolveRequest",
"ManifestResponse",
"StreamTestReadRequest",
Expand Down
12 changes: 12 additions & 0 deletions airbyte_cdk/manifest_server/api_models/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
from .dicts import ConnectorConfig, Manifest


class RequestContext(BaseModel):
"""Optional context information for tracing and observability."""

workspace_id: Optional[str] = None
project_id: Optional[str] = None


class StreamTestReadRequest(BaseModel):
"""Request to test read from a specific stream."""

Expand All @@ -24,13 +31,15 @@ class StreamTestReadRequest(BaseModel):
record_limit: int = Field(default=100, ge=1, le=5000)
page_limit: int = Field(default=5, ge=1, le=20)
slice_limit: int = Field(default=5, ge=1, le=20)
context: Optional[RequestContext] = None


class CheckRequest(BaseModel):
"""Request to check a manifest."""

manifest: Manifest
config: ConnectorConfig
context: Optional[RequestContext] = None


class CheckResponse(BaseModel):
Expand All @@ -45,6 +54,7 @@ class DiscoverRequest(BaseModel):

manifest: Manifest
config: ConnectorConfig
context: Optional[RequestContext] = None


class DiscoverResponse(BaseModel):
Expand All @@ -57,6 +67,7 @@ class ResolveRequest(BaseModel):
"""Request to resolve a manifest."""

manifest: Manifest
context: Optional[RequestContext] = None


class ManifestResponse(BaseModel):
Expand All @@ -71,3 +82,4 @@ class FullResolveRequest(BaseModel):
manifest: Manifest
config: ConnectorConfig
stream_limit: int = Field(default=100, ge=1, le=100)
context: Optional[RequestContext] = None
Empty file.
36 changes: 36 additions & 0 deletions airbyte_cdk/manifest_server/helpers/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging
from typing import Optional

import ddtrace

logger = logging.getLogger(__name__)


def apply_trace_tags_from_context(
workspace_id: Optional[str] = None,
project_id: Optional[str] = None,
) -> None:
"""Apply trace tags from context to the current span."""
if not workspace_id and not project_id:
return

# Log the trace IDs for observability
log_parts = []
if workspace_id:
log_parts.append(f"workspace_id={workspace_id}")
if project_id:
log_parts.append(f"project_id={project_id}")

if log_parts:
logger.info(f"Processing request with trace tags: {', '.join(log_parts)}")

try:
span = ddtrace.tracer.current_span()
if span:
if workspace_id:
span.set_tag("workspace_id", workspace_id)
if project_id:
span.set_tag("project_id", project_id)
except Exception:
# Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run)
pass
38 changes: 37 additions & 1 deletion airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
StreamReadResponse,
StreamTestReadRequest,
)
from ..auth import verify_jwt_token
from ..command_processor.processor import ManifestCommandProcessor
from ..command_processor.utils import build_catalog, build_source
from ..helpers.auth import verify_jwt_token
from ..helpers.tracing import apply_trace_tags_from_context


def safe_build_source(
Expand Down Expand Up @@ -68,6 +69,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
"""
Test reading from a specific stream in the manifest.
"""
# Apply trace tags from context if provided
if request.context:
apply_trace_tags_from_context(
workspace_id=request.context.workspace_id,
project_id=request.context.project_id,
)

config_dict = request.config.model_dump()

catalog = build_catalog(request.stream_name)
Expand Down Expand Up @@ -104,6 +112,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
@router.post("/check", operation_id="check")
def check(request: CheckRequest) -> CheckResponse:
"""Check configuration against a manifest"""
# Apply trace tags from context if provided
if request.context:
apply_trace_tags_from_context(
workspace_id=request.context.workspace_id,
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
success, message = runner.check_connection(request.config.model_dump())
Expand All @@ -113,6 +128,13 @@ def check(request: CheckRequest) -> CheckResponse:
@router.post("/discover", operation_id="discover")
def discover(request: DiscoverRequest) -> DiscoverResponse:
"""Discover streams from a manifest"""
# Apply trace tags from context if provided
if request.context:
apply_trace_tags_from_context(
workspace_id=request.context.workspace_id,
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
runner = ManifestCommandProcessor(source)
catalog = runner.discover(request.config.model_dump())
Expand All @@ -124,6 +146,13 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
@router.post("/resolve", operation_id="resolve")
def resolve(request: ResolveRequest) -> ManifestResponse:
"""Resolve a manifest to its final configuration."""
# Apply trace tags from context if provided
if request.context:
apply_trace_tags_from_context(
workspace_id=request.context.workspace_id,
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), {})
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))

Expand All @@ -135,6 +164,13 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:

This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI
"""
# Apply trace tags from context if provided
if request.context:
apply_trace_tags_from_context(
workspace_id=request.context.workspace_id,
project_id=request.context.project_id,
)

source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials

from airbyte_cdk.manifest_server.auth import verify_jwt_token
from airbyte_cdk.manifest_server.helpers.auth import verify_jwt_token


class TestVerifyJwtToken:
Expand Down
Loading