Skip to content

Commit

Permalink
Merge pull request #1609 from PrefectHQ/equality-fix
Browse files Browse the repository at this point in the history
Fix issue with merge task and comparisons
  • Loading branch information
cicdw committed Oct 9, 2019
2 parents 92f8332 + c9e4313 commit 2f5bc42
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

- Fix issue with running local Flow without a schedule containing cached tasks - [#1599](https://github.com/PrefectHQ/prefect/pull/1599)
- Remove blank string for `task_run_id` in k8s resource manager - [#1604](https://github.com/PrefectHQ/prefect/pull/1604)
- Fix issue with merge task not working for pandas dataframes and numpy arrays - [#1609](https://github.com/PrefectHQ/prefect/pull/1609)

### Deprecations

Expand Down
6 changes: 4 additions & 2 deletions src/prefect/tasks/control_flow/conditional.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import prefect
from prefect import Task
from prefect.engine import signals
from prefect.engine.result import NoResult
from prefect.engine.result import NoResultType

__all__ = ["switch", "ifelse"]

Expand All @@ -15,7 +15,9 @@ def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)

def run(self, **task_results: Any) -> Any:
return next((v for v in task_results.values() if v != NoResult), None)
return next(
(v for v in task_results.values() if not isinstance(v, NoResultType)), None
)


class CompareValue(Task):
Expand Down
23 changes: 23 additions & 0 deletions tests/tasks/test_control_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,29 @@ def test_merging_diamond_flow():
assert isinstance(state.result[merge_task], Success)


def test_merging_with_objects_that_cant_be_equality_compared():
class SpecialObject:
def __eq__(self, other):
return self

def __bool__(self):
raise SyntaxError("You can't handle the truth!")

@task
def return_array():
return SpecialObject()

with Flow("test-merge") as flow:
success = SuccessTask()
ifelse(Condition(), success, return_array)
merge_task = merge(success, return_array)

with prefect.context(CONDITION=False):
flow_state = flow.run()
assert flow_state.is_successful()
assert isinstance(flow_state.result[merge_task].result, SpecialObject)


def test_list_of_tasks():
"""
Test that a list of tasks can be set as a switch condition
Expand Down

0 comments on commit 2f5bc42

Please sign in to comment.