-
Notifications
You must be signed in to change notification settings - Fork 30
chore(concurrent_declarative_source): Integrate ManifestDeclarativeSource into ConcurrentDeclarativeSource #704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… simplify declarative source inheritance, and move deprecated classes to legacy
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/merge_concurrent_declarative_source#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/merge_concurrent_declarative_source Helpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
PyTest Results (Fast)3 719 tests +27 3 708 ✅ +27 7m 18s ⏱️ +57s Results for commit a7384b7. ± Comparison against base commit 2b07f93. This pull request removes 83 and adds 110 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
PyTest Results (Full)3 722 tests +27 3 711 ✅ +27 10m 12s ⏱️ +33s Results for commit a7384b7. ± Comparison against base commit 2b07f93. This pull request removes 83 and adds 110 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
📝 WalkthroughWalkthroughRemoves ManifestDeclarativeSource from top-level exports and migrates connector-builder and tests to use ConcurrentDeclarativeSource; introduces TestLimits propagation and a large refactor of ConcurrentDeclarativeSource adding manifest preprocessing, validation, migrations, dynamic streams, spec-driven config, and legacy import shims. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ConnectorBuilder as connector_builder_handler
participant TestReader
participant Source as ConcurrentDeclarativeSource
Client->>ConnectorBuilder: read_stream(source, config, configured_catalog, state, limits)
ConnectorBuilder->>TestReader: run_test_read(source, configured_catalog, config, state, limits)
TestReader->>Source: read(config, catalog/state)
Source-->>TestReader: AirbyteMessages
TestReader-->>ConnectorBuilder: aggregated messages
ConnectorBuilder-->>Client: AirbyteMessage
sequenceDiagram
participant Caller
participant Source as ConcurrentDeclarativeSource
participant Schema as declarative_component_schema.yaml
participant Spec
participant Streams
Caller->>Source: __init__(manifest, config, state, catalog, limits, flags)
Source->>Schema: _get_declarative_component_schema()
Source->>Source: _pre_process_manifest / _fix_source_type / _migrate / _normalize
Source->>Source: _validate_source
alt spec present
Source->>Spec: build spec component & _migrate_and_transform_config
end
Caller->>Source: streams(config)
Source->>Source: resolve dynamic_streams & apply limits
Source->>Streams: instantiate stream models
Source-->>Caller: List[Stream]
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60–90 minutes Possibly related PRs
Suggested reviewers
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🔭 Outside diff range comments (2)
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py (2)
1516-1521
: Avoid mutating the shared manifest fixture across testsThis test mutates the manifest in place, which can leak state into other tests using the same SUBSTREAM_MANIFEST object. Shall we deep-copy before mutation to prevent cross-test contamination, wdyt?
Apply this minimal diff to defensively copy the manifest before changing fields:
@@ def test_incremental_parent_state_no_incremental_dependency( test_name, manifest, mock_requests, expected_records, initial_state, expected_state ): @@ - # Disable incremental_dependency + # Disable incremental_dependency + # Make a defensive copy to avoid leaking changes into other tests + manifest = copy.deepcopy(manifest) manifest["definitions"]["post_comments_stream"]["retriever"]["partition_router"][ "parent_stream_configs" ][0]["incremental_dependency"] = False manifest["definitions"]["post_comment_votes_stream"]["retriever"]["partition_router"][ "parent_stream_configs" ][0]["incremental_dependency"] = False
410-416
: StreamDescriptor name mismatch with configured streamIn test_partition_limitation the configured stream is “Rates”, but the initial_state StreamDescriptor uses “post_comment_votes”. This likely bypasses intended initial state routing. Should we align it to the local stream_name variable, wdyt?
- stream_descriptor=StreamDescriptor(name="post_comment_votes", namespace=None), + stream_descriptor=StreamDescriptor(name=stream_name, namespace=None),
🧹 Nitpick comments (25)
unit_tests/__init__.py (2)
9-9
: Clean up the leftover placeholder line and stale hack commentWith the hack retired, the commented placeholder adds noise. Would you be open to removing it, and optionally replacing the earlier “tests falling over” comments with a brief note stating the module intentionally has no side effects, wdyt?
Apply minimal cleanup:
-# placeholder = ManifestDeclarativeSource
Optionally, if you prefer a fully minimal file, here’s a concise version:
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. """ This package intentionally has no side effects. Legacy test hacks were removed as part of CDS migration. """
6-6
: Remove dead commented import in unit_tests/init.pyWe’ve confirmed there are no active imports of
airbyte_cdk.sources.declarative.manifest_declarative_source
anywhere—legacy tests all use theairbyte_cdk.legacy…
path. Can we safely drop this noisy commented line? wdyt?
- File: unit_tests/init.py, line 6
Proposed diff:
-# from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
airbyte_cdk/legacy/__init__.py (1)
1-1
: Consider adding a brief module docstring and explicit exports for claritySince this package is intentionally a legacy namespace, would you add a short docstring explaining its purpose and make exports explicit to avoid accidental star-imports, e.g., all = []? This can help future maintainers quickly understand the transitional boundary, wdyt?
Example:
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Legacy namespace for backward compatibility. New code should not import from this package.""" +__all__ = []airbyte_cdk/legacy/sources/__init__.py (1)
1-1
: Optional: add docstring noting legacy scope and lock down star-exportsWould you add a small module docstring (e.g., “Legacy sources package — transitional only”) and set all = [] to keep the boundary explicit and avoid unintended re-exports, wdyt?
Example:
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Legacy sources package — transitional only. Prefer concurrent declarative sources.""" +__all__ = []unit_tests/legacy/sources/declarative/__init__.py (1)
1-1
: LGTM; optional docstring to clarify test package purposeThis initializer is fine as-is. Would you consider a one-line docstring like “Test-only legacy declarative package for backward-compat tests” to make its intent obvious to readers skimming the tree, wdyt?
unit_tests/legacy/sources/__init__.py (1)
1-1
: Looks good; minor suggestion to annotate intentThe package marker is sufficient. If helpful, add a tiny docstring clarifying that this is a test-only legacy namespace used during migration; it can reduce confusion when navigating tests, wdyt?
unit_tests/legacy/__init__.py (1)
1-1
: Package marker is fine; consider a clarifying docstringEverything’s OK here. Would you add a brief docstring that this package enables legacy test imports while the suite migrates to ConcurrentDeclarativeSource to aid future maintainers, wdyt?
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (6)
73-74
: Broaden the stream_cursor type for flexibilityThe global cursor likely only needs a DeclarativeCursor. Would you consider typing stream_cursor as DeclarativeCursor to keep this cursor generic (not just datetime-based), wdyt?
Apply this diff:
- stream_cursor: DatetimeBasedCursor, + stream_cursor: DeclarativeCursor,
79-81
: Remove unused private fields_current_partition and _last_slice are not used. Trimming these reduces noise. Keep _parent_state, as it’s used in get_stream_state. Shall we remove the unused ones, wdyt?
- self._current_partition: Optional[Mapping[str, Any]] = None - self._last_slice: bool = False self._parent_state: Optional[Mapping[str, Any]] = None
95-101
: Rename local variable ‘slice’ to avoid shadowing built-inUsing ‘slice’ shadows the built-in type. Would renaming to ‘stream_slice’ improve clarity, wdyt?
- for slice, is_last_slice, _ in iterate_with_last_flag_and_state( + for stream_slice, is_last_slice, _ in iterate_with_last_flag_and_state( self._get_active_cursor().generate_slices_from_partition(partition=partition), lambda: None, ): self._global_cursor.register_slice(is_last_slice and is_last_partition) - yield slice + yield stream_slice
89-101
: Optional: switch to global cursor earlier during slice generationCurrently, the switch to global happens in observe(). If the per-partition limit is reached during slice generation, we’ll continue generating per-partition slices for remaining partitions in the same run. Do we want to flip _use_global_cursor immediately after generating slices for a partition once limit_reached() is true, so subsequent partitions in the same run use the global cursor, wdyt?
Here’s a minimal tweak you could consider:
for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( self._partition_router.stream_slices(), self._partition_router.get_stream_state ): # Generate slices for the current cursor and handle the last slice using the flag self._parent_state = parent_state for stream_slice, is_last_slice, _ in iterate_with_last_flag_and_state( self._get_active_cursor().generate_slices_from_partition(partition=partition), lambda: None, ): self._global_cursor.register_slice(is_last_slice and is_last_partition) yield stream_slice + # After finishing this partition, re-evaluate partition limit + if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): + self._use_global_cursor = TrueIf we keep the current behavior, could we add/keep a test that ensures switching in observe() is intentional and documented?
115-122
: Behavior check: switching logic occurs only on observe()The docstring says we switch when PerPartitionCursor.limit_reached() is True; in code the switch is checked in observe(). If no records are observed (e.g., empty slices) but the partition limit is exceeded, we’ll never switch. Is that acceptable for your use cases, or should we also check in stream_slices(), wdyt?
If you want, I can add a unit test that covers “limit exceeded but no records observed” to lock in the intended behavior.
6-9
: Replace direct legacy imports with re-exports in per_partition_with_global.pyIt looks like the only non-legacy module still pulling in
CursorFactory
andPerPartitionCursor
directly fromairbyte_cdk.legacy
. Since those are already re-exported in theairbyte_cdk.sources.declarative.incremental
package (via__init__.py
), can we swap the import inper_partition_with_global.py
so it only touches the public API? wdyt?• File: airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (lines 6–8)
-from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( - CursorFactory, - PerPartitionCursor, -) +from airbyte_cdk.sources.declarative.incremental import CursorFactory, PerPartitionCursorNote: the
__init__.py
in this folder is the dedicated re-export layer and may still import fromlegacy
, so no changes are needed there.unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (1)
143-154
: Test name typo: ‘aysnc’ → ‘async’Minor nit to improve readability; shall we fix the function name, wdyt?
-def test_isinstance_global_cursor_aysnc_job_partition_router(): +def test_isinstance_global_cursor_async_job_partition_router():unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py (1)
10-14
: Prefer importing StreamSlice from airbyte_cdk.sources.types for consistencyTo keep tests consistent with the rest of the suite and reduce coupling to the legacy module, would you import StreamSlice from airbyte_cdk.sources.types instead of the legacy path, wdyt?
-from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( - PerPartitionCursor, - PerPartitionKeySerializer, - StreamSlice, -) +from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( + PerPartitionCursor, + PerPartitionKeySerializer, +) +from airbyte_cdk.sources.types import StreamSliceairbyte_cdk/sources/declarative/incremental/__init__.py (1)
5-8
: Layering check passed—no existing non-legacy importsI ran a search for
CursorFactory
/PerPartitionCursor
imports from
airbyte_cdk.sources.declarative.incremental
and found no uses in production code. This means our new re-export of the legacy symbols isn’t impacting any existing consumers. To keep our layering clean, we have two paths forward:• Option 1: Leave the direct import as-is, add a “remove in next major” comment or changelog entry, and revisit in the major bump.
• Option 2: Introduce a lazy__getattr__
shim that emits aDeprecationWarning
on access (as in the example below) to guide users toward the legacy path.Wdyt?
Example shim for Option 2:
def __getattr__(name): if name in {"CursorFactory", "PerPartitionCursor"}: import warnings from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( CursorFactory as _CursorFactory, PerPartitionCursor as _PerPartitionCursor, ) warnings.warn( f"airbyte_cdk.sources.declarative.incremental.{name} is deprecated; " f"use airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor.{name} instead.", DeprecationWarning, stacklevel=2, ) return {"CursorFactory": _CursorFactory, "PerPartitionCursor": _PerPartitionCursor}[name] raise AttributeError(f"module {__name__!r} has no attribute {name!r}")unit_tests/sources/declarative/parsers/test_manifest_normalizer.py (1)
6-8
: Public facade for _get_declarative_component_schema?I’ve confirmed that
_get_declarative_component_schema
is defined here:
- unit_tests/sources/declarative/parsers/test_manifest_normalizer.py imports from
airbyte_cdk.sources.declarative.concurrent_declarative_source
- concurrent_declarative_source.py defines
def _get_declarative_component_schema() -> Dict[str, Any]:
(line 143)Since it’s a private helper, could we:
- Re‐export it under a public name (e.g. add it to the module’s
__init__.py
), or- Introduce a small, explicit facade in our public API that the tests can import
…to guard against future churn? wdyt?
airbyte_cdk/connector_builder/main.py (1)
73-79
: Align state typing with generic parameter (Optional[List[AirbyteStateMessage]])handle_connector_builder_request’s source is parameterized with Optional[List[AirbyteStateMessage]], but the state parameter itself is typed as List[AirbyteStateMessage]. Would you consider making state Optional to keep the signature coherent and reflect that non-test_read flows may pass no state, wdyt?
-def handle_connector_builder_request( - source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], - command: str, - config: Mapping[str, Any], - catalog: Optional[ConfiguredAirbyteCatalog], - state: List[AirbyteStateMessage], - limits: TestLimits, -) -> AirbyteMessage: +def handle_connector_builder_request( + source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + command: str, + config: Mapping[str, Any], + catalog: Optional[ConfiguredAirbyteCatalog], + state: Optional[List[AirbyteStateMessage]], + limits: TestLimits, +) -> AirbyteMessage:unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py (1)
35-35
: Assertion Updated — LGTMI’ve verified that
create_declarative_source
inairbyte_cdk/cli/source_declarative_manifest/_run.py
indeed returnsConcurrentDeclarativeSource
(lines 165–167, 209–212), so the updatedisinstance
check is solid.Would you like to add a quick sanity check that the returned object implements the core Source API (e.g.,
.discover
and.read
) to guard against future regressions? For example, in
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py
after line 35:assert isinstance(source, ConcurrentDeclarativeSource) + assert hasattr(source, "discover"), "Source should implement .discover" + assert hasattr(source, "read"), "Source should implement .read"wdyt?
airbyte_cdk/connector_builder/test_reader/reader.py (2)
88-94
: Type parameter choice for source state — consider simplifyingYou annotate
source
asConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
, but thestate
parameter torun_test_read
(and_read_stream
) isList[AirbyteStateMessage]
. To reduce cognitive overhead, would you prefer to align both to the same state type? Two options:
- Narrow the generic: use
ConcurrentDeclarativeSource[List[AirbyteStateMessage]]
- Or widen the method signature: accept
Optional[List[AirbyteStateMessage]]
The first option is less invasive to call sites. Example:
- source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource[List[AirbyteStateMessage]],And mirror the same change in
_read_stream
. Wdyt?
382-389
: Docs updated for CDS read path — minor consistency tweakDocstrings now reference
ConcurrentDeclarativeSource
, which is great. For consistency with the previous comment, do you want to adjust the parameter doc (“state (List[AirbyteStateMessage])”) if you decide to widen/narrow the type above, wdyt?Also applies to: 397-401
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py (2)
67-67
: Path computation uses string split — consider Pathlib for portabilitySplitting on
"/legacy/sources/declarative"
works in CI but is brittle across platforms. Would you consider using Pathlib to traverse parents instead?- spec_root = test_path.split("/legacy/sources/declarative")[0] + spec_root = str(Path(__file__).resolve().parents[3]) # up to unit_testsThis keeps intent while avoiding string path assumptions. Wdyt?
2273-2275
: Correct ancestor index forparents
lookupLooks like
parents[5]
is out of range (only indices 0–4 exist), so to mirror five.parent
calls you’d wantparents[4]
. For example:- yaml_file_path = Path(__file__).resolve().parents[5] / "airbyte_cdk/sources/declarative/declarative_component_schema.yaml" + yaml_file_path = Path(__file__).resolve().parents[4] / "airbyte_cdk/sources/declarative/declarative_component_schema.yaml"Wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
143-159
: Consider enhancing error messages for schema loading failuresThe function properly handles both cases where
pkgutil.get_data
returnsNone
and when aFileNotFoundError
occurs. However, the error messages could be more helpful by including the expected file path. Would it make sense to include the full path in the error message to help with debugging, wdyt?- "Failed to read manifest component json schema required for deduplication" + "Failed to read manifest component json schema from 'airbyte_cdk/sources/declarative/declarative_component_schema.yaml'"
353-358
: Consider providing more context in validation errorsWhen validation fails, it might be helpful to include details about which part of the manifest failed validation. Would adding the specific validation error path help developers debug manifest issues more quickly, wdyt?
except ValidationError as e: raise ValidationError( - "Validation against json schema defined in declarative_component_schema.yaml schema failed" + f"Validation against json schema defined in declarative_component_schema.yaml failed: {e.message} at path {'.'.join(str(p) for p in e.path)}" ) from e
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (30)
airbyte_cdk/__init__.py
(0 hunks)airbyte_cdk/connector_builder/connector_builder_handler.py
(3 hunks)airbyte_cdk/connector_builder/main.py
(2 hunks)airbyte_cdk/connector_builder/test_reader/reader.py
(6 hunks)airbyte_cdk/legacy/__init__.py
(1 hunks)airbyte_cdk/legacy/sources/__init__.py
(1 hunks)airbyte_cdk/legacy/sources/declarative/__init__.py
(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/__init__.py
(1 hunks)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
(1 hunks)airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(10 hunks)airbyte_cdk/sources/declarative/incremental/__init__.py
(1 hunks)airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
(1 hunks)airbyte_cdk/sources/declarative/retrievers/retriever.py
(1 hunks)unit_tests/__init__.py
(1 hunks)unit_tests/connector_builder/test_property_chunking.py
(4 hunks)unit_tests/legacy/__init__.py
(1 hunks)unit_tests/legacy/sources/__init__.py
(1 hunks)unit_tests/legacy/sources/declarative/__init__.py
(1 hunks)unit_tests/legacy/sources/declarative/incremental/__init__.py
(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py
(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor_integration.py
(2 hunks)unit_tests/legacy/sources/declarative/partition_routers/__init__.py
(1 hunks)unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
(1 hunks)unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
(5 hunks)unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py
(2 hunks)unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py
(2 hunks)unit_tests/sources/declarative/parsers/test_manifest_normalizer.py
(1 hunks)unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
(2 hunks)unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py
(1 hunks)unit_tests/test/test_standard_tests.py
(1 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/init.py
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/legacy/sources/declarative/__init__.py
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py
airbyte_cdk/legacy/sources/declarative/incremental/__init__.py
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
airbyte_cdk/connector_builder/main.py
unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py
unit_tests/__init__.py
airbyte_cdk/connector_builder/test_reader/reader.py
airbyte_cdk/sources/declarative/incremental/__init__.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor_integration.py
airbyte_cdk/connector_builder/connector_builder_handler.py
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
Applied to files:
airbyte_cdk/legacy/sources/declarative/__init__.py
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py
airbyte_cdk/legacy/sources/declarative/incremental/__init__.py
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
airbyte_cdk/connector_builder/main.py
unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py
unit_tests/__init__.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/legacy/sources/declarative/__init__.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
unit_tests/sources/declarative/parsers/test_manifest_normalizer.py
airbyte_cdk/connector_builder/main.py
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py
unit_tests/test/test_standard_tests.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
🧬 Code Graph Analysis (18)
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory
(19-24)PerPartitionCursor
(27-365)
airbyte_cdk/connector_builder/main.py (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1079)airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteStateMessage
(67-75)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
airbyte_cdk/sources/types.py (1)
StreamSlice
(75-169)
unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1079)
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py (2)
unit_tests/connector_builder/test_connector_builder_handler.py (1)
manifest_declarative_source
(956-957)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (1)
ManifestDeclarativeSource
(97-610)
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (1)
airbyte_cdk/legacy/sources/declarative/declarative_source.py (1)
DeclarativeSource
(16-45)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (1)
airbyte_cdk/sources/types.py (1)
StreamSlice
(75-169)
unit_tests/test/test_standard_tests.py (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1079)unit_tests/sources/test_source.py (1)
source
(65-66)airbyte_cdk/sources/source.py (1)
Source
(55-95)airbyte_cdk/test/standard_tests/_job_runner.py (1)
IConnector
(25-34)
unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py (1)
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (1)
ManifestDeclarativeSource
(97-610)
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1079)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1079)airbyte_cdk/models/airbyte_protocol.py (2)
AirbyteStateMessage
(67-75)AirbyteMessage
(79-88)
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py (5)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
PerPartitionCursor
(27-365)airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
PerPartitionKeySerializer
(7-22)airbyte_cdk/sources/types.py (1)
StreamSlice
(75-169)airbyte_cdk/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor
(9-13)airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor
(71-351)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory
(19-24)PerPartitionCursor
(27-365)airbyte_cdk/sources/types.py (2)
Record
(21-72)StreamSlice
(75-169)
airbyte_cdk/sources/declarative/incremental/__init__.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory
(19-24)PerPartitionCursor
(27-365)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (11)
airbyte_cdk/config_observation.py (1)
create_connector_config_control_message
(98-104)airbyte_cdk/manifest_migrations/migration_handler.py (1)
ManifestMigrationHandler
(34-180)airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py (1)
InterpolatedBoolean
(29-66)airbyte_cdk/sources/declarative/spec/spec.py (5)
Spec
(28-90)migrate_config
(64-72)transform_config
(74-81)validate_config
(83-90)generate_spec
(45-62)airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py (1)
get_registered_components_module
(98-131)airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (2)
ManifestComponentTransformer
(91-226)propagate_types_and_parameters
(92-193)airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1)
ManifestNormalizer
(72-519)airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py (1)
ManifestReferenceResolver
(16-185)airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
Retriever
(12-53)airbyte_cdk/sources/declarative/yaml_declarative_source.py (1)
_emit_manifest_debug_message
(58-59)airbyte_cdk/sources/declarative/interpolation/jinja.py (1)
eval
(85-123)
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor_integration.py (3)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
PerPartitionCursor
(27-365)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (1)
ManifestDeclarativeSource
(97-610)airbyte_cdk/sources/types.py (2)
Record
(21-72)StreamSlice
(75-169)
unit_tests/connector_builder/test_property_chunking.py (7)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
ConcurrentDeclarativeSource
(163-1079)TestLimits
(129-140)airbyte_cdk/test/mock_http/response_builder.py (1)
find_template
(189-198)airbyte_cdk/utils/datetime_helpers.py (1)
ab_datetime_parse
(361-442)unit_tests/sources/test_source.py (1)
source
(65-66)airbyte_cdk/sources/declarative/retrievers/retriever.py (2)
state
(37-48)state
(52-53)unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (4)
state
(89-90)state
(93-94)state
(211-213)state
(216-217)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
state
(573-574)state
(577-580)
airbyte_cdk/connector_builder/connector_builder_handler.py (3)
unit_tests/sources/test_source.py (1)
source
(65-66)airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
TestLimits
(129-140)airbyte_cdk/models/airbyte_protocol.py (2)
AirbyteStateMessage
(67-75)AirbyteMessage
(79-88)
🔇 Additional comments (45)
unit_tests/legacy/sources/declarative/partition_routers/__init__.py (1)
1-1
: LGTM: header-only change is fineNo functional impact; aligns with the broader PR header additions. Keeping the package init present helps test discovery—seems appropriate, wdyt?
unit_tests/__init__.py (1)
1-1
: LGTM: copyright headerConsistent with the PR’s header updates. No runtime behavior changes—looks good, wdyt?
unit_tests/legacy/sources/declarative/incremental/__init__.py (1)
1-1
: LGTM: Legacy package initializer addedSimple namespace initializer; no issues.
airbyte_cdk/legacy/sources/declarative/__init__.py (1)
1-1
: LGTM: Legacy declarative namespace initializer addedHeader-only; consistent with the rest of the legacy package scaffolding.
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (1)
38-38
: LGTM: StreamSlice import moved to central types moduleImporting StreamSlice from airbyte_cdk.sources.types aligns with the new public API; thanks for keeping tests current.
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
9-9
: No old per_partition_cursor imports remainI ran a broad search and didn’t find any
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import StreamSlice
lines—everything now imports fromairbyte_cdk.sources.types
. Ready to approve? wdyt?airbyte_cdk/legacy/sources/declarative/incremental/__init__.py (1)
1-1
: LGTM: Minimal package init to enable legacy namespaceThe empty initializer is fine for establishing the legacy import path. No further action needed.
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py (1)
15-18
: LGTM: Legacy and non-legacy incremental imports look correctThe split between legacy PerPartitionCursor and non-legacy DeclarativeCursor/GlobalSubstreamCursor aligns with the PR migration goals.
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)
11-14
: LGTM: Importing CursorFactory/PerPartitionCursor from legacy pathThis aligns the test with the legacy module surface and the broader migration. Nice.
35-35
: LGTM: StreamSlice now imported from typesImporting StreamSlice from airbyte_cdk.sources.types matches its canonical location and keeps things consistent.
unit_tests/test/test_standard_tests.py (2)
8-10
: Import swap to ConcurrentDeclarativeSource looks correctSwitching the test target to ConcurrentDeclarativeSource aligns with the PR’s objective and keeps the contract under IConnector intact. Nice.
18-25
: ✅ Verified: IConnector is runtime-checkable
IConnector inairbyte_cdk/test/standard_tests/_job_runner.py
is decorated with@runtime_checkable
(lines 24–25), so bothisinstance
andissubclass
checks will behave as intended.unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py (1)
11-13
: Import path updated to legacy module is appropriateUsing airbyte_cdk.legacy.sources.declarative.manifest_declarative_source for ManifestDeclarativeSource is consistent with the migration plan.
airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (1)
23-23
: Legacy import swap preserves behavior and decouples from non-legacy modulesImporting DeclarativeSource from the legacy namespace matches the layering rules described in the PR. Looks good to me.
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor_integration.py (1)
10-15
: Legacy import updates and types usage are consistent
- PerPartitionCursor and ManifestDeclarativeSource now point to legacy paths.
- StreamSlice import from sources.types matches usage in tests.
All consistent with the consolidation plan.
Also applies to: 29-29
airbyte_cdk/connector_builder/main.py (1)
28-30
: Using ConcurrentDeclarativeSource in connector-builder entry-point is aligned with the migrationThe import update is consistent with the broader shift to concurrent CDK.
unit_tests/source_declarative_manifest/test_source_declarative_remote_manifest.py (1)
15-17
: Migration to ConcurrentDeclarativeSource looks correctImport path aligns with the new consolidated source implementation and keeps the test in the concurrent runtime path. Nice and focused change, wdyt?
unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py (2)
24-26
: Import switched to ConcurrentDeclarativeSource — aligned with consolidationThis keeps custom-components tests on the concurrent engine as intended. Looks good to me, wdyt?
256-256
: No lingering ManifestDeclarativeSource references detected
The ripgrep scan only turned up commented‐out mentions and documentation notes—there are no active imports in our tests.airbyte_cdk/connector_builder/test_reader/reader.py (2)
26-28
: Moving TestReader to ConcurrentDeclarativeSource — good alignmentSwitching the test reader to the concurrent source maintains parity with the runtime used in Builder flows. This aligns well with the PR goals. Nicely done, wdyt?
118-123
: Verified:deprecation_warnings()
is implemented onConcurrentDeclarativeSource
The method is defined at line 407 in airbyte_cdk/sources/declarative/concurrent_declarative_source.py, so callingsource.deprecation_warnings()
here is safe.unit_tests/connector_builder/test_property_chunking.py (4)
201-207
: Precomputing day_before via ab_datetime_parse avoids empty partitions — good catchUsing
ab_datetime_parse
and subtracting one day ensures the concurrent cursor produces partitions under the frozen clock. This addresses the concurrent engine’s partitioning behavior neatly. Looks great, wdyt?
214-216
: Passing a concrete ConfiguredAirbyteCatalog improves clarityExplicitly loading and passing the catalog clarifies the test setup and avoids hidden defaults. This also pairs well with CDS’ constructor accepting a catalog. Nice improvement, wdyt?
221-228
: Direct CDS construction with limits and message emission — appropriate for Builder testsInstantiating CDS with
emit_connector_builder_messages=True
andlimits
mirrors the Builder’s expectations and avoids side effects. This should keep the test hermetic and fast. LGTM, wdyt?
250-251
: handle_connector_builder_request now receives catalog — consistent with upstream changesSwitching to pass the already loaded
catalog
matches the updated handler signature and makes the flow explicit. Good change, wdyt?unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py (3)
20-22
: Legacy import path update — correct and consistent with layeringImporting ManifestDeclarativeSource from the legacy path preserves backwards compatibility while keeping non-legacy code free from legacy references. Solid change, wdyt?
1023-1023
: Patched read path moved under legacy — correct targetPatching
airbyte_cdk.legacy.sources.declarative.declarative_source.DeclarativeSource.read
matches the relocated module. Looks good, wdyt?
2443-2445
: orjson patch path updated to legacy — correct and necessaryThis ensures the test intercepts the correct serialization call under the legacy module. LGTM, wdyt?
airbyte_cdk/connector_builder/connector_builder_handler.py (6)
90-96
: LGTM! Clean integration with ConcurrentDeclarativeSourceThe updated signature properly integrates
ConcurrentDeclarativeSource
with the newlimits
parameter, aligning well with the concurrent processing paradigm.
98-111
: Good use of limits for test configurationThe limits are properly propagated to both the TestReader initialization and the
run_test_read
method. This provides consistent bounded execution for testing scenarios.
129-131
: Clean multi-line signature formattingThe signature is properly formatted across multiple lines, maintaining consistency with the other function signatures in this file.
148-150
: Consistent signature style maintainedThe multi-line formatting matches the pattern established for other functions accepting
ConcurrentDeclarativeSource
.
153-167
: Well-structured dynamic stream handlingThe logic properly initializes base streams with
dynamic_stream_name
set toNone
and then appends dynamically generated streams while respecting thelimits.max_streams
constraint. The approach of usingsetdefault
with the dynamic stream name as key is clean and efficient.
176-182
: Improved error handling with explicit AirbyteTracedException pathGood addition of the explicit
except AirbyteTracedException
block before the generic exception handler. This preserves the original traced exception details while still providing a fallback for unexpected errors. The error messages are descriptive and properly filtered for secrets, wdyt?airbyte_cdk/sources/declarative/concurrent_declarative_source.py (11)
1-1
: Copyright year updated to 2025Nice attention to detail on updating the copyright year!
161-163
: Good documentation of technical debtThe TODO comment clearly explains why
AbstractSource
is still needed and when it can be removed. This helps future maintainers understand the transitional state.
182-183
: Logger initialization looks goodUsing
f"airbyte.{self.name}"
provides clear namespace separation for logging.
237-243
: Comprehensive manifest processing pipelineExcellent separation of concerns with distinct phases: pre-processing (reference resolution), validation, and post-processing (migration/normalization). The flow is logical and maintainable.
279-302
: Well-documented manifest preprocessingThe docstring clearly explains the method's purpose, and the implementation follows a logical sequence: fixing source type → resolving references → propagating types/parameters. The inline comments help understand each step.
360-379
: Solid config migration and transformation implementationThe method properly handles the case where no spec component exists and correctly emits control messages when the config is modified. Writing back to the config file when changes occur is a nice touch for persistence.
504-554
: Clever cache optimization for parent streamsThis is a sophisticated optimization that identifies parent streams and enables caching on their requesters to avoid redundant API calls. The logic handles both
StateDelegatingStream
types and regular streams, applying cache configuration appropriately. Nice performance optimization!
879-901
: Clean handling of conditional streamsThe method properly evaluates conditional streams based on interpolated boolean conditions and handles the default stream type assignment when not specified.
902-999
: Robust dynamic stream configuration with proper error handlingThe implementation includes comprehensive validation and error handling:
- Validates resolver configuration presence
- Checks resolver type validity
- Ensures unique stream names with context-appropriate error messages
- Properly distinguishes between system and config errors
The differentiation between
ConfigComponentsResolver
errors (user config issue) and other resolver errors (system issue) is particularly thoughtful.
1071-1073
: Simple and effective debug loggingClean implementation that provides the necessary debug information without cluttering the code.
1074-1079
: Straightforward logger configurationSimple conditional logic to enable debug logging when needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a couple of questions but it overall feels great!
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
414-453
: Unify use of transformed config across read/discover/streams/_group_streamsThere’s a mix of using the incoming config vs. self._config (post spec migrations/transformations). That can lead to subtle mismatches between stream construction, validation, and runtime behavior. Suggest normalizing to an effective_config = self._config or config at the start of each method and using it throughout.
Apply these diffs:
@@ def read(self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None) -> Iterator[AirbyteMessage]: - concurrent_streams, _ = self._group_streams(config=config) + effective_config = self._config or config + concurrent_streams, _ = self._group_streams(config=effective_config) @@ - yield from super().read(logger, config, filtered_catalog, state) + yield from super().read(logger, effective_config, filtered_catalog, state)@@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - concurrent_streams, synchronous_streams = self._group_streams(config=config) + effective_config = self._config or config + concurrent_streams, synchronous_streams = self._group_streams(config=effective_config)@@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: - if self._spec_component: - self._spec_component.validate_config(config) + effective_config = self._config or config + if self._spec_component: + self._spec_component.validate_config(effective_config) @@ - stream_configs = ( - self._stream_configs(self._source_config, config=config) + self.dynamic_streams - ) + stream_configs = self._stream_configs(self._source_config, config=effective_config) + self._dynamic_stream_configs( + manifest=self._source_config, config=effective_config, with_dynamic_stream_name=True + ) @@ - if api_budget_model: - self._constructor.set_api_budget(api_budget_model, config) + if api_budget_model: + self._constructor.set_api_budget(api_budget_model, effective_config) @@ - for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) + for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) # ↓ pass the same effective config used above - ] + ]@@ def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]: - streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs( - self._source_config, config + config = self._config or config + streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs( + self._source_config, config ) @@ - for declarative_stream in self.streams(config=config): + for declarative_stream in self.streams(config=config): ...This should eliminate drift between validated, transformed config and what actually gets used at runtime, wdyt?
Also applies to: 454-461, 472-497, 606-617, 620-621
253-272
: Guard against invalid concurrency levels to prevent deadlocks or zero-worker poolsIf a manifest returns 0 or 1 for concurrency_level, num_workers could be set to an unsafe value despite the comment and initial partitions fix. Can we clamp the value to at least _LOWEST_SAFE_CONCURRENCY_LEVEL?
Apply this diff:
- concurrency_level = concurrency_level_component.get_concurrency_level() + concurrency_level = max( + concurrency_level_component.get_concurrency_level(), + self._LOWEST_SAFE_CONCURRENCY_LEVEL, + ) initial_number_of_partitions_to_generate = max( concurrency_level // 2, 1 )This ensures both the worker pool size and startup partition count remain safe, wdyt?
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
400-404
: Migration note: inline message_repository usage when AbstractSource dependency is removedGiven the plan to drop AbstractSource, do you want to add a TODO linking to the follow-up PR where we’ll switch internal call sites to self._message_repository and remove this property, wdyt?
🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (10)
351-362
: Improve schema validation error to include path/contextRe-raising a generic ValidationError loses the failing path/message. Including e.message and e.path speeds up debugging.
Apply this diff:
- except ValidationError as e: - raise ValidationError( - "Validation against json schema defined in declarative_component_schema.yaml schema failed" - ) from e + except ValidationError as e: + path = "/".join(map(str, e.path)) or "<root>" + raise ValidationError(f"Manifest validation failed at {path}: {e.message}") from ewdyt?
865-887
: Honor $parameters when evaluating ConditionalStreamsConditionalStreams conditions often rely on propagated $parameters. Currently parameters={} which can mis-evaluate conditions.
Apply this diff:
- interpolated_boolean = InterpolatedBoolean( - condition=current_stream_config.get("condition"), - parameters={}, - ) + interpolated_boolean = InterpolatedBoolean( + condition=current_stream_config.get("condition"), + parameters=current_stream_config.get("$parameters", {}), + )This aligns with how parameters were propagated by ManifestComponentTransformer, wdyt?
498-550
: Safer cache flag propagation to avoid KeyError on nested dictsSome nested requester blocks may be missing; direct indexing can KeyError. Using setdefault makes this robust without changing behavior.
Apply this diff:
@staticmethod def _initialize_cache_for_parent_streams( stream_configs: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: parent_streams = set() - def update_with_cache_parent_configs( + def _enable_cache_on_requester(stream_dict: dict) -> None: + stream_dict.setdefault("retriever", {}).setdefault("requester", {})["use_cache"] = True + + def update_with_cache_parent_configs( parent_configs: list[dict[str, Any]], ) -> None: for parent_config in parent_configs: parent_streams.add(parent_config["stream"]["name"]) if parent_config["stream"]["type"] == "StateDelegatingStream": - parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][ - "use_cache" - ] = True - parent_config["stream"]["incremental_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _enable_cache_on_requester(parent_config["stream"]["full_refresh_stream"]) + _enable_cache_on_requester(parent_config["stream"]["incremental_stream"]) else: - parent_config["stream"]["retriever"]["requester"]["use_cache"] = True + _enable_cache_on_requester(parent_config["stream"]) @@ for stream_config in stream_configs: if stream_config.get("incremental_sync", {}).get("parent_stream"): parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) - stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _enable_cache_on_requester(stream_config["incremental_sync"]["parent_stream"]) @@ for stream_config in stream_configs: if stream_config["name"] in parent_streams: if stream_config["type"] == "StateDelegatingStream": - stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( - True - ) - stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( - True - ) + _enable_cache_on_requester(stream_config["full_refresh_stream"]) + _enable_cache_on_requester(stream_config["incremental_stream"]) else: - stream_config["retriever"]["requester"]["use_cache"] = True + _enable_cache_on_requester(stream_config)This keeps semantics intact but avoids brittle indexing, wdyt?
918-935
: Dynamic streams: propagate parent parameters and set cache safely
- Setting use_cache should be tolerant when retriever/requester keys are absent.
- Propagating parameters from the dynamic definition helps resolve templates that rely on $parameters.
Apply this diff:
- if "retriever" in components_resolver_config: - components_resolver_config["retriever"]["requester"]["use_cache"] = True + if "retriever" in components_resolver_config: + components_resolver_config.setdefault("retriever", {}).setdefault("requester", {})["use_cache"] = True @@ - dynamic_stream = { - **ManifestComponentTransformer().propagate_types_and_parameters( - "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters - ) - } + dynamic_stream = { + **ManifestComponentTransformer().propagate_types_and_parameters( + "", + dynamic_stream, + dynamic_definition.get("$parameters", {}), + use_parent_parameters=use_parent_parameters, + ) + }Optionally, consider guarding stream_template presence:
- stream_template_config = dynamic_definition["stream_template"] + stream_template_config = dynamic_definition.get("stream_template") + if not stream_template_config: + raise ValueError(f"Missing 'stream_template' in dynamic definition: {dynamic_definition}")Does this align with the intended DX, wdyt?
Also applies to: 945-949
143-159
: Nit: use yaml.safe_load and simplify error handlingMinor cleanup to prefer safe_load and reduce redundant exceptions.
Apply this diff:
def _get_declarative_component_schema() -> Dict[str, Any]: try: raw_component_schema = pkgutil.get_data( "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml" ) if raw_component_schema is not None: - declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader) + declarative_component_schema = yaml.safe_load(raw_component_schema) return declarative_component_schema # type: ignore else: raise RuntimeError( "Failed to read manifest component json schema required for deduplication" ) except FileNotFoundError as e: raise FileNotFoundError( f"Failed to read manifest component json schema required for deduplication: {e}" )wdyt?
226-233
: Simplify: _constructor conditional is unreachablecomponent_factory is always truthy here, so the else branch will never execute. Shall we simplify?
Apply this diff:
- self._constructor = ( - component_factory - if component_factory - else ModelToComponentFactory( - emit_connector_builder_messages=emit_connector_builder_messages, - max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), - ) - ) + self._constructor = component_factoryKeeps things tidy during migration, wdyt?
221-223
: Clarify: components_module is set but not usedIt looks like the value isn’t referenced later (side-effects occur via sys.modules). Would you prefer to drop the assignment or leave it as a hint for future maintainers? I can add a brief comment to explain the intent, wdyt?
612-617
: Minor: ensure _limits checks don’t introduce unexpected behaviorThe optional limits are handled well. If we later make limits configurable in production code paths, we might want a single helper to pass max_records/slices/pages consistently to all StreamSlicerPartitionGenerator call sites to avoid drift. Want me to factor a small helper for that, wdyt?
Also applies to: 642-644, 760-774, 776-793, 824-836, 838-849
371-382
: I/O behavior: emitting CONTROL message to stdoutPrinting the CONTROL message for migrated config is consistent with platform expectations. If you’d like, we can guard the print with a “running in platform” env flag to avoid noisy output in local debugging. Should I add a small guard, wdyt?
581-597
: Makeconnection_checker
robust and avoid mutating_source_config
in-placeA quick scan of all manifest files shows every test manifest already defines a
check
block, so this change won’t alter existing behavior—but it will
- prevent a KeyError if
check
is ever omitted- avoid mutating the loaded config dict
Please replace the current lines in
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(around line 581) with:- check = self._source_config["check"] - if "type" not in check: - check["type"] = "CheckStream" + check = dict(self._source_config.get("check", {})) + if "type" not in check: + check["type"] = "CheckStream"Also, would you like to default a
stream_names
list (e.g. using the first stream when omitted) to mirror MDS behavior, or keep that for a follow-up? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(14 hunks)unit_tests/__init__.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/init.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9)
airbyte_cdk/config_observation.py (1)
create_connector_config_control_message
(98-104)airbyte_cdk/manifest_migrations/migration_handler.py (2)
ManifestMigrationHandler
(34-180)apply_migrations
(43-63)airbyte_cdk/sources/abstract_source.py (3)
message_repository
(314-315)check
(92-99)check_connection
(59-71)airbyte_cdk/sources/declarative/spec/spec.py (5)
Spec
(28-90)migrate_config
(64-72)transform_config
(74-81)validate_config
(83-90)generate_spec
(45-62)airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py (1)
get_registered_components_module
(98-131)airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (2)
ManifestComponentTransformer
(91-226)propagate_types_and_parameters
(92-193)airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1)
ManifestNormalizer
(72-519)airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py (2)
ManifestReferenceResolver
(16-185)preprocess_manifest
(102-107)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
get_message_repository
(3833-3834)create_component
(774-807)get_model_deprecations
(824-828)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
383-386
: Fix: configure() signature currently overrides AbstractSource incorrectlyThis override’s parameters don’t match AbstractSource.configure(logger, config). When _spec_component is present (truthy _config), super().configure receives the wrong types and can break logging or validation. Can we align the signature and consistently apply the transformed config?
Apply this diff:
- def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: - config = self._config or config - return super().configure(config, temp_dir) + def configure(self, logger: logging.Logger, config: Mapping[str, Any]) -> Mapping[str, Any]: + effective_config = self._config or config + return super().configure(logger, effective_config)This also keeps behavior consistent with AbstractSource and avoids accidental parameter swapping, wdyt?
⛔ Skipped due to learnings
Learnt from: ChristoGrab PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0 Timestamp: 2024-11-18T23:40:06.391Z Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One comment regarding a test that was moved to legacy
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/connector_builder/test_reader/reader.py (1)
232-232
: Syntax error in f-string: missing closing brace/backtick in error messageThis will raise a SyntaxError at import time. Can we fix the message formatting?
- raise ValueError(f"Unknown type for cursor field `{field}") + raise ValueError(f"Unknown type for cursor field `{field}`")
🧹 Nitpick comments (2)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
88-88
: Consider dropping the generic parameterization in the type hint for simplicityAnnotating the source as
ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
is technically fine, but sinceConcurrentDeclarativeSource.read(...)
already pinsstate
toOptional[List[AirbyteStateMessage]]
, parameterizing the class here adds little value and could create churn if its generic is repurposed later. Would you simplify to the raw class annotation?Suggested change:
- source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource,
386-386
: Mirror the earlier suggestion: avoid parameterizing the source type here tooSame rationale as above — keeping the annotation as
ConcurrentDeclarativeSource
avoids over-specification at call boundaries, wdyt?Suggested change:
- source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], + source: ConcurrentDeclarativeSource,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
airbyte_cdk/connector_builder/test_reader/reader.py
(7 hunks)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
(1 hunks)airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(14 hunks)unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
(5 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/connector_builder/test_reader/reader.py
🧬 Code graph analysis (1)
airbyte_cdk/connector_builder/test_reader/reader.py (4)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
ConcurrentDeclarativeSource
(163-1055)airbyte_cdk/models/airbyte_protocol.py (2)
AirbyteStateMessage
(67-75)AirbyteMessage
(79-88)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor_field
(52-53)airbyte_cdk/sources/streams/concurrent/adapters.py (3)
cursor_field
(189-193)state
(111-114)state
(117-119)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/connector_builder/test_reader/reader.py (5)
26-28
: Import switch to ConcurrentDeclarativeSource looks rightThis aligns with the PR goal of consolidating onto the concurrent source; no concerns here. Nice catch keeping the import local to the declarative package, wdyt?
60-61
: Docstring updated to ConcurrentDeclarativeSourceThe parameter docs reflect the new source type and remain accurate and concise. Looks good, wdyt?
392-393
: Docstring aligns with concurrent source usage via AirbyteEntrypointThe narrative matches the actual behavior and error-handling path; no action needed, wdyt?
401-402
: Parameter docstring updated to ConcurrentDeclarativeSourceConsistent with the function signature and broader migration, looks good, wdyt?
130-132
: Clarify cursor_field normalization: keep the current guard
- The
SchemaInferrer
initializer treats bothNone
and[]
the same way—_cursor_field = [] if cursor_field is None else cursor_field
—so passingNone
or an empty list results in no cursor requirements being added .- The
_cursor_field_to_nested_and_composite_field
helper only handles non-empty strings or lists of strings and does not special-case falsy inputs; calling it on[]
would raise anIndexError
when accessingfield[0]
. The existingif stream and stream.cursor_field
guard correctly prevents that.Keeping the guard ensures we only invoke the helper on valid data. If we’d like to officially support falsy cursor fields in the helper (or align its docstring), we could add a branch there—but reverting the guard would introduce a crash. Wdyt?
Likely an incorrect or invalid review comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 🚢
unit_tests/legacy/sources/declarative/partition_routers/test_parent_state_stream.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (9)
349-360
: Preserve schema validation details for faster debuggingRe-raising ValidationError with a generic message hides the offending path/value. Could we include e.message and e.path in the error? It accelerates root cause analysis, wdyt?
@@ def _validate_source(self) -> None: - try: - validate(self._source_config, self._declarative_component_schema) - except ValidationError as e: - raise ValidationError( - "Validation against json schema defined in declarative_component_schema.yaml schema failed" - ) from e + try: + validate(self._source_config, self._declarative_component_schema) + except ValidationError as e: + path = "/".join(map(str, getattr(e, "path", []))) + msg = f"Manifest validation failed against declarative_component_schema.yaml: {e.message}" + if path: + msg += f" (at: {path})" + raise ValidationError(msg) from e
140-156
: Cache declarative component schema to avoid repeated disk I/O_get_declarative_component_schema() is called per-instance; using an lru_cache avoids repeated yaml.loads and pkgutil reads. Low-risk speed win, wdyt?
@@ -from copy import deepcopy +from copy import deepcopy +from functools import lru_cache @@ -def _get_declarative_component_schema() -> Dict[str, Any]: +@lru_cache(maxsize=1) +def _get_declarative_component_schema() -> Dict[str, Any]:Also applies to: 6-6
385-397
: Avoid external mutation of internal manifest stateresolved_manifest returns the internal dict, which external callers could mutate. Would you return a deep copy to preserve encapsulation, wdyt?
@@ def resolved_manifest(self) -> Mapping[str, Any]: - return self._source_config + return deepcopy(self._source_config)
224-231
: Simplify constructor assignment (minor clean-up)component_factory is always truthy here. Would you simplify by assigning it directly to self._constructor, wdyt?
@@ - self._constructor = ( - component_factory - if component_factory - else ModelToComponentFactory( - emit_connector_builder_messages=emit_connector_builder_messages, - max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), - ) - ) + self._constructor = component_factory
281-303
: Fix _pre_process_manifest docstring to reflect return value and behaviorDocstring says “modifies in place” and “Returns: None” but the function returns a transformed manifest. Would you update the docstring to avoid confusion, wdyt?
@@ def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: - """ - Preprocesses the provided manifest dictionary by resolving any manifest references. - - This method modifies the input manifest in place, resolving references using the - ManifestReferenceResolver to ensure all references within the manifest are properly handled. - - Args: - manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in. - - Returns: - None - """ + """ + Preprocess the provided manifest by: + - Ensuring a top-level type (DeclarativeSource) is set + - Resolving references (ManifestReferenceResolver) + - Propagating types and parameters (ManifestComponentTransformer) + + Args: + manifest (Dict[str, Any]): Incoming manifest to preprocess. + + Returns: + Dict[str, Any]: The preprocessed manifest (new object; original is not mutated). + """
497-547
: Defensive dict access in _initialize_cache_for_parent_streams to avoid KeyErrorsThe code indexes deeply (e.g., ["stream"]["retriever"]["requester"]) assuming schema guarantees. If any upstream normalization/migration yields a partial object, this will crash. Do we want to use get/setdefault patterns to make this resilient, or assert with a clearer config_error before mutation, wdyt? For example:
- if parent_config["stream"]["type"] == "StateDelegatingStream": + stream = parent_config.get("stream", {}) + if stream.get("type") == "StateDelegatingStream": - parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = True + stream.setdefault("full_refresh_stream", {}).setdefault("retriever", {}).setdefault("requester", {})["use_cache"] = TrueI didn’t apply all instances to keep the diff small, but happy to follow through if you’d like.
Would you prefer we rely on schema guarantees here and add assertions instead of defensive defaults?
371-379
: Don’t fail the run if writing the migrated config file errors; still emit the control messageA filesystem error (permissions, read-only FS) will currently raise and stop execution. Since the control message is what orchestrators consume, could we swallow I/O errors and proceed, logging a warning, wdyt?
@@ def _migrate_and_transform_config( - if config_path: - with open(config_path, "w") as f: - json.dump(mutable_config, f) + if config_path: + try: + with open(config_path, "w") as f: + json.dump(mutable_config, f) + except OSError as io_err: + self.logger.warning("Failed to write migrated config to %s: %s", config_path, io_err)
496-548
: Type-hint consistency (minor)A few places use builtin generics (list[dict[str, Any]]) while others use typing.List/Dict. Keeping one style helps readability. Would you like to standardize on builtin generics (Python 3.11+) in this file, wdyt?
579-595
: Strengthenconnection_checker
for missing/malformedcheck
blocksA quick scan of our YAML manifests shows only two files without a top-level
check:
key—one in the migrations folder and one in a parser test resource—not the actual source manifests loaded at runtime. All real declarative source manifests already includecheck
, so defaulting it in code should be low-risk.• Found 8 candidate YAML files; only
–airbyte_cdk/manifest_migrations/migrations/registry.yaml
(migration script)
–unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml
(parser test)
lack a top-levelcheck:
.
• All runtime source manifests (e.g. inunit_tests/resources/.../manifest.yaml
) includecheck:
.Would you be up for updating the snippet as follows to avoid
KeyError
and give users a clearer, traced exception on misconfiguration?@@ def connection_checker(self) -> ConnectionChecker: - check = self._source_config["check"] + check = dict(self._source_config.get("check", {})) if "type" not in check: check["type"] = "CheckStream" @@ - if isinstance(check_stream, ConnectionChecker): + if isinstance(check_stream, ConnectionChecker): return check_stream - else: - raise ValueError( - f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" - ) + else: + raise AirbyteTracedException( + message="Invalid 'check' configuration in manifest. Expected a ConnectionChecker component.", + internal_message=f"Expected ConnectionChecker, got {check_stream.__class__}", + failure_type=FailureType.config_error, + )This should harden against missing or malformed
check
blocks and surface config errors more actionably—wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(14 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (8)
airbyte_cdk/config_observation.py (1)
create_connector_config_control_message
(98-104)airbyte_cdk/manifest_migrations/migration_handler.py (2)
ManifestMigrationHandler
(34-180)apply_migrations
(43-63)airbyte_cdk/sources/declarative/spec/spec.py (2)
Spec
(28-90)generate_spec
(45-62)airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py (1)
get_registered_components_module
(98-131)airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (2)
ManifestComponentTransformer
(91-226)propagate_types_and_parameters
(92-193)airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py (1)
ManifestNormalizer
(72-519)airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py (2)
ManifestReferenceResolver
(16-185)preprocess_manifest
(102-107)airbyte_cdk/utils/traced_exception.py (1)
AirbyteTracedException
(25-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
…te parameter - Add default value of None to state parameter in ConcurrentDeclarativeSource.__init__() - Remove Generic[TState] from class definition as it adds no meaningful value - Update all type annotations throughout codebase to use concrete Optional[List[AirbyteStateMessage]] type - Fix test parameter order to match updated constructor signature - Resolves breaking change introduced in PR #704 where Optional state parameter lacked default value Fixes: #704 Co-Authored-By: AJ Steers <aj@airbyte.io>
With the change to move the connector builder to use the concurrent CDK, there is no longer a need to have the decoupled ManifestDeclarativeSource and ConcurrentDeclarativeSource.
To simplify our inheritance model, what we ideally want is a singluar ConcurrentDeclarativeSource which implements the Source interface. However, because we still have a few remaining legacy flows, we will have to for the time continue to implement
AbstractSource
and once finished with our refactor, replace it once and for all.A few notes on specific parts of this refactor
airbyte_cdk/legacy
and there will be no references to the legacy component from regular components. However, the inverse is acceptable when a legacy component references a regular componentManifestDeclarativeSource
andConcurrentDeclarativeSource
, I opted for just a straight copy on the first pass just to simplify the migration so that first__init__
looks a bit verbose in copying the exact functionality overA few notes on testing
legacy
also sits at the top level of theunit_tests
. This is also the simplest solution if we want to retain testing of legacy components without having to re-work any of CI commands which may reference running tests from within the/unit_tests
directory/declarative
. It might seem like we're losing coverage, but much of this stuff isn't actually used in prod anymore because we instantiate concurrent low-code cursors in all flows now. So they should still pass when instantiated byManifestDeclarativeSources
, the tests might fail if instantiated byConcurrentDeclarativeSources
if there has been drift between concurrent and declarative cursor behavior. Examples of this are:test_per_partition_cursor.py
test_per_partition_cursor_integration.py
Summary by CodeRabbit
New Features
Breaking Changes
Improvements