Skip to content

Commit

Permalink
assets_from_module methods [1/2] (#8225)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Jun 7, 2022
1 parent cc31c77 commit be6d055
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 92 deletions.
6 changes: 1 addition & 5 deletions docs/content/api/modules.json

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions docs/content/api/searchindex.json

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
8 changes: 8 additions & 0 deletions docs/sphinx/sections/api/apidocs/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ A software-defined asset combines:
.. autoclass:: AssetIn

.. autoclass:: SourceAsset

.. autofunction:: assets_from_modules

.. autofunction:: assets_from_current_module

.. autofunction:: assets_from_package_module

.. autofunction:: assets_from_package_name
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
AssetsDefinition,
SourceAsset,
asset,
assets_from_current_module,
assets_from_modules,
assets_from_package_module,
assets_from_package_name,
build_assets_job,
multi_asset,
)
Expand Down Expand Up @@ -545,6 +549,10 @@ def __dir__() -> typing.List[str]:
"config_from_yaml_strings",
"configured",
"build_assets_job",
"assets_from_modules",
"assets_from_current_module",
"assets_from_package_module",
"assets_from_package_name",
# types
"Any",
"Bool",
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from .asset_group import AssetGroup
from .asset_in import AssetIn
from .assets import AssetsDefinition
from .assets_from_modules import (
assets_from_current_module,
assets_from_modules,
assets_from_package_module,
assets_from_package_name,
)
from .assets_job import build_assets_job
from .decorators import asset, multi_asset
from .source_asset import SourceAsset
89 changes: 12 additions & 77 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
import inspect
import os
import pkgutil
import warnings
from collections import defaultdict
from importlib import import_module
from types import ModuleType
from typing import (
Any,
Dict,
FrozenSet,
Generator,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Union,
)
from typing import Any, Dict, FrozenSet, Iterable, List, Mapping, Optional, Sequence, Set, Union

import dagster._check as check
from dagster.core.definitions.dependency import NodeHandle
Expand All @@ -37,6 +23,10 @@
from ..definitions.resource_definition import ResourceDefinition
from ..errors import DagsterInvalidDefinitionError
from .assets import AssetsDefinition
from .assets_from_modules import (
assets_and_source_assets_from_modules,
assets_and_source_assets_from_package_module,
)
from .assets_job import build_assets_job
from .source_asset import SourceAsset

Expand Down Expand Up @@ -262,11 +252,14 @@ def from_package_module(
Returns:
AssetGroup: An asset group with all the assets in the package.
"""
return AssetGroup.from_modules(
_find_modules_in_package(package_module),
assets, source_assets = assets_and_source_assets_from_package_module(
package_module, extra_source_assets
)
return AssetGroup(
assets=assets,
source_assets=source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
extra_source_assets=extra_source_assets,
)

@staticmethod
Expand Down Expand Up @@ -323,33 +316,7 @@ def from_modules(
Returns:
AssetGroup: An asset group with all the assets defined in the given modules.
"""
asset_ids: Set[int] = set()
asset_keys: Dict[AssetKey, ModuleType] = dict()
source_assets: List[SourceAsset] = list(
check.opt_sequence_param(
extra_source_assets, "extra_source_assets", of_type=SourceAsset
)
)
assets: List[AssetsDefinition] = []
for module in modules:
for asset in _find_assets_in_module(module):
if id(asset) not in asset_ids:
asset_ids.add(id(asset))
keys = asset.asset_keys if isinstance(asset, AssetsDefinition) else [asset.key]
for key in keys:
if key in asset_keys:
modules_str = ", ".join(
set([asset_keys[key].__name__, module.__name__])
)
raise DagsterInvalidDefinitionError(
f"Asset key {key} is defined multiple times. Definitions found in modules: {modules_str}."
)
else:
asset_keys[key] = module
if isinstance(asset, SourceAsset):
source_assets.append(asset)
else:
assets.append(asset)
assets, source_assets = assets_and_source_assets_from_modules(modules, extra_source_assets)

return AssetGroup(
assets=assets,
Expand Down Expand Up @@ -570,38 +537,6 @@ def __eq__(self, other: object) -> bool:
)


def _find_assets_in_module(
module: ModuleType,
) -> Generator[Union[AssetsDefinition, SourceAsset], None, None]:
"""
Finds assets in the given module and adds them to the given sets of assets and source assets.
"""
for attr in dir(module):
value = getattr(module, attr)
if isinstance(value, (AssetsDefinition, SourceAsset)):
yield value
elif isinstance(value, list) and all(
isinstance(el, (AssetsDefinition, SourceAsset)) for el in value
):
yield from value


def _find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]:
yield package_module
package_path = package_module.__file__
if package_path:
for _, modname, is_pkg in pkgutil.walk_packages([os.path.dirname(package_path)]):
submodule = import_module(f"{package_module.__name__}.{modname}")
if is_pkg:
yield from _find_modules_in_package(submodule)
else:
yield submodule
else:
raise ValueError(
f"Tried to find modules in package {package_module}, but its __file__ is None"
)


def _validate_resource_reqs_for_asset_group(
asset_list: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
Expand Down
195 changes: 195 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/assets_from_modules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import inspect
import os
import pkgutil
from importlib import import_module
from types import ModuleType
from typing import Dict, Generator, Iterable, List, Optional, Sequence, Set, Tuple, Union

import dagster._check as check
from dagster.core.definitions.events import AssetKey

from ..errors import DagsterInvalidDefinitionError
from .assets import AssetsDefinition
from .source_asset import SourceAsset


def _find_assets_in_module(
module: ModuleType,
) -> Generator[Union[AssetsDefinition, SourceAsset], None, None]:
"""
Finds assets in the given module and adds them to the given sets of assets and source assets.
"""
for attr in dir(module):
value = getattr(module, attr)
if isinstance(value, (AssetsDefinition, SourceAsset)):
yield value
elif isinstance(value, list) and all(
isinstance(el, (AssetsDefinition, SourceAsset)) for el in value
):
yield from value


def assets_and_source_assets_from_modules(
modules: Iterable[ModuleType], extra_source_assets: Optional[Sequence[SourceAsset]] = None
) -> Tuple[List[AssetsDefinition], List[SourceAsset]]:
"""
Constructs two lists, a list of assets and a list of source assets, from the given modules.
Args:
modules (Iterable[ModuleType]): The Python modules to look for assets inside.
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
Tuple[List[AssetsDefinition], List[SourceAsset]]:
A tuple containing a list of assets and a list of source assets defined in the given modules.
"""
asset_ids: Set[int] = set()
asset_keys: Dict[AssetKey, ModuleType] = dict()
source_assets: List[SourceAsset] = list(
check.opt_sequence_param(extra_source_assets, "extra_source_assets", of_type=SourceAsset)
)
assets: List[AssetsDefinition] = []
for module in modules:
for asset in _find_assets_in_module(module):
if id(asset) not in asset_ids:
asset_ids.add(id(asset))
keys = asset.asset_keys if isinstance(asset, AssetsDefinition) else [asset.key]
for key in keys:
if key in asset_keys:
modules_str = ", ".join(set([asset_keys[key].__name__, module.__name__]))
raise DagsterInvalidDefinitionError(
f"Asset key {key} is defined multiple times. Definitions found in modules: {modules_str}."
)
else:
asset_keys[key] = module
if isinstance(asset, SourceAsset):
source_assets.append(asset)
else:
assets.append(asset)
return assets, source_assets


def assets_from_modules(
modules: Iterable[ModuleType], extra_source_assets: Optional[Sequence[SourceAsset]] = None
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets from the given modules.
Args:
modules (Iterable[ModuleType]): The Python modules to look for assets inside.
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the given modules.
"""
assets, source_assets = assets_and_source_assets_from_modules(
modules, extra_source_assets=extra_source_assets
)
return [*assets, *source_assets]


def assets_from_current_module(
extra_source_assets: Optional[Sequence[SourceAsset]] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets from the module where this function is called.
Args:
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the module.
"""
caller = inspect.stack()[1]
module = inspect.getmodule(caller[0])
if module is None:
check.failed("Could not find a module for the caller")

return assets_from_modules([module], extra_source_assets=extra_source_assets)


def assets_and_source_assets_from_package_module(
package_module: ModuleType,
extra_source_assets: Optional[Sequence[SourceAsset]] = None,
) -> Tuple[List[AssetsDefinition], List[SourceAsset]]:
"""
Constructs two lists, a list of assets and a list of source assets, from the given package module.
Args:
package_module (ModuleType): The package module to looks for assets inside.
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
Tuple[List[AssetsDefinition], List[SourceAsset]]:
A tuple containing a list of assets and a list of source assets defined in the given modules.
"""
return assets_and_source_assets_from_modules(
_find_modules_in_package(package_module), extra_source_assets=extra_source_assets
)


def assets_from_package_module(
package_module: ModuleType,
extra_source_assets: Optional[Sequence[SourceAsset]] = None,
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets that includes all asset
definitions and source assets in all sub-modules of the given package module.
A package module is the result of importing a package.
Args:
package_module (ModuleType): The package module to looks for assets inside.
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the module.
"""
assets, source_assets = assets_and_source_assets_from_package_module(
package_module, extra_source_assets
)
return [*assets, *source_assets]


def assets_from_package_name(
package_name: str, extra_source_assets: Optional[Sequence[SourceAsset]] = None
) -> List[Union[AssetsDefinition, SourceAsset]]:
"""
Constructs a list of assets and source assets that include all asset
definitions and source assets in all sub-modules of the given package.
Args:
package_name (str): The name of a Python package to look for assets inside.
extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
group in addition to the source assets found in the modules.
Returns:
List[Union[AssetsDefinition, SourceAsset]]:
A list containing assets and source assets defined in the module.
"""
package_module = import_module(package_name)
return assets_from_package_module(package_module, extra_source_assets=extra_source_assets)


def _find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]:
yield package_module
package_path = package_module.__file__
if package_path:
for _, modname, is_pkg in pkgutil.walk_packages([os.path.dirname(package_path)]):
submodule = import_module(f"{package_module.__name__}.{modname}")
if is_pkg:
yield from _find_modules_in_package(submodule)
else:
yield submodule
else:
raise ValueError(
f"Tried to find modules in package {package_module}, but its __file__ is None"
)

1 comment on commit be6d055

@vercel
Copy link

@vercel vercel bot commented on be6d055 Jun 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.