Skip to content

Commit

Permalink
Speed up get_num_partitions for common cases (#21791)
Browse files Browse the repository at this point in the history
Summary:
For daily, hourly, and minute-ly partition sets, do basic math rather
than enumerating every single partition key and counting them.

Test Plan: A representative large asset graph with many large partition
sets goes from taking 65 seconds to computing counts on all assets to
taking 0.4 seconds to compute counts on all assets

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan committed May 17, 2024
1 parent 0d005f5 commit 0980dd8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
create_pendulum_time,
to_timezone,
)
from dagster._utils.cronstring import get_fixed_minute_interval, is_basic_daily, is_basic_hourly
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -370,6 +371,29 @@ def get_current_timestamp(self, current_time: Optional[datetime] = None) -> floa
else pendulum.now(self.timezone)
).timestamp()

def _get_fast_num_partitions(self, current_time: Optional[datetime] = None) -> Optional[int]:
"""Computes the total number of partitions quickly for common partition windows. Returns
None if the count cannot be computed quickly and must enumerate all partitions before
counting them.
"""
last_partition_window = self.get_last_partition_window(current_time)
first_partition_window = self.get_first_partition_window(current_time)

if not last_partition_window or not first_partition_window:
return None

if self.is_basic_daily:
return (last_partition_window.start - first_partition_window.start).days + 1

fixed_minute_interval = get_fixed_minute_interval(self.cron_schedule)
if fixed_minute_interval:
minutes_in_window = (
last_partition_window.start.timestamp() - first_partition_window.start.timestamp()
) / 60
return int(minutes_in_window // fixed_minute_interval) + 1

return None

def get_num_partitions(
self,
current_time: Optional[datetime] = None,
Expand All @@ -378,6 +402,11 @@ def get_num_partitions(
# Method added for performance reasons.
# Fetching partition keys requires significantly more compute time to
# string format datetimes.

fast_num_partitions = self._get_fast_num_partitions(current_time)
if fast_num_partitions is not None:
return fast_num_partitions

current_timestamp = self.get_current_timestamp(current_time=current_time)

partitions_past_current_time = 0
Expand Down Expand Up @@ -1027,11 +1056,11 @@ def equal_except_for_start_or_end(self, other: "TimeWindowPartitionsDefinition")

@property
def is_basic_daily(self) -> bool:
return self.cron_schedule == "0 0 * * *"
return is_basic_daily(self.cron_schedule)

@property
def is_basic_hourly(self) -> bool:
return self.cron_schedule == "0 * * * *"
return is_basic_hourly(self.cron_schedule)


class DailyPartitionsDefinition(TimeWindowPartitionsDefinition):
Expand Down
42 changes: 42 additions & 0 deletions python_modules/dagster/dagster/_utils/cronstring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
def is_basic_daily(cron_schedule: str) -> bool:
return cron_schedule == "0 0 * * *"


def is_basic_hourly(cron_schedule: str) -> bool:
return cron_schedule == "0 * * * *"


def get_fixed_minute_interval(cron_schedule: str):
"""Given a cronstring, returns whether or not it is safe to
assume there is a fixed number of minutes between every tick. For
many cronstrings this is not the case due to Daylight Savings Time,
but for basic hourly cron schedules and cron schedules like */15 it
is safe to assume that there are a fixed number of minutes between each
tick.
"""
if is_basic_hourly(cron_schedule):
return 60

cron_parts = cron_schedule.split()
is_wildcard = [part == "*" for part in cron_parts]

# To match this criteria, every other field besides the first must end in *
# since it must be an every-n-minutes cronstring like */15
if not is_wildcard[1:]:
return None

if not cron_parts[0].startswith("*/"):
return None

try:
# interval makes up the characters after the "*/"
interval = int(cron_parts[0][2:])
except ValueError:
return None

# cronstrings like */7 do not have a fixed interval because they jump
# from :54 to :07, but divisors of 60 do
if interval > 0 and interval < 60 and 60 % interval == 0:
return interval

return None
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,18 @@ def my_partitioned_config_2(_start, _end):
== partitions_def.get_partition_keys(current_time=current_time)[50:53]
)

partitions_def = TimeWindowPartitionsDefinition(
cron_schedule="*/15 * * * *",
start="2020-11-01-00:30",
timezone="US/Pacific",
fmt="%Y-%m-%d-%H:%M",
)
current_time = datetime.strptime("2021-06-20", "%Y-%m-%d")

assert partitions_def.get_num_partitions(current_time) == len(
partitions_def.get_partition_keys(current_time)
)


def test_get_first_partition_window():
assert DailyPartitionsDefinition(
Expand Down

0 comments on commit 0980dd8

Please sign in to comment.