fix(extra_fields): Fix issue with substream partition router picking extra fields #727
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@aldogonzalez8/fix-extra-fields#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch aldogonzalez8/fix-extra-fields
```
📝 Walkthrough
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Runner
    participant CPS as CartesianProductStreamSlicer
    participant S1 as Sub-slicer A
    participant S2 as Sub-slicer B
    Runner->>CPS: stream_slices(config, stream_state)
    CPS->>S1: stream_slices(...)
    S1-->>CPS: slices A with extra_fields A_i
    CPS->>S2: stream_slices(...)
    S2-->>CPS: slices B with extra_fields B_j
    rect rgb(245,248,255)
        note over CPS: For each cartesian pair (A_i, B_j)
        CPS->>CPS: Merge extra_fields via dict(ChainMap(A_i, B_j))
        CPS-->>Runner: StreamSlice(partition=..., cursor_slice=..., extra_fields=merged)
    end
```
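The merge step in the diagram relies on `dict(ChainMap(...))` giving left-most precedence on key conflicts. A minimal sketch of those semantics, with illustrative field names (not taken from any real connector):

```python
from collections import ChainMap

# Illustrative extra_fields from two sub-slicers for one cartesian pair
# (A_i from slicer A, B_j from slicer B).
a_i = {"parent.name": "repo-a", "parent.owner": "octo"}
b_j = {"parent.name": "repo-b", "team": "core"}

# dict(ChainMap(...)) resolves key conflicts with left-most precedence,
# so slicer A's value wins for "parent.name" while unique keys from both
# maps are kept.
merged = dict(ChainMap(a_i, b_j))
assert merged == {"parent.name": "repo-a", "parent.owner": "octo", "team": "core"}
```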
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (1)

**152-163**: Fix mypy errors by annotating and merging extra_fields without ChainMap

mypy is flagging:
- Need type annotation for extra_fields (Line 152).
- ChainMap arg-type mismatch for list[Mapping[str, Any]].

Since StreamSlice.extra_fields returns a Mapping and we only need a merged dict with left-most precedence (same as ChainMap), we can avoid ChainMap here and keep type safety. Also annotate the variable to satisfy mypy. WDYT?

Apply this diff:

```diff
-        extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))
+        # Merge extra_fields with left-most precedence (same as ChainMap),
+        # but using typed updates to satisfy mypy.
+        extra_fields_list: List[Mapping[str, Any]] = [s.extra_fields for s in stream_slice_tuple]
+        extra_fields: Dict[str, Any] = {}
+        for m in reversed(extra_fields_list):
+            extra_fields.update(m)
@@
-        yield StreamSlice(
-            partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
-        )
+        yield StreamSlice(
+            partition=partition,
+            cursor_slice=cursor_slice,
+            extra_fields=extra_fields,
+        )
```

And add Dict to imports:

```diff
-from typing import Any, Iterable, List, Mapping, Optional
+from typing import Any, Iterable, List, Mapping, Optional, Dict
```

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)

**17-21**: Fix broken import path to unblock pytest collection

CI fails with ModuleNotFoundError for airbyte_cdk.sources.declarative.incremental.per_partition_cursor. StreamSlice is provided by airbyte_cdk.sources.types, and PerPartitionCursor/CursorFactory are re-exported from airbyte_cdk.sources.declarative.incremental. Shall we switch to those stable imports? WDYT?

Apply this diff:

```diff
-from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
-    CursorFactory,
-    PerPartitionCursor,
-    StreamSlice,
-)
+from airbyte_cdk.sources.declarative.incremental import CursorFactory, PerPartitionCursor
+from airbyte_cdk.sources.types import StreamSlice
```
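The suggested replacement above claims the reversed-`update` loop matches `dict(ChainMap(*maps))`. A small self-contained sketch of that equivalence (function name and sample maps are ours, for illustration only):

```python
from collections import ChainMap
from typing import Any, Dict, List, Mapping

def merge_extra_fields(maps: List[Mapping[str, Any]]) -> Dict[str, Any]:
    # Apply maps right-to-left so earlier (left-most) maps overwrite later
    # ones, which is exactly dict(ChainMap(*maps)) semantics, without
    # tripping mypy's MutableMapping constraint on ChainMap.
    merged: Dict[str, Any] = {}
    for m in reversed(maps):
        merged.update(m)
    return merged

maps = [{"k": 1, "a": 0}, {"k": 2, "b": 3}]
assert merge_extra_fields(maps) == dict(ChainMap(*maps))  # {"k": 1, "a": 0, "b": 3}
```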
🧹 Nitpick comments (4)
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (3)

**180-239**: Great coverage for extra_fields; consider adding a conflict-precedence case

You assert partition equality and then separately verify extra_fields content, which is a nice separation. Would you consider adding a second scenario where two sub-slicers emit the same extra_field key with different values to lock down "first-slicer-wins" precedence (which mirrors ChainMap semantics and our merge logic), e.g., two parents both emitting "meta.id"? WDYT?

I can draft a parametrized test variant showing conflicting keys resolved by left-most slicer.

**5-5**: Nit: simplify import

Minor: `import pytest as pytest` can be `import pytest`. Keeps things tidy. WDYT?

Apply this diff:

```diff
-import pytest as pytest
+import pytest
```

**230-239**: Minor assertion structure suggestion

You already verify partitions and extra_fields arrays separately, which is clear. Would you also want to assert that the cursor_slice for these cases is empty to prevent regressions where a parent inadvertently injects a cursor? WDYT?

If yes, add:

```python
cursor_slices = [s.cursor_slice for s in slices]
assert cursor_slices == [{}] * len(expected_slices)
```

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)

**88-91**: Remove noisy prints in tests

The debug prints can clutter CI logs and make failures harder to scan. Since pytest captures logs nicely, could we drop these two prints? WDYT?

Apply this diff:

```diff
-        print(f"about to emit {results}")
         yield from results
-        print(f"setting state to {stream_slice}")
         self._state = stream_slice
```
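The print-removal suggestion above leans on logging, which pytest can capture. A hedged sketch of that alternative, using an in-memory handler so the behavior is testable standalone (logger name, handler class, and messages are illustrative, not from the CDK):

```python
import logging

# Illustrative logger name for a test helper module.
logger = logging.getLogger("unit_tests.mock_stream")

class _Capture(logging.Handler):
    """Collects formatted messages so we can assert on them without stdout noise."""
    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())

capture = _Capture()
logger.addHandler(capture)
logger.setLevel(logging.DEBUG)

# Instead of print(f"about to emit {results}") inside the mock stream:
results = [{"id": 1}]
logger.debug("about to emit %s", results)
assert capture.messages == ["about to emit [{'id': 1}]"]
```

In a real test, pytest's `caplog` fixture plays the role of `_Capture` without any manual handler wiring.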
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (2 hunks)
- unit_tests/sources/declarative/partition_routers/helpers.py (1 hunks)
- unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (2 hunks)
- unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)

unit_tests/sources/declarative/partition_routers/helpers.py (2)
- airbyte_cdk/sources/types.py (6): StreamSlice (75-169), Record (21-72), partition (99-104), cursor_slice (107-112), data (35-36), associated_slice (39-40)
- airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (1): InterpolatedString (13-79)

airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (1)
- airbyte_cdk/sources/types.py (4): extra_fields (115-117), StreamSlice (75-169), partition (99-104), cursor_slice (107-112)

unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (5)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3): ParentStreamConfig (2716-2753), SubstreamPartitionRouter (2969-2976), RequestOption (1316-1335)
- airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (3): ParentStreamConfig (31-76), SubstreamPartitionRouter (80-422), stream_slices (150-231)
- airbyte_cdk/sources/types.py (4): StreamSlice (75-169), extra_fields (115-117), partition (99-104), cursor_slice (107-112)
- unit_tests/sources/declarative/partition_routers/helpers.py (2): MockStream (17-96), stream_slices (54-65)
- airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (1): stream_slices (146-163)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
- unit_tests/sources/declarative/partition_routers/helpers.py (1): MockStream (17-96)
🪛 GitHub Actions: Pytest (Fast)
unit_tests/sources/declarative/partition_routers/helpers.py
[error] 9-9: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor' while importing test helpers (import path in helpers.py:9).
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py
[error] 24-24: ImportError during test collection: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor' (import caused by helpers.py:9).
🪛 GitHub Actions: PyTest Matrix
unit_tests/sources/declarative/partition_routers/helpers.py
[error] 9-9: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor'. This import caused PyTest collection to fail during the run 'poetry run coverage run -m pytest --durations=10 -m "not linting and not super_slow and not flaky"'.
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py
[error] 24-24: PyTest collection error: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor' (imported via helpers.py:9).
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
[error] 64-64: PyTest collection error: ModuleNotFoundError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor' (imported via helpers.py:9).
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py
[error] 152-152: mypy error in cartesian_product_stream_slicer.py:152: Need type annotation for 'extra_fields' (hint: "extra_fields: dict[, ] = ...") [var-annotated] (Command: poetry run mypy --config-file mypy.ini airbyte_cdk)
[error] 152-152: mypy error in cartesian_product_stream_slicer.py:152: Argument 1 to ChainMap has incompatible type *list[Mapping[str, Any]]; expected MutableMapping[Never, Never] [arg-type] (Command: poetry run mypy --config-file mypy.ini airbyte_cdk)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check: source-shopify
🔇 Additional comments (10)
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (2)

**14-17**: Looks good: exercising extra_fields via SubstreamPartitionRouter is valuable

The added imports for ParentStreamConfig/SubstreamPartitionRouter align with the new test case validating extra_fields propagation. No changes requested. WDYT?

**242-276**: Multiple-cursor-slice guardrail retained

Nice to see the explicit test safeguarding against multiple cursor components in the Cartesian product. All good.

unit_tests/sources/declarative/partition_routers/helpers.py (1)

**16-96**: Helper implementation LGTM

The helper's stream_slices/read_records behavior mirrors what the tests need, including state updates after a full slice. Nice encapsulation.

airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (3)

**60-85**: Request-options merging remains consistent

Using ChainMap to merge request params maintains first-slicer-wins semantics, consistent with docs. No action needed.

**19-37**: Warning propagation for nested SubstreamPartitionRouter

The recursive check and warning message match the test expectation and keep behavior explicit. LGTM.

**146-160**: Cursor-slice selection logic remains correct

Collecting non-empty cursor slices and enforcing a single component preserves the previous invariant. Good safeguard.

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (4)

**139-176**: State migration test reads clean

The parent state migration coverage across initial-state variants is thorough. Nice job preventing regressions.

**593-706**: Interpolation test coverage is solid

Validates both parameters and config interpolation for request params. Nothing to change here.

**875-966**: Resumable full-refresh parent scenarios are well covered

The paging checkpoints and optional incremental dependency behavior are exercised end-to-end. Looks good.

**980-1129**: End-to-end substream cursor test is comprehensive

Great use of PerPartitionCursor/ChildPartitionResumableFullRefreshCursor to validate both substream and parent state. No changes requested.
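The "at most one cursor component" invariant praised above can be sketched in isolation. This is not the CDK implementation, just a minimal stand-in (`select_cursor_slice` and the sample dicts are ours) showing the shape of the safeguard:

```python
import itertools
from typing import Any, Dict, Tuple

def select_cursor_slice(slice_tuple: Tuple[Dict[str, Any], ...]) -> Dict[str, Any]:
    # Sketch of the invariant: within one cartesian tuple, at most one
    # sub-slicer may contribute a non-empty cursor slice.
    cursor_slices = [s["cursor"] for s in slice_tuple if s.get("cursor")]
    if len(cursor_slices) > 1:
        raise ValueError("At most one sub-slicer may emit a cursor slice")
    return cursor_slices[0] if cursor_slices else {}

pairs = list(itertools.product(
    [{"partition": "a"}],
    [{"partition": "b", "cursor": {"start": "2024-01-01"}}],
))
assert select_cursor_slice(pairs[0]) == {"start": "2024-01-01"}
```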
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)

**17-21**: Fix import path causing test collection to fail (per_partition_cursor no longer importable).

CI is failing during pytest collection with ImportError for `airbyte_cdk.sources.declarative.incremental.per_partition_cursor`. These symbols appear to be re-exported at the `incremental` package root, and `StreamSlice` moved under `airbyte_cdk.sources.types`. Shall we import from the stable locations to unblock CI, wdyt?

Apply this diff to fix the failing import:

```diff
-from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
-    CursorFactory,
-    PerPartitionCursor,
-    StreamSlice,
-)
+from airbyte_cdk.sources.declarative.incremental import CursorFactory, PerPartitionCursor
```

Additionally, update the types import to source `StreamSlice` from `airbyte_cdk.sources.types` (see separate diff for Line 36).

**83-91**: Access Record.data instead of dict-style indexing; remove noisy prints.

`MockIncrementalStream` currently indexes records like a dict (`record["updated_at"]`), but in tests you pass `Record` objects. This will raise at runtime unless `Record` implements `__getitem__`. Shall we read `Record` data safely and drop the prints, wdyt?

Proposed fix:

```diff
-        results = [
-            record
-            for record in self._records
-            if stream_slice["start_time"] <= record["updated_at"] <= stream_slice["end_time"]
-        ]
-        print(f"about to emit {results}")
-        yield from results
-        print(f"setting state to {stream_slice}")
-        self._state = stream_slice
+        def _updated_at(rec: Any) -> Any:
+            # Support both raw dicts and Record(data=..., associated_slice=..., stream_name=...)
+            try:
+                # Record object path
+                return rec.data["updated_at"]
+            except AttributeError:
+                # Dict-like path
+                return rec["updated_at"]
+
+        results = [
+            record
+            for record in self._records
+            if stream_slice["start_time"] <= _updated_at(record) <= stream_slice["end_time"]
+        ]
+        yield from results
+        # reflect checkpoint on the current parent slice window
+        self._state = stream_slice
```
🧹 Nitpick comments (6)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (6)
**117-129**: Tighten pagination guardrails in MockResumableFullRefreshStream.read_records.

If `next_page_token` points past the available `record_pages`, this will raise IndexError. Do you want to make the mock more defensive to simplify future tests, wdyt?

Example tweak:

```diff
-        page_number = self.state.get("next_page_token") or 1
-        yield from self._record_pages[page_number - 1]
+        page_number = self.state.get("next_page_token") or 1
+        if 1 <= page_number <= len(self._record_pages):
+            yield from self._record_pages[page_number - 1]
+        else:
+            # Nothing to emit, finish
+            page_number = len(self._record_pages)
```
**298-303**: Normalize slices to dicts if SubstreamPartitionRouter now returns StreamSlice objects.

In newer CDK bits, routers commonly yield `StreamSlice`. Given you later access `s.extra_fields`, returning `StreamSlice` here seems likely. Shall we normalize to `s.partition` when comparing to dict `expected_slices` to keep this test resilient, wdyt?

Proposed adjustment:

```diff
-    slices = [s for s in partition_router.stream_slices()]
+    # Support both dict slices and StreamSlice(partition=..., extra_fields=...)
+    slices = [
+        (s.partition if isinstance(s, StreamSlice) else s)
+        for s in partition_router.stream_slices()
+    ]
```

If `SubstreamPartitionRouter.stream_slices()` is intentionally still returning dicts, feel free to skip this change and we can pivot the `extra_fields` test to build expected dicts from `StreamSlice.extra_fields` separately.
**388-394**: Nit: fix lookback_window typo in test data.

`"looback_window"` looks like a typo and might trip future readers or parameter parsing. Should we correct it to `"lookback_window"`, wdyt?

Suggested change:

```diff
-            {
-                "looback_window": 1,
+            {
+                "lookback_window": 1,
                 "use_global_cursor": True,
                 "state": {"updated": "2023-05-27T00:00:00Z"},
             },
```
**68-75**: Remove unused date_ranges param or use it in filtering.

`date_ranges` is accepted and stored but never used. For clarity in tests, should we drop it (YAGNI) or incorporate it into the filtering logic, wdyt?

Possible minimal cleanup:

```diff
 class MockIncrementalStream(MockStream):
     def __init__(self, slices, records, name, cursor_field="", cursor=None, date_ranges=None):
         super().__init__(slices, records, name, cursor_field, cursor)
-        if date_ranges is None:
-            date_ranges = []
-        self._date_ranges = date_ranges
         self._state = {}
```
**140-170**: Potential mismatch between returned slice type and expectations across tests.

Some tests assert directly against dicts, while the new extra_fields test accesses `s.extra_fields`, implying a `StreamSlice` return type. Can we confirm the intended return type for `SubstreamPartitionRouter.stream_slices()` and align all tests consistently (either assert on `s.partition` or switch expectations to `StreamSlice`), wdyt?

If we standardize on `StreamSlice`, similar normalization to the suggestion on Line 301 would be needed in these parametrized cases too.

Also applies to: 172-209, 211-238, 240-276
**1289-1300**: Targeted logging assertion looks good; consider stronger scoping of the logger name.

The warning capture uses `logger="airbyte"`. If the implementation ever logs under a more specific name (e.g., `airbyte.cdk.partitioning`), this test could miss it. Would you like to either (a) assert on `record.message` regardless of logger, or (b) parametrize accepted logger names, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (2 hunks)
- unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1 hunks)

🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py

🧰 Additional context used
🧬 Code graph analysis (1)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
- unit_tests/sources/declarative/partition_routers/helpers.py (1): MockStream (17-96)
🪛 GitHub Actions: PyTest Matrix
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
[error] 37-37: Pytest collection failed due to ImportError: No module named 'airbyte_cdk.sources.declarative.incremental.per_partition_cursor' while importing test_substream_partition_router.py (line 37). Command: poetry run coverage run -m pytest --durations=10 -m "not linting and not super_slow and not flaky".
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (4)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (4)

**140-170**: LGTM on switching to shared MockStream helper.

Importing MockStream from the shared helper reduces duplication and keeps behavior consistent across tests. Nice cleanup!

**706-735**: Nice coverage for AirbyteMessage and Record inputs.

The dual tests validating partitions from both AirbyteMessage and Record inputs are helpful to prevent regressions in parent record handling. Thanks for including both paths!

Also applies to: 737-757

**875-966**: Solid end-to-end coverage for resumable full-refresh parent flows.

The tests exercise pagination-driven parent state updates, with and without incremental_dependency, and validate both router-level and substream cursor-level state. This should catch regressions in checkpointing semantics. Nicely done!

Also applies to: 980-1129

**1131-1194**: Great addition validating extra_fields propagation.

Asserting that nested dpath extra fields are extracted and surfaced on StreamSlice.extra_fields is exactly the coverage needed for the PR's objective. Thanks for including both nested and flat field cases!
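The nested-field extraction celebrated above can be illustrated without the CDK. A hypothetical helper (`extract_extra_field` and the sample record are ours, not CDK names) that walks a nested path and surfaces the value under a dot-joined key:

```python
from functools import reduce
from typing import Any, List, Mapping

def extract_extra_field(record: Mapping[str, Any], field_path: List[str]) -> Any:
    # Walk the nested path one key at a time, e.g. ["meta", "owner"]
    # -> record["meta"]["owner"].
    return reduce(lambda acc, key: acc[key], field_path, record)

record = {"id": 1, "meta": {"owner": "octo"}}
path = ["meta", "owner"]
# Expose the nested value under a flat, dot-joined key, mirroring what the
# extra_fields test asserts.
extra_fields = {".".join(path): extract_extra_field(record, path)}
assert extra_fields == {"meta.owner": "octo"}
```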
Actionable comments posted: 0
🧹 Nitpick comments (7)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (7)

**37-37**: Good move centralizing MockStream into a shared helper

Pulling MockStream from unit_tests/.../helpers reduces duplication and keeps test behavior consistent. Do you want help migrating any remaining tests that still define a local MockStream, wdyt?

**75-91**: Stabilize MockIncrementalStream: remove prints and assert incremental mode

The print statements add noise to test logs, and an assertion on sync_mode would catch misuses early. Shall we fold these in, wdyt?

```diff
     def read_records(
         self,
         sync_mode: SyncMode,
         cursor_field: List[str] = None,
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
-        results = [
+        assert sync_mode == SyncMode.incremental, "MockIncrementalStream should be read in incremental mode in tests"
+        results = [
             record
             for record in self._records
             if stream_slice["start_time"] <= record["updated_at"] <= stream_slice["end_time"]
         ]
-        print(f"about to emit {results}")
         yield from results
-        print(f"setting state to {stream_slice}")
         self._state = stream_slice
```
**742-742**: Use keyword args when constructing Record to avoid positional mismatch

Record's constructor expects stream_name as the 2nd positional argument. Passing {} here sets stream_name to a dict and leaves associated_slice unset. Can we switch to keywords to make intent explicit and future-proof, wdyt?

```diff
-        [parent_slice], [Record({"id": "record value"}, {})], "first_stream"
+        [parent_slice],
+        [Record(data={"id": "record value"}, stream_name="first_stream")],
+        "first_stream"
```
**781-785**: Prefer explicit keywords for Record in incremental tests

These currently set stream_name to a StreamSlice. Using keywords clarifies intent and binds associated_slice correctly. Shall we update like below, wdyt?

```diff
-        Record({"id": "may_record_0", "updated_at": "2024-05-15"}, mock_slices[0]),
-        Record({"id": "may_record_1", "updated_at": "2024-05-16"}, mock_slices[0]),
-        Record({"id": "jun_record_0", "updated_at": "2024-06-15"}, mock_slices[1]),
-        Record({"id": "jun_record_1", "updated_at": "2024-06-16"}, mock_slices[1]),
+        Record(
+            data={"id": "may_record_0", "updated_at": "2024-05-15"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[0],
+        ),
+        Record(
+            data={"id": "may_record_1", "updated_at": "2024-05-16"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[0],
+        ),
+        Record(
+            data={"id": "jun_record_0", "updated_at": "2024-06-15"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[1],
+        ),
+        Record(
+            data={"id": "jun_record_1", "updated_at": "2024-06-16"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[1],
+        ),
```
**838-842**: Same keyword-arg cleanup for Record usages here

Mirroring the change above will keep Record construction consistent and correct. Ok to adjust, wdyt?

```diff
-        Record({"id": "may_record_0", "updated_at": "2024-05-15"}, mock_slices[0]),
-        Record({"id": "may_record_1", "updated_at": "2024-05-16"}, mock_slices[0]),
-        Record({"id": "jun_record_0", "updated_at": "2024-06-15"}, mock_slices[1]),
-        Record({"id": "jun_record_1", "updated_at": "2024-06-16"}, mock_slices[1]),
+        Record(
+            data={"id": "may_record_0", "updated_at": "2024-05-15"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[0],
+        ),
+        Record(
+            data={"id": "may_record_1", "updated_at": "2024-05-16"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[0],
+        ),
+        Record(
+            data={"id": "jun_record_0", "updated_at": "2024-06-15"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[1],
+        ),
+        Record(
+            data={"id": "jun_record_1", "updated_at": "2024-06-16"},
+            stream_name="first_stream",
+            associated_slice=mock_slices[1],
+        ),
```
**129-137**: State property behavior differs between getter and setter in MockResumableFullRefreshStream

Getter proxies to the cursor while setter writes to a private _state that the getter never returns. Would you like to add a brief docstring or align the semantics (e.g., always proxy or always use _state if present) to avoid confusion for future test authors, wdyt?

**1187-1193**: Strengthen the extra_fields assertions to verify separation from partition

Nice coverage validating extra_fields. To guard against regressions where extra_fields leak into the slice's mapping, shall we assert they are not present in the StreamSlice keys as well, wdyt?

```diff
-    slices = [s.extra_fields for s in partition_router.stream_slices()]
-    assert slices == expected_slices
+    slices = list(partition_router.stream_slices())
+    assert [s.extra_fields for s in slices] == expected_slices
+    # Ensure extra_fields are not merged into the StreamSlice mapping itself
+    for s in slices:
+        assert set(s.extra_fields.keys()).isdisjoint(s.keys())
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (2 hunks)
- unit_tests/sources/declarative/partition_routers/helpers.py (1 hunks)
- unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1 hunks)

🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py
- unit_tests/sources/declarative/partition_routers/helpers.py
🧰 Additional context used
🧬 Code graph analysis (1)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)
- airbyte_cdk/sources/types.py (2): Record (21-72), StreamSlice (75-169)
- unit_tests/sources/declarative/partition_routers/helpers.py (1): MockStream (14-93)
🔇 Additional comments (1)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)

**35-35**: Importing StreamSlice from the types module looks great

This aligns the tests with the new home of StreamSlice and resolves the prior feedback. Nice catch.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)

**75-91**: Fix: treat records as `Record` objects and support `StreamSlice` input.

Currently indexes `record["updated_at"]` and `stream_slice["start_time"]`, which breaks when records are `Record` instances and when `stream_slice` is a `StreamSlice`. Suggest normalizing both. Also remove noisy prints. Wdyt?

```diff
     def read_records(
         self,
         sync_mode: SyncMode,
         cursor_field: List[str] = None,
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
-        results = [
-            record
-            for record in self._records
-            if stream_slice["start_time"] <= record["updated_at"] <= stream_slice["end_time"]
-        ]
-        print(f"about to emit {results}")
-        yield from results
-        print(f"setting state to {stream_slice}")
-        self._state = stream_slice
+        # Normalize slice (accept either Mapping or StreamSlice)
+        cursor = (
+            stream_slice.cursor_slice
+            if isinstance(stream_slice, StreamSlice)
+            else (stream_slice or {})
+        )
+        start, end = cursor.get("start_time"), cursor.get("end_time")
+        results: List[Any] = []
+        for r in self._records:
+            data = r.data if isinstance(r, Record) else r
+            if start is not None and end is not None and start <= data["updated_at"] <= end:
+                results.append(r)
+        yield from results
+        # Expose parent cursor state for assertions
+        self._state = cursor
```
🧹 Nitpick comments (5)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (5)
**9-9**: Tidy import: drop aliasing.

Can we switch to a plain import to avoid the redundant alias, wdyt?

```diff
-import pytest as pytest
+import pytest
```
**67-74**: Remove unused date_ranges plumbing.

`date_ranges` and `_date_ranges` are unused; trimming them will reduce noise in this helper. Would you like to drop these lines, wdyt?

```diff
 class MockIncrementalStream(MockStream):
     def __init__(self, slices, records, name, cursor_field="", cursor=None, date_ranges=None):
         super().__init__(slices, records, name, cursor_field, cursor)
-        if date_ranges is None:
-            date_ranges = []
-        self._date_ranges = date_ranges
         self._state = {}
```
**129-137**: Getter/setter inconsistency for `state`.

Getter proxies to the cursor while setter only updates `_state`. Should we either: (a) forward the setter to the cursor (if the API supports it) or (b) drop the setter to avoid confusion, wdyt?
**1285-1299**: Scope log capture to the specific SubstreamPartitionRouter logger

I verified that:
- `CartesianProductStreamSlicer` uses `logging.getLogger("airbyte.CartesianProductStreamSlicer")`,
- `SubstreamPartitionRouter` emits its warning via `logging.getLogger("airbyte.SubstreamPartitionRouter")`.

Since the warning `"Parent state handling is not supported for CartesianProductStreamSlicer."` is logged by the SubstreamPartitionRouter, would you consider tightening the `caplog` context to that logger? For example:

```python
with caplog.at_level(logging.WARNING, logger="airbyte.SubstreamPartitionRouter"):
    CartesianProductStreamSlicer(stream_slicers=stream_slicers, parameters={})
```

This makes the test explicitly target the logger that emits the warning and guards against missing it if parent-logger levels change. wdyt?
**288-302**: Ensure `stream_slices` always yields `StreamSlice`

- The implementation in `airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py` unconditionally does

```python
yield StreamSlice(partition={...}, cursor_slice={}, extra_fields=...)
```

at line 224, so callers always get a `StreamSlice` instance.
- The `StreamSlice` class in `airbyte_cdk/sources/types.py` defines an `__eq__` that treats dicts as equal to its internal mapping:

```python
def __eq__(self, other: Any) -> bool:
    if isinstance(other, dict):
        return self._stream_slice == other
    ...
```

(lines 149-157), which is why tests comparing slices directly to `[{...}]` pass.
- Other tests (e.g. line 1191 in `test_substream_partition_router.py`) do access `s.extra_fields`, confirming they're working with the `StreamSlice` API.

To keep things consistent and more self-documenting, would you be up for updating the assertions so that they always treat slices as `StreamSlice` objects? For example:

```python
assert [s.partition for s in slices] == expected_partitions
# or, if you want to include extras
assert [{"partition": s.partition, **s.extra_fields} for s in slices] == expected_slices
```

Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1 hunks)

🧰 Additional context used
🧬 Code graph analysis (1)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
- unit_tests/sources/declarative/partition_routers/helpers.py (1): MockStream (14-93)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Ruff Lint Check
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Ruff Format Check
🔇 Additional comments (2)

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)

**1134-1163**: Nice coverage for nested `extra_fields`.

This cleanly validates path extraction and dot-join semantics. LGTM!

**874-965**: Good end-to-end coverage for resumable full refresh.

Page-wise state advancement and optional incremental dependency verification look solid. 👍
The change makes sense to me. Just one comment on the tests.
Fix a problem with extra_fields missing when requested from the parent.
Like in this connector builder project
Summary by CodeRabbit
New Features
Tests