Merge Asset IO functionality with Library IO managers (#8189)
clairelin135 committed Jun 6, 2022
1 parent efae88a commit 585eeaf
Showing 19 changed files with 125 additions and 235 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions docs/content/concepts/assets/software-defined-assets.mdx
@@ -172,12 +172,12 @@ In any situation where you might use a source asset, you could also have the cod

By default, materializing an asset will pickle it to a local file named "my_asset" in a temporary directory. You can specify this directory by providing a value for the `local_artifact_storage` property in your dagster.yaml file.

[IO managers](/concepts/io-management/io-managers) enable fully overriding this behavior and storing asset contents in any way you wish - e.g. writing them as tables in a database or as objects in a cloud object store. Dagster also provides built-in IO managers that pickle assets to AWS S3 (<PyObject module="dagster_aws.s3" object="s3_pickle_asset_io_manager" />), Azure Blob Storage (<PyObject module="dagster_azure.adls2" object="adls2_pickle_asset_io_manager" />), and GCS (<PyObject module="dagster_gcp.gcs" object="gcs_pickle_asset_io_manager" />), or you can write your own.
[IO managers](/concepts/io-management/io-managers) enable fully overriding this behavior and storing asset contents in any way you wish - e.g. writing them as tables in a database or as objects in a cloud object store. Dagster also provides built-in IO managers that pickle assets to AWS S3 (<PyObject module="dagster_aws.s3" object="s3_pickle_io_manager" />), Azure Blob Storage (<PyObject module="dagster_azure.adls2" object="adls2_pickle_io_manager" />), and GCS (<PyObject module="dagster_gcp.gcs" object="gcs_pickle_io_manager" />), or you can write your own.
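The "write your own" path mentioned above comes down to two methods: one that stores an output and one that loads it back. The sketch below is a schematic, plain-Python illustration of that contract (it does not use the real `IOManager` base class, and the class and directory names are hypothetical), pickling each value to a file keyed by name:

```python
import pickle
import tempfile
from pathlib import Path


class LocalPickleIOManager:
    """Schematic IO manager: pickles each value to a file named after its key."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def _path(self, key):
        return self.base_dir / key

    def handle_output(self, key, value):
        # Write side: serialize the value and store it under the key's path.
        self._path(key).write_bytes(pickle.dumps(value))

    def load_input(self, key):
        # Read side: recover the value a previous step stored.
        return pickle.loads(self._path(key).read_bytes())


manager = LocalPickleIOManager(tempfile.mkdtemp())
manager.handle_output("upstream_asset", [1, 2, 3])
loaded = manager.load_input("upstream_asset")
```

The built-in S3, Azure, and GCS managers follow this same write/read pairing, swapping the local file operations for object-store calls.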

To apply an IO manager to a set of assets, you can include it with them in an <PyObject object="AssetGroup" />.

```python file=/concepts/assets/asset_io_manager.py startafter=start_marker endbefore=end_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset

@@ -194,18 +194,18 @@ def downstream_asset(upstream_asset):

asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={"io_manager": s3_pickle_asset_io_manager, "s3": s3_resource},
resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)
```

This example also includes `"s3": s3_resource`, because the `s3_pickle_asset_io_manager` depends on an s3 resource.
This example also includes `"s3": s3_resource`, because the `s3_pickle_io_manager` depends on an s3 resource.

When `upstream_asset` is materialized, the value `[1, 2, 3]` will be pickled and stored in an object on S3. When `downstream_asset` is materialized, the value of `upstream_asset` will be read from S3 and unpickled, and `[1, 2, 3, 4]` will be pickled and stored in a different object on S3.
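The two materializations just described can be traced with an in-memory stand-in for the bucket (a plain dict here, where the real manager would issue S3 calls; the key names are illustrative):

```python
import pickle

bucket = {}  # in-memory stand-in for the S3 bucket

# Materializing upstream_asset: its return value is pickled and stored.
bucket["dagster/upstream_asset"] = pickle.dumps([1, 2, 3])

# Materializing downstream_asset: upstream's value is read back and
# unpickled, and the new value is pickled under downstream's own key.
upstream = pickle.loads(bucket["dagster/upstream_asset"])
bucket["dagster/downstream_asset"] = pickle.dumps(upstream + [4])

result = pickle.loads(bucket["dagster/downstream_asset"])
```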

Different assets can have different IO managers:

```python file=/concepts/assets/asset_different_io_managers.py startafter=start_marker endbefore=end_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_io_manager

@@ -223,7 +223,7 @@ def downstream_asset(upstream_asset):
asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={
"s3_io_manager": s3_pickle_asset_io_manager,
"s3_io_manager": s3_pickle_io_manager,
"s3": s3_resource,
"fs_io_manager": fs_io_manager,
},
@@ -235,7 +235,7 @@ When `upstream_asset` is materialized, the value `[1, 2, 3]` will be pic
The same assets can be bound to different resources and IO managers in different environments. For example, for local development, you might want to store assets on your local filesystem while, in production, you might want to store the assets in S3.

```python file=/concepts/assets/asset_io_manager_prod_local.py startafter=start_marker endbefore=end_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_io_manager

@@ -252,7 +252,7 @@ def downstream_asset(upstream_asset):

prod_asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={"io_manager": s3_pickle_asset_io_manager, "s3": s3_resource},
resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

local_asset_group = AssetGroup(
5 changes: 1 addition & 4 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
@@ -41,9 +41,6 @@ S3
.. autoconfigurable:: dagster_aws.s3.s3_pickle_io_manager
:annotation: IOManagerDefinition

.. autoconfigurable:: dagster_aws.s3.s3_pickle_asset_io_manager
:annotation: IOManagerDefinition


ECS
---
@@ -88,7 +85,7 @@ CloudWatch
SecretsManager
--------------

Resources which surface SecretsManager secrets for use in Dagster resources and jobs.
Resources which surface SecretsManager secrets for use in Dagster resources and jobs.

.. autoconfigurable:: dagster_aws.secretsmanager.secretsmanager_resource
:annotation: ResourceDefinition
3 changes: 0 additions & 3 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-azure.rst
@@ -31,6 +31,3 @@ dependency on an old version, via ``snowflake-connector-python``.

.. autoconfigurable:: dagster_azure.adls2.adls2_pickle_io_manager
:annotation: IOManagerDefinition

.. autoconfigurable:: dagster_azure.adls2.adls2_pickle_asset_io_manager
:annotation: IOManagerDefinition
3 changes: 0 additions & 3 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-gcp.rst
@@ -48,9 +48,6 @@ GCS
.. autoconfigurable:: dagster_gcp.gcs.gcs_pickle_io_manager
:annotation: IOManagerDefinition

.. autoconfigurable:: dagster_gcp.gcs.gcs_pickle_asset_io_manager
:annotation: IOManagerDefinition

Legacy APIs
-----------

@@ -1,6 +1,6 @@
# pylint: disable=redefined-outer-name
# start_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_io_manager

@@ -18,7 +18,7 @@ def downstream_asset(upstream_asset):
asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={
"s3_io_manager": s3_pickle_asset_io_manager,
"s3_io_manager": s3_pickle_io_manager,
"s3": s3_resource,
"fs_io_manager": fs_io_manager,
},
@@ -1,6 +1,6 @@
# pylint: disable=redefined-outer-name
# start_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset

@@ -17,7 +17,7 @@ def downstream_asset(upstream_asset):

asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={"io_manager": s3_pickle_asset_io_manager, "s3": s3_resource},
resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

# end_marker
@@ -1,6 +1,6 @@
# pylint: disable=redefined-outer-name
# start_marker
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_io_manager

@@ -17,7 +17,7 @@ def downstream_asset(upstream_asset):

prod_asset_group = AssetGroup(
[upstream_asset, downstream_asset],
resource_defs={"io_manager": s3_pickle_asset_io_manager, "s3": s3_resource},
resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

local_asset_group = AssetGroup(
@@ -1,12 +1,7 @@
from .compute_log_manager import S3ComputeLogManager
from .file_cache import S3FileCache, s3_file_cache
from .file_manager import S3FileHandle, S3FileManager
from .io_manager import (
PickledObjectS3AssetIOManager,
PickledObjectS3IOManager,
s3_pickle_asset_io_manager,
s3_pickle_io_manager,
)
from .io_manager import PickledObjectS3IOManager, s3_pickle_io_manager
from .ops import S3Coordinate, file_handle_to_s3
from .resources import s3_file_manager, s3_resource
from .s3_fake_resource import S3FakeSession, create_s3_fake_resource
54 changes: 6 additions & 48 deletions python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py
@@ -21,7 +21,12 @@ def __init__(
self.s3.list_objects(Bucket=self.bucket, Prefix=self.s3_prefix, MaxKeys=1)

def _get_path(self, context: Union[InputContext, OutputContext]) -> str:
return "/".join([self.s3_prefix, "storage", *context.get_identifier()])
if context.has_asset_key:
path = context.get_asset_identifier()
else:
path = ["storage"] + context.get_identifier()

return "/".join([self.s3_prefix, *path])

def has_output(self, context):
key = self._get_path(context)
@@ -108,50 +113,3 @@ def my_job():
s3_prefix = init_context.resource_config.get("s3_prefix") # s3_prefix is optional
pickled_io_manager = PickledObjectS3IOManager(s3_bucket, s3_session, s3_prefix=s3_prefix)
return pickled_io_manager


class PickledObjectS3AssetIOManager(PickledObjectS3IOManager):
def _get_path(self, context: Union[InputContext, OutputContext]) -> str:
return "/".join([self.s3_prefix, *context.get_asset_identifier()])


@io_manager(
config_schema={
"s3_bucket": Field(StringSource),
"s3_prefix": Field(StringSource, is_required=False, default_value="dagster"),
},
required_resource_keys={"s3"},
)
def s3_pickle_asset_io_manager(init_context):
"""Persistent IO manager using S3 for storage, meant for use with software-defined assets.
Each asset is assigned to a single filesystem path, so subsequent materializations of an asset
will overwrite previous materializations of that asset.
Serializes objects via pickling. Suitable for objects storage for distributed executors, so long
as each execution node has network connectivity and credentials for S3 and the backing bucket.
Attach this resource definition to your job to make it available to your ops.
.. code-block:: python
asset_group = AssetGroup(
assets...,
resource_defs={'io_manager': s3_pickle_asset_io_manager, "s3": s3_resource, ...}),
)
You may configure this IO manager as follows:
.. code-block:: YAML
resources:
io_manager:
config:
s3_bucket: my-cool-bucket
s3_prefix: good/prefix-for-files-
"""
s3_session = init_context.resources.s3
s3_bucket = init_context.resource_config["s3_bucket"]
s3_prefix = init_context.resource_config.get("s3_prefix") # s3_prefix is optional
pickled_io_manager = PickledObjectS3AssetIOManager(s3_bucket, s3_session, s3_prefix=s3_prefix)
return pickled_io_manager
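The behavioral core of this change is the branch added to `_get_path` above: asset outputs now resolve to a stable, run-independent key, while op outputs keep the run-scoped `storage` key. A minimal restatement of that branch in plain Python (with a simplified stand-in for dagster's context objects) makes the two cases concrete:

```python
class FakeContext:
    """Simplified stand-in for dagster's InputContext/OutputContext."""

    def __init__(self, asset_key=None, identifier=None):
        self._asset_key = asset_key
        self._identifier = identifier or []

    @property
    def has_asset_key(self):
        return self._asset_key is not None

    def get_asset_identifier(self):
        return list(self._asset_key)

    def get_identifier(self):
        return list(self._identifier)


def get_path(s3_prefix, context):
    # Mirrors the merged _get_path: asset outputs share one stable key,
    # op outputs keep the run-scoped "storage" key.
    if context.has_asset_key:
        path = context.get_asset_identifier()
    else:
        path = ["storage"] + context.get_identifier()
    return "/".join([s3_prefix, *path])


asset_path = get_path("dagster", FakeContext(asset_key=["asset1"]))
op_path = get_path("dagster", FakeContext(identifier=["run123", "first_op", "result"]))
```

These are the same key shapes the updated test in this commit asserts: a bare `dagster/asset1` key for the asset, and a `dagster/storage/<run_id>/...` key for `graph_asset.first_op`'s op output.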
@@ -1,7 +1,9 @@
from dagster_aws.s3.io_manager import s3_pickle_asset_io_manager, s3_pickle_io_manager
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.utils import construct_s3_client

from dagster import (
GraphIn,
GraphOut,
In,
Int,
Out,
@@ -10,10 +12,12 @@
VersionStrategy,
asset,
build_assets_job,
graph,
job,
op,
resource,
)
from dagster.core.asset_defs.assets import AssetsDefinition
from dagster.core.test_utils import instance_for_test


@@ -121,41 +125,63 @@ def memoized():


def define_assets_job(bucket):
@op
def first_op(first_input):
assert first_input == 2
return first_input * 2

@op
def second_op(second_input):
assert second_input == 4
return second_input + 3

@asset
def asset1():
return 1

@asset
def asset2(asset1):
assert asset1 == 1
return asset1 + 1

@graph(ins={"asset2": GraphIn()}, out={"asset3": GraphOut()})
def graph_asset(asset2):
return second_op(first_op(asset2))

@asset(partitions_def=StaticPartitionsDefinition(["apple", "orange"]))
def partitioned():
return 8

return build_assets_job(
name="assets",
assets=[asset1, asset2, partitioned],
assets=[asset1, asset2, AssetsDefinition.from_graph(graph_asset), partitioned],
resource_defs={
"io_manager": s3_pickle_asset_io_manager.configured({"s3_bucket": bucket}),
"io_manager": s3_pickle_io_manager.configured({"s3_bucket": bucket}),
"s3": s3_test_resource,
},
)


def test_s3_pickle_asset_io_manager_execution(mock_s3_bucket):
def test_s3_pickle_io_manager_asset_execution(mock_s3_bucket):
assert not len(list(mock_s3_bucket.objects.all()))
inty_job = define_assets_job(mock_s3_bucket.name)

result = inty_job.execute_in_process(partition_key="apple")

assert result.output_for_node("asset1") == 1
assert result.output_for_node("asset2") == 2
assert result.output_for_node("graph_asset.first_op") == 4
assert result.output_for_node("graph_asset.second_op") == 7

objects = list(mock_s3_bucket.objects.all())
assert len(objects) == 3
assert len(objects) == 5
assert {(o.bucket_name, o.key) for o in objects} == {
("test-bucket", "dagster/asset1"),
("test-bucket", "dagster/asset2"),
("test-bucket", "dagster/asset3"),
("test-bucket", "dagster/partitioned/apple"),
(
"test-bucket",
"/".join(["dagster", "storage", result.run_id, "graph_asset.first_op", "result"]),
),
}
@@ -1,10 +1,6 @@
from .fake_adls2_resource import FakeADLS2Resource, FakeADLS2ServiceClient, fake_adls2_resource
from .file_cache import ADLS2FileCache, adls2_file_cache
from .file_manager import ADLS2FileHandle, ADLS2FileManager
from .io_manager import (
PickledObjectADLS2IOManager,
adls2_pickle_asset_io_manager,
adls2_pickle_io_manager,
)
from .io_manager import PickledObjectADLS2IOManager, adls2_pickle_io_manager
from .resources import adls2_file_manager, adls2_resource
from .utils import create_adls2_client
