Skip to content

Commit

Permalink
Create RecentlyRequested condition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 8, 2024
1 parent bde7a62 commit 0d35118
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
InProgressSchedulingCondition as InProgressSchedulingCondition,
MissingSchedulingCondition as MissingSchedulingCondition,
ParentNewerCondition as ParentNewerCondition,
ScheduledSinceCondition as ScheduledSinceCondition,
UpdatedSinceCronCondition as UpdatedSinceCronCondition,
WillBeRequestedCondition as WillBeRequestedCondition,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .parent_newer_condition import ParentNewerCondition as ParentNewerCondition
from .scheduled_since_condition import ScheduledSinceCondition as ScheduledSinceCondition
from .slice_conditions import (
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressSchedulingCondition as InProgressSchedulingCondition,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import datetime
from typing import Sequence, Tuple, cast

from dagster._core.definitions.declarative_scheduling.serialized_objects import (
AssetSubsetWithMetadata,
)
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._serdes.serdes import whitelist_for_serdes

from ..scheduling_condition import SchedulingCondition, SchedulingResult
from ..scheduling_context import SchedulingContext

_REQUEST_TIMESTAMP_METADATA_KEY = "request_timestamp"


@whitelist_for_serdes
class ScheduledSinceCondition(SchedulingCondition):
    """SchedulingCondition which is true if the asset has been requested for materialization via
    the declarative scheduling system within the given time window.

    Will only detect requests which have been made since this condition was added to the asset.
    """

    # Serializable (days, seconds, microseconds) representation of the lookback timedelta.
    lookback_days_seconds_microseconds: Tuple[int, int, int]

    @property
    def description(self) -> str:
        # NOTE(review): no human-readable description is provided for this condition yet.
        return ""

    @property
    def lookback_timedelta(self) -> datetime.timedelta:
        """The lookback window, rebuilt from its serialized (days, seconds, microseconds) parts."""
        return datetime.timedelta(
            days=self.lookback_days_seconds_microseconds[0],
            seconds=self.lookback_days_seconds_microseconds[1],
            microseconds=self.lookback_days_seconds_microseconds[2],
        )

    @staticmethod
    def from_lookback_delta(lookback_delta: datetime.timedelta) -> "ScheduledSinceCondition":
        """Creates the condition from a timedelta, storing it in its serializable tuple form."""
        lookback_days_seconds_microseconds = (
            lookback_delta.days,
            lookback_delta.seconds,
            lookback_delta.microseconds,
        )
        return ScheduledSinceCondition(
            lookback_days_seconds_microseconds=lookback_days_seconds_microseconds
        )

    def _get_minimum_timestamp(self, context: SchedulingContext) -> float:
        """The minimum timestamp for a request to be considered in the lookback window."""
        return (context.effective_dt - self.lookback_timedelta).timestamp()

    @staticmethod
    def _get_request_timestamp(swm: AssetSubsetWithMetadata) -> float:
        """The request timestamp stored on the subset's metadata, or 0 if none was recorded."""
        return cast(
            float,
            swm.metadata.get(_REQUEST_TIMESTAMP_METADATA_KEY, MetadataValue.float(0)).value,
        )

    def _get_new_subsets_with_metadata(
        self, context: SchedulingContext
    ) -> Sequence[AssetSubsetWithMetadata]:
        """Updates the stored information as to when the asset was last requested.

        Returns one entry per distinct request timestamp that still falls inside the lookback
        window. Each entry holds the subset whose most recent request happened at that time.
        """
        # the first time this asset has been evaluated
        if context.previous_evaluation_info is None:
            return []

        previous_subsets_with_metadata = (
            context.previous_evaluation_node.subsets_with_metadata
            if context.previous_evaluation_node
            else []
        )

        # filter out subsets that are no longer compatible with the current partitions_def
        filtered_subsets_with_metadata = [
            swm
            for swm in previous_subsets_with_metadata
            if swm.subset.is_compatible_with_partitions_def(context.partitions_def)
        ]

        # the cutoff is the same for every entry, so compute it once
        minimum_timestamp = self._get_minimum_timestamp(context)

        # the slice that was requested on the previous evaluation
        newly_requested_slice = context.previous_requested_slice

        # no new updates since previous tick: there is nothing to add, but entries which have
        # aged out of the lookback window must still be evicted, otherwise an expired request
        # would keep this condition true indefinitely
        if newly_requested_slice is None:
            return [
                swm
                for swm in filtered_subsets_with_metadata
                if not (self._get_request_timestamp(swm) < minimum_timestamp)
            ]

        newly_requested_subset = newly_requested_slice.convert_to_valid_asset_subset()
        previous_temporal_context = context.previous_evaluation_info.temporal_context

        # for the newly-requested slice, add a new entry indicating that these partitions were
        # requested on the previous tick
        new_subsets_with_metadata = [
            AssetSubsetWithMetadata(
                subset=newly_requested_subset,
                metadata={
                    _REQUEST_TIMESTAMP_METADATA_KEY: MetadataValue.float(
                        previous_temporal_context.effective_dt.timestamp()
                    )
                },
            )
        ]
        # for existing subsets, remove references to newly-requested partitions, as these subsets
        # are meant to represent the most recent time that the asset was requested
        for swm in filtered_subsets_with_metadata:
            new_subsets_with_metadata.append(
                AssetSubsetWithMetadata(
                    subset=swm.subset.as_valid(context.partitions_def) - newly_requested_subset,
                    metadata=swm.metadata,
                )
            )

        # finally, evict any empty subsets from the list, and any subsets with an older timestamp
        return [
            swm
            for swm in new_subsets_with_metadata
            if not (swm.subset.is_empty or self._get_request_timestamp(swm) < minimum_timestamp)
        ]

    def evaluate(self, context: SchedulingContext) -> SchedulingResult:
        """Evaluates the condition: the true slice is the part of the candidate slice that was
        requested within the lookback window. The refreshed request history is stored on the
        result so that it is available on the next evaluation.
        """
        new_subsets_with_metadata = self._get_new_subsets_with_metadata(context)

        # we keep track of all slices that have been requested within the lookback window, so we can
        # simply compute the union of all of these slices to determine the true slice
        requested_within_lookback_slice = context.asset_graph_view.create_empty_slice(
            context.asset_key
        )
        for swm in new_subsets_with_metadata:
            requested_slice = context.asset_graph_view.get_asset_slice_from_subset(swm.subset)
            # subsets which cannot be converted into a slice contribute nothing
            if requested_slice is None:
                continue
            requested_within_lookback_slice = requested_within_lookback_slice.compute_union(
                requested_slice
            )

        return SchedulingResult.create(
            context=context,
            true_slice=context.candidate_slice.compute_intersection(
                requested_within_lookback_slice
            ),
            subsets_with_metadata=new_subsets_with_metadata,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def compute_slice(self, context: SchedulingContext) -> AssetSlice:
@whitelist_for_serdes
class InLatestTimeWindowCondition(SliceSchedulingCondition):
# This is a serializable representation of the lookback timedelta object
lookback_days_second_microseconds: Optional[Tuple[int, int, int]] = None
lookback_days_seconds_microseconds: Optional[Tuple[int, int, int]] = None

@staticmethod
def from_lookback_delta(
Expand All @@ -91,16 +91,16 @@ def from_lookback_delta(
else None
)
return InLatestTimeWindowCondition(
lookback_days_second_microseconds=lookback_days_second_microseconds
lookback_days_seconds_microseconds=lookback_days_second_microseconds
)

@property
def timedelta(self) -> Optional[datetime.timedelta]:
if self.lookback_days_second_microseconds:
if self.lookback_days_seconds_microseconds:
return datetime.timedelta(
days=self.lookback_days_second_microseconds[0],
seconds=self.lookback_days_second_microseconds[1],
microseconds=self.lookback_days_second_microseconds[2],
days=self.lookback_days_seconds_microseconds[0],
seconds=self.lookback_days_seconds_microseconds[1],
microseconds=self.lookback_days_seconds_microseconds[2],
)
else:
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
InProgressSchedulingCondition,
MissingSchedulingCondition,
ParentNewerCondition,
ScheduledSinceCondition,
UpdatedSinceCronCondition,
WillBeRequestedCondition,
)
Expand Down Expand Up @@ -163,6 +164,17 @@ def in_latest_time_window(

return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

@staticmethod
def scheduled_since(lookback_delta: datetime.timedelta) -> "ScheduledSinceCondition":
    """Builds a SchedulingCondition which holds for an asset partition when the declarative
    scheduling system has requested it for materialization within the given time window.

    Requests made before this condition was added to the asset are not detected.
    """
    # imported at call time rather than at module load
    from .operands import ScheduledSinceCondition

    condition = ScheduledSinceCondition.from_lookback_delta(lookback_delta)
    return condition

@staticmethod
def requested_this_tick() -> "WillBeRequestedCondition":
"""Returns a SchedulingCondition that is true for an asset partition if it will be requested this tick."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ def legacy_context(self) -> LegacyRuleEvaluationContext:
def _queryer(self) -> "CachingInstanceQueryer":
return self.asset_graph_view._queryer # noqa

@property
def previous_requested_slice(self) -> Optional[AssetSlice]:
    """The slice that was requested on the previous evaluation, or None when there is no
    previous evaluation info for this asset.
    """
    previous_info = self.previous_evaluation_info
    if not previous_info:
        return None
    return previous_info.requested_slice

@property
def previous_candidate_slice(self) -> Optional[AssetSlice]:
"""Returns the candidate slice for the previous evaluation. If this node has never been
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import datetime

from dagster import SchedulingCondition

from ..scenario_specs import one_asset, two_partitions_def
from .asset_condition_scenario import AssetConditionScenarioState


def test_scheduled_since_unpartitioned() -> None:
    """Exercises the negated scheduled_since condition (one hour lookback) on an unpartitioned
    asset across several ticks.
    """
    not_scheduled_recently = ~SchedulingCondition.scheduled_since(
        lookback_delta=datetime.timedelta(hours=1)
    )
    scenario = AssetConditionScenarioState(
        one_asset,
        asset_condition=not_scheduled_recently,
        # this condition depends on having non-empty results
        ensure_empty_result=False,
    )

    # first tick: nothing has been requested yet
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 1

    # the first tick would have requested the asset, so the next two ticks see a recent request
    for _ in range(2):
        scenario, outcome = scenario.evaluate("A")
        assert outcome.true_subset.size == 0

    # just over an hour later, the request has fallen out of the lookback window
    scenario = scenario.with_current_time_advanced(hours=1, seconds=1)
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 1

    # edge case: exactly one hour elapses between a request and the next evaluation
    scenario = scenario.with_current_time_advanced(hours=1)
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 0


def test_scheduled_since_partitioned() -> None:
    """Exercises the negated scheduled_since condition (one hour lookback) on an asset with two
    partitions across several ticks.
    """
    not_scheduled_recently = ~SchedulingCondition.scheduled_since(
        lookback_delta=datetime.timedelta(hours=1)
    )
    unpartitioned_scenario = AssetConditionScenarioState(
        one_asset,
        asset_condition=not_scheduled_recently,
        # this condition depends on having non-empty results
        ensure_empty_result=False,
    )
    scenario = unpartitioned_scenario.with_asset_properties(partitions_def=two_partitions_def)

    # first tick: neither partition has been requested yet
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 2

    # the first tick would have requested both partitions, so the next two ticks see recent
    # requests for each of them
    for _ in range(2):
        scenario, outcome = scenario.evaluate("A")
        assert outcome.true_subset.size == 0

    # just over an hour later, the requests have fallen out of the lookback window
    scenario = scenario.with_current_time_advanced(hours=1, seconds=1)
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 2

    # edge case: exactly one hour elapses between a request and the next evaluation
    scenario = scenario.with_current_time_advanced(hours=1)
    scenario, outcome = scenario.evaluate("A")
    assert outcome.true_subset.size == 0

0 comments on commit 0d35118

Please sign in to comment.