# Automated ETL Pipeline (Extract, Transform, Load)

In [15]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [16]:
pip install numpy

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [17]:
pip install prefect

Collecting jinja2<4.0.0,>=3.1.6 (from prefect)
  Using cached jinja2-3.1.6-py3-none-any.whl.metadata (2.9 kB)
Using cached jinja2-3.1.6-py3-none-any.whl (134 kB)
Installing collected packages: jinja2
  Attempting uninstall: jinja2
    Found existing installation: Jinja2 3.1.4
    Uninstalling Jinja2-3.1.4:
      Successfully uninstalled Jinja2-3.1.4
Successfully installed jinja2-3.1.6
Note: you may need to restart the kernel to use updated packages.


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
spyder 5.5.1 requires ipython!=8.17.1,<9.0.0,>=8.13.0; python_version > "3.8", but you have ipython 9.4.0 which is incompatible.

[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [18]:
pip install numpy==1.26.4 --force-reinstall

Collecting numpy==1.26.4
  Using cached numpy-1.26.4-cp312-cp312-win_amd64.whl.metadata (61 kB)
Using cached numpy-1.26.4-cp312-cp312-win_amd64.whl (15.5 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
Successfully installed numpy-1.26.4
Note: you may need to restart the kernel to use updated packages.


  You can safely remove it manually.
  You can safely remove it manually.

[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import pandas as pd
import numpy as np
from prefect import flow, task
import os

In [2]:

#Extract
# This step reads multiple CSV files (like GDP, Population, etc.)
# from the given folder and loads them into pandas DataFrames.

@task
def extract_data(file_path: str):
    """
    Extract Step:
    Reads the electricity access dataset (CSV file) into a pandas DataFrame.
    """
    print("\nExtracting data from CSV file...")

    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return None

    df = pd.read_csv(file_path)
    print(f"Loaded dataset with {len(df)} rows and {len(df.columns)} columns.")
    return df

In [3]:
#Transform
# This step cleans, merges, and enriches the data.
# Here we merge GDP and Population data, clean columns, and calculate GDP per capita.

def transform_data(df: pd.DataFrame):
    """
    Transform Step:
    Cleans the dataset and adds calculated insights.
    """
    print("\nTransforming data...")

    # Remove rows with missing values
    df_cleaned = df.dropna()

    # Standardize column names (remove spaces)
    df_cleaned.columns = df_cleaned.columns.str.strip().str.replace(" ", "_")

    # Example transformation: Calculate global mean electricity access for each year
    if "Electricity_Access_Percent" in df_cleaned.columns:
        df_cleaned["Above_Global_Avg"] = df_cleaned["Electricity_Access_Percent"] > df_cleaned["Electricity_Access_Percent"].mean()

    print(f"Cleaned data shape: {df_cleaned.shape}")
    return df_cleaned


In [4]:
#Load
# This step saves the cleaned and transformed data into a new CSV file.

@task
def load_data(df: pd.DataFrame, output_path="cleaned_electricity_data.csv"):
    """
    Load Step:
    Saves the transformed dataset to a CSV file.
    """
    if df is not None:
        df.to_csv(output_path, index=False)
        print(f"\nData successfully saved to: {output_path}")
    else:
        print("\nNo data to save!")


In [5]:
#ETL Pipeline Flow
# This is the main function that controls the entire ETL process:
# 1. Extract → 2. Transform → 3. Load

@flow
def etl_pipeline():
    """
    Full ETL Pipeline for Electricity Access Data.
    Extract → Transform → Load
    """
    print("\nStarting Automated ETL Pipeline...\n")

    # Step 1: Extract
    file_path = "electricity_access_percent.csv"  # Change path if needed
    df = extract_data(file_path)

    # Step 2: Transform
    cleaned_df = transform_data(df)

    # Step 3: Load
    load_data(cleaned_df)

    print("\nETL Pipeline completed successfully!")


In [None]:
etl_pipeline()

In [None]:
#run in the terminal 
# prefect server start
