Skip to content

Commit

Permalink
Update values within InLatestTimeWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 10, 2024
1 parent 04f098c commit f34a068
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
from abc import abstractmethod
from typing import Optional
from typing import Optional, Tuple

from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._serdes.serdes import whitelist_for_serdes
Expand Down Expand Up @@ -50,11 +50,36 @@ def compute_slice(self, context: SchedulingContext) -> AssetSlice:

@whitelist_for_serdes
class InLatestTimeWindowCondition(SliceSchedulingCondition):
lookback_seconds: Optional[float] = None
# This is a serializable representation of the lookback timedelta object
lookback_days_second_microseconds: Optional[Tuple[int, int, int]] = None

@staticmethod
def from_lookback_delta(
lookback_delta: Optional[datetime.timedelta],
) -> "InLatestTimeWindowCondition":
lookback_days_second_microseconds = (
(
lookback_delta.days,
lookback_delta.seconds,
lookback_delta.microseconds,
)
if lookback_delta
else None
)
return InLatestTimeWindowCondition(
lookback_days_second_microseconds=lookback_days_second_microseconds
)

@property
def timedelta(self) -> Optional[datetime.timedelta]:
return datetime.timedelta(seconds=self.lookback_seconds) if self.lookback_seconds else None
if self.lookback_days_second_microseconds:
return datetime.timedelta(
days=self.lookback_days_second_microseconds[0],
seconds=self.lookback_days_second_microseconds[1],
microseconds=self.lookback_days_second_microseconds[2],
)
else:
return None

@property
def description(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ def in_latest_time_window(
"""
from .operands import InLatestTimeWindowCondition

return InLatestTimeWindowCondition(
lookback_seconds=lookback_delta.total_seconds() if lookback_delta else None
)
return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)


class SchedulingResult(DagsterModel):
Expand Down

0 comments on commit f34a068

Please sign in to comment.