# Build an ETL Pipeline using Airflow

This walkthrough follows the final assignment of the [Coursera ETL course](https://author-ide.skills.network/render?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZF9pbnN0cnVjdGlvbnNfdXJsIjoiaHR0cHM6Ly9jZi1jb3Vyc2VzLWRhdGEuczMudXMuY2xvdWQtb2JqZWN0LXN0b3JhZ2UuYXBwZG9tYWluLmNsb3VkL0lCTS1EQjAyNTBFTi1Ta2lsbHNOZXR3b3JrL2xhYnMvRmluYWwlMjBBc3NpZ25tZW50L0VUTF9QZWVyX1Jldmlld19Bc3NpZ25tZW50Lm1kIiwidG9vbF90eXBlIjoidGhlaWFkb2NrZXIiLCJhZG1pbiI6ZmFsc2UsImlhdCI6MTY3NjYyOTI4NX0.T_PQR8ysLNqqh7dCvle_SLIxOiId8MxqetQQZjimnH4).

* Start Apache Airflow and create a new directory inside the Airflow `dags` directory

```sh
sudo mkdir -p /infinity/codes/data_engineering/etl-and-data-pipelines-shell-airflow-kafka/tools/apache-workflow/dags/finalassignment/staging
```

* Download the [dataset](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz) into the `finalassignment` directory

```sh
cd /infinity/codes/data_engineering/etl-and-data-pipelines-shell-airflow-kafka/tools/apache-workflow/dags/finalassignment
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz
```

* Define DAG arguments

```python
from datetime import timedelta
from airflow import DAG
import pendulum

# Defaults applied to every task in the DAG
default_args = {
  'owner': 'Iron Man',
  # Airflow's documentation recommends a fixed start_date over a dynamic one such as now()
  'start_date': pendulum.now('Asia/Dhaka'),
  'email': ['ironman@somemail.com'],
  'email_on_failure': True,
  'email_on_retry': True,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
}
```

* Set source directory

```python
from os import chdir

# Directory that holds tolldata.tgz and the files extracted from it
source_dir = '/infinity/codes/data_engineering/etl-and-data-pipelines-shell-airflow-kafka/tools/apache-workflow/dags/finalassignment'
chdir(source_dir)
```

* Define the DAG

```python
dag = DAG(
  'ETL_toll_data',
  default_args=default_args,
  description='Apache Airflow Final Assignment',
  schedule=timedelta(days=1)
)

# In a notebook, evaluating `dag` displays: <DAG: ETL_toll_data>
```

* Create a task to unzip data

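```python
from airflow.operators.bash import BashOperator

unzip_data = BashOperator(
  task_id='unzip_data',
  bash_command='tar -xzf tolldata.tgz',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```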
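
For comparison, the same extraction can be sketched with Python's standard `tarfile` module instead of shelling out to `tar`. The function name `extract_archive` and its parameters are illustrative assumptions, not part of the course material; in a DAG such a function would typically be wrapped in a `PythonOperator`.

```python
import tarfile
from pathlib import Path


def extract_archive(archive_path, dest_dir):
    """Extract a gzip-compressed tarball into dest_dir and return its member names."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)  # create the target directory if needed
    with tarfile.open(archive_path, mode="r:gz") as archive:
        names = archive.getnames()  # list the members before extracting
        archive.extractall(dest)
    return names
```

Calling `extract_archive("tolldata.tgz", ".")` from the `finalassignment` directory would mirror `tar -xzf tolldata.tgz`.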

* Create a task to extract data from the CSV file

```python
# Fields 1-4: Rowid, Timestamp, Anonymized Vehicle number, and Vehicle type

extract_data_from_csv = BashOperator(
  task_id='extract_data_from_csv',
  bash_command='cut -d"," -f1-4 vehicle-data.csv > csv_data.csv',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```

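* Create a task to extract data from the TSV file

```python
# Fields 5-7: Number of axles, Tollplaza id, and Tollplaza code

extract_data_from_tsv = BashOperator(
  task_id='extract_data_from_tsv',
  # Python turns "\t" into a literal tab before the command reaches bash
  bash_command='tr "\t" "," < tollplaza-data.tsv | cut -d"," -f5-7 > tsv_data.csv',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```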
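
The CSV and TSV extraction steps both select a contiguous range of fields, so they can be sketched with one helper built on Python's standard `csv` module. The name `extract_columns` and its parameters are hypothetical and chosen for illustration; field numbers are 1-based and inclusive to match `cut -f`.

```python
import csv


def extract_columns(src_path, dest_path, delimiter, first, last):
    """Copy fields first..last (1-based, inclusive, like `cut -f`) to a comma-separated file."""
    with open(src_path, newline="") as src, open(dest_path, "w", newline="") as dest:
        reader = csv.reader(src, delimiter=delimiter)
        writer = csv.writer(dest, lineterminator="\n")
        for row in reader:
            # Convert the 1-based inclusive range into a Python slice
            writer.writerow(row[first - 1:last])
```

Under these assumptions, `extract_columns("vehicle-data.csv", "csv_data.csv", ",", 1, 4)` and `extract_columns("tollplaza-data.tsv", "tsv_data.csv", "\t", 5, 7)` would correspond to the two tasks. Unlike `cut`, `csv.writer` quotes any field that itself contains a comma.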

* Create a task to extract data from the fixed-width file

```python
# Characters 59-68: Type of Payment code and Vehicle Code

extract_data_from_fixed_width = BashOperator(
  task_id='extract_data_from_fixed_width',
  bash_command='cut -c 59-62,63-68 payment-data.txt | tr " " "," > fixed_width_data.csv',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```
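
To make the character arithmetic concrete, here is a minimal sketch of the same slice in Python. The ranges `59-62` and `63-68` are contiguous, so together they select characters 59 through 68. The function name `extract_fixed_width` is an assumption for illustration.

```python
def extract_fixed_width(line, start=59, end=68):
    """Return characters start..end (1-based, inclusive, like `cut -c`) with spaces replaced by commas."""
    # Drop the trailing newline, then convert the 1-based range to a Python slice
    segment = line.rstrip("\n")[start - 1:end]
    # Mirror `tr " " ","`: every space becomes a comma
    return segment.replace(" ", ",")
```

Because every space is replaced, two consecutive spaces inside the slice would produce an empty field, exactly as the `tr` command does.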

* Create a task to consolidate data extracted from previous tasks

```python
# Combined columns: Rowid, Timestamp, Anonymized Vehicle number, Vehicle type,
# Number of axles, Tollplaza id, Tollplaza code, Type of Payment code, and Vehicle Code

consolidate_data = BashOperator(
  task_id='consolidate_data',
  bash_command='paste -d"," csv_data.csv tsv_data.csv fixed_width_data.csv > extracted_data.csv',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```
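
`paste` joins the files line by line: line 1 of each input becomes line 1 of the output, and so on. A rough Python equivalent, with the hypothetical name `paste_files`, could look like this:

```python
from itertools import zip_longest


def paste_files(src_paths, dest_path, delimiter=","):
    """Join corresponding lines of several files, like `paste -d","`."""
    columns = []
    for path in src_paths:
        with open(path) as src:
            columns.append([line.rstrip("\n") for line in src])
    with open(dest_path, "w") as dest:
        # Shorter files are padded with empty strings, as paste does
        for parts in zip_longest(*columns, fillvalue=""):
            dest.write(delimiter.join(parts) + "\n")
```

The consolidation step would then be `paste_files(["csv_data.csv", "tsv_data.csv", "fixed_width_data.csv"], "extracted_data.csv")`.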

* Transform and load the data

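```python
# Field 4 (Vehicle type) converted to upper case

transform_data = BashOperator(
  task_id='transform_data',
  bash_command='cut -d"," -f4 extracted_data.csv | tr "[:lower:]" "[:upper:]" > transformed_data.csv',
  # Run in source_dir; by default BashOperator executes in a temporary directory
  cwd=source_dir,
  dag=dag,
)
```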
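
As a sketch of what this command produces, the following Python function writes only the fourth field of each row, upper-cased, one value per line. The name `uppercase_field` is hypothetical, and skipping rows with too few fields is an assumption made here for safety.

```python
import csv


def uppercase_field(src_path, dest_path, field=4):
    """Write only the given field (1-based) of each row, upper-cased, one value per line."""
    with open(src_path, newline="") as src, open(dest_path, "w") as dest:
        for row in csv.reader(src):
            if len(row) < field:
                continue  # assumption: short or blank rows carry no data
            dest.write(row[field - 1].upper() + "\n")
```

To keep the remaining columns in the output, one could instead replace `row[field - 1]` with its upper-cased value and write the full row back.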

* Define the task pipeline

```python
unzip_data >> extract_data_from_csv >> extract_data_from_tsv \
>> extract_data_from_fixed_width >> consolidate_data >> transform_data

# In a notebook, this expression displays the last task in the chain:
# <Task(BashOperator): transform_data>
```

## Submit the DAG

* Check DAGs list

```sh
airflow dags list
```

* Copy the DAG

```sh
cp /home/project/ETL_toll_data.py $AIRFLOW_HOME/dags
```

* Check task list

```sh
airflow tasks list ETL_toll_data
```

* Unpause the DAG

```sh
airflow dags unpause ETL_toll_data
```

* Pause the DAG

```sh
airflow dags pause ETL_toll_data
```