From dfdd2c5fb5c69d37089afff54906fb7f2a119b14 Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Tue, 7 Dec 2021 19:20:28 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Salesforce:=20filter=20?= =?UTF-8?q?'null'=20byte(s)=20in=20HTTP=20responses=20(#8405)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * filter_null_byte added Signed-off-by: Sergey Chvalyuk --- .../main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-salesforce/Dockerfile | 2 +- .../source-salesforce/source_salesforce/streams.py | 11 ++++++++++- .../source-salesforce/unit_tests/unit_test.py | 14 ++++++++++++++ docs/integrations/sources/salesforce.md | 1 + 6 files changed, 28 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index b09b550a34b380..934e40841a774a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -575,7 +575,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index dbc486ff978785..070e55fb7bcdbc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5547,7 +5547,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.8" +- dockerImage: "airbyte/source-salesforce:0.1.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 40e1a6fa3a839c..f45a9d5ea5f2bb 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index eb9aa595951907..b2f3371c3a22c2 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -206,9 +206,18 @@ def execute_job(self, query: str, url: str) -> str: raise Exception(f"Job for {self.name} stream using BULK API was failed.") return job_full_url + def filter_null_bytes(self, s: str): + """ + https://github.com/airbytehq/airbyte/issues/8300 + """ + res = s.replace("\x00", "") + if len(res) < len(s): + self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(s), len(res)) + return res + def download_data(self, url: str) -> Tuple[int, dict]: job_data = self._send_http_request("GET", f"{url}/results") - decoded_content = job_data.content.decode("utf-8") + decoded_content = self.filter_null_bytes(job_data.content.decode("utf-8")) csv_data = csv.reader(decoded_content.splitlines(), delimiter=",") for i, row in enumerate(csv_data): if i == 0: diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index 4c70f42d2e13eb..5ac5aeb6e24d08 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -284,3 +284,17 @@ def test_stream_start_date_should_be_converted_to_datetime_format(stream_rest_co def test_stream_start_datetime_format_should_not_changed(stream_rest_config, stream_rest_api): stream: IncrementalSalesforceStream = _generate_stream("ActiveFeatureLicenseMetric", stream_rest_config, stream_rest_api) assert stream.start_date == "2010-01-18T21:18:20Z" + + +def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api): + job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA" + stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api) + + with requests_mock.Mocker() as m: + m.register_uri("GET", f"{job_full_url}/results", content=b"\x00") + res = list(stream.download_data(url=job_full_url)) + assert res == [] + + m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') + res = list(stream.download_data(url=job_full_url)) + assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": "false"})] diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index de8dbea409c6bd..26d0ef442ba419 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -735,6 +735,7 @@ List of available streams: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.9 | 2021-12-07 | [8405](https://github.com/airbytehq/airbyte/pull/8405) | Filter 'null' byte(s) in HTTP responses | | 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` | | 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. | | 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retring of BULK jobs |