Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No constants #1730

Merged
merged 13 commits into from
Nov 15, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Features

- Allow users to provide a custom version group ID for controlling Cloud versioning - [#1665](https://github.com/PrefectHQ/prefect/issues/1665)
- Stop autogenerating constant tasks - [#1730](https://github.com/PrefectHQ/prefect/pull/1730)

### Enhancements

Expand Down
19 changes: 6 additions & 13 deletions docs/core/tutorials/task-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,24 @@ To see some of these subtleties in action, let's work out a more complicated exa
f = Flow("add-example")

f.set_dependencies(add_task, keyword_tasks={"x": 1, "y": 2})
print(f.tasks) # {<Task: add_task>, <Task: 2>, <Task: 1>}
print(f.tasks) # {<Task: add_task>}
```

The first thing to observe here is that _three_ tasks were added to our flow! This is because, in Prefect, _every single piece of data that is passed around_ must be represented as a Prefect Task. In this case, the constants 1 and 2 were auto-converted to Tasks which simply return these values when run.

Now, let's switch our attention to the functional API and reproduce the above example exactly:
```python
with Flow("add-example-v2") as f:
result = add_task(x=1, y=2)

print(f.tasks) # {<Task: add_task>, <Task: 2>, <Task: 1>}
print(f.tasks) # {<Task: add_task>}

add_task in f.tasks # False
result in f.tasks # True
```

As before, the constants were auto-converted to Prefect Tasks, and we see that a _copy_ of the `add_task` was created and added to the Flow.
We see here that a _copy_ of the `add_task` was created and added to the Flow.

::: warning Auto-generation is granular
Note that Prefect unpacks Python collections at a very granular level; so, for example, adding a dictionary to a Flow will actually create Tasks for all of the dictionary's keys and its values.
::: warning Auto-generation of Tasks
Note that Prefect will autogenerate Tasks to represent Python collections; so, for example, adding a dictionary to a Flow will actually create Tasks for the dictionary's keys and its values.

```python
from prefect import task, Flow
Expand All @@ -290,19 +288,14 @@ with Flow("constants") as flow:

flow.tasks

# {<Task: 'x'>,
# <Task: 'y'>,
# <Task: 10>,
# <Task: 1>,
# <Task: 9>,
# <Task: Dict>,
# <Task: List>, # corresponding to [9, 10]
# <Task: List>, # corresponding to the dictionary keys
# <Task: List>, # corresponding to the dictionary values
# <Task: do_nothing>}
```

This can be burdensome for very large Python collections. To prevent this granular auto-generation from occuring, you can always wrap Python objects in a `Constant` Task:
This can be burdensome for deeply nested Python collections. To prevent this granular auto-generation from occuring, you can always wrap Python objects in a `Constant` Task:

```python
from prefect.tasks.core.constants import Constant
Expand Down
23 changes: 14 additions & 9 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ def __init__(

self.tasks = set() # type: Set[Task]
self.edges = set() # type: Set[Edge]
self.constants = collections.defaultdict(
dict
) # type: Dict[Task, Dict[str, Any]]

for t in tasks or []:
self.add_task(t)
Expand Down Expand Up @@ -806,15 +809,17 @@ def set_dependencies(
# add data edges to upstream tasks
for key, t in (keyword_tasks or {}).items():
is_mapped = mapped & (not isinstance(t, unmapped))
t = as_task(t, flow=self)
assert isinstance(t, Task) # mypy assert
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
t = as_task(t, flow=self, convert_constants=False)
if isinstance(t, Task):
self.add_edge(
upstream_task=t,
downstream_task=task,
key=key,
validate=validate,
mapped=is_mapped,
)
else:
self.constants[task].update({key: t})

# Execution ---------------------------------------------------------------

Expand Down
14 changes: 14 additions & 0 deletions src/prefect/engine/flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import prefect
from prefect.core import Edge, Flow, Task
from prefect.engine import signals
from prefect.engine.result import Result
from prefect.engine.result_handlers import ConstantResultHandler
from prefect.engine.runner import ENDRUN, Runner, call_state_handlers
from prefect.engine.state import (
Failed,
Expand Down Expand Up @@ -417,6 +419,18 @@ def get_flow_run_state(
edge.upstream_task, Pending(message="Task state not available.")
)

# augment edges with upstream constants
for key, val in self.flow.constants[task].items():
edge = Edge(
upstream_task=prefect.tasks.core.constants.Constant(val),
downstream_task=task,
key=key,
)
upstream_states[edge] = Success(
"Auto-generated succcess state",
cicdw marked this conversation as resolved.
Show resolved Hide resolved
result=Result(val, result_handler=ConstantResultHandler(val)),
)

# -- run the task

with prefect.context(task_full_name=task.name, task_tags=task.tags):
Expand Down
1 change: 1 addition & 0 deletions src/prefect/engine/result_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def read(self, url):
"""

from prefect.engine.result_handlers.result_handler import ResultHandler
from prefect.engine.result_handlers.constant_result_handler import ConstantResultHandler
from prefect.engine.result_handlers.json_result_handler import JSONResultHandler
from prefect.engine.result_handlers.local_result_handler import LocalResultHandler
from prefect.engine.result_handlers.secret_result_handler import SecretResultHandler
Expand Down
38 changes: 38 additions & 0 deletions src/prefect/engine/result_handlers/constant_result_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any

from prefect.engine.result_handlers import ResultHandler


class ConstantResultHandler(ResultHandler):
"""
Hook for storing and retrieving constant Python objects. Only intended to be used
internally.
joshmeek marked this conversation as resolved.
Show resolved Hide resolved

Args:
- value (Any): the underlying value that we wish to "handle"
"""

def __init__(self, value: Any) -> None:
self.value = value
super().__init__()

def read(self, arg: str) -> Any:
"""
Returns the underlying value regardless of the argument passed.

Args:
- arg (str): an unused argument
"""
return self.value

def write(self, result: Any) -> str:
"""
Returns the repr of the underlying value, purely for convenience.

Args:
- result (Any): the result to represent

Returns:
- str: the repr of the result
"""
return repr(self.value)
2 changes: 1 addition & 1 deletion src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from prefect.core import Edge, Task
from prefect.engine import signals
from prefect.engine.result import NoResult, Result
from prefect.engine.result_handlers import JSONResultHandler
from prefect.engine.result_handlers import JSONResultHandler, ResultHandler
from prefect.engine.runner import ENDRUN, Runner, call_state_handlers
from prefect.engine.state import (
Cached,
Expand Down
11 changes: 9 additions & 2 deletions src/prefect/utilities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ def add(x, y):
yield


def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
def as_task(
x: Any, flow: Optional["Flow"] = None, convert_constants: bool = True
) -> "prefect.Task":
"""
Wraps a function, collection, or constant with the appropriate Task type.

Args:
- x (object): any Python object to convert to a prefect Task
- flow (Flow, optional): Flow to which the prefect Task will be bound
- convert_constants (bool, optional): a boolean specifying whether all passed Python
cicdw marked this conversation as resolved.
Show resolved Hide resolved
objects should be converted; if `False`, only collection types will be handled.
Defaults to `True`.

Returns:
- a prefect Task representing the passed object
Expand Down Expand Up @@ -79,8 +84,10 @@ def as_task(x: Any, flow: Optional["Flow"] = None) -> "prefect.Task":
)

# constants
else:
elif convert_constants:
return_task = prefect.tasks.core.constants.Constant(value=x)
else:
return x

return_task.auto_generated = True
return return_task
Expand Down
33 changes: 26 additions & 7 deletions tests/core/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def run(self, x):
assert f.tasks == set([t1, t2, t3, t4])


def test_set_dependencies_converts_arguments_to_tasks():
def test_set_dependencies_converts_unkeyed_arguments_to_tasks():
class ArgTask(Task):
def run(self, x):
return x
Expand All @@ -218,7 +218,8 @@ def run(self, x):
f.set_dependencies(
task=t1, upstream_tasks=[t2], downstream_tasks=[t3], keyword_tasks={"x": t4}
)
assert len(f.tasks) == 4
assert len(f.tasks) == 3
assert f.constants[t1] == dict(x=4)


def test_set_dependencies_creates_mapped_edges():
Expand Down Expand Up @@ -329,8 +330,7 @@ def test_calling_a_task_returns_a_copy():

with Flow(name="test") as f:
t.bind(4, 2)
with pytest.warns(UserWarning):
t2 = t(9, 0)
t2 = t(9, 0)

assert isinstance(t2, AddTask)
assert t != t2
Expand Down Expand Up @@ -1035,7 +1035,9 @@ def test_viz_reflects_mapping(self):
assert 'label="a_nice_task <map>" shape=box' in graph.source
assert "label=a_list_task shape=ellipse" in graph.source
assert "label=x style=dashed" in graph.source
assert "label=y style=dashed" in graph.source
assert (
"label=y style=dashed" not in graph.source
) # constants are no longer represented

@pytest.mark.parametrize("state", [Success(), Failed(), Skipped()])
def test_viz_if_flow_state_provided(self, state):
Expand Down Expand Up @@ -1063,7 +1065,7 @@ def test_viz_reflects_mapping_if_flow_state_provided(self):
map_state = Mapped(map_states=[Success(), Failed()])
with patch.dict("sys.modules", IPython=ipython):
with Flow(name="test") as f:
res = add.map(x=list_task, y=8)
res = add.map(x=list_task, y=prefect.tasks.core.constants.Constant(8))
graph = f.visualize(
flow_state=Success(result={res: map_state, list_task: Success()})
)
Expand Down Expand Up @@ -1353,12 +1355,15 @@ def test_replace_converts_new_to_task(self):

def test_replace_converts_new_collections_to_tasks(self):
add = AddTask()

with Flow(name="test") as f:
x, y = Parameter("x"), Parameter("y")
res = add(x, y)

f.replace(x, [55, 56])
f.replace(y, [1, 2])
assert len(f.tasks) == 7

assert len(f.tasks) == 3
state = f.run()
assert state.is_successful()
assert state.result[res].result == [55, 56, 1, 2]
Expand Down Expand Up @@ -2438,3 +2443,17 @@ def test_load_accepts_name_and_sluggified_name(self):
assert list(new_obj_from_slug.tasks)[0].name == "foo"
assert list(new_obj_from_slug.tasks)[0].slug == t.slug
assert new_obj_from_slug.name == "I aM a-test!"


def test_auto_generation_of_collection_tasks_is_robust():
@task
def do_nothing(arg):
pass

with Flow("constants") as flow:
do_nothing({"x": 1, "y": [9, 10]})

assert len(flow.tasks) == 5

flow_state = flow.run()
assert flow_state.is_successful()
13 changes: 13 additions & 0 deletions tests/core/test_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,3 +910,16 @@ def add_one(x):
printed_lines = [line for line in captured.out.split("\n") if line != ""]

assert len(printed_lines) == 15


def test_mapping_over_constants():
@prefect.task
def add_one(x):
return x + 1

with Flow("constants") as f:
output = add_one.map(x=[1, 2, 3, 4])

flow_state = f.run()
assert flow_state.is_successful()
assert flow_state.result[output].result == [2, 3, 4, 5]
25 changes: 24 additions & 1 deletion tests/engine/cloud/test_cloud_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from prefect.engine.cloud import CloudFlowRunner, CloudTaskRunner
from prefect.engine.result import NoResult, Result, SafeResult
from prefect.engine.result_handlers import (
ConstantResultHandler,
JSONResultHandler,
ResultHandler,
SecretResultHandler,
Expand Down Expand Up @@ -324,7 +325,7 @@ def raise_me(x, y):
with prefect.Flow(name="test") as flow:
final = raise_me(4, 7)

assert len(flow.tasks) == 3
assert len(flow.tasks) == 1

res = flow.run(state=Pending())

Expand Down Expand Up @@ -385,6 +386,28 @@ def is_p_three(p):
assert last_state.cached_inputs["p"] == exp_res


def test_task_failure_caches_constant_inputs_automatically(client):
@prefect.task(max_retries=2, retry_delay=timedelta(seconds=100))
def is_p_three(p):
if p == 3:
raise ValueError("No thank you.")

with prefect.Flow("test") as f:
res = is_p_three(3)

state = CloudFlowRunner(flow=f).run(return_tasks=[res])
assert state.is_running()
assert isinstance(state.result[res], Retrying)
exp_res = Result(3, result_handler=ConstantResultHandler(3))
assert not state.result[res].cached_inputs["p"] == exp_res
exp_res.store_safe_value()
assert state.result[res].cached_inputs["p"] == exp_res

last_state = client.set_task_run_state.call_args_list[-1][-1]["state"]
assert isinstance(last_state, Retrying)
assert last_state.cached_inputs["p"] == exp_res


def test_task_failure_with_upstream_secrets_doesnt_store_secret_value_and_recompute_if_necessary(
client,
):
Expand Down
4 changes: 3 additions & 1 deletion tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,9 @@ def test_improper_use_of_unmapped_fails_gracefully():
add = AddTask()
x = Parameter("x", default=[1, 2, 3])
with Flow(name="test") as f:
res = add.map(x, y=8) # incorrect, should use `unmapped`
res = add.map(
x, y=prefect.tasks.core.constants.Constant(8)
) # incorrect, should use `unmapped`

state = FlowRunner(flow=f).run(return_tasks=f.tasks)
assert state.is_failed()
Expand Down
Loading