Skip to content

Commit

Permalink
Merge pull request #318 from PrefectHQ/serialization
Browse files Browse the repository at this point in the history
Serialization Part II
  • Loading branch information
jlowin committed Nov 4, 2018
2 parents 48bddf0 + c390c22 commit 1e1f6f1
Show file tree
Hide file tree
Showing 26 changed files with 662 additions and 444 deletions.
13 changes: 4 additions & 9 deletions src/prefect/core/edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Set

from prefect.core.task import Task
from prefect.serialization.edge import EdgeSchema


def is_valid_identifier(string: str) -> bool:
Expand Down Expand Up @@ -116,7 +117,8 @@ def __repr__(self) -> str:

def __eq__(self, other: "Edge") -> bool: # type: ignore
if type(self) == type(other):
return self.serialize() == other.serialize()
attrs = ["upstream_task", "downstream_task", "key", "mapped"]
return all(getattr(self, a) == getattr(other, a) for a in attrs)
return False

def __hash__(self) -> int:
Expand All @@ -125,12 +127,5 @@ def __hash__(self) -> int:
def serialize(self) -> dict:
"""
Represents the Edge as a dict.
Can be reversed by calling Edge(**edge.serialize())
"""
return dict(
upstream_task=self.upstream_task,
downstream_task=self.downstream_task,
key=self.key,
mapped=self.mapped,
)
return EdgeSchema().dump(self)
40 changes: 11 additions & 29 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ def __init__(

self.set_reference_tasks(reference_tasks or [])
for e in edges or []:
self.add_edge(**e.serialize())
self.add_edge(
upstream_task=e.upstream_task,
downstream_task=e.downstream_task,
key=e.key,
mapped=e.mapped,
)

self._prefect_version = prefect.__version__

Expand Down Expand Up @@ -994,37 +999,14 @@ def serialize(self, build: bool = False) -> dict:
else:
environment_key = None

tasks = []
for t in self.tasks:
task_info = t.serialize()
task_info.update(self.task_info[t])
tasks.append(task_info)
serialized = prefect.serialization.flow.FlowSchema().dump(self)

edges = []
for e in self.edges:
edge_info = e.serialize()
upstream_task = edge_info.pop("upstream_task")
edge_info["upstream_task_id"] = self.task_info[upstream_task]["id"]
downstream_task = edge_info.pop("downstream_task")
edge_info["downstream_task_id"] = self.task_info[downstream_task]["id"]
edges.append(edge_info)

return dict(
id=self.id,
name=self.name,
version=self.version,
project=self.project,
description=self.description,
environment=self.environment,
environment_key=environment_key,
parameters=self.parameters(),
schedule=self.schedule,
tasks=tasks,
edges=edges,
reference_tasks=[self.task_info[t]["id"] for t in self.reference_tasks()],
throttle=self.throttle,
serialized.update(
environment=dumps(self.environment), environment_key=environment_key
)

return serialized

def register(self, registry: dict = None) -> None:
"""
Register the flow.
Expand Down
41 changes: 25 additions & 16 deletions src/prefect/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import prefect.triggers
from prefect.utilities.json import Serializable, to_qualified_name
from prefect.utilities import logging
from prefect.serialization.task import TaskSchema, ParameterSchema

if TYPE_CHECKING:
from prefect.core.flow import Flow # pylint: disable=W0611
Expand Down Expand Up @@ -438,20 +439,7 @@ def serialize(self) -> Dict[str, Any]:
Returns:
- dict representing this task
"""
return dict(
name=self.name,
slug=self.slug,
description=self.description,
tags=self.tags,
type=to_qualified_name(type(self)),
max_retries=self.max_retries,
retry_delay=self.retry_delay,
timeout=self.timeout,
trigger=self.trigger,
skip_on_upstream_skip=self.skip_on_upstream_skip,
cache_for=self.cache_for,
cache_validator=self.cache_validator,
)
return TaskSchema().dump(self)

# Operators ----------------------------------------------------------------

Expand Down Expand Up @@ -818,16 +806,26 @@ class Parameter(Task):
value is ignored.
- default (any, optional): A default value for the parameter. If the default
is not None, the Parameter will not be required.
- description (str, optional): Descriptive information about this parameter
- tags ([str], optional): A list of tags for this parameter
"""

def __init__(self, name: str, default: Any = None, required: bool = True) -> None:
def __init__(
self,
name: str,
default: Any = None,
required: bool = True,
description: str = None,
tags: Iterable[str] = None,
) -> None:
if default is not None:
required = False

self.required = required
self.default = default

super().__init__(name=name, slug=name)
super().__init__(name=name, slug=name, description=description, tags=tags)

def __repr__(self) -> str:
return "<Parameter: {self.name}>".format(self=self)
Expand Down Expand Up @@ -869,3 +867,14 @@ def info(self) -> Dict[str, Any]:
info = super().info() # type: ignore
info.update(required=self.required, default=self.default)
return info

# Serialization ------------------------------------------------------------

def serialize(self) -> Dict[str, Any]:
"""
Creates a serialized representation of this parameter
Returns:
- dict representing this parameter
"""
return ParameterSchema().dump(self)
2 changes: 1 addition & 1 deletion src/prefect/engine/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def serialize(self) -> dict:
"""
Serializes the state to a dict.
"""
from prefect.serialization.schemas.state import StateSchema
from prefect.serialization.state import StateSchema

return StateSchema().dump(self)

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def next(self, n: int, on_or_after: datetime = None) -> List[datetime]:
raise NotImplementedError("Must be implemented on Schedule subclasses")

def serialize(self):
from prefect.serialization.schemas.schedule import ScheduleSchema
from prefect.serialization.schedule import ScheduleSchema

return ScheduleSchema().dump(self)

Expand Down
6 changes: 5 additions & 1 deletion src/prefect/serialization/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
import prefect.serialization.schemas
import prefect.serialization.schedule
import prefect.serialization.task
import prefect.serialization.edge
import prefect.serialization.flow
import prefect.serialization.state
15 changes: 15 additions & 0 deletions src/prefect/serialization/edge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from marshmallow import fields
import prefect
from prefect.serialization.versioned_schema import VersionedSchema, version
from prefect.serialization.task import TaskSchema


@version("0.3.3")
class EdgeSchema(VersionedSchema):
class Meta:
object_class = lambda: prefect.core.Edge

upstream_task = fields.Nested(TaskSchema, only=["id"])
downstream_task = fields.Nested(TaskSchema, only=["id"])
key = fields.String(allow_none=True)
mapped = fields.Boolean(allow_none=True)
86 changes: 86 additions & 0 deletions src/prefect/serialization/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from marshmallow import fields, post_load, pre_dump

import prefect
from prefect.serialization.edge import EdgeSchema
from prefect.serialization.schedule import ScheduleSchema
from prefect.serialization.task import ParameterSchema, TaskSchema
from prefect.serialization.versioned_schema import (
VersionedSchema,
version,
to_qualified_name,
)
from prefect.utilities.serialization import JSONField, NestedField


@version("0.3.3")
class FlowSchema(VersionedSchema):
class Meta:
object_class = lambda: prefect.core.Flow
object_class_exclude = [
"id",
"type",
"parameters",
"environment_key",
"environment",
]
# ordered to make sure Task objects are loaded before Edge objects, due to Task caching
ordered = True

id = fields.String()
project = fields.String(allow_none=True)
name = fields.String(allow_none=True)
version = fields.String(allow_none=True)
description = fields.String(allow_none=True)
type = fields.Function(lambda flow: to_qualified_name(type(flow)), lambda x: x)
schedule = fields.Nested(ScheduleSchema)
environment = JSONField(allow_none=True)
environment_key = fields.String(allow_none=True)
parameters = NestedField(
ParameterSchema,
dump_fn=lambda obj, context: {
p
for p in getattr(obj, "tasks", [])
if isinstance(p, prefect.core.task.Parameter)
},
many=True,
)
tasks = fields.Nested(TaskSchema, many=True)
edges = fields.Nested(EdgeSchema, many=True)
reference_tasks = NestedField(
TaskSchema,
many=True,
dump_fn=lambda obj, context: getattr(obj, "_reference_tasks", []),
only=["id"],
)

@pre_dump
def put_task_ids_in_context(self, flow: "prefect.core.Flow") -> "prefect.core.Flow":
"""
Adds task ids to context so they may be used by nested TaskSchemas and EdgeSchemas.
If the serialized object is not a Flow (like a dict), this step is skipped.
"""
if isinstance(flow, prefect.core.Flow):
self.context["task_ids"] = {t: i["id"] for t, i in flow.task_info.items()}
return flow

@post_load
def create_object(self, data):
"""
Flow edges are validated, for example to make sure the keys match Task inputs,
but because we are deserializing all Tasks as base Tasks, the edge validation will
fail (base Tasks have no inputs). Therefore we hold back the edges from Flow
initialization and assign them explicitly.
Args:
- data (dict): the deserialized data
Returns:
- Flow
"""
edges = set(data.pop("edges", []))
flow = super().create_object(data)
flow.edges = edges
flow._id = data.get("id", None)
return flow
File renamed without changes.
3 changes: 0 additions & 3 deletions src/prefect/serialization/schemas/__init__.py

This file was deleted.

File renamed without changes.
Loading

0 comments on commit 1e1f6f1

Please sign in to comment.