<a href="https://colab.research.google.com/github/buaindra/gcp_utility/blob/main/gcp/composer/GoogleCloud_Composer_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Airflow and Composer

#### Airflow Summit 2022: https://www.crowdcast.io/e/airflowsummit2022/


### Airflow Ref
1. Airflow official doc: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html
1. Airflow Concepts: https://airflow.apache.org/docs/apache-airflow/1.10.4/concepts.html
2. Google doc: https://cloud.google.com/composer/docs/composer-2/trigger-dags

### Try blogs:
1. Example DAG on GitHub: https://github.com/mbrukman/gcp-pso/blob/master/examples/cloud-composer-examples/composer_dataflow_examples/simple_load_dag.py

## Airflow Components
1. **A scheduler,** which handles both triggering scheduled workflows, and submitting Tasks to the executor to run.

2. **An executor,** which handles running tasks. In the default Airflow installation, this runs everything inside the scheduler, but most production-suitable executors actually push task execution out to workers.

3. **A webserver,** which presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.

4. **A folder of DAG files,** read by the scheduler and executor (and any workers the executor has)

5. **A metadata database,** used by the scheduler, executor and webserver to store state.



## DAG
> Directed Acyclic Graph
>
1. DAG Ref:
  1. Airflow official doc: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

### Dag Run
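1. Dag Run Status (final states):
  1. success: all leaf tasks ended as success or skipped
  2. failed: at least one leaf task ended as failed or upstream_failed
  3. note: skipped is a task state, not a DAG run state
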
2. DAG arguments:
  1. **schedule_interval:**
        1. It can be a cron expression as a str, a cron preset such as @daily, or a datetime.timedelta, e.g. datetime.timedelta(minutes=3).
        2. **Data interval:** the time range a DAG run covers. If a DAG is scheduled **@daily**, each data interval starts at midnight of one day and ends at midnight of the next day.
        3. **Execution date / logical date:** denotes the start of the data interval, not when the DAG is actually executed.
  2. **start_date:**
        1. It is the start of the first data interval, so it is also the logical date of the first DAG run.
        2. A DAG run is only scheduled one interval after start_date, once the first data interval has ended.
  3. **catchup:**
        1. By default, the scheduler kicks off a DAG run for every data interval that has not been run since the last data interval. This concept is called catchup.
        2. If catchup=False is set in the DAG arguments, the scheduler creates a DAG run only for the latest interval.

  4. **depends_on_past:** (task-level argument, usually passed through default_args)
        1. When False, a failure of a task in the previous DAG run does not affect the same task in the next DAG run. When True, a task instance runs only if its previous instance succeeded or was skipped.

  5. **trigger_rule:** (task-level argument, the default is all_success)
        1. ALL_SUCCESS = 'all_success'
        2. ALL_FAILED = 'all_failed'
        3. ALL_DONE = 'all_done'
        4. ONE_SUCCESS = 'one_success'
        5. ONE_FAILED = 'one_failed'
  
  6. **default_args:** a dictionary of default arguments applied to every task in the DAG. Arguments such as "owner" are typically set once here instead of on each task.
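
The trigger rules listed above can be illustrated with a small plain-Python sketch. This is not Airflow code: the function name `trigger_rule_met` is hypothetical, it only looks at the final states of the upstream tasks, and it treats `upstream_failed` as a failure, which is an assumption of this sketch.

```python
from typing import Iterable


def trigger_rule_met(rule: str, upstream_states: Iterable[str]) -> bool:
    """Return True if a task with `rule` may run, given final upstream states."""
    states = list(upstream_states)
    succeeded = [s == "success" for s in states]
    failed = [s in ("failed", "upstream_failed") for s in states]

    if rule == "all_success":
        return all(succeeded)
    if rule == "all_failed":
        return all(failed)
    if rule == "all_done":
        # every upstream task has finished, whatever the outcome
        return True
    if rule == "one_success":
        return any(succeeded)
    if rule == "one_failed":
        return any(failed)
    raise ValueError(f"unknown trigger rule: {rule}")


upstream = ["success", "failed", "skipped"]
for rule in ("all_success", "all_failed", "all_done", "one_success", "one_failed"):
    print(rule, trigger_rule_met(rule, upstream))
```

A cleanup task that must run even after a failure would typically use `all_done`, while an alerting task would use `one_failed`.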


### Hooks, Sensors and Operators
1. Operators: In a Composer or Airflow DAG we mostly use operators. Each operator instance creates an individual task in the DAG.
Examples: PythonOperator, BashOperator

2. Hooks: Hooks are interfaces to services external to the Airflow cluster. While operators provide a way to create tasks that may or may not communicate with some external service, hooks provide a uniform interface to access external services like S3, Postgres, MySQL, Hive etc.

3. Sensors: Sensors are a special kind of operator designed to wait for something to happen.

#### Ref:
https://towardsdatascience.com/apache-airflow-tips-and-best-practices-ff64ce92ef8

### Execution Time
1. The execution time (logical date) is not the actual run time, but the start timestamp of the schedule period that the run covers.
2. For a manually triggered DAG, the execution time is exactly the moment it was triggered.
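
As a rough illustration of the first point, the relation between the execution time and the actual run time of a scheduled run can be written as date arithmetic. The helper `describe_run` is hypothetical and only models a fixed `timedelta` schedule, not cron expressions.

```python
from datetime import datetime, timedelta


def describe_run(logical_date: datetime, interval: timedelta) -> dict:
    """Derive the data interval and the earliest real start of a scheduled run."""
    data_interval_start = logical_date             # execution time = start of the period
    data_interval_end = logical_date + interval    # the period must be over before the run starts
    return {
        "logical_date": logical_date,
        "data_interval_start": data_interval_start,
        "data_interval_end": data_interval_end,
        "earliest_actual_start": data_interval_end,
    }


run = describe_run(datetime(2022, 6, 1), timedelta(days=1))
print(run["logical_date"])           # 2022-06-01 00:00:00
print(run["earliest_actual_start"])  # 2022-06-02 00:00:00
```

So a daily run stamped 2022-06-01 processes the data of 2022-06-01 and cannot start before 2022-06-02.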

### start_date:

### schedule_interval:

#### Sample DAG code

In [None]:
# from airflow import DAG
from airflow.models.dag import DAG  # DAG object used to define the DAG
from airflow.operators.empty import EmptyOperator  # no-op placeholder task (Airflow 2.3+)

import datetime
import pendulum

dag = DAG(
    "tutorial",  # dag_id: unique identifier of your DAG
    default_args={
        "owner": "Airflow",
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),  # wait between retries, e.g. datetime.timedelta(days=1)
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="a simple tutorial dag",
    schedule_interval="@daily",
    catchup=False,  # only create a run for the latest data interval
)

t1 = EmptyOperator(task_id="task", dag=dag)

In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM (or triggered from the command line), a single DAG run will be created with a data interval between 2016-01-01 and 2016-01-02. The next one will be created just after midnight on the morning of 2016-01-03, with a data interval between 2016-01-02 and 2016-01-03.
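
The scenario above can be reproduced with plain date arithmetic. This is only a sketch of the idea: `due_intervals` is a hypothetical helper, it handles a fixed `timedelta` schedule only, and it is not how the Airflow scheduler is implemented.

```python
from datetime import datetime, timedelta


def due_intervals(start_date, interval, now, catchup):
    """List the (start, end) data intervals that have fully elapsed by `now`."""
    intervals = []
    start = start_date
    while start + interval <= now:
        intervals.append((start, start + interval))
        start += interval
    if not catchup:
        # catchup=False: only the most recent completed interval gets a run
        return intervals[-1:]
    return intervals


start_date = datetime(2015, 12, 1)
picked_up_at = datetime(2016, 1, 2, 6, 0)
one_day = timedelta(days=1)

latest_only = due_intervals(start_date, one_day, picked_up_at, catchup=False)
backfilled = due_intervals(start_date, one_day, picked_up_at, catchup=True)

print(latest_only)      # a single interval: 2016-01-01 to 2016-01-02
print(len(backfilled))  # 32 daily intervals, from 2015-12-01 up to 2016-01-01
```

With `catchup=True` the same DAG would start with 32 runs, one for each day between the `start_date` and the moment the scheduler picks it up.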

In [None]:
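from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime


with DAG(
    "sample_dag",
    description="sample dag for learning",
    default_args={
        "owner": "Airflow",
    },
    # use a fixed start_date (example value); datetime.datetime.now() moves on every parse
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=datetime.timedelta(days=1),
    catchup=False,  # do not backfill the intervals since start_date
) as dag:

  # prints the current date on the worker
  t1 = BashOperator(
      task_id="print_date",
      bash_command="date",
  )

  # sleeps for 5 seconds and is retried up to 3 times on failure
  t2 = BashOperator(
      task_id='sleep',
      depends_on_past=False,
      bash_command='sleep 5',
      retries=3,
  )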

The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called **XComs**.

## XComs

#### Ref: https://www.youtube.com/watch?v=8veO7-SN5ZY

1. Cross-communication: share small pieces of data between tasks.
2. Stored inside the Airflow metadata database.
3. The value returned by a Python function is pushed as an XCom by default, with the key "return_value".
4. Operators push their result as an XCom by default (do_xcom_push=True).
5. In a Python function:
  1. use "ti" as a parameter of the Python function to get the task_instance object
  2. then push an XCom with ti.xcom_push(key="key", value="value")
  3. then pull an XCom with ti.xcom_pull(key="key", task_ids=\<task_id\>)
6. In BashOperator, the last line written to stdout is pushed as an XCom by default. To avoid that, set "do_xcom_push=False".
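
The push and pull behaviour described above can be pictured as a small key-value table. The class `ToyXComStore` below is a hypothetical in-memory stand-in, not an Airflow class; real XComs are also scoped by DAG and DAG run, which this sketch leaves out.

```python
from typing import Any, Dict, Tuple


class ToyXComStore:
    """In-memory stand-in for the XCom table of the metadata database."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = "return_value") -> None:
        # a function's return value is stored under the default key "return_value"
        self._rows[(task_id, key)] = value

    def pull(self, task_ids, key: str = "return_value"):
        # a single task_id gives one value, a list of task_ids gives a list of values
        if isinstance(task_ids, str):
            return self._rows.get((task_ids, key))
        return [self._rows.get((task_id, key)) for task_id in task_ids]


store = ToyXComStore()
store.push("training_model_A", 0.91, key="model_accuracy")
store.push("training_model_B", 0.87, key="model_accuracy")
store.push("read_config", ["table_1", "table_2"])  # stored as "return_value"

print(store.pull(["training_model_A", "training_model_B"], key="model_accuracy"))
print(store.pull("read_config"))
```

Because every value ends up in the metadata database, XComs are best kept to small items such as ids, paths or metrics.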



In [None]:
# dags/xcom_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# from airflow.operators.python_operator import PythonOperator  # deprecated Airflow 1.10 import path
from airflow.models.param import Param
from airflow.models import Variable

from random import uniform
from datetime import datetime, timedelta


project_id = Variable.get("project_id")
# Variable.get returns a plain string, so no eval() is needed
region = Variable.get("region", default_var="us-central1")

default_args = {
    "depends_on_past": False,
    "project_id": project_id,
    "region": region,
    "start_date": datetime(2020, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=3)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")
    # push the value under an explicit key so that downstream tasks can pull it
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(**kwargs):
    ti = kwargs['ti']  # another way to access ti
    print('choose best model')
    # pulling with a list of task_ids returns one value per task
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids=['training_model_A', 'training_model_B', 'training_model_C'])
    print(accuracies)

with DAG(dag_id='xcom_dag', 
         schedule_interval='@daily', 
         default_args=default_args, 
         catchup=False,
         tags=["ingestion"],
         params={
             'SRC_TBL_NM': Param("ALL", type=["string", "array"]),
         },
    ) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False  # do not push the last stdout line as an XCom
    )

    # one training task per model: training_model_A, training_model_B, training_model_C
    training_model_task = [
        PythonOperator(
            task_id=f'training_model_{task}',
            python_callable=_training_model
        ) for task in ['A', 'B', 'C']]

    choose_model = PythonOperator(
        task_id='choose_model',
        python_callable=_choose_best_model,  # Airflow 2 passes the context as keyword arguments, provide_context is not needed
        #op_kwargs={"key": "{{ params.SRC_TBL_NM }}"}
    )

    downloading_data >> training_model_task >> choose_model

### BashOperator

In [None]:
from airflow.operators.bash import BashOperator  # Airflow 2 path; airflow.operators.bash_operator is the deprecated 1.10 path

load_file_to_bq = BashOperator(
    task_id="load_file_to_bq",
    bash_command = "bq load --autodetect --source_format=CSV --field_delimiter ',' project:dataset.table_name gs://bucket/prefix/blob_name",
    dag=dag
)

## Variables on Airflow
##### Ref: https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

1. Stored in the Airflow metadata database.

1. Variables are Airflow’s runtime configuration concept - a general key/value store that is global and can be queried from your tasks, and easily set via Airflow’s user interface, or bulk-uploaded as a JSON file.

2. Variables are global, and should only be used for overall configuration that covers the entire installation; to pass data from one Task/Operator to another, you should use XComs instead.

In [None]:
from airflow.models import Variable

dict_val = "foo"
Variable.set("foo", dict_val)
foo = Variable.get("foo")


In [None]:
# You can also use them from templates:
# Raw value
echo {{ var.value.<variable_name> }}

# Auto-deserialize JSON value
echo {{ var.json.<variable_name> }}
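
The difference between the raw value and the auto-deserialized JSON value can be sketched without Airflow. The function `get_variable` and the sample data are hypothetical; the keyword names `default_var` and `deserialize_json` mirror the ones accepted by `Variable.get`, and the stored values are always strings.

```python
import json

_MISSING = object()

# Hypothetical stored variables: every value is kept as a string
raw_variables = {
    "env": "dev",
    "tables": '["table_1", "table_2"]',
}


def get_variable(key, default_var=_MISSING, deserialize_json=False):
    """Look up a variable, optionally parsing its string value as JSON."""
    if key not in raw_variables:
        if default_var is _MISSING:
            raise KeyError(f"Variable {key} does not exist")
        return default_var
    value = raw_variables[key]
    return json.loads(value) if deserialize_json else value


print(get_variable("tables"))                         # raw string, like var.value
print(get_variable("tables", deserialize_json=True))  # Python list, like var.json
print(get_variable("region", default_var="us-central1"))
```

Since the raw value is already a string, a plain text variable needs no further parsing, and structured values are safer to read as JSON than through `eval`.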

## PythonOperator

#### Ref: 
1. https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
2. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
3. https://airflow.apache.org/docs/apache-airflow/1.10.4/howto/operator/python.html

## BranchPythonOperator



In [None]:
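from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator

def _branching_func(**kwargs):
    ti = kwargs["ti"]  # task instance taken from the context
    executable_tasks = ti.xcom_pull(key="final_table_name", task_ids="read_config_db")
    # return the task_ids to follow; the other direct downstream tasks are skipped
    new_list = ["tk1_"+str(table) for table in executable_tasks]
    return new_list

read_config_db = PythonOperator(...)

branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=_branching_func,  # Airflow 2 passes the context automatically, provide_context is not needed
    do_xcom_push=False,
)

def return_config():
    table_list = ["table_1", "table_2"]
    return table_list

# `dag` and the `end` task are assumed to be defined elsewhere in the DAG file
read_config_db >> branch_task   \
>> [DummyOperator(task_id=f"tk1_{table}", dag=dag) for table in return_config()]  \
>> end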

## SubDag

## Templating with Jinja
#### Ref:
1. Airflow template ref: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref
2. Jinja: https://jinja.palletsprojects.com/en/latest/api/#custom-filters 

### Uses:
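1. Jinja templates are only rendered in an operator's templated fields (listed in its template_fields).
2. A Python function can also receive templated values, for example through the PythonOperator's op_args / op_kwargs, which are templated fields.
3. Use DAG run config values inside a DAG: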
```python
var = "{{ dag_run.conf['key'] }}"
```

In [None]:
from textwrap import dedent

from airflow.operators.bash import BashOperator

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)
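
To make the rendered result of the template above concrete, here is a plain-Python approximation of what the loop expands to for a given `ds`. The function `ds_add` below is a stand-in written for this sketch (it mimics what `macros.ds_add` does with a `YYYY-MM-DD` string), and `render_templated_command` is a hypothetical helper; whitespace in the real rendered command differs.

```python
from datetime import datetime, timedelta


def ds_add(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD date string by `days` (stand-in for macros.ds_add)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_templated_command(ds: str) -> str:
    """Approximate the bash script produced by the Jinja loop for one run."""
    lines = []
    for _ in range(5):
        lines.append(f'echo "{ds}"')
        lines.append(f'echo "{ds_add(ds, 7)}"')
    return "\n".join(lines)


print(render_templated_command("2016-01-01"))
```

Each of the five loop iterations emits the run's date followed by the date seven days later.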

In [None]:
def run_this_func(**context):
    """
    Print the value stored under "key" in the DagRun conf attribute.
    :param context: The execution context
    :type context: dict
    """
    print("context", context)
    print("Remotely received value of {} for key=key".format(context["dag_run"].conf["key"]))

# PythonOperator usage (Airflow 2 passes the context automatically, provide_context is not needed)
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)


In [None]:
#BashOperator usage
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["key"] if dag_run else "" }}\'"',
    dag=dag
)

In [None]:
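#SparkSubmitOperator usage
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# spark_conn_id and dag are assumed to be defined elsewhere in the DAG file
spark_task = SparkSubmitOperator(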
        task_id="task_id",
        conn_id=spark_conn_id,
        name="task_name",
        application="example.py",
        application_args=[
            '--key', '\'{{ dag_run.conf["key"] if dag_run else "" }}\''
        ],
        num_executors=10,
        executor_cores=5,
        executor_memory='30G',
        #driver_memory='2G',
        conf={'spark.yarn.maxAppAttempts': 1},
        dag=dag)

## Sensors
##### ref: https://www.youtube.com/watch?v=fgm3BZ3Ubnw

In [None]:
from airflow.sensors.filesystem import FileSensor

waiting_for_file = FileSensor(
    task_id="waiting_for_file",
    filepath="data/input.csv",  # hypothetical path of the file to wait for (filepath is required)
    poke_interval=30,  # the sensor checks for the file every 30 seconds
    timeout=60 * 5,  # 5 minute timeout; best practice, otherwise the task can wait for the file far longer than intended
    mode="reschedule",  # default is "poke". reschedule releases the worker slot between checks so other tasks can use it
    soft_fail=True  # default is False. If True, the task is marked as skipped instead of failed when the sensor times out
)
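
The interplay of `poke_interval`, `timeout` and `soft_fail` can be sketched as a simple polling loop. This is a simplified model, not Airflow's sensor implementation: `run_sensor` and `FakeClock` are hypothetical names, and the sketch does not model `mode="reschedule"`, where the worker slot is released between checks.

```python
import time


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call `poke` every `poke_interval` seconds until it returns True or `timeout` passes."""
    started = clock()
    while True:
        if poke():
            return "success"
        if clock() - started >= timeout:
            if soft_fail:
                return "skipped"  # soft_fail turns a timeout into a skip
            raise TimeoutError("sensor timed out")
        sleep(poke_interval)


class FakeClock:
    """Simulated clock so the example finishes instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
attempts = []


def file_arrived():
    attempts.append(clock.now)
    return clock.now >= 90  # pretend the file shows up after 90 seconds


state = run_sensor(file_arrived, poke_interval=30, timeout=300,
                   clock=clock.time, sleep=clock.sleep)
print(state, attempts)
```

In this simulated run the condition is checked at 0, 30, 60 and 90 seconds, and the sensor succeeds on the fourth check.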

In [None]:
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

check_gcs_file_exists_or_not = GCSObjectExistenceSensor(
    task_id = "check_gcs_file_exists_or_not",
    bucket = "bucket_name",
    object = "blob_name",
    # schedule_interval=60,
    mode="reschedule",
    poke_interval = 120
)

### ExternalTaskSensor
##### Ref: https://www.youtube.com/watch?v=Oemg-3aiAiI

##### Hints:
1. Not recommended.
2. The parent DAG and the child DAG should have the same execution time; otherwise the time difference must be given through the execution_delta (or execution_date_fn) parameter.
3. Both the parent and the child DAG should have the same schedule interval.
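
The second hint comes down to date arithmetic: the sensor looks for a run of the other DAG whose execution time equals its own execution time minus the configured delta. The helper `external_logical_date` is hypothetical and only illustrates that calculation.

```python
from datetime import datetime, timedelta


def external_logical_date(own_logical_date: datetime,
                          execution_delta: timedelta = timedelta(0)) -> datetime:
    """Execution time of the external DAG run that the sensor would wait for."""
    return own_logical_date - execution_delta


# The sensor's DAG runs daily at 02:00, the DAG it waits for runs daily at 01:00
own = datetime(2022, 6, 1, 2, 0)

print(external_logical_date(own))                      # 2022-06-01 02:00:00, no such run exists
print(external_logical_date(own, timedelta(hours=1)))  # 2022-06-01 01:00:00, matches the other DAG
```

Without the delta the sensor would look for a 02:00 run that never exists and would wait until its timeout.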

In [None]:
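from airflow.sensors.external_task import ExternalTaskSensor

external_sensor = ExternalTaskSensor(
    task_id="external_sensor",
    external_dag_id="child_dag",  # DAG to wait for
    external_task_id="child_task",  # task inside that DAG to wait for
    timeout=600,  # give up after 10 minutes
    mode="reschedule",  # release the worker slot between checks
)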

## TriggerDagRunOperator

##### Hints:
1. Check which execution date the child DAG run receives.
2. By default, the triggering DAG does not wait for the target DAG to complete. The default is wait_for_completion=False.


In [None]:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_child_dag = TriggerDagRunOperator(
    task_id="trigger_child_dag",
    trigger_dag_id="child_dag",
    execution_date="{{ ds }}",  # the target DAG run gets the execution date of the triggering DAG run
    reset_dag_run=True,  # clear an existing target DAG run for that date instead of failing; useful for backfills and re-runs
    wait_for_completion=True,  # the triggering task waits until the target DAG run has finished
    poke_interval=10,  # check every 10 seconds whether the target DAG run has finished
    dag=dag
)

## Group tasks inside DAGs

In [None]:
from airflow.utils.task_group import TaskGroup

## Dynamic Task creation

In [None]:
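# start, read_config_db, end and dag are assumed to be defined earlier in the DAG file
start >> read_config_db

for table in return_config():  # return_config() returns the table list
    # one task group per table, each with its own t1 >> t2 chain
    with TaskGroup(group_id=f"group_{table}", dag=dag) as tg:
        t1 = DummyOperator(task_id=f"t1_{table}", dag=dag)
        t2 = DummyOperator(task_id=f"t2_{table}", dag=dag)
        t1 >> t2

    # wire every group, not only the last one created by the loop
    read_config_db >> tg >> end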

## Dynamic Task Mapping in Airflow 2.3