Skip to content

Commit

Permalink
🐛 Source Salesforce: filter 'null' byte(s) in HTTP responses (#8405)
Browse files Browse the repository at this point in the history
* filter_null_byte added

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Dec 7, 2021
1 parent ccf8eeb commit dfdd2c5
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5547,7 +5547,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.8"
- dockerImage: "airbyte/source-salesforce:0.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,18 @@ def execute_job(self, query: str, url: str) -> str:
raise Exception(f"Job for {self.name} stream using BULK API was failed.")
return job_full_url

def filter_null_bytes(self, s: str):
    """
    Return ``s`` with every NUL character (U+0000) removed.

    When at least one character was dropped, a warning carrying the length
    before and after the clean-up is emitted through ``self.logger``.

    Background: https://github.com/airbytehq/airbyte/issues/8300
    """
    size_before = len(s)
    cleaned = s.replace("\x00", "")
    size_after = len(cleaned)
    # Removing characters can only shorten the string, so a smaller size
    # means NUL characters were actually present.
    if size_after < size_before:
        self.logger.warning(
            "Filter 'null' bytes from string, size reduced %d -> %d chars",
            size_before,
            size_after,
        )
    return cleaned

def download_data(self, url: str) -> Tuple[int, dict]:
job_data = self._send_http_request("GET", f"{url}/results")
decoded_content = job_data.content.decode("utf-8")
decoded_content = self.filter_null_bytes(job_data.content.decode("utf-8"))
csv_data = csv.reader(decoded_content.splitlines(), delimiter=",")
for i, row in enumerate(csv_data):
if i == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,17 @@ def test_stream_start_date_should_be_converted_to_datetime_format(stream_rest_co
def test_stream_start_datetime_format_should_not_changed(stream_rest_config, stream_rest_api):
    """The stream built from the REST fixtures must expose ``start_date`` as ``2010-01-18T21:18:20Z``."""
    expected_start_date = "2010-01-18T21:18:20Z"
    rest_stream: IncrementalSalesforceStream = _generate_stream(
        "ActiveFeatureLicenseMetric",
        stream_rest_config,
        stream_rest_api,
    )
    assert rest_stream.start_date == expected_start_date


def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api):
    """``download_data`` must ignore NUL bytes found in the mocked job results payload."""
    job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"
    results_url = f"{job_full_url}/results"
    bulk_stream: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_bulk_config, stream_bulk_api)

    with requests_mock.Mocker() as mocker:
        # A body made only of a NUL byte must produce no records at all.
        mocker.register_uri("GET", results_url, content=b"\x00")
        only_null_records = list(bulk_stream.download_data(url=job_full_url))
        assert only_null_records == []

        # NUL bytes scattered through a CSV body must not disturb the parsed row.
        csv_with_nulls = b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00'
        mocker.register_uri("GET", results_url, content=csv_with_nulls)
        parsed_records = list(bulk_stream.download_data(url=job_full_url))
        assert parsed_records == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": "false"})]
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ List of available streams:
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |

| 0.1.9 | 2021-12-07 | [8405](https://github.com/airbytehq/airbyte/pull/8405) | Filter 'null' byte(s) in HTTP responses |
| 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` |
| 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. |
| 0.1.6 | 2021-11-16 | [8009](https://github.com/airbytehq/airbyte/pull/8009) | Fix retrying of BULK jobs |
Expand Down

0 comments on commit dfdd2c5

Please sign in to comment.