-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a38a753
commit 0fd4a9d
Showing
7 changed files
with
239 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 1 addition & 0 deletions
1
python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 142 additions & 0 deletions
142
...er/dagster/_core/definitions/declarative_scheduling/operands/scheduled_since_condition.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
import datetime | ||
from typing import Sequence, Tuple, cast | ||
|
||
from dagster._core.definitions.declarative_scheduling.serialized_objects import ( | ||
AssetSubsetWithMetadata, | ||
) | ||
from dagster._core.definitions.metadata.metadata_value import MetadataValue | ||
from dagster._serdes.serdes import whitelist_for_serdes | ||
|
||
from ..scheduling_condition import SchedulingCondition, SchedulingResult | ||
from ..scheduling_context import SchedulingContext | ||
|
||
_REQUEST_TIMESTAMP_METADATA_KEY = "request_timestamp" | ||
|
||
|
||
@whitelist_for_serdes | ||
class ScheduledSinceCondition(SchedulingCondition): | ||
"""SchedulingCondition which is true if the asset has been requested for materialization via | ||
the declarative scheduling system within the given time window. | ||
Will only detect requests which have been made since this condition was added to the asset. | ||
""" | ||
|
||
lookback_days_seconds_microseconds: Tuple[int, int, int] | ||
|
||
@property | ||
def description(self) -> str: | ||
return "" | ||
|
||
@property | ||
def lookback_timedelta(self) -> datetime.timedelta: | ||
return datetime.timedelta( | ||
days=self.lookback_days_seconds_microseconds[0], | ||
seconds=self.lookback_days_seconds_microseconds[1], | ||
microseconds=self.lookback_days_seconds_microseconds[2], | ||
) | ||
|
||
@staticmethod | ||
def from_lookback_delta(lookback_delta: datetime.timedelta) -> "ScheduledSinceCondition": | ||
lookback_days_seconds_microseconds = ( | ||
lookback_delta.days, | ||
lookback_delta.seconds, | ||
lookback_delta.microseconds, | ||
) | ||
return ScheduledSinceCondition( | ||
lookback_days_seconds_microseconds=lookback_days_seconds_microseconds | ||
) | ||
|
||
def _get_minimum_timestamp(self, context: SchedulingContext) -> float: | ||
"""The minimum timestamp for a request to be considered in the lookback window.""" | ||
return (context.effective_dt - self.lookback_timedelta).timestamp() | ||
|
||
def _get_new_subsets_with_metadata( | ||
self, context: SchedulingContext | ||
) -> Sequence[AssetSubsetWithMetadata]: | ||
"""Updates the stored information as to when the asset was last requested.""" | ||
# the first time this asset has been evaluated | ||
if context.previous_evaluation_info is None: | ||
return [] | ||
|
||
previous_subsets_with_metadata = ( | ||
context.previous_evaluation_node.subsets_with_metadata | ||
if context.previous_evaluation_node | ||
else [] | ||
) | ||
|
||
# filter out subsets that are no longer compatible with the current partitions_def | ||
filtered_subsets_with_metadata = [ | ||
swm | ||
for swm in previous_subsets_with_metadata | ||
if swm.subset.is_compatible_with_partitions_def(context.partitions_def) | ||
] | ||
|
||
# the slice that was requested on the previous evaluation | ||
newly_requested_slice = context.previous_requested_slice | ||
|
||
# no new updates since previous tick | ||
if newly_requested_slice is None: | ||
return filtered_subsets_with_metadata | ||
|
||
newly_requested_subset = newly_requested_slice.convert_to_valid_asset_subset() | ||
previous_temporal_context = context.previous_evaluation_info.temporal_context | ||
|
||
# for the newly-requested slice, add a new entry indicating that these partitions were | ||
# requested on the previous tick | ||
new_subsets_with_metadata = [ | ||
AssetSubsetWithMetadata( | ||
subset=newly_requested_subset, | ||
metadata={ | ||
_REQUEST_TIMESTAMP_METADATA_KEY: MetadataValue.float( | ||
previous_temporal_context.effective_dt.timestamp() | ||
) | ||
}, | ||
) | ||
] | ||
# for existing subsets, remove references to newly-requested partitions, as these subsets | ||
# are meant to represent the most recent time that the asset was requested | ||
for swm in filtered_subsets_with_metadata: | ||
new_subsets_with_metadata.append( | ||
AssetSubsetWithMetadata( | ||
subset=swm.subset.as_valid(context.partitions_def) - newly_requested_subset, | ||
metadata=swm.metadata, | ||
) | ||
) | ||
|
||
# finally, evict any empty subsets from the list, and any subsets with an older timestamp | ||
return [ | ||
swm | ||
for swm in new_subsets_with_metadata | ||
if not ( | ||
swm.subset.is_empty | ||
or cast( | ||
float, | ||
swm.metadata.get(_REQUEST_TIMESTAMP_METADATA_KEY, MetadataValue.float(0)).value, | ||
) | ||
< self._get_minimum_timestamp(context) | ||
) | ||
] | ||
|
||
def evaluate(self, context: SchedulingContext) -> SchedulingResult: | ||
new_subsets_with_metadata = self._get_new_subsets_with_metadata(context) | ||
|
||
# we keep track of all slices that have been requested within the lookback window, so we can | ||
# simply compute the union of all of these slices to determine the true slice | ||
requested_within_lookback_slice = context.asset_graph_view.create_empty_slice( | ||
context.asset_key | ||
) | ||
for swm in new_subsets_with_metadata: | ||
requested_slice = context.asset_graph_view.get_asset_slice_from_subset(swm.subset) | ||
if requested_slice is None: | ||
continue | ||
requested_within_lookback_slice = requested_within_lookback_slice.compute_union( | ||
requested_slice | ||
) | ||
|
||
return SchedulingResult.create( | ||
context=context, | ||
true_slice=context.candidate_slice.compute_intersection( | ||
requested_within_lookback_slice | ||
), | ||
subsets_with_metadata=new_subsets_with_metadata, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
...ions_tests/auto_materialize_tests/asset_condition_tests/test_scheduled_since_condition.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import datetime | ||
|
||
from dagster import SchedulingCondition | ||
|
||
from ..scenario_specs import one_asset, two_partitions_def | ||
from .asset_condition_scenario import AssetConditionScenarioState | ||
|
||
|
||
def test_scheduled_since_unpartitioned() -> None: | ||
state = AssetConditionScenarioState( | ||
one_asset, | ||
asset_condition=~SchedulingCondition.scheduled_since( | ||
lookback_delta=datetime.timedelta(hours=1) | ||
), | ||
# this condition depends on having non-empty results | ||
ensure_empty_result=False, | ||
) | ||
|
||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 1 | ||
|
||
# the last tick would have requested the asset for materialization | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 | ||
|
||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 | ||
|
||
# now it's been more than an hour since the last request | ||
state = state.with_current_time_advanced(hours=1, seconds=1) | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 1 | ||
|
||
# edge case: one hour passes in between a request and the next evaluation | ||
state = state.with_current_time_advanced(hours=1) | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 | ||
|
||
|
||
def test_scheduled_since_partitioned() -> None: | ||
state = AssetConditionScenarioState( | ||
one_asset, | ||
asset_condition=~SchedulingCondition.scheduled_since( | ||
lookback_delta=datetime.timedelta(hours=1) | ||
), | ||
# this condition depends on having non-empty results | ||
ensure_empty_result=False, | ||
).with_asset_properties(partitions_def=two_partitions_def) | ||
|
||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 2 | ||
|
||
# the last tick would have requested both assets for materialization | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 | ||
|
||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 | ||
|
||
# now it's been more than an hour since the last request | ||
state = state.with_current_time_advanced(hours=1, seconds=1) | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 2 | ||
|
||
# edge case: one hour passes in between a request and the next evaluation | ||
state = state.with_current_time_advanced(hours=1) | ||
state, result = state.evaluate("A") | ||
assert result.true_subset.size == 0 |