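# **Pipeline Design**
1. Extract the data from the CSV and JSON files into pandas DataFrames.
2. Transform the data: handle missing values and duplicates, build full names, and validate email addresses and phone numbers.
3. Split the orders JSON into two tables: `orders` (one row per order) and `order_items` (one row per item within an order).
4. Load the cleaned data into the PostgreSQL database.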


## 1. Extract the Data

In [1]:
# Import Required Libraries

import pandas as pd
import numpy as np
import json
import os
import re
import sqlalchemy
from dotenv import load_dotenv
from sqlalchemy import create_engine

In [2]:
# Load environment variables from a .env file into the process environment.
load_dotenv()

from urllib.parse import quote_plus

# Database connection settings, all read from the environment.
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")

# PostgreSQL connection URL for SQLAlchemy + psycopg2. The user name and the
# password are percent-encoded so characters such as "@", ":" or "/" in them
# cannot break the URL; an unset user/password becomes an empty string.
DB_CONNECTION_STRING = (
    f"postgresql+psycopg2://{quote_plus(DB_USER or '')}:{quote_plus(DB_PASS or '')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Directory holding the source CSV/JSON files, configured once so every reader
# shares it. Can be overridden with the ETL_BASE_PATH environment variable.
BASE_PATH = os.getenv("ETL_BASE_PATH", "/Users/mac/Documents/Prosper_Python/ETL_Design")

In [3]:
# Extraction step: read every raw source file.

def extract(base_path=None):
    """
    Extract the raw data from the CSV and JSON sources.

    Reads ``employees.csv``, ``customers.csv``, ``products.csv`` and
    ``stores.csv`` with pandas and ``orders.json`` with the ``json`` module,
    all from ``base_path``. A missing file is not an error: a warning is
    printed and the source is treated as empty (an empty DataFrame for a CSV,
    an empty list for the JSON file).

    Args:
        base_path: Directory containing the source files. Defaults to the
            module-level ``BASE_PATH``.

    Returns:
        A 5-tuple ``(employees, customers, products, stores, orders)``. The
        first four are ``DataFrame.to_dict()`` dictionaries of the form
        ``{column: {row_index: value}}``; ``orders`` is the parsed JSON
        content. If a file exists but cannot be read or parsed, the error is
        printed and ``(None, None, None, None, None)`` is returned.
    """
    if base_path is None:
        base_path = BASE_PATH

    def safe_read_csv(filename):
        # Missing CSV -> empty DataFrame, so one absent file does not abort extraction.
        path = os.path.join(base_path, filename)
        if not os.path.exists(path):
            print(f"Warning: {path} not found; treating it as empty.")
            return pd.DataFrame()
        return pd.read_csv(path)

    try:
        df_employees = safe_read_csv("employees.csv")
        df_customers = safe_read_csv("customers.csv")
        df_products = safe_read_csv("products.csv")
        df_stores = safe_read_csv("stores.csv")

        # Read the JSON file (JSON text is UTF-8 by specification).
        json_path = os.path.join(base_path, "orders.json")
        orders_data = []
        if os.path.exists(json_path):
            with open(json_path, "r", encoding="utf-8") as f:
                orders_data = json.load(f)
        else:
            print(f"Warning: {json_path} not found; treating it as empty.")

    # OSError covers unreadable files; ValueError covers malformed CSV/JSON
    # (pandas parser errors, json.JSONDecodeError, UnicodeDecodeError).
    except (OSError, ValueError) as e:
        print(f"Error reading files: {e}")
        return None, None, None, None, None

    return (
        df_employees.to_dict(),
        df_customers.to_dict(),
        df_products.to_dict(),
        df_stores.to_dict(),
        orders_data,
    )


## 2. Transformation

In [4]:
# Helper functions that check whether email addresses and phone numbers are
# well-formed, as required by the problem statement.
def is_valid_email(email):
    """
    Return True if ``email`` looks like ``local@domain.tld``, else False.

    The local part may contain ASCII letters, digits and ``_ . + -``. The
    domain is two or more dot-separated, non-empty labels of letters, digits
    and ``-``. The whole string must match, so trailing text (including a
    trailing newline) is rejected. Non-string input returns False instead of
    raising.
    """
    if not isinstance(email, str):
        return False
    pattern = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+"
    # fullmatch (not match + "$") so "x@y.com\n" is not accepted.
    return re.fullmatch(pattern, email) is not None

def is_valid_phone(phone):
    """
    Return True if ``phone`` is 10-15 ASCII digits with an optional leading "+".

    Example of a valid value: ``+1234567890``. The whole string must match,
    so separators, spaces, trailing text (including a trailing newline) and
    non-ASCII digits are rejected. Non-string input returns False instead of
    raising.
    """
    if not isinstance(phone, str):
        return False
    # [0-9] rather than \d: in str patterns \d also matches non-ASCII digits.
    return re.fullmatch(r"\+?[0-9]{10,15}", phone) is not None

# Transformation step: private helpers followed by the transform() entry point.

def _phone_to_str(value):
    """Render a phone value as text, undoing pandas' int -> float upcast."""
    # An integer column containing NaN is stored as float64, so 1234567890
    # would otherwise stringify as "1234567890.0" and fail validation.
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def _clean_phone(value):
    """Return the phone as validated text, or "unknown" if it is not valid."""
    text = _phone_to_str(value)
    return text if is_valid_phone(text) else "unknown"


def _merge_name_columns(df):
    """Replace First_Name/Last_Name in ``df`` (in place) with one Full_Name column."""
    name_cols = [col for col in ("First_Name", "Last_Name") if col in df.columns]
    full_name = pd.Series("", index=df.index, dtype=object)
    for col in name_cols:
        part = df[col].fillna("").astype(str).str.strip()
        # strip() after joining so a missing part leaves no stray space.
        full_name = (full_name + " " + part).str.strip()
    # Rows with no usable name get the same placeholder as other text fields.
    df["Full_Name"] = full_name.replace("", "unknown")
    df.drop(columns=name_cols, inplace=True)


def transform(df_employees_dict, df_customers_dict, df_products_dict, df_stores_dict, orders_data):
    """
    Transform the extracted data into tables ready for loading into a database.

    Steps:
      1. Drop exact duplicate rows from every table.
      2. Replace First_Name/Last_Name with Full_Name for employees and
         customers (missing parts are skipped, never rendered as "unknown").
      3. Fill missing values in text (object) columns with "unknown".
      4. Ensure employees and customers have Email and Phone columns, then
         replace invalid emails/phones with "unknown".
      5. Split the orders JSON into an orders table and an order_items table.

    Args:
        df_*_dict: ``DataFrame.to_dict()`` output for each CSV source.
        orders_data: Parsed orders JSON, a list of order dicts; each item in
            an order's "Items" list becomes one order_items row. A missing or
            null "Items" value is treated as no items.

    Returns:
        (df_employees, df_customers, df_products, df_stores, df_orders,
        df_order_items) as pandas DataFrames.

    Raises:
        KeyError: if an order lacks Order_ID, Customer_ID, Store_ID,
            Order_Date or Total_Amount, or an item lacks Product_ID,
            Quantity or Unit_Price.
    """
    df_employees = pd.DataFrame(df_employees_dict)
    df_customers = pd.DataFrame(df_customers_dict)
    df_products = pd.DataFrame(df_products_dict)
    df_stores = pd.DataFrame(df_stores_dict)
    all_tables = (df_employees, df_customers, df_products, df_stores)
    people_tables = (df_employees, df_customers)

    # 1. Handle duplicates.
    for df in all_tables:
        df.drop_duplicates(inplace=True)

    # 2. Merge names BEFORE the "unknown" fill below, otherwise a missing last
    #    name would turn "John" into "John unknown".
    for df in people_tables:
        _merge_name_columns(df)

    # 3. Handle missing data in text fields.
    for df in all_tables:
        str_cols = df.select_dtypes(include="object").columns
        if len(str_cols):
            df[str_cols] = df[str_cols].fillna("unknown")

    # 4. Add Email/Phone columns if absent, then validate them.
    for df in people_tables:
        for col in ("Email", "Phone"):
            if col not in df.columns:
                df[col] = None
        df["Email"] = df["Email"].apply(lambda x: x if is_valid_email(str(x)) else "unknown")
        df["Phone"] = df["Phone"].apply(_clean_phone)

    # 5. Split orders into one table of orders and one of order_items.
    order_cols = ["Order_ID", "Customer_ID", "Store_ID", "Order_Date", "Total_Amount"]
    item_cols = ["Product_ID", "Quantity", "Unit_Price"]
    orders_rows = []
    order_items_rows = []
    for order in orders_data or []:
        orders_rows.append([order[col] for col in order_cols])
        for item in order.get("Items") or []:
            order_items_rows.append([order["Order_ID"]] + [item[col] for col in item_cols])

    df_orders = pd.DataFrame(orders_rows, columns=order_cols)
    df_order_items = pd.DataFrame(order_items_rows, columns=["Order_ID"] + item_cols)

    return df_employees, df_customers, df_products, df_stores, df_orders, df_order_items



## 3. Load to Database

In [5]:
# Load step: write all transformed tables to PostgreSQL in one transaction.
def load_to_db(df_employees, df_customers, df_products, df_stores, df_orders, df_order_items,
               connection_string=None):
    """
    Load the transformed data into the PostgreSQL database.

    All six tables are appended inside a single transaction
    (``engine.begin()``). If any write fails, the whole load is rolled back
    instead of leaving some tables loaded and others not. Errors are printed,
    not raised.

    Note: ``if_exists="append"`` means re-running the pipeline inserts the
    same rows again; tables that do not exist yet are created by pandas.

    Args:
        df_*: DataFrames to write to the table of the same name.
        connection_string: SQLAlchemy URL; defaults to the module-level
            ``DB_CONNECTION_STRING``.
    """
    # Parent tables are listed before the tables that reference them.
    tables = [
        ("employees", df_employees),
        ("customers", df_customers),
        ("products", df_products),
        ("stores", df_stores),
        ("orders", df_orders),
        ("order_items", df_order_items),
    ]
    engine = None
    try:
        # Inside the try so a malformed URL is reported like any other load error.
        engine = create_engine(connection_string or DB_CONNECTION_STRING)
        # begin() commits when the block exits normally and rolls back on an exception.
        with engine.begin() as conn:
            for table_name, df in tables:
                df.to_sql(table_name, conn, if_exists="append", index=False)
        print("Data loaded successfully into the database.")
    except Exception as e:
        print(f"Error loading data to database: {e}")
    finally:
        if engine is not None:
            engine.dispose()


In [6]:
# Run the full pipeline: extract -> transform -> load.
def main():
    """
    Run the ETL process end to end.

    Stops before transforming or loading if extraction failed (``extract``
    returned ``None`` values) or if any source came back empty. The two cases
    are reported separately, and the empty sources are named.
    """
    extracted = extract()
    if any(part is None for part in extracted):
        print("No data extracted. ETL process terminated.")
        return

    # All five sources must be non-empty to continue; report the ones that are not.
    source_names = ("employees", "customers", "products", "stores", "orders")
    empty_sources = [name for name, part in zip(source_names, extracted) if not part]
    if empty_sources:
        print(f"No data extracted for: {', '.join(empty_sources)}. ETL process terminated.")
        return

    df_employees, df_customers, df_products, df_stores, df_orders, df_order_items = transform(*extracted)
    load_to_db(df_employees, df_customers, df_products, df_stores, df_orders, df_order_items)


if __name__ == "__main__":
    main()

Data loaded successfully into the database.
