feat(airbyte-cdk): add DatetimeIntervalCursor (#39603)
lazebnyi committed Jun 19, 2024
1 parent f39f3fc commit a284676
Showing 6 changed files with 59 additions and 5 deletions.
10 changes: 7 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/__init__.py
@@ -120,7 +120,7 @@
"BearerAuthenticator",
"CartesianProductStreamSlicer",
"CursorPaginationStrategy",
"DatetimeBasedCursor"
"DatetimeBasedCursor",
"DeclarativeAuthenticator",
"DeclarativeOauth2Authenticator",
"DeclarativeSingleUseRefreshTokenOauth2Authenticator",
@@ -178,7 +178,7 @@
"HttpSubStream",
"LimiterSession",
"MovingWindowCallRatePolicy",
"MultipleTokenAuthenticator"
"MultipleTokenAuthenticator",
"Oauth2Authenticator",
"Rate",
"SingleUseRefreshTokenOauth2Authenticator",
@@ -191,7 +191,11 @@

# Protocol classes
"AirbyteStream",
"AirbyteConnectionStatus", "AirbyteMessage", "ConfiguredAirbyteCatalog", "Status", "Type",
"AirbyteConnectionStatus",
"AirbyteMessage",
"ConfiguredAirbyteCatalog",
"Status",
"Type",
"OrchestratorType",
"ConfiguredAirbyteStream",
"DestinationSyncMode",
@@ -807,6 +807,11 @@ definitions:
title: Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)
description: If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.
type: boolean
is_compare_strictly:
title: Whether to skip requests if the start time equals the end time
    description: Set to True if the target API does not accept queries where the start time equals the end time.
type: boolean
default: False
lookback_window:
title: Lookback Window
description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.
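A minimal sketch of how a declarative (low-code) manifest could enable the new option. Only is_compare_strictly is introduced by this commit; the other field names follow the existing DatetimeBasedCursor schema, and the concrete values (cursor field, dates, formats) are hypothetical:

# Hypothetical incremental_sync block for a declarative stream, written as the
# Python dict the manifest would parse to. Values are illustrative only.
incremental_sync = {
    "type": "DatetimeBasedCursor",
    "cursor_field": "updated_at",
    "datetime_format": "%Y-%m-%dT%H:%M:%S.%f%z",
    "start_datetime": "{{ config['start_date'] }}",
    "end_datetime": "2024-06-19T00:00:00.000000+0000",
    "step": "P1D",
    "cursor_granularity": "PT0.000001S",
    # New flag: skip the final request when its start time equals its end time,
    # for APIs that reject zero-length query windows.
    "is_compare_strictly": True,
}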
@@ -67,6 +67,7 @@ class DatetimeBasedCursor(DeclarativeCursor):
partition_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None
message_repository: Optional[MessageRepository] = None
is_compare_strictly: bool = False
cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
@@ -207,7 +208,8 @@ def _partition_daterange(
start_field = self._partition_field_start.eval(self.config)
end_field = self._partition_field_end.eval(self.config)
dates = []
while start <= end:

while self._is_within_date_range(start, end):
next_start = self._evaluate_next_start_date_safely(start, step)
end_date = self._get_date(next_start - self._cursor_granularity, end, min)
dates.append(
@@ -218,6 +220,11 @@
start = next_start
return dates

def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
if self.is_compare_strictly:
return start < end
return start <= end

def _evaluate_next_start_date_safely(self, start: datetime.datetime, step: datetime.timedelta) -> datetime.datetime:
"""
Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
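To make the comparison change concrete, here is a standalone sketch (not the CDK code) of the slicing loop with and without strict comparison, assuming a one-microsecond cursor granularity:

import datetime

def partition_daterange(start, end, step, strict):
    # Simplified version of the loop above: strict=True mirrors is_compare_strictly.
    granularity = datetime.timedelta(microseconds=1)
    slices = []
    while (start < end) if strict else (start <= end):
        slices.append((start, min(start + step - granularity, end)))
        start += step
    return slices

start = datetime.datetime(2021, 1, 1)
end = datetime.datetime(2021, 2, 1)
step = datetime.timedelta(days=31)
print(len(partition_daterange(start, end, step, strict=False)))  # 2 (trailing zero-length slice at the boundary)
print(len(partition_daterange(start, end, step, strict=True)))   # 1 (boundary slice skipped)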
@@ -942,6 +942,11 @@ class DatetimeBasedCursor(BaseModel):
description='If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.',
title='Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)',
)
is_compare_strictly: bool = Field(
False,
        description='Set to True if the target API does not accept queries where the start time equals the end time.',
title='Whether to skip requests if the start time equals the end time',
)
lookback_window: Optional[str] = Field(
None,
description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',
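Because the field defaults to False, manifests that omit it keep the inclusive start <= end behaviour. A self-contained pydantic sketch of that defaulting (not the generated model itself):

from pydantic import BaseModel, Field

class CursorOptions(BaseModel):
    # Mirrors the field definition above; omitted in the manifest -> False.
    is_compare_strictly: bool = Field(
        False,
        description="Set to True if the target API does not accept queries where the start time equals the end time.",
    )

print(CursorOptions().is_compare_strictly)                          # False
print(CursorOptions(is_compare_strictly=True).is_compare_strictly)  # True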
@@ -603,6 +603,7 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config:
partition_field_end=model.partition_field_end,
partition_field_start=model.partition_field_start,
message_repository=self._message_repository,
is_compare_strictly=model.is_compare_strictly,
config=config,
parameters=model.parameters or {},
)
@@ -36,7 +36,7 @@ def mock_datetime_now(monkeypatch):


@pytest.mark.parametrize(
"test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, cursor_granularity, expected_slices",
"test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, cursor_granularity, is_compare_strictly, expected_slices",
[
(
"test_1_day",
@@ -48,6 +48,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T23:59:59.999999+0000"},
{"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T23:59:59.999999+0000"},
@@ -71,6 +72,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-02T23:59:59.999999+0000"},
{"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-04T23:59:59.999999+0000"},
@@ -89,6 +91,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-07T23:59:59.999999+0000"},
{"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-14T23:59:59.999999+0000"},
@@ -108,6 +111,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-31T23:59:59.999999+0000"},
{"start_time": "2021-02-01T00:00:00.000000+0000", "end_time": "2021-02-28T23:59:59.999999+0000"},
@@ -127,6 +131,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-12-31T23:59:59.999999+0000"},
{"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"},
@@ -142,6 +147,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T23:59:59.999999+0000"},
{"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T23:59:59.999999+0000"},
@@ -161,6 +167,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"},
],
@@ -175,6 +182,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-12-28T00:00:00.000000+0000", "end_time": "2021-12-28T23:59:59.999999+0000"},
{"start_time": "2021-12-29T00:00:00.000000+0000", "end_time": "2021-12-29T23:59:59.999999+0000"},
@@ -193,6 +201,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"},
],
@@ -207,6 +216,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T23:59:59.999999+0000"},
{"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T23:59:59.999999+0000"},
@@ -226,6 +236,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-06T23:59:59.999999+0000"},
{"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-08T23:59:59.999999+0000"},
@@ -242,6 +253,7 @@ def mock_datetime_now(monkeypatch):
"P3D",
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T23:59:59.999999+0000"},
{"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T23:59:59.999999+0000"},
@@ -259,6 +271,7 @@ def mock_datetime_now(monkeypatch):
"P3D",
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T23:59:59.999999+0000"},
{"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T23:59:59.999999+0000"},
@@ -277,6 +290,7 @@ def mock_datetime_now(monkeypatch):
"{{ config['does_not_exist'] }}",
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T23:59:59.999999+0000"},
{"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T23:59:59.999999+0000"},
@@ -295,6 +309,7 @@ def mock_datetime_now(monkeypatch):
None,
datetime_format,
cursor_granularity,
None,
[
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T23:59:59.999999+0000"},
{"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T23:59:59.999999+0000"},
@@ -304,6 +319,21 @@ def mock_datetime_now(monkeypatch):
{"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_slices_without_intersections",
NO_STATE,
MinMaxDatetime(datetime="{{ config['start_date'] }}", parameters={}),
MinMaxDatetime(datetime="2021-02-01T00:00:00.000000+0000", parameters={}),
"P1M",
cursor_field,
None,
datetime_format,
cursor_granularity,
True,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-31T23:59:59.999999+0000"},
],
)
],
)
def test_stream_slices(
@@ -317,6 +347,7 @@
lookback_window,
datetime_format,
cursor_granularity,
is_compare_strictly,
expected_slices,
):
lookback_window = InterpolatedString(string=lookback_window, parameters={}) if lookback_window else None
@@ -328,6 +359,7 @@
datetime_format=datetime_format,
cursor_granularity=cursor_granularity,
lookback_window=lookback_window,
is_compare_strictly=is_compare_strictly,
config=config,
parameters={},
)
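Outside the test suite, the same behaviour can be exercised directly on the cursor. A hedged sketch mirroring the new test_slices_without_intersections case; the import paths and the exact keyword set are assumptions based on this diff, not verified against the full module:

from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor

# With is_compare_strictly=True, the zero-length slice at the end boundary is not emitted.
cursor = DatetimeBasedCursor(
    start_datetime=MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", parameters={}),
    end_datetime=MinMaxDatetime(datetime="2021-02-01T00:00:00.000000+0000", parameters={}),
    step="P1M",
    cursor_field="updated_at",
    datetime_format="%Y-%m-%dT%H:%M:%S.%f%z",
    cursor_granularity="PT0.000001S",
    is_compare_strictly=True,
    config={},
    parameters={},
)

for stream_slice in cursor.stream_slices():
    print(stream_slice)  # expect one slice: 2021-01-01 .. 2021-01-31T23:59:59.999999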
