Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes storage #961

Merged
merged 3 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Flow now has optional `storage` keyword - [#936](https://github.com/PrefectHQ/prefect/pull/936)
- Flow `environment` argument now defaults to a `CloudEnvironment` - [#936](https://github.com/PrefectHQ/prefect/pull/936)
- `Queued` states accept `start_time` arguments - [#955](https://github.com/PrefectHQ/prefect/pull/955)
- Add new `Memory` storage for local testing - [#956](https://github.com/PrefectHQ/prefect/pull/956)
- Add new `Bytes` and `Memory` storage classes for local testing - [#956](https://github.com/PrefectHQ/prefect/pull/956), [#961](https://github.com/PrefectHQ/prefect/pull/961)
- Add new `LocalEnvironment` execution environment for local testing - [#957](https://github.com/PrefectHQ/prefect/pull/957)

### Task Library
Expand Down
2 changes: 1 addition & 1 deletion docs/outline.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ classes = ["CloudFlowRunner", "CloudTaskRunner", "CloudResultHandler"]
[pages.environments.storage]
title = "Storage"
module = "prefect.environments.storage"
classes = ["Storage", "Docker", "Memory"]
classes = ["Storage", "Docker", "Memory", "Bytes"]

[pages.environments.execution]
title = "Execution Environments"
Expand Down
1 change: 1 addition & 0 deletions src/prefect/environments/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@

from prefect.environments.storage.base import Storage
from prefect.environments.storage.docker import Docker
from prefect.environments.storage.bytes import Bytes
from prefect.environments.storage.memory import Memory
88 changes: 88 additions & 0 deletions src/prefect/environments/storage/bytes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import cloudpickle
from typing import Any, Dict, Iterable, List, TYPE_CHECKING, Union

import prefect
from prefect.environments.storage import Storage

if TYPE_CHECKING:
from prefect.core.flow import Flow
from prefect.engine.flow_runner import FlowRunner


class Bytes(Storage):
"""
Bytes Storage class, mainly used for testing. This class represents the Storage
interface for Flows stored directly as bytes.
"""

def __init__(self) -> None:
self.flows = dict() # type: Dict[str, bytes]
super().__init__()

def get_runner(
self, flow_location: str, return_flow: bool = True
) -> Union["Flow", "FlowRunner"]:
"""
Given a flow name, returns something capable of running the Flow.

Args:
- flow_location (str): the name of the flow
- return_flow (bool, optional): whether to return the full Flow object
or a `FlowRunner`; defaults to `True`

Returns:
- Union[Flow, FlowRunner]: the requested Flow or a FlowRunner for the requested Flow

Raises:
- ValueError: if the flow is not contained in this storage
"""
if not flow_location in self.flows:
raise ValueError("Flow is not contained in this Storage")
if return_flow:
flow_bytes = self.flows[flow_location]
return cloudpickle.loads(flow_bytes)
else:
runner_cls = prefect.engine.get_default_flow_runner_class()
flow_bytes = self.flows[flow_location]
flow = cloudpickle.loads(flow_bytes)
return runner_cls(flow=flow)

def add_flow(self, flow: "Flow") -> str:
"""
Method for adding a new flow to this Storage object.

Args:
- flow (Flow): a Prefect Flow to add

Returns:
- str: the location of the newly added flow in this Storage object

Raises:
- ValueError: if a flow with the same name is already contained in this storage
"""
if flow.name in self:
raise ValueError(
'Name conflict: Flow with the name "{}" is already present in this storage.'.format(
flow.name
)
)
self.flows[flow.name] = cloudpickle.dumps(flow)
return flow.name

def __contains__(self, obj: Any) -> bool:
"""
Method for determining whether an object is contained within this storage.
"""
if not isinstance(obj, str):
return False
return obj in self.flows

def build(self) -> "Storage":
"""
Build the Storage object.

Returns:
- Storage: a Storage object that contains information about how and where
each flow is stored
"""
return self
23 changes: 21 additions & 2 deletions src/prefect/serialization/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@
from marshmallow import fields, post_load

import prefect
from prefect.environments.storage import Memory, Docker, Storage
from prefect.utilities.serialization import ObjectSchema, OneOfSchema
from prefect.environments.storage import Bytes, Memory, Docker, Storage
from prefect.utilities.serialization import (
ObjectSchema,
OneOfSchema,
Bytes as BytesField,
)


class BaseStorageSchema(ObjectSchema):
class Meta:
object_class = Storage


class BytesSchema(ObjectSchema):
class Meta:
object_class = Bytes

flows = fields.Dict(key=fields.Str(), values=BytesField())

@post_load
def create_object(self, data: dict) -> Docker:
flows = data.pop("flows", dict())
base_obj = super().create_object(data)
base_obj.flows = flows
return base_obj


class DockerSchema(ObjectSchema):
class Meta:
object_class = Docker
Expand Down Expand Up @@ -40,6 +58,7 @@ class StorageSchema(OneOfSchema):

# map class name to schema
type_schemas = {
"Bytes": BytesSchema,
"Docker": DockerSchema,
"Memory": MemorySchema,
"Storage": BaseStorageSchema,
Expand Down
109 changes: 109 additions & 0 deletions tests/environments/storage/test_bytes_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import cloudpickle
import pytest

import prefect
from prefect import Flow
from prefect.engine.cloud import CloudFlowRunner
from prefect.engine.flow_runner import FlowRunner
from prefect.environments.storage import Bytes
from prefect.utilities.configuration import set_temporary_config


def test_create_bytes_storage():
storage = Bytes()
assert storage


def test_add_flow_to_storage():
storage = Bytes()
f = Flow("test")
assert f.name not in storage
res = storage.add_flow(f)
assert res == "test"
assert f.name in storage


def test_add_flow_raises_if_name_conflict():
storage = Bytes()
f = Flow("test")
res = storage.add_flow(f)
g = Flow("test")
with pytest.raises(ValueError) as exc:
storage.add_flow(g)
assert 'name "test"' in str(exc.value)


def test_get_env_runner_raises():
s = Bytes()
with pytest.raises(NotImplementedError):
s.get_env_runner("")


def test_get_runner_raises_if_flow_not_present():
s = Bytes()
with pytest.raises(ValueError):
s.get_runner("test")


def test_get_runner_returns_flow_or_flow_runner():
s = Bytes()
f = Flow("test")
s.add_flow(f)
runner = s.get_runner("test")
assert runner == f

with set_temporary_config({"engine.flow_runner.default_class": FlowRunner}):
runner = s.get_runner("test", return_flow=False)
assert isinstance(runner, FlowRunner)
assert runner.flow == f


def test_get_runner_returns_flow_or_flow_runner_responds_to_config():
s = Bytes()
f = Flow("test")
s.add_flow(f)

with set_temporary_config({"engine.flow_runner.default_class": CloudFlowRunner}):
runner = s.get_runner("test", return_flow=False)
assert isinstance(runner, CloudFlowRunner)
assert runner.flow == f


def test_containment():
s = Bytes()
f = Flow("test")
s.add_flow(f)

assert True not in s
assert f not in s
assert "test" in s
assert Flow("other") not in s
assert "other" not in s


def test_build_returns_self():
s = Bytes()
assert s.build() is s

f = Flow("test")
s.add_flow(f)
assert s.build() is s


def test_multiple_flows_in_storage():
s = Bytes()
f = Flow("test")
g = Flow("other")
z = Flow("not")
s.add_flow(f)
s.add_flow(g)

assert "test" in s
assert "other" in s
assert "not" not in s

assert s.get_runner("test") == f
assert s.get_runner("other") == g

assert isinstance(s.flows["test"], bytes)
assert isinstance(s.flows["other"], bytes)
27 changes: 26 additions & 1 deletion tests/serialization/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import prefect
from prefect.environments import storage
from prefect.serialization.storage import BaseStorageSchema, DockerSchema, MemorySchema
from prefect.serialization.storage import (
BaseStorageSchema,
DockerSchema,
MemorySchema,
BytesSchema,
)


def test_docker_empty_serialize():
Expand Down Expand Up @@ -58,3 +63,23 @@ def test_docker_serialize_with_flows():

deserialized = DockerSchema().load(serialized)
assert f.name in deserialized


def test_bytes_empty_serialize():
b = storage.Bytes()
serialized = BytesSchema().dump(b)

assert serialized
assert serialized["__version__"] == prefect.__version__
assert serialized["flows"] == dict()


def test_bytes_roundtrip():
s = storage.Bytes()
s.add_flow(prefect.Flow("test"))
serialized = BytesSchema().dump(s)
deserialized = BytesSchema().load(serialized)

assert "test" in deserialized
runner = deserialized.get_runner("test")
assert runner.run().is_successful()