Skip to content

Commit

Permalink
🐛 Source Salesforce: parse CSV with "unix" dialect (#9757)
Browse files Browse the repository at this point in the history
* improve csv parsing
* test_csv_reader_dialect_unix added

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed Jan 25, 2022
1 parent 926383e commit 2629098
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.19",
"dockerImageTag": "0.1.20",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.19
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6731,7 +6731,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.19"
- dockerImage: "airbyte/source-salesforce:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import csv
import io
import math
import time
from abc import ABC
Expand Down Expand Up @@ -244,12 +245,10 @@ def filter_null_bytes(self, s: str):
def download_data(self, url: str) -> Tuple[int, dict]:
job_data = self._send_http_request("GET", f"{url}/results")
decoded_content = self.filter_null_bytes(job_data.content.decode("utf-8"))
csv_data = csv.reader(decoded_content.splitlines(), delimiter=",")
for i, row in enumerate(csv_data):
if i == 0:
head = row
else:
yield i, dict(zip(head, row))
fp = io.StringIO(decoded_content, newline="")
csv_data = csv.DictReader(fp, dialect="unix")
for n, row in enumerate(csv_data, 1):
yield n, row

def abort_job(self, url: str):
data = {"state": "Aborted"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import csv
import io
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -377,3 +379,26 @@ def test_pagination_rest(stream_config, stream_api):

records = [record for record in stream.read_records(sync_mode=SyncMode.full_refresh)]
assert len(records) == 4


def test_csv_reader_dialect_unix():
stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, wait_timeout=None, sf_api=None, pk=None)
url = "https://fake-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"

data = [
{"Id": "1", "Name": '"first_name" "last_name"'},
{"Id": "2", "Name": "'" + 'first_name"\n' + "'" + 'last_name\n"'},
{"Id": "3", "Name": "first_name last_name"},
]

with io.StringIO("", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=["Id", "Name"], dialect="unix")
writer.writeheader()
for line in data:
writer.writerow(line)
text = csvfile.getvalue()

with requests_mock.Mocker() as m:
m.register_uri("GET", url + "/results", text=text)
result = [dict(i[1]) for i in stream.download_data(url)]
assert result == data
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ List of available streams:

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:--------------------------------------------------------------------------|
| 0.1.20 | 2022-01-26 | [9757](https://github.com/airbytehq/airbyte/pull/9757) | Parse CSV with "unix" dialect |
| 0.1.19 | 2022-01-25 | [8617](https://github.com/airbytehq/airbyte/pull/8617) | Update connector fields title/description |
| 0.1.18 | 2022-01-20 | [9478](https://github.com/airbytehq/airbyte/pull/9478) | Add available stream filtering by `queryable` flag |
| 0.1.17 | 2022-01-19 | [9302](https://github.com/airbytehq/airbyte/pull/9302) | Deprecate API Type parameter |
Expand Down

0 comments on commit 2629098

Please sign in to comment.