Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #9193: add retry policy to @asset and @multi_asset #10150

Merged
merged 2 commits into from Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -36,6 +36,7 @@
from ..input import In
from ..output import Out
from ..partition import PartitionsDefinition
from ..policy import RetryPolicy
from ..resource_definition import ResourceDefinition
from ..utils import DEFAULT_IO_MANAGER_KEY, NoValueSentinel

Expand Down Expand Up @@ -68,6 +69,7 @@ def asset(
group_name: Optional[str] = ...,
output_required: bool = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
retry_policy: Optional[RetryPolicy] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -93,6 +95,7 @@ def asset(
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.

Expand Down Expand Up @@ -149,6 +152,7 @@ def asset(
storage and will halt execution of downstream assets.
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated
with respect to its root data.
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.

Examples:

Expand Down Expand Up @@ -190,6 +194,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
group_name=group_name,
output_required=output_required,
freshness_policy=freshness_policy,
retry_policy=retry_policy,
)(fn)

return inner
Expand All @@ -215,6 +220,7 @@ def __init__(
group_name: Optional[str] = None,
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
):
self.name = name

Expand All @@ -238,6 +244,7 @@ def __init__(
self.group_name = group_name
self.output_required = output_required
self.freshness_policy = freshness_policy
self.retry_policy = retry_policy

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand Down Expand Up @@ -282,6 +289,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
**(self.op_tags or {}),
},
config_schema=self.config_schema,
retry_policy=self.retry_policy,
)(fn)

keys_by_input_name = {
Expand Down Expand Up @@ -323,6 +331,7 @@ def multi_asset(
can_subset: bool = False,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
group_name: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same op and same
upstream assets.
Expand Down Expand Up @@ -362,6 +371,7 @@ def multi_asset(
context within the body of the function.
group_name (Optional[str]): A string name used to organize multiple assets into groups. This
group name will be applied to all assets produced by this multi_asset.
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
"""
if resource_defs is not None:
experimental_arg_warning("resource_defs", "multi_asset")
Expand Down Expand Up @@ -428,6 +438,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
**(op_tags or {}),
},
config_schema=config_schema,
retry_policy=retry_policy,
)(fn)

keys_by_input_name = {
Expand Down
Expand Up @@ -22,6 +22,7 @@
build_assets_job,
multi_asset,
)
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.resource_requirement import ensure_requirements_satisfied
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.types.dagster_type import resolve_dagster_type
Expand Down Expand Up @@ -583,3 +584,29 @@ def other_asset():
# If IO manager def is provided as a resource def, it appears in required
# resource keys on the underlying op.
assert set(other_asset.node_def.required_resource_keys) == {"blah"}


def test_asset_retry_policy():
retry_policy = RetryPolicy()

@asset(retry_policy=retry_policy)
def my_asset():
...

assert my_asset.op.retry_policy == retry_policy


def test_multi_asset_retry_policy():
retry_policy = RetryPolicy()

@multi_asset(
outs={
"key1": Out(asset_key=AssetKey("key1")),
"key2": Out(asset_key=AssetKey("key2")),
},
retry_policy=retry_policy,
)
def my_asset():
...

assert my_asset.op.retry_policy == retry_policy