Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Salesforce: change the sequence of requests #23610

Merged

Conversation

davydov-d
Copy link
Collaborator

@davydov-d davydov-d commented Mar 1, 2023

What

https://github.com/airbytehq/oncall/issues/1571

How

Version 2.0.1 introduced new functionality in the PR. It allowed supporting non-bulk streams with a huge amount of properties that couldn't fit in a single http request because of max length. The idea was to split all the properties of an entity into chunks and make multiple requests to get the data: one request per chunk per page. Requests were made in a sequence:

Page 1

  • chunk 1 (select id, a, b from table order by id)
  • chunk 2 (select id, c, d from table order by id)

Page 2

  • chunk 1 (select id, a, b from table order by id) # offset and limit are handled by the page number
  • chunk 2 (select id, c, d from table order by id) # offset and limit are handled by the page number

Then parts of the records were sticked together by the primary key value within one page. This solution did not take into account that the Salesforce API does not apply a constant page size for all the queries. So when the first query returns 2k records, second returns 200 records, we have 1.8k incomplete records.

To fix this situation we change the sequence of requests. Since now each subsequent request will be made to retrieve data from the chunk that has the fewest number of records read from independent of pagination (although we store current and next page for each chunk). After sticking the record we may yield it right after we know it contains all the properties needed. This allows us supporting streams with a huge number of properties, avoiding high memory consumption and emitting consistent data.

Let's see an example. Let's say we have 6 properties split into 3 chunks.
1 request: select id, a, b from table order by id -> returns [{id: 1, a: 1, b: 1}, {id: 2, a: 2, b: 2}, {id: 3, a: 3, b: 3}, {id: 4, a: 4, b: 4}]
2 request: select id, c, d from table order by id -> returns [{id: 1, c: 1, d: 1}, {id: 2, c: 2, d: 2}] # salesforce decided to decrease page size compared to first request in favour of keeping high performance
3 request: select id, e, f from table order by id -> returns [{id: 1, e: 1, f: 1}] # page size decreased even more

Now we have 1 (id=1) complete record that can be emitted. Next steps are: find out which chunk has the fewest records (#3) and make request to fetch next page for it. When we have the response, we'll be able to emit 1+ or more records. Repeat until we have the full set of properties for each primary key.

@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Mar 1, 2023
@davydov-d
Copy link
Collaborator Author

davydov-d commented Mar 1, 2023

/test connector=connectors/source-salesforce

🕑 connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/4304761461

@davydov-d davydov-d marked this pull request as ready for review March 1, 2023 15:23
@clnoll
Copy link
Contributor

clnoll commented Mar 1, 2023

@davydov-d nice!

To make sure I understand - when we query salesforce, we'll send a request for, e.g. properties a and b, and then another request for c and d, and only paginate once all properties have been requested?

Do we do any checks to make sure we have all properties? e.g. if the page with c and d has the fewest records and we use that for pagination, do we need to do anything special to make sure we also get a and b?

@davydov-d
Copy link
Collaborator Author

@davydov-d nice!

To make sure I understand - when we query salesforce, we'll send a request for, e.g. properties a and b, and then another request for c and d, and only paginate once all properties have been requested?

Do we do any checks to make sure we have all properties? e.g. if the page with c and d has the fewest records and we use that for pagination, do we need to do anything special to make sure we also get a and b?

@clnoll each set of properties (chunk) has its own pagination. If the c and d page has the fewest records (most probably it has the smallest page size) we will read another page of that chunk until another chunk has the fewest records or there's no next page for this one. This will be repeated until all the chunks finish their pagination. It's made to kind of balance chunks so we shouldn't have to keep all the partial records in memory and wait for the first chunk to go through all its pages, then the second and so on but could emit them earlier. This line itself is actually a check that the record has all the properties.

Regarding your first question - first we'll request first pages of all the chunks, then the sequence of requests will depend on the page size of each chunk returned.

@brianjlai
Copy link
Contributor

brianjlai commented Mar 2, 2023

I looked over the code trying to functionally understand what its doing. I think it makes sense and should perform the grouping as @davydov-d clearly explained in the PR description and comments. I also don't see how this would be affecting the abnormal sync behavior so it is a little bit puzzling why that test is starting to fail all of a sudden. I will continue reviewing the code itself tomorrow too

The Salesforce rate limits are still posing a serious problem and I'm nervous about how often we will be able to test this without exceeding them again and blocking us for another 24 hours.

@brianjlai
Copy link
Contributor

brianjlai commented Mar 2, 2023

/test connector=connectors/source-salesforce

🕑 connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/4316777827

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes look good @davydov-d , nice work on what was a really challenging implementation of the grouping logic.

I pushed a fix for the infinite loop and consolidating the chunk states into a new object to clean the code up a bit (that was originally going to be my main suggestion during the review). If you want to look over those changes I made, otherwise I think this is in a good state to publish as long as we can get a passing CAT run.

@brianjlai
Copy link
Contributor

brianjlai commented Mar 3, 2023

/test connector=connectors/source-salesforce

🕑 connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/4319656683
✅ connectors/source-salesforce https://github.com/airbytehq/airbyte/actions/runs/4319656683
Python tests coverage:

Name                                         Stmts   Miss  Cover
----------------------------------------------------------------
source_salesforce/utils.py                       8      0   100%
source_salesforce/__init__.py                    2      0   100%
source_salesforce/source.py                     96      6    94%
source_salesforce/streams.py                   398     32    92%
source_salesforce/api.py                       155     14    91%
source_salesforce/exceptions.py                  8      1    88%
source_salesforce/rate_limiting.py              22      3    86%
source_salesforce/availability_strategy.py      17      8    53%
----------------------------------------------------------------
TOTAL                                          706     64    91%
Name                                         Stmts   Miss  Cover
----------------------------------------------------------------
source_salesforce/__init__.py                    2      0   100%
source_salesforce/exceptions.py                  8      1    88%
source_salesforce/api.py                       155     21    86%
source_salesforce/availability_strategy.py      17      3    82%
source_salesforce/streams.py                   398     86    78%
source_salesforce/rate_limiting.py              22      6    73%
source_salesforce/source.py                     96     33    66%
source_salesforce/utils.py                       8      7    12%
----------------------------------------------------------------
TOTAL                                          706    157    78%

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:100: The previous and actual specifications are identical.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:509: The previous and actual discovered catalogs are identical.
================== 36 passed, 2 skipped in 498.68s (0:08:18) ===================

@davydov-d
Copy link
Collaborator Author

davydov-d commented Mar 3, 2023

/publish connector=connectors/source-salesforce

🕑 Publishing the following connectors:
connectors/source-salesforce
https://github.com/airbytehq/airbyte/actions/runs/4321299920


Connector Did it publish? Were definitions generated?
connectors/source-salesforce

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

@davydov-d davydov-d merged commit 6e985c0 into master Mar 3, 2023
@davydov-d davydov-d deleted the ddavydov/#1571-source-salesforce-handle-different-page-size branch March 3, 2023 08:31
jbfbell pushed a commit that referenced this pull request Mar 6, 2023
* #1571 source salesforce: change the sequence of requests

* #1571 source Salesforce: format

* #1571 source salesforce: fix endless loop

* #1571 source salesforce: update unit tests

* fix infinite loop for streams with no records and refactor properties into a helper object to organize state

* auto-bump connector version

---------

Co-authored-by: brianjlai <brian.lai@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
danielduckworth pushed a commit to danielduckworth/airbyte that referenced this pull request Mar 13, 2023
* airbytehq#1571 source salesforce: change the sequence of requests

* airbytehq#1571 source Salesforce: format

* airbytehq#1571 source salesforce: fix endless loop

* airbytehq#1571 source salesforce: update unit tests

* fix infinite loop for streams with no records and refactor properties into a helper object to organize state

* auto-bump connector version

---------

Co-authored-by: brianjlai <brian.lai@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/salesforce
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants