Skip to content

Commit

Permalink
add explicit exception before flow run; #1821
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Goodman committed Feb 14, 2020
1 parent 2d2c9e7 commit ef7ccf4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/prefect/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ def map(
Returns:
- Task: a new Task instance
"""
for arg in args:
if not hasattr(arg, "__getitem__"):
raise TypeError(
"cannot map over an unsubscriptable object ({})".format(arg)
)
new = self.copy(**(task_args or {}))
return new.bind(
*args, mapped=True, upstream_tasks=upstream_tasks, flow=flow, **kwargs
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,12 @@ def run_mapped_task(
# Therefore, we only try to get a result if EITHER this task's
# state is not already mapped OR the upstream result is not None.
if not state.is_mapped() or upstream_state._result != NoResult:
if not hasattr(upstream_state.result, "__getitem__"):
raise TypeError(
"cannot map over an unsubscriptable object ({})".format(
upstream_state.result
)
)
upstream_result = Result(
upstream_state.result[i],
result_handler=upstream_state._result.result_handler, # type: ignore
Expand Down
6 changes: 6 additions & 0 deletions tests/core/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,9 @@ def test_tags_are_appended_to_when_updating_with_task_args(self):
t2 = t(1, 2, task_args={"name": "test-tags", "tags": ["new-tag"]})

assert t2.tags == {"math", "test", "new-tag"}

def test_task_check_mapped_args_are_subscriptable_in_advance(self):
t = Task()
with pytest.raises(TypeError):
with Flow(name="test") as f:
res = t.map({1, 2, 3, 4})

0 comments on commit ef7ccf4

Please sign in to comment.