Source Salesforce: change the sequence of requests (#23610)
* #1571 source salesforce: change the sequence of requests

* #1571 source Salesforce: format

* #1571 source salesforce: fix endless loop

* #1571 source salesforce: update unit tests

* fix infinite loop for streams with no records and refactor properties into a helper object to organize state

* auto-bump connector version

---------

Co-authored-by: brianjlai <brian.lai@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored and jbfbell committed Mar 6, 2023
1 parent 007af0b commit 7d1bb66
Showing 7 changed files with 128 additions and 86 deletions.
@@ -1668,7 +1668,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 2.0.4
dockerImageTag: 2.0.5
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -13169,7 +13169,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:2.0.4"
- dockerImage: "airbyte/source-salesforce:2.0.5"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.4
LABEL io.airbyte.version=2.0.5
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -91,6 +91,23 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
return super().get_error_display_message(exception)


class PropertyChunk:
"""
Tracks the current state of one chunk of properties for the stream of records being synced.
"""

properties: Mapping[str, Any]
first_time: bool
record_counter: int
next_page: Optional[Mapping[str, Any]]

def __init__(self, properties: Mapping[str, Any]):
self.properties = properties
self.first_time = True
self.record_counter = 0
self.next_page = None
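
# A minimal, hypothetical sketch of how PropertyChunk instances can be built from chunked
# property names. The sample field names and chunk size below are assumptions made for
# illustration only; the real connector derives its chunking from request-length limits
# and also makes sure the primary key is present in every chunk.
def _example_property_chunks() -> dict:
    sample_properties = ["Id", "Name", "CreatedDate", "Industry", "Phone"]
    chunk_size = 2  # assumed value, not a connector default
    chunks_of_names = [sample_properties[i : i + chunk_size] for i in range(0, len(sample_properties), chunk_size)]
    property_chunks = {
        index: PropertyChunk(properties={name: {"type": "string"} for name in names})
        for index, names in enumerate(chunks_of_names)
    }
    # Every chunk starts unread: first_time=True, record_counter=0, next_page=None.
    return property_chunks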


class RestSalesforceStream(SalesforceStream):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -155,6 +172,22 @@ def empty_props_with_pk_if_present():
if local_properties:
yield local_properties

@staticmethod
def _next_chunk_id(property_chunks: Mapping[int, PropertyChunk]) -> Optional[int]:
"""
Figure out which chunk should be read next:
the non-exhausted chunk with the fewest records read so far.
"""
non_exhausted_chunks = {
# We skip chunks that have already attempted a sync before and do not have a next page
chunk_id: property_chunk.record_counter
for chunk_id, property_chunk in property_chunks.items()
if property_chunk.first_time or property_chunk.next_page
}
if not non_exhausted_chunks:
return None
return min(non_exhausted_chunks, key=non_exhausted_chunks.get)
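
# A quick, self-contained illustration (assumed record counts and page tokens) of the selection
# rule above: chunks that were already requested and have no next page are skipped, and the
# chunk with the fewest records read so far wins.
def _example_next_chunk_id() -> None:
    chunk_a, chunk_b, chunk_c = (PropertyChunk(properties={"Id": {}}) for _ in range(3))
    chunk_a.first_time, chunk_a.record_counter, chunk_a.next_page = False, 40, {"next_token": "page-a"}
    chunk_b.first_time, chunk_b.record_counter, chunk_b.next_page = False, 25, {"next_token": "page-b"}
    chunk_c.first_time, chunk_c.record_counter, chunk_c.next_page = False, 60, None  # exhausted
    chunks = {0: chunk_a, 1: chunk_b, 2: chunk_c}
    # Chunk 2 is ignored (already requested, no next page); chunk 1 has read the fewest records.
    assert RestSalesforceStream._next_chunk_id(chunks) == 1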

def _read_pages(
self,
records_generator_fn: Callable[
@@ -164,54 +197,67 @@ def _read_pages(
stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
stream_state = stream_state or {}
pagination_complete = False
records = {}
next_pages = {}

while not pagination_complete:
index = 0
for index, property_chunk in enumerate(self.chunk_properties()):
request, response = self._fetch_next_page(stream_slice, stream_state, next_pages.get(index), property_chunk)
next_pages[index] = self.next_page_token(response)
chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
if not self.too_many_properties:
# this is the case when a stream has no primary key
# (is allowed when properties length does not exceed the maximum value)
# so there would be a single iteration, therefore we may and should yield records immediately
yield from chunk_page_records
break
chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}

for record_id, record in chunk_page_records.items():
if record_id not in records:
records[record_id] = (record, 1)
continue
incomplete_record, counter = records[record_id]
incomplete_record.update(record)
counter += 1
records[record_id] = (incomplete_record, counter)

for record_id, (record, counter) in records.items():
if counter != index + 1:
# Because we make multiple calls to query N records (each call to fetch X properties of all the N records),
# there's a chance that the number of records corresponding to the query may change between the calls. This
# may result in data inconsistency. We skip such records for now and log a warning message.
self.logger.warning(
f"Inconsistent record with primary key {record_id} found. It consists of {counter} chunks instead of {index + 1}. "
f"Skipping it."
)
continue
yield record
records_by_primary_key = {}
property_chunks: Mapping[int, PropertyChunk] = {
index: PropertyChunk(properties=properties) for index, properties in enumerate(self.chunk_properties())
}
while True:
chunk_id = self._next_chunk_id(property_chunks)
if chunk_id is None:
# pagination complete
break

records = {}
property_chunk = property_chunks[chunk_id]
request, response = self._fetch_next_page_for_chunk(
stream_slice, stream_state, property_chunk.next_page, property_chunk.properties
)

if not any(next_pages.values()):
pagination_complete = True
# After the first request for a chunk, clear the first_time flag so that subsequent chunk selection relies on next_page
if property_chunk.first_time:
property_chunk.first_time = False
property_chunk.next_page = self.next_page_token(response)
chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
if not self.too_many_properties:
# this is the case when a stream has no primary key
# (allowed only when the properties length does not exceed the maximum, i.e. too_many_properties is False),
# so there is a single chunk and we may and should yield records immediately
for record in chunk_page_records:
property_chunk.record_counter += 1
yield record
continue

# stitch the parts of each record together by primary key, and emit a record once it is complete
for record in chunk_page_records:
property_chunk.record_counter += 1
record_id = record[self.primary_key]
if record_id not in records_by_primary_key:
records_by_primary_key[record_id] = (record, 1)
continue
partial_record, counter = records_by_primary_key[record_id]
partial_record.update(record)
counter += 1
if counter == len(property_chunks):
yield partial_record # now it's complete
records_by_primary_key.pop(record_id)
else:
records_by_primary_key[record_id] = (partial_record, counter)

# Process what's left.
# Because we make multiple calls to query N records (each call fetching X properties of all N records),
# the set of records matching the query may change between the calls. For example:
#   Select 'a', 'b' from table order by pk -> returns records with ids `1`, `2`
#   <a record is inserted between the calls>
#   Select 'c', 'd' from table order by pk -> returns records with ids `1`, `3`
# Records `2` and `3` would then be incomplete.
# This may result in data inconsistency, so we skip such records for now and log a warning message.
incomplete_record_ids = ",".join([str(key) for key in records_by_primary_key])
if incomplete_record_ids:
self.logger.warning(f"Inconsistent record(s) with primary keys {incomplete_record_ids} found. Skipping them.")

# Always return an empty generator just in case no records were ever yielded
yield from []
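
# A small, self-contained sketch of the merge-by-primary-key behaviour above, using invented
# partial records from two hypothetical property chunks (propertyA and propertyB):
def _example_merge_partial_records() -> None:
    chunk_count = 2
    pages = [
        [{"Id": 1, "propertyA": "A"}, {"Id": 2, "propertyA": "A"}],  # chunk 0: propertyA
        [{"Id": 1, "propertyB": "B"}],  # chunk 1: propertyB (Id 2 never came back)
    ]
    partials_by_id, complete_records = {}, []
    for page in pages:
        for record in page:
            record_id = record["Id"]
            if record_id not in partials_by_id:
                partials_by_id[record_id] = (record, 1)
                continue
            partial, seen = partials_by_id.pop(record_id)
            partial.update(record)
            if seen + 1 == chunk_count:
                complete_records.append(partial)  # all chunks seen: the record is complete
            else:
                partials_by_id[record_id] = (partial, seen + 1)
    assert complete_records == [{"Id": 1, "propertyA": "A", "propertyB": "B"}]
    assert list(partials_by_id) == [2]  # Id 2 stayed incomplete and would be logged and skipped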

def _fetch_next_page(
def _fetch_next_page_for_chunk(
self,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
@@ -229,7 +275,6 @@ def _fetch_next_page(
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
return request, response

@@ -46,14 +46,11 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre
"POST",
"https://fase-account.salesforce.com/services/data/v52.0/jobs/query",
status_code=400,
json=[{
"errorCode": "INVALIDENTITY",
"message": "CustomEntity is not supported by the Bulk API"
}]
json=[{"errorCode": "INVALIDENTITY", "message": "CustomEntity is not supported by the Bulk API"}],
)
rest_stream_records = [
{"id": 1, "name": "custom entity", "created": "2010-11-11"},
{"id": 11, "name": "custom entity", "created": "2020-01-02"}
{"id": 11, "name": "custom entity", "created": "2020-01-02"},
]
# mock REST API
mocker.patch("source_salesforce.source.RestSalesforceStream.read_records", lambda *args, **kwargs: iter(rest_stream_records))
@@ -640,48 +637,47 @@ def test_too_many_properties(stream_config, stream_api_v2_pk_too_many_properties
[
{
"json": {
"nextRecordsUrl": next_page_url,
"records": [{"Id": 1, "propertyA": "A"}, {"Id": 2, "propertyA": "A"}]
"records": [
{"Id": 1, "propertyA": "A"},
{"Id": 2, "propertyA": "A"},
{"Id": 3, "propertyA": "A"},
{"Id": 4, "propertyA": "A"},
]
}
},
{
"json": {
"nextRecordsUrl": next_page_url,
"records": [{"Id": 1, "propertyB": "B"}, {"Id": 2, "propertyB": "B"}]
}
},
# 2 for 2 chunks above and 1 for a chunk below
*[{"json": {"records": [{"Id": 1}, {"Id": 2}], "nextRecordsUrl": next_page_url}} for _ in range(chunks_len - 3)],
{
"json": {
"records": [{"Id": 1}, {"Id": 2}]
}
},
{
"json": {
"records": [{"Id": 3, "propertyA": "A"}, {"Id": 4, "propertyA": "A"}]
}
},
{
"json": {
"records": [{"Id": 3, "propertyB": "B"}, {"Id": 4, "propertyB": "B"}]
}
},
# 2 for 2 chunks above and 1 for a chunk below
*[{"json": {"records": [{"Id": 3}, {"Id": 4}]}} for _ in range(chunks_len - 3)],
{
"json": {
"records": [{"Id": 3}, {"Id": 4}]
}
}
]
{"json": {"nextRecordsUrl": next_page_url, "records": [{"Id": 1, "propertyB": "B"}, {"Id": 2, "propertyB": "B"}]}},
# 2 for 2 chunks above
*[{"json": {"records": [{"Id": 1}, {"Id": 2}], "nextRecordsUrl": next_page_url}} for _ in range(chunks_len - 2)],
{"json": {"records": [{"Id": 3, "propertyB": "B"}, {"Id": 4, "propertyB": "B"}]}},
# 2 for 1 chunk above and 1 chunk had no next page
*[{"json": {"records": [{"Id": 3}, {"Id": 4}]}} for _ in range(chunks_len - 2)],
],
)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert records == [
{"Id": 1, "propertyA": "A", "propertyB": "B"},
{"Id": 2, "propertyA": "A", "propertyB": "B"},
{"Id": 3, "propertyA": "A", "propertyB": "B"},
{"Id": 4, "propertyA": "A", "propertyB": "B"}
{"Id": 4, "propertyA": "A", "propertyB": "B"},
]
for call in requests_mock.request_history:
assert len(call.url) < Salesforce.REQUEST_SIZE_LIMITS


def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_many_properties, requests_mock):
stream = generate_stream("Account", stream_config, stream_api_v2_pk_too_many_properties)
chunks = list(stream.chunk_properties())
for chunk in chunks:
assert stream.primary_key in chunk
assert stream.too_many_properties
assert stream.primary_key
assert type(stream) == RestSalesforceStream
url = "https://fase-account.salesforce.com/services/data/v52.0/queryAll"
requests_mock.get(
url,
[
{"json": {"records": []}},
],
)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert records == []
2 changes: 1 addition & 1 deletion connectors.md
@@ -201,7 +201,7 @@
| **SFTP** | <img alt="SFTP icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp:0.1.2 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp) | <small>`a827c52e-791c-4135-a245-e233c5255199`</small> |
| **SFTP Bulk** | <img alt="SFTP Bulk icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/sftp.svg" height="30" height="30"/> | Source | airbyte/source-sftp-bulk:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/sftp-bulk) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sftp-bulk) | <small>`31e3242f-dee7-4cdc-a4b8-8e06c5458517`</small> |
| **SalesLoft** | <img alt="SalesLoft icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/salesloft.svg" height="30" height="30"/> | Source | airbyte/source-salesloft:0.1.4 | alpha | [link](https://docs.airbyte.com/integrations/sources/salesloft) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesloft) | <small>`41991d12-d4b5-439e-afd0-260a31d4c53f`</small> |
| **Salesforce** | <img alt="Salesforce icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/salesforce.svg" height="30" height="30"/> | Source | airbyte/source-salesforce:2.0.4 | generally_available | [link](https://docs.airbyte.com/integrations/sources/salesforce) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | <small>`b117307c-14b6-41aa-9422-947e34922962`</small> |
| **Salesforce** | <img alt="Salesforce icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/salesforce.svg" height="30" height="30"/> | Source | airbyte/source-salesforce:2.0.5 | generally_available | [link](https://docs.airbyte.com/integrations/sources/salesforce) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-salesforce) | <small>`b117307c-14b6-41aa-9422-947e34922962`</small> |
| **Sample Data (Faker)** | <img alt="Sample Data (Faker) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/faker.svg" height="30" height="30"/> | Source | airbyte/source-faker:2.0.3 | beta | [link](https://docs.airbyte.com/integrations/sources/faker) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-faker) | <small>`dfd88b22-b603-4c3d-aad7-3701784586b1`</small> |
| **SearchMetrics** | <img alt="SearchMetrics icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/searchmetrics.svg" height="30" height="30"/> | Source | airbyte/source-search-metrics:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/sources/search-metrics) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-search-metrics) | <small>`8d7ef552-2c0f-11ec-8d3d-0242ac130003`</small> |
| **Secoda** | <img alt="Secoda icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/secoda.svg" height="30" height="30"/> | Source | airbyte/source-secoda:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/secoda) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-secoda) | <small>`da9fc6b9-8059-4be0-b204-f56e22e4d52d`</small> |
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
@@ -129,6 +129,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 2.0.5 | 2023-03-01 | [23610](https://github.com/airbytehq/airbyte/pull/23610) | Handle different Salesforce page size for different queries |
| 2.0.4 | 2023-02-24 | [22636](https://github.com/airbytehq/airbyte/pull/22636) | Turn on default HttpAvailabilityStrategy for all streams that are not of class BulkSalesforceStream |
| 2.0.3 | 2023-02-17 | [23190](https://github.com/airbytehq/airbyte/pull/23190) | In case properties are chunked, fetch primary key in every chunk |
| 2.0.2 | 2023-02-13 | [22896](https://github.com/airbytehq/airbyte/pull/22896) | Count the URL length based on encoded params |
