Skip to content

Commit

Permalink
fix(airbyte-cdk): client_side_incremental fix end_datetime comparison (#38874)
Browse files Browse the repository at this point in the history

Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 committed Jun 18, 2024
1 parent cbb8268 commit 46f8d4e
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ def _start_date_from_config(self) -> datetime.datetime:

@property
def _end_datetime(self) -> datetime.datetime:
return (
self._date_time_based_cursor._end_datetime.get_datetime(self._date_time_based_cursor.config)
if self._date_time_based_cursor._end_datetime
else datetime.datetime.max
)
return self._date_time_based_cursor.select_best_end_datetime()

def filter_records(
self,
Expand All @@ -82,7 +78,7 @@ def filter_records(
records = (
record
for record in records
if self._end_datetime > self._date_time_based_cursor.parse_date(record[self._cursor_field]) > filter_date
if self._end_datetime >= self._date_time_based_cursor.parse_date(record[self._cursor_field]) >= filter_date
)
if self.condition:
records = super().filter_records(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ def stream_slices(self) -> Iterable[StreamSlice]:
:return:
"""
end_datetime = self._select_best_end_datetime()
start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
end_datetime = self.select_best_end_datetime()
start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
return self._partition_daterange(start_datetime, end_datetime, self._step)

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
Expand All @@ -179,7 +179,15 @@ def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) ->
cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
return max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta

def _select_best_end_datetime(self) -> datetime.datetime:
def select_best_end_datetime(self) -> datetime.datetime:
"""
Returns the optimal end datetime.
This method compares the current datetime with a pre-configured end datetime
and returns the earlier of the two. If no pre-configured end datetime is set,
the current datetime is returned.
:return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
"""
now = datetime.datetime.now(tz=self._timezone)
if not self._end_datetime:
return now
Expand Down Expand Up @@ -308,7 +316,7 @@ def should_be_synced(self, record: Record) -> bool:
f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
)
return True
latest_possible_cursor_value = self._select_best_end_datetime()
latest_possible_cursor_value = self.select_best_end_datetime()
earliest_possible_cursor_value = self._calculate_earliest_possible_value(latest_possible_cursor_value)
return self._is_within_daterange_boundaries(record, earliest_possible_cursor_value, latest_possible_cursor_value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,67 @@
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.types import StreamSlice

DATE_FORMAT = "%Y-%m-%d"
RECORDS_TO_FILTER_DATE_FORMAT = [
{"id": 1, "created_at": "2020-01-03"},
{"id": 2, "created_at": "2021-01-03"},
{"id": 3, "created_at": "2021-01-04"},
{"id": 4, "created_at": "2021-02-01"},
]

DATE_TIME_WITH_TZ_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT = [
{"id": 1, "created_at": "2020-01-03T00:00:00+00:00"},
{"id": 2, "created_at": "2021-01-03T00:00:00+00:00"},
{"id": 3, "created_at": "2021-01-04T00:00:00+00:00"},
{"id": 4, "created_at": "2021-02-01T00:00:00+00:00"},
]

DATE_TIME_WITHOUT_TZ_FORMAT = "%Y-%m-%dT%H:%M:%S"
RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT = [
{"id": 1, "created_at": "2020-01-03T00:00:00"},
{"id": 2, "created_at": "2021-01-03T00:00:00"},
{"id": 3, "created_at": "2021-01-04T00:00:00"},
{"id": 4, "created_at": "2021-02-01T00:00:00"},
]


@pytest.mark.parametrize(
"filter_template, records, expected_records",
[
(
"{{ record['created_at'] > stream_state['created_at'] }}",
[{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
[{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
"{{ record['created_at'] > stream_state['created_at'] }}",
[{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
[{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
),
(
"{{ record['last_seen'] >= stream_slice['last_seen'] }}",
[{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}],
[{"id": 3, "last_seen": "06-10-21"}],
"{{ record['last_seen'] >= stream_slice['last_seen'] }}",
[{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}],
[{"id": 3, "last_seen": "06-10-21"}],
),
(
"{{ record['id'] >= next_page_token['last_seen_id'] }}",
[{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
[{"id": 14}, {"id": 15}],
"{{ record['id'] >= next_page_token['last_seen_id'] }}",
[{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
[{"id": 14}, {"id": 15}],
),
(
"{{ record['id'] >= next_page_token['path_to_nowhere'] }}",
[{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
[],
"{{ record['id'] >= next_page_token['path_to_nowhere'] }}",
[{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
[],
),
(
"{{ record['created_at'] > parameters['created_at'] }}",
[{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
[{"id": 3, "created_at": "06-08-21"}],
"{{ record['created_at'] > parameters['created_at'] }}",
[{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
[{"id": 3, "created_at": "06-08-21"}],
),
],
ids=["test_using_state_filter", "test_with_slice_filter", "test_with_next_page_token_filter",
"test_missing_filter_fields_return_no_results", "test_using_parameters_filter", ]
ids=[
"test_using_state_filter",
"test_with_slice_filter",
"test_with_next_page_token_filter",
"test_missing_filter_fields_return_no_results",
"test_using_parameters_filter",
],
)
def test_record_filter(filter_template: str, records: List[Mapping], expected_records: List[Mapping]):
config = {"response_override": "stop_if_you_see_me"}
Expand All @@ -53,46 +82,116 @@ def test_record_filter(filter_template: str, records: List[Mapping], expected_re
next_page_token = {"last_seen_id": 14}
record_filter = RecordFilter(config=config, condition=filter_template, parameters=parameters)

actual_records = list(record_filter.filter_records(
records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
))
actual_records = list(
record_filter.filter_records(records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
)
assert actual_records == expected_records


@pytest.mark.parametrize(
"stream_state, record_filter_expression, expected_record_ids",
"datetime_format, stream_state, record_filter_expression, end_datetime, records_to_filter, expected_record_ids",
[
({}, None, [2, 3]),
({"created_at": "2021-01-03"}, None, [3]),
({}, "{{ record['id'] % 2 == 1 }}", [3]),
(DATE_FORMAT, {}, None, "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [2, 3]),
(DATE_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_FORMAT, [2, 3, 4]),
(DATE_FORMAT, {"created_at": "2021-01-04"}, None, "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [3]),
(DATE_FORMAT, {"created_at": "2021-01-04"}, None, None, RECORDS_TO_FILTER_DATE_FORMAT, [3, 4]),
(DATE_FORMAT, {}, "{{ record['id'] % 2 == 1 }}", "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [3]),
(DATE_TIME_WITH_TZ_FORMAT, {}, None, "2021-01-05T00:00:00+00:00", RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, [2, 3]),
(DATE_TIME_WITH_TZ_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, [2, 3, 4]),
(
DATE_TIME_WITH_TZ_FORMAT,
{"created_at": "2021-01-04T00:00:00+00:00"},
None,
"2021-01-05T00:00:00+00:00",
RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT,
[3],
),
(
DATE_TIME_WITH_TZ_FORMAT,
{"created_at": "2021-01-04T00:00:00+00:00"},
None,
None,
RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT,
[3, 4],
),
(
DATE_TIME_WITH_TZ_FORMAT,
{},
"{{ record['id'] % 2 == 1 }}",
"2021-01-05T00:00:00+00:00",
RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT,
[3],
),
(DATE_TIME_WITHOUT_TZ_FORMAT, {}, None, "2021-01-05T00:00:00", RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, [2, 3]),
(DATE_TIME_WITHOUT_TZ_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, [2, 3, 4]),
(
DATE_TIME_WITHOUT_TZ_FORMAT,
{"created_at": "2021-01-04T00:00:00"},
None,
"2021-01-05T00:00:00",
RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT,
[3],
),
(
DATE_TIME_WITHOUT_TZ_FORMAT,
{"created_at": "2021-01-04T00:00:00"},
None,
None,
RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT,
[3, 4],
),
(
DATE_TIME_WITHOUT_TZ_FORMAT,
{},
"{{ record['id'] % 2 == 1 }}",
"2021-01-05T00:00:00",
RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT,
[3],
),
],
ids=[
"date_format_no_stream_state_no_record_filter",
"date_format_no_stream_state_no_end_date_no_record_filter",
"date_format_with_stream_state_no_record_filter",
"date_format_with_stream_state_no_end_date_no_record_filter",
"date_format_no_stream_state_with_record_filter",
"date_time_with_tz_format_no_stream_state_no_record_filter",
"date_time_with_tz_format_no_stream_state_no_end_date_no_record_filter",
"date_time_with_tz_format_with_stream_state_no_record_filter",
"date_time_with_tz_format_with_stream_state_no_end_date_no_record_filter",
"date_time_with_tz_format_no_stream_state_with_record_filter",
"date_time_without_tz_format_no_stream_state_no_record_filter",
"date_time_without_tz_format_no_stream_state_no_end_date_no_record_filter",
"date_time_without_tz_format_with_stream_state_no_record_filter",
"date_time_without_tz_format_with_stream_state_no_end_date_no_record_filter",
"date_time_without_tz_format_no_stream_state_with_record_filter",
],
ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter", "no_stream_state_with_record_filter"]
)
def test_client_side_record_filter_decorator_no_parent_stream(stream_state: Optional[Mapping], record_filter_expression: str,
expected_record_ids: List[int]):
records_to_filter = [
{"id": 1, "created_at": "2020-01-03"},
{"id": 2, "created_at": "2021-01-03"},
{"id": 3, "created_at": "2021-01-04"},
{"id": 4, "created_at": "2021-02-01"},
]
def test_client_side_record_filter_decorator_no_parent_stream(
datetime_format: str,
stream_state: Optional[Mapping],
record_filter_expression: str,
end_datetime: Optional[str],
records_to_filter: List[Mapping],
expected_record_ids: List[int],
):
date_time_based_cursor = DatetimeBasedCursor(
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", parameters={}),
end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format="%Y-%m-%d", parameters={}),
step="P10Y",
cursor_field=InterpolatedString.create("created_at", parameters={}),
datetime_format="%Y-%m-%d",
cursor_granularity="P1D",
config={},
parameters={},
)
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}),
end_datetime=MinMaxDatetime(datetime=end_datetime, parameters={}) if end_datetime else None,
step="P10Y",
cursor_field=InterpolatedString.create("created_at", parameters={}),
datetime_format=datetime_format,
cursor_granularity="P1D",
config={},
parameters={},
)

record_filter_decorator = ClientSideIncrementalRecordFilterDecorator(
config={},
condition=record_filter_expression,
parameters={},
date_time_based_cursor=date_time_based_cursor,
per_partition_cursor=None
per_partition_cursor=None,
)

filtered_records = list(
Expand All @@ -105,31 +204,24 @@ def test_client_side_record_filter_decorator_no_parent_stream(stream_state: Opti
@pytest.mark.parametrize(
"stream_state, expected_record_ids",
[
({}, [2, 3]),
({"states": [{"some_parent_id": {"created_at": "2021-01-03"}}]}, [3]),
({}, [2, 3]),
({"states": [{"some_parent_id": {"created_at": "2021-01-03"}}]}, [3]),
],
ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter"]
ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter"],
)
def test_client_side_record_filter_decorator_with_parent_stream(stream_state: Optional[Mapping], expected_record_ids: List[int]):
records_to_filter = [
{"id": 1, "created_at": "2020-01-03"},
{"id": 2, "created_at": "2021-01-03"},
{"id": 3, "created_at": "2021-01-04"},
{"id": 4, "created_at": "2021-02-01"},
]
date_time_based_cursor = DatetimeBasedCursor(
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", parameters={}),
end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format="%Y-%m-%d", parameters={}),
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}),
end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={}),
step="P10Y",
cursor_field=InterpolatedString.create("created_at", parameters={}),
datetime_format="%Y-%m-%d",
datetime_format=DATE_FORMAT,
cursor_granularity="P1D",
config={},
parameters={},
)
per_partition_cursor = PerPartitionCursor(
cursor_factory=CursorFactory(
lambda: date_time_based_cursor),
cursor_factory=CursorFactory(lambda: date_time_based_cursor),
partition_router=SubstreamPartitionRouter(
config={},
parameters={},
Expand All @@ -139,28 +231,26 @@ def test_client_side_record_filter_decorator_with_parent_stream(stream_state: Op
parent_key="id",
partition_field="id",
stream=DeclarativeStream(
type="DeclarativeStream",
retriever=CustomRetriever(
type="CustomRetriever",
class_name="a_class_name"
)
)
type="DeclarativeStream", retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name")
),
)
]
],
),
)
if stream_state:
per_partition_cursor.set_initial_state({"states": [{"partition": {"id": "some_parent_id", "parent_slice": {}}, "cursor": {'created_at': '2021-01-03'}}]})
per_partition_cursor.set_initial_state(
{"states": [{"partition": {"id": "some_parent_id", "parent_slice": {}}, "cursor": {"created_at": "2021-01-04"}}]}
)
record_filter_decorator = ClientSideIncrementalRecordFilterDecorator(
config={},
parameters={},
date_time_based_cursor=date_time_based_cursor,
per_partition_cursor=per_partition_cursor
config={}, parameters={}, date_time_based_cursor=date_time_based_cursor, per_partition_cursor=per_partition_cursor
)
filtered_records = list(
record_filter_decorator.filter_records(records=records_to_filter, stream_state=stream_state,
stream_slice=StreamSlice(partition={"id": "some_parent_id", "parent_slice": {}}, cursor_slice={}),
next_page_token=None)
record_filter_decorator.filter_records(
records=RECORDS_TO_FILTER_DATE_FORMAT,
stream_state=stream_state,
stream_slice=StreamSlice(partition={"id": "some_parent_id", "parent_slice": {}}, cursor_slice={}),
next_page_token=None,
)
)

assert [x.get("id") for x in filtered_records] == expected_record_ids
Loading

0 comments on commit 46f8d4e

Please sign in to comment.