Skip to content

Commit

Permalink
Create RecentlyRequested condition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent 0474105 commit 55b9187
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
MissingSchedulingCondition as MissingSchedulingCondition,
ParentNewerCondition as ParentNewerCondition,
RequestedThisTickCondition as RequestedThisTickCondition,
ScheduledSinceCondition as ScheduledSinceCondition,
UpdatedSinceCronCondition as UpdatedSinceCronCondition,
)
from .operators import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .parent_newer_condition import ParentNewerCondition as ParentNewerCondition
from .scheduled_since_condition import ScheduledSinceCondition as ScheduledSinceCondition
from .slice_conditions import (
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressSchedulingCondition as InProgressSchedulingCondition,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import datetime
from typing import Sequence, cast

from dagster._core.definitions.declarative_scheduling.scheduling_evaluation_info import (
AssetSliceWithMetadata,
)
from dagster._core.definitions.declarative_scheduling.utils import SerializableTimeDelta
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._serdes.serdes import whitelist_for_serdes

from ..scheduling_condition import SchedulingCondition, SchedulingResult
from ..scheduling_context import SchedulingContext

_REQUEST_TIMESTAMP_METADATA_KEY = "request_timestamp"


@whitelist_for_serdes
class ScheduledSinceCondition(SchedulingCondition):
"""SchedulingCondition which is true if the asset has been requested for materialization via
the declarative scheduling system within the given time window.
Will only detect requests which have been made since this condition was added to the asset.
"""

serializable_lookback_timedelta: SerializableTimeDelta

@property
def description(self) -> str:
return f"Has been requested within the last {self.lookback_timedelta}"

@property
def lookback_timedelta(self) -> datetime.timedelta:
return self.serializable_lookback_timedelta.to_timedelta()

@staticmethod
def from_lookback_delta(lookback_delta: datetime.timedelta) -> "ScheduledSinceCondition":
return ScheduledSinceCondition(
serializable_lookback_timedelta=SerializableTimeDelta.from_timedelta(lookback_delta)
)

def _get_minimum_timestamp(self, context: SchedulingContext) -> float:
"""The minimum timestamp for a request to be considered in the lookback window."""
return (context.effective_dt - self.lookback_timedelta).timestamp()

def _get_new_slices_with_metadata(
self, context: SchedulingContext
) -> Sequence[AssetSliceWithMetadata]:
"""Updates the stored information as to when the asset was last requested."""
# the first time this asset has been evaluated
if context.previous_evaluation_info is None:
return []

previous_slices_with_metadata = (
context.previous_evaluation_node.slices_with_metadata
if context.previous_evaluation_node
else []
)

# no new updates since previous tick
if context.previous_requested_slice is None:
return previous_slices_with_metadata

# for existing subsets, remove references to newly-requested partitions, as these subsets
# are meant to represent the most recent time that the asset was requested
slices_with_metadata = [
AssetSliceWithMetadata(
asset_slice.compute_difference(context.previous_requested_slice), metadata
)
for asset_slice, metadata in previous_slices_with_metadata
]

# for the newly-requested slice, add a new entry indicating that these partitions were
# requested on the previous tick
previous_request_timestamp = (
context.previous_evaluation_info.temporal_context.effective_dt.timestamp()
)
slices_with_metadata.append(
AssetSliceWithMetadata(
context.previous_requested_slice,
{_REQUEST_TIMESTAMP_METADATA_KEY: MetadataValue.float(previous_request_timestamp)},
)
)

# finally, evict any empty subsets from the list, and any subsets with an older timestamp
return [
asset_slice_with_metadata
for asset_slice_with_metadata in slices_with_metadata
if not (
asset_slice_with_metadata.asset_slice.is_empty
or cast(
float,
asset_slice_with_metadata.metadata.get(
_REQUEST_TIMESTAMP_METADATA_KEY, MetadataValue.float(0)
).value,
)
< self._get_minimum_timestamp(context)
)
]

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
slices_with_metadata = self._get_new_slices_with_metadata(context)

# we keep track of all slices that have been requested within the lookback window, so we can
# simply compute the union of all of these slices to determine the true slice
requested_within_lookback_slice = context.asset_graph_view.create_empty_slice(
context.asset_key
)
for asset_slice, _ in slices_with_metadata:
requested_within_lookback_slice = requested_within_lookback_slice.compute_union(
asset_slice
)

return SchedulingResult.create(
context=context,
true_slice=context.candidate_slice.compute_intersection(
requested_within_lookback_slice
),
slices_with_metadata=slices_with_metadata,
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime
from abc import abstractmethod
from typing import Optional, Tuple
from typing import Optional

from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._core.definitions.declarative_scheduling.utils import SerializableTimeDelta
from dagster._serdes.serdes import whitelist_for_serdes

from ..scheduling_condition import SchedulingCondition, SchedulingResult
Expand Down Expand Up @@ -74,46 +75,35 @@ def compute_slice(self, context: SchedulingContext) -> AssetSlice:

@whitelist_for_serdes
class InLatestTimeWindowCondition(SliceSchedulingCondition):
# This is a serializable representation of the lookback timedelta object
lookback_days_second_microseconds: Optional[Tuple[int, int, int]] = None
serializable_lookback_timedelta: Optional[SerializableTimeDelta] = None

@staticmethod
def from_lookback_delta(
lookback_delta: Optional[datetime.timedelta],
) -> "InLatestTimeWindowCondition":
lookback_days_second_microseconds = (
(
lookback_delta.days,
lookback_delta.seconds,
lookback_delta.microseconds,
)
return InLatestTimeWindowCondition(
serializable_lookback_timedelta=SerializableTimeDelta.from_timedelta(lookback_delta)
if lookback_delta
else None
)
return InLatestTimeWindowCondition(
lookback_days_second_microseconds=lookback_days_second_microseconds
)

@property
def timedelta(self) -> Optional[datetime.timedelta]:
if self.lookback_days_second_microseconds:
return datetime.timedelta(
days=self.lookback_days_second_microseconds[0],
seconds=self.lookback_days_second_microseconds[1],
microseconds=self.lookback_days_second_microseconds[2],
)
else:
return None
def lookback_timedelta(self) -> Optional[datetime.timedelta]:
return (
self.serializable_lookback_timedelta.to_timedelta()
if self.serializable_lookback_timedelta
else None
)

@property
def description(self) -> str:
return (
f"Within {self.timedelta} of the end of the latest time window"
if self.timedelta
f"Within {self.lookback_timedelta} of the end of the latest time window"
if self.lookback_timedelta
else "Within latest time window"
)

def compute_slice(self, context: SchedulingContext) -> AssetSlice:
return context.asset_graph_view.compute_latest_time_window_slice(
context.asset_key, lookback_delta=self.timedelta
context.asset_key, lookback_delta=self.lookback_timedelta
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
MissingSchedulingCondition,
ParentNewerCondition,
RequestedThisTickCondition,
ScheduledSinceCondition,
UpdatedSinceCronCondition,
)
from .operators import (
Expand Down Expand Up @@ -147,6 +148,22 @@ def in_latest_time_window(

return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

@staticmethod
def scheduled_since(lookback_delta: datetime.timedelta) -> "ScheduledSinceCondition":
"""Returns a SchedulingCondition that is true for an asset partition if it has been requested
for materialization via the declarative scheduling system within the given time window.
Will only detect requests which have been made since this condition was added to the asset.
Args:
lookback_delta (datetime.timedelta): How far back in time to look for requests, e.g.
if this is set to 1 hour, this rule will be true for a given asset partition for
1 hour starting from the latest tick on which it gets requested.
"""
from .operands import ScheduledSinceCondition

return ScheduledSinceCondition.from_lookback_delta(lookback_delta)

@staticmethod
def requested_this_tick() -> "RequestedThisTickCondition":
"""Returns a SchedulingCondition that is true for an asset partition if it will be requested this tick."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,15 @@ def legacy_context(self) -> LegacyRuleEvaluationContext:
def _queryer(self) -> "CachingInstanceQueryer":
return self.asset_graph_view._queryer # noqa

@property
def previous_requested_slice(self) -> Optional[AssetSlice]:
"""Returns the requested slice for the previous evaluation. If this asset has never been
evaluated, returns None.
"""
return (
self.previous_evaluation_info.requested_slice if self.previous_evaluation_info else None
)

@property
def previous_candidate_slice(self) -> Optional[AssetSlice]:
"""Returns the candidate slice for the previous evaluation. If this node has never been
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import datetime
from typing import NamedTuple

from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes
class SerializableTimeDelta(NamedTuple):
"""A Dagster-serializable version of a datetime.timedelta. The datetime.timedelta class
internally stores values as an integer number of days, seconds, and microseconds. This class
handles converting between the in-memory and serializable formats.
"""

days: int
seconds: int
microseconds: int

@staticmethod
def from_timedelta(timedelta: datetime.timedelta) -> "SerializableTimeDelta":
return SerializableTimeDelta(
days=timedelta.days, seconds=timedelta.seconds, microseconds=timedelta.microseconds
)

def to_timedelta(self) -> datetime.timedelta:
return datetime.timedelta(
days=self.days, seconds=self.seconds, microseconds=self.microseconds
)
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class AssetConditionScenarioState(ScenarioState):
asset_condition: Optional[SchedulingCondition] = None
previous_evaluation_state: Optional[AssetConditionEvaluationState] = None
requested_asset_partitions: Optional[Sequence[AssetKeyPartitionKey]] = None
ensure_empty_result: bool = True

def _get_current_evaluation_state_by_key(
self, asset_graph_view: AssetGraphView
Expand Down Expand Up @@ -84,8 +85,12 @@ def evaluate(
asset_key = AssetKey.from_coercible(asset)
# ensure that the top level condition never returns any asset partitions, as otherwise the
# next evaluation will assume that those asset partitions were requested by the machinery
asset_condition = AndAssetCondition(
operands=[check.not_none(self.asset_condition), FalseAssetCondition()]
asset_condition = (
AndAssetCondition(
operands=[check.not_none(self.asset_condition), FalseAssetCondition()]
)
if self.ensure_empty_result
else check.not_none(self.asset_condition)
)
asset_graph = self.scenario_spec.with_asset_properties(
asset,
Expand Down Expand Up @@ -141,8 +146,9 @@ def evaluate(
context, full_result
),
)
result = full_result.child_results[0] if self.ensure_empty_result else full_result

return new_state, full_result.child_results[0]
return new_state, result

def without_previous_evaluation_state(self) -> "AssetConditionScenarioState":
"""Removes the previous evaluation state from the state. This is useful for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import datetime

from dagster import SchedulingCondition

from ..scenario_specs import one_asset, two_partitions_def
from .asset_condition_scenario import AssetConditionScenarioState


def test_scheduled_since_unpartitioned() -> None:
state = AssetConditionScenarioState(
one_asset,
asset_condition=~SchedulingCondition.scheduled_since(
lookback_delta=datetime.timedelta(hours=1)
),
# this condition depends on having non-empty results
ensure_empty_result=False,
)

state, result = state.evaluate("A")
assert result.true_subset.size == 1

# the last tick would have requested the asset for materialization
state, result = state.evaluate("A")
assert result.true_subset.size == 0

state, result = state.evaluate("A")
assert result.true_subset.size == 0

# now it's been more than an hour since the last request
state = state.with_current_time_advanced(hours=1, seconds=1)
state, result = state.evaluate("A")
assert result.true_subset.size == 1

# edge case: one hour passes in between a request and the next evaluation
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("A")
assert result.true_subset.size == 0


def test_scheduled_since_partitioned() -> None:
state = AssetConditionScenarioState(
one_asset,
asset_condition=~SchedulingCondition.scheduled_since(
lookback_delta=datetime.timedelta(hours=1)
),
# this condition depends on having non-empty results
ensure_empty_result=False,
).with_asset_properties(partitions_def=two_partitions_def)

state, result = state.evaluate("A")
assert result.true_subset.size == 2

# the last tick would have requested both assets for materialization
state, result = state.evaluate("A")
assert result.true_subset.size == 0

state, result = state.evaluate("A")
assert result.true_subset.size == 0

# now it's been more than an hour since the last request
state = state.with_current_time_advanced(hours=1, seconds=1)
state, result = state.evaluate("A")
assert result.true_subset.size == 2

# edge case: one hour passes in between a request and the next evaluation
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("A")
assert result.true_subset.size == 0

0 comments on commit 55b9187

Please sign in to comment.