Skip to content

Commit

Permalink
Expose metadata on AssetNode (and source assets) directly (#6900)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Mar 12, 2022
1 parent b2df1b0 commit 809665e
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/content/guides/dagster/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ from dagster import AssetKey, SourceAsset, asset
sfo_q2_weather_sample = SourceAsset(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
sfo_q2_weather_sample = SourceAsset(
key=AssetKey("sfo_q2_weather_sample"),
description="Weather samples, taken every five minutes at SFO",
metadata={"format": "csv"},
)


Expand Down
1 change: 1 addition & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import TYPE_CHECKING, List, Optional, Union

import graphene
from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.solids import (
GrapheneCompositeSolidDefinition,
GrapheneSolidDefinition,
Expand Down Expand Up @@ -89,6 +91,7 @@ class GrapheneAssetNode(graphene.ObjectType):
partitions=graphene.List(graphene.String),
)
materializationCountByPartition = non_null_list(GrapheneMaterializationCount)
metadata_entries = non_null_list(GrapheneMetadataEntry)
op = graphene.Field(GrapheneSolidDefinition)
opName = graphene.String()
partitionKeys = non_null_list(graphene.String)
Expand Down Expand Up @@ -222,6 +225,12 @@ def resolve_dependedByKeys(self, _graphene_info) -> List[GrapheneAssetKey]:
for dep in self._external_asset_node.depended_by
]

def resolve_dependencyKeys(self, _graphene_info):
return [
GrapheneAssetKey(path=dep.upstream_asset_key.path)
for dep in self._external_asset_node.dependencies
]

def resolve_dependencies(self, graphene_info) -> List[GrapheneAssetDependency]:
if not self._external_asset_node.dependencies:
return []
Expand All @@ -241,12 +250,6 @@ def resolve_dependencies(self, graphene_info) -> List[GrapheneAssetDependency]:
for dep in self._external_asset_node.dependencies
]

def resolve_dependencyKeys(self, _graphene_info) -> List[GrapheneAssetKey]:
return [
GrapheneAssetKey(path=dep.upstream_asset_key.path)
for dep in self._external_asset_node.dependencies
]

def resolve_jobNames(self, _graphene_info) -> List[graphene.String]:
return self._external_asset_node.job_names

Expand Down Expand Up @@ -309,6 +312,9 @@ def resolve_materializationCountByPartition(
for partition_key in partition_keys
]

def resolve_metadata_entries(self, _graphene_info) -> List[GrapheneMetadataEntry]:
return list(iterate_metadata_entries(self._external_asset_node.metadata_entries))

def resolve_op(
self, _graphene_info
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
Expand Down
57 changes: 49 additions & 8 deletions python_modules/dagster/dagster/core/asset_defs/source_asset.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,66 @@
from typing import Any, Dict, NamedTuple, Optional
from typing import NamedTuple, Optional, Sequence, Union

import dagster.check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import (
MetadataEntry,
MetadataMapping,
MetadataUserInput,
PartitionMetadataEntry,
normalize_metadata,
)
from dagster.core.definitions.partition import PartitionsDefinition


class SourceAsset(NamedTuple):
class SourceAsset(
NamedTuple(
"_SourceAsset",
[
("key", AssetKey),
("metadata_entries", Sequence[Union[MetadataEntry, PartitionMetadataEntry]]),
("io_manager_key", str),
("description", Optional[str]),
("partitions_def", Optional[PartitionsDefinition]),
],
)
):
"""A SourceAsset represents an asset that is not generated by any Dagster op in the repository
that it's referenced from.
Attributes:
key (AssetKey): The key of the asset.
metadata (Optional[Dict[str, Any]]): Metadata associated with the asset.
metadata_entries (List[MetadataEntry]): Metadata associated with the asset.
io_manager_key (str): The key for the IOManager that will be used to load the contents of
the asset when it's used as an input to other assets inside a job.
description (Optional[str]): The description of the asset.
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
"""

key: AssetKey
metadata: Optional[Dict[str, Any]] = None
io_manager_key: str = "io_manager"
description: Optional[str] = None
partitions_def: Optional[PartitionsDefinition] = None
def __new__(
cls,
key: AssetKey,
metadata: Optional[MetadataUserInput] = None,
io_manager_key: str = "io_manager",
description: Optional[str] = None,
partitions_def: Optional[PartitionsDefinition] = None,
):

metadata = check.opt_dict_param(metadata, "metadata", key_type=str)
metadata_entries = normalize_metadata(metadata, [], allow_invalid=True)

return super().__new__(
cls,
key=check.inst_param(key, "key", AssetKey),
metadata_entries=metadata_entries,
io_manager_key=check.str_param(io_manager_key, "io_manager_key"),
description=check.opt_str_param(description, "description"),
partitions_def=check.opt_inst_param(
partitions_def, "partitions_def", PartitionsDefinition
),
)

@property
def metadata(self) -> MetadataMapping:
# PartitionMetadataEntry (unstable API) case is unhandled
return {entry.label: entry.entry_data for entry in self.metadata_entries} # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ScheduleDefinition,
)
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.definitions.mode import DEFAULT_MODE_NAME
from dagster.core.definitions.node_definition import NodeDefinition
from dagster.core.definitions.partition import PartitionScheduleDefinition, ScheduleType
Expand Down Expand Up @@ -692,6 +693,7 @@ class ExternalAssetNode(
("partitions_def_data", Optional[ExternalPartitionsDefinitionData]),
("output_name", Optional[str]),
("output_description", Optional[str]),
("metadata_entries", Sequence[MetadataEntry]),
],
)
):
Expand All @@ -711,6 +713,7 @@ def __new__(
partitions_def_data: Optional[ExternalPartitionsDefinitionData] = None,
output_name: Optional[str] = None,
output_description: Optional[str] = None,
metadata_entries: Optional[Sequence[MetadataEntry]] = None,
):
return super(ExternalAssetNode, cls).__new__(
cls,
Expand All @@ -731,6 +734,9 @@ def __new__(
),
output_name=check.opt_str_param(output_name, "output_name"),
output_description=check.opt_str_param(output_description, "output_description"),
metadata_entries=check.opt_sequence_param(
metadata_entries, "metadata_entries", of_type=MetadataEntry
),
)


Expand Down Expand Up @@ -840,13 +846,18 @@ def external_asset_graph_from_defs(
" and as a non-source asset"
)

# TODO: For now we are dropping partition metadata entries
metadata_entries = [
entry for entry in source_asset.metadata_entries if isinstance(entry, MetadataEntry)
]
asset_nodes.append(
ExternalAssetNode(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
job_names=[],
op_description=source_asset.description,
metadata_entries=metadata_entries,
)
)

Expand All @@ -861,7 +872,7 @@ def external_asset_graph_from_defs(
]
] = None

if output_def and output_def.asset_partitions_def:
if output_def.asset_partitions_def:
partitions_def = output_def.asset_partitions_def
if partitions_def:
if isinstance(partitions_def, TimeWindowPartitionsDefinition):
Expand All @@ -888,6 +899,7 @@ def external_asset_graph_from_defs(
partitions_def_data=partitions_def_data,
output_name=output_def.name,
output_description=output_def.description,
metadata_entries=output_def.metadata_entries,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster.core.asset_defs.source_asset import SourceAsset
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import MetadataEntry, MetadataValue


def test_source_asset_metadata():
sa = SourceAsset(key=AssetKey("foo"), metadata={"foo": "bar", "baz": object()})
assert sa.metadata_entries == [
MetadataEntry(label="foo", description=None, entry_data=MetadataValue.text("bar")),
MetadataEntry(
label="baz",
description=None,
entry_data=MetadataValue.text("[object] (unserializable)"),
),
]
assert sa.metadata == {
"foo": MetadataValue.text("bar"),
"baz": MetadataValue.text("[object] (unserializable)"),
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dagster import AssetGroup, AssetKey, DagsterInvariantViolationError, Out
from dagster.check import CheckError
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, build_assets_job, multi_asset
from dagster.core.definitions.metadata import MetadataEntry, MetadataValue
from dagster.core.host_representation.external_data import (
ExternalAssetDependedBy,
ExternalAssetDependency,
Expand Down Expand Up @@ -381,6 +382,7 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="result",
metadata_entries=[],
),
ExternalAssetNode(
asset_key=AssetKey(["in1"]),
Expand All @@ -395,6 +397,7 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="result",
metadata_entries=[],
),
ExternalAssetNode(
asset_key=AssetKey(["in2"]),
Expand All @@ -408,6 +411,7 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="result",
metadata_entries=[],
),
ExternalAssetNode(
asset_key=AssetKey(["mixed"]),
Expand All @@ -429,6 +433,13 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="mixed",
metadata_entries=[
MetadataEntry(
label=".dagster/asset_deps",
description=None,
entry_data=MetadataValue.text("[set] (unserializable)"),
)
],
),
ExternalAssetNode(
asset_key=AssetKey(["only_in"]),
Expand All @@ -451,6 +462,7 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="only_in",
metadata_entries=[],
),
ExternalAssetNode(
asset_key=AssetKey(["only_out"]),
Expand All @@ -471,6 +483,13 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="only_out",
metadata_entries=[
MetadataEntry(
label=".dagster/asset_deps",
description=None,
entry_data=MetadataValue.text("[set] (unserializable)"),
)
],
),
]

Expand Down

1 comment on commit 809665e

@vercel
Copy link

@vercel vercel bot commented on 809665e Mar 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.