Skip to content

Commit

Permalink
Merge pull request #246 from PrefectHQ/manual-only
Browse files Browse the repository at this point in the history
Manual only trigger refactor
  • Loading branch information
cicdw committed Oct 4, 2018
2 parents bb98c9d + 03030fc commit ca89724
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/generate_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
prefect.triggers.all_finished,
prefect.triggers.manual_only,
prefect.triggers.always_run,
prefect.triggers.never_run,
prefect.triggers.all_successful,
prefect.triggers.all_failed,
prefect.triggers.any_successful,
Expand Down Expand Up @@ -140,6 +139,7 @@
prefect.engine.signals.RETRY,
prefect.engine.signals.SKIP,
prefect.engine.signals.DONTRUN,
prefect.engine.signals.PAUSE,
],
"title": "Signals",
"top-level-doc": prefect.engine.signals,
Expand Down
9 changes: 9 additions & 0 deletions src/prefect/engine/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,12 @@ class DONTRUN(PrefectStateSignal):
Args:
- message (Any, optional): Defaults to `None`. A message about the signal.
"""


class PAUSE(PrefectStateSignal):
"""
Indicates that a task should not run and wait for manual execution.
Args:
- message (Any, optional): Defaults to `None`. A message about the signal.
"""
11 changes: 8 additions & 3 deletions src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def inner(self: "TaskRunner", *args: Any, **kwargs: Any) -> State:
logging.debug("DONTRUN signal raised: {}".format(exc))
raise

# PAUSE signals get raised for handling
except signals.PAUSE as exc:
logging.debug("PAUSE signal raised: {}".format(exc))
raise

# RETRY signals are trapped and turned into Retry states
except signals.RETRY as exc:
logging.debug("RETRY signal raised")
Expand Down Expand Up @@ -166,10 +171,10 @@ def run(
# a DONTRUN signal at any point breaks the chain and we return
# the most recently computed state
except signals.DONTRUN as exc:
if "manual_only" in str(exc):
state.cached_inputs = task_inputs or {}
state.message = exc
pass
except signals.PAUSE as exc:
state.cached_inputs = task_inputs or {}
state.message = exc
finally: # resource is now available
for ticket, q in zip(tickets, queues):
q.put(ticket)
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ def manual_only(upstream_states: Set["state.State"]) -> bool:
Note this doesn't raise a failure, it simply doesn't run the task.
"""
raise signals.DONTRUN('Trigger function is "manual_only"')
raise signals.PAUSE('Trigger function is "manual_only"')


# aliases
always_run = all_finished
never_run = manual_only


def all_successful(upstream_states: Set["state.State"]) -> bool:
Expand Down
24 changes: 24 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,27 @@ def clear_registry():
"""
yield
prefect.core.registry.REGISTRY.clear()


def pytest_addoption(parser):
parser.addoption(
"--airflow",
action="store_true",
dest="airflow",
help="including this flag will attempt to run airflow compatibility tests",
)


def pytest_configure(config):
config.addinivalue_line(
"markers", "airflow: mark test to run only when --airflow flag is provided."
)


def pytest_runtest_setup(item):
mark = item.get_marker("airflow")
if mark is not None:
if item.config.getoption("--airflow") is False:
pytest.mark.skip(
"Airflow tests skipped by default unless --airflow flag provided to pytest."
)
6 changes: 3 additions & 3 deletions tests/test_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ def test_always_run_with_mixed_states():


def test_manual_only_with_all_success():
with pytest.raises(signals.DONTRUN):
with pytest.raises(signals.PAUSE):
triggers.manual_only(generate_states(success=3))


def test_manual_only_with_all_failed():
with pytest.raises(signals.DONTRUN):
with pytest.raises(signals.PAUSE):
triggers.manual_only(generate_states(failed=3))


def test_manual_only_with_mixed_states():
with pytest.raises(signals.DONTRUN):
with pytest.raises(signals.PAUSE):
triggers.manual_only(generate_states(success=1, failed=1, skipped=1))


Expand Down
4 changes: 4 additions & 0 deletions tests/utilities/test_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from prefect.utilities.airflow_utils import AirFlow


@pytest.mark.airflow()
def test_example_branch_operator():
flow = AirFlow(dag_id="example_branch_operator")
res = flow.run(execution_date="2018-09-20", return_tasks=flow.tasks)
Expand Down Expand Up @@ -34,6 +35,7 @@ def test_example_branch_operator():
)


@pytest.mark.airflow()
def test_example_xcom():
flow = AirFlow(dag_id="example_xcom")
res = flow.run(execution_date="2018-09-20", return_tasks=flow.tasks)
Expand All @@ -51,6 +53,7 @@ def test_example_xcom():
assert state.result == {"a": "b"}


@pytest.mark.airflow()
def test_example_short_circuit_operator():
flow = AirFlow(dag_id="example_short_circuit_operator")
res = flow.run(execution_date="2018-09-20", return_tasks=flow.tasks)
Expand All @@ -65,6 +68,7 @@ def test_example_short_circuit_operator():
assert isinstance(state, Success)


@pytest.mark.airflow()
def test_example_bash_operator():
flow = AirFlow(dag_id="example_bash_operator")
res = flow.run(execution_date="2018-09-20", return_tasks=flow.tasks)
Expand Down

0 comments on commit ca89724

Please sign in to comment.