Source SalesForce: Remove pagination and query limits (#25700)
* Source SalesForce: remove pagination and query limits

* Reduce slice interval to 30 days

* Source SalesForce: remove apparent_encoding guess for response; it caused OOM errors for large responses

* Source SalesForce: bump versions


---------

Co-authored-by: artem1205 <artem1205@users.noreply.github.com>
2 people authored and nguyenaiden committed May 25, 2023
1 parent 0b3d9ac commit c713395
Showing 5 changed files with 11 additions and 73 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-salesforce/Dockerfile
@@ -13,5 +13,5 @@ RUN pip install .

 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=2.0.12
+LABEL io.airbyte.version=2.0.13
 LABEL io.airbyte.name=airbyte/source-salesforce
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-salesforce/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.0.12
+  dockerImageTag: 2.0.13
   dockerRepository: airbyte/source-salesforce
   githubIssueLabel: source-salesforce
   icon: salesforce.svg
11 changes: 3 additions & 8 deletions airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
@@ -432,7 +432,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]:
         # set filepath for binary data from response
         tmp_file = os.path.realpath(os.path.basename(url))
         with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file:
-            response_encoding = response.apparent_encoding or response.encoding or self.encoding
+            response_encoding = response.encoding or self.encoding
             for chunk in response.iter_content(chunk_size=chunk_size):
                 data_file.write(self.filter_null_bytes(chunk))
         # check the file exists
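
For context on the OOM mentioned in the commit message: `requests` computes `apparent_encoding` by running charset detection over `response.content`, which buffers the entire body in memory and defeats `stream=True` for multi-gigabyte Bulk API result files. A minimal sketch of the streaming pattern the new line relies on; the URL, file name, and `DEFAULT_ENCODING` are illustrative placeholders, not the connector's actual values:

```python
import requests

DEFAULT_ENCODING = "utf-8"  # stand-in for the stream's configured fallback

# Hypothetical results URL; the connector builds this from the bulk job ID.
url = "https://example.my.salesforce.com/services/data/jobs/1/results"

with requests.get(url, stream=True) as response:
    # Old: response.apparent_encoding ran charset detection over
    # response.content, pulling the whole payload into memory first.
    # New: trust the header-declared encoding, else a fixed default.
    encoding = response.encoding or DEFAULT_ENCODING
    with open("results.csv", "wb") as data_file:
        for chunk in response.iter_content(chunk_size=1024):
            data_file.write(chunk)  # bytes stream straight to disk
```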
@@ -579,7 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):

 class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
     state_checkpoint_interval = 500
-    STREAM_SLICE_STEP = 120
+    STREAM_SLICE_STEP = 30

     def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
         super().__init__(**kwargs)
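
Cutting `STREAM_SLICE_STEP` from 120 to 30 days splits each incremental sync into smaller date windows, so the single (now limit-free) query per slice returns a bounded amount of data. A simplified sketch of the slicing arithmetic; the function and field names here are illustrative, not the CDK's actual implementation:

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Mapping

STREAM_SLICE_STEP = 30  # days per slice after this change (was 120)

def stream_slices(start: datetime, end: datetime) -> Iterator[Mapping[str, str]]:
    """Yield consecutive [start_date, end_date) windows of STREAM_SLICE_STEP days."""
    cursor = start
    while cursor < end:
        window_end = min(cursor + timedelta(days=STREAM_SLICE_STEP), end)
        yield {"start_date": cursor.isoformat(), "end_date": window_end.isoformat()}
        cursor = window_end

# A one-year sync now maps to ~12 bulk jobs instead of ~3.
slices = list(stream_slices(datetime(2022, 1, 1, tzinfo=timezone.utc),
                            datetime(2023, 1, 1, tzinfo=timezone.utc)))
assert len(slices) == 13  # 12 full 30-day windows plus a 5-day remainder
```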
@@ -663,8 +663,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late

 class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
     def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
-        if self.name not in UNSUPPORTED_FILTERING_STREAMS:
-            return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)}
         return None

     def request_params(
@@ -683,11 +681,8 @@ def request_params(
         order_by_clause = ""

         if self.name not in UNSUPPORTED_FILTERING_STREAMS:
-            last_primary_key = (next_page_token or {}).get("primary_key", "")
-            if last_primary_key:
-                where_conditions.append(f"{self.primary_key} > '{last_primary_key}'")
             order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
-            order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}"
+            order_by_clause = f"ORDER BY {order_by_fields} ASC"

         where_clause = f"WHERE {' AND '.join(where_conditions)}"
         query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
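
Taken together, the two hunks above drop keyset pagination: `next_page_token` now always returns `None`, and `request_params` no longer appends a primary-key filter or a `LIMIT` clause. A sketch of the resulting SOQL, using an invented `Account` slice with cursor `LastModifiedDate` and primary key `Id`:

```python
# Illustrative values only -- not the connector's actual query builder.
select = "SELECT LastModifiedDate, Id FROM Account"
where = ("WHERE LastModifiedDate >= 2022-10-01T00:00:00Z "
         "AND LastModifiedDate < 2022-10-31T00:00:00Z")

# Before: one bulk job per page, each query re-filtering past the last seen Id.
page_query = f"{select} {where} AND Id > '2' ORDER BY LastModifiedDate, Id ASC LIMIT 2"

# After: a single job per 30-day slice; the Bulk API already streams the full
# result set as CSV, so the Id filter and LIMIT are redundant.
slice_query = f"{select} {where} ORDER BY LastModifiedDate, Id ASC"
```

Besides the extra round-trips, the old per-page requery could emit duplicate records at page boundaries, which the deleted test below acknowledges with its `# duplicate record` marker.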
Expand Down
Expand Up @@ -565,64 +565,6 @@ def test_convert_to_standard_instance(stream_config, stream_api):
assert isinstance(rest_stream, IncrementalRestSalesforceStream)


def test_bulk_stream_paging(stream_config, stream_api_pk):
last_modified_date1 = "2022-10-01T00:00:00.000+00:00"
last_modified_date2 = "2022-10-02T00:00:00.000+00:00"
assert last_modified_date1 < last_modified_date2

stream_config["start_date"] = last_modified_date1
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api_pk)
stream.page_size = 2

csv_header = "Field1,LastModifiedDate,Id"
pages = [
[f"test,{last_modified_date1},1", f"test,{last_modified_date1},2"],
[f"test,{last_modified_date1},3", f"test,{last_modified_date2},4"],
[f"test,{last_modified_date2},5", f"test,{last_modified_date2},6"],
[f"test,{last_modified_date2},7"],
]

with requests_mock.Mocker() as mocked_requests:

post_responses = []
for job_id, page in enumerate(pages, 1):
post_responses.append({"json": {"id": f"{job_id}"}})
mocked_requests.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
mocked_requests.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join([csv_header] + page))
mocked_requests.register_uri("DELETE", stream.path() + f"/{job_id}")
mocked_requests.register_uri("POST", stream.path(), post_responses)

stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))

assert records == [
{"Field1": "test", "Id": 1, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 3, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 4, "LastModifiedDate": last_modified_date2},
{"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date2}, # duplicate record
{"Field1": "test", "Id": 6, "LastModifiedDate": last_modified_date2},
{"Field1": "test", "Id": 7, "LastModifiedDate": last_modified_date2},
]

def get_query(request_index):
return mocked_requests.request_history[request_index].json()["query"]

SELECT = "SELECT LastModifiedDate, Id FROM Account"
ORDER_BY = "ORDER BY LastModifiedDate, Id ASC LIMIT 2"

assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} {ORDER_BY}"

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '2' {ORDER_BY}"
assert get_query(4) == q

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '4' {ORDER_BY}"
assert get_query(8) == q

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '6' {ORDER_BY}"
assert get_query(12) == q


def test_rest_stream_init_with_too_many_properties(stream_config, stream_api_v2_too_many_properties):
with pytest.raises(AssertionError):
# v2 means the stream is going to be a REST stream.
Expand Down
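
With pagination gone, `test_bulk_stream_paging` has nothing left to exercise, so it is deleted outright rather than rewritten. A hypothetical slimmer check (not part of this commit) could instead assert one job per slice and a `LIMIT`-free query, reusing the deleted test's mock setup:

```python
# Inside a requests_mock.Mocker() block like the deleted test's:
post_requests = [r for r in mocked_requests.request_history if r.method == "POST"]
assert len(post_requests) == 1  # exactly one CreateJob call per stream slice
assert "LIMIT" not in post_requests[0].json()["query"]
```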
11 changes: 6 additions & 5 deletions docs/integrations/sources/salesforce.md
@@ -129,13 +129,14 @@ Now that you have set up the Salesforce source connector, check out the followin

 | Version | Date       | Pull Request                                              | Subject                                                                                  |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------------------------------------------------|
+| 2.0.13  | 2023-04-30 | [25700](https://github.com/airbytehq/airbyte/pull/25700)  | Remove pagination and query limits                                                       |
 | 2.0.12  | 2023-04-25 | [25507](https://github.com/airbytehq/airbyte/pull/25507)  | Update API version to 57                                                                 |
 | 2.0.11  | 2023-04-20 | [25352](https://github.com/airbytehq/airbyte/pull/25352)  | Update API version to 53                                                                 |
 | 2.0.10  | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888)  | Add more frequent checkpointing                                                          |
 | 2.0.9   | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660)  | Set default start_date. Sync for last two years if start date is not present in config   |
 | 2.0.8   | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690)  | Handle rate limit for bulk operations                                                    |
 | 2.0.7   | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071)  | Remove regex pattern for start_date, use format validation instead                       |
 | 2.0.6   | 2023-03-03 | [22891](https://github.com/airbytehq/airbyte/pull/22891)  | Specified date formatting in specification                                               |
 | 2.0.5   | 2023-03-01 | [23610](https://github.com/airbytehq/airbyte/pull/23610)  | Handle different Salesforce page size for different queries                              |
 | 2.0.4   | 2023-02-24 | [22636](https://github.com/airbytehq/airbyte/pull/22636)  | Turn on default HttpAvailabilityStrategy for all streams that are not of class BulkSalesforceStream |
 | 2.0.3   | 2023-02-17 | [23190](https://github.com/airbytehq/airbyte/pull/23190)  | In case properties are chunked, fetch primary key in every chunk                         |
