
Commit

Source Salesforce: fix sync capped streams with more records than page size (#13658)

* change logic for counting records

* update doc

* correct unit test

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
marcosmarxm and octavia-squidington-iii committed Jun 14, 2022
1 parent 8e54f4f commit 669e6ed
Showing 6 changed files with 13 additions and 9 deletions.
@@ -802,7 +802,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 1.0.9
+  dockerImageTag: 1.0.10
   documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api
@@ -7650,7 +7650,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:1.0.9"
+- dockerImage: "airbyte/source-salesforce:1.0.10"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
     connectionSpecification:
@@ -13,5 +13,5 @@ RUN pip install .

 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=1.0.9
+LABEL io.airbyte.version=1.0.10
 LABEL io.airbyte.name=airbyte/source-salesforce
@@ -312,8 +312,8 @@ def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[
             chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix")
             for chunk in chunks:
                 chunk = chunk.replace({nan: None}).to_dict(orient="records")
-                for n, row in enumerate(chunk, 1):
-                    yield n, row
+                for row in chunk:
+                    yield row
         except pd.errors.EmptyDataError as e:
             self.logger.info(f"Empty data received. {e}")
             yield from []
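Before this change, `read_with_chunks` yielded `(index, row)` pairs, but `enumerate(chunk, 1)` restarts at 1 for every pandas chunk, so the yielded index is the row's position inside the current chunk, not within the whole download. A standalone sketch of that resetting counter (toy list-based data, not the connector's pandas-based reader):

```python
def chunked(rows, chunk_size):
    # Mimics pd.read_csv(..., chunksize=...) by splitting rows into chunks.
    for i in range(0, len(rows), chunk_size):
        yield rows[i:i + chunk_size]


def old_read_with_chunks(rows, chunk_size=100):
    # Old behavior: enumerate restarts for every chunk, so the yielded
    # index can never exceed chunk_size.
    for chunk in chunked(rows, chunk_size):
        for n, row in enumerate(chunk, 1):
            yield n, row


rows = [{"Id": i} for i in range(250)]
last_index, _ = list(old_read_with_chunks(rows))[-1]
print(last_index)  # 50: the position in the final 50-row chunk, not 250
```

Because the caller used this index as its record count, a page's apparent size was capped at the chunk size, so any page whose final chunk was smaller than `page_size` looked like the last page and the sync stopped early.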
@@ -382,12 +382,15 @@ def read_records(

         count = 0
         record: Mapping[str, Any] = {}
-        for count, record in self.read_with_chunks(self.download_data(url=job_full_url)):
+        for record in self.read_with_chunks(self.download_data(url=job_full_url)):
+            count += 1
             yield record
         self.delete_job(url=job_full_url)

         if count < self.page_size:
-            # this is a last page
+            # Salesforce does not return a next-page token, so the connector
+            # syncs batches of `page_size` records and treats any batch smaller
+            # than `page_size` as the last page.
             break

         next_page_token = self.next_page_token(record)
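The corrected loop keeps one running count across every chunk of the page, so the `count < page_size` check genuinely detects the last page. A minimal, self-contained sketch of this pagination pattern (the hypothetical `fetch_page` stands in for `download_data`/`read_with_chunks`, and the token arithmetic is illustrative, not the connector's API):

```python
def fetch_page(start, page_size, total=250):
    # Hypothetical server: returns up to page_size records from `start`.
    return [{"Id": i} for i in range(start, min(start + page_size, total))]


def read_records(page_size=100):
    next_page_token = 0
    while True:
        count = 0
        record = {}
        for record in fetch_page(next_page_token, page_size):
            count += 1  # running total for the whole page, across all chunks
            yield record
        if count < page_size:
            # No next-page token is provided, so a batch smaller than
            # page_size must be the last one.
            break
        next_page_token = record["Id"] + 1


print(len(list(read_records())))  # 250: all three pages are synced
```

With the old per-chunk index, `count` would have been at most the chunk size after the first page and the loop would have broken after 100 records; the running total lets streams larger than one page sync completely.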
@@ -215,7 +215,7 @@ def test_download_data_filter_null_bytes(stream_config, stream_api):

     m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
     res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
-    assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": False})]
+    assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": False}]


def test_check_connection_rate_limit(stream_config):
@@ -427,7 +427,7 @@ def test_csv_reader_dialect_unix():

     with requests_mock.Mocker() as m:
         m.register_uri("GET", url + "/results", text=text)
-        result = [dict(i[1]) for i in stream.read_with_chunks(stream.download_data(url))]
+        result = [i for i in stream.read_with_chunks(stream.download_data(url))]
         assert result == data


1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
@@ -119,6 +119,7 @@ Now that you have set up the Salesforce source connector, check out the following

 | Version | Date       | Pull Request                                             | Subject                                                                       |
 |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------|
+| 1.0.10  | 2022-06-09 | [13658](https://github.com/airbytehq/airbyte/pull/13658) | Correct logic to sync stream larger than page size                            |
 | 1.0.9   | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyteTraceMessage` on uncaught exceptions |
 | 1.0.8   | 2022-05-04 | [12576](https://github.com/airbytehq/airbyte/pull/12576) | Decode responses as utf-8 and fallback to ISO-8859-1 if needed                |
 | 1.0.7   | 2022-05-03 | [12552](https://github.com/airbytehq/airbyte/pull/12552) | Decode responses as ISO-8859-1 instead of utf-8                               |
