Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source SalesForce: Remove pagination and query limits #25700

Merged
merged 14 commits into from May 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.11
LABEL io.airbyte.version=2.0.12
LABEL io.airbyte.name=airbyte/source-salesforce
Expand Up @@ -579,7 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):

class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
STREAM_SLICE_STEP = 120
STREAM_SLICE_STEP = 30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the impact of changing this? Do other salesforce syncs now take longer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By decreasing the window we make responses smaller and thus iterate over them quicker.
According to screen it takes roughly the same time to sync (or even slightly less):
First sync was performed using 2.0.13-dev.b206a930, next 2 using 2.0.9
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

馃憤馃徏


def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -663,8 +663,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late

class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)}
return None

def request_params(
Expand All @@ -683,11 +681,8 @@ def request_params(
order_by_clause = ""

if self.name not in UNSUPPORTED_FILTERING_STREAMS:
last_primary_key = (next_page_token or {}).get("primary_key", "")
if last_primary_key:
where_conditions.append(f"{self.primary_key} > '{last_primary_key}'")
order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}"
order_by_clause = f"ORDER BY {order_by_fields} ASC"
maxi297 marked this conversation as resolved.
Show resolved Hide resolved

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
Expand Down
Expand Up @@ -565,64 +565,6 @@ def test_convert_to_standard_instance(stream_config, stream_api):
assert isinstance(rest_stream, IncrementalRestSalesforceStream)


def test_bulk_stream_paging(stream_config, stream_api_pk):
last_modified_date1 = "2022-10-01T00:00:00.000+00:00"
last_modified_date2 = "2022-10-02T00:00:00.000+00:00"
assert last_modified_date1 < last_modified_date2

stream_config["start_date"] = last_modified_date1
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api_pk)
stream.page_size = 2

csv_header = "Field1,LastModifiedDate,Id"
pages = [
[f"test,{last_modified_date1},1", f"test,{last_modified_date1},2"],
[f"test,{last_modified_date1},3", f"test,{last_modified_date2},4"],
[f"test,{last_modified_date2},5", f"test,{last_modified_date2},6"],
[f"test,{last_modified_date2},7"],
]

with requests_mock.Mocker() as mocked_requests:

post_responses = []
for job_id, page in enumerate(pages, 1):
post_responses.append({"json": {"id": f"{job_id}"}})
mocked_requests.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})
mocked_requests.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join([csv_header] + page))
mocked_requests.register_uri("DELETE", stream.path() + f"/{job_id}")
mocked_requests.register_uri("POST", stream.path(), post_responses)

stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices))

assert records == [
{"Field1": "test", "Id": 1, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 2, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 3, "LastModifiedDate": last_modified_date1},
{"Field1": "test", "Id": 4, "LastModifiedDate": last_modified_date2},
{"Field1": "test", "Id": 5, "LastModifiedDate": last_modified_date2}, # duplicate record
{"Field1": "test", "Id": 6, "LastModifiedDate": last_modified_date2},
{"Field1": "test", "Id": 7, "LastModifiedDate": last_modified_date2},
]

def get_query(request_index):
return mocked_requests.request_history[request_index].json()["query"]

SELECT = "SELECT LastModifiedDate, Id FROM Account"
ORDER_BY = "ORDER BY LastModifiedDate, Id ASC LIMIT 2"

assert get_query(0) == f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} {ORDER_BY}"

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '2' {ORDER_BY}"
assert get_query(4) == q

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '4' {ORDER_BY}"
assert get_query(8) == q

q = f"{SELECT} WHERE LastModifiedDate >= {last_modified_date1} AND LastModifiedDate < {stream_slices['end_date']} AND Id > '6' {ORDER_BY}"
assert get_query(12) == q


def test_rest_stream_init_with_too_many_properties(stream_config, stream_api_v2_too_many_properties):
with pytest.raises(AssertionError):
# v2 means the stream is going to be a REST stream.
Expand Down
11 changes: 6 additions & 5 deletions docs/integrations/sources/salesforce.md
Expand Up @@ -129,12 +129,13 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 2.0.12 | 2023-04-30 | [25700](https://github.com/airbytehq/airbyte/pull/25700) | Remove pagination and query limits |
| 2.0.11 | 2023-04-20 | [25352](https://github.com/airbytehq/airbyte/pull/25352) | Update API version to 53 |
| 2.0.10 | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888) | Add more frequent checkpointing |
| 2.0.9 | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660) | Set default start_date. Sync for last two years if start date is not present in config |
| 2.0.8 | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690) | Handle rate limit for bulk operations |
| 2.0.7 | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071) | Remove regex pattern for start_date, use format validation instead |
| 2.0.6 | 2023-03-03 | [22891](https://github.com/airbytehq/airbyte/pull/22891) | Specified date formatting in specification |
| 2.0.10 | 2023-04-05 | [24888](https://github.com/airbytehq/airbyte/pull/24888) | Add more frequent checkpointing |
| 2.0.9 | 2023-03-29 | [24660](https://github.com/airbytehq/airbyte/pull/24660) | Set default start_date. Sync for last two years if start date is not present in config |
| 2.0.8 | 2023-03-30 | [24690](https://github.com/airbytehq/airbyte/pull/24690) | Handle rate limit for bulk operations |
| 2.0.7 | 2023-03-14 | [24071](https://github.com/airbytehq/airbyte/pull/24071) | Remove regex pattern for start_date, use format validation instead |
| 2.0.6 | 2023-03-03 | [22891](https://github.com/airbytehq/airbyte/pull/22891) | Specified date formatting in specification |
| 2.0.5 | 2023-03-01 | [23610](https://github.com/airbytehq/airbyte/pull/23610) | Handle different Salesforce page size for different queries |
| 2.0.4 | 2023-02-24 | [22636](https://github.com/airbytehq/airbyte/pull/22636) | Turn on default HttpAvailabilityStrategy for all streams that are not of class BulkSalesforceStream |
| 2.0.3 | 2023-02-17 | [23190](https://github.com/airbytehq/airbyte/pull/23190) | In case properties are chunked, fetch primary key in every chunk |
Expand Down