Source Salesforce: change the sequence of requests #23610

Merged
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.4
LABEL io.airbyte.version=2.0.5
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -91,6 +91,23 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
return super().get_error_display_message(exception)


class PropertyChunk:
"""
Keeps track of the current state of a chunk of properties for the stream of records being synced.
"""

properties: Mapping[str, Any]
first_time: bool
record_counter: int
next_page: Optional[Mapping[str, Any]]

def __init__(self, properties: Mapping[str, Any]):
self.properties = properties
self.first_time = True
self.record_counter = 0
self.next_page = None
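
A rough, hypothetical illustration (not part of the diff) of how a chunk's state evolves during a sync; the property names and the page token below are invented:

# Hypothetical usage only: "Id", "Name", "Industry" and the page token are made-up values.
chunk = PropertyChunk(properties={"Id": {}, "Name": {}, "Industry": {}})
assert chunk.first_time and chunk.record_counter == 0 and chunk.next_page is None

# After the first page for this chunk has been fetched and its records counted:
chunk.first_time = False
chunk.record_counter += 200
chunk.next_page = {"next_token": "/services/data/v52.0/query/01gFAKE-2000"}

# A chunk is exhausted once it has been fetched at least once and has no next page left:
is_exhausted = not chunk.first_time and not chunk.next_page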


class RestSalesforceStream(SalesforceStream):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -155,6 +172,22 @@ def empty_props_with_pk_if_present():
if local_properties:
yield local_properties

@staticmethod
def _next_chunk_id(property_chunks: Mapping[int, PropertyChunk]) -> Optional[int]:
"""
Figure out which chunk should be read next.
It is the one with the fewest records read so far.
"""
non_exhausted_chunks = {
# Skip chunks that have already been fetched at least once and have no next page
chunk_id: property_chunk.record_counter
for chunk_id, property_chunk in property_chunks.items()
if property_chunk.first_time or property_chunk.next_page
}
if not non_exhausted_chunks:
return None
return min(non_exhausted_chunks, key=non_exhausted_chunks.get)
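
A minimal standalone sketch (not part of the diff) of the selection rule `_next_chunk_id` applies: among chunks that are unvisited or still have a next page, pick the one with the fewest records read so far. The chunk states below are invented:

# Hypothetical chunk states, using plain dicts instead of PropertyChunk for brevity.
chunks = {
    0: {"first_time": False, "record_counter": 400, "next_page": {"token": "a"}},
    1: {"first_time": False, "record_counter": 200, "next_page": {"token": "b"}},
    2: {"first_time": True, "record_counter": 0, "next_page": None},    # never fetched yet
    3: {"first_time": False, "record_counter": 600, "next_page": None},  # exhausted
}
candidates = {
    chunk_id: chunk["record_counter"]
    for chunk_id, chunk in chunks.items()
    if chunk["first_time"] or chunk["next_page"]
}
# Chunk 3 is excluded; chunk 2 wins because it has read the fewest records so far.
next_chunk_id = min(candidates, key=candidates.get) if candidates else None
assert next_chunk_id == 2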

def _read_pages(
self,
records_generator_fn: Callable[
@@ -164,54 +197,67 @@ def _read_pages(
stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
stream_state = stream_state or {}
pagination_complete = False
records = {}
next_pages = {}

while not pagination_complete:
index = 0
for index, property_chunk in enumerate(self.chunk_properties()):
request, response = self._fetch_next_page(stream_slice, stream_state, next_pages.get(index), property_chunk)
next_pages[index] = self.next_page_token(response)
chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
if not self.too_many_properties:
# this is the case when a stream has no primary key
# (is allowed when properties length does not exceed the maximum value)
# so there would be a single iteration, therefore we may and should yield records immediately
yield from chunk_page_records
break
chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}

for record_id, record in chunk_page_records.items():
if record_id not in records:
records[record_id] = (record, 1)
continue
incomplete_record, counter = records[record_id]
incomplete_record.update(record)
counter += 1
records[record_id] = (incomplete_record, counter)

for record_id, (record, counter) in records.items():
if counter != index + 1:
# Because we make multiple calls to query N records (each call to fetch X properties of all the N records),
# there's a chance that the number of records corresponding to the query may change between the calls. This
# may result in data inconsistency. We skip such records for now and log a warning message.
self.logger.warning(
f"Inconsistent record with primary key {record_id} found. It consists of {counter} chunks instead of {index + 1}. "
f"Skipping it."
)
continue
yield record
records_by_primary_key = {}
property_chunks: Mapping[int, PropertyChunk] = {
index: PropertyChunk(properties=properties) for index, properties in enumerate(self.chunk_properties())
}
while True:
chunk_id = self._next_chunk_id(property_chunks)
if chunk_id is None:
# pagination complete
break

records = {}
property_chunk = property_chunks[chunk_id]
request, response = self._fetch_next_page_for_chunk(
stream_slice, stream_state, property_chunk.next_page, property_chunk.properties
)

if not any(next_pages.values()):
pagination_complete = True
# Once a chunk has been fetched for the first time, clear the flag so _next_chunk_id can tell exhausted chunks from unvisited ones
if property_chunk.first_time:
property_chunk.first_time = False
property_chunk.next_page = self.next_page_token(response)
chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
if not self.too_many_properties:
# this is the case when a stream has no primary key
# (which is only allowed when the properties length does not exceed the maximum value),
# so there is a single chunk and we can yield records immediately
for record in chunk_page_records:
property_chunk.record_counter += 1
yield record
continue

# stitch together the parts of each record by primary key and emit the record once it is complete
for record in chunk_page_records:
property_chunk.record_counter += 1
record_id = record[self.primary_key]
if record_id not in records_by_primary_key:
records_by_primary_key[record_id] = (record, 1)
continue
partial_record, counter = records_by_primary_key[record_id]
partial_record.update(record)
counter += 1
if counter == len(property_chunks):
yield partial_record # now it's complete
records_by_primary_key.pop(record_id)
else:
records_by_primary_key[record_id] = (partial_record, counter)

# Process what's left.
# Because we make multiple calls to query N records (each call fetching X properties of all N records),
# the set of records matching the query may change between the calls. For example:
#   SELECT a, b FROM table ORDER BY pk -> returns records with ids `1`, `2`
#   <a record is inserted in between>
#   SELECT c, d FROM table ORDER BY pk -> returns records with ids `1`, `3`
# Records `2` and `3` would then be incomplete.
# This may result in data inconsistency, so we skip such records for now and log a warning message.
incomplete_record_ids = ",".join([str(key) for key in records_by_primary_key])
if incomplete_record_ids:
self.logger.warning(f"Inconsistent record(s) with primary keys {incomplete_record_ids} found. Skipping them.")

# Always return an empty generator just in case no records were ever yielded
yield from []
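
A minimal standalone sketch (not part of the diff) of the merge step above: partial records coming from different property chunks are stitched together by primary key and emitted as soon as every chunk has contributed, while whatever is left over at the end is incomplete. The sample records are invented:

# Hypothetical data: two property chunks, each returning a partial view of the records.
num_chunks = 2
pages = [
    [{"Id": 1, "Name": "Acme"}, {"Id": 2, "Name": "Globex"}],          # chunk 0
    [{"Id": 1, "Industry": "Tech"}, {"Id": 3, "Industry": "Retail"}],  # chunk 1
]
partials, complete = {}, []
for page in pages:
    for record in page:
        record_id = record["Id"]
        if record_id not in partials:
            partials[record_id] = (record, 1)
            continue
        merged, counter = partials.pop(record_id)
        merged.update(record)
        counter += 1
        if counter == num_chunks:
            complete.append(merged)  # every chunk has contributed, the record is complete
        else:
            partials[record_id] = (merged, counter)

assert complete == [{"Id": 1, "Name": "Acme", "Industry": "Tech"}]
assert set(partials) == {2, 3}  # records 2 and 3 stay incomplete and would only be logged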

def _fetch_next_page(
def _fetch_next_page_for_chunk(
self,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
@@ -229,7 +275,6 @@ def _fetch_next_page(
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
return request, response

@@ -46,14 +46,11 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre
"POST",
"https://fase-account.salesforce.com/services/data/v52.0/jobs/query",
status_code=400,
json=[{
"errorCode": "INVALIDENTITY",
"message": "CustomEntity is not supported by the Bulk API"
}]
json=[{"errorCode": "INVALIDENTITY", "message": "CustomEntity is not supported by the Bulk API"}],
)
rest_stream_records = [
{"id": 1, "name": "custom entity", "created": "2010-11-11"},
{"id": 11, "name": "custom entity", "created": "2020-01-02"}
{"id": 11, "name": "custom entity", "created": "2020-01-02"},
]
# mock REST API
mocker.patch("source_salesforce.source.RestSalesforceStream.read_records", lambda *args, **kwargs: iter(rest_stream_records))
@@ -640,48 +637,47 @@ def test_too_many_properties(stream_config, stream_api_v2_pk_too_many_properties
[
{
"json": {
"nextRecordsUrl": next_page_url,
"records": [{"Id": 1, "propertyA": "A"}, {"Id": 2, "propertyA": "A"}]
"records": [
{"Id": 1, "propertyA": "A"},
{"Id": 2, "propertyA": "A"},
{"Id": 3, "propertyA": "A"},
{"Id": 4, "propertyA": "A"},
]
}
},
{
"json": {
"nextRecordsUrl": next_page_url,
"records": [{"Id": 1, "propertyB": "B"}, {"Id": 2, "propertyB": "B"}]
}
},
# 2 for 2 chunks above and 1 for a chunk below
*[{"json": {"records": [{"Id": 1}, {"Id": 2}], "nextRecordsUrl": next_page_url}} for _ in range(chunks_len - 3)],
{
"json": {
"records": [{"Id": 1}, {"Id": 2}]
}
},
{
"json": {
"records": [{"Id": 3, "propertyA": "A"}, {"Id": 4, "propertyA": "A"}]
}
},
{
"json": {
"records": [{"Id": 3, "propertyB": "B"}, {"Id": 4, "propertyB": "B"}]
}
},
# 2 for 2 chunks above and 1 for a chunk below
*[{"json": {"records": [{"Id": 3}, {"Id": 4}]}} for _ in range(chunks_len - 3)],
{
"json": {
"records": [{"Id": 3}, {"Id": 4}]
}
}
]
{"json": {"nextRecordsUrl": next_page_url, "records": [{"Id": 1, "propertyB": "B"}, {"Id": 2, "propertyB": "B"}]}},
# subtract 2 for the 2 chunks covered by the responses above
*[{"json": {"records": [{"Id": 1}, {"Id": 2}], "nextRecordsUrl": next_page_url}} for _ in range(chunks_len - 2)],
{"json": {"records": [{"Id": 3, "propertyB": "B"}, {"Id": 4, "propertyB": "B"}]}},
# subtract 2: 1 for the chunk above and 1 for the chunk that had no next page
*[{"json": {"records": [{"Id": 3}, {"Id": 4}]}} for _ in range(chunks_len - 2)],
],
)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert records == [
{"Id": 1, "propertyA": "A", "propertyB": "B"},
{"Id": 2, "propertyA": "A", "propertyB": "B"},
{"Id": 3, "propertyA": "A", "propertyB": "B"},
{"Id": 4, "propertyA": "A", "propertyB": "B"}
{"Id": 4, "propertyA": "A", "propertyB": "B"},
]
for call in requests_mock.request_history:
assert len(call.url) < Salesforce.REQUEST_SIZE_LIMITS


def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_many_properties, requests_mock):
stream = generate_stream("Account", stream_config, stream_api_v2_pk_too_many_properties)
chunks = list(stream.chunk_properties())
for chunk in chunks:
assert stream.primary_key in chunk
assert stream.too_many_properties
assert stream.primary_key
assert type(stream) == RestSalesforceStream
url = "https://fase-account.salesforce.com/services/data/v52.0/queryAll"
requests_mock.get(
url,
[
{"json": {"records": []}},
],
)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert records == []
docs/integrations/sources/salesforce.md (1 change: 1 addition & 0 deletions)
@@ -129,6 +129,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 2.0.5 | 2023-03-01 | [23610](https://github.com/airbytehq/airbyte/pull/23610) | Handle different Salesforce page size for different queries |
| 2.0.4 | 2023-02-24 | [22636](https://github.com/airbytehq/airbyte/pull/22636) | Turn on default HttpAvailabilityStrategy for all streams that are not of class BulkSalesforceStream |
| 2.0.3 | 2023-02-17 | [23190](https://github.com/airbytehq/airbyte/pull/23190) | In case properties are chunked, fetch primary key in every chunk |
| 2.0.2 | 2023-02-13 | [22896](https://github.com/airbytehq/airbyte/pull/22896) | Count the URL length based on encoded params |