diff --git a/.circleci/config.yml b/.circleci/config.yml
index baf4ec802609..587fb00313a9 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -238,37 +238,6 @@ jobs:
           name: Run tests
           command: pytest -vvrfEsx .
 
-  test_airflow:
-    docker:
-      - image: continuumio/miniconda3:4.6.14
-    steps:
-      - checkout
-      - setup_remote_docker
-      - run:
-          name: Install gcc
-          command: apt-get update && apt-get install -y gcc
-
-      - run:
-          name: Create Airflow conda environment
-          command: conda create -n airflow python=3.6 pip -y
-
-      - run:
-          name: Install Airflow
-          command: source activate airflow && SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow flask==1.0.4 && source deactivate
-
-      - run:
-          name: Install prefect
-          command: pip install ".[all_extras]"
-
-      - run:
-          name: Run tests w/ airflow
-          command: pytest -vvrfEsx --airflow --cov=prefect --cov-report=xml:/tmp/workspace/coverage/airflow-coverage.xml .
-
-      - persist_to_workspace:
-          root: *workspace_root
-          paths:
-            - coverage
-
   upload_coverage:
     docker:
       - image: python:3.6
@@ -393,13 +362,11 @@ workflows:
       - test_lower_prefect
       - test_vanilla_prefect
       - test_py352_import_prefect
-      - test_airflow
       - upload_coverage:
           requires:
             - test_35
            - test_36
            - test_vanilla_prefect
-            - test_airflow
 
   "Check code style and docs":
     jobs:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac9507d54f88..29fec3ae4069 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,7 +26,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
 
 ### Breaking Changes
 
-- None
+- Remove Airflow Tasks - [#1992](https://github.com/PrefectHQ/prefect/pull/1992)
 
 ### Contributors
 
diff --git a/conftest.py b/conftest.py
index d22fd15d4912..cf75cfb89426 100644
--- a/conftest.py
+++ b/conftest.py
@@ -4,12 +4,6 @@
 
 
 def pytest_addoption(parser):
-    parser.addoption(
-        "--airflow",
-        action="store_true",
-        dest="airflow",
-        help="including this flag will attempt to ONLY run airflow compatibility tests",
-    )
     parser.addoption(
         "--skip-formatting",
         action="store_true",
@@ -19,9 +13,6 @@
 
 
 def pytest_configure(config):
-    config.addinivalue_line(
-        "markers", "airflow: mark test to run only when --airflow flag is provided."
-    )
     config.addinivalue_line(
         "markers",
         "formatting: mark test as formatting to skip when --skip-formatting flag is provided.",
@@ -29,18 +20,6 @@
 
 
 def pytest_runtest_setup(item):
-    air_mark = item.get_closest_marker("airflow")
-
-    # if a test IS marked as "airflow" and the airflow flag IS NOT set, skip it
-    if air_mark is not None and item.config.getoption("--airflow") is False:
-        pytest.skip(
-            "Airflow tests skipped by default unless --airflow flag provided to pytest."
-        )
-
-    # if a test IS NOT marked as airflow and the airflow flag IS set, skip it
-    elif air_mark is None and item.config.getoption("--airflow") is True:
-        pytest.skip("Non-Airflow tests skipped because --airflow flag was provided.")
-
     formatting_mark = item.get_closest_marker("formatting")
 
     # if a test IS marked as "formatting" and the --skip-formatting flag IS set, skip it
diff --git a/docs/core/task_library/airflow.md b/docs/core/task_library/airflow.md
deleted file mode 100644
index 002f87159e78..000000000000
--- a/docs/core/task_library/airflow.md
+++ /dev/null
@@ -1,53 +0,0 @@
-# Airflow
-
-This module contains tasks that help migrate Airflow workflows into Prefect. Using these requires `airflow` to be installed in a separate conda environment.
-
-Note that by default, external Airflow dependencies and triggers will be respected; these can be ignored by passing `-A` as a CLI flag to the `AirflowTask`.
-
-For example, we can recreate the [example XCom DAG](https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py), using
-default settings:
-
-```python
-from prefect import Flow
-from prefect.tasks.airflow import AirflowTask
-
-puller = AirflowTask(
-    task_id="puller",
-    dag_id="example_xcom",
-)
-push = AirflowTask(
-    task_id="push",
-    dag_id="example_xcom",
-)
-push_by_returning = AirflowTask(
-    task_id="push_by_returning",
-    dag_id="example_xcom",
-)
-
-with Flow(name="example_xcom") as flow:
-    res = puller(upstream_tasks=[push, push_by_returning])
-
-flow_state = flow.run()
-
-# XComs auto-convert to return values
-assert flow_state.result[push].result == [1, 2, 3]
-assert flow_state.result[push_by_returning].result == {"a": "b"}
-```
-
-Note the difference between how one can specify dependencies in Prefect vs. Airflow.
-
-## AirflowTask
-
-Task wrapper for executing individual Airflow tasks.
-
-Successful execution of this task requires a separate conda environment in which `airflow` is installed. Any XComs this task pushes will be converted to return values for this task. Unless certain CLI flags are provided (e.g., `-A`), execution of this task will respect Airflow trigger rules.
-
-[API Reference](/api/latest/tasks/airflow.html#prefect-tasks-airflow-airflow-airflowtask)
-
-## AirflowTriggerDAG
-
-Task wrapper for triggering an Airflow DAG run.
-
-Successful execution of this task requires a separate conda environment in which `airflow` is installed.
-
-[API Reference](/api/latest/tasks/airflow.html#prefect-tasks-airflow-airflow-airflowtriggerdag)
diff --git a/docs/outline.toml b/docs/outline.toml
index 23fa0ce924b9..9395dd298451 100644
--- a/docs/outline.toml
+++ b/docs/outline.toml
@@ -180,11 +180,6 @@ module = "prefect.tasks.control_flow"
 classes = ["FilterTask"]
 functions = ["switch", "ifelse", "merge"]
 
-[pages.tasks.airflow]
-title = "Airflow Tasks"
-module = "prefect.tasks.airflow"
-classes = ["AirflowTask", "AirflowTriggerDAG"]
-
 [pages.tasks.airtable]
 title = "Airtable Tasks"
 module = "prefect.tasks.airtable"
diff --git a/src/prefect/tasks/airflow/__init__.py b/src/prefect/tasks/airflow/__init__.py
deleted file mode 100644
index dcc2a6933661..000000000000
--- a/src/prefect/tasks/airflow/__init__.py
+++ /dev/null
@@ -1,39 +0,0 @@
-"""
-This module contains tasks that help migrate Airflow workflows into Prefect. Using these requires `airflow` to be installed in a separate conda environment.
-
-Note that by default, external Airflow dependencies and triggers will be respected; these can be ignored by passing `-A` as a CLI flag to the `AirflowTask`.
-
-For example, we can recreate the [example XCom DAG](https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py), using
-default settings:
-
-```python
-from prefect import Flow
-from prefect.tasks.airflow import AirflowTask
-
-puller = AirflowTask(
-    task_id="puller",
-    dag_id="example_xcom",
-)
-push = AirflowTask(
-    task_id="push",
-    dag_id="example_xcom",
-)
-push_by_returning = AirflowTask(
-    task_id="push_by_returning",
-    dag_id="example_xcom",
-)
-
-with Flow(name="example_xcom") as flow:
-    res = puller(upstream_tasks=[push, push_by_returning])
-
-flow_state = flow.run()
-
-# XComs auto-convert to return values
-assert flow_state.result[push].result == [1, 2, 3]
-assert flow_state.result[push_by_returning].result == {"a": "b"}
-```
-
-Note the difference between how one can specify dependencies in Prefect vs. Airflow.
-"""
-
-from .airflow import AirflowTask, AirflowTriggerDAG
diff --git a/src/prefect/tasks/airflow/airflow.py b/src/prefect/tasks/airflow/airflow.py
deleted file mode 100644
index 0877bfce6aaa..000000000000
--- a/src/prefect/tasks/airflow/airflow.py
+++ /dev/null
@@ -1,247 +0,0 @@
-import os
-import pickle
-import sqlite3
-import subprocess
-import tempfile
-import warnings
-from collections import defaultdict
-from contextlib import closing
-from typing import Any, Dict, List
-
-import prefect
-
-__all__ = ["AirflowTask", "AirflowTriggerDAG"]
-
-
-def custom_query(db: str, query: str, *params: str) -> List:
-    with closing(sqlite3.connect(db)) as connection:
-        with closing(connection.cursor()) as cursor:
-            cursor.execute(query, params)
-            return cursor.fetchall()
-
-
-class AirflowTask(prefect.tasks.shell.ShellTask):
-    """
-    Task wrapper for executing individual Airflow tasks.
-
-    Successful execution of this task requires a separate conda environment in which `airflow` is installed.
-    Any XComs this task pushes will be converted to return values for this task.
-    Unless certain CLI flags are provided (e.g., `-A`), execution of this task will respect Airflow trigger rules.
-
-    Args:
-        - task_id (string): the Airflow `task_id` to execute at runtime
-        - dag_id (string): the Airflow `dag_id` containing the given `task_id`
-        - airflow_env (str, optional): the name of the conda environment in which `airflow` is installed;
-            defaults to `"airflow"`
-        - cli_flags (List[str], optional): a list of CLI flags to provide to `airflow run` at runtime;
-            see [the airflow docs](https://airflow.apache.org/cli.html#run) for options. This can be used to ignore Airflow trigger rules
-            by providing `cli_flags=['-A']`
-        - env (dict, optional): dictionary of environment variables to use for
-            the subprocess (e.g., `AIRFLOW__CORE__DAGS_FOLDER`)
-        - execution_date (str, optional): the execution date for this task run; can also be provided to the run method;
-            if not provided here or to `run()`, the value of `today` in context will be used
-        - db_conn (str, optional): the location of the airflow database; currently only SQLite DBs are supported;
-            defaults to `~/airflow/airflow.db`; used for pulling XComs and inspecting task states
-        - **kwargs: additional keyword arguments to pass to the Task constructor
-
-    Example:
-        ```python
-        from prefect import Flow
-        from prefect.tasks.airflow import AirflowTask
-
-        # compare with https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
-        puller = AirflowTask(
-            task_id="puller",
-            dag_id="example_xcom",
-        )
-        push = AirflowTask(
-            task_id="push",
-            dag_id="example_xcom",
-        )
-        push_by_returning = AirflowTask(
-            task_id="push_by_returning",
-            dag_id="example_xcom",
-        )
-
-        with Flow(name="example_xcom") as flow:
-            res = puller(upstream_tasks=[push, push_by_returning])
-
-        flow_state = flow.run()
-
-        # XComs auto-convert to return values
-        assert flow_state.result[push].result == [1, 2, 3]
-        assert flow_state.result[push_by_returning].result == {"a": "b"}
-        ```
-    """
-
-    def __init__(
-        self,
-        task_id: str,
-        dag_id: str,
-        cli_flags: List[str] = None,
-        airflow_env: str = "airflow",
-        env: dict = None,
-        execution_date: str = None,
-        db_conn: str = None,
-        **kwargs: Any
-    ):
-        if cli_flags is None:
-            cli_flags = []
-        cmd = "airflow run " + " ".join(cli_flags) + " {0} {1} {2}"
-        self.db_conn = db_conn or os.path.expanduser("~/airflow/airflow.db")
-        self.dag_id = dag_id
-        self.task_id = task_id
-        self.execution_date = execution_date
-        kwargs.setdefault("name", task_id)
-        super().__init__(
-            command=cmd,
-            env=env,
-            helper_script="source deactivate && source activate {}".format(airflow_env),
-            **kwargs
-        )
-
-    def _state_conversion(self, query: List) -> None:
-        if query:
-            status = query[0][0]
-            if status == "skipped":
-                raise prefect.engine.signals.SKIP(
-                    "Airflow task state marked as 'skipped' in airflow db"
-                )
-            elif status != "success":
-                raise prefect.engine.signals.FAIL(
-                    "Airflow task state marked as {} in airflow db".format(
-                        status.rstrip()
-                    )
-                )
-
-    def _pre_check(self, execution_date: str) -> None:
-        check_query = "select state from task_instance where task_id=? and dag_id=? and execution_date like ?"
-        status = custom_query(
-            self.db_conn,
-            check_query,
-            self.task_id,
-            self.dag_id,
-            "%{}%".format(execution_date),
-        )
-        self._state_conversion(status)
-
-    def _post_check(self, execution_date: str) -> None:
-        check_query = "select state from task_instance where task_id=? and dag_id=? and execution_date like ?"
-        status = custom_query(
-            self.db_conn,
-            check_query,
-            self.task_id,
-            self.dag_id,
-            "%{}%".format(execution_date),
-        )
-        if not status:
-            raise prefect.engine.signals.SKIP(
-                "Airflow task state not present in airflow db, was skipped."
-            )
-        self._state_conversion(status)
-
-    def _pull_xcom(self, execution_date: str) -> Any:
-        check_query = "select value from xcom where task_id=? and dag_id=? and execution_date like ?"
-        data = custom_query(
-            self.db_conn,
-            check_query,
-            self.task_id,
-            self.dag_id,
-            "%{}%".format(execution_date),
-        )
-        if data:
-            return pickle.loads(data[0][0])
-
-    @prefect.utilities.tasks.defaults_from_attrs("execution_date")
-    def run(self, execution_date: str = None) -> Any:
-        """
-        Executes `airflow run` for the provided `task_id`, `dag_id` and `execution_date`.
-
-        Args:
-            - execution_date (str, optional): the execution date for this task run;
-                if not provided here or at initialization, the value of `today` in context will be used
-
-        Raises:
-            - prefect.engine.signals.PrefectStateSignal: depending on the state of the task_instance in the Airflow DB
-
-        Returns:
-            - Any: any data this task pushes as an XCom
-        """
-        if execution_date is None:
-            execution_date = prefect.context.get("today")
-        self.logger.debug("Using {} as execution date...".format(execution_date))
-        self._pre_check(execution_date)
-        self.command = self.command.format(  # type: ignore
-            self.dag_id, self.task_id, execution_date
-        )
-        res = super().run()
-        if "Task is not able to be run" in res:
-            raise prefect.engine.signals.SKIP("Airflow task was not run.")
-        self._post_check(execution_date)
-        data = self._pull_xcom(execution_date)
-        return data
-
-
-class AirflowTriggerDAG(prefect.tasks.shell.ShellTask):
-    """
-    Task wrapper for triggering an Airflow DAG run.
-
-    Successful execution of this task requires a separate conda environment in which `airflow` is installed.
-
-    Args:
-        - dag_id (string): the Airflow `dag_id` containing the given `task_id`
-        - airflow_env (str, optional): the name of the conda environment in which `airflow` is installed;
-            defaults to `"airflow"`
-        - execution_date (str, optional): the execution date for this task run; can also be provided to the run method;
-            if not provided here or to `run()`, the value of `today` in context will be used
-        - cli_flags (List[str], optional): a list of CLI flags to provide to `airflow trigger_dag` at runtime;
-            this can be used to provide `execution_date` via `["-e 1999-01-01"]`. For a complete list of available options,
-            see the [corresponding Airflow documentation](https://airflow.apache.org/cli.html#trigger_dag)
-        - env (dict, optional): dictionary of environment variables to use for
-            the subprocess (e.g., `AIRFLOW__CORE__DAGS_FOLDER`)
-        - **kwargs: additional keyword arguments to pass to the Task constructor
-    """
-
-    def __init__(
-        self,
-        dag_id: str,
-        airflow_env: str = "airflow",
-        execution_date: str = None,
-        cli_flags: List[str] = None,
-        env: dict = None,
-        **kwargs
-    ):
-        if cli_flags is None:
-            cli_flags = []
-        self.cli_flags = cli_flags
-        self.dag_id = dag_id
-        self.execution_date = execution_date
-        kwargs.setdefault("name", dag_id)
-        super().__init__(
-            env=env,
-            helper_script="source deactivate && source activate {}".format(airflow_env),
-            **kwargs
-        )
-
-    @prefect.utilities.tasks.defaults_from_attrs("execution_date")
-    def run(self, execution_date: str = None) -> Any:
-        """
-        Executes `airflow trigger_dag` for the provided `dag_id` with the provided options.
-
-        Args:
-            - execution_date (str, optional): the execution date for this task run;
-                if not provided here or at initialization, the value of `today` in context will be used
-
-        Raises:
-            - prefect.engine.signals.PrefectStateSignal: depending on the state of the task_instance in the Airflow DB
-
-        Returns:
-            - Any: any data this task pushes as an XCom
-        """
-        if execution_date is None:
-            execution_date = prefect.context.get("today")
-        self.logger.debug("Using {} as execution date...".format(execution_date))
-        cli_flags = self.cli_flags + ["-e {}".format(execution_date)]
-        cmd = "airflow trigger_dag " + " ".join(cli_flags) + " {0}".format(self.dag_id)
-        res = super().run(command=cmd)
-        return res
diff --git a/tests/tasks/airflow/__init__.py b/tests/tasks/airflow/__init__.py
deleted file mode 100644
index e69de29bb2d1..000000000000
diff --git a/tests/tasks/airflow/test_airflow.py b/tests/tasks/airflow/test_airflow.py
deleted file mode 100644
index 8cb1da12c9ab..000000000000
--- a/tests/tasks/airflow/test_airflow.py
+++ /dev/null
@@ -1,188 +0,0 @@
-import os
-import subprocess
-import tempfile
-
-import pytest
-
-from prefect import Flow, Task, task, triggers
-from prefect.tasks.airflow import AirflowTask, AirflowTriggerDAG
-from prefect.tasks.shell import ShellTask
-
-pytestmark = pytest.mark.airflow
-
-
-@pytest.fixture(scope="module")
-def airflow_settings():
-    with tempfile.NamedTemporaryFile(
-        prefix="prefect-airflow", suffix="prefect-airflow-test.db"
-    ) as tmp:
-        env = os.environ.copy()
-        env["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "sqlite:///" + tmp.name
-        env["db_conn"] = tmp.name
-        dag_folder = os.path.join(os.path.dirname(__file__), "dags")
-        env["AIRFLOW__CORE__DAGS_FOLDER"] = dag_folder
-        status = subprocess.check_output(
-            [
-                "bash",
-                "-c",
-                "source deactivate && source activate airflow && airflow initdb",
-            ],
-            env=env,
-        )
-        yield {
-            k: v for k, v in env.items() if k.startswith("AIRFLOW") or k == "db_conn"
-        }
-
-
-class TestTaskStructure:
-    def test_init_requires_task_and_dag_id(self):
-        with pytest.raises(TypeError):
-            task = AirflowTask()
-
-        with pytest.raises(TypeError):
-            task = AirflowTask(task_id="task name")
-
-        with pytest.raises(TypeError):
-            task = AirflowTask(dag_id="dag name")
-
-    def test_name_defaults_to_task_id_but_can_be_changed(self):
-        t1 = AirflowTask(task_id="test-task", dag_id="blob")
-        assert t1.name == "test-task"
-
-        t2 = AirflowTask(task_id="test-task", dag_id="blob", name="unique")
-        assert t2.name == "unique"
-
-    def test_command_responds_to_env_name(self):
-        t1 = AirflowTask(task_id="test-task", dag_id="blob")
-        assert t1.helper_script == "source deactivate && source activate airflow"
-
-        t2 = AirflowTask(
-            task_id="test-task", dag_id="blob", airflow_env="airflow_conda_env"
-        )
-        assert (
-            t2.helper_script == "source deactivate && source activate airflow_conda_env"
-        )
-
-    def test_command_responds_to_cli_flags(self):
-        t1 = AirflowTask(task_id="test-task", dag_id="blob", cli_flags=["--force"])
-        assert t1.command.startswith("airflow run --force")
-
-
-class TestSingleTaskRuns:
-    def test_airflow_task_successfully_runs_a_task(self, airflow_settings):
-        task = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="also_run_this",
-            dag_id="example_bash_operator",
-            env=airflow_settings,
-        )
-
-        flow = Flow(name="test single task", tasks=[task])
-        flow_state = flow.run()
-
-        assert flow_state.is_successful()
-        assert flow_state.result[task].is_successful()
-        assert flow_state.result[task].result is None
-
-    def test_airflow_task_uses_its_own_trigger_rules_by_default(self, airflow_settings):
-        task = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="run_this_last",
-            dag_id="example_bash_operator",
-            env=airflow_settings,
-        )
-
-        with Flow(name="test single task") as flow:
-            res = task(execution_date="2011-01-01")
-        flow_state = flow.run()
-
-        assert flow_state.is_successful()
-        assert flow_state.result[res].is_skipped()
-
-    def test_airflow_task_uses_cli_flags(self, airflow_settings):
-        task = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="run_this_last",
-            dag_id="example_bash_operator",
-            cli_flags=["-A"],
-            env=airflow_settings,
-        )
-
-        with Flow(name="test single task") as flow:
-            res = task(execution_date="2011-01-02")
-        flow_state = flow.run()
-
-        assert flow_state.is_successful()
-        assert flow_state.result[res].is_successful()
-        assert not flow_state.result[res].is_skipped()
-        assert flow_state.result[res].result is None
-
-    def test_airflow_task_checks_db_state_prior_to_execution(self, airflow_settings):
-        pass
-
-    def test_airflow_task_converts_xcoms_to_return_values(self, airflow_settings):
-        puller = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="puller",
-            dag_id="example_xcom",
-            env=airflow_settings,
-            execution_date="1999-09-20",
-        )
-        push = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="push",
-            dag_id="example_xcom",
-            env=airflow_settings,
-            execution_date="1999-09-20",
-        )
-        push_by_returning = AirflowTask(
-            db_conn=airflow_settings["db_conn"],
-            task_id="push_by_returning",
-            dag_id="example_xcom",
-            env=airflow_settings,
-            execution_date="1999-09-20",
-        )
-
-        with Flow(name="xcom") as flow:
-            res = puller(upstream_tasks=[push, push_by_returning])
-
-        flow_state = flow.run()
-        assert flow_state.is_successful()
-
-        # puller
-        assert flow_state.result[res].is_successful()
-        assert flow_state.result[res].result is None
-
-        # push
-        assert flow_state.result[push].is_successful()
-        assert flow_state.result[push].result == [1, 2, 3]
-
-        # push_by_returning
-        assert flow_state.result[push_by_returning].is_successful()
-        assert flow_state.result[push_by_returning].result == {"a": "b"}
-
-
-class TestTriggerDAG:
-    def test_basic_trigger_dag_triggers(self, airflow_settings):
-        task = AirflowTriggerDAG(
-            dag_id="tutorial", execution_date="1986-09-20", env=airflow_settings
-        )
-        check_task = ShellTask(
-            command="airflow list_dag_runs tutorial",
-            helper_script=task.helper_script,
-            env=airflow_settings,
-        )
-
-        with Flow(name="tutorial") as flow:
-            res = check_task(upstream_tasks=[task])
-
-        flow_state = flow.run()
-        assert flow_state.is_successful()
-
-        check_state = flow_state.result[res]
-        assert check_state.is_successful()
-
-        # check CLI output
-        assert "manual__1986-09-20T00:00:00+00:00" in check_state.result
-        assert "running" in check_state.result
-        assert "1986-09-20T00:00:00+00:00" in check_state.result