Skip to content

Commit

Permalink
More helpful asset key mismatch errors (#12008)
Browse files Browse the repository at this point in the history
## Summary

A little QoL idea that came out of our onboarding breakout group last
week. Adds a suggestion to the error message when an upstream asset dep
is not found, listing any assets with similar keys:

```python
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["my", "asset1"]' for asset '["asset3"]' 
is not produced by any of the provided asset ops and is not one of the provided sources. 
Did you mean one of the following?
    ["my", "prefix", "asset1"]
```

Right now, uses three heuristics for finding potential candiates:
- Same asset name, similar prefix
- Similar asset name, same prefix
- Same asset name, prefix is off-by-one component (e.g. `["snowflake',
"elementl", "prod", "my_asset"] vs ["snowflake", "elementl",
"my_asset"]`

Currently the candidates are unordered and there's no limit to the
number shown because this is a quick impl/rfc. Can flesh it out if we
find it interesting.

## Test Plan

Added a few unit tests to showcase behavior.
  • Loading branch information
benpankow committed Feb 21, 2023
1 parent 4474da1 commit 61ed1c6
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections import defaultdict
from typing import AbstractSet, Dict, Iterable, List, Mapping, Tuple, cast
from typing import AbstractSet, Dict, Iterable, List, Mapping, Sequence, Tuple, cast

from thefuzz import fuzz

from dagster._core.definitions.events import AssetKey
from dagster._core.errors import DagsterInvalidDefinitionError
Expand Down Expand Up @@ -41,6 +43,54 @@ def get_resolved_asset_key_for_input(
)


def resolve_similar_asset_names(
target_asset_key: AssetKey,
assets_defs: Iterable[AssetsDefinition],
) -> Sequence[AssetKey]:
"""
Given a target asset key (an upstream dependency which we can't find), produces a list of
similar asset keys from the list of asset definitions. We use this list to produce a helpful
error message that can help users debug their asset dependencies.
"""
similar_names: List[AssetKey] = []

for asset_def in assets_defs:
for asset_key in asset_def.keys:
# Whether the asset key or upstream key has the same prefix and a similar
# name
# e.g. [snowflake, elementl, key] and [snowflake, elementl, ey]
is_same_prefix_similar_name = (
asset_key.path[:-1] == target_asset_key.path[:-1]
and fuzz.ratio(asset_key.path[-1], target_asset_key.path[-1]) > 80
)

# Whether the asset key or upstream key has a similar prefix and the same
# name
# e.g. [snowflake, elementl, key] and [nowflake, elementl, key]
is_similar_prefix_same_name = (
asset_key.path[-1] == target_asset_key.path[-1]
and fuzz.ratio(" ".join(asset_key.path[:-1]), " ".join(target_asset_key.path[:-1]))
> 80
)

# Whether the asset key or upstream key has one more prefix component than
# the other, and the same name
# e.g. [snowflake, elementl, key] and [snowflake, elementl, prod, key]
is_off_by_one_prefix_component_same_name = (
asset_key.path[-1] == target_asset_key.path[-1],
len(set(asset_key.path).symmetric_difference(set(target_asset_key.path))) == 1
and max(len(asset_key.path), len(target_asset_key.path)) > 1,
)

if (
is_same_prefix_similar_name
or is_similar_prefix_same_name
or is_off_by_one_prefix_component_same_name
):
similar_names.append(asset_key)
return similar_names


def resolve_assets_def_deps(
assets_defs: Iterable[AssetsDefinition], source_assets: Iterable[SourceAsset]
) -> Mapping[int, Mapping[AssetKey, AssetKey]]:
Expand Down Expand Up @@ -104,12 +154,21 @@ def resolve_assets_def_deps(

warned = True
elif not assets_def.node_def.input_def_named(input_name).dagster_type.is_nothing:
raise DagsterInvalidDefinitionError(
msg = (
f"Input asset '{upstream_key.to_string()}' for asset "
f"'{next(iter(assets_def.keys)).to_string()}' is not "
"produced by any of the provided asset ops and is not one of the provided "
"sources"
"sources."
)
similar_names = resolve_similar_asset_names(upstream_key, assets_defs)
if similar_names:
# Arbitrarily limit to 10 similar names to avoid a huge error message
subset_similar_names = similar_names[:10]
similar_to_string = ", ".join(
(similar.to_string() for similar in subset_similar_names)
)
msg += f" Did you mean one of the following?\n\t{similar_to_string}"
raise DagsterInvalidDefinitionError(msg)

if resolved_keys_by_unresolved_key:
result[id(assets_def)] = resolved_keys_by_unresolved_key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import re
import time
from typing import List

import pytest
from dagster import AssetIn, AssetKey, AssetsDefinition, Definitions, asset
from dagster._core.definitions.resolved_asset_deps import resolve_similar_asset_names
from dagster._core.errors import DagsterInvalidDefinitionError


@pytest.mark.parametrize("group_name", [None, "my_group"])
@pytest.mark.parametrize("asset_key_prefix", [[], ["my_prefix"]])
def test_typo_upstream_asset_one_similar(group_name, asset_key_prefix):
@asset(group_name=group_name, key_prefix=asset_key_prefix)
def asset1():
...

@asset(
group_name=group_name,
key_prefix=asset_key_prefix,
ins={"asst1": AssetIn(asset_key_prefix + ["asst1"])},
)
def asset2(asst1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asst1\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources. Did you mean one of the following\?"
rf"\n\t{re.escape(asset1.asset_key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])


def test_typo_upstream_asset_no_similar():
@asset
def asset1():
...

@asset
def asset2(not_close_to_asset1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"not_close_to_asset1\".* is not produced by any of the provided asset"
r" ops and is not one of the provided sources."
),
):
Definitions(assets=[asset1, asset2])


def test_typo_upstream_asset_many_similar():
@asset
def asset1():
...

@asset
def assets1():
...

@asset
def asst():
...

@asset
def asset2(asst1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asst1\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources. Did you mean one of the following\?"
rf"\n\t{re.escape(asst.asset_key.to_string())},"
rf" {re.escape(asset1.asset_key.to_string())},"
rf" {re.escape(assets1.asset_key.to_string())}"
),
):
Definitions(assets=[asst, asset1, assets1, asset2])


def test_typo_upstream_asset_wrong_prefix():
@asset(key_prefix=["my", "prefix"])
def asset1():
...

@asset(ins={"asset1": AssetIn(key=AssetKey(["my", "prfix", "asset1"]))})
def asset2(asset1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asset1\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources. Did you mean one of the following\?"
rf"\n\t{re.escape(asset1.asset_key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])


def test_typo_upstream_asset_wrong_prefix_and_wrong_key():
# In the case that the user has a typo in the key and the prefix, we don't suggest the asset since it's too different.

@asset(key_prefix=["my", "prefix"])
def asset1():
...

@asset(ins={"asset1": AssetIn(key=AssetKey(["my", "prfix", "asset4"]))})
def asset2(asset1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asset4\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources."
),
):
Definitions(assets=[asset1, asset2])


def test_one_off_component_prefix():
@asset(key_prefix=["my", "prefix"])
def asset1():
...

# One more component in the prefix
@asset(ins={"asset1": AssetIn(key=AssetKey(["my", "prefix", "nested", "asset1"]))})
def asset2(asset1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asset1\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources. Did you mean one of the following\?"
rf"\n\t{re.escape(asset1.asset_key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])

# One fewer component in the prefix
@asset(ins={"asset1": AssetIn(key=AssetKey(["my", "asset1"]))})
def asset3(asset1):
...

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
r"Input asset .*\"asset1\".* is not produced by any of the provided asset ops and is"
r" not one of the provided sources. Did you mean one of the following\?"
rf"\n\t{re.escape(asset1.asset_key.to_string())}"
),
):
Definitions(assets=[asset1, asset3])


NUM_ASSETS_TO_TEST_PERF = 5000
# As of 2/16/2023, `avg_elapsed_time_secs` is ~0.024s on a MBP, ~0.15s on BK
PERF_CUTOFF_SECS = 0.3
NUM_PERF_TRIALS = 10


def test_perf():
assets: List[AssetsDefinition] = []
for i in range(NUM_ASSETS_TO_TEST_PERF):

@asset(name="asset_" + str(i))
def my_asset():
...

assets.append(my_asset)

total_elapsed_time_secs = 0
for _ in range(NUM_PERF_TRIALS):
start_time = time.time()
resolve_similar_asset_names(AssetKey("asset_" + str(NUM_ASSETS_TO_TEST_PERF)), assets)
end_time = time.time()

elapsed_time_secs = end_time - start_time

total_elapsed_time_secs += elapsed_time_secs

avg_elapsed_time_secs = total_elapsed_time_secs / NUM_PERF_TRIALS

assert (
avg_elapsed_time_secs < PERF_CUTOFF_SECS
), "Performance of resolve_similar_asset_names has regressed"
1 change: 1 addition & 0 deletions python_modules/dagster/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def get_version() -> str:
"croniter>=0.3.34",
# grpcio>=1.48.1 has hanging/crashing issues: https://github.com/grpc/grpc/issues/30843 and https://github.com/grpc/grpc/issues/31885
# ensure version we require is >= that with which we generated the grpc code (set in dev-requirements)
"thefuzz",
"grpcio>=1.32.0,<1.48.1",
"grpcio-health-checking>=1.32.0,<1.44.0",
"packaging>=20.9",
Expand Down

0 comments on commit 61ed1c6

Please sign in to comment.