Skip to content

Commit

Permalink
[ISSUE #26570] make step and cursor_granularity optional (#26952)
Browse files Browse the repository at this point in the history
* [ISSUE #26570] make step and cursor_granularity optional

* [ISSUE #26570] fix typos
  • Loading branch information
maxi297 committed Jun 5, 2023
1 parent 80584ed commit b5c0ac1
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,7 @@ definitions:
- cursor_field
- end_datetime
- datetime_format
- cursor_granularity
- start_datetime
- step
properties:
type:
type: string
Expand All @@ -579,7 +577,7 @@ definitions:
title: Cursor Granularity
description:
Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should
be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S.
be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.
type: string
examples:
- "PT1S"
Expand All @@ -604,7 +602,7 @@ definitions:
- "{{ config['start_time'] }}"
step:
title: Step
description: The size of the time window (ISO8601 duration).
description: The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.
type: string
examples:
- "P1W"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,26 @@ class DatetimeBasedCursor(StreamSlicer):

start_datetime: Union[MinMaxDatetime, str]
end_datetime: Union[MinMaxDatetime, str]
step: Union[InterpolatedString, str]
cursor_field: Union[InterpolatedString, str]
datetime_format: str
cursor_granularity: str
config: Config
parameters: InitVar[Mapping[str, Any]]
_cursor: dict = field(repr=False, default=None) # tracks current datetime
_cursor_end: dict = field(repr=False, default=None) # tracks end of current stream slice
step: Optional[Union[InterpolatedString, str]] = None
cursor_granularity: Optional[str] = None
start_time_option: Optional[RequestOption] = None
end_time_option: Optional[RequestOption] = None
partition_field_start: Optional[str] = None
partition_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

def __post_init__(self, parameters: Mapping[str, Any]):
if (self.step and not self.cursor_granularity) or (not self.step and self.cursor_granularity):
raise ValueError(
f"If step is defined, cursor_granularity should be as well and vice-versa. "
f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
)
if not isinstance(self.start_datetime, MinMaxDatetime):
self.start_datetime = MinMaxDatetime(self.start_datetime, parameters)
if not isinstance(self.end_datetime, MinMaxDatetime):
Expand All @@ -71,7 +76,11 @@ def __post_init__(self, parameters: Mapping[str, Any]):
self._timezone = datetime.timezone.utc
self._interpolation = JinjaInterpolation()

self._step = self._parse_timedelta(InterpolatedString.create(self.step, parameters=parameters).eval(self.config))
self._step = (
self._parse_timedelta(InterpolatedString.create(self.step, parameters=parameters).eval(self.config))
if self.step
else datetime.timedelta.max
)
self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self.lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters)
Expand Down Expand Up @@ -149,11 +158,23 @@ def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime,
end_field = self.partition_field_end.eval(self.config)
dates = []
while start <= end:
end_date = self._get_date(start + step - self._cursor_granularity, end, min)
next_start = self._evaluate_next_start_date_safely(start, step)
end_date = self._get_date(next_start - self._cursor_granularity, end, min)
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)})
start += step
start = next_start
return dates

def _evaluate_next_start_date_safely(self, start, step):
"""
Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
would have broken anyway.
"""
try:
return start + step
except OverflowError:
return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -> datetime.datetime:
cursor_date = cursor_value or default_date
return comparator(cursor_date, default_date)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,9 +851,9 @@ class DatetimeBasedCursor(BaseModel):
examples=["%Y-%m-%dT%H:%M:%S.%f%z"],
title="Cursor Field Datetime Format",
)
cursor_granularity: str = Field(
...,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S.",
cursor_granularity: Optional[str] = Field(
None,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
examples=["PT1S"],
title="Cursor Granularity",
)
Expand All @@ -869,9 +869,9 @@ class DatetimeBasedCursor(BaseModel):
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
title="Start Datetime",
)
step: str = Field(
...,
description="The size of the time window (ISO8601 duration).",
step: Optional[str] = Field(
None,
description="The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.",
examples=["P1W", "{{ config['step_increment'] }}"],
title="Step",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,5 +581,47 @@ def test_format_datetime(test_name, input_dt, datetimeformat, datetimeformat_gra
assert expected_output == output_date


def test_step_but_no_cursor_granularity():
    """Building a cursor with `step` but without `cursor_granularity` must raise a ValueError."""
    with pytest.raises(ValueError):
        range_start = MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={})
        range_end = MinMaxDatetime("2021-01-10T00:00:00.000000+0000", parameters={})
        DatetimeBasedCursor(
            start_datetime=range_start,
            end_datetime=range_end,
            step="P1D",
            cursor_field=InterpolatedString(cursor_field, parameters={}),
            datetime_format="%Y-%m-%d",
            lookback_window=InterpolatedString("P0D", parameters={}),
            config=config,
            parameters={},
        )


def test_cursor_granularity_but_no_step():
    """Building a cursor with `cursor_granularity` but without `step` must raise a ValueError."""
    with pytest.raises(ValueError):
        range_start = MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={})
        range_end = MinMaxDatetime("2021-01-10T00:00:00.000000+0000", parameters={})
        DatetimeBasedCursor(
            start_datetime=range_start,
            end_datetime=range_end,
            cursor_granularity="P1D",
            cursor_field=InterpolatedString(cursor_field, parameters={}),
            datetime_format="%Y-%m-%d",
            lookback_window=InterpolatedString("P0D", parameters={}),
            config=config,
            parameters={},
        )


def test_no_cursor_granularity_and_no_step_then_only_return_one_slice():
    """With neither `step` nor `cursor_granularity`, the whole date range comes back as a single slice."""
    range_start = MinMaxDatetime("2021-01-01", parameters={})
    range_end = MinMaxDatetime("2023-01-01", parameters={})
    cursor = DatetimeBasedCursor(
        start_datetime=range_start,
        end_datetime=range_end,
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        lookback_window=InterpolatedString("P0D", parameters={}),
        config=config,
        parameters={},
    )
    expected_slices = [{"start_time": "2021-01-01", "end_time": "2023-01-01"}]
    assert cursor.stream_slices(SyncMode.incremental, {}) == expected_slices


if __name__ == "__main__":
unittest.main()

0 comments on commit b5c0ac1

Please sign in to comment.