def _get_fast_num_partitions(self, current_time: Optional[datetime] = None) -> Optional[int]:
    """Count partitions arithmetically instead of enumerating them, when possible.

    Args:
        current_time: Forwarded unchanged to ``get_last_partition_window`` and
            ``get_first_partition_window``.

    Returns:
        The partition count, derived from the distance between the start of the
        first and last partition windows, or ``None`` when either window is
        missing or the cron schedule is neither the basic daily schedule nor one
        for which ``get_fixed_minute_interval`` reports a fixed tick spacing. A
        ``None`` result tells the caller to fall back to enumerating partitions.
    """
    newest_window = self.get_last_partition_window(current_time)
    oldest_window = self.get_first_partition_window(current_time)
    if not (newest_window and oldest_window):
        return None

    if self.is_basic_daily:
        # One partition per day: whole days between the two starts, plus one so
        # that both endpoints are counted.
        elapsed = newest_window.start - oldest_window.start
        return elapsed.days + 1

    step_minutes = get_fixed_minute_interval(self.cron_schedule)
    if not step_minutes:
        return None

    # Work on epoch timestamps and convert the span from seconds to minutes.
    span_minutes = (
        newest_window.start.timestamp() - oldest_window.start.timestamp()
    ) / 60
    # Number of whole steps between the two starts, plus one for the first window.
    return int(span_minutes // step_minutes) + 1
from typing import Optional


def is_basic_daily(cron_schedule: str) -> bool:
    """Return True when the cron string is exactly ``"0 0 * * *"``."""
    return cron_schedule == "0 0 * * *"


def is_basic_hourly(cron_schedule: str) -> bool:
    """Return True when the cron string is exactly ``"0 * * * *"``."""
    return cron_schedule == "0 * * * *"


def get_fixed_minute_interval(cron_schedule: str) -> Optional[int]:
    """Return the fixed number of minutes between ticks of a cron string, if any.

    For many cron strings the spacing between ticks is not constant (for
    example because of Daylight Savings Time), but for the basic hourly
    schedule and every-n-minutes schedules like ``*/15 * * * *`` it is safe to
    assume a fixed number of minutes between each tick.

    Args:
        cron_schedule: A whitespace-separated cron string.

    Returns:
        ``60`` for the basic hourly schedule, ``n`` for ``*/n * * * *`` when
        ``n`` is a positive divisor of 60 smaller than 60, and ``None`` for
        every other input (including strings that do not have five fields).
    """
    if is_basic_hourly(cron_schedule):
        return 60

    cron_parts = cron_schedule.split()
    # Only five-field cron strings are considered; this also protects the
    # indexing below from empty or truncated input.
    if len(cron_parts) != 5:
        return None

    minute_field, *other_fields = cron_parts

    # Every field besides the minute field must be a bare "*", otherwise the
    # schedule is restricted to certain hours/days and is not every-n-minutes.
    # NOTE: this must test each element; testing the list itself only checks
    # that it is non-empty.
    if not all(field == "*" for field in other_fields):
        return None

    if not minute_field.startswith("*/"):
        return None

    try:
        # The interval is whatever follows the "*/" prefix.
        interval = int(minute_field[2:])
    except ValueError:
        return None

    # Cron strings like */7 do not have a fixed interval because they jump
    # from :56 to :00, but divisors of 60 do.
    if 0 < interval < 60 and 60 % interval == 0:
        return interval

    return None