Skip to content

Commit

Permalink
Create MaterializedSchedulingCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Apr 29, 2024
1 parent 7ba9e41 commit ea93cfa
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ def compute_child_asset_slice(
),
)

def compute_materialized_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
return _slice_from_subset(
self, self._queryer.get_materialized_asset_subset(asset_key=asset_key)
)

def compute_intersection_with_partition_keys(
self, partition_keys: AbstractSet[str], asset_slice: AssetSlice
) -> "AssetSlice":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,13 @@ def all_deps_match(condition: "AssetCondition") -> "AssetCondition":

return AllDepsCondition(operand=condition)

@staticmethod
def materialized() -> "AssetCondition":
"""Returns an AssetCondition that is true for an asset partition when it has been materialized."""
from .status_condition import MaterializedSchedulingCondition

return MaterializedSchedulingCondition()


@experimental
@whitelist_for_serdes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import abstractmethod

from dagster._core.asset_graph_view.asset_graph_view import AssetSlice

from .asset_condition import AssetCondition, AssetConditionResult
from .scheduling_condition_evaluation_context import SchedulingConditionEvaluationContext


class StatusSchedulingCondition(AssetCondition):
@abstractmethod
def compute_slice_with_status(
self, context: SchedulingConditionEvaluationContext
) -> AssetSlice: ...

def evaluate(self, context: SchedulingConditionEvaluationContext) -> AssetConditionResult:
# don't compute anything if there are no candidates
if context.candidate_slice.is_empty:
true_slice = context.asset_graph_view.create_empty_slice(context.asset_key)
else:
true_slice = self.compute_slice_with_status(context)

return AssetConditionResult.create(context, true_slice.convert_to_valid_asset_subset())


class MaterializedSchedulingCondition(StatusSchedulingCondition):
@property
def description(self) -> str:
return "Materialized"

def compute_slice_with_status(
self, context: SchedulingConditionEvaluationContext
) -> AssetSlice:
return context.asset_graph_view.compute_materialized_asset_slice(context.asset_key)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from dagster._core.definitions.declarative_scheduling.asset_condition import (
AssetCondition,
)

from ..base_scenario import run_request
from ..scenario_specs import one_asset, two_partitions_def
from .asset_condition_scenario import AssetConditionScenarioState


def test_materialized_unpartitioned() -> None:
state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.materialized())

state, result = state.evaluate("A")
assert result.true_subset.size == 0

state = state.with_runs(run_request("A"))
_, result = state.evaluate("A")
assert result.true_subset.size == 1


def test_materialized_partitioned() -> None:
state = AssetConditionScenarioState(
one_asset, asset_condition=AssetCondition.materialized()
).with_asset_properties(partitions_def=two_partitions_def)

state, result = state.evaluate("A")
assert result.true_subset.size == 0

state = state.with_runs(run_request("A", "1"))
state, result = state.evaluate("A")
assert result.true_subset.size == 1

# same partition materialized again
state = state.with_runs(run_request("A", "1"))
state, result = state.evaluate("A")
assert result.true_subset.size == 1

state = state.with_runs(run_request("A", "2"))
_, result = state.evaluate("A")
assert result.true_subset.size == 2

0 comments on commit ea93cfa

Please sign in to comment.