### Basics

#### Simple DAG structure

In [None]:

import datetime
import logging

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_dag_args = {
    "start_date": datetime.datetime(2018, 1, 1),
}

# Start of the workflow definition
with DAG(
    "simple_greeting",
    schedule=datetime.timedelta(days=1),
    catchup=False,  # do not backfill one run per day since 2018
    default_args=default_dag_args,
) as dag:

    # Plain Python function (no @task decorator): PythonOperator wraps it as a task
    def greeting():
        logging.info("Hi")

    # Task that calls greeting()
    hello_python = PythonOperator(task_id="hi", python_callable=greeting)

    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order of tasks with >> (or <<)
    hello_python >> goodbye_bash

#### BashOperator

In Cloud Composer, the following command-line tools are available to the BashOperator:

- `gcloud`, including the `gcloud storage` sub-command for working with Cloud Storage buckets
- `bq`
- `kubectl`


In [None]:

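from airflow.operators import bash

bq_dataset_name = "my_dataset"  # placeholder: name of the BigQuery dataset

# Create the BigQuery output dataset if it does not exist yet.
make_bq_dataset = bash.BashOperator(
    task_id="make_bq_dataset",
    # `bq ls` fails if the dataset is missing, so `bq mk` only runs in that case
    bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
)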

#### PythonOperator

In Cloud Composer, Python code runs in a container based on the [Cloud Composer image](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions).

In [None]:
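import logging

from airflow.operators.python import PythonOperator


# Plain function (no @task decorator): PythonOperator calls it when the task runs
def greeting():
    logging.info("Hi")


# Task calls greeting() function (create it inside a `with DAG(...)` block
# so that it belongs to a DAG)
hello_python = PythonOperator(task_id="hi", python_callable=greeting)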

#### Google Cloud Operators

There are a great many [Google Cloud operators](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/index.html); the link leads to their documentation.


#### EmailOperator

What do you think it does? (It sends emails.)  
It requires additional configuration in the Composer environment.  

[12,000 free emails per month](https://cloud.google.com/composer/docs/configure-email) (configuration guide)

In [None]:
from airflow.operators import email

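# Placeholder values for illustration
min_query_date = 0
max_query_date = 0
output_file = "path/to/output/file"

# Send email confirmation.
# question_title and view_count are Jinja templates that read the result of an
# upstream task "bq_read_most_popular" (not defined in this snippet) from XCom.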
email_summary = email.EmailOperator(
    task_id="email_summary",
    to="{{var.value.email}}",
    subject="Sample BigQuery notify data ready",
    html_content="""
    Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
    12AM. The most popular question was '{question_title}' with
    {view_count} views. Top 100 questions asked are now available at:
    {export_location}.
    """.format(
        min_date=min_query_date,
        max_date=max_query_date,
        question_title=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][0] }}"
        ),
        view_count=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][1] }}"
        ),
        export_location=output_file,
    ),
)


#### Notification on operator failure

If you set `email_on_failure` to `True`, you receive a notification whenever a task in the DAG fails.  
This also has to be configured in the Composer environment.

In [None]:
import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator

# Midnight of yesterday, used as start date
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(days=1),
    datetime.datetime.min.time(),
)
project_id = "gcloud project id"  # placeholder: your Google Cloud project ID

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with DAG(
    "composer_sample_bq_notify",
    schedule=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

    def hello_world():
        logging.info("Hello World")
        # Fails on purpose so that email_on_failure sends a notification
        raise RuntimeError("Intentional failure to test the failure email")

    # Tasks
    hello_world_task = PythonOperator(task_id="HelloWorld", python_callable=hello_world)

#### Best practices

+ Custom libraries  
    Do not place custom libraries in the top level of the `/dags` folder.

+ Only one DAG per Python module (and no SubDAGs). Instead, group the tasks within a single DAG.

+ [Test your DAGs](https://cloud.google.com/composer/docs/how-to/using/testing-dags)


#### Info / Resources

+ [How-to Guides](https://airflow.apache.org/docs/apache-airflow/2.9.3/howto/index.html)

### Advanced mechanisms

#### Group tasks inside DAGs

To group tasks within a DAG, you define relationships (dependencies) between them. 


![image.png](./DokuData/images/group_tasks_dag.png)


Here op-1 and op-2 run in parallel, as do op-3 and op-4. This is achieved by defining the relationships between the tasks accordingly. Extending the ordering syntax described earlier gives: 

`start >> [task_1, task_2]`  

The `[]` brackets (a Python list) express such a group: every task in the list depends on `start`. 


In [None]:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

# Only task-level defaults belong in default_args; the schedule is a DAG argument
args = {"owner": "airflow", "start_date": days_ago(1)}

with DAG(dag_id=DAG_NAME, default_args=args, schedule="@once") as dag:
    start = EmptyOperator(task_id="start")

    # ":" is the bash no-op command
    task_1 = BashOperator(task_id="op-1", bash_command=":")
    task_2 = BashOperator(task_id="op-2", bash_command=":")

    some_other_task = BashOperator(task_id="some-other-task", bash_command=":")

    task_3 = BashOperator(task_id="op-3", bash_command=":")
    task_4 = BashOperator(task_id="op-4", bash_command=":")

    end = EmptyOperator(task_id="end")

    # op-1/op-2 run in parallel after start, op-3/op-4 in parallel after some-other-task
    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

#### Triggering child DAGs from a parent DAG

+ `TriggerDagRunOperator`: starts a run of another DAG (identified by `trigger_dag_id`) and can pass a `conf` dictionary to it  

![](./DokuData/images/trigger_child_dag.png)

In [None]:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

DAG_NAME = "trigger_child_dag"

with DAG(
    dag_id=DAG_NAME,
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule="@once",
) as dag:
    start = EmptyOperator(task_id="start")

    # Starts a run of the DAG "dag-to-start" (defined in the next cell)
    # and passes `conf` to it
    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-start",
        conf={"message": "Hello World"},
    )

    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-start",
        conf={"message": "Hello World"},
    )

    some_other_task = EmptyOperator(task_id="some-other-task")

    end = EmptyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

In [None]:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-start"

args = {"owner": "airflow", "start_date": days_ago(1)}

# schedule=None (not the string "None"): this DAG only runs when it is triggered
with DAG(dag_id=DAG_NAME, default_args=args, schedule=None) as dag:
    dag_task = EmptyOperator(task_id="dag-task")

#### Creating task groups

+ Uses the `TaskGroup` context manager (`airflow.utils.task_group.TaskGroup`)

![taskgroup example](./DokuData/images/taskgroup.png)

In [None]:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

# dag_id must not contain spaces (letters, digits, "-", "_" and "." only)
with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = EmptyOperator(task_id="start")

    # Task ids inside a group get the group id as prefix, e.g. "taskgroup_1.op-1"
    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = EmptyOperator(task_id="some-other-task")

    end = EmptyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

### Tips & tricks

#### Adding tags to DAGs for filtering in the UI

In [None]:
from airflow import DAG

# Tags are shown in the UI and can be used to filter the DAG list
DAG(dag_id="example_dag_tag", schedule="0 0 * * *", tags=["example"])

#### Add owner links to a DAG

In [None]:
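from airflow import DAG

# owner_links (Airflow 2.4+) makes the owner name in the UI a clickable link;
# the key has to match the `owner` of the DAG's tasks
DAG(
    dag_id="example_dag_owner_links",
    schedule="0 0 * * *",
    tags=["example"],
    default_args={"owner": "sdd"},
    owner_links={"sdd": "https://www.schwaebisch-media.de/"},
)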

#### Notifiers

In [None]:
from airflow.notifications.basenotifier import BaseNotifier
