diff --git a/.github/actions/check-docker-tag/action.yml b/.github/actions/check-docker-tag/action.yml
new file mode 100644
index 000000000..17e3abeb6
--- /dev/null
+++ b/.github/actions/check-docker-tag/action.yml
@@ -0,0 +1,38 @@
+# Composite action: fails the job when <image_name>:<tag> is already published,
+# so that a release can never overwrite an existing DockerHub tag.
+name: 'Check Docker Tag Exists'
+description: 'Check if a Docker tag exists on DockerHub to prevent overwrites'
+inputs:
+  image_name:
+    description: 'Docker image name (e.g. airbyte/source-declarative-manifest)'
+    required: true
+  tag:
+    description: 'Docker tag to check'
+    required: true
+runs:
+  using: "composite"
+  steps:
+    - name: "Check for existing tag (${{ inputs.image_name }}:${{ inputs.tag }})"
+      shell: bash
+      # Hand the inputs to the script as environment variables instead of
+      # interpolating them into the script text, so their values can never be
+      # interpreted as shell syntax.
+      env:
+        IMAGE_NAME: "${{ inputs.image_name }}"
+        TAG_INPUT: "${{ inputs.tag }}"
+      run: |
+        image="$IMAGE_NAME"
+        tag_input="$TAG_INPUT"
+        if [ -z "$image" ] || [ -z "$tag_input" ]; then
+          echo "Error: image_name and tag are required."
+          exit 1
+        fi
+        tag="${image}:${tag_input}"
+        echo "Checking if tag '$tag' exists on DockerHub..."
+        # NOTE(review): any inspect failure (including network or auth errors)
+        # is treated as "tag not found" and lets the publish proceed.
+        if DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect "$tag" > /dev/null 2>&1; then
+          echo "The tag '$tag' already exists on DockerHub. Skipping publish to prevent overwrite."
+          exit 1
+        fi
+        echo "No existing tag '$tag' found. Proceeding with publish."
\ No newline at end of file diff --git a/.github/workflows/docker-build-check.yml b/.github/workflows/docker-build-check.yml index e6cfcac1b..0cc3036c4 100644 --- a/.github/workflows/docker-build-check.yml +++ b/.github/workflows/docker-build-check.yml @@ -1,4 +1,6 @@ name: Docker Build Check +permissions: + contents: read on: pull_request: @@ -6,8 +8,8 @@ on: - main jobs: - docker-build-check: - name: SDM Docker Image Build # Renamed job to be more descriptive + sdm-docker-build-check: + name: SDM Docker Image Build runs-on: ubuntu-24.04 steps: - name: Checkout code @@ -42,3 +44,29 @@ jobs: push: false tags: airbyte/source-declarative-manifest:pr-${{ github.event.pull_request.number }} outputs: type=image,name=target,annotation-index.org.opencontainers.image.description=SDM Docker image for PR ${{ github.event.pull_request.number }} + + manifest-server-docker-build-check: + name: Manifest Server Docker Image Build + runs-on: ubuntu-24.04 + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up QEMU for multi-platform builds + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build Manifest Server Docker image for multiple platforms + id: manifest-server-build + uses: docker/build-push-action@v5 + with: + context: . 
+ file: airbyte_cdk/manifest_server/Dockerfile + platforms: linux/amd64,linux/arm64 + push: false + tags: airbyte/manifest-server:pr-${{ github.event.pull_request.number }} + outputs: type=image,name=target,annotation-index.org.opencontainers.image.description=Manifest Server Docker image for PR ${{ github.event.pull_request.number }} diff --git a/.github/workflows/pypi_publish.yml b/.github/workflows/publish.yml similarity index 82% rename from .github/workflows/pypi_publish.yml rename to .github/workflows/publish.yml index dd84023f3..8b0b6abb9 100644 --- a/.github/workflows/pypi_publish.yml +++ b/.github/workflows/publish.yml @@ -6,6 +6,8 @@ # we have to also update the Trusted Publisher settings on PyPI. name: CDK Publish +permissions: + contents: read on: push: @@ -31,6 +33,11 @@ on: type: boolean required: true default: true + publish_manifest_server: + description: "Publish Manifest Server to DockerHub. If true, the workflow will publish the Manifest Server to DockerHub." + type: boolean + required: true + default: true update_connector_builder: description: "Update Connector Builder. If true, the workflow will create a PR to bump the CDK version used by Connector Builder." type: boolean @@ -204,18 +211,10 @@ jobs: - name: "Check for existing tag (version: ${{ env.VERSION || 'none' }} )" if: env.VERSION != '' - run: | - tag="airbyte/source-declarative-manifest:${{ env.VERSION }}" - if [ -z "$tag" ]; then - echo "Error: VERSION is not set. Ensure the tag follows the format 'refs/tags/vX.Y.Z'." - exit 1 - fi - echo "Checking if tag '$tag' exists on DockerHub..." - if DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect "$tag" > /dev/null 2>&1; then - echo "The tag '$tag' already exists on DockerHub. Skipping publish to prevent overwrite." - exit 1 - fi - echo "No existing tag '$tag' found. Proceeding with publish." 
+ uses: ./.github/actions/check-docker-tag + with: + image_name: airbyte/source-declarative-manifest + tag: ${{ env.VERSION }} - name: "Build and push (sha tag: '${{ github.sha }}')" # Only run if the version is not set @@ -250,6 +249,90 @@ jobs: tags: | airbyte/source-declarative-manifest:latest + publish_manifest_server: + name: Publish Manifest Server to DockerHub + if: > + (github.event_name == 'push' && + startsWith(github.ref, 'refs/tags/v')) || + (github.event_name == 'workflow_dispatch' && + github.event.inputs.publish_manifest_server == 'true' + ) + runs-on: ubuntu-24.04 + needs: [build] + environment: + name: DockerHub + url: https://hub.docker.com/r/airbyte/manifest-server/tags + env: + VERSION: ${{ needs.build.outputs.VERSION }} + IS_PRERELEASE: ${{ needs.build.outputs.IS_PRERELEASE }} + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + # We need to download the build artifact again because the previous job was on a different runner + - name: Download Build Artifact + uses: actions/download-artifact@v4 + with: + name: Packages-${{ github.run_id }} + path: dist + + - name: Set up QEMU for multi-platform builds + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_HUB_USERNAME }} + password: ${{ secrets.DOCKER_HUB_PASSWORD }} + + - name: "Check for existing tag (version: ${{ env.VERSION || 'none' }} )" + if: env.VERSION != '' + uses: ./.github/actions/check-docker-tag + with: + image_name: airbyte/manifest-server + tag: ${{ env.VERSION }} + + - name: "Build and push (sha tag: '${{ github.sha }}')" + # Only run if the version is not set + if: env.VERSION == '' + uses: docker/build-push-action@v5 + with: + context: . 
+ file: airbyte_cdk/manifest_server/Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: | + airbyte/manifest-server:${{ github.sha }} + + - name: "Build and push (version tag: ${{ env.VERSION || 'none'}})" + # Only run if the version is set + if: env.VERSION != '' + uses: docker/build-push-action@v5 + with: + context: . + file: airbyte_cdk/manifest_server/Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: | + airbyte/manifest-server:${{ env.VERSION }} + + - name: Build and push ('latest' tag) + # Only run if version is set and IS_PRERELEASE is false + if: env.VERSION != '' && env.IS_PRERELEASE == 'false' + uses: docker/build-push-action@v5 + with: + context: . + file: airbyte_cdk/manifest_server/Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: | + airbyte/manifest-server:latest + update-connector-builder: # Create a PR against the Builder, to update the CDK version that it uses. # In the future, Builder may use the SDM docker image instead of the Python CDK package. diff --git a/airbyte_cdk/manifest_server/Dockerfile b/airbyte_cdk/manifest_server/Dockerfile new file mode 100644 index 000000000..483a58355 --- /dev/null +++ b/airbyte_cdk/manifest_server/Dockerfile @@ -0,0 +1,45 @@ +# Dockerfile for the Airbyte Manifest Server. +# +# This Dockerfile should be built from the root of the repository. +# +# Example: +# docker build -f airbyte_cdk/manifest_server/Dockerfile -t airbyte/manifest-server . 
+ +FROM python:3.12-slim-bookworm + +# Install git (needed for dynamic versioning) and poetry +RUN apt-get update && \ + apt-get install -y git && \ + rm -rf /var/lib/apt/lists/* && \ + pip install poetry==1.8.3 + +# Configure poetry to not create virtual environments and disable interactive mode +ENV POETRY_NO_INTERACTION=1 \ + POETRY_CACHE_DIR=/tmp/poetry_cache + +WORKDIR /app + +# Copy poetry files (build from project root) +COPY pyproject.toml poetry.lock ./ + +# Copy the project source code (needed for dynamic versioning) +COPY . /app + +# Install dependencies and package directly to system Python +RUN --mount=type=cache,target=$POETRY_CACHE_DIR \ + poetry config virtualenvs.create false && \ + poetry install --extras manifest-server + +# Create a non-root user and group +RUN groupadd --gid 1000 airbyte && \ + useradd --uid 1000 --gid airbyte --shell /bin/bash --create-home airbyte + +# Change ownership +RUN chown -R airbyte:airbyte /app + +# Run app as non-root user +USER airbyte:airbyte + +EXPOSE 8080 + +CMD ["uvicorn", "airbyte_cdk.manifest_server.app:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/airbyte_cdk/manifest_server/README.md b/airbyte_cdk/manifest_server/README.md new file mode 100644 index 000000000..0777c70c7 --- /dev/null +++ b/airbyte_cdk/manifest_server/README.md @@ -0,0 +1,142 @@ +# Manifest Server + +An HTTP server for running Airbyte declarative connectors via their manifest files. 
+ +## Quick Start + +### Installation + +The manifest server is available as an extra dependency: + +```bash +# Using Poetry (preferred) +poetry install --extras manifest-server + +# Using pip +pip install airbyte-cdk[manifest-server] + +# Using uv +uv pip install 'airbyte-cdk[manifest-server]' +``` + +### Running the Server + +```bash +# Start the server (default port 8000) +manifest-server start + +# Start on a specific port +manifest-server start --port 8080 + +# Or using Python module +python -m airbyte_cdk.manifest_server.cli.run start +``` + +The server will start on `http://localhost:8000` by default. + +## API Endpoints + +### `/v1/manifest/test_read` +Test reading from a specific stream in the manifest. + +**POST** - Test stream reading with configurable limits for records, pages, and slices. + +### `/v1/manifest/check` +Check configuration against a manifest. + +**POST** - Validates connector configuration and returns success/failure status with message. + +### `/v1/manifest/discover` +Discover streams from a manifest. + +**POST** - Returns the catalog of available streams from the manifest. + +### `/v1/manifest/resolve` +Resolve a manifest to its final configuration. + +**POST** - Returns the resolved manifest without dynamic stream generation. + +### `/v1/manifest/full_resolve` +Fully resolve a manifest including dynamic streams. + +**POST** - Generates dynamic streams up to specified limits and includes them in the resolved manifest. + +## Custom Components + +The manifest server supports custom Python components, but this feature is **disabled by default** for security reasons. 
+
+### Enabling Custom Components
+
+To allow custom Python components in your manifest files, set the environment variable:
+```bash
+export AIRBYTE_ENABLE_UNSAFE_CODE=true
+```
+
+## Authentication
+
+The manifest server supports optional JWT bearer token authentication:
+
+### Configuration
+Set the environment variable to enable authentication:
+```bash
+export AB_JWT_SIGNATURE_SECRET="your-jwt-secret-key"
+```
+
+### Usage
+When authentication is enabled, include a valid JWT token in the Authorization header:
+```bash
+curl -H "Authorization: Bearer <your-jwt-token>" \
+  http://localhost:8000/v1/manifest/test_read
+```
+
+### Behavior
+- **Without `AB_JWT_SIGNATURE_SECRET`**: All requests pass through
+- **With `AB_JWT_SIGNATURE_SECRET`**: Requires valid JWT bearer token using HS256 algorithm
+
+## OpenAPI Specification
+
+The manifest server provides an OpenAPI specification for API client generation:
+
+### Generating the OpenAPI Spec
+```bash
+# Generate OpenAPI YAML (default location)
+manifest-server generate-openapi
+
+# Generate to custom location
+manifest-server generate-openapi --output /path/to/openapi.yaml
+```
+
+The generated OpenAPI specification is consumed by other applications and tools to:
+- Generate API clients in various programming languages
+- Create SDK bindings for the manifest server
+- Provide API documentation and validation
+- Enable integration with API development tools
+
+### Interactive API Documentation
+
+When running, interactive API documentation is available at:
+- Swagger UI: `http://localhost:8000/docs`
+- ReDoc: `http://localhost:8000/redoc`
+
+## Testing
+
+Run the manifest server tests from the repository root:
+
+```bash
+# Run all manifest server tests
+poetry run pytest unit_tests/manifest_server/ -v
+```
+
+## Docker
+
+The manifest server can be containerized using the included Dockerfile.
Build from the repository root: + +```bash +# Build from repository root (not from manifest_server subdirectory) +docker build -f airbyte_cdk/manifest_server/Dockerfile -t manifest-server . + +# Run the container +docker run -p 8080:8080 manifest-server +``` + +Note: The container runs on port 8080 by default. \ No newline at end of file diff --git a/airbyte_cdk/manifest_server/__init__.py b/airbyte_cdk/manifest_server/__init__.py new file mode 100644 index 000000000..a57e99013 --- /dev/null +++ b/airbyte_cdk/manifest_server/__init__.py @@ -0,0 +1,3 @@ +""" +.. include:: ./README.md +""" diff --git a/airbyte_cdk/manifest_server/api_models/__init__.py b/airbyte_cdk/manifest_server/api_models/__init__.py new file mode 100644 index 000000000..3cd942dc4 --- /dev/null +++ b/airbyte_cdk/manifest_server/api_models/__init__.py @@ -0,0 +1,49 @@ +""" +API Models for the Manifest Server Service. + +This package contains all Pydantic models used for API requests and responses. +""" + +from .dicts import ConnectorConfig, Manifest +from .manifest import ( + CheckRequest, + CheckResponse, + DiscoverRequest, + DiscoverResponse, + FullResolveRequest, + ManifestResponse, + ResolveRequest, + StreamTestReadRequest, +) +from .stream import ( + AuxiliaryRequest, + HttpRequest, + HttpResponse, + LogMessage, + StreamReadPages, + StreamReadResponse, + StreamReadSlices, +) + +__all__ = [ + # Typed Dicts + "ConnectorConfig", + "Manifest", + # Manifest request/response models + "FullResolveRequest", + "ManifestResponse", + "StreamTestReadRequest", + "ResolveRequest", + "CheckRequest", + "CheckResponse", + "DiscoverRequest", + "DiscoverResponse", + # Stream models + "AuxiliaryRequest", + "HttpRequest", + "HttpResponse", + "LogMessage", + "StreamReadResponse", + "StreamReadPages", + "StreamReadSlices", +] diff --git a/airbyte_cdk/manifest_server/api_models/capabilities.py b/airbyte_cdk/manifest_server/api_models/capabilities.py new file mode 100644 index 000000000..d357f66e8 --- /dev/null +++ 
b/airbyte_cdk/manifest_server/api_models/capabilities.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class CapabilitiesResponse(BaseModel): + """Capabilities of the manifest server.""" + + custom_code_execution: bool diff --git a/airbyte_cdk/manifest_server/api_models/dicts.py b/airbyte_cdk/manifest_server/api_models/dicts.py new file mode 100644 index 000000000..ab67e96e8 --- /dev/null +++ b/airbyte_cdk/manifest_server/api_models/dicts.py @@ -0,0 +1,17 @@ +""" +Common API models shared across different endpoints. +""" + +from pydantic import BaseModel, ConfigDict + + +class Manifest(BaseModel): + """Base manifest model. Allows client generation to replace with proper JsonNode types.""" + + model_config = ConfigDict(extra="allow") + + +class ConnectorConfig(BaseModel): + """Base connector configuration model. Allows client generation to replace with proper JsonNode types.""" + + model_config = ConfigDict(extra="allow") diff --git a/airbyte_cdk/manifest_server/api_models/manifest.py b/airbyte_cdk/manifest_server/api_models/manifest.py new file mode 100644 index 000000000..a13189763 --- /dev/null +++ b/airbyte_cdk/manifest_server/api_models/manifest.py @@ -0,0 +1,73 @@ +""" +Manifest-related API models. + +These models define the request and response structures for manifest operations +like reading, resolving, and full resolution. 
+""" + +from typing import Any, List, Optional + +from airbyte_protocol_dataclasses.models import AirbyteCatalog +from pydantic import BaseModel, Field + +from .dicts import ConnectorConfig, Manifest + + +class StreamTestReadRequest(BaseModel): + """Request to test read from a specific stream.""" + + manifest: Manifest + config: ConnectorConfig + stream_name: str + state: List[Any] = Field(default_factory=list) + custom_components_code: Optional[str] = None + record_limit: int = Field(default=100, ge=1, le=5000) + page_limit: int = Field(default=5, ge=1, le=20) + slice_limit: int = Field(default=5, ge=1, le=20) + + +class CheckRequest(BaseModel): + """Request to check a manifest.""" + + manifest: Manifest + config: ConnectorConfig + + +class CheckResponse(BaseModel): + """Response to check a manifest.""" + + success: bool + message: Optional[str] = None + + +class DiscoverRequest(BaseModel): + """Request to discover a manifest.""" + + manifest: Manifest + config: ConnectorConfig + + +class DiscoverResponse(BaseModel): + """Response to discover a manifest.""" + + catalog: AirbyteCatalog + + +class ResolveRequest(BaseModel): + """Request to resolve a manifest.""" + + manifest: Manifest + + +class ManifestResponse(BaseModel): + """Response containing a manifest.""" + + manifest: Manifest + + +class FullResolveRequest(BaseModel): + """Request to fully resolve a manifest.""" + + manifest: Manifest + config: ConnectorConfig + stream_limit: int = Field(default=100, ge=1, le=100) diff --git a/airbyte_cdk/manifest_server/api_models/stream.py b/airbyte_cdk/manifest_server/api_models/stream.py new file mode 100644 index 000000000..4ffcac013 --- /dev/null +++ b/airbyte_cdk/manifest_server/api_models/stream.py @@ -0,0 +1,76 @@ +""" +Stream-related API models. + +These models define the structure for stream reading operations and responses. 
+They accurately reflect the runtime types returned by the CDK, particularly +fixing type mismatches like slice_descriptor being a string rather than an object. +""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + + +class HttpRequest(BaseModel): + """HTTP request details.""" + + url: str + headers: Optional[Dict[str, Any]] + http_method: str + body: Optional[str] = None + + +class HttpResponse(BaseModel): + """HTTP response details.""" + + status: int + body: Optional[str] = None + headers: Optional[Dict[str, Any]] = None + + +class LogMessage(BaseModel): + """Log message from stream processing.""" + + message: str + level: str + internal_message: Optional[str] = None + stacktrace: Optional[str] = None + + +class AuxiliaryRequest(BaseModel): + """Auxiliary HTTP request made during stream processing.""" + + title: str + type: str + description: str + request: HttpRequest + response: HttpResponse + + +class StreamReadPages(BaseModel): + """Pages of data read from a stream slice.""" + + records: List[object] + request: Optional[HttpRequest] = None + response: Optional[HttpResponse] = None + + +class StreamReadSlices(BaseModel): + """Slices of data read from a stream.""" + + pages: List[StreamReadPages] + slice_descriptor: Optional[str] # This is actually a string at runtime, not Dict[str, Any] + state: Optional[List[Dict[str, Any]]] = None + auxiliary_requests: Optional[List[AuxiliaryRequest]] = None + + +class StreamReadResponse(BaseModel): + """Complete stream read response with properly typed fields.""" + + logs: List[LogMessage] + slices: List[StreamReadSlices] + test_read_limit_reached: bool + auxiliary_requests: List[AuxiliaryRequest] + inferred_schema: Optional[Dict[str, Any]] + inferred_datetime_formats: Optional[Dict[str, str]] + latest_config_update: Optional[Dict[str, Any]] diff --git a/airbyte_cdk/manifest_server/app.py b/airbyte_cdk/manifest_server/app.py new file mode 100644 index 000000000..2171f93c4 --- /dev/null 
+++ b/airbyte_cdk/manifest_server/app.py @@ -0,0 +1,17 @@ +from fastapi import FastAPI + +from .routers import capabilities, health, manifest + +app = FastAPI( + title="Manifest Server", + description="A service for running low-code Airbyte connectors", + version="0.1.0", + contact={ + "name": "Airbyte", + "url": "https://airbyte.com", + }, +) + +app.include_router(health.router) +app.include_router(capabilities.router) +app.include_router(manifest.router, prefix="/v1") diff --git a/airbyte_cdk/manifest_server/auth.py b/airbyte_cdk/manifest_server/auth.py new file mode 100644 index 000000000..cc422b934 --- /dev/null +++ b/airbyte_cdk/manifest_server/auth.py @@ -0,0 +1,43 @@ +import os +from typing import Optional + +import jwt +from fastapi import HTTPException, Security, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +security = HTTPBearer(auto_error=False) + + +def verify_jwt_token( + credentials: Optional[HTTPAuthorizationCredentials] = Security(security), +) -> None: + """ + Verify JWT token if AB_JWT_SIGNATURE_SECRET is set, otherwise allow through. 
+ + Args: + credentials: Bearer token credentials from request header + + Raises: + HTTPException: If token is invalid or missing when secret is configured + """ + jwt_secret = os.getenv("AB_JWT_SIGNATURE_SECRET") + + # If no secret is configured, allow all requests through + if not jwt_secret: + return + + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Bearer token required", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + jwt.decode(credentials.credentials, jwt_secret, algorithms=["HS256"]) + except jwt.InvalidTokenError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token", + headers={"WWW-Authenticate": "Bearer"}, + ) diff --git a/airbyte_cdk/manifest_server/cli/__init__.py b/airbyte_cdk/manifest_server/cli/__init__.py new file mode 100644 index 000000000..4cc980fb3 --- /dev/null +++ b/airbyte_cdk/manifest_server/cli/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""The `airbyte_cdk.manifest_server.cli` module provides a standalone CLI for the Airbyte CDK Manifest Server. + +This CLI enables running a FastAPI server for managing and executing Airbyte declarative manifests. +""" diff --git a/airbyte_cdk/manifest_server/cli/_common.py b/airbyte_cdk/manifest_server/cli/_common.py new file mode 100644 index 000000000..f6e672aaf --- /dev/null +++ b/airbyte_cdk/manifest_server/cli/_common.py @@ -0,0 +1,28 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+"""Common utilities for manifest server CLI commands.""" + +import sys + +import rich_click as click + +# Import server dependencies with graceful fallback +try: + import fastapi # noqa: F401 + import uvicorn # noqa: F401 + + FASTAPI_AVAILABLE = True +except ImportError: + FASTAPI_AVAILABLE = False + + +def check_manifest_server_dependencies() -> None: + """Check if manifest-server dependencies are installed.""" + if not FASTAPI_AVAILABLE: + click.echo( + "❌ Manifest runner dependencies not found. Please install with:\n\n" + " pip install airbyte-cdk[manifest-server]\n" + " # or\n" + " poetry install --extras manifest-server\n", + err=True, + ) + sys.exit(1) diff --git a/airbyte_cdk/manifest_server/cli/_info.py b/airbyte_cdk/manifest_server/cli/_info.py new file mode 100644 index 000000000..e42fbaebc --- /dev/null +++ b/airbyte_cdk/manifest_server/cli/_info.py @@ -0,0 +1,30 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Info command for the manifest server CLI.""" + +from typing import Any, Optional + +import rich_click as click + +# Import server dependencies with graceful fallback +fastapi: Optional[Any] = None +uvicorn: Optional[Any] = None + +try: + import fastapi # type: ignore[no-redef] + import uvicorn # type: ignore[no-redef] + + FASTAPI_AVAILABLE = True +except ImportError: + FASTAPI_AVAILABLE = False + + +@click.command() +def info() -> None: + """Show manifest server information and status.""" + if FASTAPI_AVAILABLE and fastapi is not None and uvicorn is not None: + click.echo("✅ Manifest runner dependencies are installed") + click.echo(f" FastAPI version: {fastapi.__version__}") + click.echo(f" Uvicorn version: {uvicorn.__version__}") + else: + click.echo("❌ Manifest runner dependencies not installed") + click.echo(" Install with: pip install airbyte-cdk[manifest-server]") diff --git a/airbyte_cdk/manifest_server/cli/_openapi.py b/airbyte_cdk/manifest_server/cli/_openapi.py new file mode 100644 index 000000000..4143ae336 --- /dev/null 
+++ b/airbyte_cdk/manifest_server/cli/_openapi.py @@ -0,0 +1,43 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Generate OpenAPI command for the manifest server CLI.""" + +from pathlib import Path + +import rich_click as click +from yaml import dump + +from ._common import check_manifest_server_dependencies + + +@click.command("generate-openapi") +@click.option( + "--output", + "-o", + default="airbyte_cdk/manifest_server/openapi.yaml", + help="Output path for the OpenAPI YAML file", + show_default=True, +) +def generate_openapi(output: str) -> None: + """Generate OpenAPI YAML specification for the manifest server.""" + check_manifest_server_dependencies() + + # Import the FastAPI app + from airbyte_cdk.manifest_server.app import app + + # Get OpenAPI schema + openapi_schema = app.openapi() + + # Ensure output directory exists + output_path = Path(output) + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Write YAML file with header comment + with open(output_path, "w") as f: + f.write("# This file is auto-generated. Do not edit manually.\n") + f.write("# To regenerate, run: manifest-server generate-openapi\n") + f.write("\n") + dump(openapi_schema, f, default_flow_style=False, sort_keys=False) + + click.echo(f"✅ OpenAPI YAML generated at: {output_path}") + click.echo(f" Title: {openapi_schema.get('info', {}).get('title', 'N/A')}") + click.echo(f" Version: {openapi_schema.get('info', {}).get('version', 'N/A')}") diff --git a/airbyte_cdk/manifest_server/cli/_start.py b/airbyte_cdk/manifest_server/cli/_start.py new file mode 100644 index 000000000..612a2d59d --- /dev/null +++ b/airbyte_cdk/manifest_server/cli/_start.py @@ -0,0 +1,38 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+"""Start command for the manifest server CLI."""
+
+import rich_click as click
+
+from ._common import check_manifest_server_dependencies
+
+
+@click.command()
+@click.option(
+    "--host",
+    default="127.0.0.1",
+    help="Host to bind the server to",
+    show_default=True,
+)
+@click.option(
+    "--port",
+    default=8000,
+    help="Port to bind the server to",
+    show_default=True,
+)
+@click.option(
+    "--reload",
+    is_flag=True,
+    help="Enable auto-reload for development",
+)
+def start(host: str, port: int, reload: bool) -> None:
+    """Start the FastAPI manifest server."""
+    check_manifest_server_dependencies()
+
+    # Import and use the main server function
+    from airbyte_cdk.manifest_server.main import run_server
+
+    run_server(
+        host=host,
+        port=port,
+        reload=reload,
+    )
diff --git a/airbyte_cdk/manifest_server/cli/run.py b/airbyte_cdk/manifest_server/cli/run.py
new file mode 100644
index 000000000..3b3140685
--- /dev/null
+++ b/airbyte_cdk/manifest_server/cli/run.py
@@ -0,0 +1,59 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+"""Standalone CLI for the Airbyte CDK Manifest Server.
+
+This CLI provides commands for running and managing the manifest server.
+
+**Installation:**
+
+To use the manifest-server functionality, install the CDK with the manifest-server extra:
+
+```bash
+pip install airbyte-cdk[manifest-server]
+# or
+poetry install --extras manifest-server
+```
+
+**Usage:**
+
+```bash
+manifest-server start --port 8000
+manifest-server info
+manifest-server --help
+```
+"""
+
+import rich_click as click
+
+from ._info import info
+from ._openapi import generate_openapi
+from ._start import start
+
+
+@click.group(
+    help=__doc__.replace("\n", "\n\n"),  # Render docstring as help text (markdown)
+    invoke_without_command=True,
+)
+@click.pass_context
+def cli(
+    ctx: click.Context,
+) -> None:
+    """Airbyte Manifest Server CLI."""
+
+    if ctx.invoked_subcommand is None:
+        # If no subcommand is provided, show the help message.
class ManifestCommandProcessor:
    """Runs test-read, check and discover commands against a single declarative source.

    Errors reported by ``EntrypointOutput.raise_if_errors()`` are re-raised as
    ``HTTPException`` with status 422 so callers in the HTTP layer can surface them.
    """

    # Source that every command issued through this processor is executed against.
    _source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
    # Logger passed to the source when requesting its spec.
    _logger = logging.getLogger("airbyte.manifest-server")

    def __init__(
        self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
    ) -> None:
        self._source = source

    def test_read(
        self,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
        record_limit: int,
        page_limit: int,
        slice_limit: int,
    ) -> StreamRead:
        """
        Test the read method of the source.

        The first stream of ``catalog`` is the one that gets read.

        Args:
            config: Connector configuration passed to the test reader.
            catalog: Configured catalog; must contain at least one stream.
            state: State messages passed to the test reader.
            record_limit: Maximum number of records to read.
            page_limit: Maximum number of pages per slice.
            slice_limit: Maximum number of slices.

        Returns:
            The ``StreamRead`` produced by ``TestReader.run_test_read``.

        Raises:
            HTTPException: (422) if ``catalog`` contains no streams.
        """
        # Guard the `streams[0]` lookup below: an empty catalog would otherwise
        # escape as a bare IndexError instead of a client-facing error.
        if not catalog.streams:
            raise HTTPException(
                status_code=422,
                detail="The configured catalog must contain at least one stream",
            )

        test_read_handler = TestReader(
            max_pages_per_slice=page_limit,
            max_slices=slice_limit,
            max_record_limit=record_limit,
        )

        stream_read = test_read_handler.run_test_read(
            source=self._source,
            config=config,
            configured_catalog=catalog,
            state=state,
            stream_name=catalog.streams[0].stream.name,
            record_limit=record_limit,
        )

        return stream_read

    def check_connection(
        self,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Optional[str]]:
        """
        Check the connection to the source.

        Returns:
            A ``(succeeded, message)`` tuple taken from the last connection status
            message, or ``(False, <explanation>)`` when no status was emitted.

        Raises:
            HTTPException: (422) if the check output contains errors.
        """

        spec = self._source.spec(self._logger)
        entrypoint = AirbyteEntrypoint(source=self._source)
        messages = entrypoint.check(spec, config)
        output = EntrypointOutput(
            messages=[AirbyteEntrypoint.airbyte_message_to_string(m) for m in messages],
            command=["check"],
        )
        self._raise_on_trace_message(output)

        status_messages = output.connection_status_messages
        if not status_messages or status_messages[-1].connectionStatus is None:
            return False, "Connection check did not return a status message"

        # Only the last status message is considered.
        connection_status = status_messages[-1].connectionStatus
        return (
            connection_status.status == Status.SUCCEEDED,
            connection_status.message,
        )

    def discover(
        self,
        config: Mapping[str, Any],
    ) -> Optional[AirbyteCatalog]:
        """
        Discover the catalog from the source.

        Returns:
            The discovered catalog, or ``None`` when the output holds no catalog message.

        Raises:
            HTTPException: (422) if the discover output contains errors.
        """
        spec = self._source.spec(self._logger)
        entrypoint = AirbyteEntrypoint(source=self._source)
        messages = entrypoint.discover(spec, config)
        output = EntrypointOutput(
            messages=[AirbyteEntrypoint.airbyte_message_to_string(m) for m in messages],
            command=["discover"],
        )
        self._raise_on_trace_message(output)

        try:
            catalog_message = output.catalog
            return catalog_message.catalog
        except ValueError:
            # No catalog message found
            return None

    def _raise_on_trace_message(
        self,
        output: EntrypointOutput,
    ) -> None:
        """
        Raise an exception if a trace message is found.

        Raises:
            HTTPException: (422) wrapping the ``AirbyteEntrypointException`` raised by
                ``output.raise_if_errors()``.
        """
        try:
            output.raise_if_errors()
        except AirbyteEntrypointException as e:
            # Chain explicitly so the original connector error stays attached as __cause__.
            raise HTTPException(status_code=422, detail=e.message) from e
def build_source(
    manifest: Dict[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog],
    config: Mapping[str, Any],
    state: Optional[List[AirbyteStateMessage]],
    record_limit: Optional[int] = None,
    page_limit: Optional[int] = None,
    slice_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
    """Build a ``ConcurrentDeclarativeSource`` from a manifest for use by the manifest server.

    The caller's ``manifest`` is never modified; a deep copy is adjusted instead.

    Args:
        manifest: Declarative manifest, optionally carrying the UI-only
            ``__should_normalize`` / ``__should_migrate`` flags.
        catalog: Configured catalog handed to the source, if any.
        config: Connector configuration handed to the source.
        state: State messages handed to the source, if any.
        record_limit: Maximum records; falls back to ``TestLimits.DEFAULT_MAX_RECORDS``.
        page_limit: Maximum pages per slice; falls back to
            ``TestLimits.DEFAULT_MAX_PAGES_PER_SLICE``.
        slice_limit: Maximum slices; falls back to ``TestLimits.DEFAULT_MAX_SLICES``.

    Returns:
        A source that emits connector builder messages with a concurrency level of 1.
    """
    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
    # to retain ordering for the grouping of the builder message responses.
    definition = copy.deepcopy(manifest)
    if "concurrency_level" in definition:
        definition["concurrency_level"]["default_concurrency"] = 1
    else:
        definition["concurrency_level"] = {
            "type": "ConcurrencyLevel",
            "default_concurrency": 1,
        }

    should_normalize = should_normalize_manifest(manifest)
    should_migrate = should_migrate_manifest(manifest)

    # The flags are UI-only metadata, not manifest content. Strip them whenever they
    # are present -- including when set to a falsy value -- so they never reach the
    # source definition (previously a `"__should_migrate": false` entry was left in).
    definition.pop(SHOULD_NORMALIZE_KEY, None)
    definition.pop(SHOULD_MIGRATE_KEY, None)

    return ConcurrentDeclarativeSource(
        catalog=catalog,
        state=state,
        source_config=definition,
        config=config,
        normalize_manifest=should_normalize,
        migrate_manifest=should_migrate,
        emit_connector_builder_messages=True,
        limits=TestLimits(
            # `or` means a limit of None (or 0) selects the default.
            max_pages_per_slice=page_limit or TestLimits.DEFAULT_MAX_PAGES_PER_SLICE,
            max_slices=slice_limit or TestLimits.DEFAULT_MAX_SLICES,
            max_records=record_limit or TestLimits.DEFAULT_MAX_RECORDS,
        ),
    )
def run_server(
    host: str = "127.0.0.1", port: int = 8000, reload: bool = False, log_level: str = "info"
) -> None:
    """Announce the bind address on stdout, then serve the manifest server app with uvicorn.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        reload: Forwarded to ``uvicorn.run`` as ``reload``.
        log_level: Forwarded to ``uvicorn.run`` as ``log_level``.
    """
    # The app is given as an import string rather than an object.
    app_import_path = "airbyte_cdk.manifest_server.app:app"
    startup_banner = f"🚀 Starting Airbyte CDK Manifest Server on {host}:{port}"

    print(startup_banner)
    uvicorn.run(app_import_path, host=host, port=port, reload=reload, log_level=log_level)
+ operationId: testRead + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/StreamTestReadRequest' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/StreamRead' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + security: + - HTTPBearer: [] + /v1/manifest/check: + post: + tags: + - manifest + summary: Check + description: Check configuration against a manifest + operationId: check + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CheckRequest' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CheckResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + security: + - HTTPBearer: [] + /v1/manifest/discover: + post: + tags: + - manifest + summary: Discover + description: Discover streams from a manifest + operationId: discover + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/DiscoverRequest' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DiscoverResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + security: + - HTTPBearer: [] + /v1/manifest/resolve: + post: + tags: + - manifest + summary: Resolve + description: Resolve a manifest to its final configuration. 
+ operationId: resolve + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ResolveRequest' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ManifestResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + security: + - HTTPBearer: [] + /v1/manifest/full_resolve: + post: + tags: + - manifest + summary: Full Resolve + description: 'Fully resolve a manifest including dynamic streams. + + + Generates dynamic streams up to the specified limit and includes + + them in the resolved manifest.' + operationId: fullResolve + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FullResolveRequest' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ManifestResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + security: + - HTTPBearer: [] +components: + schemas: + AirbyteCatalog: + properties: + streams: + items: + $ref: '#/components/schemas/AirbyteStream' + type: array + title: Streams + type: object + required: + - streams + title: AirbyteCatalog + AirbyteStream: + properties: + name: + type: string + title: Name + json_schema: + type: object + title: Json Schema + supported_sync_modes: + items: + $ref: '#/components/schemas/SyncMode' + type: array + title: Supported Sync Modes + source_defined_cursor: + anyOf: + - type: boolean + - type: 'null' + title: Source Defined Cursor + default_cursor_field: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Default Cursor Field + source_defined_primary_key: + anyOf: + - items: + items: + type: string + type: array + type: array + - type: 'null' + title: Source Defined Primary Key + 
namespace: + anyOf: + - type: string + - type: 'null' + title: Namespace + is_resumable: + anyOf: + - type: boolean + - type: 'null' + title: Is Resumable + is_file_based: + anyOf: + - type: boolean + - type: 'null' + title: Is File Based + type: object + required: + - name + - json_schema + - supported_sync_modes + title: AirbyteStream + AuxiliaryRequest: + properties: + title: + type: string + title: Title + type: + type: string + title: Type + description: + type: string + title: Description + request: + $ref: '#/components/schemas/HttpRequest' + response: + $ref: '#/components/schemas/HttpResponse' + type: object + required: + - title + - type + - description + - request + - response + title: AuxiliaryRequest + description: Auxiliary HTTP request made during stream processing. + CapabilitiesResponse: + properties: + custom_code_execution: + type: boolean + title: Custom Code Execution + type: object + required: + - custom_code_execution + title: CapabilitiesResponse + description: Capabilities of the manifest server. + CheckRequest: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + config: + $ref: '#/components/schemas/ConnectorConfig' + type: object + required: + - manifest + - config + title: CheckRequest + description: Request to check a manifest. + CheckResponse: + properties: + success: + type: boolean + title: Success + message: + anyOf: + - type: string + - type: 'null' + title: Message + type: object + required: + - success + title: CheckResponse + description: Response to check a manifest. + ConnectorConfig: + properties: {} + additionalProperties: true + type: object + title: ConnectorConfig + description: Base connector configuration model. Allows client generation to + replace with proper JsonNode types. 
+ DiscoverRequest: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + config: + $ref: '#/components/schemas/ConnectorConfig' + type: object + required: + - manifest + - config + title: DiscoverRequest + description: Request to discover a manifest. + DiscoverResponse: + properties: + catalog: + $ref: '#/components/schemas/AirbyteCatalog' + type: object + required: + - catalog + title: DiscoverResponse + description: Response to discover a manifest. + FullResolveRequest: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + config: + $ref: '#/components/schemas/ConnectorConfig' + stream_limit: + type: integer + maximum: 100.0 + minimum: 1.0 + title: Stream Limit + default: 100 + type: object + required: + - manifest + - config + title: FullResolveRequest + description: Request to fully resolve a manifest. + HTTPValidationError: + properties: + detail: + items: + $ref: '#/components/schemas/ValidationError' + type: array + title: Detail + type: object + title: HTTPValidationError + HttpRequest: + properties: + url: + type: string + title: Url + headers: + anyOf: + - type: object + - type: 'null' + title: Headers + http_method: + type: string + title: Http Method + body: + anyOf: + - type: string + - type: 'null' + title: Body + type: object + required: + - url + - headers + - http_method + title: HttpRequest + description: HTTP request details. + HttpResponse: + properties: + status: + type: integer + title: Status + body: + anyOf: + - type: string + - type: 'null' + title: Body + headers: + anyOf: + - type: object + - type: 'null' + title: Headers + type: object + required: + - status + title: HttpResponse + description: HTTP response details. 
+ LogMessage: + properties: + message: + type: string + title: Message + level: + type: string + title: Level + internal_message: + anyOf: + - type: string + - type: 'null' + title: Internal Message + stacktrace: + anyOf: + - type: string + - type: 'null' + title: Stacktrace + type: object + required: + - message + - level + title: LogMessage + description: Log message from stream processing. + Manifest: + properties: {} + additionalProperties: true + type: object + title: Manifest + description: Base manifest model. Allows client generation to replace with proper + JsonNode types. + ManifestResponse: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + type: object + required: + - manifest + title: ManifestResponse + description: Response containing a manifest. + ResolveRequest: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + type: object + required: + - manifest + title: ResolveRequest + description: Request to resolve a manifest. + StreamRead: + properties: + logs: + items: + $ref: '#/components/schemas/LogMessage' + type: array + title: Logs + slices: + items: + $ref: '#/components/schemas/StreamReadSlices' + type: array + title: Slices + test_read_limit_reached: + type: boolean + title: Test Read Limit Reached + auxiliary_requests: + items: + $ref: '#/components/schemas/AuxiliaryRequest' + type: array + title: Auxiliary Requests + inferred_schema: + anyOf: + - type: object + - type: 'null' + title: Inferred Schema + inferred_datetime_formats: + anyOf: + - additionalProperties: + type: string + type: object + - type: 'null' + title: Inferred Datetime Formats + latest_config_update: + anyOf: + - type: object + - type: 'null' + title: Latest Config Update + type: object + required: + - logs + - slices + - test_read_limit_reached + - auxiliary_requests + - inferred_schema + - inferred_datetime_formats + - latest_config_update + title: StreamRead + description: Complete stream read response with properly typed fields. 
+ StreamReadPages: + properties: + records: + items: {} + type: array + title: Records + request: + anyOf: + - $ref: '#/components/schemas/HttpRequest' + - type: 'null' + response: + anyOf: + - $ref: '#/components/schemas/HttpResponse' + - type: 'null' + type: object + required: + - records + title: StreamReadPages + description: Pages of data read from a stream slice. + StreamReadSlices: + properties: + pages: + items: + $ref: '#/components/schemas/StreamReadPages' + type: array + title: Pages + slice_descriptor: + anyOf: + - type: string + - type: 'null' + title: Slice Descriptor + state: + anyOf: + - items: + type: object + type: array + - type: 'null' + title: State + auxiliary_requests: + anyOf: + - items: + $ref: '#/components/schemas/AuxiliaryRequest' + type: array + - type: 'null' + title: Auxiliary Requests + type: object + required: + - pages + - slice_descriptor + title: StreamReadSlices + description: Slices of data read from a stream. + StreamTestReadRequest: + properties: + manifest: + $ref: '#/components/schemas/Manifest' + config: + $ref: '#/components/schemas/ConnectorConfig' + stream_name: + type: string + title: Stream Name + state: + items: {} + type: array + title: State + default: [] + custom_components_code: + anyOf: + - type: string + - type: 'null' + title: Custom Components Code + record_limit: + type: integer + maximum: 5000.0 + minimum: 1.0 + title: Record Limit + default: 100 + page_limit: + type: integer + maximum: 20.0 + minimum: 1.0 + title: Page Limit + default: 5 + slice_limit: + type: integer + maximum: 20.0 + minimum: 1.0 + title: Slice Limit + default: 5 + type: object + required: + - manifest + - config + - stream_name + title: StreamTestReadRequest + description: Request to test read from a specific stream. 
@router.get("/", operation_id="getCapabilities")
def get_capabilities() -> CapabilitiesResponse:
    """
    Get the capabilities available for the manifest server.

    Returns:
        Dict containing the service capabilities including custom code execution support.
    """
    # NOTE: the docstring above is published verbatim as the endpoint description in
    # the generated OpenAPI spec, so it is intentionally left untouched here.

    # Read the same environment variable as the connector builder server.
    # Only the literal value "true" (case-insensitive) enables custom code execution.
    raw_flag = os.getenv("AIRBYTE_ENABLE_UNSAFE_CODE", "false")
    custom_code_allowed = raw_flag.lower() == "true"

    return CapabilitiesResponse(custom_code_execution=custom_code_allowed)
@router.post("/test_read", operation_id="testRead")
def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
    """
    Test reading from a specific stream in the manifest.
    """
    # Overview (kept out of the docstring, which is published as the OpenAPI description):
    # build a single-stream catalog for `request.stream_name`, deserialize the request
    # state, optionally inject custom components code into the config, build the source
    # and run a limited test read against it.
    config_dict = request.config.model_dump()

    catalog = build_catalog(request.stream_name)
    converted_state = [AirbyteStateMessageSerializer.load(state) for state in request.state]

    if request.custom_components_code:
        config_dict[INJECTED_COMPONENTS_PY] = request.custom_components_code
        config_dict[INJECTED_COMPONENTS_PY_CHECKSUMS] = {
            # MD5 is only an integrity checksum for the injected code here, not a security
            # primitive; saying so keeps this call working on FIPS-restricted Python
            # builds, where plain hashlib.md5() is rejected. The digest is unchanged.
            "md5": hashlib.md5(
                request.custom_components_code.encode(), usedforsecurity=False
            ).hexdigest()
        }

    # safe_build_source takes (page, slice, record) limits in that order ...
    source = safe_build_source(
        request.manifest.model_dump(),
        config_dict,
        catalog,
        converted_state,
        request.page_limit,
        request.slice_limit,
        request.record_limit,
    )

    # ... whereas ManifestCommandProcessor.test_read takes (record, page, slice).
    runner = ManifestCommandProcessor(source)
    cdk_result = runner.test_read(
        config_dict,
        catalog,
        converted_state,
        request.record_limit,
        request.page_limit,
        request.slice_limit,
    )
    # The CDK result is a dataclass; convert it to a dict before validating into the API model.
    return StreamReadResponse.model_validate(asdict(cdk_result))
@router.post("/full_resolve", operation_id="fullResolve")
def full_resolve(request: FullResolveRequest) -> ManifestResponse:
    """
    Fully resolve a manifest, including dynamic streams.

    This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI
    """
    source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())

    # Shallow copy of the resolved manifest: the top-level mapping is new, the nested
    # stream dicts and the "streams" list are still the source's own objects.
    manifest = dict(source.resolved_manifest)
    streams = manifest.get("streams", [])

    # Streams already present in the manifest get a null template name.
    for static_stream in streams:
        static_stream["dynamic_stream_name"] = None

    # Group generated streams by the template that produced them, keeping at most
    # `stream_limit` streams per template.
    streams_by_template: Dict[str, List[Dict[str, Any]]] = {}
    for dynamic_stream in source.dynamic_streams:
        bucket = streams_by_template.setdefault(dynamic_stream["dynamic_stream_name"], [])
        if len(bucket) >= request.stream_limit:
            continue
        bucket.append(dynamic_stream)

    # Append the generated streams after the existing ones, template by template.
    streams += [
        generated for template_streams in streams_by_template.values() for generated in template_streams
    ]

    manifest["streams"] = streams
    return ManifestResponse(manifest=Manifest(**manifest))
[[package]] name = "aiohappyeyeballs" @@ -169,9 +169,9 @@ files = [ name = "anyio" version = "4.9.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" -optional = true +optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c"}, {file = "anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028"}, @@ -1096,6 +1096,29 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "fastapi" +version = "0.116.1" +description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +optional = true +python-versions = ">=3.8" +groups = ["main"] +markers = "extra == \"manifest-server\"" +files = [ + {file = "fastapi-0.116.1-py3-none-any.whl", hash = "sha256:c46ac7c312df840f0c9e220f7964bada936781bc4e2e6eb71f1c4d7553786565"}, + {file = "fastapi-0.116.1.tar.gz", hash = "sha256:ed52cbf946abfd70c5a0dccb24673f0670deeb517a88b3544d03c2a6bf283143"}, +] + +[package.dependencies] +pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0" +starlette = ">=0.40.0,<0.48.0" +typing-extensions = ">=4.8.0" + +[package.extras] +all = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.8)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=3.1.5)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.18)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] +standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.8)", "httpx (>=0.23.0)", "jinja2 (>=3.1.5)", "python-multipart (>=0.0.18)", "uvicorn[standard] (>=0.12.0)"] +standard-no-fastapi-cloud-cli = ["email-validator (>=2.0.0)", 
"fastapi-cli[standard-no-fastapi-cloud-cli] (>=0.0.8)", "httpx (>=0.23.0)", "jinja2 (>=3.1.5)", "python-multipart (>=0.0.18)", "uvicorn[standard] (>=0.12.0)"] + [[package]] name = "fastavro" version = "1.8.2" @@ -1661,9 +1684,9 @@ protobuf = ">=4.21.6" name = "h11" version = "0.16.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" -optional = true +optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, @@ -1673,9 +1696,9 @@ files = [ name = "httpcore" version = "1.0.9" description = "A minimal low-level HTTP client." -optional = true +optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, @@ -1695,9 +1718,9 @@ trio = ["trio (>=0.22.0,<1.0)"] name = "httpx" version = "0.28.1" description = "The next generation HTTP client." 
-optional = true +optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, @@ -4902,9 +4925,9 @@ files = [ name = "sniffio" version = "1.3.1" description = "Sniff out which async library your code is running under" -optional = true +optional = false python-versions = ">=3.7" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -5020,6 +5043,26 @@ postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"] pymysql = ["pymysql"] sqlcipher = ["sqlcipher3_binary"] +[[package]] +name = "starlette" +version = "0.47.2" +description = "The little ASGI library that shines." +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"manifest-server\"" +files = [ + {file = "starlette-0.47.2-py3-none-any.whl", hash = "sha256:c5847e96134e5c5371ee9fac6fdf1a67336d5815e09eb2a01fdb57a351ef915b"}, + {file = "starlette-0.47.2.tar.gz", hash = "sha256:6ae9aa5db235e4846decc1e7b79c4f346adf41e9777aebeb49dfd09bbd7023d8"}, +] + +[package.dependencies] +anyio = ">=3.6.2,<5" +typing-extensions = {version = ">=4.10.0", markers = "python_version < \"3.13\""} + +[package.extras] +full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.18)", "pyyaml"] + [[package]] name = "tabulate" version = "0.9.0" @@ -5497,6 +5540,27 @@ h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[[package]] +name = "uvicorn" +version = "0.35.0" +description = "The lightning-fast ASGI server." 
+optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"manifest-server\"" +files = [ + {file = "uvicorn-0.35.0-py3-none-any.whl", hash = "sha256:197535216b25ff9b785e29a0b79199f55222193d47f820816e7da751e9bc8d4a"}, + {file = "uvicorn-0.35.0.tar.gz", hash = "sha256:bc662f087f7cf2ce11a1d7fd70b90c9f98ef2e2831556dd078d131b96cc94a01"}, +] + +[package.dependencies] +click = ">=7.0" +h11 = ">=0.8" +typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""} + +[package.extras] +standard = ["colorama (>=0.4) ; sys_platform == \"win32\"", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.15.1) ; sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\"", "watchfiles (>=0.13)", "websockets (>=10.4)"] + [[package]] name = "wcmatch" version = "10.0" @@ -5761,10 +5825,11 @@ type = ["pytest-mypy"] [extras] dev = ["pytest"] file-based = ["avro", "fastavro", "markdown", "pdf2image", "pdfminer.six", "pyarrow", "pytesseract", "python-calamine", "python-snappy", "unstructured", "unstructured.pytesseract"] +manifest-server = ["fastapi", "uvicorn"] sql = ["sqlalchemy"] vector-db-based = ["cohere", "langchain", "openai", "tiktoken"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "87cbebf1b8811e4471133bcb0f39c6ef0129fd729635e90afb01b29539d79abf" +content-hash = "485d30a4b055f56882e9d2fc2f3b18aae0e634884fdf220715f35ffb3c76947c" diff --git a/pyproject.toml b/pyproject.toml index 9d6e13902..1e578f7cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,6 +87,8 @@ pytest = {version = "^7", optional = true } orjson = "^3.10.7" serpyco-rs = "^1.10.2" sqlalchemy = {version = "^2.0,!=2.0.36", optional = true } +fastapi = { version = ">=0.116.1", optional = true } +uvicorn = { version = ">=0.35.0", optional = true} xmltodict = ">=0.13,<0.15" anyascii = "^0.3.2" whenever = "^0.6.16" @@ -109,6 +111,7 @@ pytest-cov = "*" pytest-httpserver = 
"*" pytest-mock = "*" requests-mock = "*" +httpx = "*" # Required for FastAPI TestClient # Stubs packages for mypy typing types-requests = "^2.32.0.20241016" types-python-dateutil = "^2.9.0.20241003" @@ -121,11 +124,13 @@ file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfmi vector-db-based = ["langchain", "openai", "cohere", "tiktoken"] sql = ["sqlalchemy"] dev = ["pytest"] +manifest-server = ["fastapi", "uvicorn"] [tool.poetry.scripts] airbyte-cdk = "airbyte_cdk.cli.airbyte_cdk:cli" source-declarative-manifest = "airbyte_cdk.cli.source_declarative_manifest:run" +manifest-server = "airbyte_cdk.manifest_server.cli.run:run" # Ruff configuration moved to ruff.toml @@ -140,7 +145,7 @@ pre-commit = {cmd = "poetry run pre-commit run --all-files", help = "Run all pre # Build tasks assemble = {cmd = "bin/generate-component-manifest-dagger.sh", help = "Generate component manifest files."} build-package = {cmd = "poetry build", help = "Build the python package: source and wheels archives."} -build = {sequence = ["assemble", "build-package"], help = "Run all tasks to build the package."} +build = {sequence = ["assemble", "openapi-generate", "build-package"], help = "Run all tasks to build the package."} # Format check tasks format-check = {sequence = ["_format-check-ruff", "_format-check-prettier"], help = "Check formatting for all file types via Ruff and Prettier.", ignore_fail = "return_non_zero"} @@ -189,6 +194,9 @@ pre-push = {sequence = ["build", "check-local"], help = "Run all build and check docs-generate = {env = {PDOC_ALLOW_EXEC = "1"}, cmd = "python -m docs.generate run", help="Generate API documentation with PDoc."} docs-preview = {shell = "poe docs-generate && open docs/generated/index.html", help="Generate API documentation with PDoc and then open the docs in the default web browser."} +# Manifest server +openapi-generate = {cmd = "manifest-server generate-openapi", help = "Generate OpenAPI YAML specification for the manifest server."} 
+ [tool.check-wheel-contents] # Quality control for Python wheel generation. Docs here: # - https://github.com/jwodder/check-wheel-contents @@ -202,7 +210,7 @@ ignore = [ [tool.airbyte_ci] python_versions = ["3.10", "3.11"] optional_poetry_groups = ["dev"] -poetry_extras = ["file-based", "vector-db-based"] +poetry_extras = ["file-based", "vector-db-based", "manifest-server"] poe_tasks = ["check-ci"] mount_docker_socket = true diff --git a/unit_tests/manifest_server/__init__.py b/unit_tests/manifest_server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/unit_tests/manifest_server/command_processor/__init__.py b/unit_tests/manifest_server/command_processor/__init__.py new file mode 100644 index 000000000..4615b6e5f --- /dev/null +++ b/unit_tests/manifest_server/command_processor/__init__.py @@ -0,0 +1 @@ +"""Tests for the command_processor package.""" diff --git a/unit_tests/manifest_server/command_processor/test_processor.py b/unit_tests/manifest_server/command_processor/test_processor.py new file mode 100644 index 000000000..b5be7bd00 --- /dev/null +++ b/unit_tests/manifest_server/command_processor/test_processor.py @@ -0,0 +1,310 @@ +from unittest.mock import Mock, patch + +import pytest +from airbyte_protocol_dataclasses.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteErrorTraceMessage, + AirbyteMessage, + AirbyteStream, + AirbyteTraceMessage, + FailureType, + Status, + TraceType, +) +from airbyte_protocol_dataclasses.models import Type as AirbyteMessageType +from fastapi import HTTPException + +from airbyte_cdk.entrypoint import AirbyteEntrypoint +from airbyte_cdk.manifest_server.command_processor.processor import ManifestCommandProcessor +from airbyte_cdk.models.airbyte_protocol import ( + AirbyteStream as AirbyteStreamProtocol, +) +from airbyte_cdk.models.airbyte_protocol import ( + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) + + +class TestManifestCommandProcessor: + 
"""Test cases for the ManifestCommandProcessor class.""" + + @pytest.fixture + def mock_source(self): + """Create a mock ManifestDeclarativeSource.""" + return Mock() + + @pytest.fixture + def command_processor(self, mock_source): + """Create a ManifestCommandProcessor instance with mocked source.""" + return ManifestCommandProcessor(mock_source) + + @pytest.fixture + def sample_config(self): + """Sample configuration for testing.""" + return {"api_key": "test_key", "base_url": "https://api.example.com"} + + @pytest.fixture + def sample_catalog(self): + """Sample configured catalog for testing.""" + return ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStreamProtocol( + name="test_stream", + json_schema={"type": "object"}, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ] + ) + + @pytest.fixture + def sample_state(self): + """Sample state messages for testing.""" + return [] + + @patch("airbyte_cdk.manifest_server.command_processor.processor.TestReader") + def test_test_read_success( + self, mock_test_reader_class, command_processor, sample_config, sample_catalog + ): + """Test successful test_read execution with various parameters and state messages.""" + from airbyte_cdk.models.airbyte_protocol import ( + AirbyteStateMessage, + AirbyteStateType, + ) + + # Mock the TestReader instance and its run_test_read method + mock_test_reader_instance = Mock() + mock_test_reader_class.return_value = mock_test_reader_instance + + # Mock the StreamRead return value + mock_stream_read = Mock() + mock_test_reader_instance.run_test_read.return_value = mock_stream_read + + # Test with state messages and various parameter values + state_messages = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream={ + "stream_descriptor": {"name": "test_stream"}, + "stream_state": {"cursor": "2023-01-01"}, + }, + ) + ] + + record_limit = 50 + 
page_limit = 3 + slice_limit = 7 + + # Execute test_read + result = command_processor.test_read( + config=sample_config, + catalog=sample_catalog, + state=state_messages, + record_limit=record_limit, + page_limit=page_limit, + slice_limit=slice_limit, + ) + + # Verify TestReader was initialized with correct parameters + mock_test_reader_class.assert_called_once_with( + max_pages_per_slice=page_limit, + max_slices=slice_limit, + max_record_limit=record_limit, + ) + + # Verify run_test_read was called with correct parameters including state + mock_test_reader_instance.run_test_read.assert_called_once_with( + source=command_processor._source, + config=sample_config, + configured_catalog=sample_catalog, + stream_name="test_stream", + state=state_messages, + record_limit=record_limit, + ) + + # Verify the result is returned correctly + assert result == mock_stream_read + + @patch("airbyte_cdk.manifest_server.command_processor.processor.TestReader") + def test_test_read_exception_handling( + self, + mock_test_reader_class, + command_processor, + sample_config, + sample_catalog, + sample_state, + ): + """Test that exceptions from TestReader are properly propagated.""" + mock_test_reader_instance = Mock() + mock_test_reader_class.return_value = mock_test_reader_instance + + # Make run_test_read raise an exception + mock_test_reader_instance.run_test_read.side_effect = Exception("Test error") + + # Verify the exception is propagated + with pytest.raises(Exception, match="Test error"): + command_processor.test_read( + config=sample_config, + catalog=sample_catalog, + state=sample_state, + record_limit=100, + page_limit=5, + slice_limit=10, + ) + + def test_check_connection_success( + self, command_processor: ManifestCommandProcessor, sample_config + ): + """Test successful check_connection execution.""" + + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Create mock messages with successful connection status + connection_status = 
AirbyteConnectionStatus( + status=Status.SUCCEEDED, message="Connection test succeeded" + ) + mock_message = AirbyteMessage( + type=AirbyteMessageType.CONNECTION_STATUS, connectionStatus=connection_status + ) + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "check", return_value=[mock_message]): + # Execute check_connection + success, message = command_processor.check_connection(sample_config) + + # Verify the result + assert success is True + assert message == "Connection test succeeded" + + # Verify spec was called + command_processor._source.spec.assert_called_once() + + def test_check_connection_failure(self, command_processor, sample_config): + """Test check_connection with failed status.""" + + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Create mock messages with failed connection status + connection_status = AirbyteConnectionStatus(status=Status.FAILED, message="Invalid API key") + mock_message = AirbyteMessage( + type=AirbyteMessageType.CONNECTION_STATUS, connectionStatus=connection_status + ) + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "check", return_value=[mock_message]): + # Execute check_connection + success, message = command_processor.check_connection(sample_config) + + # Verify the result + assert success is False + assert message == "Invalid API key" + + def test_check_connection_no_status_message(self, command_processor, sample_config): + """Test check_connection when no connection status message is returned.""" + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "check", return_value=[]): + # Execute check_connection + success, message = command_processor.check_connection(sample_config) + + # Verify the result + assert success is False + assert message == "Connection check did not return a status message" + + def test_check_connection_with_trace_error(self, 
command_processor, sample_config): + """Test check_connection raises exception when trace error is present.""" + + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Create mock trace error message + error_trace = AirbyteErrorTraceMessage( + message="Authentication failed", failure_type=FailureType.config_error + ) + trace_message = AirbyteTraceMessage( + type=TraceType.ERROR, error=error_trace, emitted_at=1234567890 + ) + mock_message = AirbyteMessage(type=AirbyteMessageType.TRACE, trace=trace_message) + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "check", return_value=[mock_message]): + # Verify exception is raised + with pytest.raises(HTTPException): + command_processor.check_connection(sample_config) + + def test_discover_success(self, command_processor, sample_config): + """Test successful discover execution.""" + + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Create mock catalog + catalog = AirbyteCatalog( + streams=[ + AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=[SyncMode.full_refresh], + ) + ] + ) + mock_message = AirbyteMessage(type=AirbyteMessageType.CATALOG, catalog=catalog) + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "discover", return_value=[mock_message]): + # Execute discover + result = command_processor.discover(sample_config) + + # Verify the result + assert result == catalog + assert len(result.streams) == 1 + assert result.streams[0].name == "test_stream" + + # Verify spec was called + command_processor._source.spec.assert_called_once() + + def test_discover_no_catalog_message(self, command_processor, sample_config): + """Test discover when no catalog message is returned.""" + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, 
"discover", return_value=[]): + # Execute discover + result = command_processor.discover(sample_config) + + # Verify the result is None + assert result is None + + def test_discover_with_trace_error(self, command_processor, sample_config): + """Test discover raises exception when trace error is present.""" + + # Mock the spec method + command_processor._source.spec.return_value = Mock() + + # Create mock trace error message + error_trace = AirbyteErrorTraceMessage( + message="Stream discovery failed", failure_type=FailureType.system_error + ) + trace_message = AirbyteTraceMessage( + type=TraceType.ERROR, + error=error_trace, + emitted_at=1234567890, + ) + mock_message = AirbyteMessage(type=AirbyteMessageType.TRACE, trace=trace_message) + + # Mock the entrypoint method + with patch.object(AirbyteEntrypoint, "discover", return_value=[mock_message]): + # Verify exception is raised + with pytest.raises(HTTPException): + command_processor.discover(sample_config) diff --git a/unit_tests/manifest_server/command_processor/test_utils.py b/unit_tests/manifest_server/command_processor/test_utils.py new file mode 100644 index 000000000..81b43f303 --- /dev/null +++ b/unit_tests/manifest_server/command_processor/test_utils.py @@ -0,0 +1,123 @@ +from unittest.mock import Mock, patch + +from airbyte_cdk.manifest_server.command_processor.utils import ( + SHOULD_MIGRATE_KEY, + SHOULD_NORMALIZE_KEY, + build_catalog, + build_source, +) + + +class TestManifestUtils: + """Test cases for the utils module.""" + + def test_build_catalog_creates_correct_structure(self): + """Test that build_catalog creates a properly structured ConfiguredAirbyteCatalog.""" + stream_name = "test_stream" + catalog = build_catalog(stream_name) + + # Verify catalog structure + assert len(catalog.streams) == 1 + + configured_stream = catalog.streams[0] + assert configured_stream.stream.name == stream_name + assert configured_stream.stream.json_schema == {} + + # Verify sync modes + from 
airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode + + assert SyncMode.full_refresh in configured_stream.stream.supported_sync_modes + assert SyncMode.incremental in configured_stream.stream.supported_sync_modes + assert configured_stream.sync_mode == SyncMode.incremental + assert configured_stream.destination_sync_mode == DestinationSyncMode.overwrite + + @patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource") + def test_build_source_creates_source(self, mock_source_class): + """Test that build_source creates a ConcurrentDeclarativeSource with correct parameters.""" + # Setup mocks + mock_source = Mock() + mock_source_class.return_value = mock_source + + # Test with complex manifest and config structures + manifest = { + "version": "0.1.0", + "definitions": {"selector": {"extractor": {"field_path": ["data"]}}}, + "streams": [ + { + "name": "users", + "primary_key": "id", + "retriever": { + "requester": { + "url_base": "https://api.example.com", + "path": "/users", + } + }, + } + ], + "check": {"stream_names": ["users"]}, + } + + config = { + "api_key": "sk-test-123", + "base_url": "https://api.example.com", + "timeout": 30, + } + + # Call build_source with additional parameters + catalog = build_catalog("test_stream") + state = [] + result = build_source(manifest, catalog, config, state) + + # Verify ConcurrentDeclarativeSource was created with correct parameters + expected_source_config = { + **manifest, + "concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1}, + } + mock_source_class.assert_called_once_with( + catalog=catalog, + state=state, + source_config=expected_source_config, + config=config, + normalize_manifest=False, # Default when flag not set + migrate_manifest=False, # Default when flag not set + emit_connector_builder_messages=True, + limits=mock_source_class.call_args[1]["limits"], + ) + + assert result == mock_source + + 
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource") + def test_build_source_with_normalize_flag(self, mock_source_class): + """Test build_source when normalize flag is set.""" + mock_source = Mock() + mock_source_class.return_value = mock_source + + manifest = {"streams": [{"name": "test_stream"}], SHOULD_NORMALIZE_KEY: True} + config = {"api_key": "test_key"} + catalog = build_catalog("test_stream") + state = [] + + build_source(manifest, catalog, config, state) + + # Verify normalize_manifest is True + call_args = mock_source_class.call_args[1] + assert call_args["normalize_manifest"] is True + assert call_args["migrate_manifest"] is False + + @patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource") + def test_build_source_with_migrate_flag(self, mock_source_class): + """Test build_source when migrate flag is set.""" + mock_source = Mock() + mock_source_class.return_value = mock_source + + manifest = {"streams": [{"name": "test_stream"}], SHOULD_MIGRATE_KEY: True} + config = {"api_key": "test_key"} + catalog = build_catalog("test_stream") + state = [] + + build_source(manifest, catalog, config, state) + + # Verify migrate_manifest is True + call_args = mock_source_class.call_args[1] + assert call_args["normalize_manifest"] is False + assert call_args["migrate_manifest"] is True diff --git a/unit_tests/manifest_server/routers/__init__.py b/unit_tests/manifest_server/routers/__init__.py new file mode 100644 index 000000000..d6e7f1410 --- /dev/null +++ b/unit_tests/manifest_server/routers/__init__.py @@ -0,0 +1 @@ +"""Tests for the routers package.""" diff --git a/unit_tests/manifest_server/routers/test_capabilities.py b/unit_tests/manifest_server/routers/test_capabilities.py new file mode 100644 index 000000000..4c7d73f88 --- /dev/null +++ b/unit_tests/manifest_server/routers/test_capabilities.py @@ -0,0 +1,71 @@ +import os +from unittest.mock import patch + +from fastapi.testclient import 
TestClient + +from airbyte_cdk.manifest_server.app import app + +client = TestClient(app) + + +class TestCapabilities: + """Test cases for the capabilities endpoint.""" + + def test_capabilities_endpoint_exists(self): + """Test that the capabilities endpoint is accessible.""" + response = client.get("/capabilities/") + assert response.status_code == 200 + + def test_capabilities_custom_code_execution_false_by_default(self): + """Test that custom_code_execution is false by default when env var is not set.""" + with patch.dict(os.environ, {}, clear=True): + response = client.get("/capabilities/") + assert response.status_code == 200 + + data = response.json() + assert "custom_code_execution" in data + assert data["custom_code_execution"] is False + + def test_capabilities_custom_code_execution_false_when_env_var_false(self): + """Test that custom_code_execution is false when env var is explicitly set to false.""" + with patch.dict(os.environ, {"AIRBYTE_ENABLE_UNSAFE_CODE": "false"}): + response = client.get("/capabilities/") + assert response.status_code == 200 + + data = response.json() + assert "custom_code_execution" in data + assert data["custom_code_execution"] is False + + def test_capabilities_custom_code_execution_true_when_env_var_true(self): + """Test that custom_code_execution is true when env var is set to true.""" + with patch.dict(os.environ, {"AIRBYTE_ENABLE_UNSAFE_CODE": "true"}): + response = client.get("/capabilities/") + assert response.status_code == 200 + + data = response.json() + assert "custom_code_execution" in data + assert data["custom_code_execution"] is True + + def test_capabilities_custom_code_execution_case_insensitive(self): + """Test that env var parsing is case insensitive.""" + test_cases = ["TRUE", "True", "tRuE"] + + for value in test_cases: + with patch.dict(os.environ, {"AIRBYTE_ENABLE_UNSAFE_CODE": value}): + response = client.get("/capabilities/") + assert response.status_code == 200 + + data = response.json() + assert 
data["custom_code_execution"] is True + + def test_capabilities_custom_code_execution_invalid_values_default_to_false(self): + """Test that invalid env var values default to false.""" + invalid_values = ["yes", "1", "on", "enabled", "invalid"] + + for value in invalid_values: + with patch.dict(os.environ, {"AIRBYTE_ENABLE_UNSAFE_CODE": value}): + response = client.get("/capabilities/") + assert response.status_code == 200 + + data = response.json() + assert data["custom_code_execution"] is False diff --git a/unit_tests/manifest_server/routers/test_manifest.py b/unit_tests/manifest_server/routers/test_manifest.py new file mode 100644 index 000000000..4f8a90b93 --- /dev/null +++ b/unit_tests/manifest_server/routers/test_manifest.py @@ -0,0 +1,529 @@ +import hashlib +from unittest.mock import Mock, patch + +import pytest +from fastapi.testclient import TestClient + +from airbyte_cdk.connector_builder.models import StreamRead as CDKStreamRead +from airbyte_cdk.manifest_server.app import app + +client = TestClient(app) + + +class TestManifestRouter: + """Test cases for the manifest router endpoints.""" + + @pytest.fixture + def sample_manifest(self): + """Sample manifest for testing.""" + return { + "version": "6.48.15", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["products"]}, + "definitions": { + "base_requester": { + "type": "HttpRequester", + "url_base": "https://dummyjson.com", + } + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "products", + "primary_key": ["id"], + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://dummyjson.com", + "path": "products", + "http_method": "GET", + }, + }, + } + ], + } + + @pytest.fixture + def sample_config(self): + """Sample config for testing.""" + return {} + + @pytest.fixture + def mock_source(self): + """Mock source object.""" + mock_source = Mock() + mock_source.resolved_manifest = { + "version": "6.48.15", + 
"type": "DeclarativeSource", + "streams": [{"name": "products", "type": "DeclarativeStream"}], + } + mock_source.dynamic_streams = [] + return mock_source + + @pytest.fixture + def mock_stream_read(self): + """Mock StreamRead result.""" + return CDKStreamRead( + logs=[], + slices=[], + test_read_limit_reached=False, + auxiliary_requests=[], + inferred_schema=None, + inferred_datetime_formats=None, + latest_config_update=None, + ) + + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_test_read_endpoint_success( + self, + mock_build_source, + mock_build_catalog, + mock_runner_class, + sample_manifest, + sample_config, + mock_source, + mock_stream_read, + ): + """Test successful test_read endpoint call.""" + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_name": "products", + "state": [], + "record_limit": 100, + "page_limit": 5, + "slice_limit": 5, + } + + mock_build_source.return_value = mock_source + mock_build_catalog.return_value = Mock() + + mock_runner = Mock() + mock_runner.test_read.return_value = mock_stream_read + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/test_read", json=request_data) + + assert response.status_code == 200 + # Verify build_source was called with correct arguments + mock_build_source.assert_called_once_with( + sample_manifest, + mock_build_catalog.return_value, + sample_config, + [], + 100, # record_limit + 5, # page_limit + 5, # slice_limit + ) + mock_build_catalog.assert_called_once_with("products") + mock_runner.test_read.assert_called_once() + + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def 
test_test_read_with_custom_components( + self, + mock_build_source, + mock_build_catalog, + mock_runner_class, + sample_manifest, + sample_config, + mock_source, + mock_stream_read, + ): + """Test test_read endpoint with custom components code.""" + custom_code = "def custom_function(): pass" + expected_checksum = hashlib.md5(custom_code.encode()).hexdigest() + + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_name": "products", + "state": [], + "custom_components_code": custom_code, + "record_limit": 50, + "page_limit": 3, + "slice_limit": 2, + } + + mock_build_source.return_value = mock_source + mock_build_catalog.return_value = Mock() + + mock_runner = Mock() + mock_runner.test_read.return_value = mock_stream_read + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/test_read", json=request_data) + + assert response.status_code == 200 + + # Verify that build_source was called with config containing custom components + call_args = mock_build_source.call_args + config_arg = call_args[0][2] # Third argument is config + assert "__injected_components_py" in config_arg + assert config_arg["__injected_components_py"] == custom_code + assert "__injected_components_py_checksums" in config_arg + assert config_arg["__injected_components_py_checksums"]["md5"] == expected_checksum + + # Verify other arguments + mock_build_source.assert_called_once_with( + sample_manifest, + mock_build_catalog.return_value, + config_arg, + [], + 50, # record_limit + 3, # page_limit + 2, # slice_limit + ) + + @patch("airbyte_cdk.manifest_server.routers.manifest.AirbyteStateMessageSerializer") + @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_test_read_with_state( + self, + mock_build_source, + mock_build_catalog, + mock_runner_class, + 
mock_serializer, + sample_manifest, + sample_config, + mock_source, + mock_stream_read, + ): + """Test test_read endpoint with state.""" + state_data = [{"type": "STREAM", "stream": {"stream_descriptor": {"name": "products"}}}] + + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_name": "products", + "state": state_data, + } + + mock_build_source.return_value = mock_source + mock_build_catalog.return_value = Mock() + mock_serializer.load.return_value = Mock() + + mock_runner = Mock() + mock_runner.test_read.return_value = mock_stream_read + mock_runner_class.return_value = mock_runner + + response = client.post("/v1/manifest/test_read", json=request_data) + + assert response.status_code == 200 + assert mock_serializer.load.call_count == len(state_data) + + def test_test_read_invalid_request(self): + """Test test_read endpoint with invalid request data.""" + invalid_request = { + "manifest": {}, + "config": {}, + "stream_name": "test", + "record_limit": -1, # Invalid - should be >= 1 + } + + response = client.post("/v1/manifest/test_read", json=invalid_request) + assert response.status_code == 422 # Validation error + + def test_resolve_endpoint_success(self, sample_manifest, mock_source): + """Test successful resolve endpoint call.""" + request_data = {"manifest": sample_manifest} + + with patch( + "airbyte_cdk.manifest_server.routers.manifest.build_source" + ) as mock_build_source: + mock_build_source.return_value = mock_source + + response = client.post("/v1/manifest/resolve", json=request_data) + + assert response.status_code == 200 + data = response.json() + assert "manifest" in data + assert data["manifest"] == mock_source.resolved_manifest + mock_build_source.assert_called_once_with( + sample_manifest, + None, # catalog + {}, # config + None, # state + None, # record_limit + None, # page_limit + None, # slice_limit + ) + + def test_resolve_invalid_manifest(self): + """Test resolve endpoint with invalid manifest.""" + 
request_data = {} # Missing required 'manifest' field + + response = client.post("/v1/manifest/resolve", json=request_data) + assert response.status_code == 422 # Validation error + + def test_full_resolve_endpoint_success(self, sample_manifest, sample_config, mock_source): + """Test successful full_resolve endpoint call.""" + # Setup mock source with dynamic streams + mock_source.dynamic_streams = [ + { + "name": "dynamic_stream_1", + "dynamic_stream_name": "template_stream", + "type": "DeclarativeStream", + }, + { + "name": "dynamic_stream_2", + "dynamic_stream_name": "template_stream", + "type": "DeclarativeStream", + }, + ] + + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_limit": 10, + } + + with patch( + "airbyte_cdk.manifest_server.routers.manifest.build_source" + ) as mock_build_source: + mock_build_source.return_value = mock_source + + response = client.post("/v1/manifest/full_resolve", json=request_data) + + assert response.status_code == 200 + data = response.json() + assert "manifest" in data + + # Verify that dynamic streams were added + streams = data["manifest"]["streams"] + assert len(streams) >= len(mock_source.resolved_manifest["streams"]) + + # Check that dynamic_stream_name is set to None for original streams + original_stream = next(s for s in streams if s["name"] == "products") + assert original_stream["dynamic_stream_name"] is None + + def test_full_resolve_with_stream_limit(self, sample_manifest, sample_config, mock_source): + """Test full_resolve endpoint respects stream_limit.""" + # Create more dynamic streams than the limit + mock_source.dynamic_streams = [ + { + "name": f"dynamic_stream_{i}", + "dynamic_stream_name": "template_stream", + "type": "DeclarativeStream", + } + for i in range(5) # 5 dynamic streams + ] + + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_limit": 2, # Limit to 2 streams per template + } + + with patch( + 
"airbyte_cdk.manifest_server.routers.manifest.build_source" + ) as mock_build_source: + mock_build_source.return_value = mock_source + + response = client.post("/v1/manifest/full_resolve", json=request_data) + + assert response.status_code == 200 + data = response.json() + + # Count dynamic streams added (should be limited to 2) + dynamic_streams = [ + s for s in data["manifest"]["streams"] if s["name"].startswith("dynamic_stream_") + ] + assert len(dynamic_streams) == 2 + + def test_full_resolve_multiple_dynamic_stream_templates( + self, sample_manifest, sample_config, mock_source + ): + """Test full_resolve with multiple dynamic stream templates.""" + mock_source.dynamic_streams = [ + { + "name": "dynamic_stream_1a", + "dynamic_stream_name": "template_a", + "type": "DeclarativeStream", + }, + { + "name": "dynamic_stream_1b", + "dynamic_stream_name": "template_a", + "type": "DeclarativeStream", + }, + { + "name": "dynamic_stream_2a", + "dynamic_stream_name": "template_b", + "type": "DeclarativeStream", + }, + ] + + request_data = { + "manifest": sample_manifest, + "config": sample_config, + "stream_limit": 1, # Only 1 stream per template + } + + with patch( + "airbyte_cdk.manifest_server.routers.manifest.build_source" + ) as mock_build_source: + mock_build_source.return_value = mock_source + + response = client.post("/v1/manifest/full_resolve", json=request_data) + + assert response.status_code == 200 + data = response.json() + + # Should have 2 dynamic streams (1 from each template) + dynamic_streams = [ + s for s in data["manifest"]["streams"] if s["name"].startswith("dynamic_stream_") + ] + assert len(dynamic_streams) == 2 + + # Verify we got one from each template + template_a_streams = [s for s in dynamic_streams if "1a" in s["name"]] + template_b_streams = [s for s in dynamic_streams if "2a" in s["name"]] + assert len(template_a_streams) == 1 + assert len(template_b_streams) == 1 + + 
@patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
+    @patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
+    def test_check_endpoint_success(
+        self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
+    ):
+        """Test successful check endpoint call.
+
+        POSTs a manifest and config to ``/v1/manifest/check`` with ``build_source``
+        and ``ManifestCommandProcessor`` patched in the router module. The mocked
+        processor reports ``(True, "Connection successful")``; the test expects
+        HTTP 200 with ``success`` True and that message, and verifies how the
+        builder, the processor class and ``check_connection`` were called.
+        """
+        # Stacked @patch decorators inject their mocks bottom-up: the build_source
+        # patch supplies mock_build_source, the ManifestCommandProcessor patch
+        # supplies mock_runner_class.
+        request_data = {
+            "manifest": sample_manifest,
+            "config": sample_config,
+        }
+
+        mock_build_source.return_value = mock_source
+
+        # Instantiating the patched processor class yields mock_runner.
+        mock_runner = Mock()
+        mock_runner.check_connection.return_value = (True, "Connection successful")
+        mock_runner_class.return_value = mock_runner
+
+        response = client.post("/v1/manifest/check", json=request_data)
+
+        assert response.status_code == 200
+        data = response.json()
+        assert data["success"] is True
+        assert data["message"] == "Connection successful"
+
+        # Only manifest and config are forwarded; every other positional argument
+        # is expected to be None for this request.
+        mock_build_source.assert_called_once_with(
+            sample_manifest,
+            None,  # catalog
+            sample_config,
+            None,  # state
+            None,  # record_limit
+            None,  # page_limit
+            None,  # slice_limit
+        )
+        mock_runner_class.assert_called_once_with(mock_source)
+        mock_runner.check_connection.assert_called_once_with(sample_config)
+
+    @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
+    @patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
+    def test_check_endpoint_failure(
+        self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
+    ):
+        """Test check endpoint with connection failure.
+
+        The mocked processor reports ``(False, "Invalid API key")``. The endpoint
+        is still expected to answer HTTP 200, carrying the failure in the body as
+        ``success`` False plus the message rather than as an HTTP error status.
+        """
+        # Mocks arrive bottom-up: build_source first, then ManifestCommandProcessor.
+        request_data = {
+            "manifest": sample_manifest,
+            "config": sample_config,
+        }
+
+        mock_build_source.return_value = mock_source
+
+        mock_runner = Mock()
+        mock_runner.check_connection.return_value = (False, "Invalid API key")
+        mock_runner_class.return_value = mock_runner
+
+        response = client.post("/v1/manifest/check", json=request_data)
+
+        assert response.status_code == 200
+        data = response.json()
+        assert data["success"] is False
+        assert data["message"] == "Invalid API key"
+
+        # The source is built with the same arguments as in the success case.
+        mock_build_source.assert_called_once_with(
+            sample_manifest,
+            None,  # catalog
+            sample_config,
+            None,  # state
+            None,  # record_limit
+            None,  # page_limit
+            None,  # slice_limit
+        )
+
+    @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
+    @patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
+    def test_discover_endpoint_success(
+        self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
+    ):
+        """Test successful discover endpoint call.
+
+        The mocked processor's ``discover`` returns an ``AirbyteCatalog`` holding a
+        single ``products`` stream. Expects HTTP 200, a ``catalog`` key whose first
+        stream is named ``products``, and verifies how the builder, the processor
+        class and ``discover`` were called.
+        """
+        # Protocol models are imported locally; only this test builds a catalog.
+        from airbyte_protocol_dataclasses.models import AirbyteCatalog, AirbyteStream
+
+        request_data = {
+            "manifest": sample_manifest,
+            "config": sample_config,
+        }
+
+        # Create mock catalog
+        mock_catalog = AirbyteCatalog(
+            streams=[
+                AirbyteStream(
+                    name="products",
+                    json_schema={"type": "object", "properties": {"id": {"type": "integer"}}},
+                    supported_sync_modes=["full_refresh"],
+                )
+            ]
+        )
+
+        mock_build_source.return_value = mock_source
+
+        mock_runner = Mock()
+        mock_runner.discover.return_value = mock_catalog
+        mock_runner_class.return_value = mock_runner
+
+        response = client.post("/v1/manifest/discover", json=request_data)
+
+        assert response.status_code == 200
+        data = response.json()
+        assert "catalog" in data
+        assert data["catalog"]["streams"][0]["name"] == "products"
+
+        # Only manifest and config are forwarded; the remaining positional
+        # arguments are expected to be None.
+        mock_build_source.assert_called_once_with(
+            sample_manifest,
+            None,  # catalog
+            sample_config,
+            None,  # state
+            None,  # record_limit
+            None,  # page_limit
+            None,  # slice_limit
+        )
+        mock_runner_class.assert_called_once_with(mock_source)
+        mock_runner.discover.assert_called_once_with(sample_config)
+
+    @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
+    @patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
+    def test_discover_endpoint_missing_catalog(
+        self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
+    ):
+        """Test discover endpoint with no catalog throws 422 error.
+
+        The mocked processor's ``discover`` returns ``None``. Expects HTTP 422 and
+        a ``detail`` field containing "Connector did not return a discovered
+        catalog".
+        """
+        request_data = {
+            "manifest": sample_manifest,
+            "config": sample_config,
+        }
+
+        mock_build_source.return_value = mock_source
+
+        mock_runner = Mock()
+        mock_runner.discover.return_value = None  # No catalog returned
+        mock_runner_class.return_value = mock_runner
+
+        response = client.post("/v1/manifest/discover", json=request_data)
+
+        assert response.status_code == 422
+        data = response.json()
+        assert "Connector did not return a discovered catalog" in data["detail"]
diff --git a/unit_tests/manifest_server/test_auth.py b/unit_tests/manifest_server/test_auth.py
new file mode 100644
index 000000000..d9a439522
--- /dev/null
+++ b/unit_tests/manifest_server/test_auth.py
@@ -0,0 +1,141 @@
+import os
+from datetime import datetime, timedelta, timezone
+from unittest.mock import patch
+
+import jwt
+import pytest
+from fastapi import HTTPException
+from fastapi.security import HTTPAuthorizationCredentials
+
+from airbyte_cdk.manifest_server.auth import verify_jwt_token
+
+
+class TestVerifyJwtToken:
+    """Test cases for JWT token verification.
+
+    Each test sets or clears the ``AB_JWT_SIGNATURE_SECRET`` environment variable
+    with ``patch.dict(os.environ, ...)`` and calls ``verify_jwt_token`` directly
+    with ``None`` or an ``HTTPAuthorizationCredentials`` object. Rejection is
+    expected as an ``HTTPException`` with status 401; acceptance is expected as
+    the call returning without raising.
+    """
+
+    def test_no_secret_allows_all_requests(self):
+        """Test that when AB_JWT_SIGNATURE_SECRET is not set, all requests pass through.
+
+        Covers both missing credentials (``None``) and an arbitrary bearer token.
+        """
+        # clear=True empties os.environ inside the block, so the secret is
+        # guaranteed to be absent whatever the host environment contains.
+        with patch.dict(os.environ, {}, clear=True):
+            # Should not raise any exception
+            verify_jwt_token(None)
+            verify_jwt_token(HTTPAuthorizationCredentials(scheme="Bearer", credentials="any-token"))
+
+    def test_missing_credentials_with_secret_raises_401(self):
+        """Test that missing credentials raise 401 when secret is configured.
+
+        Expects detail "Bearer token required" and a ``WWW-Authenticate: Bearer``
+        header on the raised ``HTTPException``.
+        """
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": "test-secret"}):
+            with pytest.raises(HTTPException) as exc_info:
+                verify_jwt_token(None)
+
+            assert exc_info.value.status_code == 401
+            assert exc_info.value.detail == "Bearer token required"
+            assert exc_info.value.headers == {"WWW-Authenticate": "Bearer"}
+
+    def test_invalid_token_raises_401(self):
+        """Test that invalid JWT tokens raise 401.
+
+        The credential has three dot-separated segments like a JWT but was not
+        produced by ``jwt.encode``. Expects detail "Invalid token" and a
+        ``WWW-Authenticate: Bearer`` header.
+        """
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": "test-secret"}):
+            invalid_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials="invalid.jwt.token"
+            )
+
+            with pytest.raises(HTTPException) as exc_info:
+                verify_jwt_token(invalid_credentials)
+
+            assert exc_info.value.status_code == 401
+            assert exc_info.value.detail == "Invalid token"
+            assert exc_info.value.headers == {"WWW-Authenticate": "Bearer"}
+
+    def test_malformed_token_raises_401(self):
+        """Test that malformed tokens raise 401.
+
+        The credential contains no dot separators at all. Expects the same
+        "Invalid token" detail; the response headers are not asserted here.
+        """
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": "test-secret"}):
+            malformed_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials="not-a-jwt-token"
+            )
+
+            with pytest.raises(HTTPException) as exc_info:
+                verify_jwt_token(malformed_credentials)
+
+            assert exc_info.value.status_code == 401
+            assert exc_info.value.detail == "Invalid token"
+
+    def test_valid_token_passes(self):
+        """Test that valid JWT tokens pass verification.
+
+        Encodes an HS256 token that expires one hour from now, signed with the
+        same secret that is placed in the environment.
+        """
+        secret = "test-secret-key"
+        payload = {
+            "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+            "iat": datetime.now(timezone.utc),
+            "sub": "test-user",
+        }
+        valid_token = jwt.encode(payload, secret, algorithm="HS256")
+
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": secret}):
+            valid_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials=valid_token
+            )
+
+            # Should not raise any exception
+            verify_jwt_token(valid_credentials)
+
+    def test_expired_token_raises_401(self):
+        """Test that expired JWT tokens raise 401.
+
+        The token is correctly signed but its ``exp`` lies one hour in the past.
+        Expects the same "Invalid token" detail as for the other rejected tokens.
+        """
+        secret = "test-secret-key"
+        expired_payload = {
+            "exp": datetime.now(timezone.utc) - timedelta(hours=1),
+            "iat": datetime.now(timezone.utc) - timedelta(hours=2),
+            "sub": "test-user",
+        }
+        expired_token = jwt.encode(expired_payload, secret, algorithm="HS256")
+
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": secret}):
+            expired_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials=expired_token
+            )
+
+            with pytest.raises(HTTPException) as exc_info:
+                verify_jwt_token(expired_credentials)
+
+            assert exc_info.value.status_code == 401
+            assert exc_info.value.detail == "Invalid token"
+
+    def test_wrong_secret_raises_401(self):
+        """Test that tokens signed with wrong secret raise 401.
+
+        The token is unexpired and well-formed but signed with a different secret
+        from the one configured in the environment.
+        """
+        correct_secret = "correct-secret"
+        wrong_secret = "wrong-secret"
+
+        payload = {
+            "exp": datetime.now(timezone.utc) + timedelta(hours=1),
+            "iat": datetime.now(timezone.utc),
+            "sub": "test-user",
+        }
+        token_with_wrong_secret = jwt.encode(payload, wrong_secret, algorithm="HS256")
+
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": correct_secret}):
+            wrong_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials=token_with_wrong_secret
+            )
+
+            with pytest.raises(HTTPException) as exc_info:
+                verify_jwt_token(wrong_credentials)
+
+            assert exc_info.value.status_code == 401
+            assert exc_info.value.detail == "Invalid token"
+
+    def test_empty_secret_allows_all_requests(self):
+        """Test that empty secret string allows all requests.
+
+        The variable is present but set to ``""``; the expectation is the same as
+        when it is absent: neither ``None`` nor an arbitrary token raises.
+        """
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": ""}):
+            # Should not raise any exception
+            verify_jwt_token(None)
+            verify_jwt_token(HTTPAuthorizationCredentials(scheme="Bearer", credentials="any-token"))
+
+    def test_token_without_required_claims_passes(self):
+        """Test that tokens without standard claims still pass if signature is valid.
+
+        The payload carries only a custom key, so the test pins that ``exp``,
+        ``iat`` and ``sub`` are not mandatory for acceptance.
+        """
+        secret = "test-secret"
+        minimal_payload = {"custom": "data"}  # No exp, iat, sub etc.
+        minimal_token = jwt.encode(minimal_payload, secret, algorithm="HS256")
+
+        with patch.dict(os.environ, {"AB_JWT_SIGNATURE_SECRET": secret}):
+            minimal_credentials = HTTPAuthorizationCredentials(
+                scheme="Bearer", credentials=minimal_token
+            )
+
+            # Should not raise any exception - we only verify signature
+            verify_jwt_token(minimal_credentials)