Salesforce: retry on download_data and create_stream_job #36385
Conversation
airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
@@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the following
| Version | Date       | Pull Request                                              | Subject                                                                        |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------|
| 2.4.0   | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978)  | Upgrade CDK to start emitting record counts with state and full refresh state  |
| 2.4.1   | 2024-03-22 | [36385](https://github.com/airbytehq/airbyte/pull/36385)  | Retry HTTP requests on IncompleteRead                                          |
To be updated...
🤞
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_fail_with_http_errors_when_create_stream_job_then_handle_error(send_http_request_patch):
    mocked_response = Mock()
    mocked_response.status_code = 666
👿
fwiw: I also tried running the log query. it's still running, but hasn't shown any instance of "IncompleteRead" on Cloud
welp worth a shot!
We saw the part of the fix that adds resiliency on the error at line 4971 (given only source logs) here, so baby steps I guess, but the sync has still failed. Next steps:
airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
)

logger = logging.getLogger("airbyte")


-def default_backoff_handler(max_tries: int, factor: int, **kwargs):
+def default_backoff_handler(max_tries: int, backoff_method={"method": backoff.expo, "params": {"factor": 15}}, **kwargs):
It seems like individual parameters would be better. Let's see if we can do this even if `method` can take different parameters.
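For illustration, a hedged sketch of what that suggestion could look like, with the wait generator and its parameters split into individual arguments. The signature, defaults, and exception class below are assumptions for the sketch, not the connector's final code:

```python
import backoff
from requests import exceptions
from typing import Optional


def default_backoff_handler(max_tries: int, backoff_method=backoff.expo, backoff_params: Optional[dict] = None, **kwargs):
    # Separate arguments instead of one dict: the wait generator and its
    # parameters can be overridden independently, e.g. backoff.constant with
    # backoff_params={"interval": 30}. Defaults mirror the expo/factor=15 pair.
    backoff_params = {"factor": 15} if backoff_params is None else backoff_params
    return backoff.on_exception(
        backoff_method,
        exceptions.RequestException,
        max_tries=max_tries,
        **backoff_params,
        **kwargs,
    )
```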
https://github.com/airbytehq/airbyte-internal-issues/issues/6886 and https://github.com/airbytehq/airbyte-internal-issues/issues/6855 were added to this PR.
airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
lgtm
* master: (1562 commits)
  * ✨ source-google-drive: migrate to poetry (airbytehq#36581)
  * enable spotbugs for CDK core and dependencies submodule (airbytehq#36612)
  * ✨ Source Salesforce, Shopify: add maxSecondsBetweenMessages in metadata (airbytehq#36723)
  * java-cdk: re-export airbyte-api dependency (airbytehq#36759)
  * Source Google Sheets: address dependency conflict and update CDK (airbytehq#36515)
  * Airbyte CI: rename incorrectly named pipelines (airbytehq#36722)
  * Source Azure Blob Storage: add integration tests (airbytehq#36542)
  * Salesforce: retry on download_data and create_stream_job (airbytehq#36385)
  * ✨Source Monday: Bumped CDK version dependency (airbytehq#36746)
  * airbyte-ci: burst gradle task cache on new java cdk release (airbytehq#36480)
  * chore(connectors): remove tasks.py and top-level requirements.txt (airbytehq#36738)
  * airbyte-ci: fix pull-request-number option for migrate_to_base_image (airbytehq#36779)
  * 🤖 Bump patch version of Python CDK
  * add backward compatibility for an old close slice logic (airbytehq#36774)
  * Bump Airbyte version from 0.57.0 to 0.57.1
  * 🤖 Bump patch version of Python CDK
  * low-code: Fix cursor pagination instantiation if the stop_condition is a string (airbytehq#36760)
  * fix rabbitmq icon and simplify docs registry code (airbytehq#36767)
  * Update azure-entra-id.md (airbytehq#36758)
  * re-enable rabbitmq on OSS (airbytehq#36749)
  * ...
What
A self-managed customer has had issues with a very big Salesforce sync. We are seeing a lot of connector errors in the sync, namely `TimeoutError`. However, the one that makes the sync crash is an `IncompleteRead`/`ChunkedEncodingError`. I can't observe an `IncompleteRead` in our Cloud platform for the past 7 days as this query yields no result (note that I've had weird infinite loading on Google Logs Explorer this morning, so I would like someone to confirm that as well). There are other `ChunkedEncodingError` occurrences, as shown by https://github.com/airbytehq/airbyte-internal-issues/issues/6860, so we will try to tackle all of them at the same time. The issue is that they occur not on the actual HTTP query but when we try to read the response. As such, the retry should encapsulate `.json()` and `.iter_content(<...>)`.
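To make the failure mode concrete, here is a minimal, self-contained illustration (an assumed example, not connector code) of where the error actually surfaces when streaming a response with `requests`:

```python
import requests


def read_streamed_body(url: str) -> int:
    """Count the bytes of a streamed response body."""
    # With stream=True, requests returns as soon as the headers arrive; a
    # truncated body is only detected later, while the content is iterated.
    response = requests.get(url, stream=True, timeout=30)
    response.raise_for_status()
    total = 0
    try:
        for chunk in response.iter_content(chunk_size=8192):
            total += len(chunk)
    except requests.exceptions.ChunkedEncodingError:
        # Raised here (it wraps urllib3's IncompleteRead), long after the HTTP
        # request itself succeeded, so a retry around session.send() alone
        # would never see it.
        raise
    return total
```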
Other fixes were added as follow-ups to fully resolve the customer issues, namely:
Solution
We have no good reason to believe it will work, but for now we will set this exception as retryable, make sure the backoff is applied not only to `self._session.send(<...>)` but also to `.json()` and `.iter_content(<...>)`, and we will monitor the situation.
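A minimal sketch of that idea, not the connector's actual code: it assumes the `backoff` library (already used by the connector, per the diff above), and the exception tuple and simplified `create_stream_job` / `download_data` helpers below are illustrative names only.

```python
import backoff
import requests

# Read-time failures are treated as retryable alongside connection errors.
TRANSIENT_EXCEPTIONS = (
    requests.exceptions.ChunkedEncodingError,  # wraps urllib3's IncompleteRead
    requests.exceptions.ConnectionError,
)

retry_on_transient_errors = backoff.on_exception(
    backoff.expo, TRANSIENT_EXCEPTIONS, max_tries=5, factor=15
)


@retry_on_transient_errors
def create_stream_job(session: requests.Session, url: str, payload: dict) -> dict:
    # The whole call, including .json(), sits inside the retried scope, so an
    # IncompleteRead raised while parsing the body triggers a retry too.
    response = session.post(url, json=payload)
    response.raise_for_status()
    return response.json()


@retry_on_transient_errors
def download_data(session: requests.Session, url: str, path: str) -> str:
    # Same idea for streamed downloads: .iter_content() is inside the retried
    # scope, not just the request itself.
    with session.get(url, stream=True) as response:
        response.raise_for_status()
        with open(path, "wb") as handle:
            for chunk in response.iter_content(chunk_size=8192):
                handle.write(chunk)
    return path
```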