Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Salesforce: add more unsupported bulk entities, fix fallback to rest #19286

Merged
merged 8 commits into from
Nov 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.24
LABEL io.airbyte.version=1.0.25
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,30 @@
"TaskStatus",
"TaskWhoRelation",
"UndecidedEventRelation",
"WorkOrderLineItemStatus",
davydov-d marked this conversation as resolved.
Show resolved Hide resolved
"WorkOrderStatus",
"UserRecordAccess",
"OwnedContentDocument",
"OpenActivity",
"NoteAndAttachment",
"Name",
"LookedUpFromActivity",
"FolderedContentDocument",
"ContractStatus",
"ContentFolderItem",
"CombinedAttachment",
"CaseTeamTemplateRecord",
"CaseTeamTemplateMember",
"CaseTeamTemplate",
"CaseTeamRole",
"CaseTeamMember",
"AttachedContentDocument",
"AggregateResult",
"AccountHistory",
Copy link
Contributor

@PierreKerschgens PierreKerschgens Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi!
Since this connector version I'm having trouble getting data from AccountHistory table. The table has been added to 'UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS' list.

Airbyte doesn't fail the sync job even though I'm not receiving data. I only found this in all of my logs:

2022-11-15 17:45:06 �[44msource�[0m > Syncing stream: AccountHistory 
2022-11-15 17:47:07 �[44msource�[0m > [{"message":"Your query request was running for too long.","errorCode":"QUERY_TIMEOUT"}]
2022-11-15 17:47:07 �[44msource�[0m > Cannot receive data for stream 'AccountHistory', error message: 'Your query request was running for too long.'
2022-11-15 17:47:07 �[44msource�[0m > Read 0 records from AccountHistory stream
2022-11-15 17:47:07 �[44msource�[0m > Finished syncing AccountHistory

After downgrading the Salesforce connector from 1.0.26 to the prior version 1.0.25 I'm able to get data from AccountHistory again.

Some context to that error: #17503

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @PierreKerschgens thanks for the feedback! I prepared a PR to fix this: #19869

"ChannelProgramLevelShare",
"AccountBrandShare",
"AccountFeed",
"AssetFeed",
]

UNSUPPORTED_FILTERING_STREAMS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def execute_job(self, query: str, url: str) -> Tuple[Optional[str], Optional[str
for i in range(0, self.MAX_RETRY_NUMBER):
job_id = self.create_stream_job(query=query, url=url)
if not job_id:
return None, None
return None, job_status
davydov-d marked this conversation as resolved.
Show resolved Hide resolved
job_full_url = f"{url}/{job_id}"
job_status = self.wait_for_job(url=job_full_url)
if job_status not in ["UploadComplete", "InProgress"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@ def test_bulk_sync_creation_failed(stream_config, stream_api):
assert err.value.response.json()[0]["message"] == "test_error"


def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stream_api):
davydov-d marked this conversation as resolved.
Show resolved Hide resolved
"""
Here we mock BULK API with response returning error, saying BULK is not supported for this kind of entity.
On the other hand, we mock REST API for this same entity with a successful response.
After having instantiated a BulkStream, sync should succeed in case it falls back to REST API. Otherwise it would throw an error.
"""
stream = generate_stream("CustomEntity", stream_config, stream_api)
# mock a BULK API
requests_mock.register_uri(
"POST",
"https://fase-account.salesforce.com/services/data/v52.0/jobs/query",
status_code=400,
json=[{
"errorCode": "INVALIDENTITY",
"message": "CustomEntity is not supported by the Bulk API"
}]
)
rest_stream_records = [
{"id": 1, "name": "custom entity", "created": "2010-11-11"},
{"id": 11, "name": "custom entity", "created": "2020-01-02"}
]
# mock REST API
mocker.patch("source_salesforce.source.SalesforceStream.read_records", Mock(return_value=rest_stream_records))
assert type(stream) is BulkIncrementalSalesforceStream
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == rest_stream_records


def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog):
"""
Stream `AcceptedEventRelation` is not supported by BULK API, so that REST API stream will be used for it.
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ Now that you have set up the Salesforce source connector, check out the followin
## Changelog

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 1.0.25 | 2022-11-10 | [19286](https://github.com/airbytehq/airbyte/pull/19286) | Bugfix: fallback to REST API if entity is not supported by BULK API |
| 1.0.24 | 2022-11-01 | [18799](https://github.com/airbytehq/airbyte/pull/18799) | Update list of unsupported Bulk API objects |
| 1.0.23 | 2022-11-01 | [18753](https://github.com/airbytehq/airbyte/pull/18753) | Add error_display_message for ConnectionError |
| 1.0.22 | 2022-10-12 | [17615](https://github.com/airbytehq/airbyte/pull/17615) | Make paging work, if `cursor_field` is not changed inside one page |
Expand Down