# Create a basic shell script


Here we define a shell script, extract_transform_load.sh, that represents a pipeline of three tasks:

- extract
- transform
- load

For now, the shell script contains only basic echo statements for extract, transform, and load.

#!/bin/bash

echo "Extract data"

echo "Transform data"

echo "Load data"

An Apache Airflow DAG is a Python program. It consists of the following logical blocks:

- Imports
- DAG Arguments
- DAG Definition
- Task Definitions
- Task Pipeline

A typical imports block looks like this.

# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
# (Airflow 2 module path; the older airflow.operators.bash_operator path is deprecated)
from airflow.operators.bash import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago

# define the DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
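The comment in the block above notes that these defaults can be overridden per task. In real DAG code you would simply pass the key as an operator argument, for example retries=3 to BashOperator. The standard-library sketch below models that precedence with a dictionary merge; effective_task_args is a hypothetical helper that only illustrates the rule "explicit task arguments win over default_args", not Airflow's internal code.

```python
from datetime import timedelta

# A subset of the DAG-level defaults from the example above.
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email_on_retry': True,
}

def effective_task_args(defaults, **task_kwargs):
    """Hypothetical helper: task-level keyword arguments take precedence over DAG defaults."""
    merged = dict(defaults)      # start from the DAG-wide defaults (copy, don't mutate)
    merged.update(task_kwargs)   # explicit operator arguments override matching keys
    return merged

# A task that wants more retries and no retry e-mails overrides only those keys.
args = effective_task_args(default_args, retries=3, email_on_retry=False)
print(args['retries'], args['retry_delay'], args['email_on_retry'])  # 3 0:05:00 False
```

Keys the task does not mention, such as retry_delay here, are inherited unchanged from the defaults.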

DAG arguments are like settings for the DAG.

The settings above specify:

- the owner name,
- when this DAG should start running: days_ago(0) means today (midnight UTC),
- the email address to which alerts are sent,
- whether an alert must be sent on failure,
- whether an alert must be sent on retry,
- the number of retries in case of failure, and
- the time delay between retries.
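To make "days_ago(0) means today" concrete: in Airflow 2, days_ago(n) returns midnight UTC of the current day minus n days. The sketch below approximates that behaviour with the standard library only (days_ago_sketch is a hypothetical name, not an Airflow function) and uses a fixed "current time" so the output is reproducible. It also spells out what 'retries': 1 implies for the number of attempts.

```python
from datetime import datetime, timedelta, timezone

def days_ago_sketch(n, now=None):
    """Approximates airflow.utils.dates.days_ago(n): midnight UTC today, minus n days."""
    if now is None:
        now = datetime.now(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)

# Fixed "current time" so the printed values are reproducible.
now = datetime(2024, 5, 17, 14, 30, tzinfo=timezone.utc)
print(days_ago_sketch(0, now))  # 2024-05-17 00:00:00+00:00
print(days_ago_sketch(1, now))  # 2024-05-16 00:00:00+00:00

# 'retries': 1 allows one retry after the first failure, so at most 2 attempts;
# the retry becomes eligible no earlier than retry_delay (5 minutes) after the failure.
retries, retry_delay = 1, timedelta(minutes=5)
print(1 + retries, retry_delay)  # 2 0:05:00
```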
A typical DAG definition block looks like this.

# define the DAG
dag = DAG(
    dag_id='sample-etl-dag',
    default_args=default_args,
    description='Sample ETL DAG using Bash',
    schedule_interval=timedelta(days=1),
)

Here we create a variable named dag by instantiating the DAG class with the following parameters:

- dag_id: sample-etl-dag is the ID of the DAG. This is the name you see in the web console.
- default_args: the dictionary defined above, which holds all the defaults.
- description: a short summary that helps us understand what this DAG does.
- schedule_interval: how frequently this DAG runs; timedelta(days=1) means once every day.
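In Airflow 2, a schedule such as timedelta(days=1) divides time into back-to-back intervals starting at start_date, and the run for each interval becomes eligible only after that interval has ended. With start_date=days_ago(0), the first run is therefore expected around the following midnight UTC rather than immediately. The sketch below (data_intervals is a hypothetical helper, not an Airflow API) lists the first few intervals for a fixed start date.

```python
from datetime import datetime, timedelta, timezone

def data_intervals(start_date, interval, count):
    """Return the first `count` (interval_start, interval_end) pairs of a fixed-interval schedule."""
    return [(start_date + i * interval, start_date + (i + 1) * interval)
            for i in range(count)]

start = datetime(2024, 5, 17, tzinfo=timezone.utc)  # e.g. what days_ago(0) returned on that day
for begin, end in data_intervals(start, timedelta(days=1), 3):
    # The run covering [begin, end) only becomes eligible once `end` has passed.
    print(f"interval {begin:%Y-%m-%d %H:%M} -> {end:%Y-%m-%d %H:%M}, eligible at {end:%Y-%m-%d %H:%M}")
```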

A typical task definitions block looks like this:

# define the tasks

# define the task named extract_transform_and_load to call the shell script
extract_transform_and_load = BashOperator(
    task_id='extract_transform_and_load',
    # The trailing space stops Airflow from treating the .sh path as a Jinja template file
    bash_command='/home/project/airflow/dags/extract_transform_load.sh ',
    dag=dag,
)

A task is defined using:

- a task_id, which is a string that identifies the task,
- the bash command it runs; here we call the shell script extract_transform_load.sh that we defined earlier, and
- the DAG this task belongs to.
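A detail worth knowing about bash_command: BashOperator declares template_ext = ('.sh', '.bash'), and Airflow treats a templated string that ends with one of those extensions as the path of a Jinja template file to load instead of a command to run. For a script path like the one in this DAG, that usually fails with a TemplateNotFound error, so the path is commonly followed by a trailing space. The sketch below mimics only the suffix check; looks_like_template_file is a hypothetical helper, not Airflow code.

```python
# Extensions BashOperator lists in its template_ext attribute.
BASH_TEMPLATE_EXT = ('.sh', '.bash')

def looks_like_template_file(bash_command):
    """Sketch of the suffix test Airflow applies to templated fields such as bash_command."""
    return bash_command.endswith(BASH_TEMPLATE_EXT)

script = '/home/project/airflow/dags/extract_transform_load.sh'
print(looks_like_template_file(script))        # True: would be loaded as a template file
print(looks_like_template_file(script + ' '))  # False: passed to bash and executed as-is
```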
A typical task pipeline block looks like this:

# task pipeline
extract_transform_and_load
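With a single task there is nothing to chain. If the script were split into separate extract, transform and load tasks, the pipeline block would declare their order with the >> operator (extract >> transform >> load): a >> b makes b run after a and returns b, which is why chains work. The toy model below is not Airflow's implementation; ToyTask is hypothetical and only records upstream task IDs, then uses Python's graphlib (3.9+) to show the resulting execution order.

```python
from graphlib import TopologicalSorter

class ToyTask:
    """Hypothetical stand-in for an Airflow task that only models dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # IDs of tasks that must finish before this one

    def __rshift__(self, other):
        # a >> b records that b runs after a, and returns b so that chaining works.
        other.upstream.add(self.task_id)
        return other

extract, transform, load = ToyTask('extract'), ToyTask('transform'), ToyTask('load')
extract >> transform >> load

# Map each task to its predecessors and derive a valid execution order.
graph = {t.task_id: t.upstream for t in (extract, transform, load)}
print(list(TopologicalSorter(graph).static_order()))  # ['extract', 'transform', 'load']
```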

# Submit a DAG
Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.

Open a terminal and run the command below to submit the DAG that was created in the previous exercise.

Note: When submitting the DAG created in the previous exercise, prefix the submit command with sudo.

 cp my_first_dag.py $AIRFLOW_HOME/dags
 
Next, run the commands below one by one to copy the shell script into the dags folder and change its permissions so that it can be read and executed.

 cp my_first_dag.sh $AIRFLOW_HOME/dags
 cd $AIRFLOW_HOME/dags
 chmod 777 my_first_dag.sh
 
# Verify that our DAG actually got submitted.

Run the command below to list all the existing DAGs.

airflow dags list

Verify that my-first-dag is part of the output.

airflow dags list | grep "my-first-dag"

You should see your DAG name in the output.

Run the command below to list out all the tasks in my-first-dag.

airflow tasks list my-first-dag