Concurrent CDK: fix state message ordering #34131
Conversation
Force-pushed f64538b → d40a7ba
Can you confirm that what I understand is fair? If so, I think we need to change the logic a bit.
```python
        return first_interval
    merged_intervals = self.merge_intervals(slices)
    first_interval = merged_intervals[0]
    if start < first_interval[self.START_KEY]:
```
Given that `start` is defined as `self.parse_timestamp(state["start"]) if "start" in state else self.zero_value` here, I think this will never update the state.

Assuming it is the first incremental sync, `start = self.zero_value`, which is `0001-01-01T00:00:00.000Z`, so we would have the following scenario:

- Start an incremental concurrent sync with `config["start"] = 2023-01-01`
- Async generation of partitions: `{"start": "2023-01-01", "end": "2023-12-31"}`
- Async processing of `{"start": "2023-01-01", "end": "2023-12-31"}`, then closing this partition. At that point, the state is:

```
{
    "state_type": ConcurrencyCompatibleStateType.date_range.value,
    "metadata": { … },
    "start": "0001-01-01T00:00:00.000Z",
    "slices": [
        {"start": "2023-01-01", "end": "2023-12-31"}
    ]
}
```

- Merging intervals (not relevant here)
- `start < first_interval[self.START_KEY] == True`, so we return `0001-01-01T00:00:00.000Z` as the latest completed time

Outcome:
- Next sync starts from `"0001-01-01T00:00:00.000Z"`

Expected result:
- Next sync starts from `"2023-12-31"`

It feels like the value of `start` should be initialized as the lower boundary of all the slices. Based on @girarda's comment yesterday, the easiest way to have that might be to ask the developer to provide it.
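The failure mode described above can be reduced to a few lines. This is a hypothetical simplification for illustration only; `get_latest_complete_time`, the `slices` layout, and `ZERO_VALUE` are assumed names, not the actual CDK code:

```python
# Hypothetical reduction of the cursor logic under discussion; names and
# structure are illustrative, not the actual CDK implementation.
from datetime import datetime, timezone

ZERO_VALUE = datetime(1, 1, 1, tzinfo=timezone.utc)

def get_latest_complete_time(start, merged_intervals):
    first_interval = merged_intervals[0]
    # If `start` precedes the first synced interval, the gap before that
    # interval was never synced, so the cursor cannot advance past `start`.
    if start < first_interval["start"]:
        return start
    return first_interval["end"]

slices = [
    {"start": datetime(2023, 1, 1, tzinfo=timezone.utc),
     "end": datetime(2023, 12, 31, tzinfo=timezone.utc)}
]

# With start == ZERO_VALUE the cursor never advances past 0001-01-01,
# even though the 2023 slice completed successfully.
stuck = get_latest_complete_time(ZERO_VALUE, slices)

# Seeding start from config["start"] instead lets the cursor advance
# to the end of the completed slice.
advanced = get_latest_complete_time(
    datetime(2023, 1, 1, tzinfo=timezone.utc), slices
)
```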
Right, this should be taking the start from the config into account too. Will make that update!
```python
@@ -81,26 +82,34 @@ def convert_from_sequential_state(self, cursor_field: CursorField, stream_state:
{
    "state_type": ConcurrencyCompatibleStateType.date_range.value,
    "metadata": { … },
    "start": <timestamp representing the latest date before which all records were synced>
```
Do we need to set this as part of the state? Could it simply be a field in `DateTimeStreamStateConverter`? I'm wary of adding things to the state, as it means possible breaking changes when we want to change it later.
One thing to note is that this is just in the internal state; it isn't part of the state that we're emitting.
It isn't essential that it's part of the internal state, but it would have to be passed into a lot of functions instead. I don't have a strong feeling either way. Given all that, do you still prefer that it's not part of the state?
Ok! I think I have more context given your comment. In my mind, the `_state` was something we could emit when we move from the sequential state to the new fancy concurrent state, hence why I was surprised. Will having `start` as part of the response from `convert_from_sequential_state` create more work than having an internal field when we do this switch?
```python
    self.parse_timestamp(stream_state[cursor_field.cursor_field_key])
    if cursor_field.cursor_field_key in stream_state
    else self.zero_value
)
if cursor_field.cursor_field_key in stream_state:
```
We have the condition `if cursor_field.cursor_field_key in stream_state` both in the definition of `low_water_mark` and on this line. Can it be grouped?
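One way to group the duplicated check into a single branch, sketched with assumed names (`compute_low_water_mark` and its parameters are illustrative, not the CDK's real API):

```python
# Illustrative refactor: the membership check appears exactly once, and any
# work that only applies when the key is present lives in the same branch.
from typing import Any, Callable, MutableMapping

def compute_low_water_mark(
    stream_state: MutableMapping[str, Any],
    cursor_key: str,
    zero_value: Any,
    parse: Callable[[Any], Any],
) -> Any:
    if cursor_key in stream_state:
        # Key present: parse it (and do any other present-only work here).
        return parse(stream_state[cursor_key])
    # Key absent: fall back to the zero value.
    return zero_value
```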
Force-pushed f28b7a1 → d310df8
I'm missing context on the edge cases that were identified. Can you add more information? We can also sync if you prefer
```python
        return first_interval
    merged_intervals = self.merge_intervals(slices)
    first_interval = merged_intervals[0]
    if previous_sync_end < first_interval[self.START_KEY]:
```
I would have assumed that nothing would have changed except for https://github.com/airbytehq/airbyte/pull/34131/files#diff-44999ccc78e8a64a5a79f8352f8aa2a45d8dde6ecaa92d3dff0cc773de104197R118. How can this case be possible? It feels like since we add an interval with `low_water_mark` as an upper boundary, we should never hit this case. Do we fear that connectors might start syncing earlier slices?
This is handling what I assume would be an unusual case, which is that the start date was changed to something more recent.
```python
    # `previous_sync_end` falls outside of the first interval; this is unexpected because we shouldn't have tried
    # to sync anything before `previous_sync_end`, but we can handle it anyway.
    return self._get_latest_complete_time(previous_sync_end, merged_intervals[1:])
```
This would mean that the first interval is completely before `previous_sync_end`. In which case can this occur?
Like the comment mentions, we don't really expect this to happen. Since it's unexpected I can modify this to raise an exception instead.
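A sketch of the raise-instead-of-handle alternative discussed here; the function name, key names, and recursion-free shape are assumptions for illustration, not the actual change:

```python
# Fail loudly on the unexpected gap instead of silently recursing past it.
from typing import Any, List, Mapping

def latest_complete_time(
    previous_sync_end: Any, merged_intervals: List[Mapping[str, Any]]
) -> Any:
    if not merged_intervals:
        return previous_sync_end
    first = merged_intervals[0]
    if previous_sync_end < first["start"]:
        # Unexpected: nothing before `previous_sync_end` should have been
        # scheduled, so a gap here indicates a bug upstream.
        raise ValueError(
            f"previous_sync_end {previous_sync_end!r} precedes the first "
            f"synced interval {first!r}"
        )
    return max(previous_sync_end, first["end"])
```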
```python
else:
    return None
# Nothing has been synced so we don't advance
```
Shouldn't we have at least one slice here, since when we call `convert_from_sequential_state` we always create one?
Yes, true, but I didn't want to have a condition that wasn't handled in this method. Would it make more sense to you to have this throw an exception instead?
...n/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (outdated; resolved)
Force-pushed f20dc58 → 99cd60f
I'm a bit confused, as there seems to be some code still fetching `stream_state["low_water_mark"]`. Is that expected? The last commit is very clean though, and I really like it.
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py (outdated; resolved)
```python
else:
    self._actual_sync_start = start
```
Should we log if `prev_sync_low_water_mark and prev_sync_low_water_mark < sync_start`?
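A minimal sketch of the suggested logging guard; the function and variable names are assumed for illustration:

```python
# Surface the surprising "low water mark earlier than sync start" case
# instead of silently rewinding.
import logging

logger = logging.getLogger("concurrent_cursor")

def resolve_actual_sync_start(prev_sync_low_water_mark, sync_start):
    if prev_sync_low_water_mark and prev_sync_low_water_mark < sync_start:
        logger.warning(
            "Previous sync's low water mark %s is earlier than the "
            "configured sync start %s; resuming from the low water mark.",
            prev_sync_low_water_mark,
            sync_start,
        )
        return prev_sync_low_water_mark
    return sync_start
```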
Force-pushed cdbaf23 → 77bebfe
Force-pushed c0cf6b7 → 8ebca1b
Force-pushed acd9c73 → 1d196ab
Force-pushed 7dd181d → 291c851
Two very small comments which I'll let you decide if it's worth changing. Good job on this very annoying time-based brain melting problem. Thanks Catherine!
```python
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)

@property
def state(self) -> MutableMapping[str, Any]:
```
Should this be private? From a very quick look, I don't see this used outside of the class (which I think is very nice, as it means we don't expose a non-domain object like `MutableMapping`).
It will actually be used externally, as you'll see in the follow up Salesforce PR.
Can you show me where? I can't seem to identify this change on https://github.com/airbytehq/airbyte/pull/33522/files
Sorry that was out of date. It's there now.
```python
@abstractmethod
def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """
    Perform any transformations needed for compatibility with the converter.
    """
    ...

@abstractmethod
def get_sync_start(self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any], start: Optional[Any]) -> Any:
```
nit: Is there a case where we would call `get_sync_start` without calling `convert_from_sequential_state`? Otherwise, I think I would merge the two to expose as few things as possible.
I like this idea! Updated to make this change.
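A hypothetical shape of the merged entry point: `convert_from_sequential_state` also returns the sync start, so `get_sync_start` no longer needs to be exposed separately. The names and the state layout below are illustrative assumptions, not the actual CDK change:

```python
# Single entry point returning both the sync start and the concurrent state.
from typing import Any, MutableMapping, Optional, Tuple

def convert_from_sequential_state(
    cursor_key: str,
    stream_state: MutableMapping[str, Any],
    start: Optional[Any],
) -> Tuple[Any, MutableMapping[str, Any]]:
    # Sync start: the prior cursor value if present, else the configured start.
    sync_start = stream_state.get(cursor_key, start)
    concurrent_state = {
        "state_type": "date-range",
        # One zero-width slice marks everything before sync_start as synced.
        "slices": [{"start": sync_start, "end": sync_start}],
    }
    return sync_start, concurrent_state
```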
```diff
             merged_end_time = max(last_end_time, interval[self.END_KEY])
             merged_intervals[-1][self.END_KEY] = merged_end_time
         else:
             merged_intervals.append(interval)

         return merged_intervals

-    def compare_intervals(self, end_time: Any, start_time: Any) -> bool:
+    def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
```
❤️
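The merging step above can be sketched end to end. This is a self-contained reduction to integer bounds for illustration; the real converter operates on timestamps and uses its own increment and comparison helpers:

```python
# Collapse overlapping or directly adjacent intervals into one.
from typing import Any, List, MutableMapping

def merge_intervals(
    intervals: List[MutableMapping[str, Any]]
) -> List[MutableMapping[str, Any]]:
    if not intervals:
        return []
    ordered = sorted(intervals, key=lambda i: i["start"])
    merged = [dict(ordered[0])]
    for interval in ordered[1:]:
        last = merged[-1]
        # Adjacency check: with integer bounds, end + 1 touching the next
        # start means there is no gap between the two intervals.
        if interval["start"] <= last["end"] + 1:
            last["end"] = max(last["end"], interval["end"])
        else:
            merged.append(dict(interval))
    return merged
```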
```python
slices = []

# Create a slice to represent the records synced during prior syncs.
# The start and end are the same to avoid confusion as to whether the records for this slice
```
❤️
Don't clear out `slices` when converting to sequential state
This reverts commit 04d6089.
Force-pushed 152d464 → 1578550