From c713395808c95ff97de7deda5341a7d3c44b5cb3 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Fri, 19 May 2023 12:13:43 +0200 Subject: [PATCH] Source SalesForce: Remove pagination and query limits (#25700) * Source SalesForce: remove pagination and query limits * Source SalesForce: remove pagination and query limits * Reduce slice interval to 30 days * Source SalesForce : remove apparent_encoding guess for response; Cause OOM for large responses * Source SalesForce : bump versions --------- Co-authored-by: artem1205 --- .../connectors/source-salesforce/Dockerfile | 2 +- .../source-salesforce/metadata.yaml | 2 +- .../source_salesforce/streams.py | 11 +--- .../source-salesforce/unit_tests/api_test.py | 58 ------------------- docs/integrations/sources/salesforce.md | 11 ++-- 5 files changed, 11 insertions(+), 73 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 6bc86baaf70c1..985c9b2545a9a 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=2.0.12 +LABEL io.airbyte.version=2.0.13 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index efcf4f56c6ace..dd9f6a257a7e6 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.0.12 + dockerImageTag: 2.0.13 dockerRepository: airbyte/source-salesforce githubIssueLabel: source-salesforce icon: salesforce.svg diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 0de5289bd9b27..065bac3cfcc3e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -432,7 +432,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: # set filepath for binary data from response tmp_file = os.path.realpath(os.path.basename(url)) with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file: - response_encoding = response.apparent_encoding or response.encoding or self.encoding + response_encoding = response.encoding or self.encoding for chunk in response.iter_content(chunk_size=chunk_size): data_file.write(self.filter_null_bytes(chunk)) # check the file exists @@ -579,7 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any): class IncrementalRestSalesforceStream(RestSalesforceStream, ABC): state_checkpoint_interval = 500 - STREAM_SLICE_STEP = 120 + STREAM_SLICE_STEP = 30 def __init__(self, replication_key: str, start_date: Optional[str], **kwargs): super().__init__(**kwargs) @@ -663,8 +663,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream): def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: - if self.name not in UNSUPPORTED_FILTERING_STREAMS: - return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)} return None def request_params( @@ -683,11 +681,8 @@ def request_params( order_by_clause = "" if self.name not in UNSUPPORTED_FILTERING_STREAMS: - last_primary_key = (next_page_token or {}).get("primary_key", "") - if last_primary_key: - where_conditions.append(f"{self.primary_key} > '{last_primary_key}'") order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]) - order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}" + order_by_clause = f"ORDER BY {order_by_fields} ASC" where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}" diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index a80be600d7f4e..dcb0a4505f035 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -565,64 +565,6 @@ def test_convert_to_standard_instance(stream_config, stream_api): assert isinstance(rest_stream, IncrementalRestSalesforceStream) -def test_bulk_stream_paging(stream_config, stream_api_pk): - last_modified_date1 = "2022-10-01T00:00:00.000+00:00" - last_modified_date2 = "2022-10-02T00:00:00.000+00:00" - assert last_modified_date1 < last_modified_date2 - - stream_config["start_date"] = last_modified_date1 - stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api_pk) - stream.page_size = 2 - - csv_header = "Field1,LastModifiedDate,Id" - pages = [ - [f"test,{last_modified_date1},1", f"test,{last_modified_date1},2"], - [f"test,{last_modified_date1},3", f"test,{last_modified_date2},4"], - [f"test,{last_modified_date2},5", f"test,{last_modified_date2},6"], - [f"test,{last_modified_date2},7"], - ] - - with requests_mock.Mocker() as mocked_requests: - - post_responses = [] - for job_id, page in enumerate(pages, 1): - post_responses.append({"json": {"id": f"{job_id}"}}) - mocked_requests.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) - mocked_requests.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join([csv_header] + page)) - mocked_requests.register_uri("DELETE", stream.path() + f"/{job_id}") - mocked_requests.register_uri("POST", stream.path(), post_responses) - - stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) - records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)) - - assert records == [ - {"Field1": "test", "Id": 1, "LastModifiedDate": last_modified_date1}, - {"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date1}, - {"Field1": "test", "Id": 3, "LastModifiedDate": last_modified_date1}, - {"Field1": "test", "Id": 4, "LastModifiedDate": last_modified_date2}, - {"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date2}, # duplicate record - {"Field1": "test", "Id": 6, "LastModifiedDate": last_modified_date2}, - {"Field1": "test", "Id": 7, "LastModifiedDate": last_modified_date2}, - ] - - def get_query(request_index): - return mocked_requests.request_history[request_index].json()["query"] - - SELECT = "SELECT LastModifiedDate, Id FROM Account" - ORDER_BY = "ORDER BY LastModifiedDate, Id ASC LIMIT 2" - - assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} {ORDER_BY}" - - q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '2' {ORDER_BY}" - assert get_query(4) == q - - q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '4' {ORDER_BY}" - assert get_query(8) == q - - q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '6' {ORDER_BY}" - assert get_query(12) == q - - def test_rest_stream_init_with_too_many_properties(stream_config, stream_api_v2_too_many_properties): with pytest.raises(AssertionError): # v2 means the stream is going to be a REST stream. diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index d466f7cabb7e1..0643698c3af3a 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -129,13 +129,14 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------| +| 2.0.13 | 2023-04-30 | [25700](https://github.com/airbytehq/airbyte/pull/25700) | Remove pagination and query limits | | 2.0.12 | 2023-04-25 | [25507](https://github.com/airbytehq/airbyte/pull/25507) | Update API version to 57 | | 2.0.11 | 2023-04-20 | [25352](https://github.com/airbytehq/airbyte/pull/25352) | Update API version to 53 | -| 2.0.10 | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888) | Add more frequent checkpointing | -| 2.0.9 | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660) | Set default start_date. Sync for last two years if start date is not present in config | -| 2.0.8 | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690) | Handle rate limit for bulk operations | -| 2.0.7 | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071) | Remove regex pattern for start_date, use format validation instead | -| 2.0.6 | 2023-03-03 | [22891](https://github.com/airbytehq/airbyte/pull/22891) | Specified date formatting in specification | +| 2.0.10 | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888) | Add more frequent checkpointing | +| 2.0.9 | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660) | Set default start_date. Sync for last two years if start date is not present in config | +| 2.0.8 | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690) | Handle rate limit for bulk operations | +| 2.0.7 | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071) | Remove regex pattern for start_date, use format validation instead | +| 2.0.6 | 2023-03-03 | [22891](https://github.com/airbytehq/airbyte/pull/22891) | Specified date formatting in specification | | 2.0.5 | 2023-03-01 | [23610](https://github.com/airbytehq/airbyte/pull/23610) | Handle different Salesforce page size for different queries | | 2.0.4 | 2023-02-24 | [22636](https://github.com/airbytehq/airbyte/pull/22636) | Turn on default HttpAvailabilityStrategy for all streams that are not of class BulkSalesforceStream | | 2.0.3 | 2023-02-17 | [23190](https://github.com/airbytehq/airbyte/pull/23190) | In case properties are chunked, fetch primary key in every chunk |