In [1]:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator

# DAG arguments from Task 1.1 — applied as defaults to every task in the DAG.
dag_args = {
    'owner': 'Aya',
    # Fixed start date. A dynamic value such as datetime.today() moves forward
    # every time the file is parsed, so the first @daily interval never closes
    # and the scheduler never starts a run.
    'start_date': datetime(2024, 1, 11),
    'email': 'dummy@email.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    dag_id='ETL_toll_data',
    default_args=dag_args,
    schedule_interval='@daily',
    description='Apache Airflow Final Assignment',
    # With a fixed start_date, don't backfill one run per day up to "now"
    # when the DAG is first unpaused.
    catchup=False,
)




In [2]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from urllib.request import urlopen
import zipfile
import os

def unzip_data_function(
    source_url="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz",
    destination_directory=r"C:\Users\Magic00\Downloads",
    **kwargs
):
    """Download a gzip-compressed tar archive and extract it.

    Args:
        source_url: URL of the ``.tgz`` archive (any scheme ``urlopen`` supports).
        destination_directory: Directory the archive members are extracted into.
        **kwargs: Unused; kept so callers passing keyword context still work.

    NOTE(review): extract_data_from_csv_function reads from a ``tolldata``
    subdirectory of this destination — confirm the archive contains that
    directory, or adjust one of the two paths.
    """
    import tarfile

    # Where tarfile supports extraction filters (3.12+ and security backports),
    # use "data" to reject absolute paths, ".." members and special files in a
    # downloaded archive; otherwise fall back to the unfiltered default.
    extract_options = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    with urlopen(source_url) as response:
        # The file is a gzip'd *tar* (.tgz), not a zip. Mode "r|gz" reads it as
        # a forward-only stream, which the non-seekable response requires.
        with tarfile.open(fileobj=response, mode="r|gz") as archive:
            archive.extractall(destination_directory, **extract_options)

# Reuse the `dag` object created in the first cell. A second
# DAG('ETL_toll_data', ...) here would replace it with another DAG of the same
# id that lacks the description.
from airflow.operators.python_operator import PythonOperator

# Task to download and unpack the toll data. It runs unzip_data_function
# (defined above) instead of a placeholder script path that does not exist.
unzip_data_task = PythonOperator(
    task_id='unzip_data',
    python_callable=unzip_data_function,
    dag=dag,
)

In [3]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import pandas as pd

def extract_data_from_csv_function(
    input_csv_path=r"C:\Users\Magic00\Downloads\tolldata\vehicle-data.csv",
    output_csv_path=r"C:\Users\Magic00\Downloads\csv_data.csv",
    **kwargs
):
    """Copy the vehicle-identification columns of a CSV into a new CSV.

    Args:
        input_csv_path: CSV to read. Its first line is used as the header and
            must name every column in ``selected_fields``.
        output_csv_path: Destination CSV (overwritten, written without the index).
        **kwargs: Unused; kept so callers passing keyword context still work.

    Raises:
        KeyError: if any selected column is missing from the input.
    """
    selected_fields = ["Rowid", "Timestamp", "Anonymized Vehicle number", "Vehicle type"]

    vehicle_frame = pd.read_csv(input_csv_path)
    vehicle_frame[selected_fields].to_csv(output_csv_path, index=False)

from airflow.operators.python_operator import PythonOperator

# Task to extract the vehicle-identification columns from the vehicle CSV.
# The extraction function defined above is called directly. The shell command
# it replaces ran the same external script as the TSV task, through an unquoted
# Windows path whose backslashes bash strips.
extract_data_from_csv_task = PythonOperator(
    task_id='extract_data_from_csv',
    python_callable=extract_data_from_csv_function,
    dag=dag,  # same DAG object as the other tasks
)

[[34m2024-01-11T23:18:49.777+0200[0m] {[34mutils.py:[0m159} INFO[0m - NumExpr defaulting to 4 threads.[0m


In [4]:
from airflow.operators.bash_operator import BashOperator

import pandas as pd

def extract_data_from_tsv(
    input_tsv_path=r"C:\Users\Magic00\Downloads\tolldata\tollplaza-data.tsv",
    output_csv_path=r"C:\Users\Magic00\Downloads\tsv_data.csv",
):
    """Convert a tab-separated file into a comma-separated CSV.

    Args:
        input_tsv_path: TSV file to read; its first line is used as the header.
        output_csv_path: Destination CSV (overwritten, written without the index).
    """
    # A single tab character. The two-character "\\t" is treated by pandas as a
    # regex, which forces the slower python engine and emits a ParserWarning.
    # NOTE(review): every column is kept. If only specific toll-plaza fields are
    # required, select them here as extract_data_from_csv_function does.
    tsv_frame = pd.read_csv(input_tsv_path, sep="\t")
    tsv_frame.to_csv(output_csv_path, index=False)

from airflow.operators.python_operator import PythonOperator

# Task to convert the toll-plaza TSV to CSV. The function is passed to the
# operator rather than called here: a module-level call would run the
# extraction every time the scheduler parses this DAG file.
extract_data_from_tsv_task = PythonOperator(
    task_id='extract_data_from_tsv',
    python_callable=extract_data_from_tsv,
    dag=dag,  # same DAG object as the other tasks
)

In [5]:
from airflow.operators.bash_operator import BashOperator

# Bash command that runs an inline Python program (`python -c "..."`) to turn
# the fixed-width payment file into a CSV.
# The program sits inside bash double quotes, so it must not contain `"`, `$`
# or backticks. Python turns each `\\` below into a single `\`. Bash leaves a
# `\` followed by a letter untouched inside double quotes, so the inner raw
# strings receive ordinary Windows paths.
# NOTE(review): colspecs [(0, 2), (2, 6)] read only the first six characters of
# each line, and read_fwf infers a header from the first line by default.
# Confirm both against the actual layout of payment-data.txt.
extract_data_from_fixed_width_command = """
python -c "
import pandas as pd

# Function to extract data from fixed width file
def extract_data_from_fixed_width():
    input_fixed_width_path = r'C:\\Users\\Magic00\\Downloads\\tolldata\\payment-data.txt'
    output_csv_path = r'C:\\Users\\Magic00\\Downloads\\fixed_width_data.csv'

    # Define the column positions for fixed width fields
    column_positions = [(0, 2), (2, 6)]

    # Read fixed width file and extract required fields
    df = pd.read_fwf(input_fixed_width_path, colspecs=column_positions)
    extracted_data = df

    # Save extracted data to a new CSV file
    extracted_data.to_csv(output_csv_path, index=False)

extract_data_from_fixed_width()
"
"""

# Fixed-width extraction step: hands the inline Python program above to bash.
extract_data_from_fixed_width_task = BashOperator(
    dag=dag,
    bash_command=extract_data_from_fixed_width_command,
    task_id='extract_data_from_fixed_width',
)

In [6]:
from airflow.operators.bash_operator import BashOperator

# Bash command that joins the three extracted files column-wise into one CSV.
# - `paste` separates columns with TAB by default; `-d ','` keeps the output a
#   real CSV, so the awk transform (FS=",") can split it into fields.
# - Each path is single-quoted. Unquoted, bash treats every `\` as an escape and
#   drops it (C:\Users -> C:Users).
# - The command is built from adjacent literals. In a non-raw triple-quoted
#   string, Python itself swallows a trailing backslash-newline.
consolidate_data_command = (
    "paste -d ',' "
    r"'C:\Users\Magic00\Downloads\csv_data.csv' "
    r"'C:\Users\Magic00\Downloads\tsv_data.csv' "
    r"'C:\Users\Magic00\Downloads\fixed_width_data.csv' "
    r"> 'C:\Users\Magic00\Downloads\extracted_data.csv'"
)

# Consolidation step: runs the paste command defined above through bash.
consolidate_data_task = BashOperator(
    dag=dag,
    bash_command=consolidate_data_command,
    task_id='consolidate_data',
)

In [7]:
# Bash command that upper-cases the 4th comma-separated field of every line
# except the header (NR>1), writing the result to a new CSV.
# - Paths are single-quoted so bash keeps their backslashes.
# - The command is built from adjacent literals. In a non-raw triple-quoted
#   string, Python itself swallows a trailing backslash-newline.
transform_data_command = (
    "awk 'BEGIN {FS=OFS=\",\"} {if(NR>1) $4=toupper($4)} 1' "
    r"'C:\Users\Magic00\Downloads\extracted_data.csv' "
    r"> 'C:\Users\Magic00\Downloads\transformed_data.csv'"
)

# Transform step: runs the awk command defined above through bash.
transform_data_task = BashOperator(
    dag=dag,
    bash_command=transform_data_command,
    task_id='transform_data',
)

In [8]:
from airflow import DAG
from datetime import datetime, timedelta

# Wire the pipeline. The three extract steps run one after another once the
# archive is unpacked, and each of them is a direct upstream of consolidation.
# Consolidation then feeds the transform step (the last expression is shown as
# the cell output).
unzip_data_task >> extract_data_from_csv_task >> extract_data_from_tsv_task >> extract_data_from_fixed_width_task
[extract_data_from_csv_task, extract_data_from_tsv_task, extract_data_from_fixed_width_task] >> consolidate_data_task
consolidate_data_task >> transform_data_task

<Task(BashOperator): transform_data>

In [14]:
# Trigger a run of the DAG (Airflow 2.x CLI; Airflow 1.x: `airflow trigger_dag ETL_toll_data`)
# airflow dags trigger ETL_toll_data

In [15]:
# Unpause the DAG so the scheduler will run it (Airflow 2.x CLI; Airflow 1.x: `airflow unpause ETL_toll_data`)
# airflow dags unpause ETL_toll_data