🎉 Source Salesforce: add checkpointing #24888
Conversation
(Resolved review comment on airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py)
I have a few high-level questions about our existing approach and how it relates to the new slice behavior. The new sliced queries make sense, but I have some general concerns about how we still swallow rate limit errors and return a successful sync, which will be compounded now that we do checkpointing.
```diff
@@ -633,36 +647,62 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late

 class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
+    STREAM_SLICE_STEP = 120
```
How did we arrive at the decision to slice over 120-day windows?
There is still a risk that we will drop records even with the slicing. Alex details this pretty well here: https://airbytehq-team.slack.com/archives/C04UY2A9Z53/p1680717935424479?thread_ts=1680710973.672509&cid=C04UY2A9Z53. The larger the window, the higher the chance that we hit a rate limit midway through a slice and checkpoint with records missing.
If we make it smaller, the integration tests will run for about 4 hours until the process gets killed. While testing we could immediately see that a small step leads to a big performance issue. Another disadvantage is that the number of queries also increases. So after some testing I decided to aim for ~3 checkpoints per year, because that is better than one checkpoint at the end of the synchronization, and the performance is not so bad.
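For illustration, here is a minimal sketch (not the connector's actual code; `count_slices` is a hypothetical helper) of how the step size translates into checkpoint frequency:

```python
import pendulum

STREAM_SLICE_STEP = 120  # days per slice, i.e. per checkpoint

def count_slices(start: pendulum.DateTime, end: pendulum.DateTime, step_days: int) -> int:
    """Count how many date-window slices (checkpoints) a sync would produce."""
    slices, cursor = 0, start
    while cursor < end:
        cursor = cursor.add(days=step_days)
        slices += 1
    return slices

# One year of data with a 120-day step yields 4 slices, i.e. roughly 3-4
# checkpoints per year, matching the trade-off described above.
start = pendulum.datetime(2022, 1, 1, tz="UTC")
end = pendulum.datetime(2023, 1, 1, tz="UTC")
print(count_slices(start, end, STREAM_SLICE_STEP))  # -> 4
```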
That's a fair point, and I trust your analysis of the impact on performance. Can you please add a comment in the code with what you just mentioned, so there is future context on how this number was determined?
nit: we could make this configurable with an optional parameter in the spec
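A hypothetical sketch of what that could look like on the connector side, assuming an optional `stream_slice_step` key in the spec (the key name is an assumption, not part of this PR):

```python
# Hypothetical: read an optional `stream_slice_step` from the connector config,
# falling back to the 120-day default discussed above.
DEFAULT_STREAM_SLICE_STEP = 120

def get_slice_step(config: dict) -> int:
    return int(config.get("stream_slice_step", DEFAULT_STREAM_SLICE_STEP))
```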
Can we include these changes in the further refactoring, together with the changes requested in this #24888 (comment)? I will do it as part of common improvements to this connector.
```diff
 if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
-    logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
+    logger.warning(f"API Call {url} limit is exceeded. Error message: '{error_data.get('message')}'")
     raise AirbyteStopSync()  # if got 403 rate limit response, finish the sync with success.
```
I am wondering whether this is the right behavior to continue. Due to the 24-hour rate limit, we didn't want to block future syncs, so we just marked the sync successful. This has its drawbacks and we could lose records. However, now that we have checkpointing at date-slice windows, maybe we should throw back an error instead of swallowing it, and on the next sync pick up where the previous bookmark left off.
And now with slices, we can still make incremental progress even if we hit the rate limit issue again, instead of retrying the whole sync.
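As a rough sketch of this suggestion (not the connector's actual code; the helper name and error-payload shape are assumptions), the handler could re-raise instead of stopping with success, relying on per-slice state messages to checkpoint the progress already made:

```python
from requests import codes

def handle_rate_limit(error, error_code: str, logger) -> None:
    # Hypothetical: error payload shape mirrors the Salesforce error body used above.
    error_data = error.response.json()[0]
    if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
        logger.warning(f"API call limit exceeded. Error message: '{error_data.get('message')}'")
        # Fail the sync instead of swallowing the error; the next run resumes
        # from the last checkpointed slice rather than re-reading everything.
        raise error
```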
We have checkpointing now, but not for full refresh syncs. I also wanted to remove it but decided to leave it as is.
Failing due to daily rate limits will trigger alerts if 3 workspaces start moving more data than they can. I'm not sure what the best way is to expose this kind of limitation without introducing a new status type.
(Resolved review comment on airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py)
/test connector=connectors/source-salesforce
Build Passed. Test summary info:
```python
return None

def stream_slices(
```
@roman-yermilov-gl we want this logic to apply to both IncrementalRestSalesforceStream and BulkIncrementalSalesforceStream, so it should be moved up. That should also allow for cleanup of the slice-related logic that was added to `request_params`, since much of it is duplicated between IncrementalRestSalesforceStream and BulkIncrementalSalesforceStream.
Can you also add unit tests for the IncrementalRestSalesforceStream case?
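A schematic sketch of the suggested layout (the class names are from the connector; the bases are stubs and the bodies are placeholders, not the real implementation):

```python
class RestSalesforceStream:   # stub for illustration
    ...

class BulkSalesforceStream:   # stub for illustration
    ...

class IncrementalRestSalesforceStream(RestSalesforceStream):
    def stream_slices(self, *, sync_mode=None, cursor_field=None, stream_state=None):
        # The shared date-window slicing would live here once...
        yield from ()

class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
    # ...and be inherited here via the MRO, letting the duplicated
    # slice-related logic in each request_params be removed.
    pass
```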
@clnoll
This PR fixes the following P1 issues: #20471, #19947, #19014, https://github.com/airbytehq/oncall/issues/1735.
If these code changes look OK to you, can we approve/merge/close this PR so we don't keep those P1s open? After that I will start working on the refactoring and testing.
@roman-yermilov-gl the request isn't just for refactoring - with the current implementation, only bulk streams support checkpointing
Hey @roman-yermilov-gl Just wanted to circle back here. If we address these changes we should be able to merge this in :)
@girarda
Moved the slicer to IncrementalRestSalesforceStream, so all the incremental streams use slicing now.
@bnchrch
What should I do?)
/test connector=connectors/source-salesforce
Build Passed. Test summary info:
```diff
 if self.name not in UNSUPPORTED_FILTERING_STREAMS:
-    query += f"ORDER BY {self.cursor_field} ASC"
+    order_by_clause = f"ORDER BY {self.cursor_field} ASC"
```
Why is it that we're only ordering by `self.cursor_field` in the incremental sync case, but ordering by cursor & primary key in the bulk case? Should they be consistent?
We are using the primary key in bulk operation queries, in the `WHERE` clause and in the `ORDER BY` clause, whenever a primary key exists. For this type of query we need to handle pagination ourselves, so we slice by primary key inside the base stream slicer. REST stream pagination works differently, so we don't need it there.

**Why we need the primary key for bulk streams.** Given a table:

| id | date |
|----|------------|
| 1  | 01.01.2023 |
| 2  | 01.01.2023 |
| 3  | 01.01.2023 |
| 4  | 01.01.2023 |
| 5  | 01.01.2023 |
| 6  | 01.03.2023 |

with page size = 2, cursor field = `date`, and primary key = `id`, the query for the first slice would be:

```sql
SELECT fields FROM table WHERE date >= 01.01.2023 AND date < 01.02.2023 ORDER BY date LIMIT 15000;
```

Salesforce prepares the data (up to 15000 records, but imagine it returns only 2 for the purpose of this example):

| id | date |
|----|------------|
| 1  | 01.01.2023 |
| 2  | 01.01.2023 |

So far we have only 2 of the 5 records that satisfy the first query, which means we are not ready to move on to the second slice. We can also see that all 5 records share the same date, 01.01.2023. This is where the primary key comes in handy: to get the next two records we make a second query like this:

```sql
SELECT fields FROM table WHERE date >= 01.01.2023 AND date < 01.02.2023 AND id > 2 ORDER BY date, id LIMIT 15000;
```

which returns:

| id | date |
|----|------------|
| 3  | 01.01.2023 |
| 4  | 01.01.2023 |

**Why we don't need the primary key for REST streams.** In the first lines of the current method:

```python
if next_page_token:
    """
    If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters.
    """
    return {}
```

we can see that Salesforce provides the link itself, and we just use it as-is to fetch the next page.
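To make the keyset-pagination idea above concrete, here is a minimal, self-contained sketch (a hypothetical helper, not the connector's actual code): page through one date slice by remembering the last primary key seen and adding an `id > last_key` filter.

```python
from typing import Optional

def build_query(start: str, end: str, last_key: Optional[str], page_size: int = 15000) -> str:
    """Build the query for one page of a date slice, using the primary key as a tie-breaker."""
    where = f"WHERE date >= {start} AND date < {end}"
    order_by = "ORDER BY date"
    if last_key is not None:
        where += f" AND id > {last_key}"  # skip records already received
        order_by += ", id"                # deterministic order among equal dates
    return f"SELECT fields FROM table {where} {order_by} LIMIT {page_size}"

print(build_query("01.01.2023", "01.02.2023", last_key=None))
print(build_query("01.01.2023", "01.02.2023", last_key="2"))
```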
Makes sense, thank you for clarifying!
/test connector=connectors/source-salesforce
Build Passed. Test summary info:
left a few small comments, but the flow looks good to me. Thank you for the hard work!
```python
now = pendulum.now(tz="UTC")
initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC")
period_end = initial_date.add(days=now.diff(initial_date).in_days())
```
It's not clear to me why we need to diff `now` with `initial_date`. Is this equivalent to `period_end = pendulum.today(tz="UTC")`?
You are right, we don't need those calculations. I am going to get rid of `period_end` and `base` (going to replace `base` with `initial_date`).
```python
slice_number = 1
while not end == now:
    base = period_end.subtract(days=period_end.diff(initial_date).in_days())
```
base can be computed outside of the while loop
```python
period_end = initial_date.add(days=now.diff(initial_date).in_days())

slice_number = 1
while not end == now:
```
nit: `while end <= now` makes the intent clearer
This is not possible with the current logic, because `end` is equal to `None` before the first iteration while `now` is a datetime, so only the `==` comparison can be applied here.
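A simplified, runnable sketch of the loop shape being discussed (the variable names come from the diff; the body is illustrative, assuming `end` starts as `None`):

```python
import pendulum

STREAM_SLICE_STEP = 120
now = pendulum.now(tz="UTC")
start = pendulum.parse("2023-01-01", tz="UTC")

end = None
while not end == now:  # `end` is None on the first pass, so `end <= now` would raise a TypeError
    end = min(start.add(days=STREAM_SLICE_STEP), now)
    print({"start_date": start.isoformat(), "end_date": end.isoformat()})
    start = end
```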
```python
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
    order_by_fields = [self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]
    query += f"ORDER BY {','.join(order_by_fields)} ASC LIMIT {self.page_size}"
primary_key = (next_page_token or {}).get("primary_key", "")
```
nit: can you rename this variable to `last_key` for clarity?
done
/test connector=connectors/source-salesforce
Build Passed. Test summary info:
LGTM @roman-yermilov-gl!
@clnoll Thanks
Awesome, thanks @roman-yermilov-gl!
/publish connector=connectors/source-salesforce
if you have connectors that successfully published but failed definition generation, follow step 4 here
* Source Salesforce: add checkpointing
* Source-Iterable: fix integration tests
* Source Salesforce: fix integration test slices
* Source Salesforce: wait for latest record to be accessible
* Source Salesforce: retry 10 times for everything
* Source Salesforce: refactoring; add checkpointing for all incremental streams
* Source Salesforce: small fixes
* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
What