# Airflow

## Definition
- **Airflow** is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows.
- **Airflow** is a platform that lets you build and run workflows. A workflow is represented as a **DAG (a Directed Acyclic Graph)**, and contains individual pieces of work called Tasks, arranged with dependencies and data flows taken into account.
    
    ```py
    # Example of DAG
    from datetime import datetime
    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.bash import BashOperator

    # A DAG represents a workflow, a collection of tasks
    with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
        # Tasks are represented as operators
        hello = BashOperator(task_id="hello", bash_command="echo hello")

        @task()
        def airflow():
            print("airflow")

        # Set dependencies between tasks
        hello >> airflow()
    ```

## DAG
- **DAG (Directed Acyclic Graph)** is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run. The **DAG** itself doesn’t care about what is happening inside the tasks; it is merely concerned with how to execute them - the order to run them in, how many times to retry them, if they have timeouts, and so on.
    
    ![image.png](https://airflow.apache.org/docs/apache-airflow/stable/_images/basic-dag.png)
    > In the example above, the DAG defines four Tasks (A, B, C, and D) and dictates the order in which they have to run and which tasks depend on which others. It also says how often to run the DAG, for example “every 5 minutes starting tomorrow” or “every day since January 1st, 2020”.

- Three ways to declare a DAG:
    - using Context Manager (`with` statement):
        ```py
        # Example
        import datetime

        from airflow import DAG
        from airflow.operators.empty import EmptyOperator

        with DAG(
            dag_id="my_dag_name",
            start_date=datetime.datetime(2021, 1, 1),
            schedule="@daily",
        ):
            EmptyOperator(task_id="task")
        ```
    - using standard constructor:
        ```py
        # Example
        import datetime

        from airflow import DAG
        from airflow.operators.empty import EmptyOperator

        my_dag = DAG(
            dag_id="my_dag_name",
            start_date=datetime.datetime(2021, 1, 1),
            schedule="@daily",
        )
        EmptyOperator(task_id="task", dag=my_dag)
        ```
    - using `@dag` decorator:
        ```py
        # Example
        import datetime

        from airflow.decorators import dag
        from airflow.operators.empty import EmptyOperator


        @dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
        def generate_dag():
            EmptyOperator(task_id="task")


        generate_dag()
        ```
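
To make the ordering role of a DAG concrete, here is a minimal sketch that uses only Python's standard-library `graphlib` module, not Airflow itself. The edges (A before B and C, and both before D) are an assumption about the four-task diagram, and the helper name `execution_batches` is hypothetical. It only shows which tasks could run once their upstream tasks are done; Airflow's scheduler handles much more (retries, timeouts, schedules).

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Assumed shape: A -> B, A -> C, B -> D, C -> D.
dependencies = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
}


def execution_batches(deps):
    """Return lists of tasks that can run together, in dependency order."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstream tasks are all done
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = execution_batches(dependencies)
print(batches)  # [['A'], ['B', 'C'], ['D']]
```

Because the graph is acyclic, every task eventually becomes ready; a cycle would make the ordering impossible, which is why the "acyclic" part of DAG matters.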

## Task/Operators
- A **Task** is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them in order to express the order they should run in.
    ```py
    # Set dependencies between tasks using bitshift (>>) operators
    first_task >> second_task

    # or using set_downstream()
    first_task.set_downstream(second_task)

    # or using set_upstream()
    second_task.set_upstream(first_task)
    ```
- An **Operator** is conceptually a template for a predefined Task that you can define declaratively inside your DAG.
- Airflow has an extensive set of operators available; some are built into the core and others come from pre-installed providers. Some popular operators include:
    - BashOperator - executes a bash command
    - PythonOperator - calls an arbitrary Python function
    - EmailOperator - sends an email
    - For a list of all core operators, see the [Core Operators and Hooks Reference](https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html).
    ```py
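    # Example (HttpOperator comes from the apache-airflow-providers-http package)
    from airflow import DAG
    from airflow.operators.email import EmailOperator
    from airflow.providers.http.operators.http import HttpOperator

    with DAG("my-dag") as dag:
        # create task using HttpOperator
        ping = HttpOperator(task_id="ping", endpoint="http://example.com/update/")

        # create task using EmailOperator (html_content is a placeholder body)
        email = EmailOperator(
            task_id="email",
            to="admin@example.com",
            subject="Update complete",
            html_content="Update complete",
        )

        # set dependencies between tasks
        ping >> email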
    ```
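
The three dependency styles shown earlier in this section (`>>`, `set_downstream()`, and `set_upstream()`) all record the same relationship. As a rough illustration of how that can work, the toy class below overloads Python's `>>` operator. `ToyTask` is a hypothetical stand-in written for these notes and is not Airflow's actual implementation.

```python
class ToyTask:
    """A stand-in for an Airflow task; NOT Airflow's real implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids that must finish before this one
        self.downstream = set()  # task_ids that wait for this one

    def set_downstream(self, other):
        # Record the edge on both ends: self runs before other.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # Same edge, expressed from the downstream task's point of view.
        other.set_downstream(self)

    def __rshift__(self, other):
        # `a >> b` means "a runs before b"; returning `other` allows chaining.
        self.set_downstream(other)
        return other


first, second, third = ToyTask("first"), ToyTask("second"), ToyTask("third")
first >> second >> third

print(second.upstream, second.downstream)  # {'first'} {'third'}
```

Returning the right-hand task from `__rshift__` is what lets a chain like `first >> second >> third` read left to right.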



## Scheduling

- You may set your DAG to run on a simple **schedule** by setting its `schedule` argument to one of the following:
    - a cron expression
        - **Cron** is a job scheduler on Unix-like operating systems.
        - The actions of cron are driven by a **crontab** (cron table)
            ```
            # ┌───────────── minute (0–59)
            # │ ┌───────────── hour (0–23)
            # │ │ ┌───────────── day of the month (1–31)
            # │ │ │ ┌───────────── month (1–12)
            # │ │ │ │ ┌───────────── day of the week (0–6) (Sunday to Saturday;
            # │ │ │ │ │                                   7 is also Sunday on some systems)
            # │ │ │ │ │
            # │ │ │ │ │
            # * * * * * <command to execute>
            ```
        - To learn more about cron expressions, see https://crontab.guru/.
    - a `datetime.timedelta` object,
    - or one of the cron presets.
        - A **cron preset** is a predefined cron expression that is included in Airflow, such as `@daily`.
        - List of presets: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/cron.html#cron-presets
- Example DAG that implements scheduling:
    ```py
    import datetime

    from airflow import DAG

    # using cron expression
    with DAG("cron_expression_dag", schedule="0 0 * * *"):  # runs the DAG every day at 00:00
        ...

    # using cron preset
    with DAG("cron_preset_dag", schedule="@once"):  # runs the DAG only once
        ...

    # using datetime.timedelta object
    with DAG("timedelta_dag", schedule=datetime.timedelta(days=1)):  # runs the DAG once every day
        ...
    ```
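
To see what the five cron fields and a `timedelta` interval mean in practice, the sketch below uses only the standard library. The helper names (`parse_cron`, `cron_matches`, `interval_runs`) are hypothetical, and the matcher is deliberately simplified: it understands only `*` and plain integers (no ranges, lists, or steps), it does not implement cron's special rule for combining day-of-month with day-of-week, and it does not model how Airflow itself decides when a run starts.

```python
import datetime

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def parse_cron(expression):
    """Split a five-field cron expression into a dict of named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def cron_matches(expression, moment):
    """Check whether `moment` matches; supports only `*` and plain integers."""
    fields = parse_cron(expression)
    actual = {
        "minute": moment.minute,
        "hour": moment.hour,
        "day_of_month": moment.day,
        "month": moment.month,
        "day_of_week": moment.isoweekday() % 7,  # Sunday -> 0, Saturday -> 6
    }
    return all(
        value == "*" or int(value) == actual[name]
        for name, value in fields.items()
    )


def interval_runs(start, interval, count):
    """Return the first `count` points in time spaced `interval` apart."""
    return [start + i * interval for i in range(count)]


midnight = datetime.datetime(2021, 1, 1, 0, 0)
print(cron_matches("0 0 * * *", midnight))                   # True
print(cron_matches("0 0 * * *", midnight.replace(hour=6)))   # False
print(interval_runs(midnight, datetime.timedelta(days=1), 3)[-1])  # 2021-01-03 00:00:00
```

Both `"0 0 * * *"` and `timedelta(days=1)` produce one tick per day here, which matches the comments in the scheduling example: the cron form pins the run to midnight, while the `timedelta` form spaces runs a fixed interval apart from the start date.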

## References
- Airflow Core Concepts https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
- Scheduling https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/cron.html
- Cron https://en.wikipedia.org/wiki/Cron#CRON_expression
- Schedule DAGs in Airflow https://www.astronomer.io/docs/learn/scheduling-in-airflow