Skip to content

Commit

Permalink
Merge pull request #1730 from PrefectHQ/no-constants
Browse files Browse the repository at this point in the history
No constants
  • Loading branch information
cicdw authored Nov 15, 2019
2 parents 3ad8ced + a220116 commit 42636b9
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Features

- Allow users to provide a custom version group ID for controlling Cloud versioning - [#1665](https://github.com/PrefectHQ/prefect/issues/1665)
- Stop autogenerating constant tasks - [#1730](https://github.com/PrefectHQ/prefect/pull/1730)

### Enhancements

Expand Down
19 changes: 6 additions & 13 deletions docs/core/tutorials/task-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,24 @@ To see some of these subtleties in action, let's work out a more complicated exa
f = Flow("add-example")

f.set_dependencies(add_task, keyword_tasks={"x": 1, "y": 2})
print(f.tasks) # {<Task: add_task>, <Task: 2>, <Task: 1>}
print(f.tasks) # {<Task: add_task>}
```

The first thing to observe here is that _three_ tasks were added to our flow! This is because, in Prefect, _every single piece of data that is passed around_ must be represented as a Prefect Task. In this case, the constants 1 and 2 were auto-converted to Tasks which simply return these values when run.

Now, let's switch our attention to the functional API and reproduce the above example exactly:
```python
with Flow("add-example-v2") as f:
result = add_task(x=1, y=2)

print(f.tasks) # {<Task: add_task>, <Task: 2>, <Task: 1>}
print(f.tasks) # {<Task: add_task>}

add_task in f.tasks # False
result in f.tasks # True
```

As before, the constants were auto-converted to Prefect Tasks, and we see that a _copy_ of the `add_task` was created and added to the Flow.
We see here that a _copy_ of the `add_task` was created and added to the Flow.

::: warning Auto-generation is granular
Note that Prefect unpacks Python collections at a very granular level; so, for example, adding a dictionary to a Flow will actually create Tasks for all of the dictionary's keys and its values.
::: warning Auto-generation of Tasks
Note that Prefect will autogenerate Tasks to represent Python collections; so, for example, adding a dictionary to a Flow will actually create Tasks for the dictionary's keys and its values.

```python
from prefect import task, Flow
Expand All @@ -290,19 +288,14 @@ with Flow("constants") as flow:

flow.tasks

# {<Task: 'x'>,
# <Task: 'y'>,
# <Task: 10>,
# <Task: 1>,
# <Task: 9>,
# <Task: Dict>,
# <Task: List>, # corresponding to [9, 10]
# <Task: List>, # corresponding to the dictionary keys
# <Task: List>, # corresponding to the dictionary values
# <Task: do_nothing>}
```

This can be burdensome for very large Python collections. To prevent this granular auto-generation from occurring, you can always wrap Python objects in a `Constant` Task:
This can be burdensome for deeply nested Python collections. To prevent this granular auto-generation from occurring, you can always wrap Python objects in a `Constant` Task:

```python
from prefect.tasks.core.constants import Constant
Expand Down
23 changes: 14 additions & 9 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ def __init__(

self.tasks = set() # type: Set[Task]
self.edges = set() # type: Set[Edge]
self.constants = collections.defaultdict(
dict
) # type: Dict[Task, Dict[str, Any]]

for t in tasks or []:
self.add_task(t)
Expand Down Expand Up @@ -806,15 +809,17 @@ def set_dependencies(
# add data edges to upstream tasks
for key, t in (keyword_tasks or {}).items():
is_mapped = mapped & (not isinstance(t, unmapped))
t = as_task(t, flow=self)
assert isinstance(t, Task) # mypy assert
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
t = as_task(t, flow=self, convert_constants=False)
if isinstance(t, Task):
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
else:
self.constants[task].update({key: t})

# Execution ---------------------------------------------------------------

Expand Down
14 changes: 14 additions & 0 deletions src/prefect/engine/flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import prefect
from prefect.core import Edge, Flow, Task
from prefect.engine import signals
from prefect.engine.result import Result
from prefect.engine.result_handlers import ConstantResultHandler
from prefect.engine.runner import ENDRUN, Runner, call_state_handlers
from prefect.engine.state import (
Failed,
Expand Down Expand Up @@ -417,6 +419,18 @@ def get_flow_run_state(
edge.upstream_task, Pending(message="Task state not available.")
)

# augment edges with upstream constants
for key, val in self.flow.constants[task].items():
edge = Edge(
upstream_task=prefect.tasks.core.constants.Constant(val),
downstream_task=task,
key=key,
)
upstream_states[edge] = Success(
"Auto-generated constant value",
result=Result(val, result_handler=ConstantResultHandler(val)),
)

# -- run the task

with prefect.context(task_full_name=task.name, task_tags=task.tags):
Expand Down
1 change: 1 addition & 0 deletions src/prefect/engine/result_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def read(self, url):
"""

from prefect.engine.result_handlers.result_handler import ResultHandler
from prefect.engine.result_handlers.constant_result_handler import ConstantResultHandler
from prefect.engine.result_handlers.json_result_handler import JSONResultHandler
from prefect.engine.result_handlers.local_result_handler import LocalResultHandler
from prefect.engine.result_handlers.secret_result_handler import SecretResultHandler
Expand Down
38 changes: 38 additions & 0 deletions src/prefect/engine/result_handlers/constant_result_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any

from prefect.engine.result_handlers import ResultHandler


class ConstantResultHandler(ResultHandler):
    """
    Result handler that wraps a fixed, in-memory Python object. Reading always
    yields that object and writing only produces a printable representation of
    it. Only intended to be used internally.

    Args:
        - value (Any): the constant Python object this handler hands back
    """

    def __init__(self, value: Any) -> None:
        # the attribute is assigned before the base initializer runs, matching
        # the original ordering
        self.value = value
        super().__init__()

    def read(self, arg: str) -> Any:
        """
        Hands back the wrapped constant; the argument plays no role.

        Args:
            - arg (str): ignored

        Returns:
            - Any: the value supplied at construction time
        """
        constant = self.value
        return constant

    def write(self, result: Any) -> str:
        """
        Produces the repr of the wrapped constant, purely for convenience.
        Note that the `result` argument is not consulted: the representation
        is always that of the value supplied at construction time.

        Args:
            - result (Any): ignored

        Returns:
            - str: the repr of the wrapped value
        """
        representation = repr(self.value)
        return representation
2 changes: 1 addition & 1 deletion src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from prefect.core import Edge, Task
from prefect.engine import signals
from prefect.engine.result import NoResult, Result
from prefect.engine.result_handlers import JSONResultHandler
from prefect.engine.result_handlers import JSONResultHandler, ResultHandler
from prefect.engine.runner import ENDRUN, Runner, call_state_handlers
from prefect.engine.state import (
Cached,
Expand Down
11 changes: 9 additions & 2 deletions src/prefect/utilities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ def add(x, y):
yield


def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
def as_task(
x: Any, flow: Optional["Flow"] = None, convert_constants: bool = True
) -> "prefect.Task":
"""
Wraps a function, collection, or constant with the appropriate Task type.
Args:
- x (object): any Python object to convert to a prefect Task
- flow (Flow, optional): Flow to which the prefect Task will be bound
- convert_constants (bool, optional): a boolean specifying whether all passed Python
objects should be converted; if `False`, only collection types will be handled.
Defaults to `True`.
Returns:
- a prefect Task representing the passed object
Expand Down Expand Up @@ -79,8 +84,10 @@ def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
)

# constants
else:
elif convert_constants:
return_task = prefect.tasks.core.constants.Constant(value=x)
else:
return x

return_task.auto_generated = True
return return_task
Expand Down
33 changes: 26 additions & 7 deletions tests/core/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def run(self, x):
assert f.tasks == set([t1, t2, t3, t4])


def test_set_dependencies_converts_arguments_to_tasks():
def test_set_dependencies_converts_unkeyed_arguments_to_tasks():
class ArgTask(Task):
def run(self, x):
return x
Expand All @@ -218,7 +218,8 @@ def run(self, x):
f.set_dependencies(
task=t1, upstream_tasks=[t2], downstream_tasks=[t3], keyword_tasks={"x": t4}
)
assert len(f.tasks) == 4
assert len(f.tasks) == 3
assert f.constants[t1] == dict(x=4)


def test_set_dependencies_creates_mapped_edges():
Expand Down Expand Up @@ -329,8 +330,7 @@ def test_calling_a_task_returns_a_copy():

with Flow(name="test") as f:
t.bind(4, 2)
with pytest.warns(UserWarning):
t2 = t(9, 0)
t2 = t(9, 0)

assert isinstance(t2, AddTask)
assert t != t2
Expand Down Expand Up @@ -1035,7 +1035,9 @@ def test_viz_reflects_mapping(self):
assert 'label="a_nice_task <map>" shape=box' in graph.source
assert "label=a_list_task shape=ellipse" in graph.source
assert "label=x style=dashed" in graph.source
assert "label=y style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented

@pytest.mark.parametrize("state", [Success(), Failed(), Skipped()])
def test_viz_if_flow_state_provided(self, state):
Expand Down Expand Up @@ -1063,7 +1065,7 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
map_state = Mapped(map_states=[Success(), Failed()])
with patch.dict("sys.modules", IPython=ipython):
with Flow(name="test") as f:
res = add.map(x=list_task, y=8)
res = add.map(x=list_task, y=prefect.tasks.core.constants.Constant(8))
graph = f.visualize(
flow_state=Success(result={res: map_state, list_task: Success()})
)
Expand Down Expand Up @@ -1353,12 +1355,15 @@ def test_replace_converts_new_to_task(self):

def test_replace_converts_new_collections_to_tasks(self):
add = AddTask()

with Flow(name="test") as f:
x, y = Parameter("x"), Parameter("y")
res = add(x, y)

f.replace(x, [55, 56])
f.replace(y, [1, 2])
assert len(f.tasks) == 7

assert len(f.tasks) == 3
state = f.run()
assert state.is_successful()
assert state.result[res].result == [55, 56, 1, 2]
Expand Down Expand Up @@ -2438,3 +2443,17 @@ def test_load_accepts_name_and_sluggified_name(self):
assert list(new_obj_from_slug.tasks)[0].name == "foo"
assert list(new_obj_from_slug.tasks)[0].slug == t.slug
assert new_obj_from_slug.name == "I aM a-test!"


def test_auto_generation_of_collection_tasks_is_robust():
    """
    Passing a nested collection (a dict containing a list) to a task inside a
    Flow context should produce a fixed number of tasks and a runnable flow.
    """

    @task
    def do_nothing(arg):
        pass

    with Flow("constants") as constants_flow:
        do_nothing({"x": 1, "y": [9, 10]})

    # five tasks in total are expected for this nested dict / list argument
    task_count = len(constants_flow.tasks)
    assert task_count == 5

    final_state = constants_flow.run()
    assert final_state.is_successful()
assert flow_state.is_successful()
13 changes: 13 additions & 0 deletions tests/core/test_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,3 +910,16 @@ def add_one(x):
printed_lines = [line for line in captured.out.split("\n") if line != ""]

assert len(printed_lines) == 15


def test_mapping_over_constants():
    """
    Mapping a task over a literal list passed by keyword should run
    successfully and produce one incremented result per list element.
    """

    @prefect.task
    def add_one(x):
        return x + 1

    with Flow("constants") as constants_flow:
        mapped = add_one.map(x=[1, 2, 3, 4])

    final_state = constants_flow.run()
    assert final_state.is_successful()

    mapped_state = final_state.result[mapped]
    assert mapped_state.result == [2, 3, 4, 5]
25 changes: 24 additions & 1 deletion tests/engine/cloud/test_cloud_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from prefect.engine.cloud import CloudFlowRunner, CloudTaskRunner
from prefect.engine.result import NoResult, Result, SafeResult
from prefect.engine.result_handlers import (
ConstantResultHandler,
JSONResultHandler,
ResultHandler,
SecretResultHandler,
Expand Down Expand Up @@ -324,7 +325,7 @@ def raise_me(x, y):
with prefect.Flow(name="test") as flow:
final = raise_me(4, 7)

assert len(flow.tasks) == 3
assert len(flow.tasks) == 1

res = flow.run(state=Pending())

Expand Down Expand Up @@ -385,6 +386,28 @@ def is_p_three(p):
assert last_state.cached_inputs["p"] == exp_res


def test_task_failure_caches_constant_inputs_automatically(client):
    """
    A retrying task that was called with a plain constant should have that
    constant recorded in its `cached_inputs`, both on the returned state and
    on the last state sent through the client.
    """

    @prefect.task(max_retries=2, retry_delay=timedelta(seconds=100))
    def is_p_three(p):
        if p == 3:
            raise ValueError("No thank you.")

    with prefect.Flow("test") as f:
        res = is_p_three(3)

    runner = CloudFlowRunner(flow=f)
    state = runner.run(return_tasks=[res])
    assert state.is_running()

    task_state = state.result[res]
    assert isinstance(task_state, Retrying)

    handler = ConstantResultHandler(3)
    exp_res = Result(3, result_handler=handler)
    # the cached input only compares equal once the expected result has had
    # its safe value stored
    assert not task_state.cached_inputs["p"] == exp_res
    exp_res.store_safe_value()
    assert task_state.cached_inputs["p"] == exp_res

    # inspect the state passed in the final `set_task_run_state` call
    final_call = client.set_task_run_state.call_args_list[-1]
    last_state = final_call[-1]["state"]
    assert isinstance(last_state, Retrying)
    assert last_state.cached_inputs["p"] == exp_res


def test_task_failure_with_upstream_secrets_doesnt_store_secret_value_and_recompute_if_necessary(
client,
):
Expand Down
4 changes: 3 additions & 1 deletion tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,9 @@ def test_improper_use_of_unmapped_fails_gracefully():
add = AddTask()
x = Parameter("x", default=[1, 2, 3])
with Flow(name="test") as f:
res = add.map(x, y=8) # incorrect, should use `unmapped`
res = add.map(
x, y=prefect.tasks.core.constants.Constant(8)
) # incorrect, should use `unmapped`

state = FlowRunner(flow=f).run(return_tasks=f.tasks)
assert state.is_failed()
Expand Down
Loading

0 comments on commit 42636b9

Please sign in to comment.