---
title: "Airflow Part 2 - DAGs"
date: "2022-01-17"
image: feature.png
categories: ["Data Engineering", "MLOps", "Airflow"]
---

![](feature.png){fig-align="center"}

:::{.callout-note}
Other than my experience and the documentation, the main resource behind this post and figures is the fantastic book: [_Data Pipelines with Apache Airflow_](https://www.manning.com/books/data-pipelines-with-apache-airflow).
:::

- The `DAG()` class is used to instantiate a **DAG**, which is the starting point of any workflow.
    - The required arguments are `dag_id`, the name the Airflow web UI uses to display the workflow, and `start_date`, the date from which the workflow starts running; it can be in the past
    - There are other arguments such as `schedule_interval`, which determines how often the DAG is rerun
- An `Operator` is responsible for a single piece of work and roughly corresponds to a task.
    - It has a `task_id`, which is the name the web UI uses to display the task
    - There are many operators such as `BashOperator`, `PythonOperator`, ... All of them inherit from `BaseOperator`
    - Some operators are generic, such as `BashOperator`, and some are specific, such as `EmailOperator`
- A `Task` is a wrapper/manager around an operator that makes sure the operator gets executed
- `>>` represents the dependencies between tasks
    - `a >> b` means `a` should run before `b`
- Airflow UI offers two views:
    - **tree view**, which shows the DAG runs over time. Each column is one run and each row is a task, so we can inspect the status of tasks over time
    - **graph view**, which shows the DAG as a graph and makes the dependencies between tasks in the workflow easy to see
- If a task fails, none of the downstream tasks that depend on it run
    - We can rerun the failed task (which also causes its downstream tasks to rerun) without having to rerun the workflow from scratch
    - We can inspect the logs to find the reason for the error
- Tasks can run in parallel depending on their dependencies
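
The last two points (downstream tasks not running after a failure, and independent tasks running in parallel) can be illustrated with a small standard-library sketch. This is a toy model and not Airflow's scheduler: the task names, the `dependencies` mapping, and the `simulate` helper are all made up for illustration, and real Airflow starts a task as soon as its own upstream tasks finish rather than in lockstep batches.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical workflow: each task maps to the set of tasks it depends on.
dependencies = {
    "fetch_a": set(),
    "fetch_b": set(),
    "join": {"fetch_a", "fetch_b"},
    "report": {"join"},
}


def simulate(dependencies, failing=frozenset()):
    """Return (batches, skipped) for a toy run of the workflow.

    batches: lists of tasks that have no unmet dependencies and could
             therefore run at the same time.
    skipped: tasks that never ran because something upstream failed.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    batches, skipped = [], []
    not_ok = set()  # tasks that failed or were skipped
    while sorter.is_active():
        batch = []
        for task in sorted(sorter.get_ready()):
            if dependencies.get(task, set()) & not_ok:
                # An upstream task failed or was skipped: do not run.
                skipped.append(task)
                not_ok.add(task)
            else:
                batch.append(task)
                if task in failing:
                    not_ok.add(task)
            sorter.done(task)  # mark as processed so traversal continues
        if batch:
            batches.append(batch)
    return batches, skipped


print(simulate(dependencies))
# ([['fetch_a', 'fetch_b'], ['join'], ['report']], [])
print(simulate(dependencies, failing={"fetch_b"}))
# ([['fetch_a', 'fetch_b']], ['join', 'report'])
```

In the first run `fetch_a` and `fetch_b` share a batch because neither depends on the other. In the second run `fetch_b` fails, so `join` and `report` never start; Airflow marks such tasks as `upstream_failed`.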

- To set up Airflow locally inside a Python virtual env:
    - `pip install apache-airflow`
    - `airflow db init`: initialize the metastore locally using SQLite; not recommended for production
    - `airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org`: create a user
    - `airflow webserver`: start the web server to use the web UI
    - `airflow scheduler`: start the scheduler; don't use the `SequentialExecutor` in production

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_one():
    print(1)


# Start the workflow 10 days in the past.
dag = DAG(dag_id="simple-workflow", start_date=days_ago(10))
a = BashOperator(task_id="bash", bash_command="echo 'a'", dag=dag)
b = PythonOperator(task_id="python", python_callable=print_one, dag=dag)

# `a` must finish before `b` starts. In a notebook this expression
# evaluates to the downstream task: <Task(PythonOperator): python>
a >> b
```
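
The `a >> b` expression evaluates to the right-hand task, which is why dependencies can be chained as `a >> b >> c`. Below is a minimal sketch of that idea using Python's `__rshift__` hook. `ToyTask` and its `downstream` attribute are hypothetical stand-ins, not Airflow classes, and the real operators do more bookkeeping than this.

```python
class ToyTask:
    """Hypothetical stand-in for an operator; not part of Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must wait for this one

    def __rshift__(self, other):
        # `self >> other`: record that `other` depends on `self` ...
        self.downstream.append(other)
        # ... and return the right-hand task so chains read left to right.
        return other

    def __repr__(self):
        return f"<ToyTask: {self.task_id}>"


a, b, c = ToyTask("a"), ToyTask("b"), ToyTask("c")
last = a >> b >> c   # evaluated as (a >> b) >> c

print(last)          # <ToyTask: c>
print(a.downstream)  # [<ToyTask: b>]
print(b.downstream)  # [<ToyTask: c>]
```

Because `a >> b` returns `b`, the second `>>` is applied to `b`, so the chain records `a` before `b` and `b` before `c`.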