# Coffee Shop ETL
#### Omrahn Faqiri - 991 732 187

---

### Instructions:
Set the `CURRENT_DAY` variable to `"day1"`, `"day2"`, or `"day3"` for the day being processed, then run all cells in order. Converting the `.ipynb` notebook into a single Python file overcomplicated things, so I left it as a notebook.

The current `/database/schema/dim_suppliers.csv` file contains the result after day 3. To reproduce it, clear everything except the header row and run days 1–3 in order. The output should be the same, apart from the `last_updated_at` timestamps, which record when each run happened.

In [11]:
# Selects which data folder (../data/<day>/) this run reads: "day1", "day2" or "day3".
CURRENT_DAY = "day3"

### Notes
---
#### SCD Type 2 Triggers (new row):
- `supplier_name`
- `supplier_category`
- `city`
- `delivery_frequency`
- `email`

> Create new row, assign new `supplier_key`, mark `is_active` as `True`, mark old row's `is_active` as `False`

#### SCD Type 1 Triggers (overwrite):
- `notes`
- `is_active`

> Overwrite these values in place on the current row (no new row, no new `supplier_key`).
---

# Extract

In [12]:
# load data/<CURRENT_DAY>/supplier_details.csv and suppliers.csv into pandas
import pandas as pd
import os
from datetime import datetime

# Paths (relative to the notebook's working directory)
DIM_SUPPLIERS_PATH = "../database/schema/dim_suppliers.csv"
DETAILS_PATH = f"../data/{CURRENT_DAY}/supplier_details.csv"
SUPPLIERS_PATH = f"../data/{CURRENT_DAY}/suppliers.csv"

# Triggers: a change in any TYPE2 column opens a new dimension row,
# a change in a TYPE1 column is overwritten on the current row.
TYPE2 = ["supplier_name","supplier_category","city","delivery_frequency","email"]
TYPE1 = ["notes","is_active"]

# Daily Dataframes
details = pd.read_csv(DETAILS_PATH)
suppliers = pd.read_csv(SUPPLIERS_PATH)


def _as_bool(value):
    """Interpret one ``is_active`` cell read from CSV as a boolean.

    ``Series.astype(bool)`` treats every non-empty string as True. If pandas
    could not infer a boolean dtype for the column, it holds text, and a plain
    ``astype(bool)`` would turn every "False" into True. The strings
    "true"/"false" (any case, surrounding whitespace ignored) are mapped
    explicitly. Anything else falls back to ``bool(value)``, which matches the
    previous behaviour for real booleans and missing values.
    """
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    return bool(value)


# Ensure that there is a dim_suppliers to work with
if os.path.exists(DIM_SUPPLIERS_PATH) and os.path.getsize(DIM_SUPPLIERS_PATH) > 0:
    dim_suppliers = pd.read_csv(DIM_SUPPLIERS_PATH)
    # Normalise is_active to real booleans if the column exists
    if 'is_active' in dim_suppliers.columns:
        dim_suppliers['is_active'] = dim_suppliers['is_active'].map(_as_bool).astype(bool)
else:
    dim_suppliers = pd.DataFrame() # start empty

# Day label -> ISO calendar date; stage() stores this as each row's source_date.
_DAY_DATES = {
    "day1": "2025-01-01",
    "day2": "2025-01-02",
    "day3": "2025-01-03",
}


def get_days_mapping(day: str):
    """Return the ISO date string for a day label such as ``"day2"``.

    Raises:
        KeyError: if ``day`` is not one of the known labels.
    """
    return _DAY_DATES[day]

# Clean

In [13]:
def clean(df_supp, df_det):
    """Tidy the raw suppliers and supplier-details frames.

    Every string cell in both frames has surrounding whitespace removed, and
    the ``Email`` column of the details frame is lower-cased.

    Returns:
        A ``(suppliers, details)`` tuple of new DataFrames; the inputs are
        left unmodified.
    """
    def _strip(cell):
        # Only strings are trimmed; numbers, booleans and NaN pass through.
        return cell.strip() if isinstance(cell, str) else cell

    tidy_supp = df_supp.apply(lambda column: column.map(_strip))
    tidy_det = df_det.apply(lambda column: column.map(_strip))

    # Lower-case every email address.
    tidy_det["Email"] = tidy_det["Email"].str.lower()

    # The suppliers' IsActive column is already parsed as bool by read_csv.
    return tidy_supp, tidy_det

# Clean today's frames and print a short preview plus their dtypes.
suppliers, details = clean(suppliers, details)
print("suppliers:",suppliers.head(3))
print("\n" + "-"*90)
print("details:",details.head(3))
# Single quotes inside the replacement fields: reusing the outer quote
# character inside an f-string is only legal from Python 3.12 (PEP 701).
print(f"{'|'*35} [Data Types for Reference] {'|'*35}")
print(f"{suppliers.dtypes} \n\n\n {details.dtypes}")


suppliers:    SupplierID          SupplierName SupplierCategory         City  IsActive
0         101  BeanRoasters Company     Coffee Beans  Mississauga      True
1         102    SweetTreats Bakery           Bakery  Mississauga      True
2         104     CupTech Equipment        Equipment      Toronto      True

------------------------------------------------------------------------------------------
details:    SupplierID                     Email DeliveryFrequency  \
0         101  contact@beanroasters.com            Weekly   
1         102       info@sweettreats.ca             Daily   
2         104       support@cuptech.com           Monthly   

                            Notes  
0          Expanded delivery area  
1                  Same as before  
2  Now offers maintenance service  
||||||||||||||||||||||||||||||||||| [Data Types for Reference] |||||||||||||||||||||||||||||||||||
SupplierID           int64
SupplierName        object
SupplierCategory    object
City           

# Staging & Metadata
---
### Notes:
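- The first day has only 8 entries anyway.
- The plan was for `source_date` to be null on day 1, since we don't *actually know* the source date.
  - Day 2 would then get a source date of '2025-01-01'.
  - Day 3 would then get a source date of '2025-01-02'.
  - NOTE: the current code does not do this. `stage()` sets `source_date` from `get_days_mapping(day)`, which gives day 1 → '2025-01-01', day 2 → '2025-01-02' and day 3 → '2025-01-03'.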

In [14]:

def stage(df_supp, df_det, day='day1'):
    """Join the cleaned supplier frames into a single staging frame.

    Args:
        df_supp: cleaned ``suppliers.csv`` frame.
        df_det: cleaned ``supplier_details.csv`` frame.
        day: day label passed to ``get_days_mapping`` to obtain ``source_date``.

    Returns:
        A new DataFrame holding the inner join of both inputs on
        ``SupplierID``. Columns are renamed to snake_case, and two metadata
        columns are added: ``source_date`` and ``last_updated_at`` (current
        local time, ISO-8601 with seconds precision).
    """
    snake_case_names = {
        'SupplierID': 'supplier_id',
        'SupplierName': 'supplier_name',
        'SupplierCategory': 'supplier_category',
        'City': 'city',
        'IsActive': "is_active",
        'Email': 'email',
        'DeliveryFrequency': 'delivery_frequency',
        'Notes': 'notes',
    }
    staged = df_supp.merge(df_det, on="SupplierID", how="inner")
    staged = staged.rename(columns=snake_case_names)

    # A stray duplicated id column has shown up here before; drop it when present.
    staged = staged.drop(columns=['supplier_id.1'], errors='ignore')

    # -- metadata --
    staged["source_date"] = get_days_mapping(day)
    staged["last_updated_at"] = datetime.now().isoformat(timespec='seconds')
    return staged

# Load Current Dimension & Delta Detection/SCD Logic

In [15]:
def _active_rows(df, supplier_id):
    """Return the index labels of ``df`` rows for ``supplier_id`` with is_active True."""
    return df[(df['supplier_id'] == supplier_id) & (df['is_active'] == True)].index


def _deactivate_current(df, supplier_id, timestamp):
    """Set is_active=False and last_updated_at=timestamp on the supplier's active rows (in place)."""
    rows = _active_rows(df, supplier_id)
    df.loc[rows, 'is_active'] = False
    df.loc[rows, 'last_updated_at'] = timestamp


def _next_supplier_key(df):
    """Return one more than the largest supplier_key in ``df``, or 1 if there is none."""
    max_key = df['supplier_key'].max()
    return int(max_key) + 1 if pd.notna(max_key) else 1


def _differs(old_row, new_row, columns):
    """Return True if any of ``columns`` differs between two row dicts (compared as text)."""
    return any(str(old_row.get(c, '')) != str(new_row.get(c, '')) for c in columns)


def load():
    """Merge today's staged suppliers into the supplier dimension and save it.

    Reads the module-level ``suppliers``, ``details``, ``dim_suppliers``,
    ``CURRENT_DAY``, ``TYPE1`` and ``TYPE2``, and writes the result to
    ``DIM_SUPPLIERS_PATH``. Returns nothing.

    * Empty dimension (day 1): every staged row is written with
      ``supplier_key`` 1..n and its source ``is_active`` value.
    * Otherwise, relative to the rows whose ``is_active`` is True:
      - staged suppliers with no active row are appended with fresh keys and
        ``is_active=True``. The exception is suppliers that already have
        history and whose source row is itself inactive; those are skipped.
      - active suppliers absent from staging are deactivated;
      - a change in any TYPE2 column deactivates the active row and appends
        a new row with a fresh key and ``is_active=True`` (Type 2);
      - otherwise a change in any TYPE1 column is overwritten on the active
        row (Type 1).
    Every row this run deactivates, overwrites or adds as a Type 2 version
    gets the same ``last_updated_at`` timestamp.
    """
    # Get the staging & active df
    df_staging = stage(suppliers, details, CURRENT_DAY)
    df_active = dim_suppliers

    # -------- Day 1: Initial Load ------------
    # An empty dimension has nothing to diff against, so load everything.
    if df_active.empty:
        df_staging.insert(0, 'supplier_key', range(1, len(df_staging) + 1))
        df_staging.to_csv(DIM_SUPPLIERS_PATH, index=False)
        print(f"Day 1: {len(df_staging)} rows loaded")
        return

    # -------- Day 2+: Delta detection --------
    print(f"Finding deltas for day: {CURRENT_DAY}")
    run_ts = datetime.now().isoformat(timespec='seconds')  # one timestamp per run

    dim_suppliers_result = df_active.copy()  # the frame we update and save
    df_active_current = df_active[df_active['is_active'] == True].reset_index(drop=True)
    df_staging = df_staging.reset_index(drop=True)

    active_ids = set(df_active_current['supplier_id'])
    staging_ids = set(df_staging['supplier_id'])
    known_ids = set(df_active['supplier_id'])
    source_inactive_ids = set(
        df_staging.loc[~df_staging['is_active'].astype(bool), 'supplier_id']
    )

    # ---- New (or re-appearing) suppliers ----
    # Skip suppliers we already have history for when the source itself marks
    # them inactive. Otherwise such a supplier alternates between two states:
    # one run appends it as "new" with is_active forced to True, the next
    # run's Type 1 check copies the source's False onto that row, and the
    # cycle repeats. That adds a dimension row every other run.
    new_ids = (staging_ids - active_ids) - (known_ids & source_inactive_ids)
    new_suppliers = df_staging[df_staging['supplier_id'].isin(new_ids)].copy()
    if not new_suppliers.empty:
        first_key = _next_supplier_key(dim_suppliers_result)
        new_suppliers.insert(0, 'supplier_key', range(first_key, first_key + len(new_suppliers)))
        new_suppliers['is_active'] = True
        dim_suppliers_result = pd.concat([dim_suppliers_result, new_suppliers], ignore_index=True)
        print(f"the new suppliers added: {len(new_suppliers)}")

    # ---- Active suppliers missing from staging ----
    missing_ids = active_ids - staging_ids
    if missing_ids:
        for mid in missing_ids:
            _deactivate_current(dim_suppliers_result, mid, run_ts)
        print(f"Suppliers deactivated (missing from staging): {len(missing_ids)}")

    # ---- Existing suppliers: Type 2 takes precedence over Type 1 ----
    type2_rows = []
    type1_count = 0
    next_key = _next_supplier_key(dim_suppliers_result)

    for sid in active_ids.intersection(staging_ids):
        active_row = df_active_current[df_active_current['supplier_id'] == sid].iloc[0].to_dict()
        staging_row = df_staging[df_staging['supplier_id'] == sid].iloc[0].to_dict()

        if _differs(active_row, staging_row, TYPE2):
            # Close the current version and queue a new one with a fresh key.
            _deactivate_current(dim_suppliers_result, sid, run_ts)
            type2_rows.append(
                dict(staging_row, supplier_key=next_key, is_active=True, last_updated_at=run_ts)
            )
            next_key += 1
            continue  # a Type 2 change already carries the Type 1 values

        if _differs(active_row, staging_row, TYPE1):
            rows = _active_rows(dim_suppliers_result, sid)
            for col in TYPE1:
                dim_suppliers_result.loc[rows, col] = staging_row[col]
            dim_suppliers_result.loc[rows, 'last_updated_at'] = run_ts
            type1_count += 1

    # Append all new Type 2 versions at once, after the existing rows.
    if type2_rows:
        dim_suppliers_result = pd.concat(
            [dim_suppliers_result, pd.DataFrame(type2_rows)],
            ignore_index=True,
        )

    # Save to CSV
    dim_suppliers_result.to_csv(DIM_SUPPLIERS_PATH, index=False)
    print(f"Loaded: {CURRENT_DAY}")
    print(f"Type 2 count: {len(type2_rows)}")
    print(f"Type 1 count: {type1_count}")


load()

Finding deltas for day: day3
the new suppliers added: 1
Suppliers deactivated (missing from staging): 1
Loaded: day3
Type 2 count: 3
Type 1 counrt: 3
