Skip to content

Commit

Permalink
[freshness-policy] [1/n] FreshnessPolicy object (#10024)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 27, 2022
1 parent 174dd99 commit 3c89b70
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 0 deletions.
20 changes: 20 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import MetadataUserInput
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME, validate_group_name
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
group_names_by_key: Optional[Mapping[AssetKey, str]] = None,
metadata_by_key: Optional[Mapping[AssetKey, MetadataUserInput]] = None,
freshness_policies_by_key: Optional[Mapping[AssetKey, FreshnessPolicy]] = None,
# if adding new fields, make sure to handle them in the with_prefix_or_group
# and from_graph methods
):
Expand Down Expand Up @@ -155,6 +157,12 @@ def __init__(
node_def.resolve_output_to_origin(output_name, None)[0].metadata,
self._metadata_by_key.get(asset_key, {}),
)
self._freshness_policies_by_key = check.opt_dict_param(
freshness_policies_by_key,
"freshness_policies_by_key",
key_type=AssetKey,
value_type=FreshnessPolicy,
)

def __call__(self, *args, **kwargs):
from dagster._core.definitions.decorators.solid_decorator import DecoratedSolidFunction
Expand Down Expand Up @@ -506,6 +514,10 @@ def keys_by_input_name(self) -> Mapping[str, AssetKey]:
name: key for name, key in self.node_keys_by_input_name.items() if key in upstream_keys
}

# Read-only accessor for the per-asset freshness policies held by this definition.
# The returned mapping is the value stored in __init__, where the constructor argument of the
# same name is validated with check.opt_dict_param (AssetKey keys, FreshnessPolicy values).
@property
def freshness_policies_by_key(self) -> Mapping[AssetKey, FreshnessPolicy]:
return self._freshness_policies_by_key

@public # type: ignore
@property
def partitions_def(self) -> Optional[PartitionsDefinition]:
Expand Down Expand Up @@ -582,6 +594,11 @@ def with_prefix_or_group(
for key, group_name in self.group_names_by_key.items()
}

replaced_freshness_policies_by_key = {
output_asset_key_replacements.get(key, key): policy
for key, policy in self._freshness_policies_by_key.items()
}

return self.__class__(
keys_by_input_name={
input_name: input_asset_key_replacements.get(key, key)
Expand Down Expand Up @@ -617,6 +634,7 @@ def with_prefix_or_group(
**replaced_group_names_by_key,
**group_names_by_key,
},
freshness_policies_by_key=replaced_freshness_policies_by_key,
)

def _subset_graph_backed_asset(
Expand Down Expand Up @@ -723,6 +741,7 @@ def subset_for(
selected_asset_keys=selected_asset_keys & self.keys,
resource_defs=self.resource_defs,
group_names_by_key=self.group_names_by_key,
freshness_policies_by_key=self.freshness_policies_by_key,
)
else:
# multi_asset subsetting
Expand Down Expand Up @@ -830,6 +849,7 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As
can_subset=self._can_subset,
resource_defs=relevant_resource_defs,
group_names_by_key=self.group_names_by_key,
freshness_policies_by_key=self.freshness_policies_by_key,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dagster._builtins import Nothing
from dagster._config import UserConfigSchema
from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._core.types.dagster_type import DagsterType
Expand Down Expand Up @@ -66,6 +67,7 @@ def asset(
op_tags: Optional[Dict[str, Any]] = ...,
group_name: Optional[str] = ...,
output_required: bool = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -90,6 +92,7 @@ def asset(
op_tags: Optional[Dict[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -144,6 +147,8 @@ def asset(
output_required (bool): Whether the decorated function will always materialize an asset.
Defaults to True. If False, the function can return None, which will not be materialized to
storage and will halt execution of downstream assets.
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated
with respect to its root data.
Examples:
Expand Down Expand Up @@ -184,6 +189,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
op_tags=op_tags,
group_name=group_name,
output_required=output_required,
freshness_policy=freshness_policy,
)(fn)

return inner
Expand All @@ -208,6 +214,7 @@ def __init__(
op_tags: Optional[Dict[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
):
self.name = name

Expand All @@ -230,6 +237,7 @@ def __init__(
self.resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
self.group_name = group_name
self.output_required = output_required
self.freshness_policy = freshness_policy

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand Down Expand Up @@ -293,6 +301,9 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
partition_mappings=partition_mappings if partition_mappings else None,
resource_defs=self.resource_defs,
group_names_by_key={out_asset_key: self.group_name} if self.group_name else None,
freshness_policies_by_key={out_asset_key: self.freshness_policy}
if self.freshness_policy
else None,
)


Expand Down
137 changes: 137 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/freshness_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Mapping, Optional

import pendulum
from croniter import croniter

from dagster._annotations import experimental

from .events import AssetKey


@experimental
class FreshnessPolicy(ABC):
    """Defines how up-to-date a given asset is expected to be.

    The current time of the data within an asset is calculated by traversing the history of
    materializations of its upstream assets which occurred before the asset's most recent
    materialization. This gives a lower bound on the most recent records that could possibly
    be incorporated into the current state of the asset to which this policy is attached.
    """

    @abstractmethod
    def minutes_late(
        self,
        evaluation_time: datetime,
        upstream_materialization_times: Mapping[AssetKey, Optional[datetime]],
    ) -> Optional[float]:
        """Compute how late the asset is, in minutes, as of ``evaluation_time``.

        Args:
            evaluation_time: The point in time at which the policy is being evaluated.
            upstream_materialization_times: Materialization time for each upstream asset key;
                a value of None means that no time is available for that key.

        Returns:
            The number of minutes the asset is late. The concrete policies in this module
            return 0.0 when the constraint is satisfied and None when any upstream time is
            missing.
        """
        raise NotImplementedError

    @staticmethod
    def minimum_freshness(minimum_freshness_minutes: float) -> "MinimumFreshnessPolicy":
        """Build a policy measured against the current time.

        The resulting policy specifies that the upstream data used for the most recent asset
        materialization must have been materialized no more than `minimum_freshness_minutes`
        ago, relative to the current time.
        """
        policy = MinimumFreshnessPolicy(minimum_freshness_minutes=minimum_freshness_minutes)
        return policy

    @staticmethod
    def cron_minimum_freshness(
        minimum_freshness_minutes: float, cron_schedule: str
    ) -> "CronMinimumFreshnessPolicy":
        """Build a policy measured against the ticks of a cron schedule.

        The resulting policy specifies that the upstream data used for the most recent asset
        materialization must have been materialized no more than `minimum_freshness_minutes`
        ago, relative to the most recent cron schedule tick.
        """
        policy = CronMinimumFreshnessPolicy(
            minimum_freshness_minutes=minimum_freshness_minutes,
            cron_schedule=cron_schedule,
        )
        return policy


@experimental
class MinimumFreshnessPolicy(FreshnessPolicy):
    """Freshness policy evaluated relative to the current time.

    The upstream data that was used for the most recent asset materialization must have been
    materialized no more than `minimum_freshness_minutes` ago, relative to the current time.
    """

    def __init__(self, minimum_freshness_minutes: float):
        self._minimum_freshness_minutes = minimum_freshness_minutes

    @property
    def minimum_freshness_minutes(self) -> float:
        """The maximum allowed age of the upstream data, in minutes."""
        return self._minimum_freshness_minutes

    def minutes_late(
        self,
        evaluation_time: datetime,
        upstream_materialization_times: Mapping[AssetKey, Optional[datetime]],
    ) -> Optional[float]:
        """Return how far the oldest upstream data lags behind the allowed window.

        Args:
            evaluation_time: The point in time at which the policy is being evaluated.
            upstream_materialization_times: Materialization time for each upstream asset key,
                or None when no time is available for that key.

        Returns:
            None if any upstream time is None; otherwise the largest number of minutes by
            which an upstream time precedes `evaluation_time - minimum_freshness_minutes`,
            or 0.0 if none does.
        """
        allowed_age = pendulum.duration(minutes=self.minimum_freshness_minutes)
        oldest_acceptable_time = evaluation_time - allowed_age

        worst_lateness = 0.0
        for materialized_at in upstream_materialization_times.values():
            # a single missing upstream time makes the lateness unknowable
            if materialized_at is None:
                return None

            if materialized_at < oldest_acceptable_time:
                overdue_minutes = (oldest_acceptable_time - materialized_at).total_seconds() / 60
                # keep only the most overdue upstream
                if overdue_minutes > worst_lateness:
                    worst_lateness = overdue_minutes

        return worst_lateness


@experimental
class CronMinimumFreshnessPolicy(FreshnessPolicy):
    """Freshness policy evaluated relative to the ticks of a cron schedule.

    The upstream data that was used for the most recent asset materialization must have been
    materialized no more than `minimum_freshness_minutes` ago, relative to the most recent
    cron schedule tick.
    """

    def __init__(self, minimum_freshness_minutes: float, cron_schedule: str):
        self._minimum_freshness_minutes = minimum_freshness_minutes
        self._cron_schedule = cron_schedule

    @property
    def minimum_freshness_minutes(self) -> float:
        """The number of minutes after a schedule tick by which data is expected."""
        return self._minimum_freshness_minutes

    @property
    def cron_schedule(self) -> str:
        """The cron expression whose ticks this policy is evaluated against."""
        return self._cron_schedule

    def minutes_late(
        self,
        evaluation_time: datetime,
        upstream_materialization_times: Mapping[AssetKey, Optional[datetime]],
    ) -> Optional[float]:
        """Return how long ago the deadline for the latest required tick passed, if missed.

        Args:
            evaluation_time: The point in time at which the policy is being evaluated.
            upstream_materialization_times: Materialization time for each upstream asset key,
                or None when no time is available for that key.

        Returns:
            None if any upstream time is None. Otherwise, if any upstream time precedes the
            latest required tick, the minutes elapsed between that tick's deadline
            (tick + `minimum_freshness_minutes`) and `evaluation_time`; else 0.0.
        """
        allowed_lag = pendulum.duration(minutes=self.minimum_freshness_minutes)

        # walk backwards through the schedule until reaching a tick whose deadline
        # (tick + allowed_lag) has already passed, i.e. the most recent tick which could be
        # failing this constraint
        previous_ticks = croniter(
            self.cron_schedule, evaluation_time, ret_type=datetime, is_prev=True
        )
        while True:
            required_tick = next(previous_ticks)
            if not (required_tick + allowed_lag > evaluation_time):
                break

        has_stale_upstream = False
        for materialized_at in upstream_materialization_times.values():
            # a single missing upstream time makes the lateness unknowable
            if materialized_at is None:
                return None

            if materialized_at < required_tick:
                has_stale_upstream = True

        if not has_stale_upstream:
            return 0.0

        # lateness is measured from the time by which data for the required tick was expected,
        # so it is the same no matter which upstream asset is stale
        deadline = required_tick + allowed_lag
        return max(0.0, (evaluation_time - deadline).total_seconds() / 60)
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import pytest

from dagster import AssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._seven.compat.pendulum import create_pendulum_time


@pytest.mark.parametrize(
    ["policy", "materialization_time", "evaluation_time", "expected_minutes_late"],
    [
        # data is 25 minutes old, 30 minutes allowed
        (
            FreshnessPolicy.minimum_freshness(30),
            create_pendulum_time(2022, 1, 1, 0),
            create_pendulum_time(2022, 1, 1, 0, 25),
            0,
        ),
        # data is 60 minutes old, 120 minutes allowed
        (
            FreshnessPolicy.minimum_freshness(120),
            create_pendulum_time(2022, 1, 1, 0),
            create_pendulum_time(2022, 1, 1, 1),
            0,
        ),
        # data is 60 minutes old, only 30 minutes allowed
        (
            FreshnessPolicy.minimum_freshness(30),
            create_pendulum_time(2022, 1, 1, 0),
            create_pendulum_time(2022, 1, 1, 1),
            30,
        ),
        # no upstream materialization time available
        (
            FreshnessPolicy.minimum_freshness(500),
            None,
            create_pendulum_time(2022, 1, 1, 0, 25),
            None,
        ),
        # materialization happened before SLA
        (
            FreshnessPolicy.cron_minimum_freshness(
                minimum_freshness_minutes=15, cron_schedule="@daily"
            ),
            create_pendulum_time(2022, 1, 1, 0, 5),
            create_pendulum_time(2022, 1, 1, 0, 10),
            0,
        ),
        # materialization happened after SLA, but is fine now
        (
            FreshnessPolicy.cron_minimum_freshness(
                minimum_freshness_minutes=15, cron_schedule="@daily"
            ),
            create_pendulum_time(2022, 1, 1, 0, 30),
            create_pendulum_time(2022, 1, 1, 1, 0),
            0,
        ),
        # materialization for this data has not happened yet (day before)
        (
            FreshnessPolicy.cron_minimum_freshness(
                minimum_freshness_minutes=15, cron_schedule="@daily"
            ),
            create_pendulum_time(2022, 1, 1, 23, 0),
            create_pendulum_time(2022, 1, 2, 2, 0),
            # data was expected by 2022-01-02T00:15, so it is 1 hour, 45 minutes late
            60 + 45,
        ),
        # weird one, basically want to have a materialization every hour no more than 5 hours
        # after that data arrives -- edge case probably not useful in practice?
        (
            FreshnessPolicy.cron_minimum_freshness(
                minimum_freshness_minutes=60 * 5, cron_schedule="@hourly"
            ),
            create_pendulum_time(2022, 1, 1, 1, 0),
            create_pendulum_time(2022, 1, 1, 4, 0),
            0,
        ),
        (
            FreshnessPolicy.cron_minimum_freshness(
                minimum_freshness_minutes=60 * 5, cron_schedule="@hourly"
            ),
            create_pendulum_time(2022, 1, 1, 1, 15),
            create_pendulum_time(2022, 1, 1, 7, 45),
            # the data for 2AM is considered missing if it is not there by 7AM (5 hours later).
            # we evaluate at 7:45, so at this point it is 45 minutes late
            45,
        ),
    ],
)
def test_policies(policy, materialization_time, evaluation_time, expected_minutes_late):
    """Check each policy's minutes_late against a single upstream asset keyed "root"."""
    # a falsy materialization time is passed to the policy as an explicit None
    upstream_materialization_times = {AssetKey("root"): materialization_time or None}

    actual_minutes_late = policy.minutes_late(
        evaluation_time=evaluation_time,
        upstream_materialization_times=upstream_materialization_times,
    )

    assert actual_minutes_late == expected_minutes_late

0 comments on commit 3c89b70

Please sign in to comment.