Skip to content

Commit

Permalink
[docs] - Document partitioned IO managers [CON-37] (#8191)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Jun 7, 2022
1 parent 73d459b commit ef928a4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
25 changes: 24 additions & 1 deletion docs/content/concepts/io-management/io-managers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,36 @@ The <PyObject module="dagster" object="io_manager" /> decorator behaves nearly i

The provided `context` argument for `handle_output` is an <PyObject module="dagster" object="OutputContext" />. The provided `context` argument for `load_input` is an <PyObject module="dagster" object="InputContext" />. The linked API documentation lists all the fields that are available on these objects.

### Accessing the partition key

IO managers interoperate smoothly with [partitions](/concepts/partitions-schedules-sensors/partitions). You can access the partition key for the current run using the `context` for both `load_input` and `handle_output`. If working with [assets](/concepts/assets/software-defined-assets), you can also access the asset-specific partition key or partition key range, though most of the time the run partition key will be equal to the asset partition key.

```python literalinclude file=/concepts/io_management/custom_io_manager.py startafter=start_partitioned_marker endbefore=end_partitioned_marker
from dagster import IOManager


class MyPartitionedIOManager(IOManager):
def path_for_partition(self, partition_key):
return f"some/path/{partition_key}.csv"

# `context.partition_key` is the run-scoped partition key
def handle_output(self, context, obj):
write_csv(self.path_for_partition(context.partition_key), obj)

# `context.asset_partition_key` is set to the partition key for an asset
# (if the `IOManager` is handling an asset). This is usually equal to the
# run `partition_key`.
def load_input(self, context):
return read_csv(self.path_for_partition(context.asset_partition_key))
```

## Examples

### A custom IO manager that stores Pandas DataFrames in tables

If your ops produce Pandas DataFrames that populate tables in a data warehouse, you might write something like the following. This IO manager uses the name assigned to the output as the name of the table to write the output to.

```python literalinclude file=/concepts/io_management/custom_io_manager.py startafter=start_marker endbefore=end_marker
```python literalinclude file=/concepts/io_management/custom_io_manager.py startafter=start_df_marker endbefore=end_df_marker
from dagster import IOManager, io_manager


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# isort: skip_file
# pylint: disable=reimported
from dagster import job, op, MetadataEntry


Expand All @@ -20,7 +21,36 @@ def read_dataframe_from_table(**_kwargs):
return []


# start_marker
def read_csv(_path):
pass


def write_csv(_path, _obj):
pass


# start_partitioned_marker
from dagster import IOManager


class MyPartitionedIOManager(IOManager):
def path_for_partition(self, partition_key):
return f"some/path/{partition_key}.csv"

# `context.partition_key` is the run-scoped partition key
def handle_output(self, context, obj):
write_csv(self.path_for_partition(context.partition_key), obj)

# `context.asset_partition_key` is set to the partition key for an asset
# (if the `IOManager` is handling an asset). This is usually equal to the
# run `partition_key`.
def load_input(self, context):
return read_csv(self.path_for_partition(context.asset_partition_key))


# end_partitioned_marker

# start_df_marker
from dagster import IOManager, io_manager


Expand All @@ -46,7 +76,7 @@ def my_job():
op_2(op_1())


# end_marker
# end_df_marker

# start_metadata_marker
class DataframeTableIOManagerWithMetadata(IOManager):
Expand Down

1 comment on commit ef928a4

@vercel
Copy link

@vercel vercel bot commented on ef928a4 Jun 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.