-
Notifications
You must be signed in to change notification settings - Fork 30
fix(concurrent-cdk): StreamPartition handles AirbyteRecords #392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughThe changes enhance the Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant StreamPartition
Caller->>StreamPartition: Call read(record_data)
alt record_data is AirbyteMessage with non-null record
StreamPartition->>Caller: Yield Record(record_data.record.data or {} if data is None)
else record_data is Mapping type
StreamPartition->>Caller: Yield processed record (existing logic)
end
Suggested reviewers
Would these suggestions work for you? wdyt? 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (9)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/streams/concurrent/test_adapters.py (1)
142-195
: Great test implementation for the new functionality!This test properly validates the behavior of the
StreamPartition.read
method when processingAirbyteMessage
records. It ensures that the data from theAirbyteRecordMessage
is correctly extracted and yielded as aRecord
.The test setup is comprehensive with appropriate mock objects and assertions. Would you consider adding another parameter case for a different transformer configuration like you have in other tests? Just a thought since you only have the
NoTransform
case currently, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/streams/concurrent/adapters.py
(1 hunks)unit_tests/sources/streams/concurrent/test_adapters.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Check: 'source-shopify' (skip=false)
🔇 Additional comments (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
305-310
: Nice fix to handle AirbyteMessage containing AirbyteRecord!The change correctly handles the case when
record_data
is anAirbyteMessage
with a non-nullrecord
attribute. It extracts the data from the record and yields it as aRecord
object, ensuring proper handling of these messages.I like the null-safety with
record_data.record.data or {}
that provides an empty dict fallback. This aligns with the PR objective to fix howAirbyteRecord
instances are handled.unit_tests/sources/streams/concurrent/test_adapters.py (1)
10-17
: Good addition of AirbyteRecordMessage to the imports!Adding the
AirbyteRecordMessage
import supports the new test case and makes the dependencies clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can you update the docstring of read_records too? It seems outdated.
Update the StreamPartition so it properly handles
AirbyteRecord
s.On master, we send all AirbyteMessages on the repository, which is wrong if the message is an
AirbyteRecord
. We should instead create a record from it and yield it.Summary by CodeRabbit
AirbyteMessage
objects.StreamPartition
class when readingAirbyteMessage
records.