Skip to content

Commit

Permalink
[1/3 partition status cache] Time window subset perf improvements (#11640)
Browse files Browse the repository at this point in the history

To avoid calling `croniter` once per partition key, this PR makes two
changes:
- Fetches the time windows for a batch of partition keys with a single
time window iterator that is shared across each contiguous run of keys
- Adds a helper method that finds the time windows that are not part of
the subset by walking the gaps between the included time windows
  • Loading branch information
clairelin135 committed Jan 17, 2023
1 parent 49e258c commit c96fef1
Showing 1 changed file with 75 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@ def time_window_for_partition_key(self, partition_key: str) -> TimeWindow:
)
return next(iter(self._iterate_time_windows(partition_key_dt)))

def time_windows_for_partition_keys(
    self,
    partition_keys: Sequence[str],
) -> Sequence[TimeWindow]:
    """Return the time window for each of the given partition keys.

    One window iterator is advanced across consecutive keys and is only
    re-created when the window it yields next does not start at the next key
    (a gap between keys, or a repeated key). A run of adjacent keys therefore
    shares a single iterator instead of building one per key.

    Args:
        partition_keys: Partition keys formatted according to ``self.fmt``.

    Returns:
        One time window per input key, ordered chronologically by the parsed
        key, which is not necessarily the order the keys were passed in.
        An empty list when no keys are given.

    Raises:
        ValueError: From ``datetime.strptime`` when a key does not match
            ``self.fmt``.
    """
    if not partition_keys:
        return []

    fmt = self.fmt
    # Parse every key exactly once; the parsed value is reused both for the
    # sort and for (re)starting the window iterator. Sorting on the parsed
    # datetime only keeps the sort stable for keys that parse to the same time.
    keyed = sorted(
        ((datetime.strptime(pk, fmt), pk) for pk in partition_keys),
        key=lambda pair: pair[0],
    )

    windows_iter = iter(self._iterate_time_windows(keyed[0][0]))
    partition_key_time_windows: List[TimeWindow] = []
    for key_dt, partition_key in keyed:
        candidate = next(windows_iter)
        if candidate.start.strftime(fmt) != partition_key:
            # The shared iterator is not positioned on this key, so restart
            # it at the key's own time and take the first window it yields.
            windows_iter = iter(self._iterate_time_windows(key_dt))
            candidate = next(windows_iter)
        partition_key_time_windows.append(candidate)
    return partition_key_time_windows

def start_time_for_partition_key(self, partition_key: str) -> datetime:
partition_key_dt = pendulum.instance(
datetime.strptime(partition_key, self.fmt), tz=self.timezone
Expand Down Expand Up @@ -1034,29 +1057,58 @@ def __init__(
self._included_time_windows = included_time_windows
self._num_partitions = num_partitions

def _get_partition_time_windows_not_in_subset(
    self,
    current_time: Optional[datetime] = None,
) -> Sequence[TimeWindow]:
    """Return the time ranges, up to the last partition, that the subset omits.

    Each returned window is a gap between (or around) the subset's included
    time windows and may span several partitions: with no included windows at
    all, a single window from the first partition's start to the last
    partition's end is returned.

    Args:
        current_time: Forwarded to ``get_last_partition_window`` to decide
            which partition is the last one.

    Returns:
        The gap windows in ascending order. No returned window ends after the
        end of the last partition window.
    """
    first_tw = self._partitions_def.get_first_partition_window()
    last_tw = self._partitions_def.get_last_partition_window(current_time=current_time)

    if not first_tw or not last_tw:
        check.failed("No partitions found")

    included = self._included_time_windows
    range_end = last_tw.end

    if not included:
        return [TimeWindow(first_tw.start, range_end)]

    time_windows = []

    # Gap before the first included window. It is clamped to the end of the
    # last partition so that a subset holding only windows beyond that point
    # does not produce a gap reaching past the existing partitions.
    if first_tw.start < included[0].start:
        time_windows.append(TimeWindow(first_tw.start, min(included[0].start, range_end)))

    # Gaps between consecutive included windows, likewise clamped.
    for cur, nxt in zip(included, included[1:]):
        if cur.start >= range_end:
            # This and every later included window lie past the last partition.
            break
        if cur.end < range_end:
            time_windows.append(TimeWindow(cur.end, min(nxt.start, range_end)))

    # Gap after the final included window.
    if range_end > included[-1].end:
        time_windows.append(TimeWindow(included[-1].end, range_end))

    return time_windows

def get_partition_keys_not_in_subset(
    self, current_time: Optional[datetime] = None
) -> Iterable[str]:
    """Return the partition keys that are not part of this subset.

    The omitted time ranges come from
    ``_get_partition_time_windows_not_in_subset``; each range is then expanded
    into its partition keys by the partitions definition. This replaces the
    earlier loop that stepped through the partition windows one at a time.

    Args:
        current_time: Forwarded to
            ``_get_partition_time_windows_not_in_subset``.

    Returns:
        A list of partition keys, in the order the omitted ranges are
        reported.
    """
    partition_keys: List[str] = []
    for tw in self._get_partition_time_windows_not_in_subset(current_time):
        partition_keys.extend(self._partitions_def.get_partition_keys_in_time_window(tw))
    return partition_keys

def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterable[str]:
return [
Expand All @@ -1079,10 +1131,9 @@ def included_time_windows(self) -> Sequence[TimeWindow]:

def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowPartitionsSubset":
result_windows = [*self._included_time_windows]
time_windows = [
self._partitions_def.time_window_for_partition_key(partition_key)
for partition_key in partition_keys
]
time_windows = self._partitions_def.time_windows_for_partition_keys(
list(partition_keys),
)
num_added_partitions = 0
for window in sorted(time_windows):
# go in reverse order because it's more common to add partitions at the end than the
Expand Down

0 comments on commit c96fef1

Please sign in to comment.