<a href="https://colab.research.google.com/github/dansarmiento/analytics_portfolio/blob/main/Apache_Airflow_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1: Install and Configure Airflow

This initial step prepares your Google Colab environment to run an Airflow DAG. It begins by installing the apache-airflow package and its necessary dependencies. Once installed, it sets up a dedicated AIRFLOW_HOME directory. This is crucial as it's where Airflow stores its configuration files and the SQLite database that it uses to track DAG runs and task statuses. Finally, the airflow db init command initializes this database, making the environment ready to execute a DAG.

In [1]:
# --- START: Updated Installation Cell ---

# 1. Uninstall existing versions of pytest and pluggy to ensure a clean state
print("Uninstalling potentially conflicting packages...")
!pip uninstall -y pytest pluggy

# 2. Install a known compatible version of pytest
print("\nInstalling a compatible version of pytest...")
!pip install pytest==7.4.4

# 3. Now, install Airflow quietly using the constraints file.
# This will pull in all other dependencies correctly.
print("\nInstalling Apache Airflow...")
!pip install -q apache-airflow==2.8.1 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.8.txt"

# 4. Set the AIRFLOW_HOME directory
import os
os.makedirs('/content/airflow/dags', exist_ok=True)
os.environ['AIRFLOW_HOME'] = '/content/airflow'

# 5. Initialize the Airflow database
print("\nInitializing the Airflow database...")
!airflow db init

print("\n--- Airflow installation and initialization complete! ---")

# --- END: Updated Installation Cell ---

Uninstalling potentially conflicting packages...
Found existing installation: pytest 8.3.5
Uninstalling pytest-8.3.5:
  Successfully uninstalled pytest-8.3.5
Found existing installation: pluggy 1.3.0
Uninstalling pluggy-1.3.0:
  Successfully uninstalled pluggy-1.3.0

Installing a compatible version of pytest...
Collecting pytest==7.4.4
  Downloading pytest-7.4.4-py3-none-any.whl.metadata (7.9 kB)
Collecting pluggy<2.0,>=0.12 (from pytest==7.4.4)
  Downloading pluggy-1.6.0-py3-none-any.whl.metadata (4.8 kB)
Downloading pytest-7.4.4-py3-none-any.whl (325 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m325.3/325.3 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pluggy-1.6.0-py3-none-any.whl (20 kB)
Installing collected packages: pluggy, pytest
Successfully installed pluggy-1.6.0 pytest-7.4.4

Installing Apache Airflow...

Initializing the Airflow database...
DB: sqlite:////content/airflow/airflow.db
[[34m2025-05-23T21:46:52.939+0000[0m] {[34mmigration.

Step 2: Create the DAG File

Here, we define the entire data engineering pipeline as an Airflow DAG. Using a Colab "magic command" (%%writefile), we write the Python code directly into a dag.py file inside the dags folder. This file contains:

A DAG definition that sets the schedule and other metadata.
BashOperators to perform command-line tasks like creating directories (mkdir), downloading the dataset (wget), and unpacking the tarball (tar).
Python Tasks written using the modern @task decorator (TaskFlow API) to handle the data manipulation with the pandas library. These tasks are responsible for extracting data from CSV, TSV, and fixed-width files, consolidating them, and finally transforming the data.

Task dependencies (>>) at the end of the file, which explicitly define the order of execution, ensuring the pipeline runs in the correct sequence.

In [5]:
%%writefile /content/airflow/dags/etl_pipeline_dag.py
import os
import pandas as pd
import tarfile
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

# Define file paths for clarity
STAGING_DIR = "/content/airflow/dags/staging"
DOWNLOAD_URL = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/tolldata.tgz"
DOWNLOADED_FILE = os.path.join(STAGING_DIR, "tolldata.tgz")
EXTRACTED_DIR = STAGING_DIR

# Define paths for intermediate and final files
CSV_DATA_PATH = os.path.join(STAGING_DIR, 'csv_data.csv')
TSV_DATA_PATH = os.path.join(STAGING_DIR, 'tsv_data.csv')
FIXED_WIDTH_DATA_PATH = os.path.join(STAGING_DIR, 'fixed_width_data.csv')
CONSOLIDATED_DATA_PATH = os.path.join(STAGING_DIR, 'extracted_data.csv')
TRANSFORMED_DATA_PATH = os.path.join(STAGING_DIR, 'transformed_data.csv')


@dag(
    dag_id='etl_pipeline_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['etl', 'assignment'],
)
def etl_pipeline():
    """
    A DAG to perform an ETL process: download, untar, extract, consolidate, and transform data.
    """
    create_staging_dir = BashOperator(
        task_id='create_staging_dir',
        bash_command=f"mkdir -p {STAGING_DIR}"
    )

    download_data = BashOperator(
        task_id='download_data',
        bash_command=f"wget '{DOWNLOAD_URL}' -O {DOWNLOADED_FILE}"
    )

    untar_data = BashOperator(
        task_id='untar_data',
        bash_command=f"tar -xvzf {DOWNLOADED_FILE} -C {EXTRACTED_DIR}"
    )

    @task(task_id='extract_from_csv')
    def extract_from_csv():
        """Extracts required columns from vehicle-data.csv."""
        # Define the column names since the file has no header
        csv_columns = ['Rowid', 'Timestamp', 'Anonymized Vehicle number', 'Vehicle type']

        # Read the CSV, specifying no header and providing our own column names
        df = pd.read_csv(
            os.path.join(EXTRACTED_DIR, 'vehicle-data.csv'),
            header=None,
            names=csv_columns
        )

        # The rest of the function is now correct
        subset = df[['Rowid', 'Timestamp', 'Anonymized Vehicle number', 'Vehicle type']]
        subset.to_csv(CSV_DATA_PATH, index=False)

    @task(task_id='extract_from_tsv')
    def extract_from_tsv():
        """Extracts required columns from tollplaza-data.tsv."""
        # Define the column names since the file has no header
        tsv_columns = ['Number of axles', 'Tollplaza id', 'Tollplaza code']

        # Read the TSV, specifying no header and providing our own column names
        df = pd.read_csv(
            os.path.join(EXTRACTED_DIR, 'tollplaza-data.tsv'),
            sep='\t',
            header=None,
            names=tsv_columns
        )

        # The rest of the function is now correct
        subset = df[['Number of axles', 'Tollplaza id', 'Tollplaza code']]
        subset.to_csv(TSV_DATA_PATH, index=False)

    @task(task_id='extract_from_fixed_width')
    def extract_from_fixed_width():
        """Extracts data from payment-data.txt using fixed-width positions."""
        col_specs = [(59, 61), (62, 68)]
        col_names = ['Type of Payment code', 'Vehicle Code']
        df = pd.read_fwf(os.path.join(EXTRACTED_DIR, 'payment-data.txt'), colspecs=col_specs, header=None, names=col_names)
        df.to_csv(FIXED_WIDTH_DATA_PATH, index=False)

    @task(task_id='consolidate_data')
    def consolidate_data():
        """Combines the three extracted data files into one."""
        df_csv = pd.read_csv(CSV_DATA_PATH)
        df_tsv = pd.read_csv(TSV_DATA_PATH)
        df_fixed = pd.read_csv(FIXED_WIDTH_DATA_PATH)
        consolidated_df = pd.concat([df_csv, df_tsv, df_fixed], axis=1)
        consolidated_df.to_csv(CONSOLIDATED_DATA_PATH, index=False)

    @task(task_id='transform_data')
    def transform_data():
        """Transforms the 'Vehicle type' column to uppercase."""
        df = pd.read_csv(CONSOLIDATED_DATA_PATH)
        df['Vehicle type'] = df['Vehicle type'].str.upper()
        df.to_csv(TRANSFORMED_DATA_PATH, index=False)

    # Define task dependencies
    extraction_tasks = [extract_from_csv(), extract_from_tsv(), extract_from_fixed_width()]

    create_staging_dir >> download_data >> untar_data
    untar_data >> extraction_tasks
    extraction_tasks >> consolidate_data() >> transform_data()

etl_dag = etl_pipeline()

Overwriting /content/airflow/dags/etl_pipeline_dag.py


Step 3: Test the DAG Execution

This step triggers the pipeline. Since we are not running the full Airflow scheduler service in Colab, we use the airflow dags test command. This powerful command-line tool allows you to execute a single, isolated run of your DAG from start to finish. It's the perfect way to test, debug, and validate your pipeline's logic within a temporary environment like Colab. You will see detailed logs from Airflow as it moves through each task defined in the DAG.

In [6]:
# Execute a test run of the DAG.
# We provide a dummy execution date.
!airflow dags test etl_pipeline_dag 2023-01-01

[[34m2025-05-23T21:50:42.981+0000[0m] {[34mdagbag.py:[0m538} INFO[0m - Filling up the DagBag from [01m/content/airflow/dags[22m[0m
[[34m2025-05-23T21:50:43.379+0000[0m] {[34mutils.py:[0m162} INFO[0m - NumExpr defaulting to 2 threads.[0m
[[34m2025-05-23T21:50:43.704+0000[0m] {[34mdagbag.py:[0m348} [31mERROR[0m - [31mFailed to import: [01m/usr/local/lib/python3.11/dist-packages/airflow/example_dags/example_branch_operator_decorator.py[22m[0m
[31mTraceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/airflow/models/dagbag.py", line 344, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/lib/python3.11/dist-packages/airflow/example_dags/example_branch_operator_decorator.py", line 125, in <module>
    random_choice_venv = branching_virtualenv(choices=options)
                 

Step 4: Verify the Final Output

The final step is to confirm that your pipeline executed successfully and produced the correct result. First, we list the contents of the staging directory to ensure that all intermediate files and the final output file, transformed_data.csv, were created. Then, we use the pandas library to load the final CSV file into a DataFrame. By displaying the first few rows and checking the unique values in the Vehicle type column, we can visually verify that the data was not only processed but also correctly transformed to uppercase as required by the assignment.

In [7]:
# Check the contents of the staging directory to see all the created files
!ls -l /content/airflow/dags/staging

# Use pandas to read and display the head of the final transformed file
import pandas as pd

final_output_path = '/content/airflow/dags/staging/transformed_data.csv'

try:
    final_df = pd.read_csv(final_output_path)
    print("\nSuccessfully read the final file. Displaying the first 5 rows:")
    display(final_df.head())

    # Verify the transformation
    print("\nVerifying the 'Vehicle type' column is in uppercase:")
    print(final_df['Vehicle type'].unique())
except FileNotFoundError:
    print(f"\nError: The final output file was not found at {final_output_path}")

total 3656
-rw-r--r-- 1 root root  203687 May 23 21:50 csv_data.csv
-rw-r--r-- 1 root root  463765 May 23 21:50 extracted_data.csv
-rw-r--r-- 1  501 staff   1704 Aug 22  2021 fileformats.txt
-rw-r--r-- 1 root root   90034 May 23 21:50 fixed_width_data.csv
-rw-r--r-- 1  501 staff 680000 Aug 22  2021 payment-data.txt
-rw-r--r-- 1 root root  528994 Sep 21  2022 tolldata.tgz
-rw-r--r-- 1  501 staff 602524 Aug 22  2021 tollplaza-data.tsv
-rw-r--r-- 1 root root  463765 May 23 21:50 transformed_data.csv
-rw-r--r-- 1 root root  170044 May 23 21:50 tsv_data.csv
-rw-r--r-- 1  501 staff 512524 Aug 22  2021 vehicle-data.csv

Successfully read the final file. Displaying the first 5 rows:


Unnamed: 0,Rowid,Timestamp,Anonymized Vehicle number,Vehicle type,Number of axles,Tollplaza id,Tollplaza code,Type of Payment code,Vehicle Code
0,125094,car,2,VC965,2,4856,PC7C042B7,TE,VC965
1,174434,car,2,VC965,2,4154,PC2C2EF9E,TP,VC965
2,8538286,car,2,VC965,2,4070,PCEECA8B2,TE,VC965
3,5521221,car,2,VC965,2,4095,PC3E1512A,TP,VC965
4,3267767,car,2,VC965,2,4135,PCC943ECD,TE,VC965



Verifying the 'Vehicle type' column is in uppercase:
['VC965' 'VCD2F' 'VCB43']
