From b78890490becefcf88146f52062d7ead36468072 Mon Sep 17 00:00:00 2001 From: Chris White Date: Wed, 9 Oct 2019 10:27:01 -0700 Subject: [PATCH 1/2] Fix issue with merge task not working for numpy arrays and pandas dataframes --- CHANGELOG.md | 1 + src/prefect/tasks/control_flow/conditional.py | 6 ++++-- tests/tasks/test_control_flow.py | 17 +++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 026480275349..e8e4b20a7470 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - Fix issue with running local Flow without a schedule containing cached tasks - [#1599](https://github.com/PrefectHQ/prefect/pull/1599) - Remove blank string for `task_run_id` in k8s resource manager - [#1604](https://github.com/PrefectHQ/prefect/pull/1604) +- Fix issue with merge task not working for pandas dataframes and numpy arrays - [#1609](https://github.com/PrefectHQ/prefect/pull/1609) ### Deprecations diff --git a/src/prefect/tasks/control_flow/conditional.py b/src/prefect/tasks/control_flow/conditional.py index 2b3d86c1686d..62e21a6fcee9 100644 --- a/src/prefect/tasks/control_flow/conditional.py +++ b/src/prefect/tasks/control_flow/conditional.py @@ -3,7 +3,7 @@ import prefect from prefect import Task from prefect.engine import signals -from prefect.engine.result import NoResult +from prefect.engine.result import NoResultType __all__ = ["switch", "ifelse"] @@ -15,7 +15,9 @@ def __init__(self, **kwargs) -> None: super().__init__(**kwargs) def run(self, **task_results: Any) -> Any: - return next((v for v in task_results.values() if v != NoResult), None) + return next( + (v for v in task_results.values() if not isinstance(v, NoResultType)), None + ) class CompareValue(Task): diff --git a/tests/tasks/test_control_flow.py b/tests/tasks/test_control_flow.py index 1d4740d3fe7d..3702714cbbfa 100644 --- a/tests/tasks/test_control_flow.py +++ b/tests/tasks/test_control_flow.py @@ -1,3 +1,4 @@ +import numpy as np import pytest import prefect @@ -95,6 +96,22 @@ def test_merging_diamond_flow(): assert isinstance(state.result[merge_task], Success) +def test_merging_with_objects_that_cant_be_equality_compared(): + @task + def return_array(): + return np.array([1, 2, 3]) + + with Flow("test-merge") as flow: + success = SuccessTask() + ifelse(Condition(), success, return_array) + merge_task = merge(success, return_array) + + with prefect.context(CONDITION=False): + flow_state = flow.run() + assert flow_state.is_successful() + assert (flow_state.result[merge_task].result == np.array([1, 2, 3])).all() + + def test_list_of_tasks(): """ Test that a list of tasks can be set as a switch condition From c9e431338313daa7099a67f6c4153a1bd6da44cc Mon Sep 17 00:00:00 2001 From: Chris White Date: Wed, 9 Oct 2019 10:31:18 -0700 Subject: [PATCH 2/2] Update test to not rely on numpy --- tests/tasks/test_control_flow.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/tasks/test_control_flow.py b/tests/tasks/test_control_flow.py index 3702714cbbfa..244727496906 100644 --- a/tests/tasks/test_control_flow.py +++ b/tests/tasks/test_control_flow.py @@ -1,4 +1,3 @@ -import numpy as np import pytest import prefect @@ -97,9 +96,16 @@ def test_merging_diamond_flow(): def test_merging_with_objects_that_cant_be_equality_compared(): + class SpecialObject: + def __eq__(self, other): + return self + + def __bool__(self): + raise SyntaxError("You can't handle the truth!") + @task def return_array(): - return np.array([1, 2, 3]) + return SpecialObject() with Flow("test-merge") as flow: success = SuccessTask() @@ -109,7 +115,7 @@ def return_array(): with prefect.context(CONDITION=False): flow_state = flow.run() assert flow_state.is_successful() - assert (flow_state.result[merge_task].result == np.array([1, 2, 3])).all() + assert isinstance(flow_state.result[merge_task].result, SpecialObject) def test_list_of_tasks():