In [1]:
import datetime as dt
import pendulum
from pendulum import instance as pdi
try:
    from dagster._core.definitions.partitions.utils import TimeWindow
except ImportError:
    from dagster._core.definitions.time_window_partitions import TimeWindow

In [2]:
daily_partitions_consecutive = [
    TimeWindow(
        start=dt.datetime(2022, 1, 1, 0),
        end=dt.datetime(2022, 1, 2, 0),
    ),
    TimeWindow(
        start=dt.datetime(2022, 1, 2, 0),
        end=dt.datetime(2022, 1, 3, 0),
    ),
    TimeWindow(
        start=dt.datetime(2022, 1, 3, 0),
        end=dt.datetime(2022, 1, 4, 0),
    ),
]

daily_partitions_non_consecutive = [
    TimeWindow(
        start=dt.datetime(2022, 1, 1, 0),
        end=dt.datetime(2022, 1, 2, 0),
    ),
    TimeWindow(
        start=dt.datetime(2022, 1, 2, 0),
        end=dt.datetime(2022, 1, 3, 0),
    ),
    TimeWindow(
        start=dt.datetime(2022, 1, 4, 0),
        end=dt.datetime(2022, 1, 5, 0),
    ),
]

monthly_partition_consecutive = [
    TimeWindow(
        start=dt.datetime(2022,1,1,0),
        end=dt.datetime(2022,2,1,0),
    ),
]

monthly_partition_non_consecutive = [
    TimeWindow(
        start=dt.datetime(2022,1,1,0),
        end=dt.datetime(2022,2,1,0),
    ),
]

In [None]:
class MultiTimePartitionsChecker:
    def __init__(self, partitions: list[TimeWindow]):
        """Helper class that defines checks on a list of TimeWindow objects
        most importantly, partitions should be consecutive.

        Args:
            partitions (list[TimeWindow]): List of TimeWindow objects
        """
        self._partitions = partitions

        start_date = min([w.start for w in self._partitions])
        end_date = max([w.end for w in self._partitions])

        if not isinstance(start_date, dt.datetime):
            raise ValueError("Start date is not a datetime")
        if not isinstance(end_date, dt.datetime):
            raise ValueError("End date is not a datetime")

        self.start = start_date
        self.end = end_date

    @property
    def hourly_delta(self) -> int:
        deltas = [date_diff(w.start, w.end).in_hours() for w in self._partitions]
        if len(set(deltas)) != 1:
            raise ValueError(
                "TimeWindowPartitionsDefinition must have the same delta from start to end",
            )
        return int(deltas[0])

    def is_consecutive(self) -> bool:
        """Checks whether the provided start dates of each partition timewindow is consecutive"""
        return (
            len(
                {
                    pdi(self.start).add(hours=self.hourly_delta * i)
                    for i in range(date_diff(self.start, self.end).in_days() + 1)
                }
                - {pdi(d.start) for d in self._partitions},
            )
            == 1
        )


def date_diff(start: dt.datetime, end: dt.datetime) -> pendulum.Interval:
    """Compute an interval between two dates"""
    start_ = pendulum.instance(start)
    end_ = pendulum.instance(end)
    return end_ - start_


In [18]:
daily_checker = MultiTimePartitionsChecker(
    partitions=daily_partitions_non_consecutive
)

monthly_checker = MultiTimePartitionsChecker(
    partitions=monthly_partition_non_consecutive
)

In [21]:
# daily_checker.is_consecutive()
monthly_checker.is_consecutive()


False

In [22]:
monthly_checker._partitions

{pdi(monthly_checker.start).add(hours=monthly_checker.hourly_delta*i) for i in range(len(monthly_checker._partitions) +1)}

{DateTime(2022, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')),
 DateTime(2022, 2, 1, 0, 0, 0, tzinfo=Timezone('UTC'))}

In [23]:
(dt.timedelta(days=date_diff(monthly_checker.start, monthly_checker.end).in_days()) - dt.timedelta(hours=monthly_checker.hourly_delta)).days + 1

1

In [25]:
{pdi(monthly_checker.start).add(hours=monthly_checker.hourly_delta*i) for i in range((dt.timedelta(days=date_diff(monthly_checker.start, monthly_checker.end).in_days()) - dt.timedelta(hours=monthly_checker.hourly_delta)).days + 1)}

{DateTime(2022, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))}