Skip to content

Commit

Permalink
Deprecate asset namespace (#8274)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Jun 9, 2022
1 parent f0b7346 commit 947a9f6
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 63 deletions.
6 changes: 3 additions & 3 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,18 @@ def test_uses_context():

### Multi-component asset keys

Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string.
Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string. To define an asset with a multi-part asset key, use the `key_prefix` argument-- this can be either a list of strings or a single string with segments delimited by "/". The full asset key is formed by prepending the `key_prefix` to the asset name (which defaults to the name of the decorated function).

```python file=/concepts/assets/multi_component_asset_key.py startafter=start_marker endbefore=end_marker
from dagster import AssetIn, asset


@asset(namespace=["one", "two", "three"])
@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
return [1, 2, 3]


@asset(ins={"upstream_asset": AssetIn(namespace=["one", "two", "three"])})
@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
return upstream_asset + [4]
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from dagster import AssetIn, asset


@asset(namespace=["one", "two", "three"])
@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
return [1, 2, 3]


@asset(ins={"upstream_asset": AssetIn(namespace=["one", "two", "three"])})
@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
return upstream_asset + [4]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from dagster import AssetIn, asset


@asset(ins={"activity_daily_stats": AssetIn(namespace="activity_analytics")})
@asset(ins={"activity_daily_stats": AssetIn(key_prefix="activity_analytics")})
def activity_forecast(activity_daily_stats: DataFrame) -> DataFrame:
return activity_daily_stats.head(100)
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

@asset(
ins={
"stories": AssetIn(namespace="core", metadata={"columns": ["id"]}),
"comments": AssetIn(namespace="core", metadata={"columns": ["id", "user_id", "parent"]}),
"stories": AssetIn(key_prefix="core", metadata={"columns": ["id"]}),
"comments": AssetIn(key_prefix="core", metadata={"columns": ["id", "user_id", "parent"]}),
},
io_manager_key="warehouse_io_manager",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def recommender_model(user_story_matrix: IndexedCooMatrix) -> Output[TruncatedSV


@asset(
ins={"stories": AssetIn(namespace="core", metadata={"columns": ["id", "title"]})},
ins={"stories": AssetIn(key_prefix="core", metadata={"columns": ["id", "title"]})},
io_manager_key="warehouse_io_manager",
)
def component_top_stories(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


def mock_output_context(asset_key):
@asset(name=asset_key.path[-1], namespace=asset_key.path[:-1])
@asset(name=asset_key.path[-1], key_prefix=asset_key.path[:-1])
def my_asset():
pass

Expand Down
24 changes: 16 additions & 8 deletions python_modules/dagster/dagster/core/asset_defs/asset_in.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any, Mapping, NamedTuple, Optional, Sequence
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Union

import dagster._check as check
from dagster.core.definitions.events import AssetKey, CoerceableToAssetKey
from dagster.core.definitions.events import ASSET_KEY_DELIMITER, AssetKey, CoerceableToAssetKey
from dagster.utils.backcompat import canonicalize_backcompat_args


class AssetIn(
Expand All @@ -10,7 +11,7 @@ class AssetIn(
[
("asset_key", Optional[AssetKey]),
("metadata", Optional[Mapping[str, Any]]),
("namespace", Optional[Sequence[str]]),
("key_prefix", Optional[Sequence[str]]),
],
)
):
Expand All @@ -19,18 +20,25 @@ def __new__(
asset_key: Optional[CoerceableToAssetKey] = None,
metadata: Optional[Mapping[str, Any]] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
):
key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
)

check.invariant(
not (asset_key and namespace),
("Asset key and namespace cannot both be set on AssetIn"),
not (asset_key and key_prefix),
("Asset key and key_prefix cannot both be set on AssetIn"),
)

# if user inputs a single string, coerce to list
namespace = [namespace] if isinstance(namespace, str) else namespace
# if user inputs a single string, split on delimiter
key_prefix = (
key_prefix.split(ASSET_KEY_DELIMITER) if isinstance(key_prefix, str) else key_prefix
)

return super(AssetIn, cls).__new__(
cls,
asset_key=AssetKey.from_coerceable(asset_key) if asset_key is not None else None,
metadata=check.opt_inst_param(metadata, "metadata", Mapping),
namespace=check.opt_list_param(namespace, "namespace", str),
key_prefix=check.opt_list_param(key_prefix, "key_prefix", str),
)
45 changes: 31 additions & 14 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dagster.config.config_schema import ConfigSchemaType
from dagster.core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster.core.definitions.decorators.op_decorator import _Op
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.events import ASSET_KEY_DELIMITER, AssetKey
from dagster.core.definitions.input import In
from dagster.core.definitions.output import Out
from dagster.core.definitions.partition import PartitionsDefinition
Expand All @@ -30,7 +30,11 @@
from dagster.core.storage.io_manager import IOManagerDefinition
from dagster.core.types.dagster_type import DagsterType
from dagster.seven import funcsigs
from dagster.utils.backcompat import ExperimentalWarning, experimental_decorator
from dagster.utils.backcompat import (
ExperimentalWarning,
canonicalize_backcompat_args,
experimental_decorator,
)

from .asset_in import AssetIn
from .assets import AssetsDefinition
Expand All @@ -48,6 +52,7 @@ def asset(
def asset(
name: Optional[str] = ...,
namespace: Optional[Sequence[str]] = ...,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
ins: Optional[Mapping[str, AssetIn]] = ...,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
Expand All @@ -71,6 +76,7 @@ def asset(
def asset(
name: Optional[Union[Callable[..., Any], Optional[str]]] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None,
metadata: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -101,8 +107,11 @@ def asset(
Args:
name (Optional[str]): The name of the asset. If not provided, defaults to the name of the
decorated function.
namespace (Optional[Sequence[str]]): The namespace that the asset resides in. The namespace + the
name forms the asset key.
namespace (Optional[Sequence[str]]): **Deprecated (use `key_prefix`)**. The namespace that
the asset resides in. The namespace + the name forms the asset key.
key_prefix (Optional[Union[str, Sequence[str]]]): Optional prefix to apply to the asset key. If `Sequence[str]`,
elements are prepended to function name to form the asset key. If `str`, will be split on "{asset_key_delimiter}"
and then prepended. If `None` asset key is simply the name of the function. name forms the asset key.
ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to their metadata
and namespaces.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Set of asset keys that are
Expand Down Expand Up @@ -143,18 +152,24 @@ def asset(
@asset
def my_asset(my_upstream_asset: int) -> int:
return my_upstream_asset + 1
"""
""".format(
asset_key_delimiter=ASSET_KEY_DELIMITER
)
if callable(name):
return _Asset()(name)

key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
check.invariant(
not (io_manager_key and io_manager_def),
"Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please provide one or the other. ",
)
return _Asset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
namespace=namespace,
key_prefix=key_prefix,
ins=ins,
non_argument_deps=_make_asset_keys(non_argument_deps),
metadata=metadata,
Expand All @@ -178,7 +193,7 @@ class _Asset:
def __init__(
self,
name: Optional[str] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[Union[str, Sequence[str]]] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Set[AssetKey]] = None,
metadata: Optional[Mapping[str, Any]] = None,
Expand All @@ -196,7 +211,9 @@ def __init__(
):
self.name = name
# if user inputs a single string, coerce to list
self.namespace = [namespace] if isinstance(namespace, str) else namespace
self.key_prefix = (
key_prefix.split(ASSET_KEY_DELIMITER) if isinstance(key_prefix, str) else key_prefix
)
self.ins = ins or {}
self.non_argument_deps = non_argument_deps
self.metadata = metadata
Expand All @@ -221,9 +238,9 @@ def __init__(
def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__

asset_ins = build_asset_ins(fn, self.namespace, self.ins or {}, self.non_argument_deps)
asset_ins = build_asset_ins(fn, self.key_prefix, self.ins or {}, self.non_argument_deps)

out_asset_key = AssetKey(list(filter(None, [*(self.namespace or []), asset_name])))
out_asset_key = AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

Expand Down Expand Up @@ -436,7 +453,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:

def build_asset_ins(
fn: Callable,
asset_namespace: Optional[Sequence[str]],
asset_key_prefix: Optional[Sequence[str]],
asset_ins: Mapping[str, AssetIn],
non_argument_deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
Expand Down Expand Up @@ -475,13 +492,13 @@ def build_asset_ins(
if input_name in asset_ins:
asset_key = asset_ins[input_name].asset_key
metadata = asset_ins[input_name].metadata or {}
namespace = asset_ins[input_name].namespace
key_prefix = asset_ins[input_name].key_prefix
else:
metadata = {}
namespace = None
key_prefix = None

asset_key = asset_key or AssetKey(
list(filter(None, [*(namespace or asset_namespace or []), input_name]))
list(filter(None, [*(key_prefix or asset_key_prefix or []), input_name]))
)

ins_by_asset_key[asset_key] = (input_name.replace("-", "_"), In(metadata=metadata))
Expand Down
11 changes: 6 additions & 5 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
from dagster.core.execution.context.output import OutputContext

ASSET_KEY_SPLIT_REGEX = re.compile("[^a-zA-Z0-9_]")
ASSET_KEY_STRUCTURED_DELIMITER = "."
ASSET_KEY_DELIMITER = "/"
ASSET_KEY_LEGACY_DELIMITER = "."


def parse_asset_key_string(s: str) -> List[str]:
Expand Down Expand Up @@ -110,18 +111,18 @@ def to_string(self, legacy: Optional[bool] = False) -> Optional[str]:
if not self.path:
return None
if legacy:
return ASSET_KEY_STRUCTURED_DELIMITER.join(self.path)
return ASSET_KEY_LEGACY_DELIMITER.join(self.path)
return seven.json.dumps(self.path)

def to_user_string(self) -> str:
"""
E.g. "first_component/second_component"
"""
return "/".join(self.path)
return ASSET_KEY_DELIMITER.join(self.path)

@staticmethod
def from_user_string(asset_key_string: str) -> "AssetKey":
return AssetKey(asset_key_string.split("/"))
return AssetKey(asset_key_string.split(ASSET_KEY_DELIMITER))

@staticmethod
def from_db_string(asset_key_string: Optional[str]) -> Optional["AssetKey"]:
Expand All @@ -141,7 +142,7 @@ def from_db_string(asset_key_string: Optional[str]) -> Optional["AssetKey"]:
def get_db_prefix(path: List[str], legacy: Optional[bool] = False):
check.list_param(path, "path", of_type=str)
if legacy:
return ASSET_KEY_STRUCTURED_DELIMITER.join(path)
return ASSET_KEY_LEGACY_DELIMITER.join(path)
return seven.json.dumps(path)[:-2] # strip trailing '"]' from json string

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ def c(b):
def test_asset_group_build_job_selection_multi_component():
source_asset = SourceAsset(["apple", "banana"])

@asset(namespace="abc")
@asset(key_prefix="abc")
def asset1():
...

Expand Down Expand Up @@ -1143,7 +1143,7 @@ def test_assets_prefixed_disambiguate():
def asset2():
...

@asset(ins={"apple": AssetIn(namespace="core")})
@asset(ins={"apple": AssetIn(key_prefix="core")})
def orange(apple):
del apple

Expand All @@ -1165,7 +1165,7 @@ def banana(apple):
def test_assets_prefixed_source_asset():
asset1 = SourceAsset(key=AssetKey(["upstream_prefix", "asset1"]))

@asset(ins={"asset1": AssetIn(namespace="upstream_prefix")})
@asset(ins={"asset1": AssetIn(key_prefix="upstream_prefix")})
def asset2(asset1):
del asset1

Expand Down

1 comment on commit 947a9f6

@vercel
Copy link

@vercel vercel bot commented on 947a9f6 Jun 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.