# DAGs development 

Before developing DAGs with Airflow and Postgres, we need to add a connection so that Airflow can find the database.
As usual, we first log in to the cluster.

In [None]:
# Replace the command with your own one inside the single quotes and run the cell
# Example OC_LOGIN_COMMAND='oc login --token=sha256~3bR5KXgwiUoaQiph2_kIXCDQnVfm_HQy3YwU2m-UOrs --server=https://c109-e.us-east.containers.cloud.ibm.com:31656'
OC_LOGIN_COMMAND='_replace_this_string_by_pasting_the_clipboard_'
if [ "$OC_LOGIN_COMMAND" = '_replace_this_string_by_pasting_the_clipboard_' ]; then
  # The placeholder was not replaced: explain what to do instead of failing with "command not found"
  echo "Paste your own 'oc login ...' command into OC_LOGIN_COMMAND and run this cell again" >&2
else
  # Intentionally unquoted: the string must be split into the command and its arguments
  $OC_LOGIN_COMMAND
fi

Then, we need to retrieve two values (the hostname and the port) that we will use right away. Keep them at hand so you can copy and paste them in the next step.

In [None]:
# Find the ClusterIP service in the current project, then build its
# in-cluster DNS name (<name>.<namespace>.svc.cluster.local) and extract its port.
internalservice=$(oc get svc | awk '/ClusterIP/ {print $1}')
internalhostname=$(oc get svc $internalservice -o go-template --template='{{.metadata.name}}.{{.metadata.namespace}}.svc.cluster.local')
# The PORT(S) column looks like "<port>/<protocol>"; keep only the part before the first slash
internalport=$(oc get svc | awk '/ClusterIP/ {split($5, portspec, "/"); print portspec[1]}')
echo "Internal hostname of Postgres:" $internalhostname
echo "Internal port of Postgres:" $internalport

In order to create the connection, we need to access the Airflow admin interface as we did during the **Airflow Deployment** section:

![](../pictures/airflowroute.png)



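Copy and paste the values obtained above into the new connection form. Name the connection `postgres_motogp`, because the DAG refers to it through its `postgres_conn_id` argument: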

![](../pictures/airflow_postgres_conn.png)

In [None]:
# Install the Databand python package (with its Spark, Airflow and Postgres extras)
# in the Airflow worker pod and in the scheduler pod.
oc project airflow

# Single definition of what is installed and with which pip, reused for both pods
DATABAND_PACKAGE='databand[spark,airflow,postgres]'
AIRFLOW_PIP=/home/airflow/.local/bin/pip

oc rsh --shell=/bin/bash airflow-worker-0 "$AIRFLOW_PIP" install "$DATABAND_PACKAGE"

# The scheduler pod name is generated, so look it up by its prefix
POD_SCHEDULER=$(oc get pods | grep airflow-scheduler | awk '{print $1}')
if [ -z "$POD_SCHEDULER" ]; then
  echo "No airflow-scheduler pod found in the current project" >&2
else
  oc rsh --shell=/bin/bash "$POD_SCHEDULER" "$AIRFLOW_PIP" install "$DATABAND_PACKAGE"
  echo "$DATABAND_PACKAGE installed in airflow-worker-0 and $POD_SCHEDULER"
fi

In [None]:
# Create the sql subdirectory under /opt/airflow/dags in the worker pod (-p: no error if it already exists)
oc rsh  --shell=/bin/bash airflow-worker-0  mkdir -p /opt/airflow/dags/sql

In [None]:
# Print the current local working directory, to know where the next cd starts from
pwd

Now, we will transfer some files from our local machine to the Airflow containers. Please make sure that you are in the local directory of the DAGs.

In [None]:
# you may need to modify the cd command to place yourself in the DAGs directory
# List the contents only when the cd succeeded, so that a wrong path is not
# mistaken for the DAGs directory
cd ../dags && ls -l

If you did it right, the listing will show the `motogp_dag.py` file and the `sql` subdirectory.

Now, we will transfer some files:

In [None]:
# Copy the DAG definition and its companion files (SQL scripts, loader, CSV)
# into the worker pod. Destinations are absolute so they match the
# /opt/airflow/dags/sql directory that the DAG reads from, regardless of the
# container's working directory.
oc cp motogp_dag.py airflow-worker-0:/opt/airflow/dags/motogp_dag.py
for file in sql/*
do
  # When sql/ is empty or missing the pattern stays literal: skip it
  [ -e "$file" ] || continue
  oc cp "$file" "airflow-worker-0:/opt/airflow/dags/sql/$(basename "$file")"
done


Wait a minute or two for Airflow to register the DAG; you can then view it in the Airflow web interface:

![](../pictures/DAG_activate.png)
![](../pictures/DAG_graph.png)

You can review the DAG here (do not execute the cells)

`motogp_dag.py`

In [None]:
# Simple DAG for the Databand hands-on workshop

# These are mandatory imports
from __future__ import annotations
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime
from random import random


# An auxiliary function to decide either drop a table or delete its contents
def delete_or_drop():
    """Choose at random which clean-up task runs next.

    Returns:
        str: ``"motogp_delete_table"`` when the random draw is below 0.5,
        otherwise ``"motogp_drop_table"``.
    """
    draw = random()
    return "motogp_delete_table" if draw < 0.5 else "motogp_drop_table"

# DAG definition: all tasks and their ordering are declared inside the
# ``with`` block. The postgres_conn_id value must be the name of a
# connection that exists in Airflow.
with DAG(
    dag_id="motogp_postgres",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # Step 1: run the table-creation script
    motogp_create_table = PostgresOperator(
        sql="sql/motogp_create_table.sql",
        postgres_conn_id="postgres_motogp",
        task_id="motogp_create_table",
    )

    # Step 2: run the standalone loader script with python3
    motogp_load_table = BashOperator(
        bash_command="python3 /opt/airflow/dags/sql/motogp_load_table.py",
        task_id="motogp_load_table",
    )

    # Step 3: run the select script
    motogp_select_table = PostgresOperator(
        sql="sql/motogp_select_table.sql",
        postgres_conn_id="postgres_motogp",
        task_id="motogp_select_table",
    )

    # Step 4: branch; the callable returns the task_id of the branch to follow
    conditional_task = BranchPythonOperator(
        python_callable=delete_or_drop,
        task_id="delete_or_drop",
    )

    # Branch A: run the delete script
    motogp_delete_table = PostgresOperator(
        sql="sql/motogp_delete_table.sql",
        postgres_conn_id="postgres_motogp",
        task_id="motogp_delete_table",
    )

    # Branch B: run the drop script
    motogp_drop_table = PostgresOperator(
        sql="sql/motogp_drop_table.sql",
        postgres_conn_id="postgres_motogp",
        task_id="motogp_drop_table",
    )

    # DAG dependencies: a linear chain that fans out into the two branches
    (
        motogp_create_table
        >> motogp_load_table
        >> motogp_select_table
        >> conditional_task
        >> [motogp_delete_table, motogp_drop_table]
    )


`motogp_create_table.sql`

In [None]:
-- Create the motogp table unless it already exists.
-- The column order follows the header of motogp.csv.
CREATE TABLE IF NOT EXISTS motogp (
    circuit     VARCHAR(40),
    class       CHAR(6),
    constructor VARCHAR(40),
    country     CHAR(2),
    rider       VARCHAR(40),
    season      INTEGER
);

`motogp_load_table.py`

In [None]:
import  psycopg2

# This code will not be considered as a DAG by Airflow because it is inside a function

def load_data():
    """Bulk-load ``/opt/airflow/dags/sql/motogp.csv`` into the ``motogp`` table.

    The file (semicolon-delimited, with a header row) is streamed to Postgres
    with ``COPY ... FROM STDIN``. The transaction is committed once the copy
    succeeds; the connection is closed in every case.
    """
    sql = "copy motogp from STDIN delimiter ';' csv  header"

    # NOTE(review): host and credentials are hard-coded for the workshop setup;
    # read them from the environment or an Airflow connection in real deployments.
    conn = psycopg2.connect("dbname='postgres' user='postgres' host='postgresql.postgres.svc.cluster.local' password='postgres'")
    try:
        # The CSV contains non-ASCII characters, so do not depend on the locale's default encoding
        with conn.cursor() as cur, open("/opt/airflow/dags/sql/motogp.csv", "r", encoding="utf-8") as file:
            cur.copy_expert(sql, file)
        # psycopg2 opens a transaction implicitly: without this commit the loaded rows are discarded
        conn.commit()
    finally:
        # Closing without a commit (error path) rolls the transaction back
        conn.close()


load_data()

`motogp_select_table.sql`

In [None]:
-- Count the rows currently stored in motogp
SELECT COUNT(*) FROM motogp;

`motogp_delete_table.sql`

In [None]:
-- Remove every row from motogp; the table itself is kept
DELETE FROM motogp;

`motogp_drop_table.sql`

In [None]:
-- Remove the motogp table together with its contents
DROP TABLE motogp;

`motogp.csv`


In [None]:
Circuit;Class;Constructor;Country;Rider;Season
Circuit Of The Americas;Moto3;KTM;ES;Jaume Masia;2022
Circuit Of The Americas;Moto2;Kalex;IT;Tony Arbolino;2022
Circuit Of The Americas;MotoGP;Ducati;IT;Enea Bastianini;2022
Termas de Río Hondo;Moto3;GASGAS;ES;Sergio Garcia;2022
Termas de Río Hondo;MotoGP;Aprilia;ES;Aleix Espargaro;2022
Termas de Río Hondo;Moto2;Kalex;IT;Celestino Vietti;2022
Pertamina Mandalika Circuit;MotoGP;KTM;PT;Miguel Oliveira;2022
Pertamina Mandalika Circuit;Moto2;Kalex;TH;Somkiat Chantra;2022
Pertamina Mandalika Circuit;Moto3;Honda;IT;Dennis Foggia;2022
Lusail International Circuit;Moto3;Honda;IT;Andrea Migno;2022
Lusail International Circuit;Moto2;Kalex;IT;Celestino Vietti;2022
Lusail International Circuit;MotoGP;Ducati;IT;Enea Bastianini;2022