Skip to content

Commit

Permalink
Azure asset IO Manager (#7201)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Mar 30, 2022
1 parent 9444b2f commit 1fc1bbc
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from .fake_adls2_resource import FakeADLS2Resource, FakeADLS2ServiceClient
from .file_cache import ADLS2FileCache, adls2_file_cache
from .file_manager import ADLS2FileHandle, ADLS2FileManager
from .io_manager import PickledObjectADLS2IOManager, adls2_pickle_io_manager
from .io_manager import (
PickledObjectADLS2IOManager,
adls2_pickle_asset_io_manager,
adls2_pickle_io_manager,
)
from .resources import adls2_file_manager, adls2_resource
from .utils import create_adls2_client
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,57 @@ def my_job():
init_context.resource_config.get("adls2_prefix"),
)
return pickled_io_manager


class PickledObjectADLS2AssetIOManager(PickledObjectADLS2IOManager):
    """IO manager that pickles asset outputs to ADLS2, keyed by asset key.

    Identical to ``PickledObjectADLS2IOManager`` except that the storage path
    is derived solely from the asset's key and the configured prefix, so every
    materialization of a given asset maps to the same filesystem path.
    """

    def _get_path(self, context):
        # Path is "<prefix>/<asset key component>/..." — one segment per
        # component of the asset key.
        segments = [self.prefix] + list(context.asset_key.path)
        return "/".join(segments)


@io_manager(
    config_schema={
        "adls2_file_system": Field(StringSource, description="ADLS Gen2 file system name"),
        "adls2_prefix": Field(StringSource, is_required=False, default_value="dagster"),
    },
    required_resource_keys={"adls2"},
)
def adls2_pickle_asset_io_manager(init_context):
    """Persistent IO manager using Azure Data Lake Storage Gen2 for storage, meant for use with
    software-defined assets.

    Each asset is assigned to a single filesystem path, so subsequent materializations of an asset
    will overwrite previous materializations of that asset.

    Serializes objects via pickling. Suitable for object storage for distributed executors, so long
    as each execution node has network connectivity and credentials for ADLS and the backing
    container.

    Attach this resource definition to your asset group in order to make it available to all of
    your assets:

    .. code-block:: python

        asset_group = AssetGroup(
            assets...,
            resource_defs={"io_manager": adls2_pickle_asset_io_manager, "adls2": adls2_resource, ...},
        )

    You may configure this storage as follows:

    .. code-block:: YAML

        resources:
            io_manager:
                config:
                    adls2_file_system: my-cool-file-system
                    adls2_prefix: good/prefix-for-files
    """
    # The "adls2" resource supplies both clients; the IO manager uses the ADLS2
    # client for filesystem operations and the blob client for byte uploads.
    adls_resource = init_context.resources.adls2
    adls2_client = adls_resource.adls2_client
    blob_client = adls_resource.blob_client
    pickled_io_manager = PickledObjectADLS2AssetIOManager(
        init_context.resource_config["adls2_file_system"],
        adls2_client,
        blob_client,
        # Optional; the config schema defaults the prefix to "dagster".
        init_context.resource_config.get("adls2_prefix"),
    )
    return pickled_io_manager
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

@pytest.fixture(scope="session")
def storage_account():
    # Session-scoped Azure storage account name shared by the integration tests.
    # NOTE: diff residue previously left two yield statements here; a pytest
    # yield-fixture must yield exactly once, so only the current value remains.
    yield "dagsterdev"


@pytest.fixture(scope="session")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import pytest
from dagster_azure.adls2 import create_adls2_client
from dagster_azure.adls2.io_manager import PickledObjectADLS2IOManager, adls2_pickle_io_manager
from dagster_azure.adls2.io_manager import (
PickledObjectADLS2IOManager,
adls2_pickle_asset_io_manager,
adls2_pickle_io_manager,
)
from dagster_azure.adls2.resources import adls2_resource
from dagster_azure.blob import create_blob_client

from dagster import (
AssetGroup,
DagsterInstance,
DynamicOutput,
DynamicOutputDefinition,
InputDefinition,
Int,
OutputDefinition,
PipelineRun,
asset,
build_input_context,
build_output_context,
graph,
Expand Down Expand Up @@ -68,7 +74,6 @@ def basic_external_plan_execution():


@pytest.mark.nettest
@pytest.mark.skip("https://github.com/dagster-io/dagster/issues/6607")
def test_adls2_pickle_io_manager_execution(storage_account, file_system, credential):
job = define_inty_job()

Expand Down Expand Up @@ -139,3 +144,32 @@ def test_adls2_pickle_io_manager_execution(storage_account, file_system, credent

assert get_step_output(add_one_step_events, "add_one")
assert io_manager.load_input(context) == 2


def test_asset_io_manager(storage_account, file_system, credential):
    # Two connected software-defined assets: `downstream` consumes `upstream`,
    # so the value must round-trip through the ADLS2 asset IO manager.
    @asset
    def upstream():
        return 2

    @asset
    def downstream(upstream):
        assert upstream == 2
        return 1 + upstream

    group = AssetGroup(
        [upstream, downstream],
        resource_defs={"io_manager": adls2_pickle_asset_io_manager, "adls2": adls2_resource},
    )
    job = group.build_job(name="my_asset_job")

    # Point the IO manager at the test file system and authenticate the adls2
    # resource with the session-scoped account/credential fixtures.
    io_manager_config = {"config": {"adls2_file_system": file_system}}
    adls2_config = {
        "config": {"storage_account": storage_account, "credential": {"key": credential}}
    }
    run_config = {"resources": {"io_manager": io_manager_config, "adls2": adls2_config}}

    result = job.execute_in_process(run_config=run_config)
    assert result.success

0 comments on commit 1fc1bbc

Please sign in to comment.