Skip to content

Commit

Permalink
asset op tags (#7472)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 25, 2022
1 parent 1756b97 commit e9f0ebc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def asset(
dagster_type: Optional[DagsterType] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
partition_mappings: Optional[Mapping[str, PartitionMapping]] = ...,
op_tags: Optional[Dict[str, Any]] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -73,6 +74,7 @@ def asset(
dagster_type: Optional[DagsterType] = None,
partitions_def: Optional[PartitionsDefinition] = None,
partition_mappings: Optional[Mapping[str, PartitionMapping]] = None,
op_tags: Optional[Dict[str, Any]] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -111,6 +113,10 @@ def asset(
If no entry is provided for a particular asset dependency, the partition mapping defaults
to the default partition mapping for the partitions definition, which is typically maps
partition keys to the same partition keys in upstream assets.
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.
Frameworks may expect and require certain metadata to be attached to a op. Values that
are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`.
Examples:
Expand All @@ -137,6 +143,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
dagster_type=dagster_type,
partitions_def=partitions_def,
partition_mappings=partition_mappings,
op_tags=op_tags,
)(fn)

return inner
Expand All @@ -157,6 +164,7 @@ def __init__(
dagster_type: Optional[DagsterType] = None,
partitions_def: Optional[PartitionsDefinition] = None,
partition_mappings: Optional[Mapping[str, PartitionMapping]] = None,
op_tags: Optional[Dict[str, Any]] = None,
):
self.name = name
# if user inputs a single string, coerce to list
Expand All @@ -171,6 +179,7 @@ def __init__(
self.dagster_type = dagster_type
self.partitions_def = partitions_def
self.partition_mappings = partition_mappings
self.op_tags = op_tags

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand Down Expand Up @@ -202,7 +211,10 @@ def partition_fn(context): # pylint: disable=function-redefined
ins=asset_ins,
out=out,
required_resource_keys=self.required_resource_keys,
tags={"kind": self.compute_kind} if self.compute_kind else None,
tags={
**({"kind": self.compute_kind} if self.compute_kind else {}),
**(self.op_tags or {}),
},
config_schema={
"assets": {
"input_partitions": Field(dict, is_required=False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,14 @@ def my_asset():
pass

assert my_asset.partitions_def == partitions_def


def test_op_tags():
tags = {"apple": "banana", "orange": {"rind": "fsd", "segment": "fjdskl"}}
tags_stringified = {"apple": "banana", "orange": '{"rind": "fsd", "segment": "fjdskl"}'}

@asset(op_tags=tags)
def my_asset():
...

assert my_asset.op.tags == tags_stringified

0 comments on commit e9f0ebc

Please sign in to comment.