Skip to content

Commit

Permalink
[assets] assorted bugfixes (#8372)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jun 14, 2022
1 parent 3c618ed commit a661b5f
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from .pandas_io_manager import pandas_io_manager

airbyte_assets = build_airbyte_assets(
connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"]
connection_id=AIRBYTE_CONNECTION_ID,
destination_tables=["orders", "users"],
asset_key_prefix=["postgres_replica"],
)

dbt_assets = load_assets_from_dbt_project(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dagster._check as check
from dagster.core.asset_defs.assets import AssetsDefinition
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster.core.errors import DagsterInvalidSubsetError
from dagster.core.selector.subset_selector import (
fetch_connected,
generate_asset_dep_graph,
Expand Down Expand Up @@ -132,7 +133,15 @@ def _resolve(self, node: AssetSelection) -> AbstractSet[str]:
[_match_groups(assets_def, set(node.children)) for assets_def in self.all_assets],
)
elif isinstance(node, KeysAssetSelection):
return set([child.to_user_string() for child in node.children])
specified_keys = set([child.to_user_string() for child in node.children])
invalid_keys = specified_keys - set(self.all_assets_by_name.keys())
if invalid_keys:
raise DagsterInvalidSubsetError(
f"AssetKey(s) {invalid_keys} were selected, but no AssetDefinition objects supply "
"these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions "
"are correctly added to the repository."
)
return specified_keys
elif isinstance(node, OrAssetSelection):
child_1, child_2 = [self._resolve(child) for child in node.children]
return child_1 | child_2
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
can_subset=self.can_subset,
selected_asset_keys=selected_asset_keys & self.keys,
resource_defs=self.resource_defs,
group_names_by_key=self.group_names_by_key,
)

def to_source_assets(self) -> Sequence[SourceAsset]:
Expand Down Expand Up @@ -468,7 +469,7 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As
selected_asset_keys=self._selected_asset_keys,
can_subset=self._can_subset,
resource_defs=relevant_resource_defs,
group_names_by_key=self._group_names_by_key,
group_names_by_key=self.group_names_by_key,
)


Expand Down
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def asset(
metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used
for storing the output of the op as an asset, and for loading it in downstream ops (default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
for storing the output of the op as an asset, and for loading it in downstream ops
(default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
io_manager_def (Optional[IOManagerDefinition]): The definition of the IOManager used for
storing the output of the op as an asset, and for loading it in
downstream ops. Only one of io_manager_def and io_manager_key can be provided.
Expand Down Expand Up @@ -157,7 +158,15 @@ def my_asset(my_upstream_asset: int) -> int:
return _Asset()(name)

key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
key_prefix,
"key_prefix",
namespace,
"namespace",
"0.16.0",
additional_warn_txt="key_prefix applies only to the output AssetKey. If you want to modify "
"the prefix of the input AssetKeys as well, you can do this by explicitly setting the ins "
"parameter of this asset to a dictionary of the form "
"'input_name': AssetIn(key_prefix=...).",
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
Expand All @@ -168,6 +177,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return _Asset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
key_prefix=key_prefix,
namespace=namespace,
ins=ins,
non_argument_deps=_make_asset_keys(non_argument_deps),
metadata=metadata,
Expand All @@ -191,6 +201,7 @@ class _Asset:
def __init__(
self,
name: Optional[str] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Set[AssetKey]] = None,
Expand All @@ -209,6 +220,7 @@ def __init__(
):
self.name = name

self.namespace = namespace
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
Expand Down Expand Up @@ -236,7 +248,8 @@ def __init__(
def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__

asset_ins = build_asset_ins(fn, self.key_prefix, self.ins or {}, self.non_argument_deps)
# for backcompat, we prefix input asset keys with the namespace
asset_ins = build_asset_ins(fn, self.namespace, self.ins or {}, self.non_argument_deps)

out_asset_key = AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
with warnings.catch_warnings():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,14 +723,6 @@ def _subset_assets_defs(
"asset keys produced by this asset."
)

missed_keys = selected_asset_keys - included_keys - {sa.key for sa in source_assets}
if missed_keys:
raise DagsterInvalidSubsetError(
f"When building job, the AssetKey(s) {[key.to_user_string() for key in missed_keys]} "
"were selected, but are not produced by any of the provided AssetsDefinitions or "
"SourceAssets. Make sure that keys are spelled correctly and that all of the expected "
"definitions are provided."
)
all_excluded_assets: Sequence[Union["AssetsDefinition", "SourceAsset"]] = [
*excluded_assets,
*source_assets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from dagster import (
AssetKey,
AssetOut,
AssetsDefinition,
IOManager,
Out,
Output,
ResourceDefinition,
build_op_context,
io_manager,
op,
)
from dagster._check import CheckError
from dagster.core.asset_defs import AssetGroup, AssetIn, SourceAsset, asset, multi_asset
Expand Down Expand Up @@ -92,6 +95,23 @@ def bar():
assert replaced.group_names_by_key[AssetKey("baz")] == "foo"


def test_retain_group_subset():
    """Check that ``subset_for`` carries group names over to the subsetted definition."""

    # Two-output op backing the AssetsDefinition below; this test only builds and subsets
    # the definition, it never executes the op.
    @op(out={"a": Out(), "b": Out()})
    def ma_op():
        return 1

    # Outputs "a" and "b" are mapped to asset keys that sit in two different groups.
    ma = AssetsDefinition(
        node_def=ma_op,
        keys_by_input_name={},
        keys_by_output_name={"a": AssetKey("a"), "b": AssetKey("b")},
        group_names_by_key={AssetKey("a"): "foo", AssetKey("b"): "bar"},
        can_subset=True,
    )

    # Selecting only "b" must leave its group assignment intact.
    subset = ma.subset_for({AssetKey("b")})
    assert subset.group_names_by_key[AssetKey("b")] == "bar"


def test_chain_replace_and_subset_for():
@multi_asset(
outs={"a": AssetOut(), "b": AssetOut(), "c": AssetOut()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ def my_asset(arg1):
assert isinstance(my_asset, AssetsDefinition)
assert len(my_asset.op.output_defs) == 1
assert len(my_asset.op.input_defs) == 1
assert AssetKey(["my_prefix", "arg1"]) in my_asset.keys_by_input_name.values()
# this functions differently than the namespace arg in this scenario
assert AssetKey(["my_prefix", "arg1"]) not in my_asset.keys_by_input_name.values()
assert AssetKey(["arg1"]) in my_asset.keys_by_input_name.values()


def test_asset_with_context_arg():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from dagster._check import CheckError
from dagster.core.asset_defs import asset, multi_asset
from dagster.core.asset_defs.load_assets_from_modules import prefix_assets
from dagster.core.errors import DagsterInvalidSubsetError
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError
from dagster.core.execution.with_resources import with_resources
from dagster.core.test_utils import instance_for_test

Expand Down Expand Up @@ -173,22 +173,22 @@ def final(a, d):
(
"x",
False,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
"x",
True,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
["start", "x"],
False,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
["start", "x"],
True,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(["d", "e", "f"], False, None),
(["d", "e", "f"], True, None),
Expand Down Expand Up @@ -413,7 +413,7 @@ def a(source):
def b(a):
return a + 1

with pytest.raises(DagsterInvalidSubsetError, match="SourceAsset"):
with pytest.raises(DagsterInvalidDefinitionError, match="sources"):
define_asset_job("job", selection="*b").resolve(assets=[a, b], source_assets=[])


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ def _fails():

def test_bad_resolve():

with pytest.raises(DagsterInvalidSubsetError, match="When building job"):
with pytest.raises(DagsterInvalidSubsetError, match=r"AssetKey\(s\) {'foo'} were selected"):

@repository
def _fails():
Expand Down
94 changes: 59 additions & 35 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,20 @@ def _get_node_name(node_info: Mapping[str, Any]):


def _get_node_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    """Compute the default AssetKey for a dbt node.

    By default:
        dbt sources: a dbt source's key is its source name followed by its table name.
        dbt models: a dbt model's key is its model name, preceded by the schema configured
            on the model itself when there is one.

    Args:
        node_info: The dbt node's info mapping. ``resource_type`` and ``name`` are always
            read; sources additionally need ``source_name`` and all other nodes need
            ``config``.

    Returns:
        The AssetKey built from those components.
    """
    if node_info["resource_type"] == "source":
        # Sources carry no model config; they are keyed by where the table comes from.
        components = [node_info["source_name"], node_info["name"]]
    else:
        configured_schema = node_info["config"].get("schema")
        if configured_schema is not None:
            components = [configured_schema, node_info["name"]]
        else:
            components = [node_info["name"]]

    return AssetKey(components)

Expand Down Expand Up @@ -181,7 +187,7 @@ def _dbt_nodes_to_assets(
outs[node_name] = Out(
description=_get_node_description(node_info),
io_manager_key=io_manager_key,
metadata=_columns_to_metadata(node_info["columns"]),
metadata=_get_node_metadata(node_info),
is_required=False,
)
out_name_to_node_info[node_name] = node_info
Expand Down Expand Up @@ -278,25 +284,23 @@ def dbt_op(context):
)


def _columns_to_metadata(columns: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
return (
{
"schema": MetadataValue.table_schema(
TableSchema(
columns=[
TableColumn(
name=name,
type=metadata.get("data_type") or "?",
description=metadata.get("description"),
)
for name, metadata in columns.items()
]
)
def _get_node_metadata(node_info: Mapping[str, Any]) -> Mapping[str, Any]:
metadata: Dict[str, Any] = {}
columns = node_info.get("columns", [])
if len(columns) > 0:
metadata["table_schema"] = MetadataValue.table_schema(
TableSchema(
columns=[
TableColumn(
name=column_name,
type=column_info.get("data_type") or "?",
description=column_info.get("description"),
)
for column_name, column_info in columns.items()
]
)
}
if len(columns) > 0
else None
)
)
return metadata


def load_assets_from_dbt_project(
Expand All @@ -305,6 +309,7 @@ def load_assets_from_dbt_project(
target_dir: Optional[str] = None,
select: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
runtime_metadata_fn: Optional[
Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]
] = None,
Expand All @@ -328,6 +333,8 @@ def load_assets_from_dbt_project(
to include. Defaults to "*".
key_prefix (Optional[Union[str, List[str]]]): A prefix to apply to all models in the dbt
project. Does not apply to sources.
source_key_prefix (Optional[Union[str, List[str]]]): A prefix to apply to all sources in the
dbt project. Does not apply to models.
runtime_metadata_fn: (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]):
A function that will be run after any of the assets are materialized and returns
metadata entries for the asset, to be displayed in the asset catalog for that run.
Expand Down Expand Up @@ -357,6 +364,7 @@ def load_assets_from_dbt_project(
return load_assets_from_dbt_manifest(
manifest_json=manifest_json,
key_prefix=key_prefix,
source_key_prefix=source_key_prefix,
runtime_metadata_fn=runtime_metadata_fn,
io_manager_key=io_manager_key,
selected_unique_ids=selected_unique_ids,
Expand All @@ -369,6 +377,7 @@ def load_assets_from_dbt_project(
def load_assets_from_dbt_manifest(
manifest_json: Mapping[str, Any],
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
runtime_metadata_fn: Optional[
Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]
] = None,
Expand All @@ -387,6 +396,10 @@ def load_assets_from_dbt_manifest(
Args:
manifest_json (Optional[Mapping[str, Any]]): The contents of a DBT manifest.json, which contains
a set of models to load into assets.
key_prefix (Optional[Union[str, List[str]]]): A prefix to apply to all models in the dbt
project. Does not apply to sources.
source_key_prefix (Optional[Union[str, List[str]]]): A prefix to apply to all sources in the
dbt project. Does not apply to models.
runtime_metadata_fn: (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]):
A function that will be run after any of the assets are materialized and returns
metadata entries for the asset, to be displayed in the asset catalog for that run.
Expand Down Expand Up @@ -417,17 +430,28 @@ def load_assets_from_dbt_manifest(
# must resolve the selection string using the existing manifest.json data (hacky)
selected_unique_ids = _select_unique_ids_from_manifest_json(manifest_json, select)

dbt_assets = [
_dbt_nodes_to_assets(
dbt_nodes,
runtime_metadata_fn=runtime_metadata_fn,
io_manager_key=io_manager_key,
select=select,
selected_unique_ids=selected_unique_ids,
node_info_to_asset_key=node_info_to_asset_key,
use_build_command=use_build_command,
)
]
dbt_assets_def = _dbt_nodes_to_assets(
dbt_nodes,
runtime_metadata_fn=runtime_metadata_fn,
io_manager_key=io_manager_key,
select=select,
selected_unique_ids=selected_unique_ids,
node_info_to_asset_key=node_info_to_asset_key,
use_build_command=use_build_command,
)
if source_key_prefix:
if isinstance(source_key_prefix, str):
source_key_prefix = [source_key_prefix]
source_key_prefix = check.list_param(source_key_prefix, "source_key_prefix", of_type=str)
input_key_replacements = {
input_key: AssetKey(source_key_prefix + input_key.path)
for input_key in dbt_assets_def.keys_by_input_name.values()
}
dbt_assets = [
dbt_assets_def.with_prefix_or_group(input_asset_key_replacements=input_key_replacements)
]
else:
dbt_assets = [dbt_assets_def]

if key_prefix:
dbt_assets = prefix_assets(dbt_assets, key_prefix)
Expand Down

0 comments on commit a661b5f

Please sign in to comment.