This notebook defines an Apache Airflow DAG that extracts toll data from several source files, consolidates it into a single CSV file, and then transforms it.

In [None]:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# Default arguments shared by every task in the DAG: the start date is
# computed with days_ago(0), the owner is e-mailed on both failure and retry,
# and a failed task is retried once after a five-minute delay.
default_args = dict(
    owner='Peter Alexander',
    start_date=days_ago(0),
    email=['mrachidothi@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# The DAG object for the toll-data ETL pipeline; it is scheduled to run once
# per day and hands default_args to each of its tasks.
dag = DAG(
    dag_id='ETL_toll_data',
    description='Apache Airflow Final Assignment',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)

# Define the tasks.

# Unpack the source archive into the staging directory.
# Every string fragment ends with (or the next begins after) a space so the
# implicitly concatenated command has properly separated arguments instead
# of relying on tar accepting option arguments glued to the option letters.
unzip_data = BashOperator(
    task_id='extract_unzip',
    bash_command='tar -xzf '
        '/home/project/airflow/dags/finalassignment/staging/tolldata.tgz '
        '-C /home/project/airflow/dags/finalassignment/staging',
    dag=dag,
)

# Extract the first four columns of vehicle-data.csv into csv_data.csv.
extract_data_from_csv = BashOperator(
    task_id='extract_csv',
    # The input is a CSV file, so split on ',' (splitting on ':' does not
    # select CSV columns); the output stays comma-separated, which is what
    # the later paste/awk steps expect.
    bash_command='cut -d"," -f1,2,3,4 '
        '/home/project/airflow/dags/finalassignment/staging/vehicle-data.csv > '
        '/home/project/airflow/dags/finalassignment/staging/csv_data.csv',
    dag=dag,
)

# Extract columns 5-7 of the tab-separated tollplaza-data.tsv into
# tsv_data.csv, re-delimited with commas so it matches the other extracts.
extract_data_from_tsv = BashOperator(
    task_id='extract_tsv',
    # cut splits on TAB by default, which is the TSV delimiter.
    # tr -d '\r' removes carriage returns in case the file has Windows line
    # endings (a no-op otherwise); tr '\t' ',' converts the kept columns to
    # comma-separated form for the comma-based consolidate/transform steps.
    bash_command="cut -f5,6,7 "
        "/home/project/airflow/dags/finalassignment/staging/tollplaza-data.tsv "
        "| tr -d '\\r' | tr '\\t' ',' > "
        "/home/project/airflow/dags/finalassignment/staging/tsv_data.csv",
    dag=dag,
)

# Extract columns from payment-data.txt into fixed_width_data.csv.
# NOTE(review): the variable and output names describe this input as
# fixed-width, yet the command splits on ':' and keeps fields 7 and 8. Lines
# containing no ':' are passed through whole, because cut is not given -s.
# Fixed-width columns are normally selected by character position
# (`cut -c START-END`) and then re-delimited with ',' so they line up with
# the comma-joined consolidated file. Confirm the layout of payment-data.txt
# and the intended column positions.
extract_data_from_fixed_width = BashOperator(
    task_id='extract_width',
    bash_command= 'cut -d":" -f7,8 '
        '/home/project/airflow/dags/finalassignment/staging/payment-data.txt > '
        '/home/project/airflow/dags/finalassignment/staging/fixed_width_data.csv',
    dag=dag,
)
# Merge the three extracts side by side, line by line, joining them with
# commas into extracted_data.csv.
consolidate_data = BashOperator(
    task_id='consolidate_data',
    # The space after "-d," is required: without it paste treats the first
    # file path as part of its delimiter list and never reads csv_data.csv.
    bash_command='paste -d, '
        '/home/project/airflow/dags/finalassignment/staging/csv_data.csv '
        '/home/project/airflow/dags/finalassignment/staging/tsv_data.csv '
        '/home/project/airflow/dags/finalassignment/staging/fixed_width_data.csv > '
        '/home/project/airflow/dags/finalassignment/staging/extracted_data.csv',
    dag=dag,
)

# Upper-case the fourth comma-separated column of extracted_data.csv and write
# the result to transformed_data.csv. OFS=FS keeps ',' as the separator when
# awk rebuilds a modified record; the trailing pattern `1` prints every line.
transform_data = BashOperator(
    dag=dag,
    task_id='transform_data',
    bash_command=(
        "awk -F, 'BEGIN {OFS=FS} { $4 = toupper($4) } 1' "
        '/home/project/airflow/dags/finalassignment/staging/extracted_data.csv > '
        '/home/project/airflow/dags/finalassignment/staging/transformed_data.csv'
    ),
)

# Task order, as one linear chain: unpack the archive, run the three extracts
# one after another, consolidate their output, then transform it.
(
    unzip_data
    >> extract_data_from_csv
    >> extract_data_from_tsv
    >> extract_data_from_fixed_width
    >> consolidate_data
    >> transform_data
)