Skip to content

Commit

Permalink
Allow default io_manager load_input to support partitions of differing frequencies (#10172)
Browse files Browse the repository at this point in the history

* a working implementation

* support backwards compatibility, add key value collection and type annotations

* fix white space

* tidy up partition filepath method to match existing methods

* use make isort

* rename method to make it clear and easy to find

* fix test suite

* remove unused imports

* ran isort again after removing unused imports
  • Loading branch information
andrewgryan committed Oct 27, 2022
1 parent cf8ad4a commit 58ee17c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 2 deletions.
29 changes: 27 additions & 2 deletions python_modules/dagster/dagster/_core/storage/fs_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def _get_path(self, context: Union[InputContext, OutputContext]) -> str:

return os.path.join(self.base_dir, *path)

def _get_path_for_partition(self, asset_key: AssetKey, partition_key: str) -> str:
    """Return the file path for a single partition of an asset.

    The path is ``self.base_dir`` joined with every component of
    ``asset_key.path`` and finally ``partition_key``.
    """
    segments = [*asset_key.path, partition_key]
    return os.path.join(self.base_dir, *segments)

def has_output(self, context):
filepath = self._get_path(context)

Expand Down Expand Up @@ -204,9 +208,30 @@ def load_input(self, context):
if context.dagster_type.typing_type == type(None):
return None

filepath = self._get_path(context)
context.add_input_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})
def has_multiple_partitions(context):
key_range = context.asset_partition_key_range
return key_range.start != key_range.end

if (
context.has_input_name
and context.has_asset_partitions
and has_multiple_partitions(context)
):
# Multiple partition load
partition_keys = context.asset_partition_keys
paths = [
self._get_path_for_partition(context.asset_key, partition_key)
for partition_key in partition_keys
]
return {key: self._load_pickle(path) for (key, path) in zip(partition_keys, paths)}
else:
# Non-partitioned or single partition load
filepath = self._get_path(context)
context.add_input_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})
return self._load_pickle(filepath)

def _load_pickle(self, filepath: str):
    """Open ``filepath`` with ``self.read_mode`` and return the unpickled object.

    NOTE(review): ``pickle.load`` can run arbitrary code while deserializing,
    so this is presumably only ever pointed at files written by this IO
    manager -- confirm against callers.
    """
    with open(filepath, self.read_mode) as pickled_file:
        loaded = pickle.load(pickled_file)
    return loaded

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import datetime

from pytest import fixture

from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition, asset, materialize


@fixture
def start():
    """Provide the start datetime (2022-01-01 00:00) used by the partition fixtures."""
    return datetime.datetime(year=2022, month=1, day=1)


@fixture
def hourly(start):
    """Build an hourly partitions definition whose start date is ``start``."""
    start_date = start.strftime("%Y-%m-%d-%H:%M")
    return HourlyPartitionsDefinition(start_date=start_date)


@fixture
def daily(start):
    """Build a daily partitions definition whose start date is ``start``."""
    start_date = start.strftime("%Y-%m-%d")
    return DailyPartitionsDefinition(start_date=start_date)


def test_partitioned_io_manager(hourly, daily):
    """Check that a daily asset fed by an hourly asset gets a dict keyed by hourly partition."""

    @asset(partitions_def=hourly)
    def hourly_asset():
        return 42

    @asset(partitions_def=daily)
    def daily_asset(hourly_asset):
        return hourly_asset

    # Materialize each of the 24 hourly partitions of 2022-01-01, recording
    # the partition keys so the expected mapping can be built afterwards.
    hourly_keys = []
    for hour in range(24):
        partition = f"2022-01-01-{hour:02d}:00"
        hourly_keys.append(partition)
        materialize([hourly_asset], partition_key=partition)

    # Materialize the daily partition; the hourly asset is passed as source
    # assets so that it is loaded as an input instead of being recomputed.
    daily_result = materialize(
        [*hourly_asset.to_source_assets(), daily_asset],
        partition_key="2022-01-01",
    )

    loaded = daily_result.output_for_node("daily_asset")
    assert loaded == dict.fromkeys(hourly_keys, 42)


def test_partitioned_io_manager_preserves_single_partition_dependency(daily):
    """Check that a same-frequency upstream partition is loaded as a plain value, not a dict."""

    @asset(partitions_def=daily)
    def upstream_asset():
        return 42

    @asset(partitions_def=daily)
    def daily_asset(upstream_asset):
        return upstream_asset

    # Both assets share the daily partitioning, so one partition maps to one.
    partition = "2022-01-01"
    run_result = materialize([upstream_asset, daily_asset], partition_key=partition)

    loaded_value = run_result.output_for_node("daily_asset")
    assert loaded_value == 42

0 comments on commit 58ee17c

Please sign in to comment.