Dependencies

In [8]:
import mysql.connector
import pandas as pd
from typing import Dict
from PIL import Image,ImageEnhance
import pytesseract
import re
import os
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import sqlite3

Extraction

In [9]:
def extract_erp(host: str,database: str,user: str,password: str) -> None:
    """Dump every table of a MySQL database to ``staging/<table>.csv``.

    Args:
        host: MySQL server host name.
        database: Name of the database to export.
        user: Login user name.
        password: Login password.

    MySQL connector errors are reported on stdout rather than raised, so a
    failed extraction does not abort the caller.
    """
    connection=None
    try:
        connection=mysql.connector.connect(host=host,database=database,user=user,password=password)

        # The export directory may not exist yet on a fresh checkout.
        os.makedirs("staging",exist_ok=True)

        tables=pd.read_sql("SHOW TABLES",connection)

        # Walk every column of the listing so the code does not depend on the
        # name MySQL gives to the result column.
        for column in tables.columns:
            for table in tables[column]:
                # Backtick-quote the identifier so reserved words or unusual
                # characters in a table name cannot break the statement.
                quoted="`"+str(table).replace("`","``")+"`"
                rows=pd.read_sql(f"SELECT * FROM {quoted}",connection)
                rows.to_csv(f"staging/{table}.csv",index=False)
    except mysql.connector.Error as e:
        print(f"Error extracting database: {e}")
    finally:
        # Release the connection even when a query or a CSV write fails.
        if connection is not None:
            connection.close()

In [10]:
def extract_ocr() -> None:
    """OCR the scanned order images and stage the parsed fields as CSV.

    Every file in ``./data/legacy_invoices`` whose name starts with "order"
    (case-insensitive) is sharpened, run through Tesseract and handed to
    ``parse_text``. The resulting records are written to
    ``staging/invoices.csv``, one row per image, in file-name order.
    """
    path = "./data/legacy_invoices"
    orders_data=[]
    files=sorted(file for file in os.listdir(path) if file.lower().startswith("order"))
    for file in files:
        # The context manager closes the image's file handle as soon as the
        # OCR for this file is done, instead of keeping every file open.
        with Image.open(f"{path}/{file}") as image:
            # Contrast factor 1 keeps the contrast unchanged; it is kept as an
            # explicit tuning knob next to the sharpness boost.
            enhanced=ImageEnhance.Contrast(image).enhance(1)
            enhanced=ImageEnhance.Sharpness(enhanced).enhance(2)

            text=pytesseract.image_to_string(enhanced,config="--psm 6 -c preserve_interword_spaces=1")

        # '$' is mapped to 'S' before parsing -- presumably to undo an OCR
        # misread of the letter; TODO confirm against sample scans.
        data=parse_text(text.replace('$','S'))
        orders_data.append(data)

    dataframe=pd.DataFrame(orders_data)
    dataframe.to_csv("staging/invoices.csv",index=False)

def parse_text(text: str) -> Dict:
    """Pull the order header and the product line out of OCR'd invoice text.

    Args:
        text: Raw text of one invoice as produced by the OCR step.

    Returns:
        A dict that always contains ``Order_ID``, ``Date``, ``Customer_ID``
        and ``Full_Name`` (``None`` when the field is not found in the text),
        plus ``Product_Name``, ``Qte``, ``Unit_Price`` and ``Total`` when a
        product line is recognised. Values are the strings found in the text.
        If several lines look like product lines, the last one wins.
    """
    header_patterns={
        "Order_ID": r"Ref[:,\.]?\s*([A-Z]+-\d+)",
        "Date": r"Date[:,\.]?\s*(\d{4}-\d{2}-\d{2})",
        "Customer_ID": r"Client ID[:,\.]?\s*([A-Z]\d+)",
        "Full_Name": r"Nom[:,\.]?\s*([A-Z-a-z]+\s[A-Z-a-z]+)",
    }

    data={}
    for field,pattern in header_patterns.items():
        found=re.search(pattern,text)
        # A field the OCR failed to read becomes None instead of raising
        # AttributeError and aborting the whole batch.
        data[field]=found.group(1) if found else None

    header_keywords=['Date','Ref','Client ID','Nom']

    for line in text.split("\n"):
        line=line.strip()

        # Skip blanks, the table header, separators and the signature footer.
        if not line or 'Produit' in line or '---' in line or 'Signature' in line:
            continue

        # A product line carries at least three numbers (qty, unit price, total)
        # and is not one of the header lines.
        if not re.search(r'\d+.*\d+.*\d+',line):
            continue
        if any(keyword in line for keyword in header_keywords):
            continue

        # Preferred layout: columns separated by runs of two or more spaces.
        parts=re.split(r'\s{2,}',line)

        if len(parts)>=4:
            data['Product_Name']=parts[0]
            data['Qte']=parts[1]
            data['Unit_Price']=parts[2].replace(' ', '')
            data['Total']=parts[3].replace(' ', '')
        else:
            # Fallback when the column spacing collapsed to single spaces.
            product_match=re.search(r'([A-Za-zÀ-ÿ\s\d]+)\s+(\d+)\s+(\d+)\s+(\d+)',line)
            if product_match:
                data['Product_Name']=product_match.group(1).strip()
                data['Qte']=product_match.group(2)
                data['Unit_Price']=product_match.group(3)
                data['Total']=product_match.group(4)

    return data

In [11]:
def extract_locale() -> None:
    """Convert the local Excel workbooks under ``./data`` into staged CSVs.

    ``./staging`` is created when missing. A workbook that does not exist is
    reported and skipped; a workbook that fails to convert is reported and the
    remaining ones are still processed.
    """
    source_dir = Path("./data")
    target_dir = Path("./staging")
    target_dir.mkdir(parents=True, exist_ok=True)

    workbooks = ("marketing_expenses.xlsx", "monthly_targets.xlsx", "shipping_rates.xlsx")

    for file_name in workbooks:
        file_path = source_dir / file_name

        if file_path.exists():
            try:
                # Same base name, .csv instead of .xlsx.
                csv_path = target_dir / file_name.replace(".xlsx", ".csv")
                pd.read_excel(file_path).to_csv(csv_path, index=False)
            except Exception as e:
                print(f"Error extracting {file_name}: {e}")
        else:
            print(f"File not found: {file_path}")


In [12]:
def extract_scraper(base_url: str, timeout: float = 30.0) -> None:
    """Scrape competitor product names and prices into ``staging/competitor.csv``.

    Starting at ``base_url``, each page's ``h5.product-name`` and
    ``span.product-price`` elements are collected and the ``next-page-btn``
    link is followed until there is none.

    Args:
        base_url: First page of the listing; next-page links are resolved as
            ``base_url + "/" + href``.
        timeout: Seconds to wait for each HTTP request before giving up.

    A failed request is reported on stdout and ends the crawl; whatever was
    collected up to that point is still written.
    """
    all_products=[]
    all_prices=[]

    current_url=base_url
    visited=set()

    # The visited set stops the crawl if a "next" link points back to a page
    # that was already scraped.
    while current_url and current_url not in visited:
        visited.add(current_url)

        try:
            # Without a timeout a stalled server would hang the pipeline.
            res=requests.get(current_url,timeout=timeout)
            # Treat 4xx/5xx answers as failures instead of parsing error pages.
            res.raise_for_status()
        except requests.RequestException as e:
            print(f"Error scraping {current_url}: {e}")
            break

        soup=BeautifulSoup(res.content,"html.parser")

        for tag in soup.find_all(["h5"],class_=["product-name"]):
            all_products.append(tag.text)
        for tag in soup.find_all(["span"],class_=["product-price"]):
            all_prices.append(str(tag.text).replace("DZD",""))

        next_link=soup.find(["a"],id="next-page-btn")

        if next_link and next_link.get("href"):
            current_url="".join([base_url,"/",next_link.get("href")])
        else:
            current_url=None

    # NOTE(review): names and prices are collected independently; if a page
    # lists a product without a price the two lists differ in length and the
    # DataFrame construction below raises.
    dataframe=pd.DataFrame({
        "Product_Name": all_products,
        "Unit_Price": all_prices
    })

    dataframe.to_csv("staging/competitor.csv",index=False)

Transform

In [None]:

# Fixed USD -> DZD conversion rate; usd_to_dzd() multiplies column values by it.
USD_TO_DZD = 135.0

def handle_neg_values(df, column_names = None) -> pd.DataFrame:
    """Replace negative entries of the given numeric columns with missing values.

    Args:
        df: Input frame; it is not modified.
        column_names: Columns to clean. ``None`` (the default) cleans nothing.

    Returns:
        A copy of ``df`` in which every value below zero in the listed columns
        is missing. Values that are already missing stay missing.
    """
    if column_names is None:
        column_names = []

    # Work on a copy, like the other cleaning helpers, so the caller's frame
    # is never changed behind its back.
    df = df.copy()
    for column in column_names:
        values = df[column]
        # Vectorised: keep values >= 0, blank out everything else. Unlike a
        # per-element `if x >= 0` this also copes with pd.NA entries.
        df[column] = values.where(values >= 0)
    return df

def clean_id(df, column_name) -> pd.DataFrame:
    """Reduce an identifier column to its numeric part.

    Text identifiers such as ``"S1"`` or ``"Store_5"`` become ``1`` and ``5``;
    values without any digit become missing.

    Args:
        df: Input frame; it is not modified.
        column_name: Name of the identifier column.

    Returns:
        A copy of ``df`` with ``column_name`` converted to a numeric column.
    """
    df = df.copy()
    raw = df[column_name]

    if pd.api.types.is_numeric_dtype(raw):
        # Already numeric (e.g. a previously cleaned file read back as float
        # because of missing values). Going through text would turn 12.0 into
        # "12.0" and then into 120, so keep the numbers as they are.
        df[column_name] = pd.to_numeric(raw, errors='coerce')
        return df

    digits = raw.astype(str).str.replace(r'\D', '', regex=True)
    # An empty string (no digits at all) is coerced to a missing value.
    df[column_name] = pd.to_numeric(digits, errors='coerce')
    return df

def clean_date(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Parse a column of date strings written in mixed formats.

    Values are stripped, dots are turned into dashes, and a generic parse is
    attempted first. Anything still unparsed is retried against a fixed list
    of explicit formats, in order; values matching none of them become NaT.

    Args:
        df: Input frame; it is not modified.
        column_name: Name of the column holding the dates.

    Returns:
        A copy of ``df`` whose ``column_name`` holds the parsed datetimes.
    """
    result = df.copy()

    # normalize separators: 2021.02.28 -> 2021-02-28
    raw = (
        result[column_name]
        .astype(str)
        .str.strip()
        .str.replace(r"\.", "-", regex=True)
    )

    parsed = pd.to_datetime(raw, errors="coerce")

    fallback_formats = (
        "%Y-%m-%d",   # YYYY-MM-DD (after dot->dash normalization)
        "%m-%d-%Y",   # MM-DD-YYYY
        "%B %d, %Y",  # Month Day, Year (March 3, 2021)
        "%b-%Y",      # Mon-YYYY (Feb-2023) -> 2023-02-01
    )
    for fmt in fallback_formats:
        # Only the entries that are still unparsed are retried.
        unresolved = parsed.isna()
        parsed.loc[unresolved] = pd.to_datetime(raw[unresolved], format=fmt, errors="coerce")

    result[column_name] = parsed
    return result

def usd_to_dzd(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Convert a USD amount column to DZD and rename it accordingly.

    The values are multiplied by ``USD_TO_DZD``; "USD"/"usd" in the column
    name is replaced with "DZD"/"dzd". The input frame is not modified.

    Args:
        df: Input frame.
        column_name: Name of the column holding USD amounts.

    Returns:
        A new frame with the converted, renamed column.
    """
    converted = df.copy()
    converted[column_name] = converted[column_name] * USD_TO_DZD

    dzd_name = column_name.replace("USD", "DZD").replace("usd", "dzd")
    return converted.rename(columns={column_name: dzd_name})

def standarize_names(df: pd.DataFrame, column_names=None) -> pd.DataFrame:
    """Normalise free-text name columns to a Title Case, space-separated form.

    Underscores and dashes become spaces, surrounding whitespace is removed
    and the text is title-cased (``"  john_doe "`` -> ``"John Doe"``).

    Args:
        df: Input frame; it is not modified.
        column_names: Columns to normalise. ``None`` (the default) changes nothing.

    Returns:
        A copy of ``df`` with the listed columns normalised. Missing values
        stay missing.
    """
    if column_names is None:
        column_names = []

    df = df.copy()
    for column in column_names:
        original = df[column]
        cleaned = (
            original
            .astype(str)
            .str.replace("_", " ", regex=False)
            .str.replace("-", " ", regex=False)
            .str.strip()
            .str.title()
        )
        # astype(str) turns a missing value into the text "nan", which would
        # be title-cased into a bogus name "Nan"; restore the gap instead.
        df[column] = cleaned.where(original.notna())
    return df

def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """Return a new frame without fully identical rows, keeping the first of each.

    The input frame is left untouched and the surviving rows keep their
    original index labels.
    """
    return df.drop_duplicates(keep='first')


def normalize_number(df: pd.DataFrame, column_names) -> pd.DataFrame:
    """Turn loosely formatted numeric text into numbers.

    Handled per value:
      * currency markers ``USD`` and ``$`` and all spaces are removed;
      * commas used as thousands separators are removed
        (``"1,200,000"`` -> 1200000, ``"1,234.5"`` -> 1234.5);
      * any other comma is read as a decimal mark (``"1234,56"`` -> 1234.56);
      * text that still is not a number becomes a missing value.

    Args:
        df: Input frame; it is not modified.
        column_names: Columns to convert.

    Returns:
        A copy of ``df`` with the listed columns converted to numeric.
    """
    df = df.copy()

    for col in column_names:
        s = df[col].astype("string")  # better than astype(str), preserves <NA>

        # remove currency text/symbols if they exist (optional)
        s = s.str.replace("USD", "", regex=False).str.replace("$", "", regex=False).str.strip()

        # remove spaces (thousand sep)
        s = s.str.replace(" ", "", regex=False)

        # Values grouped in threes by commas ("1,234", "1,200,000.50") use the
        # comma as a thousands separator; blindly swapping it for a dot would
        # give "1.200.000" (unparseable) or shrink 1,234 to 1.234.
        is_grouped = (
            s.str.fullmatch(r"[+-]?\d{1,3}(?:,\d{3})+(?:\.\d+)?")
            .fillna(False)
            .astype(bool)
        )
        s = s.where(~is_grouped, s.str.replace(",", "", regex=False))

        # if decimal comma is used, convert it to dot
        # example: "1234,56" -> "1234.56"
        s = s.str.replace(",", ".", regex=False)

        df[col] = pd.to_numeric(s, errors="coerce")

    return df




In [14]:


def transform_marketing_expenses() -> None:
    """Clean ``staging/marketing_expenses.csv`` in place and add monthly averages.

    Steps, in order: parse ``Date``; convert ``Marketing_Cost_USD`` to numbers;
    derive ``Month`` (first day of the month); blank out negative costs;
    convert to DZD (column becomes ``Marketing_Cost_DZD``); standardise
    ``Category`` and ``Campaign_Type``; drop duplicate rows; fill missing costs
    with the mean of the same (Category, Campaign_Type); add
    ``Avg_Monthly_Category_Marketing_Cost`` per (Month, Category).

    The staged file is overwritten with the result.
    """
    df = pd.read_csv("staging/marketing_expenses.csv")

    # Clean date and amounts
    df = clean_date(df, "Date")
    df = normalize_number(df, ["Marketing_Cost_USD"])

    # Month column (start of month); the result is already a datetime column.
    df["Month"] = df["Date"].dt.to_period("M").dt.to_timestamp()

    # Negatives -> NULL
    df = handle_neg_values(df, ["Marketing_Cost_USD"])

    # Convert USD -> DZD and rename (creates Marketing_Cost_DZD)
    df = usd_to_dzd(df, "Marketing_Cost_USD")

    # Standardize names
    df = standarize_names(df, ["Category", "Campaign_Type"])

    # Drop duplicates BEFORE aggregating: a row recorded twice would otherwise
    # count twice in both means computed below.
    df = remove_duplicates(df)

    # Fill NULL with avg of same (Category, Campaign_Type)
    df["Marketing_Cost_DZD"] = df["Marketing_Cost_DZD"].fillna(
        df.groupby(["Category", "Campaign_Type"])["Marketing_Cost_DZD"].transform("mean")
    )

    # Monthly average marketing cost for this Category (Month + Category)
    df["Avg_Monthly_Category_Marketing_Cost"] = df.groupby(
        ["Month", "Category"]
    )["Marketing_Cost_DZD"].transform("mean")

    # Filling gaps can make two rows identical; keep the output duplicate-free.
    df = remove_duplicates(df)

    df.to_csv("staging/marketing_expenses.csv", index=False)



def transform_cities() -> None:
    """Attach the average regional shipping cost to every staged city.

    The mean ``shipping_cost`` per ``region_name`` is taken from
    ``staging/shipping_rates.csv`` and joined onto ``staging/table_cities.csv``
    by ``Region``. Cities whose region has no rate receive the mean of the
    joined column. The cities file is overwritten with the result.
    """
    city_table = pd.read_csv("staging/table_cities.csv")
    rate_table = pd.read_csv("staging/shipping_rates.csv")

    # average shipping cost per region, shaped as (Region, Avg_Region_Shipping_Cost)
    region_means = rate_table.groupby("region_name", as_index=False)["shipping_cost"].mean()
    region_means = region_means.rename(columns={
        "region_name": "Region",
        "shipping_cost": "Avg_Region_Shipping_Cost"
    })

    # left join keeps every city, matched or not
    city_table = city_table.merge(region_means, on="Region", how="left")

    # regions without a rate fall back to the overall mean of the joined column
    fallback_cost = city_table["Avg_Region_Shipping_Cost"].mean()
    city_table["Avg_Region_Shipping_Cost"] = city_table["Avg_Region_Shipping_Cost"].fillna(fallback_cost)

    city_table.to_csv("staging/table_cities.csv", index=False)

def transform_monthly_targets() -> None:
    """Clean ``staging/monthly_targets.csv`` and overwrite it with the result.

    ``Store_ID`` is reduced to its numeric part, ``Month`` is parsed as a date
    and ``Target_Revenue`` is converted to a number. Missing targets are
    filled with the mean target of the same store (a store with no target at
    all keeps the gap), manager names are standardised and duplicate rows
    are dropped.
    """
    staging_file = "staging/monthly_targets.csv"

    # Store_ID like S1, Store_5 -> 1, 5; Month like "Feb-2023" or
    # "2023-01-01 00:00:00" -> datetime; Target_Revenue -> numeric.
    targets = (
        pd.read_csv(staging_file)
        .pipe(clean_id, "Store_ID")
        .pipe(clean_date, "Month")
        .pipe(normalize_number, ["Target_Revenue"])
    )

    # Per-row mean target of the row's own store, used only to fill gaps.
    store_mean = targets.groupby("Store_ID")["Target_Revenue"].transform("mean")
    targets["Target_Revenue"] = targets["Target_Revenue"].fillna(store_mean)

    targets = standarize_names(targets, ["Manager_Name"])
    targets = remove_duplicates(targets)

    targets.to_csv(staging_file, index=False)

def transform_subcategories() -> None:
    """Denormalise sub-categories by replacing ``Category_ID`` with its name.

    ``Category_Name`` is looked up in ``staging/table_categories.csv`` and
    joined onto ``staging/table_subcategories.csv``; the ``Category_ID``
    foreign key is then dropped and the sub-categories file is overwritten.
    """
    subcategories = pd.read_csv("staging/table_subcategories.csv")
    categories = pd.read_csv("staging/table_categories.csv")

    category_names = categories[["Category_ID", "Category_Name"]]

    # Left join keeps sub-categories whose category is unknown; the foreign
    # key is no longer needed once the name is attached.
    enriched = (
        subcategories
        .merge(category_names, on="Category_ID", how="left")
        .drop(columns=["Category_ID"])
    )

    enriched.to_csv("staging/table_subcategories.csv", index=False)

def fix_sales_ids() -> None:
    """Rewrite ``staging/table_sales.csv`` with ``Trans_ID`` cleaned by ``clean_id``."""
    sales_file = "staging/table_sales.csv"
    cleaned_sales = clean_id(pd.read_csv(sales_file), "Trans_ID")
    cleaned_sales.to_csv(sales_file, index=False)

def add_invoices() -> None:
    """Append the OCR'd legacy invoices to the staged sales table.

    Each invoice gets its ``Product_ID`` by matching ``Product_Name`` against
    ``staging/table_products.csv`` and a fresh ``Trans_ID`` continuing after
    the highest numeric ``Trans_ID`` already in ``staging/table_sales.csv``.
    ``Qte`` and ``Total`` are renamed to ``Quantity`` and ``Total_Revenue``.
    The sales file is overwritten with the combined rows.

    NOTE(review): the appended rows carry no ``Store_ID``, and
    ``load_dw_data`` drops fact rows whose ``Store_ID`` is missing -- confirm
    whether invoices are meant to reach the fact table.
    """
    sales = pd.read_csv("staging/table_sales.csv")
    invoices = pd.read_csv("staging/invoices.csv")
    products = pd.read_csv("staging/table_products.csv")

    # One Product_ID per name: if two products share a name, a plain merge
    # would emit the invoice once per match and inflate the sales.
    product_lookup = (
        products[["Product_ID", "Product_Name"]]
        .drop_duplicates(subset="Product_Name", keep="first")
    )

    invoices = invoices.merge(
        product_lookup,
        on="Product_Name",
        how="left"
    )

    # New IDs start right after the current maximum, so they cannot collide
    # with existing ones and no further collision check is needed.
    existing_ids = pd.to_numeric(sales["Trans_ID"], errors="coerce")
    max_id = int(existing_ids.max()) if existing_ids.notna().any() else 0
    invoices["Trans_ID"] = list(range(max_id + 1, max_id + 1 + len(invoices)))

    # ---- rename / select columns to match sales ----
    invoices = invoices.rename(columns={
        "Qte": "Quantity",
        "Total": "Total_Revenue",
    })

    invoices_sales = invoices[[
        "Trans_ID",
        "Date",
        "Customer_ID",
        "Product_ID",
        "Quantity",
        "Total_Revenue",
    ]]

    # ---- append into sales ----
    sales = pd.concat([sales, invoices_sales], ignore_index=True)

    sales.to_csv("staging/table_sales.csv", index=False)

def transform_products() -> None:
    """Denormalise the staged products and attach competitor prices.

    ``Product_ID`` is cleaned with ``clean_id``; ``SubCat_Name`` and
    ``Category_Name`` are joined in through ``SubCat_ID`` (which is then
    dropped); the scraped competitor price is joined by ``Product_Name`` as
    ``Competitor_Unit_Price``. ``staging/table_products.csv`` is overwritten.
    """
    products = pd.read_csv("staging/table_products.csv")
    subcats = pd.read_csv("staging/table_subcategories.csv")
    competitor = pd.read_csv("staging/competitor.csv")

    products = clean_id(products, "Product_ID")

    # 1) add SubCat_Name + Category_Name
    products = products.merge(
        subcats[["SubCat_ID", "SubCat_Name", "Category_Name"]],
        on="SubCat_ID",
        how="left"
    )

    # 2) remove foreign key
    products = products.drop(columns=["SubCat_ID"])

    # 3) add competitor price (if exists)
    # Keep a single price per name: a product listed twice on the scraped
    # site would otherwise duplicate the product row, and Product_ID must
    # stay unique for the Dim_Product primary key.
    competitor_prices = (
        competitor[["Product_Name", "Unit_Price"]]
        .rename(columns={"Unit_Price": "Competitor_Unit_Price"})
        .drop_duplicates(subset="Product_Name", keep="first")
    )
    products = products.merge(
        competitor_prices,
        on="Product_Name",
        how="left"
    )

    products.to_csv("staging/table_products.csv", index=False)


def transform_customers() -> None:
    """Denormalise the staged customers with their city attributes.

    ``Customer_ID`` is cleaned with ``clean_id``; ``City_Name``, ``Region``
    and ``Avg_Region_Shipping_Cost`` are joined in from
    ``staging/table_cities.csv`` through ``City_ID``, which is then dropped.
    ``staging/table_customers.csv`` is overwritten with the result.
    """
    customer_table = clean_id(pd.read_csv("staging/table_customers.csv"), "Customer_ID")
    city_table = pd.read_csv("staging/table_cities.csv")

    city_attributes = city_table[["City_ID", "City_Name", "Region", "Avg_Region_Shipping_Cost"]]

    # Left join keeps customers whose city is unknown; the foreign key is
    # dropped once the attributes are attached.
    customer_table = (
        customer_table
        .merge(city_attributes, on="City_ID", how="left")
        .drop(columns=["City_ID"])
    )

    customer_table.to_csv("staging/table_customers.csv", index=False)


def transform_table_stores() -> None:
    """Denormalise the staged stores and attach their average monthly target.

    ``City_Name`` and ``Region`` are joined in from ``staging/table_cities.csv``
    through ``City_ID`` (then dropped). ``Monthly_Target`` is the mean
    ``Target_Revenue`` of the store over all rows of
    ``staging/monthly_targets.csv``. ``staging/table_stores.csv`` is
    overwritten with the result.
    """
    stores = pd.read_csv("staging/table_stores.csv")
    cities = pd.read_csv("staging/table_cities.csv")
    targets = pd.read_csv("staging/monthly_targets.csv")

    # ---- add City_Name + Region ----
    stores = stores.merge(
        cities[["City_ID", "City_Name", "Region"]],
        on="City_ID",
        how="left"
    )

    # ---- compute average monthly target per store ----
    # Strip thousands separators before converting; unparseable text -> NaN.
    targets["Target_Revenue"] = pd.to_numeric(
        targets["Target_Revenue"].astype(str).str.replace(",", "", regex=False),
        errors="coerce"
    )

    # Mean, not sum: the column describes a typical month, so it must not
    # grow with the number of months present in the targets file.
    avg_targets = (
        targets.groupby("Store_ID", as_index=False)["Target_Revenue"]
        .mean()
        .rename(columns={"Target_Revenue": "Monthly_Target"})
    )

    # ---- merge avg target into stores ----
    stores = stores.merge(
        avg_targets,
        on="Store_ID",
        how="left"
    )

    # ---- drop foreign key ----
    stores = stores.drop(columns=["City_ID"])

    stores.to_csv("staging/table_stores.csv", index=False)



def transform_sales() -> None:
    """Enrich the staged sales with costs and compute ``Net_Profit`` per row.

    For every sale: ``Unit_Cost`` and ``Category_Name`` come from the staged
    products, ``Shipping_Cost`` is the customer's ``Avg_Region_Shipping_Cost``,
    and ``Marketing_Cost`` is the (Month, Category) marketing average divided
    by the number of sales rows in that month and category.

    ``Net_Profit = Total_Revenue - Unit_Cost * Quantity - Shipping_Cost - Marketing_Cost``

    ``staging/table_sales.csv`` is overwritten with the result.
    """
    sales = pd.read_csv("staging/table_sales.csv")
    products = pd.read_csv("staging/table_products.csv")
    customers = pd.read_csv("staging/table_customers.csv")
    marketing = pd.read_csv("staging/marketing_expenses.csv", parse_dates=["Month"])

    sales = clean_id(sales, "Customer_ID")
    sales = clean_id(sales, "Product_ID")
    sales = clean_date(sales, "Date")

    # 1) month of the sale
    sales["Month"] = pd.to_datetime(sales["Date"]).dt.to_period("M").dt.to_timestamp()

    # 2) bring category + unit cost into sales
    sales = sales.merge(
        products[["Product_ID", "Unit_Cost", "Category_Name"]],
        on="Product_ID",
        how="left"
    )

    # 3) shipping cost directly from customers
    sales = sales.merge(
        customers[["Customer_ID", "Avg_Region_Shipping_Cost"]],
        on="Customer_ID",
        how="left"
    )
    sales["Shipping_Cost"] = sales["Avg_Region_Shipping_Cost"]

    # 4) marketing monthly by (Month + Category)
    # The marketing file holds one row per expense, so the same
    # (Month, Category) pair appears many times. Reduce it to one row per pair
    # first, otherwise the merge repeats every sale once per expense row.
    monthly_cat_marketing = (
        marketing[["Month", "Category", "Avg_Monthly_Category_Marketing_Cost"]]
        .drop_duplicates(subset=["Month", "Category"])
    )

    sales = sales.merge(
        monthly_cat_marketing,
        left_on=["Month", "Category_Name"],
        right_on=["Month", "Category"],
        how="left"
    )

    sales["Marketing_Cost"] = sales["Avg_Monthly_Category_Marketing_Cost"].fillna(0)

    # count of sales rows in same Month+Category
    sales["cat_monthly_sales"] = sales.groupby(["Month", "Category_Name"])["Product_ID"].transform("count")

    # avoid division by zero; rows with an unknown month or category get no
    # group count (NaN) and must not turn their marketing share into NaN
    sales["cat_monthly_sales"] = sales["cat_monthly_sales"].replace(0, 1).fillna(1)

    sales["Marketing_Cost"] = sales["Marketing_Cost"] / sales["cat_monthly_sales"]

    # 5) net profit (split marketing fairly)
    sales["Net_Profit"] = (
        sales["Total_Revenue"]
        - (sales["Unit_Cost"] * sales["Quantity"])
        - sales["Shipping_Cost"]
        - sales["Marketing_Cost"]
    )

    sales = sales.drop(columns=["Month", "Category", "Avg_Monthly_Category_Marketing_Cost"], errors="ignore")

    sales.to_csv("staging/table_sales.csv", index=False)



def review_text_to_score() -> None:
    """Add a mean review sentiment ``Score`` to every staged product.

    Each ``Review_Text`` in ``staging/table_reviews.csv`` is scored with the
    VADER ``compound`` value; the scores are averaged per ``Product_ID`` and
    written to the ``Score`` column of ``staging/table_products.csv``.
    Products without any review get a missing score.
    """
    reviews = pd.read_csv("staging/table_reviews.csv")

    # Reviews without text cannot be scored.
    reviews = reviews.dropna(subset=["Review_Text"]).copy()

    # Bring the review keys to the same numeric form clean_id gives the
    # staged products, so the two tables can be matched by ID.
    reviews = clean_id(reviews, "Product_ID")

    analyzer = SentimentIntensityAnalyzer()
    reviews["compound"] = [
        analyzer.polarity_scores(str(sentence))["compound"]
        for sentence in reviews["Review_Text"]
    ]
    mean_scores = reviews.groupby("Product_ID")["compound"].mean()

    products = pd.read_csv("staging/table_products.csv")

    # Match scores by Product_ID instead of by row position: positional
    # assignment mislabels products whenever the file order differs from the
    # grouped order, and fails outright when a product has no review.
    products["Score"] = products["Product_ID"].map(mean_scores)

    products.to_csv("staging/table_products.csv", index=False)



def transform_erp() -> None:
    """Run every staging transformation in dependency order.

    The order matters: later steps read the CSV files that earlier steps
    rewrote (for example products need the sub-categories, and sales need
    the products, customers and marketing expenses).
    """
    ordered_steps = (
        transform_marketing_expenses,
        transform_cities,
        transform_monthly_targets,
        transform_subcategories,
        fix_sales_ids,
        add_invoices,
        transform_products,
        transform_customers,
        transform_table_stores,
        transform_sales,
        review_text_to_score,
    )
    for step in ordered_steps:
        step()


Load

In [15]:
# SQLite file holding the data warehouse (relative to the working directory).
DB_PATH = Path("techstore_dw.db")

def create_dw_schema() -> None:
    """Create an empty star-schema warehouse at ``DB_PATH``.

    An existing database file is deleted first, so the call always starts
    from scratch. The schema has four dimensions (``Dim_Product``,
    ``Dim_Store``, ``Dim_Customer``, ``Dim_Date``), the ``Fact_Sales`` fact
    table with foreign keys to each of them, and one index per foreign key.
    """
    # recreate DB
    if DB_PATH.exists():
        DB_PATH.unlink()

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        cur.executescript("""
    PRAGMA foreign_keys = ON;

    -- =====================
    -- DIMENSIONS
    -- =====================

    CREATE TABLE Dim_Product (
        Product_ID            TEXT PRIMARY KEY,
        Product_Name          TEXT,
        SubCat_Name           TEXT,
        Category_Name         TEXT,
        Unit_Price            REAL,
        Unit_Cost             REAL,
        Score                 REAL,
        Competitor_Unit_Price REAL
    );

    CREATE TABLE Dim_Store (
        Store_ID           INTEGER PRIMARY KEY,
        Store_Name         TEXT,
        City_Name          TEXT,
        Region             TEXT,
        Monthly_Target REAL
    );

    CREATE TABLE Dim_Customer (
        Customer_ID              TEXT PRIMARY KEY,
        Full_Name                TEXT,
        City_Name                TEXT,
        Region                   TEXT,
        Avg_Region_Shipping_Cost REAL
    );

    CREATE TABLE Dim_Date (
        DateKey  INTEGER PRIMARY KEY, -- YYYYMMDD
        Day      INTEGER,
        Month    INTEGER,
        Year     INTEGER,
        DayName  TEXT
    );

    -- =====================
    -- FACT TABLE
    -- =====================

    CREATE TABLE Fact_Sales (
        Trans_ID      INTEGER PRIMARY KEY,
        DateKey       INTEGER NOT NULL,
        Store_ID      INTEGER NOT NULL,
        Product_ID    TEXT NOT NULL,
        Customer_ID   TEXT NOT NULL,

        Quantity      INTEGER,
        Total_Revenue REAL,
        Net_Profit    REAL,
        Marketing_Cost  REAL,

        FOREIGN KEY (DateKey)    REFERENCES Dim_Date(DateKey),
        FOREIGN KEY (Store_ID)   REFERENCES Dim_Store(Store_ID),
        FOREIGN KEY (Product_ID) REFERENCES Dim_Product(Product_ID),
        FOREIGN KEY (Customer_ID) REFERENCES Dim_Customer(Customer_ID)
    );

    -- =====================
    -- INDEXES (performance)
    -- =====================

    CREATE INDEX idx_fact_date     ON Fact_Sales(DateKey);
    CREATE INDEX idx_fact_store    ON Fact_Sales(Store_ID);
    CREATE INDEX idx_fact_product  ON Fact_Sales(Product_ID);
    CREATE INDEX idx_fact_customer ON Fact_Sales(Customer_ID);
    """)

        conn.commit()
    finally:
        # Close the handle even if the DDL script fails, so the database
        # file is not left locked by a dangling connection.
        conn.close()

# When run directly, (re)create the empty warehouse file and confirm it.
if __name__ == "__main__":
    create_dw_schema()
    print("✅ Data warehouse schema created (no data loaded).")


✅ Data warehouse schema created (no data loaded).


In [16]:
# Warehouse file written by load_dw_data(); same value as the DB_PATH that
# create_dw_schema() uses.
DB_PATH = Path("techstore_dw.db")

# Staged CSV files that load_dw_data() reads.
STAGING = Path("staging")
PRODUCTS_CSV   = STAGING / "table_products.csv"
STORES_CSV     = STAGING / "table_stores.csv"
CUSTOMERS_CSV  = STAGING / "table_customers.csv"
SALES_CSV      = STAGING / "table_sales.csv"


def _whole_floats_to_int(frame: pd.DataFrame, columns) -> pd.DataFrame:
    """Cast float key columns that hold only whole numbers to int64, in place."""
    for column in columns:
        if column not in frame.columns:
            continue
        values = frame[column]
        if (
            pd.api.types.is_float_dtype(values)
            and values.notna().all()
            and (values % 1 == 0).all()
        ):
            frame[column] = values.astype("int64")
    return frame


def load_dw_data() -> None:
    """Load the staged CSVs into the SQLite warehouse at ``DB_PATH``.

    The schema is created first when the database file does not exist. All
    existing rows are deleted, then the product, store, customer and date
    dimensions are loaded, followed by ``Fact_Sales``. ``Dim_Date`` is built
    from the distinct sale dates. Fact rows lacking any of ``Trans_ID``,
    ``DateKey``, ``Store_ID``, ``Product_ID`` or ``Customer_ID`` are skipped.
    """
    # 1) create DB if not exists
    if not DB_PATH.exists():
        create_dw_schema()

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute("PRAGMA foreign_keys = ON;")

        # 2) read staging
        products  = pd.read_csv(PRODUCTS_CSV)
        stores    = pd.read_csv(STORES_CSV)
        customers = pd.read_csv(CUSTOMERS_CSV)
        sales     = pd.read_csv(SALES_CSV)

        # 3) build Dim_Date from sales Date
        sales["Date"] = pd.to_datetime(sales["Date"], errors="coerce", format="mixed")
        dates = pd.Series(sales["Date"].dropna().dt.normalize().unique(), name="Date")
        dim_date = pd.DataFrame({"Date": dates})
        dim_date["DateKey"] = dim_date["Date"].dt.strftime("%Y%m%d").astype(int)
        dim_date["Day"] = dim_date["Date"].dt.day
        dim_date["Month"] = dim_date["Date"].dt.month
        dim_date["Year"] = dim_date["Date"].dt.year
        dim_date["DayName"] = dim_date["Date"].dt.strftime("%A")
        dim_date = dim_date.drop(columns=["Date"])

        sales["DateKey"] = sales["Date"].dt.strftime("%Y%m%d").astype("Int64")

        # 4) clear old rows (fact first, because it references the dimensions)
        cur.executescript("""
        DELETE FROM Fact_Sales;
        DELETE FROM Dim_Date;
        DELETE FROM Dim_Customer;
        DELETE FROM Dim_Store;
        DELETE FROM Dim_Product;
    """)

        conn.commit()

        # 5) load dimensions
        # Key columns read back as float (e.g. 12.0) would be stored as the
        # text '12.0' in the TEXT key columns and no longer match '12', so
        # whole-number floats are cast to integers before loading.
        prod_cols = ["Product_ID","Product_Name","SubCat_Name","Category_Name",
                     "Unit_Price","Unit_Cost","Score","Competitor_Unit_Price"]
        dim_product = products[[c for c in prod_cols if c in products.columns]].copy()
        _whole_floats_to_int(dim_product, ["Product_ID"]).to_sql(
            "Dim_Product", conn, if_exists="append", index=False
        )

        store_cols = ["Store_ID","Store_Name","City_Name","Region","Monthly_Target"]
        dim_store = stores[[c for c in store_cols if c in stores.columns]].copy()
        _whole_floats_to_int(dim_store, ["Store_ID"]).to_sql(
            "Dim_Store", conn, if_exists="append", index=False
        )

        # The staged column (and the Dim_Customer column) is named
        # Avg_Region_Shipping_Cost; any other name is filtered out below and
        # the shipping cost would never be loaded.
        cust_cols = ["Customer_ID","Full_Name","City_Name","Region","Avg_Region_Shipping_Cost"]
        dim_customer = customers[[c for c in cust_cols if c in customers.columns]].copy()
        _whole_floats_to_int(dim_customer, ["Customer_ID"]).to_sql(
            "Dim_Customer", conn, if_exists="append", index=False
        )

        dim_date.to_sql("Dim_Date", conn, if_exists="append", index=False)

        # 6) load fact
        fact_cols = ["Trans_ID","DateKey","Store_ID","Product_ID","Customer_ID",
                     "Quantity","Total_Revenue","Net_Profit", "Marketing_Cost"]
        fact = sales[[c for c in fact_cols if c in sales.columns]].copy()
        fact = fact.dropna(subset=["Trans_ID","DateKey","Store_ID","Product_ID","Customer_ID"])
        fact["DateKey"] = fact["DateKey"].astype(int)
        fact = _whole_floats_to_int(fact, ["Trans_ID","Store_ID","Product_ID","Customer_ID"])

        fact.to_sql("Fact_Sales", conn, if_exists="append", index=False)

        conn.commit()
    finally:
        # Always release the database handle, also when a load step fails.
        conn.close()


# When run directly, load the staged CSV files into the warehouse and confirm it.
if __name__ == "__main__":
    load_dw_data()
    print("✅ DW loaded into techstore_dw.db")

✅ DW loaded into techstore_dw.db


Main

In [17]:
def extract(ENV_KEYS: Dict[str, str | None]) -> None:
    """Run all four extraction steps, printing progress around each one.

    Args:
        ENV_KEYS: Settings mapping. ``DB_HOST``, ``DB_NAME``, ``DB_USERNAME``
            and ``DB_PASSWORD`` are passed to the database export and
            ``WEBSITE`` to the scraper; a missing key is passed on as ``None``.
    """
    print("EXTRACTING DATABASE")
    extract_erp(
        ENV_KEYS.get("DB_HOST"),
        ENV_KEYS.get("DB_NAME"),
        ENV_KEYS.get("DB_USERNAME"),
        ENV_KEYS.get("DB_PASSWORD")
    )
    print("DONE EXTRACTING DATABASE")

    print("EXTRACTING WEBSITE")
    extract_scraper(ENV_KEYS.get("WEBSITE"))
    print("DONE EXTRACTING WEBSITE")

    print("EXTRACTING LOCAL FILES")
    extract_locale()
    print("DONE EXTRACTING LOCAL FILES")

    print("EXTRACTING IMAGES")
    extract_ocr()
    # The closing message names the step that actually finished (images).
    print("DONE EXTRACTING IMAGES")

def transfrom() -> None:
    """Run the staging transformations, printing a banner before and after."""
    stage = "TRANSFROMING ERP"
    print(stage)
    transform_erp()
    print("DONE " + stage)

def load():
    """Load dimensions and fact into the warehouse, printing a banner before and after."""
    stage = "LOADING DATA WAREHOUSE"
    print(stage)
    # load dimensions + fact
    load_dw_data()
    print("DONE " + stage)


In [18]:
from dotenv import dotenv_values

# Settings come from the local .env file; extract() looks up DB_HOST, DB_NAME,
# DB_USERNAME, DB_PASSWORD and WEBSITE in this mapping.
ENV_KEYS=dotenv_values(".env")

# Run the pipeline end to end: extract -> transform -> load.
extract(ENV_KEYS)

transfrom()

load()

EXTRACTING DATABASE


  dataframe=pd.read_sql("SHOW TABLES",connection)
  tmp=pd.read_sql(f"SELECT * FROM {table}",connection)


DONE EXTRACTING DATABASE
EXTRACTING WEBSITE
DONE EXTRACTING WEBSITE
EXTRACTING LOCAL FILES
DONE EXTRACTING LOCAL FILES
EXTRACTING IMAGES
DONE EXTRACTING LOCAL FILES
TRANSFROMING ERP
DONE TRANSFROMING ERP
LOADING DATA WAREHOUSE
DONE LOADING DATA WAREHOUSE
