maxi297
Contributor

@maxi297 maxi297 commented Aug 11, 2025

What

Migrate incremental streams that use partition routing so that they are instantiated as DefaultStream.

There were a couple of steps that needed to be done at the same time:

  • If we want to update SubstreamPartitionRouter to use DefaultStream, all the streams need to be instantiable as DefaultStream. This is annoying because it means that streams with SubstreamPartitionRouter also need to be migrated at the same time.
  • Retrievers were doing things they shouldn't, which led to custom retrievers not being supported under certain conditions (see this for more information):
    • Slicing was done through the retriever.
    • State management was done through the retriever.

The PR is unfortunately quite big because of the items mentioned above...

How

Have SubstreamPartitionRouter support DefaultStream.
Remove the condition that decides whether to instantiate a DefaultStream or a DeclarativeStream.

TODO:

  • Fix mypy
  • Tests failing
    • [DONE - deleted as it tests legacy declarative cursor and we don't instantiate DefaultStream anymore] test_parent_state_stream.py is still failing, but @brianjlai mentioned he was removing the usage of ManifestDeclarativeSource in the code, so I hope this can be fixed by his PR
    • [WIP] test_per_partition_cursor_integration.py: partition limit management has changed and the tests validating it need to be updated
    • Decide if we just remove test_integration.py as this is using AbstractSource, does not seem to have a lot of value and is hard to migrate to DefaultStream

Missing clean up (which could be considered out-of-scope):

  • CDS
  • Retriever.state and Retriever.stream_slices: In the meantime, should we hard fail when detecting that either of these methods is re-implemented?
  • Remove CustomIncrementalSync from the declarative language

Summary by CodeRabbit

  • New Features

    • Concurrency-enabled streaming runtime, PerPartitionRequestOptionsProvider, StateFilteringMessageRepository, injected stream slicer for HTTP resolution, and helpers to construct Airbyte state messages.
  • Improvements

    • Defer state emission until data is synced; more robust per-partition lifecycle and parent/child state propagation; guidance on partition/thread limits.
  • Deprecations

    • Retriever-level stream slicing and retriever state accessors marked deprecated.
  • Removals

    • Several no-op initial-state setters and closed-state APIs removed.
  • Tests

    • Tests migrated/expanded for concurrency and partition-based flows.
  • Documentation

    • Migration guide updated with Concurrent CDK guidance.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (1)

744-766: Avoid orjson round-trip on state

Same rationale as above; use dict for reliability. wdyt?

-    final_state = [
-        orjson.loads(orjson.dumps(message.state.stream.stream_state))
-        for message in output
-        if message.state
-    ]
+    final_state = [
+        message.state.stream.stream_state.__dict__
+        for message in output
+        if message.state
+    ]
🧹 Nitpick comments (4)
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (4)

297-301: Remove duplicate slice for partition "2"

This second identical StreamSlice for partition "2" in Feb doesn’t affect the outcome but adds noise. Shall we drop it for clarity, wdyt?

         StreamSlice(
             partition={"partition_field": "2"},
             cursor_slice={"start_time": "2022-02-01", "end_time": "2022-02-28"},
         ),
-        StreamSlice(
-            partition={"partition_field": "2"},
-            cursor_slice={"start_time": "2022-02-01", "end_time": "2022-02-28"},
-        ),

317-324: Capture warnings without narrowing logger

Using logger="airbyte" may miss warnings if the code logs under module-specific names. Could we remove the filter to capture all WARN, wdyt?

-    with caplog.at_level(logging.WARNING, logger="airbyte"):
+    with caplog.at_level(logging.WARNING):
         with patch.object(SimpleRetriever, "_read_pages", side_effect=records):
             output = list(source.read(logger, {}, catalog, None))
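
To illustrate the concern, here is a minimal sketch using only the standard library. It assumes that a logger-scoped level change only lowers the level of the named logger; the logger names `airbyte.child` and `some_module` are hypothetical. The difference only shows up when the root logger is stricter than WARNING, which is why the sketch sets the root level to ERROR by default.

```python
import logging


def warning_visibility(root_level: int = logging.ERROR) -> dict[str, bool]:
    """Report which loggers emit WARNING when only "airbyte" is set to WARNING."""
    root = logging.getLogger()
    airbyte = logging.getLogger("airbyte")
    previous = (root.level, airbyte.level)
    root.setLevel(root_level)
    airbyte.setLevel(logging.WARNING)  # stand-in for a logger-scoped level change
    try:
        return {
            # Child of "airbyte": inherits WARNING from its parent logger.
            "airbyte.child": logging.getLogger("airbyte.child").isEnabledFor(logging.WARNING),
            # Unrelated module logger (hypothetical name): inherits the root level.
            "some_module": logging.getLogger("some_module").isEnabledFor(logging.WARNING),
        }
    finally:
        # Restore the global logging configuration.
        root.setLevel(previous[0])
        airbyte.setLevel(previous[1])


visibility = warning_visibility()
```

If the code under test logs under a module-specific name outside the `airbyte` hierarchy, its warnings follow the root level rather than the scoped one.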

594-596: Drop unused variable

partition_slices isn’t used in this test. Remove to reduce noise, wdyt?

-    partition_slices = [
-        StreamSlice(partition={"partition_field": str(i)}, cursor_slice={}) for i in range(1, 4)
-    ]

733-736: Capture warnings without restricting logger

Same as earlier: removing the logger filter makes the assertion resilient to module logger names. Ok to simplify, wdyt?

-    with caplog.at_level(logging.WARNING, logger="airbyte"):
+    with caplog.at_level(logging.WARNING):
         with patch.object(SimpleRetriever, "_read_pages", side_effect=records_list):
             with patch.object(ConcurrentPerPartitionCursor, "DEFAULT_MAX_PARTITIONS_NUMBER", 5):
                 output = list(source.read(logger, {}, catalog, initial_state))
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1b4b756 and c004637.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (17 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (4)
airbyte_cdk/models/airbyte_protocol.py (4)
  • AirbyteMessage (79-88)
  • AirbyteStateMessage (67-75)
  • AirbyteStreamState (55-57)
  • AirbyteStateBlob (15-50)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (53-593)
  • state (145-165)
airbyte_cdk/sources/types.py (7)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • data (35-36)
  • associated_slice (39-40)
  • get (146-147)
airbyte_cdk/test/catalog_builder.py (3)
  • CatalogBuilder (48-81)
  • ConfiguredAirbyteStreamBuilder (13-45)
  • with_name (27-29)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py (4)

87-96: LGTM: start/end time injection via RequestOption is correct

Using explicit start_time_option/end_time_option mirrors the new request plumbing and keeps tests realistic.


234-250: LGTM: slice derivation from mixed input state

The transition to partitions -> StreamSlice via to_slice() and the assertions look aligned with StreamSlice.__eq__ semantics.


349-423: LGTM: substream partitions from parent without input state

The mocked parent results yielding child partitions across two time windows look correct, and assertions align with StreamSlice equality rules.


840-850: LGTM: iterator helper

The helper cleanly collects records until the next state message and keeps the tests tidy.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

1-10: Apply Ruff formatting changes and commit them?

ruff format . reformatted 3 files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Could you commit these formatting updates? wdyt?

♻️ Duplicate comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

1298-1301: Do not mutate component_definition; parse a defensive copy instead

In-place injection of "$parameters" can leak side-effects to callers. Can we copy before parsing, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore  # This is a dict
+        datetime_based_cursor_model = model_type.parse_obj(local_def)
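
A minimal sketch of the side effect in question, using plain dicts rather than the factory itself. The helper name `with_dollar_parameters` and the sample definition are hypothetical. Note that `dict(...)` is a shallow copy, so nested values are still shared with the caller.

```python
from typing import Any, Dict, Mapping


def with_dollar_parameters(component_definition: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy with "$parameters" filled in from "parameters" if missing."""
    local_def = dict(component_definition)  # copies top-level keys only
    if "$parameters" not in local_def and "parameters" in local_def:
        local_def["$parameters"] = local_def["parameters"]
    return local_def


caller_definition = {"type": "DatetimeBasedCursor", "parameters": {"name": "events"}}
parsed_input = with_dollar_parameters(caller_definition)
```

After the call, `caller_definition` has no `"$parameters"` key, while `parsed_input` does.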

1610-1611: Same: avoid mutating the provided component_definition

Repeat of above in the per-partition path. Shall we use a local copy here as well, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

2053-2057: Type-annotation mismatch for stream_slicer

This variable can be a PartitionRouter or a Concurrent cursor; annotating as ConcurrentStreamSlicer trips mypy. Shall we drop the explicit annotation, wdyt?

-        stream_slicer: ConcurrentStreamSlicer = (
+        stream_slicer = (
             partition_router
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )

3145-3161: Bug: paginator url_base fallback uses get_url and may pass a full URL

_url_base should come from Requester.get_url_base(...). Also, create_default_paginator expects a base, but _get_url may return the full URL. Can we fix by using get_url_base and returning the base explicitly, wdyt?

-        def _get_url(req: Requester) -> str:
+        def _get_url_base(req: Requester) -> str:
@@
-            _url: str = (
-                model.requester.url
-                if hasattr(model.requester, "url") and model.requester.url is not None
-                else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
-            )
             _url_base: str = (
                 model.requester.url_base
                 if hasattr(model.requester, "url_base") and model.requester.url_base is not None
-                else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
+                else req.get_url_base(stream_state=None, stream_slice=None, next_page_token=None)
             )
-
-            return _url or _url_base
+            if not _url_base:
+                # Fallback: derive from full URL when only url is configured
+                _url: str = (
+                    model.requester.url
+                    if hasattr(model.requester, "url") and model.requester.url is not None
+                    else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
+                )
+                _url_base = _url
+            return _url_base
@@
-                url_base=_get_url(requester),
+                url_base=_get_url_base(requester),

Also applies to: 3241-3249
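
If the fallback does need to derive a base from a full URL, one possible approach is sketched below with the standard library. The helper `derive_url_base` is hypothetical and not part of the CDK, and it assumes the base is just scheme plus host; a connector whose base includes a path prefix such as `/v1` would need different handling.

```python
from urllib.parse import urlsplit


def derive_url_base(full_url: str) -> str:
    """Keep scheme and host; drop path, query string and fragment."""
    parts = urlsplit(full_url)
    return f"{parts.scheme}://{parts.netloc}"


base = derive_url_base("https://api.example.com/v1/items?page=2")
```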


3748-3797: Parent-state fallback guard and value selection are incorrect

Guard should be “parent_state is None” and the fallback should prefer child_state[cursor_field]. Can we tighten this to avoid bad state emission, wdyt?

-                if not parent_state and not isinstance(parent_state, dict):
-                    cursor_values = child_state.values()
-                    if cursor_values:
-                        incremental_sync_model: Union[
-                            DatetimeBasedCursorModel,
-                            IncrementingCountCursorModel,
-                            CustomIncrementalSyncModel,
-                        ] = (
-                            model.stream.incremental_sync  # type: ignore  # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
-                            if isinstance(model.stream, DeclarativeStreamModel)
-                            else self._get_state_delegating_stream_model(
-                                has_parent_state, model.stream
-                            ).incremental_sync
-                        )
-                        cursor_field = InterpolatedString.create(
-                            incremental_sync_model.cursor_field,
-                            parameters=incremental_sync_model.parameters or {},
-                        ).eval(config)
-                        parent_state = AirbyteStateMessage(
-                            type=AirbyteStateType.STREAM,
-                            stream=AirbyteStreamState(
-                                stream_descriptor=StreamDescriptor(
-                                    name=parent_stream_name, namespace=None
-                                ),
-                                stream_state=AirbyteStateBlob(
-                                    {cursor_field: list(cursor_values)[0]}
-                                ),
-                            ),
-                        )
+                if parent_state is None:
+                    # Fallback: derive minimal parent state from child's legacy state
+                    selected_stream_model = (
+                        model.stream
+                        if isinstance(model.stream, DeclarativeStreamModel)
+                        else self._get_state_delegating_stream_model(has_parent_state, model.stream)
+                    )
+                    incr = getattr(selected_stream_model, "incremental_sync", None)
+                    cursor_field_str = (
+                        InterpolatedString.create(
+                            incr.cursor_field, parameters=getattr(incr, "parameters", {}) or {}
+                        ).eval(config)
+                        if incr and getattr(incr, "cursor_field", None)
+                        else None
+                    )
+                    value = child_state.get(cursor_field_str) if cursor_field_str else None
+                    if value is None:
+                        vals = list(child_state.values())
+                        value = vals[0] if vals else None
+                    if value is not None:
+                        parent_state = AirbyteStateMessage(
+                            type=AirbyteStateType.STREAM,
+                            stream=AirbyteStreamState(
+                                stream_descriptor=StreamDescriptor(
+                                    name=parent_stream_name, namespace=None
+                                ),
+                                stream_state=AirbyteStateBlob({(cursor_field_str or "cursor"): value}),
+                            ),
+                        )
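
The selection logic proposed above can be sketched in isolation with plain dicts. The function name `pick_parent_cursor_value` and the sample state are hypothetical; the sketch only shows the difference between preferring the cursor field and taking the first arbitrary value, and between a falsy check and an `is None` check.

```python
from typing import Any, Mapping, Optional


def pick_parent_cursor_value(
    child_state: Mapping[str, Any], cursor_field: Optional[str]
) -> Optional[Any]:
    """Prefer child_state[cursor_field]; otherwise fall back to the first value, if any."""
    value = child_state.get(cursor_field) if cursor_field else None
    if value is None:
        values = list(child_state.values())
        value = values[0] if values else None
    return value


child_state = {"id": 7, "updated_at": "2024-01-01"}
preferred = pick_parent_cursor_value(child_state, "updated_at")
first_value = pick_parent_cursor_value(child_state, None)

# An empty dict is falsy but is not None, so the two guards disagree on it.
empty_state: dict = {}
falsy_guard_triggers = not empty_state
none_guard_triggers = empty_state is None
```

With the first-value approach alone, the emitted parent state would carry `7` (the `id`) instead of the cursor value.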
🧹 Nitpick comments (9)
unit_tests/sources/declarative/parsers/testing_components.py (1)

49-51: Add test guard (and consider dropping @dataclass on the subclass)

To avoid any accidental pytest collection and keep consistency with nearby helpers, would you add __test__: ClassVar[bool] = False? Also, since no fields are added, do we want to drop @dataclass on the subclass and just inherit the base dataclass’ init/equality to reduce confusion, wdyt?

Proposed minimal tweak:

 @dataclass
 class TestingCustomRetriever(SimpleRetriever):
-    pass
+    __test__: ClassVar[bool] = False
+    pass
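
A self-contained sketch of the guard, assuming pytest's default collection rules (classes whose names start with `Test`). `BaseRetriever` is a hypothetical stand-in for SimpleRetriever. Because the attribute is annotated as `ClassVar`, the dataclass machinery does not treat it as a field.

```python
from dataclasses import dataclass, fields
from typing import ClassVar


@dataclass
class BaseRetriever:  # hypothetical stand-in for SimpleRetriever
    name: str = "base"


@dataclass
class TestingCustomRetriever(BaseRetriever):
    # Tells pytest not to collect this class even though its name starts with "Test".
    __test__: ClassVar[bool] = False


# ClassVar attributes are excluded from dataclass fields.
field_names = [f.name for f in fields(TestingCustomRetriever)]
```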
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)

926-941: Reduce reliance on private internals for cursor factory (and address the FIXME)

Accessing stream.cursor._cursor_factory and other underscored attributes makes the test brittle to internal refactors. Could we expose a tiny utility on DefaultStream/cursor for building an example ConcurrentCursor (or accept None in the factory as hinted) so the test doesn’t reach into privates, wdyt? If not feasible now, maybe open a follow-up to resolve the “FIXME should we be allowed to pass None”?


948-998: Custom retriever + transformations test: add a precise assertion?

This test proves transformations are preserved, but only via truthiness. Do we want to assert the AddFields content (e.g., first field path/value) to prevent regressions, similar to earlier tests, wdyt?

Example:

     assert isinstance(stream, DefaultStream)
-    assert get_retriever(stream).record_selector.transformations
+    transformations = get_retriever(stream).record_selector.transformations
+    assert transformations
+    af = transformations[0]
+    assert isinstance(af, AddFields)
+    assert af.fields[0].path == ["extra"]

1334-1341: Style nit: prefer “is True” for boolean asserts

Tiny readability tweak:

-    assert retriever.record_selector.transform_before_filtering == True
+    assert retriever.record_selector.transform_before_filtering is True
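
Beyond readability, the identity check is also stricter. A small sketch (the helper name is hypothetical):

```python
def is_strictly_true(value: object) -> bool:
    """Return True only for the bool singleton True."""
    return value is True


# 1 compares equal to True, so an equality-based assert would accept it.
equal_to_true = 1 == True  # noqa: E712
strictly_true = is_strictly_true(1)
```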

2109-2115: Asserting message repository via private field

The assertion against cursor._message_repository is valuable, but again touches a private. Would adding a simple public accessor (e.g., cursor.message_repository) make these tests more resilient, or alternatively asserting behavior (parent state suppression) in a focused unit test, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

2011-2015: Clarify error message for unsupported per-partition + IncrementingCountCursor

The message reads a bit confusing. Shall we make it explicit, wdyt?

-                raise ValueError(
-                    "PerPartition does not support per partition states because switching to global state is time based"
-                )
+                raise ValueError(
+                    "IncrementingCountCursor does not support per-partition routing. Use a global state (no partition router) or switch to DatetimeBasedCursor."
+                )

2119-2123: Prefer None over empty string for cursor_field

DefaultStream.cursor_field is Optional[str]; passing "" is ambiguous. Shall we pass None when unavailable, wdyt?

-            cursor_field=concurrent_cursor.cursor_field.cursor_field_key
-            if hasattr(concurrent_cursor, "cursor_field")
-            else "",  # FIXME we should have the cursor field has part of the interface of cursor,
+            cursor_field=(
+                getattr(getattr(concurrent_cursor, "cursor_field", None), "cursor_field_key", None)
+            ),
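
A minimal sketch of how the nested `getattr` behaves, using `SimpleNamespace` objects as hypothetical stand-ins for cursors with and without a `cursor_field` attribute:

```python
from types import SimpleNamespace
from typing import Any, Optional


def cursor_field_key_or_none(cursor: Any) -> Optional[str]:
    """Return cursor.cursor_field.cursor_field_key, or None if any link is missing."""
    return getattr(getattr(cursor, "cursor_field", None), "cursor_field_key", None)


with_field = SimpleNamespace(cursor_field=SimpleNamespace(cursor_field_key="updated_at"))
without_field = SimpleNamespace()

key_present = cursor_field_key_or_none(with_field)
key_missing = cursor_field_key_or_none(without_field)
```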

3867-3869: Tiny typo: extra spaces in error message

Shall we remove the extra spaces, wdyt?

-                raise ValueError("`condition` is only supported for     `ConfigComponentsResolver`")
+                raise ValueError("`condition` is only supported for `ConfigComponentsResolver`")

3879-3884: Reuse the retriever’s stream_slicer to avoid divergence

Since retriever was built from the same model, can we pass retriever.stream_slicer here to guarantee consistency, wdyt?

-        return HttpComponentsResolver(
-            retriever=retriever,
-            stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
+        return HttpComponentsResolver(
+            retriever=retriever,
+            stream_slicer=retriever.stream_slicer,
             config=config,
             components_mapping=components_mapping,
             parameters=model.parameters or {},
         )
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between c004637 and 25ca5b8.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (40 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (15 hunks)
  • unit_tests/sources/declarative/parsers/testing_components.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🧬 Code graph analysis (3)
unit_tests/sources/declarative/parsers/testing_components.py (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • SimpleRetriever (2806-2860)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • SimpleRetriever (53-602)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)
airbyte_cdk/sources/message/repository.py (1)
  • StateFilteringMessageRepository (98-115)
airbyte_cdk/utils/datetime_helpers.py (1)
  • ab_datetime_parse (361-442)
airbyte_cdk/sources/streams/concurrent/default_stream.py (3)
  • DefaultStream (17-123)
  • cursor (92-93)
  • cursor_field (52-53)
unit_tests/connector_builder/test_connector_builder_handler.py (2)
  • get_retriever (443-448)
  • cursor_field (831-832)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
  • ConcurrentPerPartitionCursor (53-593)
  • create (45-50)
  • cursor_field (141-142)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
  • ConcurrentCursor (135-518)
  • cursor_field (191-192)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py (1)
  • PerPartitionRequestOptionsProvider (8-95)
airbyte_cdk/sources/declarative/requesters/requester.py (2)
  • Requester (29-156)
  • get_url (38-47)
airbyte_cdk/sources/message/repository.py (2)
  • StateFilteringMessageRepository (98-115)
  • LogAppenderMessageRepositoryDecorator (118-157)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (2)
  • SinglePartitionRouter (13-57)
  • get_stream_state (53-57)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (4)
  • create (55-63)
  • stream_name (116-117)
  • StreamSlicerPartitionGenerator (123-146)
  • get_json_schema (28-32)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • cursor_field (191-192)
  • FinalStateCursor (90-132)
  • Cursor (51-87)
airbyte_cdk/sources/streams/concurrent/default_stream.py (6)
  • cursor_field (52-53)
  • name (44-45)
  • DefaultStream (17-123)
  • get_json_schema (55-56)
  • cursor (92-93)
  • namespace (48-49)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • primary_key (290-292)
  • primary_key (295-297)
  • name (106-114)
  • name (117-119)
airbyte_cdk/sources/connector_state_manager.py (2)
  • get_stream_state (53-67)
  • ConnectorStateManager (32-161)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

[error] Ruff format would reformat 3 files. Run 'ruff format .' to fix code style issues.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] Ruff format would reformat 3 files. Run 'ruff format .' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-intercom
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (9)
unit_tests/sources/declarative/parsers/testing_components.py (1)

16-16: Import addition looks good

Bringing in SimpleRetriever is necessary for the new test subclass; no concerns.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)

166-166: StateFilteringMessageRepository import is appropriate

Used below to assert parent streams use the filtering wrapper; nice coverage.


202-203: Pre-parse config datetimes once

Good move factoring CONFIG_START_TIME/CONFIG_END_TIME to reduce duplication and drift across assertions.


721-721: Pass stream_name and assert DefaultStream parents

Providing stream_name here is key for proper naming propagation; asserting DefaultStream on parents aligns with the migration. Looks solid.

Also applies to: 727-728


1071-1074: Async retriever wiring assertions LGTM

Confirms DefaultStream and AsyncRetriever are produced as expected.


2525-2526: ConcurrentPerPartitionCursor expectations updated correctly

The new expected cursor/router types reflect the concurrent per-partition design; assertions look correct and scoped.

Also applies to: 2608-2612


3925-3926: GroupingPartitionRouter: pass stream_name and assert DefaultStream parent

Good consistency with SubstreamPartitionRouter tests and the broader DefaultStream migration.

Also applies to: 3936-3936, 3985-3985, 4037-4037

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1-4: Fix Ruff formatting failures

CI says “Ruff format would reformat 3 files.” Can you run ruff format . (and ruff check .) locally to unblock, wdyt?


1993-2000: LGTM: start/end fields correctly wired to DatetimeBasedRequestOptionsProvider

Thanks for fixing the partition_field_start/partition_field_end wiring; this unblocks correct per-slice request options. Looks good to me.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1298-1301: Don't mutate component_definition in-place; parse a copy instead

Mutating the caller-provided mapping can leak side-effects (e.g., when passing model.dict). Shall we parse a shallow copy and inject $parameters there, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

1609-1612: Same here: avoid in-place mutation of component_definition

Can we mirror the local copy approach here too, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

3762-3797: Parent-state fallback: guard and value selection are off

The guard should be “is None”, and the value should prefer child_state[cursor_field] over the first arbitrary dict value. Propose tightening this, wdyt?

-            if not parent_state:
+            if not parent_state:
                 # there are two migration cases: state value from child stream or from global state
                 parent_state = ConcurrentPerPartitionCursor.get_global_state(
                     child_state, parent_stream_name
                 )
 
-                if not parent_state and not isinstance(parent_state, dict):
-                    cursor_values = child_state.values()
-                    if cursor_values:
-                        incremental_sync_model: Union[
-                            DatetimeBasedCursorModel,
-                            IncrementingCountCursorModel,
-                            CustomIncrementalSyncModel,
-                        ] = (
-                            model.stream.incremental_sync  # type: ignore  # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
-                            if isinstance(model.stream, DeclarativeStreamModel)
-                            else self._get_state_delegating_stream_model(
-                                has_parent_state, model.stream
-                            ).incremental_sync
-                        )
-                        cursor_field = InterpolatedString.create(
-                            incremental_sync_model.cursor_field,
-                            parameters=incremental_sync_model.parameters or {},
-                        ).eval(config)
-                        parent_state = AirbyteStateMessage(
-                            type=AirbyteStateType.STREAM,
-                            stream=AirbyteStreamState(
-                                stream_descriptor=StreamDescriptor(
-                                    name=parent_stream_name, namespace=None
-                                ),
-                                stream_state=AirbyteStateBlob(
-                                    {cursor_field: list(cursor_values)[0]}
-                                ),
-                            ),
-                        )
+                if parent_state is None:
+                    # Fallback: derive minimal parent state from child's state blob
+                    selected_stream_model = (
+                        model.stream
+                        if isinstance(model.stream, DeclarativeStreamModel)
+                        else self._get_state_delegating_stream_model(has_parent_state, model.stream)
+                    )
+                    incr = getattr(selected_stream_model, "incremental_sync", None)
+                    cursor_field_str = (
+                        InterpolatedString.create(
+                            incr.cursor_field, parameters=getattr(incr, "parameters", {}) or {}
+                        ).eval(config)
+                        if incr and getattr(incr, "cursor_field", None)
+                        else None
+                    )
+                    value = child_state.get(cursor_field_str) if cursor_field_str else None
+                    if value is None:
+                        vals = list(child_state.values())
+                        value = vals[0] if vals else None
+                    if value is not None:
+                        parent_state = AirbyteStateMessage(
+                            type=AirbyteStateType.STREAM,
+                            stream=AirbyteStreamState(
+                                stream_descriptor=StreamDescriptor(
+                                    name=parent_stream_name, namespace=None
+                                ),
+                                stream_state=AirbyteStateBlob(
+                                    {cursor_field_str or "cursor": value}
+                                ),
+                            ),
+                        )
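To make the proposed value selection concrete, here is a minimal standalone sketch. `select_fallback_value` and the sample state are hypothetical illustrations, not CDK code, and it assumes the child's legacy state is a flat mapping:

```python
from typing import Any, Mapping, Optional


def select_fallback_value(child_state: Mapping[str, Any], cursor_field: Optional[str]) -> Any:
    """Hypothetical helper: prefer the child's value for cursor_field, else the first value."""
    value = child_state.get(cursor_field) if cursor_field else None
    if value is None:
        # Last resort only: fall back to the first value in the mapping, if any.
        value = next(iter(child_state.values()), None)
    return value


# Assumed shape of a legacy child state blob (illustrative only).
legacy_state = {"id": 42, "updated_at": "2024-01-01T00:00:00Z"}

preferred = select_fallback_value(legacy_state, "updated_at")  # cursor field is present
last_resort = select_fallback_value(legacy_state, None)        # no cursor field known
empty = select_fallback_value({}, "updated_at")                # nothing to derive from
```

With the old "first value" behaviour, the `id` entry would have been picked even though `updated_at` is available, which is the case the suggestion is meant to avoid.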
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

2053-2057: Fix type: stream_slicer can be a PartitionRouter or a ConcurrentCursor

The annotation ConcurrentStreamSlicer is too narrow and may trip mypy. Shall we drop the explicit annotation, wdyt?

-        stream_slicer: ConcurrentStreamSlicer = (
+        stream_slicer = (
             partition_router
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )

2012-2015: Clarify error message

Nit: The message is confusing as written. Shall we reword to explicitly call out IncrementingCountCursor, wdyt?

-                raise ValueError(
-                    "PerPartition does not support per partition states because switching to global state is time based"
-                )
+                raise ValueError(
+                    "IncrementingCountCursor does not support per-partition state; it requires a single global state."
+                )

3868-3868: Whitespace nit in error string

There are extra spaces before ConfigComponentsResolver. Trim for readability, wdyt?

-                raise ValueError("`condition` is only supported for     `ConfigComponentsResolver`")
+                raise ValueError("`condition` is only supported for `ConfigComponentsResolver`")

1748-1758: Run ruff format to satisfy linters

Ruff flagged formatting on these ranges. Can we run ruff format to keep CI green, wdyt?

Also applies to: 1764-1778, 3713-3719

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 25ca5b8 and e5ecf41.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (40 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (11)
airbyte_cdk/models/airbyte_protocol.py (3)
  • AirbyteStateBlob (15-50)
  • AirbyteStateMessage (67-75)
  • AirbyteStreamState (55-57)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py (1)
  • PerPartitionRequestOptionsProvider (8-95)
airbyte_cdk/sources/declarative/requesters/requester.py (4)
  • HttpMethod (18-26)
  • Requester (29-156)
  • get_url (38-47)
  • get_url_base (50-59)
airbyte_cdk/sources/message/repository.py (2)
  • StateFilteringMessageRepository (98-115)
  • LogAppenderMessageRepositoryDecorator (118-157)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (2)
  • SinglePartitionRouter (13-57)
  • get_stream_state (53-57)
airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (3)
  • InterpolatedString (13-79)
  • create (63-79)
  • eval (35-55)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)
  • create (45-50)
  • cursor_field (141-142)
  • ConcurrentPerPartitionCursor (53-593)
  • get_parent_state (554-577)
  • get_global_state (580-593)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (5)
  • create (55-63)
  • stream_name (116-117)
  • StreamSlicerPartitionGenerator (123-146)
  • DeclarativePartitionFactory (35-63)
  • get_json_schema (28-32)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • cursor_field (191-192)
  • CursorField (40-48)
  • FinalStateCursor (90-132)
  • Cursor (51-87)
airbyte_cdk/sources/streams/concurrent/default_stream.py (6)
  • cursor_field (52-53)
  • name (44-45)
  • DefaultStream (17-123)
  • get_json_schema (55-56)
  • cursor (92-93)
  • namespace (48-49)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1748-1758: Ruff format would reformat code (lines 1748-1758). Run 'ruff format' to apply formatting.


[error] 1764-1778: Ruff format would reformat code (lines 1764-1778). Run 'ruff format' to apply formatting.


[error] 3713-3719: Ruff format would reformat code (lines 3713-3719). Run 'ruff format' to apply formatting.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1993-2007: Partition field wiring fix LGTM

Using partition_field_start=cursor_model.partition_field_start and partition_field_end=cursor_model.partition_field_end is correct. Thanks for fixing the swap.


3145-3159: Correct base-URL fallback LGTM

Switching the fallback to requester.get_url_base(...) is the right call.


2877-2911: Verify stream_name propagation in substream wrapper
We’ve confirmed that all direct create_parent_stream_config(…, stream_name=…) calls are updated; please also ensure that _create_message_repository_substream_wrapper (invoked around line 3687) receives and forwards the new stream_name via its **kwargs, since it isn’t passed explicitly. wdyt?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

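2151-2156: isinstance(..., A | B) only works on Python 3.10+

Passing a union expression to isinstance is supported from Python 3.10 onward; on older interpreters, evaluating A | B on two classes raises TypeError. The Pytest matrix here covers 3.10 to 3.13, so this should not fail in CI, but a tuple behaves the same on every version. Can we switch to a tuple, wdyt?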

-        if (
-            hasattr(model, "partition_router")
-            and isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel)
-            and model.partition_router
-        ):
+        if (
+            hasattr(model, "partition_router")
+            and isinstance(model, (SimpleRetrieverModel, AsyncRetrieverModel))
+            and model.partition_router
+        ):
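For reference, a small sketch of how the two spellings behave. The two classes are empty stand-ins for the real models, and the version guard is there only so the snippet also runs on interpreters older than 3.10:

```python
import sys


class SimpleRetrieverModel:  # stand-in for the real model class
    pass


class AsyncRetrieverModel:  # stand-in for the real model class
    pass


model = AsyncRetrieverModel()

# The tuple form is accepted by isinstance on every Python 3 version.
tuple_check = isinstance(model, (SimpleRetrieverModel, AsyncRetrieverModel))

# The union form relies on PEP 604 and is only evaluated on 3.10+.
if sys.version_info >= (3, 10):
    union_check = isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel)
else:
    union_check = None  # `A | B` on two classes would raise TypeError here
```

On 3.10 and later both checks give the same answer, so the tuple is a portability choice rather than a behaviour change.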
♻️ Duplicate comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1298-1301: Don't mutate component_definition in-place; parse a local copy instead

Mutating caller-owned dicts can leak side-effects. Can we parse a shallow copy and set "$parameters" there, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)
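
A minimal sketch of the side effect in question, using plain dicts. The two helper functions are hypothetical and only mirror the before/after shape of the diff above:

```python
from typing import Any, Dict


def normalize_in_place(component_definition: Dict[str, Any]) -> Dict[str, Any]:
    # Mirrors the flagged pattern: the caller's dict gains a "$parameters" key.
    if "$parameters" not in component_definition and "parameters" in component_definition:
        component_definition["$parameters"] = component_definition.get("parameters")
    return component_definition


def normalize_copy(component_definition: Dict[str, Any]) -> Dict[str, Any]:
    # Mirrors the suggestion: only the shallow copy is modified.
    local_def = dict(component_definition)
    if "$parameters" not in local_def and "parameters" in local_def:
        local_def["$parameters"] = local_def.get("parameters")
    return local_def


caller_owned = {"type": "DatetimeBasedCursor", "parameters": {"a": 1}}

copied = normalize_copy(caller_owned)
keys_after_copy = sorted(caller_owned)      # caller's dict is unchanged

normalize_in_place(caller_owned)
keys_after_mutation = sorted(caller_owned)  # caller's dict now has "$parameters"
```

Note that `dict(...)` is a shallow copy: the nested `parameters` mapping is still shared with the caller, which is fine here because it is only read, never modified.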

2053-2057: Type annotation mismatch: stream_slicer can be PartitionRouter or ConcurrentCursor

This variable holds either a PartitionRouter (final-state) or a ConcurrentCursor; annotating it as ConcurrentStreamSlicer is misleading and risks mypy issues. Shall we drop the explicit annotation (or use a Union), wdyt?

-        stream_slicer: ConcurrentStreamSlicer = (
+        # Duck-typed: may be a ConcurrentCursor or a PartitionRouter
+        stream_slicer = (
             partition_router
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )
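
If we prefer to keep an annotation, a Union is one option. The sketch below uses empty stand-in classes; the real class hierarchy in the CDK may differ, so treat the relationships between them as assumptions:

```python
from typing import Union


class PartitionRouter:  # stand-in
    pass


class Cursor:  # stand-in base class
    pass


class ConcurrentCursor(Cursor):  # stand-in
    pass


class FinalStateCursor(Cursor):  # stand-in
    pass


def pick_stream_slicer(
    partition_router: PartitionRouter, concurrent_cursor: Cursor
) -> Union[PartitionRouter, Cursor]:
    # Same selection as in the diff: the router drives slicing when the cursor
    # is a FinalStateCursor, otherwise the cursor does.
    if isinstance(concurrent_cursor, FinalStateCursor):
        return partition_router
    return concurrent_cursor


router = PartitionRouter()
from_final_state = pick_stream_slicer(router, FinalStateCursor())
from_concurrent = pick_stream_slicer(router, ConcurrentCursor())
```

The Union return type documents both possible values without claiming either one satisfies a narrower interface.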

3704-3720: has_parent_state checks the wrong stream

This reads the child’s state (kwargs["stream_name"]) instead of the parent (model.stream.name). Can we correct it, wdyt?

-        has_parent_state = bool(
-            self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
-            if model.incremental_dependency
-            else False
-        )
+        parent_stream_name = model.stream.name or ""
+        has_parent_state = (
+            bool(self._connector_state_manager.get_stream_state(parent_stream_name, None))
+            if model.incremental_dependency
+            else False
+        )
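
To illustrate why the stream name matters, here is a toy sketch. The `saved_state` dict is a hypothetical stand-in for the state manager, and the stream names are made up:

```python
from typing import Any, Dict, Mapping

# Hypothetical stand-in for a state manager: stream name -> state blob.
saved_state: Dict[str, Mapping[str, Any]] = {
    "child_stream": {"updated_at": "2024-01-01"},
    # No entry for "parent_stream": the parent has never emitted state.
}


def has_state(stream_name: str) -> bool:
    # Empty or missing state both count as "no state".
    return bool(saved_state.get(stream_name))


checked_child = has_state("child_stream")    # what the current code effectively asks
checked_parent = has_state("parent_stream")  # what has_parent_state is meant to answer
```

In this scenario the child lookup reports state while the parent has none, so a decision keyed on the child's name would pick the wrong branch.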

3745-3801: Parent-state fallback guard and value selection need tightening

The guard should be “is None”, and the fallback should prefer the child’s value for the computed cursor_field (not an arbitrary first dict value). Can we apply the safer logic below, wdyt?

         if model.incremental_dependency and child_state:
             parent_stream_name = model.stream.name or ""
             parent_state = ConcurrentPerPartitionCursor.get_parent_state(
                 child_state, parent_stream_name
             )
 
             if not parent_state:
                 # there are two migration cases: state value from child stream or from global state
                 parent_state = ConcurrentPerPartitionCursor.get_global_state(
                     child_state, parent_stream_name
                 )
 
-                if not parent_state and not isinstance(parent_state, dict):
-                    cursor_values = child_state.values()
-                    if cursor_values:
-                        incremental_sync_model: Union[
-                            DatetimeBasedCursorModel,
-                            IncrementingCountCursorModel,
-                            CustomIncrementalSyncModel,
-                        ] = (
-                            model.stream.incremental_sync  # type: ignore  # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
-                            if isinstance(model.stream, DeclarativeStreamModel)
-                            else self._get_state_delegating_stream_model(
-                                has_parent_state, model.stream
-                            ).incremental_sync
-                        )
-                        cursor_field = InterpolatedString.create(
-                            incremental_sync_model.cursor_field,
-                            parameters=incremental_sync_model.parameters or {},
-                        ).eval(config)
-                        parent_state = AirbyteStateMessage(
-                            type=AirbyteStateType.STREAM,
-                            stream=AirbyteStreamState(
-                                stream_descriptor=StreamDescriptor(
-                                    name=parent_stream_name, namespace=None
-                                ),
-                                stream_state=AirbyteStateBlob(
-                                    {cursor_field: list(cursor_values)[0]}
-                                ),
-                            ),
-                        )
+                if parent_state is None:
+                    # Fallback: derive minimal parent state from the child's legacy state
+                    selected_stream_model = (
+                        model.stream
+                        if isinstance(model.stream, DeclarativeStreamModel)
+                        else self._get_state_delegating_stream_model(has_parent_state, model.stream)
+                    )
+                    incr = getattr(selected_stream_model, "incremental_sync", None)
+                    cursor_field = (
+                        InterpolatedString.create(
+                            incr.cursor_field, parameters=getattr(incr, "parameters", {}) or {}
+                        ).eval(config)
+                        if incr and getattr(incr, "cursor_field", None)
+                        else None
+                    )
+                    value = child_state.get(cursor_field) if cursor_field else None
+                    if value is None and child_state:
+                        # last resort: pick any single value
+                        value = next(iter(child_state.values()), None)
+                    if value is not None:
+                        parent_state = AirbyteStateMessage(
+                            type=AirbyteStateType.STREAM,
+                            stream=AirbyteStreamState(
+                                stream_descriptor=StreamDescriptor(
+                                    name=parent_stream_name, namespace=None
+                                ),
+                                stream_state=AirbyteStateBlob({(cursor_field or "cursor"): value}),
+                            ),
+                        )
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

3238-3242: Request options provider override: confirm intent

When a caller passes DefaultRequestOptionsProvider and a partition_router is present, we swap the provider to the router. Is this meant to always win over a user-supplied default provider, including cases where the caller expected query/body/header options from DefaultRequestOptionsProvider, wdyt?


645-645: Ruff format failing in CI

Ruff reports 3 files would be reformatted. Shall we run poetry run ruff format (or enable pre-commit) to unblock CI, wdyt?

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between e5ecf41 and 9e6368f.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (40 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (11)
airbyte_cdk/models/airbyte_protocol.py (3)
  • AirbyteStateBlob (15-50)
  • AirbyteStateMessage (67-75)
  • AirbyteStreamState (55-57)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py (1)
  • PerPartitionRequestOptionsProvider (8-95)
airbyte_cdk/sources/declarative/requesters/requester.py (3)
  • Requester (29-156)
  • get_url (38-47)
  • get_url_base (50-59)
airbyte_cdk/sources/message/repository.py (1)
  • StateFilteringMessageRepository (98-115)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (2)
  • SinglePartitionRouter (13-57)
  • get_stream_state (53-57)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
  • create (45-50)
  • cursor_field (141-142)
  • get_parent_state (554-577)
  • get_global_state (580-593)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • cursor_field (191-192)
  • FinalStateCursor (90-132)
  • Cursor (51-87)
airbyte_cdk/sources/streams/concurrent/default_stream.py (5)
  • cursor_field (52-53)
  • name (44-45)
  • DefaultStream (17-123)
  • cursor (92-93)
  • namespace (48-49)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)
  • primary_key (290-292)
  • primary_key (295-297)
  • name (106-114)
  • name (117-119)
  • SimpleRetriever (53-602)
airbyte_cdk/sources/connector_state_manager.py (2)
  • get_stream_state (53-67)
  • ConnectorStateManager (32-161)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] Ruff format check failed. Command 'poetry run ruff format --diff .' exited with code 1. 3 files would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)

1993-2007: Nice: per-partition request options provider wiring

Switching to PerPartitionRequestOptionsProvider for ConcurrentPerPartitionCursor looks correct and keeps params merged across partition and cursor; LGTM.


3145-3160: Good fix: fall back to get_url_base() when url_base is unset

The corrected fallback avoids building relative paths from a full URL; looks good.


3378-3383: Nice: bool coercion for has_parent_state removes mypy noise

Casting Optional[bool] to bool here is clean; LGTM.


3437-3440: Async download retriever builder looks sound

Passing Requester/RecordExtractor explicitly and reusing paginator wiring is clear; LGTM.

Also applies to: 3559-3560


3884-3885: Good: thread stream_slicer into HttpComponentsResolver

This keeps resolver behavior aligned with retriever slicing; LGTM.


4110-4113: Passing through kwargs to underlying partition router

Forwarding kwargs improves composability; LGTM.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1298-1301: Avoid mutating component_definition; parse a copy instead

Mutating caller-provided mappings can leak side effects (e.g., when passing model.dict). Shall we parse a shallow copy and set $parameters there, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

1609-1612: Same: don’t mutate component_definition in the per-partition path

Can we apply the same non-mutating copy approach here too, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = dict(component_definition)
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

3712-3717: Bug: has_parent_state checks the child’s state, not the parent’s

This should key off the parent stream (model.stream.name), otherwise StateDelegatingStream decisions are wrong. Can we switch to the parent name, wdyt?

-        has_parent_state = bool(
-            self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
-            if model.incremental_dependency
-            else False
-        )
+        parent_stream_name = model.stream.name or ""
+        has_parent_state = (
+            bool(self._connector_state_manager.get_stream_state(parent_stream_name, None))
+            if model.incremental_dependency
+            else False
+        )

3766-3801: Parent-state fallback guard and value selection are incorrect

  • The guard should be explicit (parent_state is None).
  • Prefer the child’s cursor_field value over the first arbitrary dict value.

Can we apply the safer fallback below, wdyt?

-                if not parent_state and not isinstance(parent_state, dict):
-                    cursor_values = child_state.values()
-                    if cursor_values:
-                        incremental_sync_model: Union[
-                            DatetimeBasedCursorModel,
-                            IncrementingCountCursorModel,
-                            CustomIncrementalSyncModel,
-                        ] = (
-                            model.stream.incremental_sync  # type: ignore  # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
-                            if isinstance(model.stream, DeclarativeStreamModel)
-                            else self._get_state_delegating_stream_model(
-                                has_parent_state, model.stream
-                            ).incremental_sync
-                        )
-                        cursor_field = InterpolatedString.create(
-                            incremental_sync_model.cursor_field,
-                            parameters=incremental_sync_model.parameters or {},
-                        ).eval(config)
-                        parent_state = AirbyteStateMessage(
-                            type=AirbyteStateType.STREAM,
-                            stream=AirbyteStreamState(
-                                stream_descriptor=StreamDescriptor(
-                                    name=parent_stream_name, namespace=None
-                                ),
-                                stream_state=AirbyteStateBlob(
-                                    {cursor_field: list(cursor_values)[0]}
-                                ),
-                            ),
-                        )
+                if parent_state is None:
+                    incr_model: Union[
+                        DatetimeBasedCursorModel, IncrementingCountCursorModel, CustomIncrementalSyncModel
+                    ] = (
+                        model.stream.incremental_sync  # type: ignore
+                        if isinstance(model.stream, DeclarativeStreamModel)
+                        else self._get_state_delegating_stream_model(has_parent_state, model.stream).incremental_sync
+                    )
+                    cursor_field = InterpolatedString.create(
+                        incr_model.cursor_field, parameters=incr_model.parameters or {}
+                    ).eval(config)
+                    value = child_state.get(cursor_field)
+                    if value is None:
+                        vals = list(child_state.values())
+                        value = vals[0] if vals else None
+                    if value is not None:
+                        parent_state = AirbyteStateMessage(
+                            type=AirbyteStateType.STREAM,
+                            stream=AirbyteStreamState(
+                                stream_descriptor=StreamDescriptor(name=parent_stream_name, namespace=None),
+                                stream_state=AirbyteStateBlob({cursor_field: value}),
+                            ),
+                        )
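For context on the guard itself, a quick comparison of the predicates on a few sample values. This assumes the lookup returns either None or a mapping, which is an assumption about get_global_state rather than something shown in this diff:

```python
from typing import Any, Optional


def old_guard(parent_state: Optional[Any]) -> bool:
    # The guard being replaced.
    return not parent_state and not isinstance(parent_state, dict)


def new_guard(parent_state: Optional[Any]) -> bool:
    # The explicit form proposed above.
    return parent_state is None


samples = [None, {}, {"updated_at": "2024-01-01"}]

old_results = [old_guard(s) for s in samples]
new_results = [new_guard(s) for s in samples]
truthiness_results = [not s for s in samples]  # what a bare `not parent_state` would do
```

Under that assumption the old and new guards agree on every sample, so the change is mostly about stating intent; a bare truthiness check is the one that would also fire on an empty dict.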
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

2011-2015: Clarify error message

The message reads “PerPartition does not support per partition states…”. Should this say “IncrementingCountCursor does not support per-partition state; switching to global state is time-based”, wdyt?

-                raise ValueError(
-                    "PerPartition does not support per partition states because switching to global state is time based"
-                )
+                raise ValueError(
+                    "IncrementingCountCursor does not support per-partition state; switching to global state is time-based."
+                )

2053-2057: Type mismatch: stream_slicer variable can be a PartitionRouter or ConcurrentCursor

The annotation forces ConcurrentStreamSlicer but the value may be a PartitionRouter. Shall we drop the annotation (or use a Union) to keep mypy happy, wdyt?

-        stream_slicer: ConcurrentStreamSlicer = (
+        stream_slicer = (
             partition_router
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )

2064-2066: Stop passing unused stream_slicer kwarg to SimpleRetriever

create_simple_retriever doesn’t use stream_slicer anymore (and it’s swallowed by **kwargs). Removing it reduces confusion without behavior change. Shall we drop it, wdyt?

-            request_options_provider=request_options_provider,
-            stream_slicer=stream_slicer,
-            partition_router=partition_router,
+            request_options_provider=request_options_provider,
+            partition_router=partition_router,
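
A tiny sketch of why an unused keyword goes unnoticed. `create_retriever` is a hypothetical function with the same `**kwargs` shape, not the real factory method:

```python
from typing import Any, Dict


def create_retriever(
    *, request_options_provider: str, partition_router: str, **kwargs: Any
) -> Dict[str, str]:
    # Unknown keyword arguments land in kwargs and are never read,
    # so passing them neither fails nor changes the result.
    return {
        "request_options_provider": request_options_provider,
        "partition_router": partition_router,
    }


with_extra = create_retriever(
    request_options_provider="provider", partition_router="router", stream_slicer="unused"
)
without_extra = create_retriever(request_options_provider="provider", partition_router="router")
```

Both calls produce the same result, which is why dropping the argument should be safe if the callee really ignores it.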

2120-2123: Prefer None over empty string for missing cursor_field

DefaultStream checks cursor_field for truthiness, so both values behave the same, but None conveys the intent better than "". Shall we switch, wdyt?

-            cursor_field=concurrent_cursor.cursor_field.cursor_field_key
-            if hasattr(concurrent_cursor, "cursor_field")
-            else "",  # FIXME we should have the cursor field has part of the interface of cursor,
+            cursor_field=(
+                concurrent_cursor.cursor_field.cursor_field_key
+                if hasattr(concurrent_cursor, "cursor_field")
+                else None
+            ),  # FIXME we should have the cursor field in Cursor's interface
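As a sketch of the same idea with getattr instead of hasattr. The classes below are simplified stand-ins, and whether DefaultStream's signature accepts None is not shown in this diff, so that part is an assumption:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CursorField:  # simplified stand-in
    cursor_field_key: str


@dataclass
class CursorWithField:  # stand-in for a cursor exposing cursor_field
    cursor_field: CursorField


class CursorWithoutField:  # stand-in for a cursor with no cursor_field attribute
    pass


def resolve_cursor_field(cursor: object) -> Optional[str]:
    # None signals "no cursor field" explicitly instead of an empty string.
    field = getattr(cursor, "cursor_field", None)
    return field.cursor_field_key if field is not None else None


with_field = resolve_cursor_field(CursorWithField(CursorField("updated_at")))
without_field = resolve_cursor_field(CursorWithoutField())
```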
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 9e6368f and f2f1363.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (41 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (11)
airbyte_cdk/models/airbyte_protocol.py (3)
  • AirbyteStateBlob (15-50)
  • AirbyteStateMessage (67-75)
  • AirbyteStreamState (55-57)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py (1)
  • PerPartitionRequestOptionsProvider (8-95)
airbyte_cdk/sources/declarative/requesters/requester.py (4)
  • HttpMethod (18-26)
  • Requester (29-156)
  • get_url (38-47)
  • get_url_base (50-59)
airbyte_cdk/sources/message/repository.py (2)
  • StateFilteringMessageRepository (98-115)
  • LogAppenderMessageRepositoryDecorator (118-157)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (2)
  • SinglePartitionRouter (13-57)
  • get_stream_state (53-57)
airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (3)
  • InterpolatedString (13-79)
  • create (63-79)
  • eval (35-55)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)
  • create (45-50)
  • cursor_field (141-142)
  • ConcurrentPerPartitionCursor (53-593)
  • get_parent_state (554-577)
  • get_global_state (580-593)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • cursor_field (191-192)
  • FinalStateCursor (90-132)
  • Cursor (51-87)
airbyte_cdk/sources/streams/concurrent/default_stream.py (6)
  • cursor_field (52-53)
  • name (44-45)
  • DefaultStream (17-123)
  • get_json_schema (55-56)
  • cursor (92-93)
  • namespace (48-49)
airbyte_cdk/sources/declarative/extractors/record_selector.py (3)
  • name (57-65)
  • name (68-70)
  • RecordSelector (25-170)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1748-3705: Ruff format would reformat this file. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1307-1311: Nice: consistent parameter propagation to all interpolations

Threading model.parameters into the cursor_field, partition field, lookback, step, and clamping interpolations fixes subtle Jinja issues. LGTM!

Also applies to: 1314-1321, 1338-1344, 1421-1445


1993-2007: Good fix: correct start/end wiring + per-partition request options provider

partition_field_start/end now use the right model fields, and PerPartitionRequestOptionsProvider is applied only for ConcurrentPerPartitionCursor. Looks solid.


3152-3161: Correct fallback: use get_url_base() for _url_base

Thanks for fixing the fallback to req.get_url_base(). That avoids constructing incorrect relative pagination paths. LGTM.


1-1: Commit Ruff formatting changes

CI reports that ruff format would reformat airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 file). Could you run ruff format and commit the resulting changes so the lint check passes? wdyt?

@brianjlai brianjlai left a comment

re-reviewing, makes sense and good catch!

coderabbitai[bot]
coderabbitai bot previously requested changes Sep 3, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
cdk-migrations.md (3)

21-24: Disambiguate AbstractStream vs DefaultStream to reduce confusion.

Since PR #707 instantiates incremental streams as DefaultStream but the migration step mentions AbstractStream, could we add one clarifying sentence, wdyt?

-Migration steps: If your custom component relies on SubstreamPartitionRouter.parent_stream_configs[x].stream, make sure you migrate from the `DeclarativeStream` interface to the `AbstractStream` one.
+Migration steps: If your custom component relies on `SubstreamPartitionRouter.parent_stream_configs[x].stream`, migrate from the `DeclarativeStream` interface to `AbstractStream`. Note: instantiation commonly uses `DefaultStream`, which implements the `AbstractStream` interface.

Also, can you verify that SubstreamPartitionRouter now expects an AbstractStream-compatible object?


3-12: Add a short “Breaking changes” callout with a migration checklist.

Would a quick checklist help readers scan and action the 7.0.0 items, wdyt?

 ## Upgrading to 7.0.0
+
+> [!IMPORTANT]
+> Breaking changes summary:
+> - Remove `CustomIncrementalSync`
+> - Do not implement `Retriever.state` or `Retriever.stream_slices`
+> - Migrate `SubstreamPartitionRouter` usages to `AbstractStream`/`DefaultStream`
+> 
+> Quick checklist:
+> - [ ] Replace `DeclarativeStream` with `DefaultStream` (implements `AbstractStream`) where applicable
+> - [ ] Remove retriever-owned state and slice generation
+> - [ ] Migrate any `RequestOptionsProvider`-only custom components to string interpolation if possible

27-30: Tighten deprecation phrasing and grammar

In cdk-migrations.md (lines 27–30), replace the existing text with:

Migration steps: Ensure that you do not implement `Retriever.stream_slices` or rely on `SimpleRetriever.stream_slicer`. Implement your own `PartitionRouter` to control slice generation.

Rationale: Unit-of-work generation is handled by the Concurrent CDK for parallelization. Going through the retriever for slice generation is no longer necessary. This method is deprecated and will be removed in a future major version.

Shall we specify the target removal version (e.g., 8.0.0) once decided? wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2f1363 and 38cd657.

📒 Files selected for processing (1)
  • cdk-migrations.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
cdk-migrations.md

[style] ~11-~11: Consider using “incompatible” to avoid wordiness.
Context: ...lSync do not account for that hence are not compatible with the new version of the CDK. Also, ...

(NOT_ABLE_PREMIUM)


[style] ~11-~11: ‘On top of that’ might be wordy. Consider a shorter alternative.
Context: ...rementalSync that were actually needed. On top of that, state management is much more complex ...

(EN_WORDINESS_PREMIUM_ON_TOP_OF_THAT)


[style] ~23-~23: Consider using “incompatible” to avoid wordiness.
Context: ...onale: DeclarativeStream interface is not compatible with the AbstractStream from the Conc...

(NOT_ABLE_PREMIUM)


[style] ~23-~23: Consider a more concise word here.
Context: ...bstractStream` from the Concurrent CDK. In order to avoid maintaining two different instant...

(IN_ORDER_TO_PREMIUM)


[style] ~29-~29: Consider a more concise word here.
Context: ...ent reasons to go through the retriever in order to get the stream slices. Hence, we are de...

(IN_ORDER_TO_PREMIUM)


[style] ~33-~33: To elevate your writing, try using a synonym here.
Context: ... ### Possible Missing Features We have seen that some custom components were create...

(SEEN_THAT)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (1)
cdk-migrations.md (1)

33-33: Fix typos and improve clarity in “Possible Missing Features”

-We have seen that some custom components were create just for the RequestOptionsProvider interface. There should always be an escape path for that which is the string interpolation. Given this is not enough, feel free to reach out to our team so that we can figure out a solution.
+We have seen that some custom components were created solely for the `RequestOptionsProvider` interface. There is an escape hatch via string interpolation. If that is insufficient, please reach out so we can find a solution.

Could we link to the RequestOptionsProvider docs or the CDK’s string interpolation filter here? wdyt?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2957-2964: Undefined variable in error path

property_limit_type is referenced before assignment in the default case. Can we use model.property_limit_type (or a constant) instead, wdyt?

-            case _:
-                raise ValueError(f"Invalid PropertyLimitType {property_limit_type}")
+            case _:
+                raise ValueError(f"Invalid PropertyLimitType {model.property_limit_type}")
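A minimal sketch of why this matters, assuming the local variable is only assigned inside the matching branches. The enum and function names below are hypothetical stand-ins, not the factory's actual code. On the fallback path the buggy version fails while building the error message, so callers see an UnboundLocalError rather than the intended ValueError.

```python
from enum import Enum


class LimitType(Enum):  # hypothetical stand-in for the real limit-type enum
    CHARACTERS = "characters"
    PROPERTY_COUNT = "property_count"


def resolve_buggy(raw: str) -> LimitType:
    if raw == "characters":
        resolved = LimitType.CHARACTERS
    elif raw == "property_count":
        resolved = LimitType.PROPERTY_COUNT
    else:
        # `resolved` was never assigned on this path, so formatting the message
        # raises UnboundLocalError before ValueError can be constructed.
        raise ValueError(f"Invalid limit type {resolved}")
    return resolved


def resolve_fixed(raw: str) -> LimitType:
    if raw == "characters":
        return LimitType.CHARACTERS
    if raw == "property_count":
        return LimitType.PROPERTY_COUNT
    # Reference the input value, which is always bound.
    raise ValueError(f"Invalid limit type {raw}")
```

Referencing the input (here `raw`, in the suggestion `model.property_limit_type`) keeps the error path independent of which branches ran.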
♻️ Duplicate comments (5)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

1122-1126: Make the invariant explicit to avoid brittle indexing

Accessing intermediate_states[-2] will raise IndexError if the invariant changes. Since we expect two emissions (close_partition + final), could we assert the invariant first for a clearer failure and then compare, wdyt?

         # Assert that the number of intermediate states is as expected
         assert len(intermediate_states) - 1 == num_intermediate_states
+        # Invariant: we must have at least 2 state emissions (close_partition and final)
+        assert len(intermediate_states) >= 2, "Expected at least two state emissions"
         # Assert that ensure_at_least_one_state_emitted is called before yielding the last record from the last slice
         assert (
             intermediate_states[-1][0].stream.stream_state.__dict__["parent_state"]
             == intermediate_states[-2][0].stream.stream_state.__dict__["parent_state"]
         )
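As a small illustration of the suggestion above, the following sketch guards negative indexing with an explicit length assertion. The helper name `last_two_states` is hypothetical and exists only for this example; the point is that a violated invariant fails with a descriptive message instead of a bare IndexError.

```python
from typing import Any, List, Tuple


def last_two_states(states: List[Any]) -> Tuple[Any, Any]:
    # State the invariant up front so a regression produces a readable failure.
    assert len(states) >= 2, f"Expected at least two state emissions, got {len(states)}"
    return states[-2], states[-1]
```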
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1298-1301: Don’t mutate component_definition in-place; parse a copy

Mutating the caller-provided mapping still risks side-effects. Can we copy, set $parameters on the copy, and parse that, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = {**component_definition}
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)
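To make the side effect concrete, here is a minimal standalone sketch of the copy-then-alias approach. The function name `with_parameters_alias` and the sample mapping are assumptions for illustration only; the actual factory method does more than this.

```python
from typing import Any, Dict, Mapping


def with_parameters_alias(component_definition: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy where "$parameters" mirrors "parameters" when missing."""
    local_def = dict(component_definition)
    if "$parameters" not in local_def and "parameters" in local_def:
        local_def["$parameters"] = local_def["parameters"]
    return local_def


# Hypothetical caller-owned definition; it must not change after normalization.
caller_def = {"type": "DatetimeBasedCursor", "parameters": {"name": "a_stream"}}
normalized = with_parameters_alias(caller_def)
```

Note that this is a shallow copy: the nested `parameters` dict is still shared with the caller, which is fine as long as the parser only reads it.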

1609-1611: Repeat: avoid mutating component_definition here too

Same in per-partition cursor path; let’s parse a safe copy, wdyt?

-        if "$parameters" not in component_definition and "parameters" in component_definition:
-            component_definition["$parameters"] = component_definition.get("parameters")  # type: ignore  # This is a dict
-        datetime_based_cursor_model = model_type.parse_obj(component_definition)
+        local_def = {**component_definition}
+        if "$parameters" not in local_def and "parameters" in local_def:
+            local_def["$parameters"] = local_def.get("parameters")  # type: ignore
+        datetime_based_cursor_model = model_type.parse_obj(local_def)

3726-3734: Bug: has_parent_state checks the child’s state, not the parent’s

This should look up model.stream.name. Otherwise StateDelegatingStream decisions can be wrong. Shall we switch to the parent stream name, wdyt?

-        has_parent_state = bool(
-            self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
-            if model.incremental_dependency
-            else False
-        )
+        parent_stream_name = model.stream.name or ""
+        has_parent_state = (
+            bool(self._connector_state_manager.get_stream_state(parent_stream_name, None))
+            if model.incremental_dependency
+            else False
+        )
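The difference between the two lookups can be sketched with a plain dictionary standing in for the state manager. Everything below is hypothetical (the stream names, the `STATE_BY_STREAM` store, and both helpers); it is not the CDK's ConnectorStateManager API, only an illustration of which key should be consulted.

```python
from typing import Any, Dict, Mapping

# Hypothetical per-stream state store keyed by stream name.
STATE_BY_STREAM: Dict[str, Mapping[str, Any]] = {
    "posts": {"updated_at": "2024-01-15"},  # parent stream already has state
    "post_comments": {},  # child stream has no state yet
}


def has_state(stream_name: str) -> bool:
    return bool(STATE_BY_STREAM.get(stream_name))


def has_parent_state(parent_name: str, incremental_dependency: bool) -> bool:
    # Consult the parent's name, and only when the dependency is incremental.
    return has_state(parent_name) if incremental_dependency else False
```

With this data, looking up the child name ("post_comments") would report no state even though the parent has some, which is the mismatch the comment describes.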

3760-3817: Parent-state fallback guard and value selection are off

Use an explicit None check and prefer the child’s value for the computed cursor_field over the “first dict value”. Also keep the union guards. Proposed fix below, wdyt?

@@
         if model.incremental_dependency and child_state:
             parent_stream_name = model.stream.name or ""
             parent_state = ConcurrentPerPartitionCursor.get_parent_state(
                 child_state, parent_stream_name
             )
@@
             if not parent_state:
                 # there are two migration cases: state value from child stream or from global state
                 parent_state = ConcurrentPerPartitionCursor.get_global_state(
                     child_state, parent_stream_name
                 )
 
-                if not parent_state and not isinstance(parent_state, dict):
-                    cursor_values = child_state.values()
-                    if cursor_values:
-                        incremental_sync_model: Union[
-                            DatetimeBasedCursorModel,
-                            IncrementingCountCursorModel,
-                            CustomIncrementalSyncModel,
-                        ] = (
-                            model.stream.incremental_sync  # type: ignore  # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream
-                            if isinstance(model.stream, DeclarativeStreamModel)
-                            else self._get_state_delegating_stream_model(
-                                has_parent_state, model.stream
-                            ).incremental_sync
-                        )
-                        cursor_field = InterpolatedString.create(
-                            incremental_sync_model.cursor_field,
-                            parameters=incremental_sync_model.parameters or {},
-                        ).eval(config)
-                        parent_state = AirbyteStateMessage(
+                if parent_state is None:
+                    # Fallback: derive minimal parent state from the child's legacy blob
+                    selected_stream_model = (
+                        model.stream
+                        if isinstance(model.stream, DeclarativeStreamModel)
+                        else self._get_state_delegating_stream_model(has_parent_state, model.stream)
+                    )
+                    incr = getattr(selected_stream_model, "incremental_sync", None)
+                    cursor_field_str = (
+                        InterpolatedString.create(
+                            incr.cursor_field, parameters=getattr(incr, "parameters", {}) or {}
+                        ).eval(config)
+                        if incr and getattr(incr, "cursor_field", None)
+                        else None
+                    )
+                    value = child_state.get(cursor_field_str) if cursor_field_str else None
+                    if value is None:
+                        vals = list(child_state.values())
+                        value = vals[0] if vals else None
+                    if value is not None:
+                        parent_state = AirbyteStateMessage(
                             type=AirbyteStateType.STREAM,
                             stream=AirbyteStreamState(
                                 stream_descriptor=StreamDescriptor(
                                     name=parent_stream_name, namespace=None
                                 ),
-                                stream_state=AirbyteStateBlob(
-                                    {cursor_field: list(cursor_values)[0]}
-                                ),
+                                stream_state=AirbyteStateBlob({(cursor_field_str or "cursor"): value}),
                             ),
                         )
             return ConnectorStateManager([parent_state] if parent_state else [])
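The value-selection part of this proposal can be sketched in isolation. The helper name `pick_legacy_cursor_value` and the sample state are assumptions made for this example; the sketch only shows why preferring the cursor field over "the first dict value" matters when the legacy blob holds more than one key.

```python
from typing import Any, Mapping, Optional


def pick_legacy_cursor_value(child_state: Mapping[str, Any], cursor_field: Optional[str]) -> Any:
    """Prefer the value stored under cursor_field; fall back to the first value."""
    value = child_state.get(cursor_field) if cursor_field else None
    if value is None:
        values = list(child_state.values())
        value = values[0] if values else None
    return value


# Hypothetical legacy state where the cursor value is not the first entry.
legacy_state = {"lookback_window": 1, "updated_at": "2024-01-01"}
```

Here the first value is the lookback window, so taking `list(values)[0]` would seed the parent state with the wrong value.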
🧹 Nitpick comments (18)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (9)

10-11: Avoid name shadowing: drop function import and keep module import for orjson

Importing both from orjson import orjson and import orjson shadows the function with the module name and is confusing. Since the code uses orjson.dumps, can we remove the function import to keep only import orjson, wdyt?

- from orjson import orjson

Also applies to: 325-326


1191-1195: Resolve FIXME or soften it to NOTE

Given the concurrent cursor now updates parent state earlier and the test codifies this with an extra posts fetch, could we replace the FIXME with a NOTE (or remove it) to reflect the accepted behavior, wdyt?

-                # FIXME this is an interesting case. The previous solution would not update the parent state until `ensure_at_least_one_state_emitted` but the concurrent cursor does just before which is probably fine too
+                # NOTE: Concurrent cursor updates parent state earlier than the legacy path; this test reflects that behavior.

1490-1492: Reduce repetition for “parent re-fetch” requests?

We add identical “re-fetch posts with start_time at latest parent cursor” tuples in multiple parametrizations. Would a small helper (e.g., make_posts_refresh_request(ts)) improve readability and keep these in sync, wdyt?


1635-1637: Same duplication as above

Same suggestion as Lines 1490–1492 to avoid repeating the “posts refresh” request tuple, wdyt?


1742-1742: Unify lookback_window units across tests

Some expected states assert lookback_window: 1 (days) while others assert 86400 (seconds). Can we standardize? If seconds are intended here, could we reference a constant instead of a magic number, wdyt?

-                "lookback_window": 86400,
+                "lookback_window": LOOKBACK_WINDOW_SECONDS,

Outside this hunk, add once near the existing LOOKBACK_WINDOW_DAYS definition:

# Add near LOOKBACK_WINDOW_DAYS
LOOKBACK_WINDOW_SECONDS = LOOKBACK_WINDOW_DAYS * 86400

2136-2140: Double-check parent lookback_window semantics when there are no slices

Expected parent_state uses START_DATE and sets lookback_window to 0 while the top-level stays at 1 elsewhere. Is 0 here intentional to indicate “no lookback applied for parent” in the no-slices path, and should we document this with a short inline comment for future readers, wdyt?


2336-2336: Consistency of lookback_window=0

This expected state uses lookback_window: 0. Given other tests assert 1 or 86400, can we confirm this is intentional for the “no records” path and, if so, consider centralizing the meaning of 0 vs 1/86400 to avoid confusion, wdyt?


2601-2617: Clarify mixed lookback behavior and START_DATE fallback in error case

The comment explains the error boundary, but the expected state mixes lookback_window (global 86400 vs parent 0) and applies START_DATE for a partition. Would it help to add a brief inline note explaining why the parent lookback is 0 while global stays 86400, and why START_DATE is used for the partially processed parent, wdyt?


3932-3932: Avoid magic number for lookback_window assertion

For readability and to align with other places, can we assert against a constant instead of 86400, wdyt?

-    assert final_state["lookback_window"] == 86400
+    assert final_state["lookback_window"] == LOOKBACK_WINDOW_SECONDS

(Add LOOKBACK_WINDOW_SECONDS as suggested in the comment on line 1742.)

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)

926-942: Reduce test brittleness by avoiding private internals (_cursor_factory, _partition_router)

These assertions reach into private fields (stream.cursor._cursor_factory.create(...) and stream.cursor._partition_router). Could we introduce a tiny helper or expose a public accessor on DefaultStream to obtain the constructed ConcurrentCursor/partition router so the test doesn’t couple to internals, wdyt? As a stopgap, a focused test helper keeps the private access in one place:

# test helper (can live near get_retriever/get_schema_loader)
def _get_concurrent_cursor_for(stream, stream_state: dict | None = None, lookback: timedelta = timedelta(0)):
    # Centralize the private access so future refactors touch one place.
    return stream.cursor._cursor_factory.create(stream_state or {}, lookback)

Then replace inline uses with _get_concurrent_cursor_for(stream).


948-999: Strengthen transformation assertions for custom retriever

You currently only assert “there are transformations”. Shall we validate the shape too (like other tests do) to catch wiring regressions, wdyt?

-    assert isinstance(stream, DefaultStream)
-    assert get_retriever(stream).record_selector.transformations
+    assert isinstance(stream, DefaultStream)
+    transformations = get_retriever(stream).record_selector.transformations
+    assert len(transformations) == 1
+    add_fields = transformations[0]
+    assert isinstance(add_fields, AddFields)
+    assert add_fields.fields[0].path == ["extra"]
+    assert add_fields.fields[0].value.string == "{{ response.to_add }}"

983-983: YAML indentation nit

For consistency with the rest of the file, would you mind aligning the nested $parameters entry to 4 spaces?

-   name: a_stream
+    name: a_stream

1255-1255: Use identity checks for booleans

Minor style nit: prefer “is True” to avoid accidental truthiness comparisons. Update both occurrences, wdyt?

-    assert get_retriever(stream).record_selector.transform_before_filtering == True
+    assert get_retriever(stream).record_selector.transform_before_filtering is True
-    assert retriever.record_selector.transform_before_filtering == True
+    assert retriever.record_selector.transform_before_filtering is True

Also applies to: 1339-1339
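A short sketch of the difference, using an integer that merely compares equal to True. The variable names are illustrative only.

```python
flag_as_int = 1  # truthy look-alike, not the bool singleton

# Equality follows numeric comparison rules, so 1 == True holds.
equal_to_true = flag_as_int == True  # noqa: E712

# Identity only holds for the actual True object.
identical_to_true = flag_as_int is True
```

An assertion written with `is True` would therefore catch a field that was accidentally set to 1 instead of a real boolean.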


2619-2619: Optional: assert Cartesian product slicer composition

You already check the wrapper type and arity. Do we also want to assert that each underlying slicer is a ListPartitionRouter to fully validate composition, wdyt?

assert all(isinstance(s, ListPartitionRouter) for s in stream.cursor._partition_router.stream_slicers)

Also applies to: 2621-2623


166-166: StateFilteringMessageRepository: consider behavior-focused assertion

Import and type assertion look good. Would you also want a behavior test that parent streams don’t emit STATE messages (e.g., emit a STATE via the parent and assert the repository filters it), so we verify semantics rather than internals, wdyt?

Also applies to: 2118-2122

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

2065-2069: Type hint doesn’t match assigned types for stream_slicer

This can be a PartitionRouter or a ConcurrentCursor. Shall we drop the explicit annotation (or use a precise Union) to keep mypy quiet, wdyt?

-        stream_slicer: ConcurrentStreamSlicer = (
+        stream_slicer = (
             partition_router
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )

2117-2138: Prefer None over empty string for cursor_field

DefaultStream accepts Optional[str]; passing "" muddies intent. Shall we pass None when no cursor exists, wdyt?

-            cursor_field=concurrent_cursor.cursor_field.cursor_field_key
-            if hasattr(concurrent_cursor, "cursor_field")
-            else "",  # FIXME we should have the cursor field has part of the interface of cursor,
+            cursor_field=(
+                getattr(getattr(concurrent_cursor, "cursor_field", None), "cursor_field_key", None)
+            ),
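The nested getattr pattern in the suggestion can be checked with two stand-in classes. The class names below are hypothetical and unrelated to the CDK's cursor types; the sketch only shows that the expression yields None, rather than an empty string, when the attribute is missing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CursorField:  # hypothetical stand-in for a cursor field object
    cursor_field_key: str


class CursorWithField:
    cursor_field = CursorField("updated_at")


class CursorWithoutField:
    pass


def cursor_field_key_or_none(cursor: object) -> Optional[str]:
    # The inner getattr returns None when there is no cursor_field attribute;
    # the outer getattr on None then falls through to its default as well.
    return getattr(getattr(cursor, "cursor_field", None), "cursor_field_key", None)
```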

3874-3903: Avoid recomputing the router

create_http_components_resolver builds the partition router twice. Shall we compute once and reuse, wdyt?

-        retriever = self._create_component_from_model(
+        router = self._build_stream_slicer_from_partition_router(model.retriever, config)
+        retriever = self._create_component_from_model(
             model=model.retriever,
             config=config,
             name=f"{stream_name if stream_name else '__http_components_resolver'}",
             primary_key=None,
-            stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
+            stream_slicer=router,
             transformations=[],
         )
@@
-        return HttpComponentsResolver(
-            retriever=retriever,
-            stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
+        return HttpComponentsResolver(
+            retriever=retriever,
+            stream_slicer=router,
             config=config,
             components_mapping=components_mapping,
             parameters=model.parameters or {},
         )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38cd657 and 8bef9dd.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (42 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (10 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (15 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (10)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py (1)
  • PerPartitionRequestOptionsProvider (8-95)
airbyte_cdk/sources/declarative/requesters/requester.py (3)
  • Requester (29-156)
  • get_url (38-47)
  • get_url_base (50-59)
airbyte_cdk/sources/message/repository.py (2)
  • StateFilteringMessageRepository (98-115)
  • LogAppenderMessageRepositoryDecorator (118-157)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (2)
  • SinglePartitionRouter (13-57)
  • get_stream_state (53-57)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)
  • create (45-50)
  • cursor_field (141-142)
  • ConcurrentPerPartitionCursor (53-593)
  • get_parent_state (554-577)
  • get_global_state (580-593)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • cursor_field (191-192)
  • FinalStateCursor (90-132)
  • Cursor (51-87)
airbyte_cdk/sources/streams/concurrent/default_stream.py (6)
  • cursor_field (52-53)
  • name (44-45)
  • DefaultStream (17-123)
  • get_json_schema (55-56)
  • cursor (92-93)
  • namespace (48-49)
airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py (1)
  • DatetimeBasedRequestOptionsProvider (20-92)
airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py (1)
  • DefaultRequestOptionsProvider (15-60)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)
airbyte_cdk/sources/message/repository.py (1)
  • StateFilteringMessageRepository (98-115)
airbyte_cdk/utils/datetime_helpers.py (1)
  • ab_datetime_parse (361-442)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
  • stream_name (116-117)
  • create (55-63)
airbyte_cdk/sources/streams/concurrent/adapters.py (3)
  • stream_name (311-312)
  • cursor (196-197)
  • cursor_field (189-193)
unit_tests/connector_builder/test_connector_builder_handler.py (2)
  • get_retriever (443-448)
  • cursor_field (831-832)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
  • ConcurrentCursor (135-518)
  • cursor_field (191-192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (12)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

453-456: Config refactor LGTM

Using START_DATE and explicit credentials in CONFIG looks good and keeps tests consistent.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)

202-203: Good call: parse config times once

Caching parsed CONFIG_START_TIME/CONFIG_END_TIME avoids repeated parsing and keeps assertions tidy. LGTM.


721-721: Propagate stream_name into SubstreamPartitionRouter

Passing stream_name and asserting DefaultStream parents aligns with the new DefaultStream-only instantiation. Looks correct, wdyt?

Also applies to: 727-729


1071-1074: Async retriever + concurrent cursor wiring looks right

Nice coverage validating DefaultStream + AsyncRetriever path. LGTM.


2536-2538: Updated expectation: ConcurrentPerPartitionCursor + DefaultStream

These parametrized expectations match the new architecture. LGTM.


2185-2189: CustomPartitionRouter receives stream_name

Passing stream_name for custom routers ensures correct child naming. LGTM.


3936-3936: GroupingPartitionRouter: stream_name propagation and validation

Stream name propagation and the added invalid-configuration tests look solid. LGTM.

Also applies to: 3947-3947, 3995-3995, 4047-4047

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

2005-2019: LGTM: correct start/end wiring and per-partition wrapper

Using partition_field_start/partition_field_end from the cursor and wrapping with PerPartitionRequestOptionsProvider for ConcurrentPerPartitionCursor looks right.


3166-3175: LGTM: correct base URL fallback

Switching to get_url_base() for _url_base fallback fixes the earlier bug.


3253-3259: Confirm PartitionRouter implements RequestOptionsProvider everywhere

When request_options_provider is Default, you swap in partition_router. Can we confirm all routers used here implement the RequestOptionsProvider surface so headers/params/body methods exist, wdyt?


2220-2221: Confirm we still need attempt_to_create_cursor_if_not_provided=True

Given retriever.cursor is gone and we have a single cursor per stream, do we still need this escape hatch? If tests allow, can we remove it in a follow-up, wdyt?


101-101: LGTM: imports added for RecordExtractor and StateFilteringMessageRepository

These unblock typing/runtime and substream log/state filtering behavior.

Also applies to: 595-598

@maxi297 maxi297 dismissed coderabbitai[bot]’s stale review September 3, 2025 17:46

I think we've done enough testing to be confident this works well enough

@maxi297 maxi297 merged commit 5378cef into main Sep 3, 2025
29 checks passed
@maxi297 maxi297 deleted the maxi297/remove_declarative_stream branch September 3, 2025 17:55