pedroslopez
Contributor

@pedroslopez pedroslopez commented Aug 22, 2025

Introduces an HTTP interface for running declarative manifests, manifest-server. This will be used as a more lightweight and CDK-native replacement for the connector-builder-server that is currently part of the Platform. More than just a connector-builder-specific server, it is structured as a more generic interface that can be leveraged outside of the platform itself and offers more flexible deployment options.

What / How

Endpoints

This first iteration includes the following endpoints:

  • /v1/manifest/test_read - Test reading from a specific stream in the manifest. Meant for use in the connector builder or to preview a stream.
  • /v1/manifest/check - Check configuration against a declarative source.
  • /v1/manifest/discover - Discover streams from a declarative source.

Connector builder / Manifest-composition helpers:

  • /v1/manifest/resolve - Resolve a manifest to its final configuration.
  • /v1/manifest/full_resolve - Fully resolve a manifest including dynamic streams.
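
As a rough illustration of how a client might call one of these endpoints, the sketch below builds (but does not send) a POST request for /v1/manifest/check. The `manifest` and `config` body fields mirror the `request.manifest` and `request.config` attributes used in the router code discussed in this PR; the base URL, the helper name `build_check_request`, and the placeholder payloads are assumptions made for the example.

```python
import json
import urllib.request

# Assumption: the server runs locally on the default container port (8080).
BASE_URL = "http://localhost:8080"


def build_check_request(
    manifest: dict, config: dict, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build (but do not send) a POST request for /v1/manifest/check."""
    body = json.dumps({"manifest": manifest, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/manifest/check",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder manifest and config, for illustration only.
    request = build_check_request({"streams": []}, {"api_key": "example"})
    print(request.get_method(), request.full_url)
    # To actually send it against a running server:
    # urllib.request.urlopen(request)
```

If JWT authentication is enabled, the request would also need to carry a token; how the token is passed is not shown here.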

Configuration

Authentication

JWT-based authentication can be optionally enabled. This uses the same internal auth mechanism that airbyte-platform uses, which makes it easy to plug into an Airbyte instance. To enable it, set AB_JWT_SIGNATURE_SECRET; JWT signatures are validated against this secret.
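
To make the idea of validating a JWT signature against a shared secret concrete, here is a minimal standard-library sketch. It assumes an HS256 (HMAC-SHA256) token; the algorithm the server actually uses is not stated in this PR description, and the helper names `sign_hs256` and `verify_hs256` are hypothetical. It checks only the signature and does not look at claims such as expiry.

```python
import base64
import hashlib
import hmac
import json


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Create a compact HS256 token (only used here to produce test input)."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    return f"{signing_input}.{_b64url_encode(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the payload if the signature matches, else raise ValueError."""
    segments = token.split(".")
    if len(segments) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = segments
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    return json.loads(_b64url_decode(payload_b64))
```

A real deployment would normally rely on a maintained JWT library rather than hand-rolled verification; the sketch is only meant to show what "validated with the secret" means.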

Custom Components

Custom Python component support is off by default. To enable custom Python components, set AIRBYTE_ENABLE_UNSAFE_CODE=true.

CLI

To make it easy to run the server and perform actions, a CLI command is exposed for the manifest server.

OpenAPI

The platform will use the OpenAPI file in this repo to generate an API client to talk to the manifest server. For this reason, the openapi.yaml file is checked into the repo. The file can be regenerated using the CLI.

Summary by CodeRabbit

  • New Features

    • Introduced a Manifest Server with endpoints for health, capabilities, and manifest operations (test_read, check, discover, resolve, full_resolve).
    • Optional JWT authentication via environment secret.
    • New CLI: manifest-server (start, info, generate-openapi).
    • OpenAPI spec generation and included YAML.
    • Dockerfile added with multi-arch image support; default container port 8080.
  • Documentation

    • Added README covering installation, usage, endpoints, auth, and Docker instructions.
  • Chores

    • New GitHub Action to prevent Docker tag overwrites.
    • CI workflows updated to build and publish multi-arch images and enforce tag checks.
  • Tests

    • Added extensive unit tests for routers, auth, processor, and utilities.

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@pedro/add-manifest-runner#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch pedro/add-manifest-runner

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions github-actions bot added the enhancement New feature or request label Aug 22, 2025

github-actions bot commented Aug 22, 2025

PyTest Results (Fast)

3 762 tests  +41   3 751 ✅ +41   9m 22s ⏱️ + 1m 57s
    1 suites ± 0      11 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit caa8b15. ± Comparison against base commit 2aa7b58.

♻️ This comment has been updated with latest results.

github-actions bot commented Aug 22, 2025

PyTest Results (Full)

3 765 tests  +41   3 754 ✅ +41   11m 13s ⏱️ +41s
    1 suites ± 0      11 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit caa8b15. ± Comparison against base commit 2aa7b58.

♻️ This comment has been updated with latest results.

@pedroslopez
Contributor Author

/poetry-lock

pedroslopez and others added 2 commits August 22, 2025 08:21
…in permissions

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/manifest_server/routers/manifest.py (1)

141-159: Ensure immutable manifests, global stream limits, and drop transient fields

I dug into how source.resolved_manifest and dynamic_streams work and have a few adjustments to keep things safe and predictable—wdyt?

  • Use a deep copy of resolved_manifest so we don’t accidentally mutate any shared state:
    • add import copy
    • change manifest = {**source.resolved_manifest} to manifest = copy.deepcopy(source.resolved_manifest)

  • Remove the dynamic_stream_name field entirely from all streams (base and generated) rather than setting it to None, so the final manifest is clean:
    • in the base streams list:
    stream.pop("dynamic_stream_name", None)
    • when collecting generated streams, build a sanitized dict without that key

  • Enforce one global stream_limit rather than per dynamic‐stream name:
    • track total_added = 0
    • for each entry in source.dynamic_streams (the property inherited from ManifestDeclarativeSource), break once total_added >= request.stream_limit

Here’s a proposed diff—feel free to tweak:

--- a/airbyte_cdk/manifest_server/routers/manifest.py
+++ b/airbyte_cdk/manifest_server/routers/manifest.py
@@
+import copy
@@
-    manifest = {**source.resolved_manifest}
+    manifest = copy.deepcopy(source.resolved_manifest)
     streams = manifest.get("streams", [])
-    for stream in streams:
-        stream["dynamic_stream_name"] = None
+    for stream in streams:
+        stream.pop("dynamic_stream_name", None)
 
-    mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
-    for stream in source.dynamic_streams:
-        generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
-
-        if len(generated_streams) < request.stream_limit:
-            generated_streams += [stream]
+    total_added = 0
+    mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
+    for stream in source.dynamic_streams:
+        if total_added >= request.stream_limit:
+            break
+        name = stream.get("dynamic_stream_name")
+        sanitized = {k: v for k, v in stream.items() if k != "dynamic_stream_name"}
+        mapped_streams.setdefault(name, []).append(sanitized)
+        total_added += 1
 
     for generated_streams_list in mapped_streams.values():
         streams.extend(generated_streams_list)

Let me know if this captures everything!
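
To illustrate why the deep copy matters, the following self-contained sketch contrasts the two approaches on a toy manifest. The function names and the sample data are hypothetical; only the `{**...}` versus `copy.deepcopy` distinction comes from the suggestion above.

```python
import copy


def clear_names_shallow(resolved_manifest: dict) -> dict:
    """Shallow copy: the top-level dict is new, but nested streams are shared."""
    manifest = {**resolved_manifest}
    for stream in manifest.get("streams", []):
        stream["dynamic_stream_name"] = None  # also mutates the caller's streams
    return manifest


def clear_names_deep(resolved_manifest: dict) -> dict:
    """Deep copy: nested streams are duplicated, so the input stays untouched."""
    manifest = copy.deepcopy(resolved_manifest)
    for stream in manifest.get("streams", []):
        stream.pop("dynamic_stream_name", None)
    return manifest


if __name__ == "__main__":
    original = {"streams": [{"name": "users", "dynamic_stream_name": "by_region"}]}
    clear_names_deep(original)
    print(original["streams"][0])  # still has its dynamic_stream_name
    clear_names_shallow(original)
    print(original["streams"][0])  # dynamic_stream_name was overwritten in place
```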

♻️ Duplicate comments (2)
.github/actions/check-docker-tag/action.yml (1)

16-22: Good fix: input validation now correctly checks image and tag separately.

This addresses the earlier bug about empty tags slipping through by validating both image_name and tag prior to concatenation. Nice cleanup, wdyt?

airbyte_cdk/manifest_server/routers/manifest.py (1)

73-83: Nice fix: state is typed and custom code injection happens before build.

Converting state with AirbyteStateMessageSerializer before building the source and injecting custom_components_code prior to safe_build_source resolves the shadowing/ordering issues flagged earlier. Thanks for addressing this.

🧹 Nitpick comments (8)
.github/actions/check-docker-tag/action.yml (3)

24-27: Add a small retry to reduce flakiness on registry/network hiccups?

docker manifest inspect can intermittently fail due to transient network or Docker Hub rate limiting. Would you add a short retry with backoff before concluding the tag does not exist, to reduce false negatives?

-        if DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect "$tag" > /dev/null 2>&1; then
+        tries=0
+        max_tries=3
+        while ! DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect "$tag" > /dev/null 2>&1; do
+          tries=$((tries+1))
+          if [ "$tries" -ge "$max_tries" ]; then
+            break
+          fi
+          sleep $((tries))  # simple backoff
+        done
+        if [ "$tries" -lt "$max_tries" ]; then
           echo "The tag '$tag' already exists on DockerHub."
-          echo "tag_exists=true" >> "$GITHUB_OUTPUT"
+          echo "tag_exists=true" >> "$GITHUB_OUTPUT"
         else
           echo "No existing tag '$tag' found."
           echo "tag_exists=false" >> "$GITHUB_OUTPUT"
         fi

If you prefer to keep it simpler, we can skip this, but it does help with occasional flakes, wdyt?


28-28: Add a trailing newline at EOF.

YAMLlint flags the missing final newline. Can we add one to appease linters and avoid diff churn later, wdyt?


24-24: Confirm Docker availability on hosts

I traced the two uses of this action in .github/workflows/pypi_publish.yml at lines 214 and 295, and both jobs run on the GitHub-hosted ubuntu-24.04 image, which includes the Docker CLI by default. That means our current workflows are covered. To make this requirement explicit for anyone who might reuse the action on a different runner, how about adding a note to the action’s README.md or the action.yml description stating that Docker CLI must be installed? wdyt?

• Usage locations:

  • .github/workflows/pypi_publish.yml:214
  • .github/workflows/pypi_publish.yml:295
airbyte_cdk/manifest_server/routers/manifest.py (5)

57-58: Broaden validation error handling and surface json path.

Would we also want to catch SchemaError and include a path hint when available so clients get a more actionable 400, wdyt?

-    except jsonschema.exceptions.ValidationError as e:
-        raise HTTPException(status_code=400, detail=f"Invalid manifest: {e.message}")
+    except (jsonschema.exceptions.ValidationError, jsonschema.exceptions.SchemaError) as e:
+        path = "/".join(map(str, getattr(e, "path", []))) if getattr(e, "path", None) else None
+        detail = f"Invalid manifest: {getattr(e, 'message', str(e))}"
+        if path:
+            detail += f" at path: {path}"
+        raise HTTPException(status_code=400, detail=detail)

106-113: Minor: avoid repeated model_dump calls for config.

Tiny nit: caching config_dict removes duplication and avoids serializing twice, wdyt?

 def check(request: CheckRequest) -> CheckResponse:
     """Check configuration against a manifest"""
-    source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
+    config_dict = request.config.model_dump()
+    source = safe_build_source(request.manifest.model_dump(), config_dict)
     runner = ManifestCommandProcessor(source)
-    success, message = runner.check_connection(request.config.model_dump())
+    success, message = runner.check_connection(config_dict)
     return CheckResponse(success=success, message=message)

115-124: Same nit as above: cache config once.

Mirroring the check endpoint for consistency/readability, wdyt?

 def discover(request: DiscoverRequest) -> DiscoverResponse:
     """Discover streams from a manifest"""
-    source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
+    config_dict = request.config.model_dump()
+    source = safe_build_source(request.manifest.model_dump(), config_dict)
     runner = ManifestCommandProcessor(source)
-    catalog = runner.discover(request.config.model_dump())
+    catalog = runner.discover(config_dict)
     if catalog is None:
         raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog")
     return DiscoverResponse(catalog=catalog)

126-131: Resolve without config: is this intentional?

The resolve endpoint uses an empty dict for config. If manifests can reference config values (templating, secrets), should resolve optionally accept config to mirror how it will be used at runtime, or do we explicitly want a config-agnostic resolution here, wdyt?

If the intention is config-agnostic resolution, consider documenting that in the OpenAPI description for /resolve so clients don’t expect config interpolation.


68-69: Optional: declare response_model on routes for clearer OpenAPI and client generation.

FastAPI infers request models from function annotations but doesn’t set response schemas unless response_model is provided. Declaring it stabilizes the generated openapi.yaml and downstream clients, wdyt?

Example for one route (apply similarly to others):

-@router.post("/test_read", operation_id="testRead")
+@router.post("/test_read", operation_id="testRead", response_model=StreamRead)
 def test_read(request: StreamTestReadRequest) -> StreamRead:

Suggested response models:

  • /test_read -> StreamRead
  • /check -> CheckResponse
  • /discover -> DiscoverResponse
  • /resolve -> ManifestResponse
  • /full_resolve -> ManifestResponse


Also applies to: 106-107, 115-116, 126-127, 133-134

<details>
<summary>📜 Review details</summary>

**Configuration used**: CodeRabbit UI

**Review profile**: CHILL

**Plan**: Pro

**💡 Knowledge Base configuration:**

- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

<details>
<summary>📥 Commits</summary>

Reviewing files that changed from the base of the PR and between f8aa7285295d35e89dfed8010ccc9c535dcce7d4 and b3b0787f2e3823f4f5182c25d2a63c369d54c898.

</details>

<details>
<summary>📒 Files selected for processing (3)</summary>

* `.github/actions/check-docker-tag/action.yml` (1 hunks)
* `airbyte_cdk/manifest_server/command_processor/utils.py` (1 hunks)
* `airbyte_cdk/manifest_server/routers/manifest.py` (1 hunks)

</details>

<details>
<summary>🚧 Files skipped from review as they are similar to previous changes (1)</summary>

* airbyte_cdk/manifest_server/command_processor/utils.py

</details>

<details>
<summary>🧰 Additional context used</summary>

<details>
<summary>🧬 Code graph analysis (1)</summary>

<details>
<summary>airbyte_cdk/manifest_server/routers/manifest.py (6)</summary><blockquote>

<details>
<summary>airbyte_cdk/manifest_server/api_models/manifest.py (8)</summary>

* `CheckRequest` (29-33)
* `CheckResponse` (36-40)
* `DiscoverRequest` (43-47)
* `DiscoverResponse` (50-53)
* `FullResolveRequest` (68-73)
* `ManifestResponse` (62-65)
* `ResolveRequest` (56-59)
* `StreamTestReadRequest` (16-26)

</details>
<details>
<summary>airbyte_cdk/manifest_server/api_models/stream.py (1)</summary>

* `StreamRead` (67-76)

</details>
<details>
<summary>airbyte_cdk/manifest_server/auth.py (1)</summary>

* `verify_jwt_token` (11-43)

</details>
<details>
<summary>airbyte_cdk/manifest_server/command_processor/processor.py (4)</summary>

* `ManifestCommandProcessor` (26-151)
* `test_read` (33-61)
* `check_connection` (63-79)
* `discover` (81-92)

</details>
<details>
<summary>airbyte_cdk/manifest_server/command_processor/utils.py (2)</summary>

* `build_catalog` (22-35)
* `build_source` (58-91)

</details>
<details>
<summary>airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)</summary>

* `dynamic_streams` (273-278)

</details>

</blockquote></details>

</details><details>
<summary>🪛 YAMLlint (1.37.1)</summary>

<details>
<summary>.github/actions/check-docker-tag/action.yml</summary>

[error] 28-28: no new line character at the end of file

(new-line-at-end-of-file)

</details>

</details>

</details>

<details>
<summary>⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)</summary>

* GitHub Check: Check: source-intercom
* GitHub Check: Check: source-hardcoded-records
* GitHub Check: Check: source-pokeapi
* GitHub Check: Check: source-shopify
* GitHub Check: Check: destination-motherduck
* GitHub Check: Pytest (Fast)
* GitHub Check: SDM Docker Image Build
* GitHub Check: Manifest Server Docker Image Build
* GitHub Check: Pytest (All, Python 3.10, Ubuntu)
* GitHub Check: Pytest (All, Python 3.11, Ubuntu)

</details>

</details>

Contributor

@aaronsteers aaronsteers left a comment

@pedroslopez - Really nice work on this! I'm approving to unblock merging today if ready, with just one change I'd like to see in this iteration before merging: specifically my inline comments to see if we can reduce the amount of code to be maintained by leveraging existing methods on EntrypointOutput:

https://airbytehq.github.io/airbyte-python-cdk/airbyte_cdk/test/entrypoint_wrapper.html#EntrypointOutput

My other comment is in regards to having a single module (outside the server module) where local CLI and MCP use cases could use the same logic. The MCP code (here) as of now makes heavy use of the airbyte_cdk.connector_builder.connector_builder_handler module (pdoc link here). Where possible (at least in the long run), we should probably see about centralizing common logic into that module or another one shared by the different implementations.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
airbyte_cdk/manifest_server/command_processor/processor.py (2)

69-86: Nice: leveraging EntrypointOutput.connection_status_messages and raise_if_errors

This addresses prior feedback to use the prebuilt helpers and to surface trace errors cleanly. Looks good to me. Wdyt?


95-109: LGTM on discover: using EntrypointOutput.catalog with error surfacing

Using output.catalog and wrapping trace errors aligns with earlier guidance; the ValueError guard is a reasonable fallback. Wdyt?

🧹 Nitpick comments (6)
airbyte_cdk/manifest_server/api_models/__init__.py (3)

1-5: Polish the module docstring to remove “Server Service” redundancy.

Would you update the wording for brevity/clarity, wdyt?

-"""
-API Models for the Manifest Server Service.
-
-This package contains all Pydantic models used for API requests and responses.
-"""
+"""
+Manifest Server API models.
+
+Re-exports Pydantic models used for request and response payloads.
+"""

7-17: Optionally type __all__ with Final for static analyzers.

Adding Final communicates intent and prevents accidental reassignment by tools. Ok to add this import, wdyt?

+from typing import Final
 from .dicts import ConnectorConfig, Manifest
 from .manifest import (
     CheckRequest,
     CheckResponse,
     DiscoverRequest,
     DiscoverResponse,
     FullResolveRequest,
     ManifestResponse,
     ResolveRequest,
     StreamTestReadRequest,
 )

28-49: Tighten __all__: clarify comment and sort Stream exports for quick scanning.

  • “Typed Dicts” is slightly misleading—these are Pydantic models that allow extra keys.
  • Sorting the StreamRead entries (Pages, Response, Slices) improves scanability. Also annotate __all__ with Final (pairs with the import above). wdyt?
-__all__ = [
-    # Typed Dicts
+__all__: Final = [
+    # Dict-like Pydantic models
     "ConnectorConfig",
     "Manifest",
     # Manifest request/response models
     "FullResolveRequest",
     "ManifestResponse",
     "StreamTestReadRequest",
     "ResolveRequest",
     "CheckRequest",
     "CheckResponse",
     "DiscoverRequest",
     "DiscoverResponse",
     # Stream models
     "AuxiliaryRequest",
     "HttpRequest",
     "HttpResponse",
     "LogMessage",
-    "StreamReadResponse",
-    "StreamReadPages",
+    "StreamReadPages",
+    "StreamReadResponse",
     "StreamReadSlices",
 ]
airbyte_cdk/manifest_server/command_processor/processor.py (3)

111-121: Good HTTP mapping of traced errors (422)

Catching AirbyteEntrypointException and converting to HTTPException(422) is appropriate here. Do we also want to include a small hint (e.g., last log message) in the detail for faster debugging, or keep it minimal for now? Wdyt?


17-17: Remove unused import AirbyteMessageSerializer

This import appears unused in this module. Shall we drop it to keep things lean? Wdyt?

-from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer

55-56: Prefer explicit stream selection in test_read

It looks like test_read today always uses the first stream in the catalog (catalog.streams[0]), which will silently ignore additional streams if someone passes a multi-stream catalog. This could lead to unexpected behavior when more than one stream is configured.

Would you consider one of the following approaches?

  • Have the endpoint accept a stream_name parameter, then look it up in catalog.streams and raise a clear error if it’s missing.
  • Keep the signature as-is but assert that len(catalog.streams) == 1 and error out otherwise, making it explicit that only single-stream catalogs are supported.

Either option makes the behavior intentional rather than implicit—wdyt?
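
Both options could be combined in a single helper, sketched below. The dataclasses are simplified stand-ins for the catalog types (the real classes live in the CDK and are not reproduced here), and `select_stream` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import List, Optional


# Simplified stand-ins for the configured-catalog types, for illustration only.
@dataclass
class Stream:
    name: str


@dataclass
class ConfiguredStream:
    stream: Stream


@dataclass
class Catalog:
    streams: List[ConfiguredStream]


def select_stream(catalog: Catalog, stream_name: Optional[str] = None) -> ConfiguredStream:
    """Pick the stream to test-read explicitly instead of taking streams[0]."""
    if stream_name is not None:
        for configured in catalog.streams:
            if configured.stream.name == stream_name:
                return configured
        raise ValueError(f"Stream '{stream_name}' not found in catalog")
    # No name given: only a single-stream catalog is unambiguous.
    if len(catalog.streams) != 1:
        raise ValueError(f"Expected exactly one stream, got {len(catalog.streams)}")
    return catalog.streams[0]
```

With this shape, a multi-stream catalog without a name fails loudly rather than silently reading the first stream.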

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b3b0787 and 24c2184.

📒 Files selected for processing (5)
  • airbyte_cdk/manifest_server/api_models/__init__.py (1 hunks)
  • airbyte_cdk/manifest_server/api_models/stream.py (1 hunks)
  • airbyte_cdk/manifest_server/command_processor/processor.py (1 hunks)
  • airbyte_cdk/manifest_server/routers/manifest.py (1 hunks)
  • unit_tests/manifest_server/command_processor/test_processor.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • unit_tests/manifest_server/command_processor/test_processor.py
  • airbyte_cdk/manifest_server/api_models/stream.py
  • airbyte_cdk/manifest_server/routers/manifest.py
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/manifest_server/command_processor/processor.py (4)
airbyte_cdk/connector_builder/models.py (1)
  • StreamRead (57-64)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
  • TestReader (39-463)
  • run_test_read (84-161)
airbyte_cdk/entrypoint.py (2)
  • AirbyteEntrypoint (53-368)
  • airbyte_message_to_string (331-342)
airbyte_cdk/test/entrypoint_wrapper.py (4)
  • AirbyteEntrypointException (54-70)
  • EntrypointOutput (73-347)
  • connection_status_messages (171-172)
  • raise_if_errors (228-238)
airbyte_cdk/manifest_server/api_models/__init__.py (3)
airbyte_cdk/manifest_server/api_models/dicts.py (2)
  • ConnectorConfig (14-17)
  • Manifest (8-11)
airbyte_cdk/manifest_server/api_models/manifest.py (8)
  • CheckRequest (29-33)
  • CheckResponse (36-40)
  • DiscoverRequest (43-47)
  • DiscoverResponse (50-53)
  • FullResolveRequest (68-73)
  • ManifestResponse (62-65)
  • ResolveRequest (56-59)
  • StreamTestReadRequest (16-26)
airbyte_cdk/manifest_server/api_models/stream.py (7)
  • AuxiliaryRequest (40-47)
  • HttpRequest (14-20)
  • HttpResponse (23-28)
  • LogMessage (31-37)
  • StreamReadPages (50-55)
  • StreamReadResponse (67-76)
  • StreamReadSlices (58-64)
🪛 GitHub Actions: Generate Docs
airbyte_cdk/manifest_server/command_processor/processor.py

[error] 18-18: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.manifest_declarative_source' imported by processor.py (line 18) during docs generation (poetry run poe docs-generate).

🪛 GitHub Actions: Pytest (Fast)
airbyte_cdk/manifest_server/command_processor/processor.py

[error] 18-18: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.manifest_declarative_source' (import in processor.py:18). Command: poetry run coverage run -m pytest --durations=5 --exitfirst -m "not slow".

🪛 GitHub Actions: PyTest Matrix
airbyte_cdk/manifest_server/command_processor/processor.py

[error] 18-18: Step: poetry run coverage run -m pytest --durations=10 -m 'not linting and not super_slow and not flaky'; Error: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.manifest_declarative_source'

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
🔇 Additional comments (1)
airbyte_cdk/manifest_server/api_models/__init__.py (1)

18-26: LGTM on re-exporting stream models.

The selection matches the concrete models defined in stream.py and keeps the public surface coherent.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
airbyte_cdk/manifest_server/routers/manifest.py (3)

76-81: Fail fast at the edge when custom_components_code is provided.

Even with central gating, would you also gate at the route level to give immediate feedback and avoid building anything when unsafe code is disabled, wdyt?

Apply this diff:

     if request.custom_components_code:
+        if os.environ.get("AIRBYTE_ENABLE_UNSAFE_CODE", "").lower() not in ("1", "true", "yes"):
+            raise HTTPException(
+                status_code=403,
+                detail="Custom components are disabled. Set AIRBYTE_ENABLE_UNSAFE_CODE=true to enable.",
+            )
         config_dict[INJECTED_COMPONENTS_PY] = request.custom_components_code
         config_dict[INJECTED_COMPONENTS_PY_CHECKSUMS] = {
             "md5": hashlib.md5(request.custom_components_code.encode()).hexdigest()
         }

131-137: Docstring could benefit from a short example.

Would a brief before/after example of dynamic stream expansion help clarify usage and when to prefer full_resolve vs resolve, wdyt?


35-57: Enforce unsafe custom code gate centrally in safe_build_source (security).

As-is, any endpoint can pass INJECTED_COMPONENTS_PY in config and reach build_source without server-side gating, which risks arbitrary code execution when the compiler is enabled. Can we block this in safe_build_source unless AIRBYTE_ENABLE_UNSAFE_CODE is truthy, wdyt?

Apply this diff:

-import hashlib
+import hashlib
+import os
+import copy
@@
 def safe_build_source(
@@
 ) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
     """Wrapper around build_source that converts ValidationError to HTTPException."""
+    # Block injected custom components unless explicitly enabled.
+    if INJECTED_COMPONENTS_PY in config_dict:
+        if os.environ.get("AIRBYTE_ENABLE_UNSAFE_CODE", "").lower() not in ("1", "true", "yes"):
+            raise HTTPException(
+                status_code=403,
+                detail="Custom components are disabled. Set AIRBYTE_ENABLE_UNSAFE_CODE=true to enable.",
+            )
     try:
         return build_source(
             manifest_dict,
             catalog,
             config_dict,
             state,
             record_limit,
             page_limit,
             slice_limit,
         )

Also applies to: 1-3

🧹 Nitpick comments (3)
airbyte_cdk/manifest_server/routers/manifest.py (3)

145-153: Map dynamic stream generation errors to a client error.

source.dynamic_streams can raise; currently this would 500. Shall we convert failures into a 422 to keep API behavior consistent, wdyt?

Apply this diff:

-    for stream in source.dynamic_streams:
+    try:
+        dynamic_streams = source.dynamic_streams
+    except Exception as e:
+        raise HTTPException(status_code=422, detail=f"Failed to generate dynamic streams: {e}")
+    for stream in dynamic_streams:
         generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

104-111: Minor: avoid duplicate model_dump calls.

Store config_dict once to reduce repetition and keep logs/debugging consistent, wdyt?

Apply this diff:

-    source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
+    config_dict = request.config.model_dump()
+    source = safe_build_source(request.manifest.model_dump(), config_dict)
     runner = ManifestCommandProcessor(source)
-    success, message = runner.check_connection(request.config.model_dump())
+    success, message = runner.check_connection(config_dict)

113-121: Minor: same dedup for discover.

Mirror the check() cleanup so we don’t call model_dump twice, wdyt?

Apply this diff:

-    source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
+    config_dict = request.config.model_dump()
+    source = safe_build_source(request.manifest.model_dump(), config_dict)
     runner = ManifestCommandProcessor(source)
-    catalog = runner.discover(request.config.model_dump())
+    catalog = runner.discover(config_dict)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c403e34 and 2982dca.

📒 Files selected for processing (1)
  • airbyte_cdk/manifest_server/routers/manifest.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/manifest_server/routers/manifest.py (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • ConcurrentDeclarativeSource (165-1057)
airbyte_cdk/manifest_server/api_models/manifest.py (4)
  • FullResolveRequest (68-73)
  • ManifestResponse (62-65)
  • ResolveRequest (56-59)
  • StreamTestReadRequest (16-26)
airbyte_cdk/manifest_server/command_processor/processor.py (4)
  • ManifestCommandProcessor (23-122)
  • test_read (32-60)
  • check_connection (62-87)
  • discover (89-110)
airbyte_cdk/manifest_server/command_processor/utils.py (2)
  • build_catalog (22-35)
  • build_source (58-91)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/manifest_server/routers/manifest.py (1)

66-101: Nice fix on ordering and typed state.

Injecting custom code before safe_build_source and converting state to AirbyteStateMessage (converted_state) looks correct and avoids shadowing. LGTM.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
airbyte_cdk/manifest_server/command_processor/utils.py (1)

58-99: Clamp concurrency to a safe minimum, strip UI-only flags reliably, and pass concrete ints to TestLimits.

  • Setting default_concurrency=1 risks deadlock; ConcurrentDeclarativeSource documents a lowest safe level of 2. Let’s clamp to that minimum instead of forcing 1. This was raised earlier as well.
  • We currently delete the special keys only when True; if present and False they leak into the manifest and may violate schema. Pop them unconditionally and use the popped booleans.
  • Using `or` replaces an explicit 0 with the default, because 0 is falsy; coalesce on None explicitly to preserve zeros and satisfy mypy. This was also noted earlier.

Would you be open to this refactor, wdyt?

@@
-def build_source(
+def build_source(
     manifest: Dict[str, Any],
     catalog: Optional[ConfiguredAirbyteCatalog],
     config: Mapping[str, Any],
     state: Optional[List[AirbyteStateMessage]],
     record_limit: Optional[int] = None,
     page_limit: Optional[int] = None,
     slice_limit: Optional[int] = None,
 ) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
-    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
-    # to retain ordering for the grouping of the builder message responses.
-    definition = copy.deepcopy(manifest)
-    if "concurrency_level" in definition:
-        definition["concurrency_level"]["default_concurrency"] = 1
-    else:
-        definition["concurrency_level"] = {
-            "type": "ConcurrencyLevel",
-            "default_concurrency": 1,
-        }
-
-    should_normalize = should_normalize_manifest(manifest)
-    if should_normalize:
-        del definition[SHOULD_NORMALIZE_KEY]
-
-    should_migrate = should_migrate_manifest(manifest)
-    if should_migrate:
-        del definition[SHOULD_MIGRATE_KEY]
+    # Make a shallow copy we can safely mutate and that mypy accepts.
+    manifest_cfg: dict[str, Any] = dict(manifest)
+
+    # Read and strip UI-only flags unconditionally.
+    should_normalize = bool(manifest_cfg.pop(SHOULD_NORMALIZE_KEY, False))
+    should_migrate = bool(manifest_cfg.pop(SHOULD_MIGRATE_KEY, False))
+
+    # Enforce a safe minimum concurrency; levels < 2 may deadlock.
+    lowest_safe = getattr(ConcurrentDeclarativeSource, "_LOWEST_SAFE_CONCURRENCY_LEVEL", 2)
+    desired_concurrency = max(1, lowest_safe)
+    concurrency = dict(manifest_cfg.get("concurrency_level") or {})
+    concurrency.setdefault("type", "ConcurrencyLevel")
+    concurrency["default_concurrency"] = desired_concurrency
+    manifest_cfg["concurrency_level"] = concurrency
@@
-    return ConcurrentDeclarativeSource(
+    return ConcurrentDeclarativeSource(
         catalog=catalog,
         state=state,
-        source_config=definition,
+        source_config=manifest_cfg,
         config=config,
         normalize_manifest=should_normalize,
         migrate_manifest=should_migrate,
         emit_connector_builder_messages=True,
         limits=TestLimits(
-            max_pages_per_slice=page_limit or TestLimits.DEFAULT_MAX_PAGES_PER_SLICE,
-            max_slices=slice_limit or TestLimits.DEFAULT_MAX_SLICES,
-            max_records=record_limit or TestLimits.DEFAULT_MAX_RECORDS,
+            max_pages_per_slice=page_limit if page_limit is not None else TestLimits.DEFAULT_MAX_PAGES_PER_SLICE,
+            max_slices=slice_limit if slice_limit is not None else TestLimits.DEFAULT_MAX_SLICES,
+            max_records=record_limit if record_limit is not None else TestLimits.DEFAULT_MAX_RECORDS,
         ),
     )
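Two of the points above (stripping the UI-only flags unconditionally, and coalescing on None rather than using `or`) can be shown in isolation. This is a minimal sketch, not the CDK implementation: the key strings, the `DEFAULT_MAX_RECORDS` value, and the helper names `strip_ui_flags` and `coalesce` are placeholders assumed for illustration.

```python
from typing import Any, Dict, Mapping, Optional, Tuple

# Placeholder values assumed for this sketch; the real keys and defaults live in the CDK.
SHOULD_NORMALIZE_KEY = "__should_normalize"
SHOULD_MIGRATE_KEY = "__should_migrate"
DEFAULT_MAX_RECORDS = 100


def strip_ui_flags(manifest: Mapping[str, Any]) -> Tuple[Dict[str, Any], bool, bool]:
    """Return a copy of the manifest without the UI-only flags, plus their values."""
    cleaned = dict(manifest)  # shallow copy; the caller's mapping is left untouched
    # pop() removes the key whether its value is True or False,
    # so a flag set to False cannot leak into the manifest.
    should_normalize = bool(cleaned.pop(SHOULD_NORMALIZE_KEY, False))
    should_migrate = bool(cleaned.pop(SHOULD_MIGRATE_KEY, False))
    return cleaned, should_normalize, should_migrate


def coalesce(value: Optional[int], default: int) -> int:
    """Fall back to the default only when the value is None, so an explicit 0 survives."""
    return default if value is None else value


if __name__ == "__main__":
    manifest = {"version": "1.0", SHOULD_NORMALIZE_KEY: False, SHOULD_MIGRATE_KEY: True}
    cleaned, normalize, migrate = strip_ui_flags(manifest)
    print(cleaned, normalize, migrate)
    # `or` discards the explicit zero; the None check keeps it.
    print(0 or DEFAULT_MAX_RECORDS, coalesce(0, DEFAULT_MAX_RECORDS))
```

With a conditional `del`, a flag that is present but False would stay in the manifest; `pop` with a default handles the present-and-True, present-and-False, and absent cases in one call.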

Follow-ups:

  • If strict message ordering is required, should we preserve ordering in response assembly instead of serializing to one worker? I can draft an ordering/grouping strategy at the API layer, wdyt?
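One possible shape for such an ordering strategy is sketched below. It assumes each message can be tagged with its slice and page index when it is emitted, which this PR does not confirm; `TaggedMessage` and `group_in_order` are hypothetical names, not part of the CDK.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class TaggedMessage(NamedTuple):
    """Hypothetical message wrapper carrying its position in the read."""

    slice_index: int
    page_index: int
    payload: str


def group_in_order(messages: List[TaggedMessage]) -> Dict[int, Dict[int, List[str]]]:
    """Group payloads by slice, then page, regardless of arrival order."""
    grouped: Dict[int, Dict[int, List[str]]] = defaultdict(lambda: defaultdict(list))
    # sorted() is stable, so payloads within the same page keep their arrival order.
    for msg in sorted(messages, key=lambda m: (m.slice_index, m.page_index)):
        grouped[msg.slice_index][msg.page_index].append(msg.payload)
    # Convert to plain dicts; insertion order follows the sorted keys.
    return {s: dict(pages) for s, pages in grouped.items()}


if __name__ == "__main__":
    # Messages as they might arrive from several workers.
    arrived = [
        TaggedMessage(1, 0, "c"),
        TaggedMessage(0, 1, "b"),
        TaggedMessage(0, 0, "a"),
    ]
    print(group_in_order(arrived))
```

Sorting at assembly time moves the ordering guarantee out of the worker pool, so the concurrency level no longer has to be 1 for the grouped response to be deterministic.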
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 2982dca and c691373.

📒 Files selected for processing (1)
  • airbyte_cdk/manifest_server/command_processor/utils.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/manifest_server/command_processor/utils.py
🧬 Code graph analysis (1)
airbyte_cdk/manifest_server/command_processor/utils.py (2)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteStateMessage (67-75)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
  • ConcurrentDeclarativeSource (165-1057)
  • TestLimits (126-137)
🔇 Additional comments (2)
airbyte_cdk/manifest_server/command_processor/utils.py (2)

38-46: LGTM: migrate flag helper is clear and side-effect free.

No issues; straightforward and readable.


48-55: LGTM: normalize flag helper mirrors migrate and keeps behavior explicit.

Looks good as-is.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
.github/workflows/publish.yml (5)

36-40: New input for manifest-server publishing

Adding publish_manifest_server is clear and consistent with the other flags. Do we want it to default to true, or should it default to false to reduce accidental publishes on manual runs, wdyt?


269-280: Is the artifact download actually needed here?

The manifest-server image builds from airbyte_cdk/manifest_server/Dockerfile and doesn’t obviously use dist/*. If the Dockerfile doesn’t COPY from dist, can we drop the artifact download to save time/bandwidth, wdyt?

-      # We need to download the build artifact again because the previous job was on a different runner
-      - name: Download Build Artifact
-        uses: actions/download-artifact@v4
-        with:
-          name: Packages-${{ github.run_id }}
-          path: dist

300-311: Enable Buildx cache for faster multi-arch rebuilds?

Adding GHA cache can materially speed subsequent publishes. Shall we enable it, wdyt?

       - name: "Build and push (sha tag: '${{ github.sha }}')"
         # Only run if the version is not set
         if: env.VERSION == ''
         uses: docker/build-push-action@v5
         with:
           context: .
           file: airbyte_cdk/manifest_server/Dockerfile
           platforms: linux/amd64,linux/arm64
           push: true
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
           tags: |
             airbyte/manifest-server:${{ github.sha }}

312-323: Ditto: caching on versioned build

Same cache suggestion for the versioned build to keep times down, wdyt?

       - name: "Build and push (version tag: ${{ env.VERSION || 'none'}})"
         # Only run if the version is set
         if: env.VERSION != ''
         uses: docker/build-push-action@v5
         with:
           context: .
           file: airbyte_cdk/manifest_server/Dockerfile
           platforms: linux/amd64,linux/arm64
           push: true
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
           tags: |
             airbyte/manifest-server:${{ env.VERSION }}

324-335: And caching on latest push

Applying the same cache config here keeps parity across all three build steps. Add if you agree, wdyt?

       - name: Build and push ('latest' tag)
         # Only run if version is set and IS_PRERELEASE is false
         if: env.VERSION != '' && env.IS_PRERELEASE == 'false'
         uses: docker/build-push-action@v5
         with:
           context: .
           file: airbyte_cdk/manifest_server/Dockerfile
           platforms: linux/amd64,linux/arm64
           push: true
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
           tags: |
             airbyte/manifest-server:latest
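The `if:` conditions on the three build steps above decide which tags a run pushes. The sketch below models only those conditions in Python; `tags_to_push` is a hypothetical helper, and it ignores the pre-flight tag check and any job-level gating in the workflow.

```python
from typing import List

IMAGE = "airbyte/manifest-server"


def tags_to_push(sha: str, version: str, is_prerelease: str) -> List[str]:
    """Mirror the step conditions: values are strings, as in the workflow env."""
    if version == "":
        # No version set: only the commit-SHA tag is built and pushed.
        return [f"{IMAGE}:{sha}"]
    tags = [f"{IMAGE}:{version}"]
    # 'latest' is pushed only when IS_PRERELEASE is exactly the string 'false'.
    if is_prerelease == "false":
        tags.append(f"{IMAGE}:latest")
    return tags


if __name__ == "__main__":
    print(tags_to_push("abc123", "", "false"))
    print(tags_to_push("abc123", "6.7.0dev8", "true"))
    print(tags_to_push("abc123", "6.7.0", "false"))
```

Because the comparison is against the literal string 'false', any other value of IS_PRERELEASE (including an empty one) skips the 'latest' push.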
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between c691373 and caa8b15.

📒 Files selected for processing (1)
  • .github/workflows/publish.yml (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-11-20T12:05:50.628Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#77
File: .github/workflows/pypi_publish.yml:70-75
Timestamp: 2024-11-20T12:05:50.628Z
Learning: In the file `.github/workflows/pypi_publish.yml`, when setting the `VERSION` variable in a shell script, avoid adding quotes around the variable assignment, as it can include the quotes in the variable's value. This can lead to unintended Docker tags like `airbyte/source-declarative-manifest:'6.7.0dev8'`.

Applied to files:

  • .github/workflows/publish.yml
🔇 Additional comments (4)
.github/workflows/publish.yml (4)

9-10: Least-privilege default looks good

Setting top-level permissions to contents: read is a solid baseline; job-level perms override where needed. Nice.


214-217: Reusable tag-check action is a good swap

Moving the SDM tag-existence check to ./.github/actions/check-docker-tag simplifies the workflow and centralizes logic. Looks good.


252-260: Job gating and dependency chain look correct

The new publish_manifest_server job mirrors SDM’s gating and depends on build outputs (VERSION/IS_PRERELEASE). This alignment should avoid surprise triggers.


293-299: Pre-flight tag check for manifest-server

Great to prevent overwriting versioned tags. No action needed.

@pedroslopez pedroslopez merged commit e80c173 into main Aug 27, 2025
28 of 29 checks passed
@pedroslopez pedroslopez deleted the pedro/add-manifest-runner branch August 27, 2025 05:21
Labels
enhancement New feature or request
2 participants