Skip to content

Commit

Permalink
default to fs_asset_io_manager in AssetGroup (#6859)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Mar 2, 2022
1 parent 7269df9 commit 4d4d6a7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
5 changes: 2 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)

from dagster import check
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.utils import merge_dicts

from ..definitions.executor_definition import ExecutorDefinition
Expand Down Expand Up @@ -85,8 +86,6 @@ def __new__(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
executor_def: Optional[ExecutorDefinition] = None,
):
from dagster.core.definitions.graph_definition import default_job_io_manager

check.list_param(assets, "assets", of_type=AssetsDefinition)
source_assets = check.opt_list_param(source_assets, "source_assets", of_type=SourceAsset)
resource_defs = check.opt_dict_param(
Expand All @@ -106,7 +105,7 @@ def __new__(
)
# In the case of collisions, merge_dicts takes values from the dictionary latest in the list, so we place the user provided resource defs after the defaults.
resource_defs = merge_dicts(
{"root_manager": root_manager, "io_manager": default_job_io_manager},
{"root_manager": root_manager, "io_manager": fs_asset_io_manager},
resource_defs,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
OpSelectionData,
parse_op_selection,
)
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.core.storage.tags import PARTITION_NAME_TAG
from dagster.core.utils import str_format_set

Expand Down Expand Up @@ -302,7 +303,7 @@ def _swap_default_io_man(resources: Dict[str, ResourceDefinition], job: Pipeline

if (
# pylint: disable=comparison-with-callable
resources.get("io_manager") == default_job_io_manager
resources.get("io_manager") in [default_job_io_manager, fs_asset_io_manager]
and job.version_strategy is None
):
updated_resources = dict(resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
DagsterInvalidDefinitionError,
IOManager,
Out,
fs_asset_io_manager,
in_process_executor,
io_manager,
mem_io_manager,
Expand Down Expand Up @@ -346,3 +347,12 @@ def test_asset_group_from_package_module():
assert {source_asset.key for source_asset in collection.source_assets} == {
AssetKey("elvis_presley")
}


def test_default_io_manager():
@asset
def asset_foo():
return "foo"

group = AssetGroup(assets=[asset_foo])
assert group.resource_defs["io_manager"] == fs_asset_io_manager

0 comments on commit 4d4d6a7

Please sign in to comment.