# A Modular ETL Pipeline Using Python & PostgreSQL

## Load libraries and dependencies

In [None]:
# Import dependencies
import pandas as pd
from IPython.display import display
import sys, os
from pathlib import Path
# Optional import-path tweaks, left disabled. Enable one of them if
# dynamic_etl_pipeline cannot be imported from the current working directory
# (assumption about project layout — confirm).
#sys.path.append(str(Path.cwd().parent)) # Add parent directory
#sys.path.append("../../scripts")
# Wildcard import: later cells use names such as ETL_CONFIG,
# select_and_load_source, transform_oltp and generate_erd_graph without
# importing them anywhere else in this notebook, so they are expected to be
# provided by this module.
from dynamic_etl_pipeline import *

print("✅ Environment ready!")

## Extract (Load Raw Data)

In [None]:
# Trigger interactive source selection
# ETL_CONFIG is not defined in this notebook; it is expected to come from the
# wildcard import of dynamic_etl_pipeline.
# NOTE(review): the following cells read raw_df, cfg and dataset_key as
# globals, so this call presumably populates them as a side effect — confirm
# in dynamic_etl_pipeline.
# `result` is reassigned later by the pipeline run.
result = select_and_load_source(ETL_CONFIG)

In [None]:
# Report which source is active. dataset_key is not assigned in any cell of
# this notebook; it must already exist as a global (presumably set during
# source selection — confirm), otherwise this line raises NameError.
print(f"ℹ️ Using global raw_df, cfg & dataset_key for selected_source : {dataset_key}")

## Transform (Data Cleaning & Normalisation)

In [None]:
# Count distinct values per column of the raw data (assumes raw_df is a pandas
# DataFrame — confirm). Being the last expression in the cell, the result is
# shown as the cell output.
raw_df.nunique()

### Cleaning Steps for Data Modeling & Normalization

| Step                         | Action                                                                                       | Purpose                                                       |
|------------------------------|----------------------------------------------------------------------------------------------|----------------------------------------------------------------|
| 1. Handle Missing Values     | - Drop rows with missing `Order_ID`, `Customer_ID`                      | These are core entity identifiers; nulls would break relationships |
|                              | - Optionally handle or impute the 1 missing value in `State`                                | Tolerable since `State` likely belongs to a lookup/dimension table |
| 2. Ensure Correct Data Types | - Convert `Order_Date` to datetime format                                                   | Enables partitioning and relationship with a Date dimension   |
|                              | - Ensure `Postal_Code` is treated as string if needed (for leading zeros)                   | Maintains data integrity, especially in US/UK postal codes     |
| 3. Remove Duplicates         | - Check for and drop duplicates in `Order_ID`, `Product_ID`, `Customer_ID` combinations     | Avoids redundant relationships or many-to-many mapping noise   |
| 4. Standardize Categorical Fields | - Lowercase and strip spaces in fields like `Category`, `Brand`, `Payment_Method`       | Ensures consistency across joins or dimensions                 |
| 5. Verify Key Relationships  | - Ensure 1:many mappings exist: `Customer` → `Order`, `Order` → `Product`                  | For proper normalization and referential integrity             |
| 6. Normalize Derived Fields  | - Confirm `Total_Price = Price * Quantity` or drop `Total_Price` if calculated later        | Avoid storing redundant calculations in normalized schema      |
| 7. Split Compound Columns    | - Split `Customer_Name` into `First_Name` and `Last_Name`                       | Improves normalization (optional but good practice)            |
| 8. Validate Contact Info     | - Basic regex or null check on `Email`, `Phone_Number`                                     | Prevent invalid records from entering DB                       |


### **Data Modelling (Raw → OLTP → OLAP)**

#### **Data Model**
*Split the data into normalized tables such as:* `Customers`, `Products`, `Orders`, `Location`, `Payments`

![zulobank](modeldesign/design_lucidchart_zulodb.png)

![yankiecom](modeldesign/design_drawio_yankidb.png)

### Clean & Normalise df to 3NF Tables

In [None]:
# Clean the raw data, split it into normalised tables, check for missing
# primary keys and insert missing foreign keys from the config.
# The call returns four values, unpacked here as the OLTP tables plus three
# key dictionaries (pk_dict / fk_dict / sk_dict — presumably primary, foreign
# and surrogate keys; confirm in transform_oltp). Later cells reuse all four.
oltp_tables, pk_dict, fk_dict, sk_dict = transform_oltp(dataset_key, cfg, raw_df)

### Inspect OLTP tables to be created in the DB

In [None]:
# Visualize and inspect all OLTP tables
# Takes the tables and the three key dictionaries produced by transform_oltp.
display_database_info(oltp_tables, pk_dict, fk_dict, sk_dict)

### Save normalised tables to csv (optional)

In [None]:
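# Optional: uncomment to export the OLTP tables via save_tables_to_csv
# (export_dir is set to "dataset/").
#save_tables_to_csv(tables=oltp_tables, export_dir="dataset/")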

## Load to PostgreSQL

### DB CONNECTION

In [None]:
# Open the database connection used by the pipeline cells below.
# env_prefix selects which group of connection settings is used (presumably
# environment variables starting with that prefix — confirm in
# get_db_connection).
# NOTE(review): the prefix is hard-coded; change it by hand so it matches the
# source selected earlier in the notebook.
conn = get_db_connection(env_prefix="YANKI_DB_") # options: YANKI_DB_, ZULO_DB_

### Run ETL Pipeline (OLTP/OLAP)

In [None]:
# Run the pipeline on the open connection and keep its outcome in `result`;
# the next cell reads result["success"] to decide what to show.
result = run_dynamic_etl_pipeline(
    conn,
    dataset_key,
    raw_df,
    cfg,
    oltp_tables,
    pk_dict,
    fk_dict,
    sk_dict,
)
print(result)

In [None]:
# Report the pipeline outcome: handle failure first, then distinguish an
# OLTP-only run from one that also has an OLAP schema configured.
if not result["success"]:
    print("❌ Pipeline failed. Please fix the errors before proceeding.")
elif "olap" not in cfg:
    print("✅ OLTP pipeline completed. No OLAP schema configured.")
else:
    display_olap_info(conn, cfg)

## Export INSERT SQL script to file

In [None]:
# Optional: generate a SQL script that inserts the OLTP tables into the database.
# NOTE(review): the schema name and output path are hard-coded to the Yanki
# dataset; adjust both by hand when a different source was selected.
export_sql_script(
    schema="yanki_oltp",
    tables=oltp_tables,
    foreign_keys=fk_dict,
    output_sql_path="sqlscript/yanki_insert.sql",
)

## Visualise ERD

In [None]:
# Build the ERD for whichever model is configured, then display it once:
# the combined OLTP/OLAP diagram when cfg has an "olap" section, otherwise
# the OLTP-only diagram.
if "olap" in cfg:
    print("📊 Displaying complete OLTP/OLAP model...")
    complete_erd = generate_erd_graph(
        fk_dict={**cfg["oltp"]["foreign_keys"], **cfg["olap"]["olap_foreign_keys"]},
        schema_type="both",
        sk_dict=sk_dict,
        title="Complete Data Model",
    )
else:
    print("✅ OLTP-only model. Displaying OLTP ERD...")
    complete_erd = generate_erd_graph(
        fk_dict,
        sk_dict=pk_dict,
        title=f"{dataset_key} - OLTP Model",
    )

display(complete_erd)

In [None]:
# Draw the OLTP-only ERD from the foreign-key dictionary returned by transform_oltp.
# NOTE(review): pk_dict is passed as the sk_dict argument here, whereas the
# combined OLTP/OLAP diagram passes sk_dict itself — confirm this is intentional.
oltp_erd = generate_erd_graph(fk_dict, sk_dict=pk_dict, title=f"{dataset_key} - OLTP ERD")
display(oltp_erd)

In [None]:
# For OLAP ERD
# The OLAP foreign keys are read from cfg["olap"]["olap_foreign_keys"], the
# same path the combined-model cell uses and the one actually protected by the
# "olap" in cfg guard below. The previous top-level lookup
# cfg["olap_foreign_keys"] was not covered by that guard.
if "olap" not in cfg:
    print("✅ OLTP pipeline completed. No OLAP schema configured.")
else:
    olap_erd = generate_erd_graph(
        cfg["olap"]["olap_foreign_keys"],
        title=f"{dataset_key} - OLAP ERD",
    )
    display(olap_erd)