Skip to content

Commit

Permalink
Remove some AssetCondition references
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent 6b6127f commit 516f396
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._serdes.serdes import whitelist_for_serdes

from ..legacy.asset_condition import AssetCondition
from ..scheduling_condition import SchedulingResult
from ..scheduling_condition import SchedulingCondition, SchedulingResult
from ..scheduling_context import SchedulingContext


class SliceSchedulingCondition(AssetCondition):
class SliceSchedulingCondition(SchedulingCondition):
"""Base class for simple conditions which compute a simple slice of the asset graph."""

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import datetime

from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.schedules import reverse_cron_string_iterator

from ..scheduling_condition import SchedulingCondition, SchedulingResult
from ..scheduling_context import SchedulingContext


@whitelist_for_serdes
class UpdatedSinceCronCondition(SchedulingCondition):
cron_schedule: str
cron_timezone: str
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import AssetCondition, AutoMaterializePolicy, Definitions, asset
from dagster import AutoMaterializePolicy, Definitions, SchedulingCondition, asset
from dagster._core.definitions.declarative_scheduling.serialized_objects import (
AssetConditionEvaluation,
)
Expand All @@ -17,7 +17,7 @@


def test_missing_unpartitioned() -> None:
state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing())
state = AssetConditionScenarioState(one_asset, asset_condition=SchedulingCondition.missing())

state, result = state.evaluate("A")
assert result.true_subset.size == 1
Expand Down Expand Up @@ -45,7 +45,7 @@ def test_missing_unpartitioned() -> None:

def test_missing_time_partitioned() -> None:
state = (
AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing())
AssetConditionScenarioState(one_asset, asset_condition=SchedulingCondition.missing())
.with_asset_properties(partitions_def=daily_partitions_def)
.with_current_time(time_partitions_start_datetime)
.with_current_time_advanced(days=6, minutes=1)
Expand All @@ -72,7 +72,7 @@ def test_missing_time_partitioned() -> None:

def test_serialize_definitions_with_asset_condition() -> None:
amp = AutoMaterializePolicy.from_asset_condition(
AssetCondition.parent_newer() & ~AssetCondition.updated_since_cron("0 * * * *")
SchedulingCondition.parent_newer() & ~SchedulingCondition.updated_since_cron("0 * * * *")
)

@asset(auto_materialize_policy=amp)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import dagster._check as check
import pytest
from dagster import SchedulingCondition
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import (
AssetCondition,
)
from dagster._core.definitions.declarative_scheduling.scheduling_condition import (
SchedulingResult,
)
from dagster._core.definitions.declarative_scheduling.scheduling_condition import SchedulingResult
from dagster._core.definitions.declarative_scheduling.scheduling_context import (
SchedulingContext,
)
Expand All @@ -20,7 +16,7 @@
def get_hardcoded_condition():
true_set = set()

class HardcodedCondition(AssetCondition):
class HardcodedCondition(SchedulingCondition):
@property
def description(self) -> str:
return "..."
Expand Down Expand Up @@ -52,9 +48,9 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
def test_dep_missing_unpartitioned(is_any: bool) -> None:
inner_condition, true_set = get_hardcoded_condition()
condition = (
AssetCondition.any_deps_match(inner_condition)
SchedulingCondition.any_deps_match(inner_condition)
if is_any
else AssetCondition.all_deps_match(inner_condition)
else SchedulingCondition.all_deps_match(inner_condition)
)
state = AssetConditionScenarioState(one_asset_depends_on_two, asset_condition=condition)

Expand All @@ -80,9 +76,9 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None:
def test_dep_missing_partitioned(is_any: bool) -> None:
inner_condition, true_set = get_hardcoded_condition()
condition = (
AssetCondition.any_deps_match(inner_condition)
SchedulingCondition.any_deps_match(inner_condition)
if is_any
else AssetCondition.all_deps_match(inner_condition)
else SchedulingCondition.all_deps_match(inner_condition)
)
state = AssetConditionScenarioState(
one_asset_depends_on_two, asset_condition=condition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from dagster import SchedulingCondition
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import AssetCondition
from dagster._core.definitions.events import AssetKeyPartitionKey

from ..scenario_specs import one_asset, two_partitions_def
from .asset_condition_scenario import AssetConditionScenarioState


def test_in_progress_unpartitioned() -> None:
state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.in_progress())
state = AssetConditionScenarioState(
one_asset, asset_condition=SchedulingCondition.in_progress()
)

# no run in progress
state, result = state.evaluate("A")
Expand All @@ -26,7 +28,7 @@ def test_in_progress_unpartitioned() -> None:

def test_in_progress_static_partitioned() -> None:
state = AssetConditionScenarioState(
one_asset, asset_condition=AssetCondition.in_progress()
one_asset, asset_condition=SchedulingCondition.in_progress()
).with_asset_properties(partitions_def=two_partitions_def)

# no run in progress
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import datetime

from dagster import SchedulingCondition
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import (
AssetCondition,
)
from dagster._core.definitions.events import AssetKeyPartitionKey

from ..scenario_specs import (
Expand All @@ -17,7 +15,7 @@

def test_in_latest_time_window_unpartitioned() -> None:
state = AssetConditionScenarioState(
one_asset, asset_condition=AssetCondition.in_latest_time_window()
one_asset, asset_condition=SchedulingCondition.in_latest_time_window()
)

state, result = state.evaluate("A")
Expand All @@ -27,7 +25,7 @@ def test_in_latest_time_window_unpartitioned() -> None:
def test_in_latest_time_window_unpartitioned_lookback() -> None:
state = AssetConditionScenarioState(
one_asset,
asset_condition=AssetCondition.in_latest_time_window(
asset_condition=SchedulingCondition.in_latest_time_window(
lookback_delta=datetime.timedelta(days=3)
),
)
Expand All @@ -38,7 +36,7 @@ def test_in_latest_time_window_unpartitioned_lookback() -> None:

def test_in_latest_time_window_static_partitioned() -> None:
state = AssetConditionScenarioState(
one_asset, asset_condition=AssetCondition.in_latest_time_window()
one_asset, asset_condition=SchedulingCondition.in_latest_time_window()
).with_asset_properties(partitions_def=two_partitions_def)

state, result = state.evaluate("A")
Expand All @@ -48,7 +46,7 @@ def test_in_latest_time_window_static_partitioned() -> None:
def test_in_latest_time_window_static_partitioned_lookback() -> None:
state = AssetConditionScenarioState(
one_asset,
asset_condition=AssetCondition.in_latest_time_window(
asset_condition=SchedulingCondition.in_latest_time_window(
lookback_delta=datetime.timedelta(days=3)
),
).with_asset_properties(partitions_def=two_partitions_def)
Expand All @@ -59,7 +57,7 @@ def test_in_latest_time_window_static_partitioned_lookback() -> None:

def test_in_latest_time_window_time_partitioned() -> None:
state = AssetConditionScenarioState(
one_asset, asset_condition=AssetCondition.in_latest_time_window()
one_asset, asset_condition=SchedulingCondition.in_latest_time_window()
).with_asset_properties(partitions_def=daily_partitions_def)

# no partitions exist yet
Expand All @@ -85,7 +83,7 @@ def test_in_latest_time_window_time_partitioned() -> None:
def test_in_latest_time_window_time_partitioned_lookback() -> None:
state = AssetConditionScenarioState(
one_asset,
asset_condition=AssetCondition.in_latest_time_window(
asset_condition=SchedulingCondition.in_latest_time_window(
lookback_delta=datetime.timedelta(days=3)
),
).with_asset_properties(partitions_def=daily_partitions_def)
Expand Down

0 comments on commit 516f396

Please sign in to comment.