Converting PickledObjectFilesystemIOManager to use UPathIOManager (#10273)

* converted PickledObjectFilesystemIOManager to use UPathIOManager

* remove experimental annotation

* add kwargs for UPath

* fix docstring

* refactor load_input logic

* fix typo

* add UPathIOManager docs

* UPathIOManager brought into main dagster scope

* fix wording

* allow omitting type annotations for loading multiple partitions

* moved UPathIOManager docs to Examples

* fix get_metadata call

* remove pandas from tests & fix some issues with docs

* add blank line

* make mdx-format

* fix issues

* fix import

* fix docs issues

* fix graphql test & toy script

* fix toy IO manager

* fix apidoc

* fix typo

* remove unused import
danielgafni committed Nov 14, 2022
1 parent c5c7422 commit 40671c2
Showing 15 changed files with 629 additions and 267 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

136 changes: 132 additions & 4 deletions docs/content/concepts/io-management/io-managers.mdx
@@ -64,7 +64,7 @@ Consider the following diagram. In this example, a job has two IO managers, each

The default IO manager, <PyObject module="dagster" object="fs_io_manager" />, stores and retrieves values from pickle files in the local filesystem. If a job is invoked via <PyObject object="JobDefinition" method="execute_in_process" />, the default IO manager is switched to <PyObject module="dagster" object="mem_io_manager"/>, which stores outputs in memory.

Dagster provides out-of-the-box IO managers for popular storage systems: AWS S3 (<PyObject module="dagster_aws.s3" object="s3_pickle_io_manager" />), Azure Blob Storage (<PyObject module="dagster_azure.adls2" object="adls2_pickle_io_manager" />), GCS (<PyObject module="dagster_gcp.gcs" object="gcs_pickle_io_manager" />), and Snowflake (<PyObject module="dagster_snowflake" object="build_snowflake_io_manager" />) - or you can write your own.
Dagster provides out-of-the-box IO managers for popular storage systems: AWS S3 (<PyObject module="dagster_aws.s3" object="s3_pickle_io_manager" />), Azure Blob Storage (<PyObject module="dagster_azure.adls2" object="adls2_pickle_io_manager" />), GCS (<PyObject module="dagster_gcp.gcs" object="gcs_pickle_io_manager" />), and Snowflake (<PyObject module="dagster_snowflake" object="build_snowflake_io_manager" />) - or you can write your own: either from scratch or by extending the `UPathIOManager` if you want to store data in an `fsspec`-supported filesystem.

### IO managers are resources

@@ -353,9 +353,11 @@ The provided `context` argument for `handle_output` is an <PyObject module="dags

### Handling partitioned assets

IO managers can be written to handle [partitioned](/concepts/partitions-schedules-sensors/partitions) assets. For a partitioned asset, each invocation of `handle_output` will (over)write a single partition, and each invocation of `load_input` will typically load a single partition. When the IO manager is backed by a filesystem or object store, then each partition will typically correspond to a file or object. When it's backed by a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.
IO managers can be written to handle [partitioned](/concepts/partitions-schedules-sensors/partitions) assets. For a partitioned asset, each invocation of `handle_output` will (over)write a single partition, and each invocation of `load_input` will load one or more partitions. When the IO manager is backed by a filesystem or object store, then each partition will typically correspond to a file or object. When it's backed by a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.

To handle partitions in an IO manager, you'll need to determine which partition you're dealing with when you're storing an output or loading an input. For this, <PyObject object="OutputContext" /> and <PyObject object="InputContext" /> have a `asset_partition_key` property:
Out of the box, the default IO manager supports loading a partitioned upstream asset into a downstream asset with matching partitions (see the section below on loading multiple partitions). The <PyObject module="dagster" object="UPathIOManager" /> can be used to handle partitions in custom filesystem-based IO managers.

To handle partitions in a custom IO manager, you'll need to determine which partition you're dealing with when you're storing an output or loading an input. For this, <PyObject object="OutputContext" /> and <PyObject object="InputContext" /> have an `asset_partition_key` property:

```python file=/concepts/io_management/custom_io_manager.py startafter=start_partitioned_marker endbefore=end_partitioned_marker
class MyPartitionedIOManager(IOManager):
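    # Illustrative sketch: the method bodies below are assumptions, not the
    # exact docs snippet; `write_pickle` and `read_pickle` are placeholder helpers.
    def _path_for_partition(self, context) -> str:
        if context.has_asset_partitions:
            # Include the partition key in the storage path for partitioned assets.
            return "/".join(list(context.asset_key.path) + [context.asset_partition_key])
        return "/".join(context.asset_key.path)

    def handle_output(self, context, obj):
        write_pickle(self._path_for_partition(context), obj)

    def load_input(self, context):
        return read_pickle(self._path_for_partition(context))
```
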
@@ -376,7 +378,57 @@ If you're working with time window partitions, you can also use the `asset_partitions_time_window` property
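
For instance, a database-backed IO manager might use the partition's time window to bound what it loads. The sketch below is illustrative and not from the commit; the function name and the `timestamp` column are assumptions:

```python
import pandas as pd

from dagster import InputContext


def load_time_window_partition(context: InputContext, table: pd.DataFrame) -> pd.DataFrame:
    # asset_partitions_time_window returns a TimeWindow with start/end datetimes.
    window = context.asset_partitions_time_window
    return table[(table["timestamp"] >= window.start) & (table["timestamp"] < window.end)]
```
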

#### Handling partition mappings <Experimental />

If you're using a <PyObject object="PartitionMapping" />, a single partition of one asset might depend on a range of partitions of an upstream asset. In that case, you can access the entire range using <PyObject object="InputContext" method="asset_partition_key_range" />, <PyObject object="InputContext" method="asset_partition_keys" />, or <PyObject object="InputContext" method="asset_partitions_time_window" />.
A single partition of one asset might depend on a range of partitions of an upstream asset.

The default IO manager supports loading multiple upstream partitions. In this case, the downstream asset should annotate the upstream input as `Dict[str, ...]` (or leave the annotation blank). Here is an example of loading multiple upstream partitions using the default partition mapping:

```python file=/concepts/io_management/loading_multiple_upstream_partitions.py
from datetime import datetime
from typing import Dict

import pandas as pd

from dagster import (
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
OpExecutionContext,
asset,
materialize,
)

start = datetime(2022, 1, 1)

hourly_partitions = HourlyPartitionsDefinition(start_date=f"{start:%Y-%m-%d-%H:%M}")
daily_partitions = DailyPartitionsDefinition(start_date=f"{start:%Y-%m-%d}")


@asset(partitions_def=hourly_partitions)
def upstream_asset(context: OpExecutionContext) -> pd.DataFrame:
return pd.DataFrame({"date": [context.partition_key]})


@asset(
partitions_def=daily_partitions,
)
def downstream_asset(upstream_asset: Dict[str, pd.DataFrame]) -> pd.DataFrame:
return pd.concat(list(upstream_asset.values()))


result = materialize(
[*upstream_asset.to_source_assets(), downstream_asset],
partition_key=start.strftime(daily_partitions.fmt),
)
downstream_asset_data = result.output_for_node("downstream_asset", "result")
assert (
len(downstream_asset_data) == 24
), "downstream day should map to upstream 24 hours"
```

The `upstream_asset` input becomes a mapping from partition keys to partition values. This behavior is provided by the default IO manager and by any IO manager that inherits from the <PyObject module="dagster" object="UPathIOManager" />.

A custom <PyObject object="PartitionMapping" /> instance can be provided to `In` to configure the mapped upstream partitions.

When writing a custom IO Manager for loading multiple upstream partitions, the mapped keys can be accessed using <PyObject object="InputContext" method="asset_partition_keys" />, <PyObject object="InputContext" method="asset_partition_key_range" />, or <PyObject object="InputContext" method="asset_partitions_time_window" />.
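
For example, a custom filesystem-backed IO manager could return a dictionary keyed by the mapped partition keys. The sketch below is illustrative; `read_df` and `path_for_partition` are hypothetical helpers, not Dagster APIs:

```python
from typing import Dict

import pandas as pd

from dagster import InputContext, IOManager, OutputContext


class MyMultiPartitionIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        # Each materialization writes a single partition.
        obj.to_parquet(path_for_partition(context, context.asset_partition_key))

    def load_input(self, context: InputContext) -> Dict[str, pd.DataFrame]:
        # One entry per mapped upstream partition key.
        return {
            key: read_df(path_for_partition(context, key))
            for key in context.asset_partition_keys
        }
```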

### Writing a per-input IO Manager

@@ -492,6 +544,82 @@ def my_job():
op_2(op_1())
```

### Custom filesystem-based IO Manager

Dagster provides a feature-rich base class for filesystem-based IO Managers: <PyObject module="dagster" object="UPathIOManager" />. It's compatible with both local and remote filesystems (like S3 or GCS) by using `universal-pathlib` and `fsspec`. The full list of supported filesystems can be found [here](https://github.com/fsspec/universal_pathlib#currently-supported-filesystems-and-schemes). The `UPathIOManager` also has other important features:

- handles partitioned assets
- handles loading a single upstream partition
- handles loading multiple upstream partitions (with respect to <PyObject object="PartitionMapping" />)
- the `get_metadata` method can be customized to add additional metadata to the output
- the `allow_missing_partitions` metadata value can be set to `True` to skip missing partitions (the default behavior is to raise an error); see the sketch below this list

The default IO manager inherits from the `UPathIOManager` and therefore has these features too.
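
As a hedged sketch, the `allow_missing_partitions` flag mentioned above would typically be passed through input metadata; the asset below is illustrative and assumes that metadata set on `AssetIn` is forwarded to the IO manager:

```python
from typing import Dict

import pandas as pd

from dagster import AssetIn, asset


@asset(ins={"upstream_asset": AssetIn(metadata={"allow_missing_partitions": True})})
def tolerant_downstream_asset(upstream_asset: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    # Missing upstream partitions are skipped instead of raising an error.
    return pd.concat(list(upstream_asset.values()))
```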

The `UPathIOManager` already implements the `load_input` and `handle_output` methods; instead, subclasses only need to implement <PyObject module="dagster" object="UPathIOManager" method="dump_to_path" /> and <PyObject module="dagster" object="UPathIOManager" method="load_from_path" /> for a given `upath.UPath`. Here is an example:

```python file=/concepts/io_management/filesystem_io_manager.py startafter=start_marker endbefore=end_class_marker
import pandas as pd
from upath import UPath

from dagster import (
Field,
InitResourceContext,
InputContext,
OutputContext,
StringSource,
UPathIOManager,
io_manager,
)


class PandasParquetIOManager(UPathIOManager):
extension: str = ".parquet"

def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
with path.open("wb") as file:
obj.to_parquet(file)

def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
with path.open("rb") as file:
return pd.read_parquet(file)
```

The `extension` attribute defines the suffix that all file paths generated by the IO manager will end with.

The IO manager defined above will work with partitioned assets on any filesystem. The following definitions configure it for the local filesystem and for S3:

```python file=/concepts/io_management/filesystem_io_manager.py startafter=start_def_marker endbefore=end_marker
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
init_context: InitResourceContext,
) -> PandasParquetIOManager:
assert init_context.instance is not None # to please mypy
base_path = UPath(
init_context.resource_config.get(
"base_path", init_context.instance.storage_directory()
)
)
return PandasParquetIOManager(base_path=base_path)


@io_manager(
config_schema={
"base_path": Field(str, is_required=True),
"AWS_ACCESS_KEY_ID": StringSource,
"AWS_SECRET_ACCESS_KEY": StringSource,
}
)
def s3_parquet_io_manager(init_context: InitResourceContext) -> PandasParquetIOManager:
# `UPath` will read boto env vars.
# The credentials can also be taken from the config and passed to `UPath` directly.
base_path = UPath(init_context.resource_config.get("base_path"))
assert str(base_path).startswith("s3://"), base_path
return PandasParquetIOManager(base_path=base_path)
```

Notice how the local and S3 IO managers are practically the same - the only difference is in the resource configuration.
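
As a usage sketch (the asset and resource key below are illustrative, not from the commit), either manager can be attached like any other IO manager resource:

```python
import pandas as pd

from dagster import asset, materialize


@asset(io_manager_key="pandas_parquet_io_manager")
def my_dataframe() -> pd.DataFrame:
    return pd.DataFrame({"a": [1, 2, 3]})


result = materialize(
    [my_dataframe],
    resources={"pandas_parquet_io_manager": local_pandas_parquet_io_manager},
)
```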

### Providing per-output config to an IO manager

When launching a run, you might want to parameterize how particular outputs are stored.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
4 changes: 4 additions & 0 deletions docs/sphinx/sections/api/apidocs/io-managers.rst
@@ -42,6 +42,10 @@ Built-in IO Managers
.. autodata:: fs_io_manager
:annotation: IOManagerDefinition

The ``UPathIOManager`` can be used to easily define filesystem-based IO Managers.

.. autoclass:: UPathIOManager


Input Managers (Experimental)
----------------------------------
@@ -0,0 +1,61 @@
# start_marker
import pandas as pd
from upath import UPath

from dagster import (
Field,
InitResourceContext,
InputContext,
OutputContext,
StringSource,
UPathIOManager,
io_manager,
)


class PandasParquetIOManager(UPathIOManager):
extension: str = ".parquet"

def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
with path.open("wb") as file:
obj.to_parquet(file)

def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
with path.open("rb") as file:
return pd.read_parquet(file)


# end_class_marker

# the IO Manager can be used with any filesystem (see https://github.com/fsspec/universal_pathlib)

# start_def_marker
@io_manager(config_schema={"base_path": Field(str, is_required=False)})
def local_pandas_parquet_io_manager(
init_context: InitResourceContext,
) -> PandasParquetIOManager:
assert init_context.instance is not None # to please mypy
base_path = UPath(
init_context.resource_config.get(
"base_path", init_context.instance.storage_directory()
)
)
return PandasParquetIOManager(base_path=base_path)


@io_manager(
config_schema={
"base_path": Field(str, is_required=True),
"AWS_ACCESS_KEY_ID": StringSource,
"AWS_SECRET_ACCESS_KEY": StringSource,
}
)
def s3_parquet_io_manager(init_context: InitResourceContext) -> PandasParquetIOManager:
# `UPath` will read boto env vars.
# The credentials can also be taken from the config and passed to `UPath` directly.
base_path = UPath(init_context.resource_config.get("base_path"))
assert str(base_path).startswith("s3://"), base_path
return PandasParquetIOManager(base_path=base_path)


# end_marker
@@ -0,0 +1,39 @@
from datetime import datetime
from typing import Dict

import pandas as pd

from dagster import (
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
OpExecutionContext,
asset,
materialize,
)

start = datetime(2022, 1, 1)

hourly_partitions = HourlyPartitionsDefinition(start_date=f"{start:%Y-%m-%d-%H:%M}")
daily_partitions = DailyPartitionsDefinition(start_date=f"{start:%Y-%m-%d}")


@asset(partitions_def=hourly_partitions)
def upstream_asset(context: OpExecutionContext) -> pd.DataFrame:
return pd.DataFrame({"date": [context.partition_key]})


@asset(
partitions_def=daily_partitions,
)
def downstream_asset(upstream_asset: Dict[str, pd.DataFrame]) -> pd.DataFrame:
return pd.concat(list(upstream_asset.values()))


result = materialize(
[*upstream_asset.to_source_assets(), downstream_asset],
partition_key=start.strftime(daily_partitions.fmt),
)
downstream_asset_data = result.output_for_node("downstream_asset", "result")
assert (
len(downstream_asset_data) == 24
), "downstream day should map to upstream 24 hours"
@@ -351,12 +351,15 @@ def _csv_hello_world_event_sequence(self):
"ExecutionStepStartEvent",
"ExecutionStepInputEvent",
"ExecutionStepOutputEvent",
"LogMessageEvent",
"HandledOutputEvent",
"ExecutionStepSuccessEvent",
"ExecutionStepStartEvent",
"LogMessageEvent",
"LoadedInputEvent",
"ExecutionStepInputEvent",
"ExecutionStepOutputEvent",
"LogMessageEvent",
"HandledOutputEvent",
"ExecutionStepSuccessEvent",
"RunSuccessEvent",
26 changes: 19 additions & 7 deletions python_modules/dagster-test/dagster_test/toys/asset_lineage.py
@@ -1,23 +1,25 @@
import datetime
import os
import random
import string
from typing import Any, Dict, Union

import pandas as pd

from dagster import (
Array,
AssetKey,
Field,
IOManager,
InputContext,
MetadataEntry,
MetadataValue,
Out,
Output,
OutputContext,
Partition,
graph,
op,
)
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
from dagster._core.storage.io_manager import io_manager
from dagster._legacy import PartitionSetDefinition

@@ -66,20 +68,30 @@ def metadata_for_actions(df):
}


class MyDatabaseIOManager(PickledObjectFilesystemIOManager):
def _get_path(self, context):
keys = context.get_identifier()
class MyDatabaseIOManager(IOManager):
"""Pretend this IO Manager uses an external database"""

return os.path.join("/tmp", *keys)
storage: Dict[str, pd.DataFrame] = {}

@staticmethod
def get_key(context: Union[InputContext, OutputContext]):
if context.has_asset_key:
return "/".join(list(context.get_asset_identifier()))
else:
return "/".join(list(context.get_identifier()))

def load_input(self, context: InputContext) -> Any:
# loading code here
return self.storage[self.get_key(context)]

def handle_output(self, context, obj):
super().handle_output(context, obj)
# can pretend this actually came from a library call
yield MetadataEntry(
label="num rows written to db",
description=None,
entry_data=MetadataValue.int(len(obj)),
)
self.storage[self.get_key(context)] = obj

def get_output_asset_key(self, context):
return AssetKey(
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
@@ -543,6 +543,7 @@
deserialize_value as deserialize_value,
serialize_value as serialize_value,
)
from dagster._core.storage.upath_io_manager import UPathIOManager as UPathIOManager
from dagster._utils import (
file_relative_path as file_relative_path,
)
