-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent CDK: support partitioned states #36811
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
Outdated
Show resolved
Hide resolved
yield self._slice | ||
|
||
slice_start += self.stream_slice_step | ||
if self._cursor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really hate this but for full refresh, we create a FinalStateCursor
and not a ConcurrentCursor
. It probably means that the generation of slices based on a state is not valid. In other words, with or without a state, we generate slices that relies on incrementality and the fact that the stream is full_refresh is irrelevant. I think this will be more true with Resumable Full Refresh but it is a bit annoying right now
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
Outdated
Show resolved
Hide resolved
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py
Show resolved
Hide resolved
} | ||
else: | ||
now = pendulum.now(tz="UTC") | ||
assert LOOKBACK_SECONDS is not None and LOOKBACK_SECONDS >= 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know. It indeed does not make sense as we are fully owner of LOOKBACK_SECONDS
. This is code that was there before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding to that, some tests were patching this constant with invalid values. I just removed them as it is not a real case scenario
airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source.py
Show resolved
Hide resolved
@@ -25,6 +25,10 @@ class Comparable(Protocol): | |||
def __lt__(self: "Comparable", other: "Comparable") -> bool: | |||
pass | |||
|
|||
@abstractmethod | |||
def __add__(self: "Comparable", other: Any) -> "Comparable": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's add a docstring
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this does not always make sense. I think the typing is kind of wrong for this class. I think we need a generic with two "variables": one that represents cursor values and the other that represents gap between those. For a cursor where values are datetimes, the first would be datetime
and the second would be timedelta
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
Show resolved
Hide resolved
8461a12
to
f187529
Compare
@@ -118,7 +122,9 @@ def __init__( | |||
connector_state_converter: AbstractStreamStateConverter, | |||
cursor_field: CursorField, | |||
slice_boundary_fields: Optional[Tuple[str, str]], | |||
start: Optional[Any], | |||
start: Optional[Comparable], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚨 🚨 🚨 Breaking change alert: The type was changed for start
as it was either a Python object when being passed a partitioned state or the serialized value when being passed a sequential state. As I'm trying to remove sequential state from this class, we needed to remove the second case
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maxi297 I'm not seeing anything that I think would pose a problem for file-based sources. I can do a follow up to update the S3 connector after this is deployed for Salesforce if that would be helpful.
What I'm not clear on is why the cursor needs to be updated in the same PR that's changing the thread shutdown logic, which I thought was the primary issue that we were trying to solve in this PR. Can you help me understand that?
airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
Outdated
Show resolved
Hide resolved
0a673c9
to
675b544
Compare
What
Addresses https://github.com/airbytehq/airbyte-internal-issues/issues/6865
How
By having the CDK support partitioned states. This implies a couple of changes:
Once this is done, we have that errors should not break the sync
Note: this PR serves as a CDK feature implementation and the changes on source-salesforce are only to show usage. The changes to source-salesforce will be moved to another PR
Migration
Following the point above: Right now if there is a state, we can only assume that everything from
self._start
to<cursor value>
was synced. If there is a state, we therefore need to create a slice from {"start": self._start, "end": }. I think we had already thought about this logic but I didn't see it in action in my round of testing today (2024-04-05)