🎉 Source Salesforce: add checkpointing #24888

Merged: 9 commits, Apr 24, 2023
@@ -23087,7 +23087,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "2.0.9",
"dockerImageTag": "2.0.10",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/salesforce",
"icon": "salesforce.svg",
"sourceType": "api",
@@ -1830,7 +1830,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 2.0.9
dockerImageTag: 2.0.10
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -13749,7 +13749,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:2.0.9"
- dockerImage: "airbyte/source-salesforce:2.0.10"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.9
LABEL io.airbyte.version=2.0.10
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -33,14 +33,17 @@ acceptance_tests:
bypass_reason: "impossible to fill the stream with data because it is an organic traffic"
- name: "Describe"
bypass_reason: "Data is not permanent"
timeout_seconds: 3600
fail_on_extra_columns: false
incremental:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state:
future_state_path: "integration_tests/future_state.json"
timeout_seconds: 7200
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
@@ -72,7 +72,7 @@ def test_not_queryable_stream(caplog, input_config):
)
def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages):
stream = get_stream(input_sandbox_config, stream_name)
expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
stream_slice = {
"start_date": "2023-01-01T00:00:00.000+0000",
"end_date": "2023-02-01T00:00:00.000+0000"
}
expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

create_query_matcher = re.compile(r"jobs/query$")
job_matcher = re.compile(r"jobs/query/fake_id$")
@@ -88,7 +92,7 @@ def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, str
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
m.register_uri("DELETE", job_matcher, json={})
with caplog.at_level(logging.WARNING):
loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

caplog_rec_counter = len(caplog.records) - 1
for log_message in log_messages:
@@ -8,6 +8,7 @@
from datetime import datetime
from pathlib import Path

import pendulum
import pytest
import requests
from airbyte_cdk.models import SyncMode
@@ -66,8 +67,7 @@ def update_note(stream, note_id, headers):


def get_stream_state():
state_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
return {"LastModifiedDate": state_date}
return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}


@@ -79,21 +79,54 @@ def test_update_for_deleted_record(stream):

created_note_id = response.json()["id"]

notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
assert created_note_id in notes, "The stream didn't return the note we created"
# A record may not be accessible right after creation. This workaround makes a few attempts to retrieve the latest record
notes = []
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
try:
assert created_note_id in notes, "The stream didn't return the note we created"
break
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"

is_note_updated = False
is_deleted = False
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
if created_note_id == record["Id"]:
is_note_updated = True
is_deleted = record["IsDeleted"]
# A record may still be accessible right after deletion for some time
attempts = 10
while True:
is_note_updated = False
is_deleted = False
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
if created_note_id == record["Id"]:
is_note_updated = True
is_deleted = record["IsDeleted"]
break
try:
assert is_note_updated, "No deleted note during the sync"
assert is_deleted, "Wrong field value for deleted note during the sync"
break
assert is_note_updated, "No deleted note during the sync"
assert is_deleted, "Wrong field value for deleted note during the sync"
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

time.sleep(1)
response = update_note(stream, created_note_id, headers)
@@ -107,8 +140,25 @@ def test_deleted_record(stream):

created_note_id = response.json()["id"]

notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
assert created_note_id in notes, "No created note during the sync"
# A record may not be accessible right after creation. This workaround makes a few attempts to retrieve the latest record
notes = []
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
try:
assert created_note_id in notes, "No created note during the sync"
break
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

response = update_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not updated"
@@ -117,14 +167,29 @@ def test_deleted_record(stream):
response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"

record = None
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
if created_note_id == record["Id"]:
# Record updates take some time to become accessible
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
record = None
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
if created_note_id == record["Id"]:
break
try:
assert record, "No updated note during the sync"
assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
break

assert record, "No updated note during the sync"
assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1


def test_parallel_discover(input_sandbox_config):
@@ -147,7 +147,8 @@ def _read_stream(
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
url = error.response.url
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
logger.warning(f"API Call {url} limit is exceeded. Error message: '{error_data.get('message')}'")
raise AirbyteStopSync() # if got 403 rate limit response, finish the sync with success.
Contributor:

I am wondering whether this is still the behavior we want. Because of the 24-hour rate limit, we didn't want to block future syncs, so we just marked the sync successful. That has drawbacks, and we could lose records. However, now that we checkpoint at date-slice windows, maybe we should throw the error back instead of swallowing it, and on the next sync pick up where the previous bookmark left off.

With slices we can still make incremental progress even if we hit the rate limit again, instead of retrying the whole sync.

Collaborator (Author):

We have checkpointing now, but not for full refresh syncs. I also wanted to remove this behavior, but decided to leave it as is.

Contributor:

Failing due to daily rate limits will trigger alerts if 3 workspaces start moving more data than they can. I'm not sure of the best way to expose this kind of limitation without introducing a new status type.

raise error
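To make the trade-off discussed above concrete, here is a minimal, self-contained sketch of slice-level checkpointing under a rate limit. It is hypothetical code for illustration only: RateLimitExceeded, read_slice and sync are stand-ins, not Airbyte or Salesforce APIs.

class RateLimitExceeded(Exception):
    """Stands in for a 403 REQUEST_LIMIT_EXCEEDED response."""


def read_slice(date_slice):
    # Pretend the daily API quota is exhausted once we reach March 2023.
    if date_slice["start_date"] >= "2023-03-02":
        raise RateLimitExceeded()
    # One fake record whose cursor value equals the slice end.
    yield {"Id": "1", "LastModifiedDate": date_slice["end_date"]}


def sync(date_slices, state):
    for date_slice in date_slices:
        for record in read_slice(date_slice):
            # Checkpoint as records are read, so the bookmark survives a
            # failure later in the sync.
            state["LastModifiedDate"] = record["LastModifiedDate"]
    return state


state = {}
date_slices = [
    {"start_date": "2023-01-01", "end_date": "2023-03-02"},
    {"start_date": "2023-03-02", "end_date": "2023-05-01"},
]
try:
    sync(date_slices, state)  # raises on the second slice
except RateLimitExceeded:
    # Current behavior would swallow the error and report success; the
    # alternative discussed above is to fail the sync. Either way the next
    # run can start from state["LastModifiedDate"] rather than from scratch.
    print("rate limited, resume from", state)

Whichever option is chosen, the point is that the per-slice bookmark makes the failure recoverable without re-reading the slices that already completed.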
@@ -579,6 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):

class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
STREAM_SLICE_STEP = 120

def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
@@ -592,6 +593,20 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]:
return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") # type: ignore[attr-defined,no-any-return]
return None

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start, end = (None, None)
now = pendulum.now(tz="UTC")
initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC")

slice_number = 1
while not end == now:
Contributor:

nit: while end <= now makes the intent clearer

Collaborator (Author):

This is not possible with the current logic: end is None before the first iteration while now is a datetime, so only the == comparison can be used here.

start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP))
yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
slice_number = slice_number + 1
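
For illustration, the slicing above can be exercised on its own. The following standalone sketch (not part of the connector) pins now to a fixed date so the output is deterministic; with STREAM_SLICE_STEP = 120, the range from the state or start date up to now is cut into 120-day windows, and the final window is truncated at now, which is why the loop exits once end equals now.

import pendulum

STREAM_SLICE_STEP = 120


def stream_slices(start_date, now):
    start, end = None, None
    slice_number = 1
    initial_date = pendulum.parse(start_date, tz="UTC")
    while end != now:  # end is None before the first iteration
        start = initial_date.add(days=(slice_number - 1) * STREAM_SLICE_STEP)
        end = min(now, initial_date.add(days=slice_number * STREAM_SLICE_STEP))
        yield {"start_date": start.isoformat(timespec="milliseconds"),
               "end_date": end.isoformat(timespec="milliseconds")}
        slice_number += 1


fixed_now = pendulum.datetime(2023, 4, 24, tz="UTC")
for date_slice in stream_slices("2022-10-01T00:00:00Z", fixed_now):
    print(date_slice)
# Two slices: 2022-10-01 -> 2023-01-29 (120 days), then 2023-01-29 -> 2023-04-24
# (truncated at the fixed "now").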

def request_params(
self,
stream_state: Mapping[str, Any],
@@ -607,14 +622,28 @@ def request_params(

property_chunk = property_chunk or {}

stream_date = stream_state.get(self.cursor_field)
start_date = stream_date or self.start_date
start_date = max(
(stream_state or {}).get(self.cursor_field, self.start_date),
(stream_slice or {}).get("start_date", ""),
(next_page_token or {}).get("start_date", ""),
)
end_date = (stream_slice or {}).get("end_date", pendulum.now(tz="UTC").isoformat(timespec="milliseconds"))

select_fields = ",".join(property_chunk.keys())
table_name = self.name
where_conditions = []
order_by_clause = ""

query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "
if start_date:
query += f"WHERE {self.cursor_field} >= {start_date} "
where_conditions.append(f"{self.cursor_field} >= {start_date}")
if end_date:
where_conditions.append(f"{self.cursor_field} < {end_date}")
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.cursor_field} ASC"
order_by_clause = f"ORDER BY {self.cursor_field} ASC"
Contributor:

Why is it that we're only ordering by self.cursor_field in the incremental sync case, but ordering by cursor & primary key in the bulk case? Should they be consistent?

Collaborator (Author):

We use the primary key in the bulk query, in both the WHERE clause and the ORDER BY clause, when a primary key exists. For this type of query we have to handle pagination ourselves, so we also slice by primary key inside the base stream slicer. REST stream pagination works differently, so we don't need it there.

Why we need a primary key for Bulk Streams:
Given the table

id date
1 01.01.2023
2 01.01.2023
3 01.01.2023
4 01.01.2023
5 01.01.2023
6 01.03.2023

Page size = 2
cursor field = date
primary key = id

Query for first slice would be:

SELECT fields FROM table WHERE date >= 01.01.2023 AND date < 01.02.2023 ORDER BY date LIMIT 15000;

Salesforce prepares the data (up to 15000 records, but imagine it returns only 2 for the purposes of this example):

id date
1 01.01.2023
2 01.01.2023

So far only 2 of the 5 records satisfying the first query have been returned, which means we are not ready to move to the second slice. Note also that all 5 of those records share the same date, 01.01.2023. This is where the primary key comes in handy. To get the next two records we issue a second query like this:

SELECT fields FROM table WHERE date >= 01.01.2023 AND date < 01.02.2023 AND id > 2 ORDER BY date, id LIMIT 15000;

This will return:

id date
3 01.01.2023
4 01.01.2023

Why we don't need a primary key in REST Streams:
Here, in the first lines of the current method,

if next_page_token:
    """
    If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters.
    """
    return {}

we can see that Salesforce builds the next-page link for us, and we just use it as-is to fetch the next page.

Contributor:

Makes sense, thank you for clarifying!
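
The pagination described above can be summarized with a small standalone sketch. build_bulk_query is a hypothetical helper for illustration, not the connector's API: the first request for a slice filters on the cursor window alone, and when a page comes back full, the follow-up request for the same window also filters on the primary key of the last record received and orders by cursor plus primary key.

def build_bulk_query(table, cursor_field, start_date, end_date,
                     primary_key=None, last_primary_key=None, page_size=15000):
    where = [f"{cursor_field} >= {start_date}", f"{cursor_field} < {end_date}"]
    if primary_key and last_primary_key:
        # Keyset pagination: skip everything up to the last record seen.
        where.append(f"{primary_key} > '{last_primary_key}'")
    order_by = ", ".join([cursor_field, primary_key] if primary_key else [cursor_field])
    return (f"SELECT Id, {cursor_field} FROM {table} "
            f"WHERE {' AND '.join(where)} ORDER BY {order_by} ASC LIMIT {page_size}")


# First page of a slice, then the follow-up page after record Id '2':
print(build_bulk_query("Account", "LastModifiedDate", "2023-01-01", "2023-02-01", "Id"))
print(build_bulk_query("Account", "LastModifiedDate", "2023-01-01", "2023-02-01", "Id",
                       last_primary_key="2"))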


where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"

return {"q": query}

@property
@@ -635,34 +664,33 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
page_token: str = last_record[self.cursor_field]
res = {"next_token": page_token}
# use primary key as additional filtering param, if cursor_field is not increased from previous page
if self.primary_key and self.prev_start_date == page_token:
res["primary_key"] = last_record[self.primary_key]
return res
return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)}
return None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
selected_properties = self.get_json_schema().get("properties", {})
start_date = max(
(stream_state or {}).get(self.cursor_field, ""),
(stream_slice or {}).get("start_date", ""),
(next_page_token or {}).get("start_date", ""),
)
end_date = stream_slice["end_date"]

stream_date = stream_state.get(self.cursor_field)
next_token = (next_page_token or {}).get("next_token")
primary_key = (next_page_token or {}).get("primary_key")
start_date = next_token or stream_date or self.start_date
self.prev_start_date = start_date
select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys())
table_name = self.name
where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]
order_by_clause = ""

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
if start_date:
if primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"WHERE ({self.cursor_field} = {start_date} AND {self.primary_key} > '{primary_key}') OR ({self.cursor_field} > {start_date}) "
else:
query += f"WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
order_by_fields = [self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]
query += f"ORDER BY {','.join(order_by_fields)} ASC LIMIT {self.page_size}"
last_primary_key = (next_page_token or {}).get("primary_key", "")
if last_primary_key:
where_conditions.append(f"{self.primary_key} > '{last_primary_key}'")
order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
return {"q": query}

