fix: avoid using globals for record counting during concurrent declarative stream reads #732
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version
You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@pedro/hack-global-record-count#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch pedro/hack-global-record-count

Helpful Resources

PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
PyTest Results (Fast)
3 764 tests +2   3 752 ✅ +1   6m 26s ⏱️ -43s
Results for commit 4e3c512. ± Comparison against base commit eca065f.
This pull request skips 1 test.
♻️ This comment has been updated with latest results.

PyTest Results (Full)
3 767 tests   3 755 ✅   11m 16s ⏱️
Results for commit 4e3c512.
♻️ This comment has been updated with latest results.
Note
Other AI code review bot(s) detected: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough
Replaces a module-level global record counter with a per-instance RecordCounter shared among partitions created by the same DeclarativePartitionFactory; updates constructors and read logic to use the instance counter and adjusts tests. Also a single whitespace-only change in a manifest processor file.

Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller as Caller
participant Factory as DeclarativePartitionFactory
participant Partition as DeclarativePartition
participant Counter as RecordCounter
participant Retriever as Retriever
Note over Factory,Counter: Factory created with RecordCounter
Caller->>Factory: __init__(..., max_records_limit, record_counter)
Factory->>Counter: store reference
Note over Factory,Partition: Partition creation uses same counter
Caller->>Factory: create(stream_slice)
Factory->>Partition: __init__(..., stream_slice, record_counter)
Partition->>Counter: store reference
Note over Partition,Retriever: Read flow enforces max_records_limit
Caller->>Partition: read()
Partition->>Counter: get_total_records()
alt limit reached
Partition-->>Caller: return (no reads)
else
loop per-record
Partition->>Retriever: read_records(slice)
Retriever-->>Partition: record
Partition->>Counter: get_total_records()
alt total >= max_records_limit
Partition-->>Caller: stop reading
else
Partition->>Counter: increment()
end
end
end
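For readers who prefer code to diagrams, here is a small, self-contained Python sketch of the flow above. read_partition and its parameters are illustrative stand-ins rather than the CDK's actual DeclarativePartition.read() signature; only increment() and get_total_records() mirror the RecordCounter interface shown in the diagram.

from typing import Any, Iterable, Iterator, Optional

class RecordCounter:
    """Shared between all partitions created by the same factory."""

    def __init__(self) -> None:
        self._total = 0

    def increment(self) -> None:
        self._total += 1

    def get_total_records(self) -> int:
        return self._total

def read_partition(
    records: Iterable[Any],
    counter: RecordCounter,
    max_records_limit: Optional[int],
) -> Iterator[Any]:
    # Early exit: a previous partition sharing this counter may already have hit the cap.
    if max_records_limit is not None and counter.get_total_records() >= max_records_limit:
        return
    for record in records:
        # Re-check the shared total before emitting each record.
        if max_records_limit is not None and counter.get_total_records() >= max_records_limit:
            return
        counter.increment()
        yield record

counter = RecordCounter()
first = list(read_partition(range(3), counter, max_records_limit=2))
second = list(read_partition(range(3), counter, max_records_limit=2))
assert (len(first), len(second)) == (2, 0)  # the cap spans partitions sharing one counter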
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
Could we have a test that does two READ operations with test limits where both return records?
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
140-147: Strengthen the “previous partition” scenario to actually use a second partition

Currently the test calls read() twice on the same partition. To validate cross-partition behavior, can we create a second partition (different slice) and assert no records are produced, wdyt?

- second_partition_records = list(partition.read())
+ second_partition = partition_factory.create(_ANOTHER_STREAM_SLICE)
+ second_partition_records = list(second_partition.read())
  assert len(second_partition_records) == 0
  # The DeclarativePartition exits out of the read before attempting to read_records() if
  # the max_records_limit has already been reached. So we only expect to see read_records()
  # called for the first partition read and not the second
  retriever.read_records.assert_called_once()

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
137-146: Remove unused max_records_limit from StreamSlicerPartitionGenerator

The max_records_limit argument in airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (lines 142–144) is never assigned or used—record limiting is handled by the provided partition_factory. Please remove this parameter from the constructor signature to avoid confusion. wdyt?
🧹 Nitpick comments (2)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)
96-102: Assert retriever usage to prevent accidental over-reads

Would you assert retriever.read_records was called exactly once here to ensure we don’t accidentally re-enter the retriever when the cap is reached, wdyt?

 actual_records = list(partition.read())
 assert len(actual_records) == 5
 assert actual_records == expected_records
+retriever.read_records.assert_called_once()
66-79
: Add unit test to verify LOG/TRACE messages don’t count toward max_records_limit
Can we extend StreamSlicerPartitionGeneratorTest with a case that emits two LOG messages before any records (using max_records_limit=2) and assert we still receive exactly two records while the messages are forwarded? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- airbyte_cdk/manifest_server/command_processor/processor.py (0 hunks)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (6 hunks)
- unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (5 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/manifest_server/command_processor/processor.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
- DeclarativePartitionFactory (46-76)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (4)
36-41: LGTM: per-instance factory is the right place to scope the counter

Using DeclarativePartitionFactory instances to scope counting removes cross-request bleed. Nice. wdyt?

51-56: LGTM: constructor usage consistent

Constructing the factory without max_records_limit in the basic happy-path tests keeps them focused. wdyt?

69-74: LGTM: message path remains valid

Keeping the message_repository path unchanged while shifting counting to the factory is appropriate. wdyt?

125-131: LGTM: per-factory max_records_limit for cross-partition capping

This correctly sets the cap for all partitions created by the same factory. wdyt?
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
65-76: LGTM: single shared counter per factory

Sharing one counter across partitions created by the same factory matches the intended scoping. wdyt?

88-98: LGTM: pass counter through to partitions

Plumbing looks correct and keeps all partitions in the same accounting domain. wdyt?
Pull Request Overview
This PR fixes a concurrency bug in the declarative stream reading functionality by replacing a global record counter with a per-instance counter. The global counter caused issues in long-running server environments where subsequent requests would inherit the record count from previous requests, potentially returning zero records if the limit was already reached.
- Replaced global total_record_counter variable with a RecordCounter class that's instantiated per DeclarativePartitionFactory (sketched below)
- Updated DeclarativePartition to use the instance-specific record counter instead of the global one
- Added comprehensive test coverage to verify record counter isolation between different factory instances
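For context, a simplified, self-contained sketch of the wiring these bullets describe. The real constructors take more arguments (schema_loader, retriever, message_repository, and so on); this strips the classes down to the counter plumbing only, so treat it as an illustration rather than the CDK's actual code.

from dataclasses import dataclass
from typing import Any, Optional

class RecordCounter:
    def __init__(self) -> None:
        self._total = 0

    def increment(self) -> None:
        self._total += 1

    def get_total_records(self) -> int:
        return self._total

@dataclass
class DeclarativePartition:
    # Placeholder keeping only the fields relevant to record counting.
    stream_slice: Any
    max_records_limit: Optional[int]
    record_counter: RecordCounter

class DeclarativePartitionFactory:
    def __init__(self, max_records_limit: Optional[int] = None) -> None:
        self._max_records_limit = max_records_limit
        # Instance-scoped counter: this replaces the old module-level global.
        self._record_counter = RecordCounter()

    def create(self, stream_slice: Any) -> DeclarativePartition:
        # Every partition from this factory shares the factory's counter, so the limit
        # applies across the partitions of one read but never to a different factory.
        return DeclarativePartition(stream_slice, self._max_records_limit, self._record_counter)

factory_a, factory_b = DeclarativePartitionFactory(2), DeclarativePartitionFactory(2)
assert factory_a.create({"s": 1}).record_counter is factory_a.create({"s": 2}).record_counter
assert factory_a.create({"s": 1}).record_counter is not factory_b.create({"s": 1}).record_counter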
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
File | Description
--- | ---
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py | Introduces RecordCounter class and updates partition factory/partition classes to use instance-specific counters
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py | Removes global counter reset logic and adds test for counter isolation between factory instances
airbyte_cdk/manifest_server/command_processor/processor.py | Minor formatting cleanup removing extra blank line
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)
140-146: Exercise the “previous partition” path by actually using a second partition

The test name and comment talk about a previous partition, but the second read reuses the same partition. Creating a second partition makes the test align with the intent and guards the early-exit path across partitions, wdyt?

- second_partition_records = list(partition.read())
+ second_partition = partition_factory.create(_ANOTHER_STREAM_SLICE)
+ second_partition_records = list(second_partition.read())
  assert len(second_partition_records) == 0
  # The DeclarativePartition exits out of the read before attempting to read_records() if
  # the max_records_limit has already been reached. So we only expect to see read_records()
  # called for the first partition read and not the second
  retriever.read_records.assert_called_once()
96-102: Increment record counter only for actual records

It looks like your read() always calls _record_counter.increment()—even for non-record messages—so a log or error will eat into the record limit. If we intend max_records_limit to cap only yielded records, should we:

Add a test in unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py:

+ def test_non_record_messages_do_not_count_towards_limit(self) -> None:
+     retriever = self._mock_retriever(
+         [
+             _AIRBYTE_LOG_MESSAGE,
+             Record(data={"id": 1}, stream_name="s"),
+             Record(data={"id": 2}, stream_name="s"),
+         ]
+     )
+     partition_factory = DeclarativePartitionFactory("s", loader, retriever, repo, max_records_limit=2)
+     records = list(partition_factory.create(slice).read())
+     assert len(records) == 2

Update the loop to only increment when emitting a record, e.g.:

for stream_data in ...:
    if isinstance(stream_data, Mapping):
        yield Record(...)
        if self._max_records_limit is not None:
            self._record_counter.increment()
    else:
        self._message_repository.emit_message(stream_data)
Wdyt?
🧹 Nitpick comments (3)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (3)
51-56: Add an assertion on stream_name for the wrapped mapping?

To tighten the contract that mappings are wrapped into Record with the factory’s stream name, could we assert the stream_name too, e.g., right after Line 65, wdyt?

 assert len(records) == 1
 assert records[0].associated_slice == _A_STREAM_SLICE
 assert records[0].data == _A_RECORD
+assert records[0].stream_name == _STREAM_NAME
69-79: Also assert that no records are yielded when the item is a non-record message?

Explicitly asserting that the read returns zero records would make the intent unambiguous, wdyt?

- list(partition_factory.create(_A_STREAM_SLICE).read())
+ records = list(partition_factory.create(_A_STREAM_SLICE).read())
+ assert records == []
  message_repository.emit_message.assert_called_once_with(_AIRBYTE_LOG_MESSAGE)
174-191: Optional: assert returned contents match the first two records

Since the iterator order is deterministic here, adding content assertions would make the test a tad stronger, wdyt?

 first_factory_records = list(partition1.read())
 assert len(first_factory_records) == 2
+assert first_factory_records == records[:2]
@@
 second_factory_records = list(partition2.read())
 assert len(second_factory_records) == 2
+assert second_factory_records == records[:2]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (4)
- DeclarativePartitionFactory (46-76)
- stream_name (130-131)
- create (67-76)
- read (99-125)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Dependency Analysis with Deptry
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (2)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)
36-41: LGTM: verifies retriever reuse across slices

Creating a single factory and reading two different slices to assert two calls to read_records is spot on. ✅
148-196: LGTM: good isolation test for per-factory counters

Nice addition—this would have caught the global counter issue. Assertions on read_records per factory seal the isolation guarantee. ✅
LGTM!
Actionable comments posted: 0
🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
24-27: Importing RecordCounter is correct; can we add a regression test that proves per-instance isolation?

Since the PR’s goal is to avoid global bleed, could we add a test that performs two consecutive read() calls in the same process with a small max_records_limit and asserts both calls return records (i.e., counters don’t leak across runs)? I can draft a minimal test harness that spins two reads against the same manifest with fresh DeclarativePartitionFactory instances, wdyt?
3628-3628: Use a shared RecordCounter per test to mirror production semantics (one counter per factory), not one per partition call.

Right now each close_partition() passes a fresh RecordCounter(), which diverges from the “shared per-factory” behavior and may hide future regressions where counting affects behavior. Could we instantiate a single counter once per test (or via a function-scoped fixture) and pass it to all DeclarativePartition instances created within that test, wdyt?
Example change (apply the same pattern to the listed lines):
@@ in test_given_all_partitions_finished_when_close_partition_then_final_state_emitted
-    for slice in slices:
+    rc = RecordCounter()
+    for slice in slices:
         cursor.close_partition(
             DeclarativePartition(
                 stream_name="test_stream",
                 schema_loader=_EMPTY_SCHEMA_LOADER,
                 retriever=MagicMock(),
                 message_repository=MagicMock(),
                 max_records_limit=None,
                 stream_slice=slice,
-                record_counter=RecordCounter(),
+                record_counter=rc,
             )
         )

Alternatively, would you prefer a tiny helper to cut repetition?
def _mk_partition(slice_, rc):
    return DeclarativePartition(
        stream_name="test_stream",
        schema_loader=_EMPTY_SCHEMA_LOADER,
        retriever=MagicMock(),
        message_repository=MagicMock(),
        max_records_limit=None,
        stream_slice=slice_,
        record_counter=rc,
    )

Also applies to: 3714-3714, 3810-3810, 3901-3901, 3976-3976, 4062-4062, 4183-4183, 4196-4196, 4209-4209, 4271-4271, 4284-4284, 4298-4298, 4357-4357, 4369-4369
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (15 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
- RecordCounter (20-31)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
Using the concurrent CDK introduced a bug in the manifest-server, specifically around how record limits are handled.
The DeclarativePartitionGenerator defines a global to track the record counts:
https://github.com/airbytehq/airbyte-python-cdk/blame/ea76ce24349c77a42b8a1f56f685ced62477bb2e/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py#L17-L20
This is fine when each command execution gets a clean process, but not in the context of a long-running server like the manifest server. Because the counter is a global variable, subsequent requests continue from the same record count, so if a previous request hit the limit, later requests return 0 records. It doesn't even matter whether you're working with a completely different source or manifest; the same limit is applied as long as the server is alive.
This PR reimplements the counter as a property of the DeclarativePartitionFactory that is passed down to the partitions it creates.
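As a toy illustration of the failure mode and the fix (plain counters and hypothetical function names, not the real CDK classes):

# Old behavior: a module-level global survives across requests in a
# long-running process, so request 2 starts where request 1 stopped.
GLOBAL_TOTAL = 0

def handle_request_with_global(limit: int, available: int) -> int:
    global GLOBAL_TOTAL
    emitted = 0
    for _ in range(available):
        if GLOBAL_TOTAL >= limit:
            break
        GLOBAL_TOTAL += 1
        emitted += 1
    return emitted

# New behavior: each request (factory) owns its own counter.
def handle_request_with_own_counter(limit: int, available: int) -> int:
    total = 0
    emitted = 0
    for _ in range(available):
        if total >= limit:
            break
        total += 1
        emitted += 1
    return emitted

assert handle_request_with_global(limit=5, available=10) == 5
assert handle_request_with_global(limit=5, available=10) == 0    # leaked state: second request gets nothing
assert handle_request_with_own_counter(limit=5, available=10) == 5
assert handle_request_with_own_counter(limit=5, available=10) == 5  # isolated: each request gets its full limit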
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Style