Skip to content

Commit

Permalink
update AssetGraph dedup logic (#7021)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Mar 12, 2022
1 parent daa1188 commit 336ee6b
Showing 1 changed file with 36 additions and 38 deletions.
74 changes: 36 additions & 38 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)

from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.utils import merge_dicts

Expand Down Expand Up @@ -370,16 +371,8 @@ def from_package_module(
Returns:
AssetGroup: An asset group with all the assets in the package.
"""
assets: Set[AssetsDefinition] = set()
source_assets: Set[SourceAsset] = set()
for module in _find_modules_in_package(package_module):
module_assets, module_source_assets = _find_assets_in_module(module)
assets.update(module_assets)
source_assets.update(module_source_assets)

return AssetGroup(
assets=list(assets),
source_assets=list(source_assets),
return AssetGroup.from_modules(
_find_modules_in_package(package_module),
resource_defs=resource_defs,
executor_def=executor_def,
)
Expand Down Expand Up @@ -411,16 +404,16 @@ def from_package_name(

@staticmethod
def from_modules(
modules: Sequence[ModuleType],
modules: Iterable[ModuleType],
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
executor_def: Optional[ExecutorDefinition] = None,
) -> "AssetGroup":
"""
Constructs an AssetGroup that includes all asset definitions and source assets in the given
module.
modules.
Args:
modules (Sequence[ModuleType]): The Python modules to look for assets inside.
modules (Iterable[ModuleType]): The Python modules to look for assets inside.
resource_defs (Optional[Mapping[str, ResourceDefinition]]): A dictionary of resource
definitions to include on the returned asset group.
executor_def (Optional[ExecutorDefinition]): An executor to include on the returned
Expand All @@ -429,16 +422,31 @@ def from_modules(
Returns:
AssetGroup: An asset group with all the assets defined in the given modules.
"""
assets: Set[AssetsDefinition] = set()
source_assets: Set[SourceAsset] = set()
asset_ids: Set[int] = set()
asset_keys: Dict[AssetKey, ModuleType] = dict()
source_assets: List[SourceAsset] = []
assets: List[AssetsDefinition] = []
for module in modules:
module_assets, module_source_assets = _find_assets_in_module(module)
assets.update(module_assets)
source_assets.update(module_source_assets)
for asset in _find_assets_in_module(module):
if id(asset) not in asset_ids:
asset_ids.add(id(asset))
keys = asset.asset_keys if isinstance(asset, AssetsDefinition) else [asset.key]
for key in keys:
if key in asset_keys:
raise DagsterInvalidDefinitionError(
"Asset key {key} is defined multiple times. Definitions in "
f"module {asset_keys[key].__name__} and {module.__name__}."
)
else:
asset_keys[key] = module
if isinstance(asset, SourceAsset):
source_assets.append(asset)
else:
assets.append(asset)

return AssetGroup(
assets=list(assets),
source_assets=list(source_assets),
assets=assets,
source_assets=source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
)
Expand Down Expand Up @@ -470,28 +478,18 @@ def from_current_module(

def _find_assets_in_module(
module: ModuleType,
) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]:
) -> Generator[Union[AssetsDefinition, SourceAsset], None, None]:
"""
Finds assets in the given module and adds them to the given sets of assets and source assets.
"""
assets: List[AssetsDefinition] = []
source_assets: List[SourceAsset] = []

for attr in dir(module):
value = getattr(module, attr)
if isinstance(value, AssetsDefinition):
assets.append(value)
elif isinstance(value, SourceAsset):
source_assets.append(value)
elif isinstance(value, list):
if all(isinstance(el, (AssetsDefinition, SourceAsset)) for el in value):
for el in value:
if isinstance(el, AssetsDefinition):
assets.append(el)
elif isinstance(el, SourceAsset):
source_assets.append(el)

return assets, source_assets
if isinstance(value, (AssetsDefinition, SourceAsset)):
yield value
elif isinstance(value, list) and all(
isinstance(el, (AssetsDefinition, SourceAsset)) for el in value
):
yield from value


def _find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]:
Expand Down

0 comments on commit 336ee6b

Please sign in to comment.