-
Notifications
You must be signed in to change notification settings - Fork 31
feat(low-code cursors): Support for low-code cursors to define a field to allow for user defined cursor fields in the catalog #851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ned cursor fields in the catalog
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/allow_for_user_defined_cursor_fields#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/allow_for_user_defined_cursor_fieldsHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
📝 WalkthroughWalkthroughThis PR introduces support for catalog-defined cursor fields in declarative sources. It adds an Changes
Sequence DiagramsequenceDiagram
autonumber
participant ModelFactory as ModelToComponentFactory
participant Catalog as Configured Catalog
participant CursorField
participant DefaultStream
ModelFactory->>ModelFactory: Parse cursor model<br/>(allow_catalog_defined_cursor_field flag)
alt allow_catalog_defined_cursor_field = true
ModelFactory->>Catalog: Query configured stream
Catalog-->>ModelFactory: Return cursor_field (if present)
ModelFactory->>CursorField: Create with cursor_field_key<br/>supports_catalog_defined_cursor_field=true
else allow_catalog_defined_cursor_field = false
ModelFactory->>CursorField: Create from InterpolatedString<br/>supports_catalog_defined_cursor_field=false
end
ModelFactory->>DefaultStream: Pass CursorField object
DefaultStream->>DefaultStream: Store CursorField instance
note over DefaultStream: as_airbyte_stream()
DefaultStream->>DefaultStream: Set source_defined_cursor<br/>based on supports_catalog_defined_cursor_field
DefaultStream-->>Catalog: Return AirbyteStream config
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Possibly related PRs
Suggested reviewers
Quick question: Have you considered adding a deprecation notice or migration guide for the Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Tip 📝 Customizable high-level summaries are now available in beta!You can now customize how CodeRabbit generates the high-level summary in your pull requests — including its content, structure, tone, and formatting.
Example instruction:
Note: This feature is currently in beta for Pro-tier users, and pricing will be announced later. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml(2 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py(3 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py(5 hunks)airbyte_cdk/sources/streams/concurrent/cursor.py(1 hunks)airbyte_cdk/sources/streams/concurrent/default_stream.py(4 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/declarative_component_schema.yaml
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/streams/concurrent/default_stream.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/streams/concurrent/default_stream.py (3)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (2)
cursor(83-86)cursor_field(57-61)airbyte_cdk/sources/streams/concurrent/adapters.py (2)
cursor(196-197)cursor_field(189-193)airbyte_cdk/sources/streams/concurrent/cursor.py (3)
Cursor(54-90)CursorField(40-51)cursor_field(213-214)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
cursor_field(213-214)CursorField(40-51)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor_field(52-53)airbyte_cdk/sources/streams/concurrent/adapters.py (1)
cursor_field(189-193)airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (1)
cursor_field(132-136)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Analyze (python)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
853-856: Good addition — consistent schema for both cursor types.The two new
allow_catalog_defined_cursor_fieldboolean fields are well-placed and clearly documented. The fields follow the same structure in bothIncrementingCountCursorandDatetimeBasedCursor, which is good for consistency.One small question: should these fields have an explicit
default: falseto make the intended default behavior explicit in the schema? Or is leaving it undefined (and relying on the code to handle None/undefined) intentional? Just want to make sure the schema communicates intent clearly to consumers.Also applies to: 920-923
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
1589-1593: Auto-generated schema looks good.Based on learnings, this file is auto-generated from
declarative_component_schema.yaml. The newallow_catalog_defined_cursor_fieldfield has been correctly added to both cursor models with appropriate metadata. The schema structure is consistent with other optional boolean fields in the file.Also applies to: 1621-1625
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
41-45: Implementation looks solid.The extension to
CursorFieldis clean and backward compatible with the default value ofFalse. The attribute naming clearly conveys the intent.Per the PR objectives, tests are marked as TODO. Would you like me to help generate test cases for this new functionality, or do you have them planned already? Specifically, we'd want to verify:
- Default behavior when the parameter is omitted
- Behavior when explicitly set to
True/False- Integration with the catalog-defined cursor field resolution logic
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
70-76: Verify the source_defined_cursor logic, wdyt?The logic sets
source_defined_cursor = not self._cursor_field.supports_catalog_defined_cursor_field. This means:
- If the cursor supports catalog-defined fields → it's NOT source-defined (allows user override)
- If the cursor does not support catalog-defined fields → it IS source-defined (connector controls it)
This seems semantically correct, but could you confirm this is the intended behavior? The inversion might be confusing at first glance, so a brief inline comment explaining the relationship might help future maintainers.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2113-2117: Verify CursorField with empty key doesn't cause downstream issuesThe change is confirmed to pass
CursorFieldobjects where the fallback creates an emptyCursorField(cursor_field_key="")for non-cursor streams. SinceDefaultStreamexpectsOptional[CursorField](not strings), the interface enforcement is intentional.The key risk remains: Does the rest of the pipeline handle empty
cursor_field_keygracefully without throwing errors when attempting to extract or validate cursor values? The FIXME comment suggests this is a known workaround, so we should verify:
Empty key handling: Check if any downstream cursor logic (
ConcurrentCursor, partition generators, etc.) will break whencursor_field_key=""is passed to non-cursor streams.All callers compatibility: Since
DefaultStreamnow strictly requiresCursorFieldobjects (not strings), confirm all instantiation sites are compatible—particularly any external or plugin code.
4333-4353: Review comment verification complete — recommendations are sound.The concerns you raised are substantiated:
Empty string edge case: Confirmed that line 4351 accesses
cursor_field[0]without empty string validation. Similar pattern exists inhelpers.py:40. While unlikely (no evidence of empty strings in practice), this could create a CursorField with an empty key, causingextract_value()to fail downstream.Type assumption: Verified —
cursor_fieldis indeedList[str](fromOptional[List[str]]in the Airbyte protocol).Test coverage gap: Confirmed — no dedicated tests found for
_get_catalog_defined_cursor_field, and existing test cases only use non-empty strings like "created", "updated_at", "a_key".Your recommendations are well-calibrated: validating non-empty strings and adding test coverage for edge cases would harden this code. The design decision on whether empty strings should be caught early (vs. relying on downstream validation) is worth the developer's deliberation.
maxi297
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Best PR I've seen in a while
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/streams/concurrent/test_adapters.py (1)
319-332: Consider adding test coverage for CursorField construction.The commented assertion at line 331 suggests you're planning to verify the CursorField behavior. Since
facade.cursor_fieldreturns a string (not the CursorField object itself), you could verify the underlying CursorField construction by checking the internal state, wdyt?For example:
assert facade._abstract_stream._cursor_field.cursor_field_key == "cursor" assert facade._abstract_stream._cursor_field.supports_catalog_defined_cursor_field == FalseWould you like me to generate expanded test coverage for the CursorField wrapping behavior?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py(2 hunks)airbyte_cdk/sources/streams/concurrent/adapters.py(2 hunks)unit_tests/sources/streams/concurrent/test_adapters.py(2 hunks)unit_tests/sources/streams/concurrent/test_default_stream.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
cursor(94-95)cursor_field(52-53)airbyte_cdk/sources/streams/concurrent/cursor.py (3)
Cursor(54-90)CursorField(40-51)cursor_field(213-214)
unit_tests/sources/streams/concurrent/test_default_stream.py (2)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
cursor(94-95)airbyte_cdk/sources/streams/concurrent/cursor.py (3)
Cursor(54-90)CursorField(40-51)FinalStateCursor(93-135)
unit_tests/sources/streams/concurrent/test_adapters.py (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (2)
Cursor(54-90)CursorField(40-51)
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (2)
airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
cursor(94-95)cursor_field(52-53)airbyte_cdk/sources/streams/concurrent/cursor.py (2)
CursorField(40-51)cursor_field(213-214)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-google-drive
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
28-28: LGTM!The CursorField import is correctly added and grouped with related cursor imports.
100-100: LGTM!The cursor_field is correctly wrapped in a CursorField object when present, and None is passed otherwise. The default
supports_catalog_defined_cursor_field=Falseis appropriate for these general adapters (catalog-defined cursor support is for low-code connectors via the declarative factory).unit_tests/sources/streams/concurrent/test_adapters.py (1)
25-25: LGTM!The CursorField import is correctly added for test usage.
airbyte_cdk/sources/file_based/stream/concurrent/adapters.py (2)
35-35: LGTM!The CursorField import is correctly added and properly organized with related concurrent stream imports.
101-101: LGTM!The cursor_field wrapping follows the same pattern as the StreamFacade adapter (line 100 in adapters.py), which is great for consistency. The logic correctly handles the None case and appropriately defaults
supports_catalog_defined_cursor_field=Falsefor file-based adapters.unit_tests/sources/streams/concurrent/test_default_stream.py (2)
11-11: LGTM!The CursorField import is correctly added to the existing cursor imports.
177-212: LGTM!The test correctly updates to use
CursorField(cursor_field_key="date")instead of the raw string, matching the new DefaultStream API. The test properly validates that the CursorField is correctly translated to the expected AirbyteStream output (withdefault_cursor_field=["date"]andsource_defined_cursor=True).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
5125-5127: Consider simplifying the test setup, wdyt?The YAML content at line 5081 already sets
allow_catalog_defined_cursor_field: true, but then these lines override it again based on the test parameter. This feels a bit redundant and could be confusing to future maintainers.Would it be clearer to either:
- Remove the hardcoded
allow_catalog_defined_cursor_field: truefrom the YAML and only set it programmatically here, or- Use YAML template substitution to parametrize it directly in the content string?
This would make the test setup more straightforward.
</comment_end>
5141-5215: Nice test coverage for the edge case!This test correctly validates that when a stream is missing from the configured catalog, it falls back to the stream-defined cursor field. The logic is sound and the assertions are appropriate.
Minor suggestion: At line 5214, the hardcoded
Truevalue could potentially be extracted to a variable likeexpected_supports_catalog_defined = Truewith a comment explaining that it comes from the manifest's setting. This would make the test intention slightly clearer, but it's totally optional.</comment_end>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py(1 hunks)unit_tests/sources/streams/concurrent/test_default_stream.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/streams/concurrent/test_default_stream.py (2)
airbyte_cdk/sources/streams/concurrent/default_stream.py (5)
cursor(94-95)DefaultStream(17-125)name(44-45)namespace(48-49)as_airbyte_stream(58-82)airbyte_cdk/sources/streams/concurrent/cursor.py (3)
Cursor(54-90)CursorField(40-51)FinalStateCursor(93-135)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-google-drive
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
unit_tests/sources/streams/concurrent/test_default_stream.py (2)
11-11: LGTM - Clean API migration!The import addition and the update to use
CursorField(cursor_field_key="date")instead of a raw string properly align the test with the new CursorField-based API. This maintains backward compatibility while enabling the new catalog-defined cursor functionality.</comment_end>
Also applies to: 190-190
275-310: Excellent test coverage for catalog-defined cursors!This test effectively validates the behavior when
supports_catalog_defined_cursor_field=True, including:
source_defined_cursor=False(indicating catalog-defined)- Incremental sync mode support
- Proper cursor field propagation
The test structure mirrors the existing
test_as_airbyte_stream_with_a_cursortest nicely, making it easy to understand the difference between source-defined and catalog-defined cursors. Great addition!</comment_end>
allows for defining incremental streams that support a user-defined cursor field. By using the
allow_catalog_defined_cursor_fieldboolean field, we're able to do two things:cursor_fieldif it exists, or defaults to the source defined one in the manifestSummary by CodeRabbit
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.