---
title: "Airflow Part 7 - Triggering Workflows"
date: "2022-09-26"
image: feature.png
categories: ["Data Engineering", "MLOps"]
---

![](feature.png){fig-align="center"}

- XComs (cross-communications) are meant to exchange messages between tasks, which is a form of shared state
- We can use the task instance to push/pull data between tasks:
    - `context["task_instance"].xcom_push(key="data_name", value="value")` pushes data to the metastore. It also stores the `dag_id`, `task_id`, and `execution_date`.
    - `context["task_instance"].xcom_pull(key="data_name")` pulls the shared data. We can also specify `task_ids` and `dag_id` to narrow the lookup.
    - We can also access the push/pull methods in templates using `task_instance.xcom_push()` or `task_instance.xcom_pull()`
    - We can view the shared data in the UI by going to Admin -> XComs

- **Limitations**:
    - XCom values are serialized and stored in the metastore database, so the objects have to be serializable (JSON by default in Airflow 2, pickle if `enable_xcom_pickling` is turned on)
    - Size limitations:
        - SQLite: stored as BLOB type, 2 GB limit
        - PostgreSQL: stored as BYTEA type, 1 GB limit
        - MySQL: stored as BLOB type, 64 KB limit
    - XComs create a hidden dependency between tasks, because the task that pushes the shared state has to run before the task that pulls it. Airflow won't manage or enforce this dependency, so the developer has to document it and make sure the tasks' order guarantees it
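To get a feel for the size limits, here is a minimal sketch that checks whether a value would fit under the 64 KB MySQL limit listed above. It assumes pickling is in use; the helper name `fits_in_xcom` and the constant are made up for illustration and are not part of Airflow.

```python
import pickle

# 64 KB BLOB limit quoted above for MySQL (assumed to be 2**16 - 1 bytes)
MYSQL_BLOB_LIMIT_BYTES = 65_535


def fits_in_xcom(value, limit_bytes: int = MYSQL_BLOB_LIMIT_BYTES) -> bool:
    """Return True if the pickled value is small enough for the column."""
    return len(pickle.dumps(value)) <= limit_bytes


small_ok = fits_in_xcom("model-1234")           # a short identifier
large_ok = fits_in_xcom(list(range(100_000)))   # a sizeable list of integers
print(small_ok, large_ok)
```

Small identifiers and paths fit comfortably; whole datasets generally do not, which is why XComs are better suited to passing references than data.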
- Because of these size limitations, we can create a custom backend for XComs by defining a class that inherits from `BaseXCom` and implements two static methods, `serialize_value` and `deserialize_value`. Airflow will use this class once it is set in the `xcom_backend` parameter of the Airflow config. This lets us use cheap/large cloud storage services such as Amazon S3, Azure Blob Storage, or Google Cloud Storage.

```python
from typing import Any
from airflow.models.xcom import BaseXCom

class CustomXComBackend(BaseXCom):

    @staticmethod
    def serialize_value(value: Any):
        # Store the value externally and return what goes in the metastore
        ...

    @staticmethod
    def deserialize_value(result) -> Any:
        # Load the value back from the stored XCom row
        ...
```
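The idea behind such a backend can be sketched without Airflow: store the payload in external storage and keep only a short reference in the metastore. In this sketch a plain dict stands in for the object store, and `FAKE_BUCKET` and `REFERENCE_PREFIX` are hypothetical names; a real backend would call the cloud provider's client inside the two methods.

```python
import json
import uuid
from typing import Any, Dict

# Stand-in for an object store such as S3 (assumption for illustration)
FAKE_BUCKET: Dict[str, bytes] = {}
REFERENCE_PREFIX = "fake-bucket://"  # hypothetical marker for stored references


def serialize_value(value: Any) -> str:
    """Upload the payload and return a short reference for the metastore."""
    key = f"xcom/{uuid.uuid4()}.json"
    FAKE_BUCKET[key] = json.dumps(value).encode("utf-8")
    return REFERENCE_PREFIX + key


def deserialize_value(reference: str) -> Any:
    """Resolve a stored reference back into the original payload."""
    key = reference[len(REFERENCE_PREFIX):]
    return json.loads(FAKE_BUCKET[key].decode("utf-8"))


payload = {"rows": list(range(1000))}
reference = serialize_value(payload)
restored = deserialize_value(reference)
print(reference, restored == payload)
```

Only the short reference string would land in the database, so the database size limits no longer apply to the payload itself.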
- If most of the tasks are PythonOperators, we can use the `TaskFlow` API, which takes care of passing state between tasks and avoids the boilerplate code we have to write with the regular API. We just need to decorate the function that we would use in the PythonOperator with `@task`, and Airflow will take care of the rest by passing XCom data between tasks. Example:

<img src="images/taskflow-example.png">

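```python
import uuid

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator

with DAG(...) as dag:
    # Placeholder tasks standing in for the real fetch/clean/join operators
    start = DummyOperator(task_id="start")
    fetch_sales = DummyOperator(task_id="fetch_sales")
    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_sales = DummyOperator(task_id="clean_sales")
    clean_weather = DummyOperator(task_id="clean_weather")
    join_datasets = DummyOperator(task_id="join_datasets")

    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        # Airflow treats the return value as an XCom and pushes it
        return model_id

    @task
    def deploy_model(model_id: str):
        # Airflow sees that this task uses an XCom and passes it in
        print(f"Deploying model {model_id}")

    # Calling the decorated functions creates the train_model and
    # deploy_model tasks with an explicit dependency between them.
    # The task type is _PythonDecoratedOperator.
    model_id = train_model()
    deploy_model(model_id)

    # Connect the TaskFlow-style tasks to the regular operators
    join_datasets >> model_id
```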
- Any data passed between Taskflow-style tasks will be stored as XComs and subject to the same limitations of XCom
- The main limitation of Taskflow API is that it is still only for PythonOperators

- Workflows are most commonly triggered based on schedule intervals provided using `start_date`, `end_date`, and `schedule_interval`. Airflow calculates when the next run is due and starts the first task(s) at that date/time.
- However, sometimes we want the workflow to run based on the occurrence of external events, such as a file becoming available in a specific location or code being changed in a git repo.
- One way to execute workflows based on the occurrence of external events is to use Airflow's **sensors**. A sensor is a subclass of operator that checks whether a certain condition is true. If it is, the sensor succeeds and the downstream tasks run. If not, it waits for a given period (default 60 seconds) and tries again, and it keeps doing so until the *timeout* period is reached. This is a form of **poking**; in the case of FileSensor, each poke checks for the existence of a file.

```Python
from airflow.sensors.filesystem import FileSensor
wait_for_file_1 = FileSensor(
    task_id="wait_for_file_1", filepath="/data/file_1.csv"
    )
```
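The poking behaviour can be sketched in plain Python. This is a simplified stand-in, not Airflow's implementation: `poke_until` is a hypothetical helper, and a real sensor fails the task on timeout instead of returning `False`.

```python
import time
from typing import Callable


def poke_until(condition: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Check `condition` every `poke_interval` seconds until it holds or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        # Give up if the next poke would land after the deadline
        if time.monotonic() + poke_interval > deadline:
            return False
        time.sleep(poke_interval)


attempts = {"count": 0}


def file_arrives_on_third_poke() -> bool:
    """Pretend the file shows up on the third check."""
    attempts["count"] += 1
    return attempts["count"] >= 3


succeeded = poke_until(file_arrives_on_third_poke, poke_interval=0.01, timeout=1.0)
timed_out = poke_until(lambda: False, poke_interval=0.01, timeout=0.05)
print(succeeded, attempts["count"], timed_out)
```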
- We can also use **globbing** with FileSensor by using wildcards to check for the existence of file(s)
- We can also use PythonSensor, which checks a condition through a Python callable that must return a Boolean. It is more flexible and easier to read than globbing within FileSensor. Like PythonOperator, it takes a Python callable

```Python
from pathlib import Path
from airflow.sensors.python import PythonSensor

# Check whether there is any data for a given supermarket
# and whether the _SUCCESS file exists, which indicates that
# all data for the given supermarket has been uploaded
def _wait_for_supermarket(supermarket):
    supermarket_path = Path("/data") / supermarket
    success_path = supermarket_path / "_SUCCESS"
    data_files = supermarket_path.glob("*.csv")
    # glob() returns a generator, which is always truthy, so use any()
    return any(data_files) and success_path.exists()

wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket": "supermarket_1"},
    dag=dag
    )
```
<img src="images/python-sensor.png">
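One subtlety when writing such a callable: `Path.glob` returns a generator, and a generator object is truthy even when it yields nothing. The sketch below uses a temporary directory as a stand-in for `/data` and a hypothetical helper `supermarket_ready` to show why wrapping the glob in `any()` matters.

```python
import tempfile
from pathlib import Path


def supermarket_ready(supermarket_path: Path) -> bool:
    """True only if at least one CSV file and the _SUCCESS marker exist."""
    has_data = any(supermarket_path.glob("*.csv"))
    return has_data and (supermarket_path / "_SUCCESS").exists()


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp) / "supermarket_1"
    folder.mkdir()
    (folder / "_SUCCESS").touch()

    # No CSV files exist yet, but the generator object itself is truthy
    generator_is_truthy = bool(folder.glob("*.csv"))
    ready_without_data = supermarket_ready(folder)

    (folder / "data-1.csv").touch()
    ready_with_data = supermarket_ready(folder)

print(generator_is_truthy, ready_without_data, ready_with_data)
```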

- All sensors take a `timeout` argument, which has a default value of 7 days
- There is also a limit on the number of tasks Airflow can run concurrently per DAG (default is 16). The DAG takes a `concurrency` argument that changes this number. There are also limits on the total number of tasks running across the whole Airflow installation and on the number of DAG runs per DAG
```Python
dag = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
    # Max tasks running at once in this DAG (default is 16).
    # Newer Airflow releases name this argument max_active_tasks.
    concurrency=20,
)
```
- There is a snowball effect when sensors don't succeed: they keep occupying the slots the DAG has (determined by the `concurrency` argument). In the figure above, if only task 1 succeeds and the rest keep polling, and the DAG is scheduled daily with the default concurrency of 16 slots and the default timeout of 7 days, this is what will happen (sensor deadlock):
    - Day 1: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 3 tasks.
    - Day 2: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 6 tasks.
    - Day 3: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 9 tasks.
    - Day 4: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 12 tasks.
    - Day 5: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 15 tasks.
    - Day 6: Supermarket 1 succeeded; supermarkets 2, 3, and 4 are polling, occupying 16 tasks; two new tasks cannot run, and any other task trying to run is blocked.

<img src="images/python-sensor-deadlock.png">
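The day-by-day count above can be reproduced with a few lines of arithmetic. This is only a toy model: it assumes every stuck sensor keeps its slot for the whole period (true here because the 7-day timeout is longer than the 6 days simulated), and the function name is made up for illustration.

```python
from typing import List


def occupied_slots_per_day(stuck_sensors_per_run: int, concurrency: int, days: int) -> List[int]:
    """Slots held by stuck sensors at the end of each day, capped at the DAG limit."""
    occupied = []
    running = 0
    for _ in range(days):
        # Each daily run adds its stuck sensors until the DAG has no free slots
        running = min(concurrency, running + stuck_sensors_per_run)
        occupied.append(running)
    return occupied


slots = occupied_slots_per_day(stuck_sensors_per_run=3, concurrency=16, days=6)
print(slots)
```

On day 6 only one of the three new sensors gets a slot, which is why two of them cannot run.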

- This also affects the global Airflow limit on the maximum number of tasks that can run concurrently, which may cause the whole system to stall.
- A sensor task pokes to check the condition and blocks if it is false, so it runs for a short time and waits for the most part. It keeps poking until the condition becomes true or the timeout is reached, and it occupies a slot that whole time
- Sensors take a `mode` argument which has two values: {`poke`, `reschedule`}. The default is `poke`. `reschedule` can solve the sensor deadlock and snowball effect because it releases the slot the sensor task is occupying once the sensor has finished poking. In other words, the sensor task pokes and, if the condition is false, the system reschedules it and makes its slot available to other tasks. It is the same concept as the **process scheduling** an OS does when a process makes a blocking system call.

```Python
wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    op_kwargs={"supermarket": "supermarket_1"},
    mode="reschedule",
    dag=dag
    )
```
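A rough way to see the difference between the two modes is to estimate how long a single sensor holds a slot. The model below is a simplification with made-up numbers (a one-hour wait, a 60-second poke interval, and one second per poke); `slot_seconds` is a hypothetical helper, not an Airflow function.

```python
def slot_seconds(mode: str, wait_seconds: int, poke_interval: int, poke_duration: int) -> int:
    """Approximate slot time used by one sensor that waits `wait_seconds` in total."""
    if mode == "poke":
        # The slot is held for the entire wait, including the sleeps
        return wait_seconds
    if mode == "reschedule":
        # The slot is held only while a poke is actually running
        pokes = wait_seconds // poke_interval + 1
        return pokes * poke_duration
    raise ValueError(f"unknown mode: {mode}")


poke_cost = slot_seconds("poke", wait_seconds=3600, poke_interval=60, poke_duration=1)
reschedule_cost = slot_seconds("reschedule", wait_seconds=3600, poke_interval=60, poke_duration=1)
print(poke_cost, reschedule_cost)
```

Under these assumptions the rescheduled sensor uses about a minute of slot time instead of an hour, leaving the slot free for other tasks in between.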
- We can trigger another DAG from inside a DAG using `TriggerDagRunOperator`. The target DAG runs once the trigger operator runs, which is useful if we want to split DAGs and make some DAGs available to other DAGs instead of repeating functionality. The figures below show both approaches:
<img src="images/complicated-dag-logic.png">
<img src="images/triggered-dag.png">

```Python
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor

dag1 = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
)
dag2 = DAG(
    dag_id="create_metrics",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None, # Since it will be triggered
)


def _wait_for_supermarket(supermarket_id_):
    supermarket_path = Path("/data/" + supermarket_id_)
    data_files = supermarket_path.glob("data-*.csv")
    success_file = supermarket_path / "_SUCCESS"
    # glob() returns a generator, which is always truthy, so use any()
    return any(data_files) and success_file.exists()


for supermarket_id in range(1, 5):
    wait = PythonSensor(
        task_id=f"wait_for_supermarket_{supermarket_id}",
        python_callable=_wait_for_supermarket,
        op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
        dag=dag1,
    )
    copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
    process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
    trigger_create_metrics_dag = TriggerDagRunOperator(
        task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
        trigger_dag_id="create_metrics", # Has to be the same dag_id as dag2
        dag=dag1,
    )
    wait >> copy >> process >> trigger_create_metrics_dag

compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard

```
- Each DAG run has a run_id that starts with one of the following:
    - `scheduled__` to indicate the DAG run started because of its schedule
    - `backfill__` to indicate the DAG run started by a backfill job
    - `manual__` to indicate the DAG run started by a manual action (e.g., pressing the Trigger Dag button, or triggered by a TriggerDagRunOperator)
- In the UI, task instances of scheduled DAG runs are drawn with a black border, while those of triggered runs are not
- Clearing a task in a DAG will clear the task and all its downstream tasks and trigger a run (backfill)
    - It only clears tasks within the same DAG, NOT downstream tasks in another DAG triggered by TriggerDagRunOperator
- If the triggered DAG depends on multiple triggering DAGs completing before it can run, we can use `ExternalTaskSensor`, which checks whether a task in another DAG has completed successfully (a sensor poking the state of tasks in other DAGs). Each `ExternalTaskSensor` checks only one task, by querying the metastore database
    - By default, it looks for a task run with the same execution_date as itself
    - If the other task runs on a different schedule, we need to pass a `timedelta` object to the `execution_delta` argument so the sensor can work out the execution_date of the task it tries to sense

```Python
import datetime

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag1 = DAG(
    dag_id="ingest_supermarket_data",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
)
dag2 = DAG(
    dag_id="create_metrics",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 18 * * *",
)

DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(
    task_id="process_supermarket", dag=dag1
)

wait = ExternalTaskSensor(
    task_id="wait_for_process_supermarket",
    external_dag_id="ingest_supermarket_data",  # Must match the dag_id of dag1
    external_task_id="process_supermarket",
    # A positive delta is subtracted from the sensor's execution_date
    # to get the execution_date of the task it is trying to sense:
    # dag2 runs at 18:00 and dag1 at 16:00, so the delta is 2 hours
    execution_delta=datetime.timedelta(hours=2),
    dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report
```
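The `execution_delta` arithmetic is easy to check by hand. As a minimal sketch, for a sensing DAG scheduled at 18:00 and a sensed DAG scheduled at 16:00, the offset between the two runs is two hours; the date and the helper name `sensed_execution_date` are arbitrary choices for illustration.

```python
import datetime


def sensed_execution_date(
    sensor_execution_date: datetime.datetime,
    execution_delta: datetime.timedelta,
) -> datetime.datetime:
    """A positive delta points the sensor at an earlier run of the other DAG."""
    return sensor_execution_date - execution_delta


sensor_run = datetime.datetime(2022, 9, 26, 18, 0)  # run of the sensing DAG
target_run = sensed_execution_date(sensor_run, datetime.timedelta(hours=2))
print(target_run)
```

If the delta does not line up with an actual run of the other DAG, the sensor finds nothing to match and keeps poking until it times out.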
- We can also trigger DAGs from the CLI, which gives the run an execution_date of the current date and time
    - `airflow dags trigger dag1`
    - With configuration, which will be available in the context of each task through `context["dag_run"].conf`:
        - `airflow dags trigger -c '{"supermarket_id": 1}' dag1`
        - `airflow dags trigger --conf '{"supermarket_id": 1}' dag1`

- Stick to coding style conventions by using tools like flake8, pylint, black
- There are two ways to define DAGs. Stick to one of them:
    1. With context manager:
```Python
with DAG(...) as dag:
    task1 = PythonOperator(...)
    task2 = PythonOperator(...)
```
    2. Traditional:
```Python
dag = DAG(...)
task1 = PythonOperator(..., dag=dag)
task2 = PythonOperator(..., dag=dag)
```
- There are also multiple ways to define dependencies. All of the following make task2 run after task1. Stick to one of them:
```Python
task1 >> task2
task2 << task1
[task1] >> task2
task1.set_downstream(task2)
task2.set_upstream(task1)
```
- When loading config files, make sure to understand where the loading happens:
    - At the top level on the scheduler
    - At the DAG level when it is parsed
    - Or when the DAG is executing -> in the worker
- Avoid doing any computation in DAG definition:
    - At the top level, it will be computed every time the DAG is loaded
    - In the DAG definition, it will be executed every time the DAG is parsed by the scheduler
    - In the task, it will be computed when the task is executed on the worker machine
- Fetch credentials within the task, so they are only fetched once the task is executed
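The cost of top-level work can be illustrated with a small simulation. Here "parsing the DAG file" is modelled as calling a function, and `load_config` is a hypothetical loader that counts how often it runs; none of these names come from Airflow.

```python
calls = {"top_level": 0, "in_task": 0}


def load_config(counter_key: str) -> dict:
    """Pretend to read a config file or fetch credentials, and count the call."""
    calls[counter_key] += 1
    return {"api_key": "dummy"}  # placeholder content


def parse_dag_file_eager():
    """DAG file that loads the config at the top level."""
    config = load_config("top_level")  # runs on every parse

    def task():
        return config["api_key"]

    return task


def parse_dag_file_lazy():
    """DAG file that defers loading until the task executes."""

    def task():
        config = load_config("in_task")  # runs only when the task runs
        return config["api_key"]

    return task


for _ in range(5):  # the scheduler re-parses the file repeatedly
    eager_task = parse_dag_file_eager()
    lazy_task = parse_dag_file_lazy()

lazy_task()  # the worker executes the task once
print(calls)
```

The eager version loads the config on every parse even though no task ran, while the lazy version loads it once, at execution time.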
- Use factory methods to generate DAGs or sets of tasks that are almost identical, with only a few minor changes. Example:

```Python
import os

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator


def generate_tasks(dataset_name, raw_dir, processed_dir, preprocess_script, output_dir, dag):
    # "{{ ds_nodash }}" is left for Airflow to render as a template at run time
    raw_path = os.path.join(raw_dir, dataset_name, "{{ ds_nodash }}.json")
    processed_path = os.path.join(processed_dir, dataset_name, "{{ ds_nodash }}.json")
    output_path = os.path.join(output_dir, dataset_name, "{{ ds_nodash }}.json")

    fetch_task = BashOperator(
        task_id=f"fetch_{dataset_name}",
        bash_command=f"echo 'curl http://example.com/{dataset_name}.json > {raw_path}'",
        dag=dag,
    )
    preprocess_task = BashOperator(
        task_id=f"preprocess_{dataset_name}",
        bash_command=f"echo '{preprocess_script} {raw_path} {processed_path}'",
        dag=dag,
    )
    export_task = BashOperator(
        task_id=f"export_{dataset_name}",
        bash_command=f"echo 'cp {processed_path} {output_path}'",
        dag=dag,
    )
    fetch_task >> preprocess_task >> export_task
    # Return the first and last task so callers can wire them to other tasks
    return fetch_task, export_task


with DAG(
    dag_id="01_task_factory",
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
) as dag:
    for dataset in ["sales", "customers"]:
        generate_tasks(
            dataset_name=dataset,
            raw_dir="/data/raw",
            processed_dir="/data/processed",
            output_dir="/data/output",
            preprocess_script=f"preprocess_{dataset}.py",
            dag=dag,
        )
```
- We can use `TaskGroup` to group related tasks, which helps us navigate the DAG in the UI. This is very helpful when DAGs become very complicated
- Create new DAGs for big changes such as renaming/removing tasks or changing the start date/schedule interval, so we keep the historical info about old DAGs and don't confuse the scheduler. The scheduler database holds the run instances of each DAG
- Make sure that tasks are idempotent: regardless of when they run, given the same input they should produce the same output. Therefore, be careful when writing data. We may want to overwrite or upsert to avoid appending the same data
    - Also, tasks should not have side effects
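The difference between overwriting and appending shows up as soon as a task is rerun. The sketch below writes one file per execution date into a temporary directory; the helper names and the date string are made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def write_partition(output_dir: Path, ds: str, rows: list) -> Path:
    """Idempotent: overwrite the file for date `ds`, so reruns give the same result."""
    path = output_dir / f"{ds}.json"
    path.write_text(json.dumps(rows))
    return path


def append_rows(path: Path, rows: list) -> None:
    """Not idempotent: every rerun adds the same rows again."""
    with path.open("a") as handle:
        for row in rows:
            handle.write(json.dumps(row) + "\n")


rows = [{"id": 1}, {"id": 2}]
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    for _ in range(2):  # simulate the task being run twice for the same date
        partition = write_partition(out, "2022-09-26", rows)
        append_rows(out / "appended.jsonl", rows)

    overwritten = json.loads(partition.read_text())
    appended_count = len((out / "appended.jsonl").read_text().splitlines())

print(overwritten == rows, appended_count)
```

After two runs the overwritten partition still holds exactly the input rows, while the appended file has every row twice.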
- Avoid writing intermediate results to the local filesystem, because each task runs independently (and often on a different machine). Use shared cloud storage such as an Amazon S3 bucket that all workers can access
- We can use SLAs on each DAG/task, where Airflow will notify us if they don't finish within the SLA. Tasks take an `sla` argument (a `timedelta`), which can be set for all tasks of a DAG through `default_args`