diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 7695e502af008..d1670f9e3e7c7 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -36,6 +36,7 @@ from ..input import In from ..output import Out from ..partition import PartitionsDefinition +from ..policy import RetryPolicy from ..resource_definition import ResourceDefinition from ..utils import DEFAULT_IO_MANAGER_KEY, NoValueSentinel @@ -68,6 +69,7 @@ def asset( group_name: Optional[str] = ..., output_required: bool = ..., freshness_policy: Optional[FreshnessPolicy] = ..., + retry_policy: Optional[RetryPolicy] = ..., ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -93,6 +95,7 @@ def asset( group_name: Optional[str] = None, output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, + retry_policy: Optional[RetryPolicy] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Create a definition for how to compute an asset. @@ -149,6 +152,7 @@ def asset( storage and will halt execution of downstream assets. freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated with respect to its root data. + retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. Examples: @@ -190,6 +194,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: group_name=group_name, output_required=output_required, freshness_policy=freshness_policy, + retry_policy=retry_policy, )(fn) return inner @@ -215,6 +220,7 @@ def __init__( group_name: Optional[str] = None, output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, + retry_policy: Optional[RetryPolicy] = None, ): self.name = name @@ -238,6 +244,7 @@ def __init__( self.group_name = group_name self.output_required = output_required self.freshness_policy = freshness_policy + self.retry_policy = retry_policy def __call__(self, fn: Callable) -> AssetsDefinition: asset_name = self.name or fn.__name__ @@ -282,6 +289,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition: **(self.op_tags or {}), }, config_schema=self.config_schema, + retry_policy=self.retry_policy, )(fn) keys_by_input_name = { @@ -323,6 +331,7 @@ def multi_asset( can_subset: bool = False, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, group_name: Optional[str] = None, + retry_policy: Optional[RetryPolicy] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a combined definition of multiple assets that are computed using the same op and same upstream assets. @@ -362,6 +371,7 @@ def multi_asset( context within the body of the function. group_name (Optional[str]): A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset. + retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. """ if resource_defs is not None: experimental_arg_warning("resource_defs", "multi_asset") @@ -428,6 +438,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: **(op_tags or {}), }, config_schema=config_schema, + retry_policy=retry_policy, )(fn) keys_by_input_name = { diff --git a/python_modules/dagster/dagster_tests/core_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/core_tests/asset_defs_tests/test_decorators.py index dff3c90189b19..f7d69c73675e9 100644 --- a/python_modules/dagster/dagster_tests/core_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/core_tests/asset_defs_tests/test_decorators.py @@ -22,6 +22,7 @@ build_assets_job, multi_asset, ) +from dagster._core.definitions.policy import RetryPolicy from dagster._core.definitions.resource_requirement import ensure_requirements_satisfied from dagster._core.errors import DagsterInvalidConfigError from dagster._core.types.dagster_type import resolve_dagster_type @@ -583,3 +584,29 @@ def other_asset(): # If IO manager def is provided as a resource def, it appears in required # resource keys on the underlying op. assert set(other_asset.node_def.required_resource_keys) == {"blah"} + + +def test_asset_retry_policy(): + retry_policy = RetryPolicy() + + @asset(retry_policy=retry_policy) + def my_asset(): + ... + + assert my_asset.op.retry_policy == retry_policy + + +def test_multi_asset_retry_policy(): + retry_policy = RetryPolicy() + + @multi_asset( + outs={ + "key1": Out(asset_key=AssetKey("key1")), + "key2": Out(asset_key=AssetKey("key2")), + }, + retry_policy=retry_policy, + ) + def my_asset(): + ... + + assert my_asset.op.retry_policy == retry_policy