feat(mcp): Add tools to modify stream sync settings and refresh catalog on existing connections #994
…og on existing connections

Adds two new MCP tools:
- refresh_connection_catalog: Triggers a discover operation on a connection's source and updates the catalog with the latest stream definitions and sync modes
- set_stream_sync_mode: Safely changes the sync mode for a specific stream on a connection, with validation that the mode is supported

Core logic lives in CloudConnection (connections.py) and api_util.py, with MCP tools as thin wrappers per the presentation layer pattern.

Closes #993

Co-Authored-By: AJ Steers <aj@airbyte.io>
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog'
No actionable comments were generated in the recent review.
Walkthrough
Added a utility to request a refreshed connection catalog, CloudConnection methods to refresh a connection's catalog and to update a stream's sync settings, and MCP endpoints that expose catalog refresh and per-stream sync-mode modification (note: MCP functions are duplicated in the diff).
Sequence Diagram(s)
sequenceDiagram
participant Client as MCP Client
participant MCP as MCP Endpoint
participant CloudConn as CloudConnection
participant APIUtil as API Utility
participant ConfigAPI as Config API
rect rgba(100, 150, 200, 0.5)
Note over Client,ConfigAPI: Refresh Catalog Flow
Client->>MCP: refresh_connection_catalog(connection_id)
MCP->>CloudConn: connection.refresh_catalog()
CloudConn->>APIUtil: get_refreshed_connection_catalog(...)
APIUtil->>ConfigAPI: GET /connections/get?withRefreshedCatalog=true
ConfigAPI-->>APIUtil: Updated catalog
APIUtil-->>CloudConn: Catalog dict
CloudConn->>CloudConn: Validate & store catalog
CloudConn-->>MCP: Confirmation message
MCP-->>Client: Success with stream count & URL
end
rect rgba(150, 100, 200, 0.5)
Note over Client,CloudConn: Set Stream Sync Mode Flow
Client->>MCP: set_stream_sync_mode(connection_id, stream_name, sync_mode, ...)
MCP->>CloudConn: connection.set_stream_sync_mode(...)
CloudConn->>CloudConn: Locate stream in catalog
CloudConn->>CloudConn: Validate sync_mode in supportedSyncModes
CloudConn->>CloudConn: Update stream config
CloudConn->>CloudConn: Save updated catalog
CloudConn-->>MCP: Success
MCP-->>Client: Confirmation with stream & connection details
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 inconclusive)
…am_sync_mode

Addresses CodeRabbit review feedback:
- Add stream_namespace parameter to disambiguate same-named streams in different namespaces. Raises PyAirbyteInputError when name is ambiguous.
- Add fail-fast guard when switching to incremental mode without a usable cursor field (no existing cursor, no default cursor, no source-defined cursor).

Co-Authored-By: AJ Steers <aj@airbyte.io>
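The two guards described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the helper names `find_stream_entry` and `require_cursor_for_incremental` are hypothetical, `ValueError` stands in for `PyAirbyteInputError`, and the catalog keys (`cursorField`, `defaultCursorField`, `sourceDefinedCursor`) are assumed to follow the usual Airbyte catalog shape.

```python
from __future__ import annotations

from typing import Any


def find_stream_entry(
    catalog: dict[str, Any],
    stream_name: str,
    stream_namespace: str | None = None,
) -> dict[str, Any]:
    """Return the single catalog entry matching the name (and namespace, if given)."""
    matches = [
        entry
        for entry in catalog.get("streams", [])
        if entry.get("stream", {}).get("name") == stream_name
        and (
            stream_namespace is None
            or entry.get("stream", {}).get("namespace") == stream_namespace
        )
    ]
    if not matches:
        raise ValueError(f"Stream '{stream_name}' not found in catalog.")
    if len(matches) > 1:
        # Same name in several namespaces: the caller must disambiguate.
        raise ValueError(
            f"Stream name '{stream_name}' is ambiguous; pass stream_namespace."
        )
    return matches[0]


def require_cursor_for_incremental(
    entry: dict[str, Any],
    sync_mode: str,
    cursor_field: str | None = None,
) -> None:
    """Fail fast when switching to incremental without any usable cursor."""
    if sync_mode != "incremental":
        return
    stream = entry.get("stream", {})
    config = entry.get("config", {})
    has_cursor = bool(
        cursor_field  # explicitly supplied by the caller
        or config.get("cursorField")  # already configured on the stream
        or stream.get("defaultCursorField")  # default suggested by the source
        or stream.get("sourceDefinedCursor")  # source manages the cursor itself
    )
    if not has_cursor:
        raise ValueError("Incremental sync requires a usable cursor field.")
```

Raising before any catalog mutation keeps a failed call from leaving the connection in a partially updated state.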
Pull request overview
Adds new Airbyte Cloud MCP tools and core CloudConnection helpers to (1) refresh a connection’s catalog via a forced re-discover and (2) update per-stream sync mode settings safely.
Changes:
- Add MCP tool refresh_connection_catalog to trigger a refreshed discover and replace the connection catalog.
- Add MCP tool set_stream_sync_mode to update a single stream's syncMode (optionally namespace-qualified) and related config.
- Add Config API helper get_refreshed_connection_catalog and CloudConnection methods refresh_catalog / set_stream_sync_mode.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| airbyte/mcp/cloud.py | Adds two new MCP tool wrappers that call CloudConnection methods and return human-readable status strings. |
| airbyte/cloud/connections.py | Implements the core catalog refresh + per-stream sync mode update logic against the stored syncCatalog. |
| airbyte/_util/api_util.py | Adds a Config API helper to fetch a connection with withRefreshedCatalog: true. |
    return (
        f"Successfully set sync mode for stream '{stream_name}' "
        f"on connection '{connection_id}' to '{sync_mode}'"
        + (
            f" with destination sync mode '{destination_sync_mode}'"
            if destination_sync_mode
            else ""
        )
        + (f" and cursor field '{cursor_field}'" if cursor_field else "")
        + f". URL: {connection.connection_url}"
    )
Reasonable observation. Including namespace in the success message when it was explicitly supplied would improve clarity for multi-namespace sources. Will add if the human reviewer agrees.
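One way the namespace could be folded into the success message is sketched below. The function `build_success_message` and its keyword arguments are hypothetical and only mirror the variables in the snippet under review; the real tool builds the string inline.

```python
from __future__ import annotations


def build_success_message(
    *,
    stream_name: str,
    connection_id: str,
    sync_mode: str,
    connection_url: str,
    stream_namespace: str | None = None,
    destination_sync_mode: str | None = None,
    cursor_field: str | None = None,
) -> str:
    """Build the status string, qualifying the stream only when a namespace was supplied."""
    qualified = f"{stream_namespace}.{stream_name}" if stream_namespace else stream_name
    parts = [
        f"Successfully set sync mode for stream '{qualified}' "
        f"on connection '{connection_id}' to '{sync_mode}'"
    ]
    if destination_sync_mode:
        parts.append(f" with destination sync mode '{destination_sync_mode}'")
    if cursor_field:
        parts.append(f" and cursor field '{cursor_field}'")
    parts.append(f". URL: {connection_url}")
    return "".join(parts)
```

When no namespace is passed, the output matches the existing message format, so callers that never use namespaces see no change.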
    api_util.replace_connection_catalog(
        connection_id=self.connection_id,
        configured_catalog_dict=refreshed_catalog,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
    )
Good suggestion. import_raw_catalog does the same replace_connection_catalog call with the same auth params. Switching to self.import_raw_catalog(refreshed_catalog) would reduce duplication. Will apply if the human reviewer agrees.
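The delegation discussed here might look like the sketch below. `ConnectionSketch` is a hypothetical stand-in for CloudConnection: the two injected callables replace the real api_util calls and their auth parameters, and the validation step is an assumption about what "validate" could mean.

```python
from __future__ import annotations

from typing import Any, Callable


class ConnectionSketch:
    """Stand-in for CloudConnection, with the API calls injected as callables."""

    def __init__(
        self,
        fetch_refreshed_catalog: Callable[[], dict[str, Any]],
        replace_catalog: Callable[[dict[str, Any]], None],
    ) -> None:
        self._fetch_refreshed_catalog = fetch_refreshed_catalog
        self._replace_catalog = replace_catalog

    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
        # Single place that knows how to replace the stored catalog.
        self._replace_catalog(catalog)

    def refresh_catalog(self) -> dict[str, Any]:
        # Step 1: fetch the connection with a freshly discovered catalog.
        refreshed = self._fetch_refreshed_catalog()
        if not refreshed or "streams" not in refreshed:
            raise ValueError("Refreshed catalog is missing or has no 'streams' key.")
        # Step 2: reuse the existing import path instead of repeating the API call.
        self.import_raw_catalog(refreshed)
        return refreshed
```

Routing the write through one method means the auth parameters are passed in exactly one place.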
    available_streams = [
        f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}"
        for e in catalog["streams"]
    ]
Valid observation: when namespace is None, this produces .users. However, this is a diagnostic context field on an error (not user-facing UI), so the dot-prefix still communicates "no namespace" clearly enough. Happy to clean this up if the human reviewer agrees it's worth addressing.
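If the cleanup is wanted, a small helper along these lines would drop the leading dot. The name `qualified_stream_name` is hypothetical. One detail worth noting: `dict.get` only falls back to its default when the key is absent, so an entry that stores an explicit `None` namespace would need the truthiness check used here rather than a `''` default.

```python
from __future__ import annotations

from typing import Any


def qualified_stream_name(entry: dict[str, Any]) -> str:
    """Format 'namespace.name', or just 'name' when the namespace is missing or empty."""
    stream = entry.get("stream") or {}
    name = stream.get("name") or ""
    namespace = stream.get("namespace")
    # Covers both an absent key and an explicit None or empty-string value.
    return f"{namespace}.{name}" if namespace else name


def list_available_streams(catalog: dict[str, Any]) -> list[str]:
    """Collect readable stream names for an error's diagnostic context."""
    return [qualified_stream_name(entry) for entry in catalog.get("streams", [])]
```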
    raise PyAirbyteInputError(
        message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."),
        context={
            "stream_name": stream_name,
            "requested_sync_mode": sync_mode,
            "supported_sync_modes": supported_sync_modes,
        },
    )
Fair point on consistency: connection_id is included in the other error contexts in this method. Will add if the human reviewer considers it worth a follow-up commit.
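A sketch of the validation with connection_id added to the error context follows. `InputErrorSketch` is a hypothetical stand-in for PyAirbyteInputError (only the `message` and `context` arguments shown in the snippet are modeled), and `validate_sync_mode` is an illustrative name.

```python
from __future__ import annotations

from typing import Any


class InputErrorSketch(ValueError):
    """Stand-in for PyAirbyteInputError: a message plus a diagnostic context dict."""

    def __init__(self, message: str, context: dict[str, Any]) -> None:
        super().__init__(message)
        self.context = context


def validate_sync_mode(
    entry: dict[str, Any],
    sync_mode: str,
    connection_id: str,
) -> None:
    """Raise if the requested sync mode is not in the stream's supportedSyncModes."""
    stream = entry.get("stream", {})
    stream_name = stream.get("name", "")
    supported_sync_modes = stream.get("supportedSyncModes", [])
    if sync_mode not in supported_sync_modes:
        raise InputErrorSketch(
            message=f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'.",
            context={
                "connection_id": connection_id,  # added for consistency with other errors
                "stream_name": stream_name,
                "requested_sync_mode": sync_mode,
                "supported_sync_modes": supported_sync_modes,
            },
        )
```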
        config["destinationSyncMode"] = destination_sync_mode

    if cursor_field is not None:
        config["cursorField"] = [cursor_field]
This is by design: allowing cursor_field to be set during full_refresh enables pre-configuring the cursor before a subsequent switch to incremental mode (a common two-step workflow). The docstring could be clearer about this, but silently ignoring the parameter would be surprising. Deferring to human reviewer on whether to add a note in the docstring.
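The two-step workflow mentioned in this reply can be illustrated with a small pure function. `apply_stream_settings` is a hypothetical helper that returns a copy of the stream config rather than mutating it in place as the snippet under review does; the keys are the ones visible in the diff.

```python
from __future__ import annotations

from typing import Any


def apply_stream_settings(
    config: dict[str, Any],
    sync_mode: str,
    destination_sync_mode: str | None = None,
    cursor_field: str | None = None,
) -> dict[str, Any]:
    """Return a copy of the stream config with the requested settings applied."""
    updated = dict(config)
    updated["syncMode"] = sync_mode
    if destination_sync_mode is not None:
        updated["destinationSyncMode"] = destination_sync_mode
    if cursor_field is not None:
        # The catalog wire format stores the cursor as a path, so wrap it in a list.
        updated["cursorField"] = [cursor_field]
    return updated


# Step 1: stay on full_refresh but pre-configure the cursor.
step_one = apply_stream_settings(
    {"syncMode": "full_refresh"}, "full_refresh", cursor_field="updated_at"
)
# Step 2: switch to incremental later; the stored cursor is kept.
step_two = apply_stream_settings(step_one, "incremental")
```

Because the cursor is only overwritten when `cursor_field` is passed, the second call does not need to repeat it.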
Summary
Adds two new MCP tools (and corresponding CloudConnection core methods) to support automated incremental stream testing and configuration workflows:
- refresh_connection_catalog: Triggers a discover operation on a connection's source (via withRefreshedCatalog: true on the Config API) and replaces the connection's catalog with the refreshed result. Equivalent to "Refresh source schema" in the UI.
- set_stream_sync_mode: Safely modifies the sync mode for a single stream in a connection's syncCatalog. Validates that the requested mode is in the stream's supportedSyncModes before applying. Optionally sets destinationSyncMode and cursorField.

Core logic lives in CloudConnection (connections.py) and api_util.py; MCP tools in cloud.py are thin wrappers per the presentation-layer pattern.

Closes #993

Review & Testing Checklist for Human
- set_stream_sync_mode matches streams by name only, not (name, namespace). Verify this is acceptable or if namespace matching is needed for multi-namespace sources.
- refresh_catalog two-step flow: The method does a GET (with withRefreshedCatalog: true) then a separate POST to replace. Confirm this get-then-replace pattern is correct vs. a single update call.
- … refresh_connection_catalog, then call set_stream_sync_mode to switch a stream from full_refresh to incremental. Verify the catalog updates persist and a sync succeeds.

Notes
- … check_guid_created_in_session) and marked destructive=True, consistent with other catalog-mutating tools.
- cursorField is set as a single-element list ([cursor_field]) to match the Airbyte catalog wire format.

Link to Devin session: https://app.devin.ai/sessions/dc6642a7916248b5ac6e92c493b8b870
Requested by: Aaron ("AJ") Steers (@aaronsteers)