Skip to content

Commit

Permalink
Add version to asset decorator (#10167)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Nov 2, 2022
1 parent 082b407 commit 6459b0f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
5 changes: 5 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ def freshness_policies_by_key(self) -> Mapping[AssetKey, FreshnessPolicy]:
def partitions_def(self) -> Optional[PartitionsDefinition]:
return self._partitions_def

@public # type: ignore
@property
def is_versioned(self) -> bool:
return self.op.version is not None

@property
def metadata_by_key(self):
return self._metadata_by_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def asset(
output_required: bool = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
retry_policy: Optional[RetryPolicy] = ...,
op_version: Optional[str] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -96,6 +97,7 @@ def asset(
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
op_version: Optional[str] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -153,6 +155,8 @@ def asset(
freshness_policy (FreshnessPolicy): A constraint telling Dagster how often this asset is intended to be updated
with respect to its root data.
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
op_version (Optional[str]): (Experimental) Version string passed to the op underlying the
asset.
Examples:
Expand Down Expand Up @@ -195,6 +199,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
output_required=output_required,
freshness_policy=freshness_policy,
retry_policy=retry_policy,
op_version=op_version,
)(fn)

return inner
Expand All @@ -221,6 +226,7 @@ def __init__(
output_required: bool = True,
freshness_policy: Optional[FreshnessPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
op_version: Optional[str] = None,
):
self.name = name

Expand All @@ -245,6 +251,7 @@ def __init__(
self.output_required = output_required
self.freshness_policy = freshness_policy
self.retry_policy = retry_policy
self.op_version = op_version

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand Down Expand Up @@ -290,6 +297,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
},
config_schema=self.config_schema,
retry_policy=self.retry_policy,
version=self.op_version,
)(fn)

keys_by_input_name = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@ def my_asset(arg1):
assert my_asset.op.output_defs[0].dagster_type.display_name == "String"


def test_asset_with_op_version():
@asset(op_version="foo")
def my_asset(arg1):
return arg1

assert my_asset.is_versioned
assert my_asset.op.version == "foo"


def test_asset_with_key_prefix():
@asset(key_prefix="my_key_prefix")
def my_asset():
Expand Down

0 comments on commit 6459b0f

Please sign in to comment.