Skip to content

Commit

Permalink
Merge pull request #1829 from PrefectHQ/constants
Browse files Browse the repository at this point in the history
Enhance nested / ordered constant treatment
  • Loading branch information
cicdw committed Dec 13, 2019
2 parents 20dbbd6 + 78f4110 commit c9c13ec
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 88 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Enhancements

- Enhanced treatment of nested and ordered constant values - [#1829](https://github.com/PrefectHQ/prefect/pull/1829)
- Add `on_datetime`, `on_date`, and `at_time` filters - [#1837](https://github.com/PrefectHQ/prefect/pull/1837)
- Add `--latest` flag for Kubernetes Agent install CLI command - [#1842](https://github.com/PrefectHQ/prefect/pull/1842)

Expand Down Expand Up @@ -54,7 +55,6 @@ Released on Dec 11, 2019.
- Add convenience `parents()` and `children()` classmethods to all State objects for navigating the hierarchy - [#1784](https://github.com/PrefectHQ/prefect/pull/1784)
- Add new `not_all_skipped` trigger and set it as the default for merge tasks - [#1768](https://github.com/PrefectHQ/prefect/issues/1768)


### Task Library

- Azure Blob tasks now use newer `BlockBlobService` with connection string authentication - [#1831](https://github.com/PrefectHQ/prefect/pull/1831)
Expand Down
8 changes: 5 additions & 3 deletions docs/core/concepts/tasks.md
Expand Up @@ -76,7 +76,7 @@ trigger_fn(upstream_states: Set[State]) -> bool

## Constants

If a non-`Task` input is provided to a task, it is automatically converted to a `ConstantTask`.
If a non-`Task` input is provided to a task, it is automatically converted to a `Constant`.

```python
from prefect import Flow, task
Expand All @@ -88,10 +88,12 @@ def add(x, y):
with Flow('Flow With Constant') as flow:
add(1, 2)

assert len(flow) == 3
assert len(flow.tasks) == 1
assert len(flow.constants) == 2
```

This flow has three tasks, even though the user might think they only created one: the `add` task and two `Constants` representing the inputs `1` and `2`.
Prefect will attempt to automatically turn Python objects into `Constants`, including collections (like `lists`, `tuples`, `sets`, and `dicts`). If the resulting constant is used directly as the input to a task, it is optimized out of the task graph and stored in the `flow.constants` dict. However, if the constant is mapped over, then it remains in the dependency graph.


## Operators

Expand Down
15 changes: 9 additions & 6 deletions src/prefect/core/flow.py
@@ -1,4 +1,3 @@
import cloudpickle
import collections
import copy
import functools
Expand All @@ -12,7 +11,6 @@
import warnings
from collections import Counter
from pathlib import Path
from slugify import slugify
from typing import (
Any,
Callable,
Expand All @@ -27,8 +25,10 @@
cast,
)

import cloudpickle
import pendulum
from mypy_extensions import TypedDict
from slugify import slugify

import prefect
import prefect.schedules
Expand Down Expand Up @@ -813,17 +813,20 @@ def set_dependencies(
# add data edges to upstream tasks
for key, t in (keyword_tasks or {}).items():
is_mapped = mapped & (not isinstance(t, unmapped))
t = as_task(t, flow=self, convert_constants=False)
if isinstance(t, Task):
t = as_task(t, flow=self)

# if the task can be represented as a constant and we don't need to map over it
# then we can optimize it out of the graph and into the special `constants` dict
if isinstance(t, prefect.tasks.core.constants.Constant) and not is_mapped:
self.constants[task].update({key: t.value})
else:
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
else:
self.constants[task].update({key: t})

# Execution ---------------------------------------------------------------

Expand Down
34 changes: 3 additions & 31 deletions src/prefect/tasks/core/constants.py
@@ -1,38 +1,10 @@
"""
The tasks in this module can be used to represent constant values.
In general, users will not instantiate these tasks by hand; they will automatically be
applied when users create dependencies between a task and a constant value.
In general, users will not instantiate these tasks by hand; they will be automatically created
whenever the Prefect engine detects that a constant value is required. In many cases, Prefect
will not add `Constant` tasks to the graph; optimizing them as flow-level attributes instead.
Constant tasks are most commonly used to prevent Prefect from creating a large number
of auto-generated tasks in Python collections. For example,
```python
from prefect import task, Flow
from prefect.tasks.core.constants import Constant
@task
def do_nothing(values):
return values
with Flow("Many Small Tasks") as large_flow:
result = do_nothing({"x": 1, "z": 99})
large_flow.tasks
# {<Task: 'x'>,
# <Task: 'z'>,
# <Task: 1>,
# <Task: 99>,
# <Task: Dict>,
# <Task: List>,
# <Task: List>,
# <Task: do_nothing>}
with Flow("Two Tasks") as small_flow:
result = do_nothing(Constant({"x": 1, "z": 99}))
small_flow.tasks
# {<Task: Constant[dict]>, <Task: do_nothing>}
```
"""

Expand Down
32 changes: 22 additions & 10 deletions src/prefect/utilities/tasks.py
Expand Up @@ -45,28 +45,43 @@ def add(x, y):
yield


def as_task(
x: Any, flow: Optional["Flow"] = None, convert_constants: bool = True
) -> "prefect.Task":
def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
"""
Wraps a function, collection, or constant with the appropriate Task type.
Wraps a function, collection, or constant with the appropriate Task type. If a constant
or collection of constants is passed, a `Constant` task is returned.
Args:
- x (object): any Python object to convert to a prefect Task
- flow (Flow, optional): Flow to which the prefect Task will be bound
- convert_constants (bool, optional): a boolean specifying whether all passed Python
objects should be converted; if `False`, only collection types will be handled.
Defaults to `True`.
Returns:
- a prefect Task representing the passed object
"""
from prefect.tasks.core.constants import Constant

def is_constant(x: Any) -> bool:
"""
Helper function for determining if nested collections are constants without calling
`bind()`, which would create new tasks on the active graph.
"""
if isinstance(x, (prefect.core.Task, unmapped)):
return False
elif isinstance(x, (list, tuple, set)):
return all(is_constant(xi) for xi in x)
elif isinstance(x, dict):
return all(is_constant(xi) for xi in x.values())
return True

# task objects
if isinstance(x, prefect.core.Task): # type: ignore
return x
elif isinstance(x, unmapped):
return x.task

# handle constants, including collections of constants
elif is_constant(x):
return_task = Constant(x) # type: prefect.core.Task

# collections
elif isinstance(x, list):
return_task = prefect.tasks.core.collections.List().bind(*x, flow=flow)
Expand All @@ -83,9 +98,6 @@ def as_task(
keys=keys, values=values, flow=flow
)

# constants
elif convert_constants:
return_task = prefect.tasks.core.constants.Constant(value=x)
else:
return x

Expand Down
45 changes: 19 additions & 26 deletions tests/core/test_flow.py
Expand Up @@ -225,6 +225,20 @@ def run(self, x):
assert f.constants[t1] == dict(x=4)


@pytest.mark.parametrize(
"val", [[[[3]]], [1, 2, (3, [4])], [([1, 2, 3],)], {"a": 1, "b": [2]}]
)
def test_set_dependencies_with_nested_ordered_constants_creates_a_single_constant(val):
class ReturnTask(Task):
def run(self, x):
return x

with Flow("test") as f:
task = ReturnTask()(x=val)
assert f.run().result[task].result == val
assert f.constants[task] == dict(x=val)


def test_set_dependencies_creates_mapped_edges():
t1 = Task()
t2 = Task()
Expand Down Expand Up @@ -1052,9 +1066,7 @@ def test_viz_reflects_mapping(self):
assert 'label="a_nice_task <map>" shape=box' in graph.source
assert "label=a_list_task shape=ellipse" in graph.source
assert "label=x style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented
assert "label=y style=dashed" in graph.source

def test_viz_can_handle_skipped_mapped_tasks(self):
ipython = MagicMock(
Expand All @@ -1071,9 +1083,7 @@ def test_viz_can_handle_skipped_mapped_tasks(self):
assert 'label="a_nice_task <map>" color="#62757f80"' in graph.source
assert 'label=a_list_task color="#28a74580"' in graph.source
assert "label=x style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented
assert "label=y style=dashed" in graph.source

@pytest.mark.parametrize("state", [Success(), Failed(), Skipped()])
def test_viz_if_flow_state_provided(self, state):
Expand Down Expand Up @@ -1101,7 +1111,7 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
map_state = Mapped(map_states=[Success(), Failed()])
with patch.dict("sys.modules", IPython=ipython):
with Flow(name="test") as f:
res = add.map(x=list_task, y=prefect.tasks.core.constants.Constant(8))
res = add.map(x=list_task, y=8)
graph = f.visualize(
flow_state=Success(result={res: map_state, list_task: Success()})
)
Expand All @@ -1121,12 +1131,9 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
'label=a_list_task color="{success}80"'.format(success=Success.color)
in graph.source
)
assert 'label=8 color="#00000080"' in graph.source

# two edges for each input to add()
for var in ["x", "y"]:
for index in [0, 1]:
assert "{0} [label={1} style=dashed]".format(index, var) in graph.source
for index in [0, 1]:
assert "{0} [label=x style=dashed]".format(index) in graph.source

def test_viz_reflects_multiple_mapping_if_flow_state_provided(self):
ipython = MagicMock(
Expand Down Expand Up @@ -2507,20 +2514,6 @@ def test_load_accepts_name_and_sluggified_name(self):
assert new_obj_from_slug.name == "I aM a-test!"


def test_auto_generation_of_collection_tasks_is_robust():
@task
def do_nothing(arg):
pass

with Flow("constants") as flow:
do_nothing({"x": 1, "y": [9, 10]})

assert len(flow.tasks) == 5

flow_state = flow.run()
assert flow_state.is_successful()


@pytest.mark.skipif(
sys.platform == "win32", reason="Windows doesn't support any timeout logic"
)
Expand Down
3 changes: 2 additions & 1 deletion tests/core/test_task_map.py
Expand Up @@ -920,6 +920,7 @@ def add_one(x):
with Flow("constants") as f:
output = add_one.map(x=[1, 2, 3, 4])

flow_state = f.run()
with raise_on_exception():
flow_state = f.run()
assert flow_state.is_successful()
assert flow_state.result[output].result == [2, 3, 4, 5]
4 changes: 2 additions & 2 deletions tests/engine/test_flow_runner.py
Expand Up @@ -1594,8 +1594,8 @@ def add(x):
assert flow_state.is_successful()
assert flow_state.result[output].result == [100] * 10

## only add and the List task were submitted
assert len(calls) == 2
## only add task was submitted; the list task is a constant
assert len(calls) == 1

## to be safe, ensure '5' isn't in the logs
assert len([log.message for log in caplog.records if "99" in log.message]) == 0
8 changes: 4 additions & 4 deletions tests/tasks/test_core.py
Expand Up @@ -202,7 +202,7 @@ def test_dict_automatically_applied_to_callargs(self):
identity.bind(x=dict(a=x, b=y))
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 6 # 2 params, identity, Dict, 2 Lists for Dict
assert len(f.tasks) == 5 # 2 params, identity, Dict, List of dict values
assert sum(isinstance(t, collections.Dict) for t in f.tasks) == 1
assert state.result[identity].result == dict(a=1, b=2)

Expand All @@ -215,7 +215,7 @@ def test_dict_automatically_applied_to_callargs_imperative(self):
identity.bind(x=dict(a=x, b=y), flow=f)
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 6 # 2 params, identity, Dict, 2 Lists for Dict
assert len(f.tasks) == 5 # 2 params, identity, Dict, List of dict values
assert sum(isinstance(t, collections.Dict) for t in f.tasks) == 1
assert state.result[identity].result == dict(a=1, b=2)

Expand All @@ -227,7 +227,7 @@ def test_nested_collection_automatically_applied_to_callargs(self):
identity.bind(x=dict(a=[x, dict(y=y)], b=(y, set([x]))))
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 12
assert len(f.tasks) == 10
assert state.result[identity].result == dict(a=[1, dict(y=2)], b=(2, set([1])))

def test_nested_collection_automatically_applied_to_callargs_imperative(self):
Expand All @@ -239,5 +239,5 @@ def test_nested_collection_automatically_applied_to_callargs_imperative(self):
identity.bind(x=dict(a=[x, dict(y=y)], b=(y, set([x]))), flow=f)
state = f.run(parameters=dict(x=1, y=2))

assert len(f.tasks) == 12
assert len(f.tasks) == 10
assert state.result[identity].result == dict(a=[1, dict(y=2)], b=(2, set([1])))

0 comments on commit c9c13ec

Please sign in to comment.