Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance nested / ordered constant treatment #1829

Merged
merged 10 commits into from Dec 13, 2019
Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -18,7 +18,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Add `prefect agent install` option to output `supervisord.conf` file for Local Agent - [#1819](https://github.com/PrefectHQ/prefect/pull/1819)
- Add convenience `parents()` and `children()` classmethods to all State objects for navigating the hierarchy - [#1784](https://github.com/PrefectHQ/prefect/pull/1784)
- Add new `not_all_skipped` trigger and set it as the default for merge tasks - [#1768](https://github.com/PrefectHQ/prefect/issues/1768)

- Enhanced treatment of nested and ordered constant values - [#1829](https://github.com/PrefectHQ/prefect/pull/1829)

### Task Library

Expand Down
8 changes: 5 additions & 3 deletions docs/core/concepts/tasks.md
Expand Up @@ -76,7 +76,7 @@ trigger_fn(upstream_states: Set[State]) -> bool

## Constants

If a non-`Task` input is provided to a task, it is automatically converted to a `ConstantTask`.
If a non-`Task` input is provided to a task, it is automatically converted to a `Constant`.

```python
from prefect import Flow, task
Expand All @@ -88,10 +88,12 @@ def add(x, y):
with Flow('Flow With Constant') as flow:
add(1, 2)

assert len(flow) == 3
assert len(flow.tasks) == 1
assert len(flow.constants) == 2
```

This flow has three tasks, even though the user might think they only created one: the `add` task and two `Constants` representing the inputs `1` and `2`.
Prefect will attempt to automatically turn Python objects into `Constants`, including collections (like `lists`, `tuples`, `sets`, and `dicts`). If the resulting constant is used directly as the input to a task, it is optimized out of the task graph and stored in the `flow.constants` dict. However, if the constant is mapped over, then it remains in the dependency graph.


## Operators

Expand Down
15 changes: 9 additions & 6 deletions src/prefect/core/flow.py
@@ -1,4 +1,3 @@
import cloudpickle
import collections
import copy
import functools
Expand All @@ -12,7 +11,6 @@
import warnings
from collections import Counter
from pathlib import Path
from slugify import slugify
from typing import (
Any,
Callable,
Expand All @@ -27,8 +25,10 @@
cast,
)

import cloudpickle
import pendulum
from mypy_extensions import TypedDict
from slugify import slugify

import prefect
import prefect.schedules
Expand Down Expand Up @@ -813,17 +813,20 @@ def set_dependencies(
# add data edges to upstream tasks
for key, t in (keyword_tasks or {}).items():
is_mapped = mapped & (not isinstance(t, unmapped))
t = as_task(t, flow=self, convert_constants=False)
if isinstance(t, Task):
t = as_task(t, flow=self)

# if the task can be represented as a constant and we don't need to map over it
# then we can optimize it out of the graph and into the special `constants` dict
if isinstance(t, prefect.tasks.core.constants.Constant) and not is_mapped:
self.constants[task].update({key: t.value})
else:
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
else:
self.constants[task].update({key: t})

# Execution ---------------------------------------------------------------

Expand Down
34 changes: 3 additions & 31 deletions src/prefect/tasks/core/constants.py
@@ -1,38 +1,10 @@
"""
The tasks in this module can be used to represent constant values.

In general, users will not instantiate these tasks by hand; they will automatically be
applied when users create dependencies between a task and a constant value.
In general, users will not instantiate these tasks by hand; they will be automatically created
whenever the Prefect engine detects that a constant value is required. In many cases, Prefect
will not add `Constant` tasks to the graph; optimizing them as flow-level attributes instead.

Constant tasks are most commonly used to prevent Prefect from creating a large number
of auto-generated tasks in Python collections. For example,

```python
from prefect import task, Flow
from prefect.tasks.core.constants import Constant

@task
def do_nothing(values):
return values

with Flow("Many Small Tasks") as large_flow:
result = do_nothing({"x": 1, "z": 99})

large_flow.tasks
# {<Task: 'x'>,
# <Task: 'z'>,
# <Task: 1>,
# <Task: 99>,
# <Task: Dict>,
# <Task: List>,
# <Task: List>,
# <Task: do_nothing>}

with Flow("Two Tasks") as small_flow:
result = do_nothing(Constant({"x": 1, "z": 99}))

small_flow.tasks
# {<Task: Constant[dict]>, <Task: do_nothing>}
```
"""

Expand Down
32 changes: 22 additions & 10 deletions src/prefect/utilities/tasks.py
Expand Up @@ -45,28 +45,43 @@ def add(x, y):
yield


def as_task(
x: Any, flow: Optional["Flow"] = None, convert_constants: bool = True
) -> "prefect.Task":
def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
"""
Wraps a function, collection, or constant with the appropriate Task type.
Wraps a function, collection, or constant with the appropriate Task type. If a constant
or collection of constants is passed, a `Constant` task is returned.

Args:
- x (object): any Python object to convert to a prefect Task
- flow (Flow, optional): Flow to which the prefect Task will be bound
- convert_constants (bool, optional): a boolean specifying whether all passed Python
objects should be converted; if `False`, only collection types will be handled.
Defaults to `True`.

Returns:
- a prefect Task representing the passed object
"""
from prefect.tasks.core.constants import Constant

def is_constant(x: Any) -> bool:
"""
Helper function for determining if nested collections are constants without calling
`bind()`, which would create new tasks on the active graph.
"""
if isinstance(x, (prefect.core.Task, unmapped)):
return False
elif isinstance(x, (list, tuple, set)):
return all(is_constant(xi) for xi in x)
elif isinstance(x, dict):
return all(is_constant(xi) for xi in x.values())
return True

# task objects
if isinstance(x, prefect.core.Task): # type: ignore
return x
elif isinstance(x, unmapped):
return x.task

# handle constants, including collections of constants
elif is_constant(x):
return_task = Constant(x) # type: prefect.core.Task

# collections
elif isinstance(x, list):
return_task = prefect.tasks.core.collections.List().bind(*x, flow=flow)
Expand All @@ -83,9 +98,6 @@ def as_task(
keys=keys, values=values, flow=flow
)

# constants
elif convert_constants:
return_task = prefect.tasks.core.constants.Constant(value=x)
else:
return x

Expand Down
45 changes: 19 additions & 26 deletions tests/core/test_flow.py
Expand Up @@ -225,6 +225,20 @@ def run(self, x):
assert f.constants[t1] == dict(x=4)


@pytest.mark.parametrize(
"val", [[[[3]]], [1, 2, (3, [4])], [([1, 2, 3],)], {"a": 1, "b": [2]}]
)
def test_set_dependencies_with_nested_ordered_constants_creates_a_single_constant(val):
class ReturnTask(Task):
def run(self, x):
return x

with Flow("test") as f:
task = ReturnTask()(x=val)
assert f.run().result[task].result == val
assert f.constants[task] == dict(x=val)


def test_set_dependencies_creates_mapped_edges():
t1 = Task()
t2 = Task()
Expand Down Expand Up @@ -1052,9 +1066,7 @@ def test_viz_reflects_mapping(self):
assert 'label="a_nice_task <map>" shape=box' in graph.source
assert "label=a_list_task shape=ellipse" in graph.source
assert "label=x style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented
assert "label=y style=dashed" in graph.source

def test_viz_can_handle_skipped_mapped_tasks(self):
ipython = MagicMock(
Expand All @@ -1071,9 +1083,7 @@ def test_viz_can_handle_skipped_mapped_tasks(self):
assert 'label="a_nice_task <map>" color="#62757f80"' in graph.source
assert 'label=a_list_task color="#28a74580"' in graph.source
assert "label=x style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented
assert "label=y style=dashed" in graph.source

@pytest.mark.parametrize("state", [Success(), Failed(), Skipped()])
def test_viz_if_flow_state_provided(self, state):
Expand Down Expand Up @@ -1101,7 +1111,7 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
map_state = Mapped(map_states=[Success(), Failed()])
with patch.dict("sys.modules", IPython=ipython):
with Flow(name="test") as f:
res = add.map(x=list_task, y=prefect.tasks.core.constants.Constant(8))
res = add.map(x=list_task, y=8)
graph = f.visualize(
flow_state=Success(result={res: map_state, list_task: Success()})
)
Expand All @@ -1121,12 +1131,9 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
'label=a_list_task color="{success}80"'.format(success=Success.color)
in graph.source
)
assert 'label=8 color="#00000080"' in graph.source

# two edges for each input to add()
for var in ["x", "y"]:
for index in [0, 1]:
assert "{0} [label={1} style=dashed]".format(index, var) in graph.source
for index in [0, 1]:
assert "{0} [label=x style=dashed]".format(index) in graph.source

def test_viz_reflects_multiple_mapping_if_flow_state_provided(self):
ipython = MagicMock(
Expand Down Expand Up @@ -2507,20 +2514,6 @@ def test_load_accepts_name_and_sluggified_name(self):
assert new_obj_from_slug.name == "I aM a-test!"


def test_auto_generation_of_collection_tasks_is_robust():
@task
def do_nothing(arg):
pass

with Flow("constants") as flow:
do_nothing({"x": 1, "y": [9, 10]})

assert len(flow.tasks) == 5

flow_state = flow.run()
assert flow_state.is_successful()


@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support any timeout logic"
)
Expand Down
3 changes: 2 additions & 1 deletion tests/core/test_task_map.py
Expand Up @@ -920,6 +920,7 @@ def add_one(x):
with Flow("constants") as f:
output = add_one.map(x=[1, 2, 3, 4])

flow_state = f.run()
with raise_on_exception():
flow_state = f.run()
assert flow_state.is_successful()
assert flow_state.result[output].result == [2, 3, 4, 5]
4 changes: 2 additions & 2 deletions tests/engine/test_flow_runner.py
Expand Up @@ -1611,8 +1611,8 @@ def add(x):
assert flow_state.is_successful()
assert flow_state.result[output].result == [100] * 10

## only add and the List task were submitted
assert len(calls) == 2
## only add task was submitted; the list task is a constant
assert len(calls) == 1

## to be safe, ensure '5' isn't in the logs
assert len([log.message for log in caplog.records if "99" in log.message]) == 0
8 changes: 4 additions & 4 deletions tests/tasks/test_core.py
Expand Up @@ -202,7 +202,7 @@ def test_dict_automatically_applied_to_callargs(self):
identity.bind(x=dict(a=x, b=y))
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 6 # 2 params, identity, Dict, 2 Lists for Dict
assert len(f.tasks) == 5 # 2 params, identity, Dict, List of dict values
assert sum(isinstance(t, collections.Dict) for t in f.tasks) == 1
assert state.result[identity].result == dict(a=1, b=2)

Expand All @@ -215,7 +215,7 @@ def test_dict_automatically_applied_to_callargs_imperative(self):
identity.bind(x=dict(a=x, b=y), flow=f)
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 6 # 2 params, identity, Dict, 2 Lists for Dict
assert len(f.tasks) == 5 # 2 params, identity, Dict, List of dict values
assert sum(isinstance(t, collections.Dict) for t in f.tasks) == 1
assert state.result[identity].result == dict(a=1, b=2)

Expand All @@ -227,7 +227,7 @@ def test_nested_collection_automatically_applied_to_callargs(self):
identity.bind(x=dict(a=[x, dict(y=y)], b=(y, set([x]))))
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 12
assert len(f.tasks) == 10
assert state.result[identity].result == dict(a=[1, dict(y=2)], b=(2, set([1])))

def test_nested_collection_automatically_applied_to_callargs_imperative(self):
Expand All @@ -239,5 +239,5 @@ def test_nested_collection_automatically_applied_to_callargs_imperative(self):
identity.bind(x=dict(a=[x, dict(y=y)], b=(y, set([x]))), flow=f)
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 12
assert len(f.tasks) == 10
assert state.result[identity].result == dict(a=[1, dict(y=2)], b=(2, set([1])))
46 changes: 42 additions & 4 deletions tests/utilities/test_tasks.py
Expand Up @@ -7,6 +7,7 @@
from prefect.engine.signals import PAUSE
from prefect.engine.state import Paused, Resume
from prefect.utilities import tasks
from prefect.tasks.core.constants import Constant


class TestTaskDecorator:
Expand Down Expand Up @@ -122,19 +123,56 @@ def return_val(x):
def test_as_task_toggles_constants(self):
with Flow("test") as f:
t = tasks.as_task(4)
s = tasks.as_task(5, convert_constants=False)

assert isinstance(t, Task)
assert t.name == "4"

assert not isinstance(s, Task)
assert s == 5

def test_as_task_doesnt_label_tasks_as_auto_generated(self):
t = Task()
assert t.auto_generated is False
assert tasks.as_task(t).auto_generated is False

@pytest.mark.parametrize(
"val", [[[[]]], [[[3]]], [1, 2, (3, [4])], [([1, 2, 3],)], {"a": 1, "b": [2]}]
)
def test_nested_collections_of_constants_are_constants(self, val):
task = tasks.as_task(val)
assert isinstance(task, Constant)
assert task.value == val

@pytest.mark.parametrize(
"val",
[
[[[3, Task()]]],
[1, Task(), (3, [4])],
[([1, 2, Task()],)],
{"a": Task(), "b": [2]},
],
)
def test_nested_collections_of_mixed_constants_are_not_constants(self, val):
with Flow("test") as f:
task = tasks.as_task(val)
assert not isinstance(task, Constant)

@pytest.mark.parametrize(
"val", [[[[]]], [[[3]]], [1, 2, (3, [4])], [([1, 2, 3],)], {"a": 1, "b": [2]}]
)
def test_nested_collections(self, val):
with Flow("test") as f:
task = tasks.as_task(val)
f.add_task(task)
assert f.run().result[task].result == val

def test_ordered_collections(self):
"""
Tests that ordered collections maintain order
"""
val = [[list(range(100))]]
with Flow("test") as f:
task = tasks.as_task(val)
f.add_task(task)
assert f.run().result[task].result == val


def test_tag_contextmanager_works_with_task_decorator():
@tasks.task
Expand Down