Skip to content

Commit

Permalink
Modify get_first_partition_window to account for offset (#12504)
Browse files Browse the repository at this point in the history
Fixes a bug that caused a GraphQL error reported by a user here:
https://dagster.slack.com/archives/C01U954MEER/p1677064207587029

Previously, `get_first_partition_window` did not account for the end
offset.

This led to errors such as `get_first_partition_window` returning `None`
when the start time window is the current time window and offset > 0.
  • Loading branch information
clairelin135 authored and sryza committed Mar 1, 2023
1 parent 7f4676b commit 445a134
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 7 deletions.
Expand Up @@ -370,17 +370,42 @@ def get_prev_partition_window(self, start_dt: datetime) -> Optional[TimeWindow]:
def get_first_partition_window(
    self, current_time: Optional[datetime] = None
) -> Optional[TimeWindow]:
    """Return the first partition's time window, or ``None`` if it does not exist yet.

    The first window is the first one yielded by ``_iterate_time_windows(self.start)``.
    It is returned only when its end is no later than a cutoff derived from
    ``current_time`` and ``self.end_offset``:

    * ``end_offset == 0``: the cutoff is ``current_time`` itself.
    * ``end_offset > 0``: the cutoff is the start of the window reached by stepping
      ``end_offset`` windows past the first one yielded by
      ``_iterate_time_windows(current_time)``.
    * ``end_offset < 0``: the cutoff is the start of the ``abs(end_offset)``-th window
      yielded by ``_reverse_iterate_time_windows(current_time)``.

    Args:
        current_time: Reference time for the check. Defaults to the current time in
            ``self.timezone``.

    Returns:
        The first partition's ``TimeWindow``, or ``None`` when the window's end falls
        after the cutoff.
    """
    current_time = cast(
        datetime,
        pendulum.instance(current_time, tz=self.timezone)
        if current_time
        else pendulum.now(self.timezone),
    )

    time_window = next(iter(self._iterate_time_windows(self.start)))

    if self.end_offset == 0:
        cutoff_timestamp = current_time.timestamp()
    elif self.end_offset > 0:
        iterator = iter(self._iterate_time_windows(current_time))
        # The first window yielded is treated as the window for the current time
        # (per the original author's note); step forward end_offset more windows.
        curr_window_plus_offset = next(iterator)
        for _ in range(self.end_offset):
            curr_window_plus_offset = next(iterator)
        cutoff_timestamp = curr_window_plus_offset.start.timestamp()
    else:
        # end_offset < 0: walk backwards from the current time by abs(end_offset) windows.
        end_window = None
        iterator = iter(self._reverse_iterate_time_windows(current_time))
        for _ in range(abs(self.end_offset)):
            end_window = next(iterator)

        if end_window is None:
            check.failed("end_window should not be None")

        cutoff_timestamp = end_window.start.timestamp()

    return time_window if time_window.end.timestamp() <= cutoff_timestamp else None

def get_last_partition_window(
self, current_time: Optional[datetime] = None
Expand Down Expand Up @@ -1234,7 +1259,7 @@ def _get_partition_time_windows_not_in_subset(
Returns a list of partition time windows that are not in the subset.
Each time window is a single partition.
"""
first_tw = self._partitions_def.get_first_partition_window()
first_tw = self._partitions_def.get_first_partition_window(current_time=current_time)
last_tw = self._partitions_def.get_last_partition_window(current_time=current_time)

if not first_tw or not last_tw:
Expand Down
Expand Up @@ -652,3 +652,129 @@ def my_partitioned_config_2(_start, _end):
partitions_def.get_partition_keys_between_indexes(50, 53, current_time=current_time)
== partitions_def.get_partition_keys(current_time=current_time)[50:53]
)


def test_get_first_partition_window():
    """Check ``get_first_partition_window`` for zero, positive and negative end offsets."""

    def day(date_str):
        # Naive datetime at 00:00:00 on the given YYYY-MM-DD date.
        return datetime.strptime(date_str, "%Y-%m-%d")

    jan_1_window = time_window("2023-01-01", "2023-01-02")
    jan_15_window = time_window("2023-01-15", "2023-01-16")
    january_window = time_window("2023-01-01", "2023-02-01")

    # --- end_offset == 0 -------------------------------------------------------------
    # No explicit current_time: this case relies on the wall clock being past 2023-01-02.
    assert (
        DailyPartitionsDefinition(start_date="2023-01-01").get_first_partition_window()
        == jan_1_window
    )

    # --- end_offset > 0 --------------------------------------------------------------
    assert (
        DailyPartitionsDefinition(start_date="2023-01-01", end_offset=1).get_first_partition_window(
            current_time=day("2023-01-01")
        )
        == jan_1_window
    )

    assert (
        DailyPartitionsDefinition(start_date="2023-02-15", end_offset=1).get_first_partition_window(
            current_time=day("2023-02-14")
        )
        is None
    )

    assert (
        DailyPartitionsDefinition(start_date="2023-01-01", end_offset=2).get_first_partition_window(
            current_time=day("2023-01-02")
        )
        == jan_1_window
    )

    assert (
        MonthlyPartitionsDefinition(
            start_date="2023-01-01", end_offset=1
        ).get_first_partition_window(current_time=day("2023-01-15"))
        == january_window
    )

    assert (
        DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
            current_time=day("2023-01-14")
        )
        is None
    )

    # Mid-day current times around the start date.
    assert (
        DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
            current_time=datetime(year=2023, month=1, day=15, hour=12, minute=0, second=0)
        )
        == jan_15_window
    )

    assert (
        DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
            current_time=datetime(year=2023, month=1, day=14, hour=12, minute=0, second=0)
        )
        == jan_15_window
    )

    assert (
        DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window(
            current_time=datetime(year=2023, month=1, day=13, hour=12, minute=0, second=0)
        )
        is None
    )

    # --- end_offset < 0 --------------------------------------------------------------
    assert (
        DailyPartitionsDefinition(
            start_date="2023-01-15", end_offset=-1
        ).get_first_partition_window(current_time=day("2023-01-16"))
        is None
    )

    assert (
        DailyPartitionsDefinition(
            start_date="2023-01-15", end_offset=-1
        ).get_first_partition_window(current_time=day("2023-01-17"))
        == jan_15_window
    )

    assert (
        DailyPartitionsDefinition(
            start_date="2023-01-15", end_offset=-2
        ).get_first_partition_window(current_time=day("2023-01-17"))
        is None
    )

    assert (
        DailyPartitionsDefinition(
            start_date="2023-01-15", end_offset=-2
        ).get_first_partition_window(current_time=day("2023-01-18"))
        == jan_15_window
    )

    assert (
        MonthlyPartitionsDefinition(
            start_date="2023-01-01", end_offset=-1
        ).get_first_partition_window(current_time=day("2023-01-15"))
        is None
    )

    assert (
        MonthlyPartitionsDefinition(
            start_date="2023-01-01", end_offset=-1
        ).get_first_partition_window(current_time=day("2023-02-01"))
        is None
    )

    assert (
        MonthlyPartitionsDefinition(
            start_date="2023-01-01", end_offset=-1
        ).get_first_partition_window(current_time=day("2023-03-01"))
        == january_window
    )

0 comments on commit 445a134

Please sign in to comment.