gcs asset io manager (#7081)
jamiedemaria committed Mar 18, 2022
1 parent b0a885f commit 8063405
Showing 5 changed files with 259 additions and 3 deletions.
@@ -1,4 +1,9 @@
from .compute_log_manager import GCSComputeLogManager
from .file_manager import GCSFileHandle
from .io_manager import PickledObjectGCSIOManager, gcs_pickle_io_manager
from .gcs_fake_resource import FakeGCSBlob, FakeGCSBucket, FakeGCSClient
from .io_manager import (
    PickledObjectGCSIOManager,
    gcs_pickle_asset_io_manager,
    gcs_pickle_io_manager,
)
from .resources import gcs_file_manager, gcs_resource
@@ -0,0 +1,92 @@
from typing import Dict, Optional, Union


class FakeGCSBlob:
    def __init__(self, name: str, bucket: "FakeGCSBucket"):
        from unittest import mock

        self.name = name
        self.data = b""
        self.bucket = bucket
        self.mock_extras = mock.MagicMock()

    def exists(self, *args, **kwargs):
        self.mock_extras.exists(*args, **kwargs)
        return True

    def delete(self, *args, **kwargs):
        self.mock_extras.delete(*args, **kwargs)
        del self.bucket.blobs[self.name]

    def download_as_bytes(self, *args, **kwargs):
        self.mock_extras.download_as_bytes(*args, **kwargs)
        return self.data

    def upload_from_string(self, data: Union[bytes, str], *args, **kwargs):
        self.mock_extras.upload_from_string(*args, **kwargs)
        if isinstance(data, str):
            self.data = data.encode()
        else:
            self.data = data


class FakeGCSBucket:
    def __init__(self, name: str):
        from unittest import mock

        self.name = name
        self.blobs: Dict[str, FakeGCSBlob] = {}
        self.mock_extras = mock.MagicMock()

    def blob(self, blob_name: str, *args, **kwargs):
        self.mock_extras.blob(*args, **kwargs)

        if blob_name not in self.blobs.keys():
            self.blobs[blob_name] = FakeGCSBlob(name=blob_name, bucket=self)

        return self.blobs[blob_name]

    def exists(self, *args, **kwargs):
        self.mock_extras.exists(*args, **kwargs)
        return True


class FakeGCSClient:
    def __init__(self):
        from unittest import mock

        self.buckets: Dict[str, FakeGCSBucket] = {}
        self.mock_extras = mock.MagicMock()

    def bucket(self, bucket_name: str, *args, **kwargs):
        self.mock_extras.bucket(*args, **kwargs)

        if bucket_name not in self.buckets.keys():
            self.buckets[bucket_name] = FakeGCSBucket(name=bucket_name)

        return self.buckets[bucket_name]

    def list_buckets(self, *args, **kwargs):
        self.mock_extras.list_buckets(*args, **kwargs)
        for bucket in self.buckets.values():
            yield bucket

    def list_blobs(
        self,
        bucket_or_name: Union[FakeGCSBucket, str],
        *args,
        prefix: Optional[str] = None,
        **kwargs,
    ):
        self.mock_extras.list_blobs(*args, **kwargs)

        if isinstance(bucket_or_name, str):
            bucket = self.bucket(bucket_or_name)
        else:
            bucket = bucket_or_name

        for blob in self.buckets[bucket.name].blobs.values():
            if prefix is None:
                yield blob
            elif prefix in blob.name:
                yield blob
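
The fakes above implement just enough of the google-cloud-storage surface (bucket(), blob(), exists(), upload_from_string(), download_as_bytes(), delete(), list_blobs(), list_buckets()) to stand in for a real storage.Client in unit tests, while mock_extras records any extra positional or keyword arguments callers pass. As a minimal sketch of the intended use, not part of this commit (write_greeting and its test are hypothetical names):

from dagster_gcp.gcs import FakeGCSClient


def write_greeting(client, bucket_name: str, blob_name: str) -> None:
    # Written against the storage.Client interface, so either the real client or
    # FakeGCSClient can be passed in.
    client.bucket(bucket_name).blob(blob_name).upload_from_string("hello")


def test_write_greeting():
    client = FakeGCSClient()
    write_greeting(client, "my-bucket", "greetings/hello.txt")
    stored = client.bucket("my-bucket").blob("greetings/hello.txt").download_as_bytes()
    assert stored == b"hello"
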
49 changes: 49 additions & 0 deletions python_modules/libraries/dagster-gcp/dagster_gcp/gcs/io_manager.py
@@ -105,3 +105,52 @@ def my_job():
init_context.resource_config["gcs_prefix"],
)
return pickled_io_manager


class PickledObjectGCSAssetIOManager(PickledObjectGCSIOManager):
    def _get_path(self, context):
        return "/".join([self.prefix, *context.asset_key.path])


@io_manager(
    config_schema={
        "gcs_bucket": Field(StringSource),
        "gcs_prefix": Field(StringSource, is_required=False, default_value="dagster"),
    },
    required_resource_keys={"gcs"},
)
def gcs_pickle_asset_io_manager(init_context):
    """Persistent IO manager using GCS for storage, meant for use with software-defined assets.

    Each asset is assigned a single filesystem path, so subsequent materializations of an asset
    will overwrite previous materializations of that asset.

    Serializes objects via pickling. Suitable for object storage with distributed executors, so long
    as each execution node has network connectivity and credentials for GCS and the backing bucket.

    Attach this resource definition to your asset group to make it available to your assets:

    .. code-block:: python

        asset_group = AssetGroup(
            assets...,
            resource_defs={"io_manager": gcs_pickle_asset_io_manager, "gcs": gcs_resource, ...},
        )

    You may configure this IO manager as follows:

    .. code-block:: YAML

        resources:
          io_manager:
            config:
              gcs_bucket: my-cool-bucket
              gcs_prefix: good/prefix-for-files-
    """
    client = init_context.resources.gcs
    pickled_io_manager = PickledObjectGCSAssetIOManager(
        init_context.resource_config["gcs_bucket"],
        client,
        init_context.resource_config["gcs_prefix"],
    )
    return pickled_io_manager
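
Relative to the base PickledObjectGCSIOManager, the only override here is _get_path: an asset's pickled value is written to "<gcs_prefix>/<asset key path>", rather than to a path scoped to the run and step as in the base class, which is what allows later materializations to overwrite earlier ones. A small illustrative sketch (the prefix and asset key below are made-up values):

from dagster import AssetKey

prefix = "assets"  # whatever gcs_prefix is configured to
asset_key = AssetKey(["finance", "daily_returns"])  # hypothetical asset key

# Mirrors PickledObjectGCSAssetIOManager._get_path above.
path = "/".join([prefix, *asset_key.path])
assert path == "assets/finance/daily_returns"
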
@@ -0,0 +1,74 @@
from dagster_gcp.gcs import FakeGCSBlob, FakeGCSBucket, FakeGCSClient


def test_fake_blob_read_write():
    bucket = FakeGCSBucket("my_bucket")
    blob = FakeGCSBlob("my_blob", bucket)

    assert blob.exists()

    my_string = "this is a unit test"
    blob.upload_from_string(my_string)
    assert blob.download_as_bytes() == my_string.encode()

    my_bytes = b"these are some bytes"
    blob.upload_from_string(my_bytes)
    assert blob.download_as_bytes() == my_bytes


def test_blob_delete():
    bucket = FakeGCSBucket("my_bucket")
    foo = bucket.blob("foo")
    bar = bucket.blob("bar")

    foo.upload_from_string("foo")
    bar.upload_from_string("bar")

    assert "foo" in bucket.blobs.keys()
    assert "bar" in bucket.blobs.keys()

    foo.delete()

    assert "foo" not in bucket.blobs.keys()
    assert "bar" in bucket.blobs.keys()

    bar.delete()

    assert "bar" not in bucket.blobs.keys()


def test_bucket():
    bucket = FakeGCSBucket("my_bucket")

    assert bucket.exists()

    foo = bucket.blob("foo")
    bar = bucket.blob("bar")

    assert bucket.blob("foo") == foo
    assert bucket.blob("bar") == bar


def test_client_blobs():
    client = FakeGCSClient()

    foo = client.bucket("foo")
    assert client.bucket("foo") == foo

    bar = foo.blob("bar")
    assert [bar] == list(client.list_blobs("foo"))

    baz = foo.blob("baz/aaa")
    assert [bar, baz] == list(client.list_blobs("foo"))

    assert [baz] == list(client.list_blobs("foo", prefix="baz"))
    assert [] == list(client.list_blobs("foo", prefix="xyz"))


def test_client_buckets():
    client = FakeGCSClient()

    foo = client.bucket("foo")
    bar = client.bucket("bar")

    assert [foo, bar] == list(client.list_buckets())
@@ -1,19 +1,27 @@
from dagster_gcp.gcs.io_manager import PickledObjectGCSIOManager, gcs_pickle_io_manager
from dagster_gcp.gcs import FakeGCSClient
from dagster_gcp.gcs.io_manager import (
    PickledObjectGCSIOManager,
    gcs_pickle_asset_io_manager,
    gcs_pickle_io_manager,
)
from dagster_gcp.gcs.resources import gcs_resource
from google.cloud import storage # type: ignore

from dagster import (
    AssetGroup,
    DagsterInstance,
    DynamicOut,
    DynamicOutput,
    In,
    Int,
    Out,
    PipelineRun,
    asset,
    build_input_context,
    build_output_context,
    job,
    op,
    resource,
)
from dagster.core.definitions.pipeline_base import InMemoryPipeline
from dagster.core.events import DagsterEventType
@@ -24,6 +32,11 @@
from dagster.core.utils import make_new_run_id


@resource
def mock_gcs_resource(_):
    # Stand-in for gcs_resource in these tests: returns the in-memory fake so no
    # real GCS credentials or network access are needed.
    return FakeGCSClient()


def get_step_output(step_events, step_key, output_name="result"):
    for step_event in step_events:
        if (
@@ -134,11 +147,34 @@ def numbers():
    def echo(_, x):
        return x

    @job(resource_defs={"io_manager": gcs_pickle_io_manager, "gcs": gcs_resource})
    @job(resource_defs={"io_manager": gcs_pickle_io_manager, "gcs": mock_gcs_resource})
    def dynamic():
        numbers().map(echo)  # pylint: disable=no-member

    result = dynamic.execute_in_process(
        run_config={"resources": {"io_manager": {"config": {"gcs_bucket": gcs_bucket}}}}
    )
    assert result.success


def test_asset_io_manager(gcs_bucket):
    @asset
    def upstream():
        return 2

    @asset
    def downstream(upstream):
        return 1 + upstream

    asset_group = AssetGroup(
        [upstream, downstream],
        resource_defs={"io_manager": gcs_pickle_asset_io_manager, "gcs": mock_gcs_resource},
    )
    asset_job = asset_group.build_job(name="my_asset_job")

    run_config = {
        "resources": {"io_manager": {"config": {"gcs_bucket": gcs_bucket, "gcs_prefix": "assets"}}}
    }

    result = asset_job.execute_in_process(run_config=run_config)
    assert result.success
