In [28]:
import os

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import URL

# ==========================================
# 1. CONFIGURATION
# ==========================================
# Connection settings come from environment variables (optionally loaded from a .env file).
load_dotenv()

db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

# URL.create escapes special characters (e.g. '@', ':', '/') in the credentials;
# interpolating them into an f-string URL would corrupt the connection string.
connection_url = URL.create(
    drivername='postgresql',
    username=db_user,
    password=db_password,
    host=db_host,
    port=int(db_port) if db_port else None,
    database=db_name,
)
engine = create_engine(connection_url)

print("Starting ETL Process...")

# ==========================================
# 1.5 CLEANUP
# ==========================================
# Every table this notebook loads.
ETL_TABLES = [
    'fact_sales',
    'dim_weather',
    'dim_customers',
    'dim_articles',
    'dim_region',
    'dim_customer_junk',
    'dim_customer_outrigger',
    'dim_product_type',
    'dim_graphical_appearance',
    'dim_color',
    'dim_time',
]

# Truncate only the tables that already exist. A single TRUNCATE that names a
# missing table fails as a whole, which would leave stale rows in the tables that
# do exist (and the appends below would then hit primary-key violations).
existing_tables = set(inspect(engine).get_table_names())
tables_to_clear = [name for name in ETL_TABLES if name in existing_tables]

with engine.connect() as conn:
    print("Cleaning old data...")
    if tables_to_clear:
        # TRUNCATE deletes data but keeps the table structure; RESTART IDENTITY resets
        # the SERIAL counters and CASCADE also empties tables that reference these ones.
        # Table names come from the fixed ETL_TABLES list above, never from user input.
        conn.execute(text(
            f"TRUNCATE TABLE {', '.join(tables_to_clear)} RESTART IDENTITY CASCADE;"
        ))
        conn.commit()
        print("Old data cleared.")
    else:
        print("Cleanup skipped (Tables might not exist yet).")

Starting ETL Process...
Cleaning old data...
Old data cleared.


In [29]:
# ==========================================
# 2. SQL DDL
# ==========================================
# Star schema: dimension tables first, then the fact table that references them.
# The whole string is sent to PostgreSQL, so comments inside it MUST use SQL "--"
# syntax; a Python-style "#" line is a syntax error that aborts every statement.
ddl_statements = """
-- 1. Region Dimension
CREATE TABLE IF NOT EXISTS dim_region (
    region_id SERIAL PRIMARY KEY,
    postal_code VARCHAR(100),
    region_name VARCHAR(100)
);

-- 2. Customer Junk Dimension (Active / Club Status)
CREATE TABLE IF NOT EXISTS dim_customer_junk (
    junk_key SERIAL PRIMARY KEY,
    active NUMERIC(2,1),
    club_member_status VARCHAR(50)
);

-- 3. Customer Outrigger (FN / Fashion News)
CREATE TABLE IF NOT EXISTS dim_customer_outrigger (
    o_key SERIAL PRIMARY KEY,
    fn NUMERIC(2,1),
    fn_freq VARCHAR(50)
);

-- 4. Customer Dimension
CREATE TABLE IF NOT EXISTS dim_customers (
    customer_id VARCHAR(64) PRIMARY KEY,
    age INTEGER,
    junk_key INTEGER REFERENCES dim_customer_junk(junk_key),
    o_key INTEGER REFERENCES dim_customer_outrigger(o_key),
    region_id INTEGER REFERENCES dim_region(region_id)
);

-- 5. Product Type Dimension
CREATE TABLE IF NOT EXISTS dim_product_type (
    product_type_no INTEGER PRIMARY KEY,
    product_type_name VARCHAR(255),
    product_group_name VARCHAR(255)
);

-- 6. Graphical Appearance Dimension
CREATE TABLE IF NOT EXISTS dim_graphical_appearance (
    graph_appearance_no INTEGER PRIMARY KEY,
    graph_appearance_name VARCHAR(255)
);

-- 7. Color Dimension
CREATE TABLE IF NOT EXISTS dim_color (
    color_group_code INTEGER PRIMARY KEY,
    color_group_name VARCHAR(255),
    perceived_color_value_name VARCHAR(255),
    perceived_color_master_name VARCHAR(255)
);

-- 8. Article Dimension
CREATE TABLE IF NOT EXISTS dim_articles (
    article_id INTEGER PRIMARY KEY,
    product_type_no INTEGER REFERENCES dim_product_type(product_type_no),
    graph_appearance_no INTEGER REFERENCES dim_graphical_appearance(graph_appearance_no),
    color_group_code INTEGER REFERENCES dim_color(color_group_code),
    prod_code INTEGER,
    prod_name VARCHAR(255)
);

-- 9. Time Dimension
CREATE TABLE IF NOT EXISTS dim_time (
    t_dat DATE PRIMARY KEY,
    day INTEGER,
    weekday INTEGER,
    week INTEGER,
    month INTEGER,
    season VARCHAR(20)
);

-- 10. Weather Dimension
CREATE TABLE IF NOT EXISTS dim_weather (
    day DATE PRIMARY KEY REFERENCES dim_time(t_dat),
    weather_code INTEGER,
    description VARCHAR(255)
);

-- TODO: The fact table should have a composite PK
-- (first verify the candidate key columns are unique in transactions.csv,
-- otherwise the load below would start failing on duplicates)
-- 11. Fact Table (Sales)
CREATE TABLE IF NOT EXISTS fact_sales (
    t_dat DATE REFERENCES dim_time(t_dat),
    customer_id VARCHAR(64) REFERENCES dim_customers(customer_id),
    article_id INTEGER REFERENCES dim_articles(article_id),
    price NUMERIC(10,5),
    sales_channel_id INTEGER
);
"""

# Execute DDL (CREATE TABLE IF NOT EXISTS makes re-running this cell harmless)
with engine.connect() as conn:
    conn.execute(text(ddl_statements))
    conn.commit()

print("Schema Checked/Created.")

Schema Checked/Created.


In [30]:
# ==========================================
# 3. EXTRACT (Load CSVs with Fixes)
# ==========================================

df_trans = pd.read_csv('data/transactions.csv', parse_dates=['t_dat'])
df_articles = pd.read_csv('data/articles.csv')
df_customers = pd.read_csv('data/customers.csv')


def _read_weather_csv(path, sep=','):
    """Read the weather CSV with the given separator and strip whitespace from the headers.

    Dates are deliberately NOT parsed here: ``parse_dates=['day']`` raises as soon as
    no column is literally named 'day' (wrong separator, padded header), which would
    happen before the separator fallback and header cleanup below could run.
    """
    frame = pd.read_csv(path, sep=sep)
    frame.columns = frame.columns.str.strip()
    return frame


df_weather = _read_weather_csv('data/open-meteo.csv')

# Check if the file was read correctly
if len(df_weather.columns) < 2:
    print("Warning: Weather file looks like it has only 1 column. Trying semicolon separator...")
    df_weather = _read_weather_csv('data/open-meteo.csv', sep=';')

# Parse dates only now that the header is known to be clean.
df_weather['day'] = pd.to_datetime(df_weather['day'])

# And rename if necessary (Standardize any "...weather...code..." column to 'weather_code')
df_weather = df_weather.rename(columns={
    col: 'weather_code'
    for col in df_weather.columns
    if 'code' in col.lower() and 'weather' in col.lower()
})

print("Weather columns found:", df_weather.columns.tolist())

print("Files Loaded.")

Weather columns found: ['day', 'weather_code']
Files Loaded.


In [31]:
# ==========================================
# 4. TRANSFORM & LOAD
# ==========================================

# --- A. Region Dimension ---
# Every distinct postal_code becomes one dim_region row. Its region name is chosen by
# the remainder (modulo 10) of the postal code read as a hex number.
# region_id is a generated integer surrogate for each unique postal code.
# TODO: Why surrogate key?
region_names = {
    1: 'Stockholm',
    2: 'Södermanland / Östergötland',
    3: 'Jönköping',
    4: 'Skåne',
    5: 'Kronoberg / Kalmar',
    6: 'Värmland / Dalarna',
    7: 'Gävleborg / Västernorrland',
    8: 'Västerbotten / Norrbotten',
    9: 'Blekinge',
    0: 'Gotland',
}

unique_postals = (
    df_customers['postal_code']
    .drop_duplicates()
    .reset_index(drop=True)
    .to_frame()
)

def calc_region_name(p_code):
    """Map a postal code to a region name.

    The postal code is parsed as a hexadecimal integer and reduced modulo 10;
    the remainder is looked up in ``region_names``.

    Args:
        p_code: postal code value from the customers file.

    Returns:
        The region name, or 'Unknown' when ``p_code`` is not a hex string
        (e.g. a missing NaN/None value or non-hex characters).
    """
    try:
        r_idx = int(p_code, 16) % 10
    except (TypeError, ValueError):
        # Non-string (NaN/None) -> TypeError; non-hex text -> ValueError.
        # Anything else (e.g. a missing region_names) should surface, not be hidden.
        return 'Unknown'
    return region_names.get(r_idx, 'Unknown')

# Derive each postal code's region, then number the rows 1..n as the surrogate key.
unique_postals['region_name'] = unique_postals['postal_code'].map(calc_region_name)
unique_postals['region_id'] = range(1, len(unique_postals) + 1)  # Surrogate Key

# Load Region Dim
unique_postals.to_sql('dim_region', engine, if_exists='append', index=False)
print(f"Loaded {len(unique_postals)} regions.")

Loaded 352899 regions.


In [32]:
# --- B. Product Type Dimension ---
# One row per product_type_no (the first occurrence in articles.csv wins).
# The to_sql call is the cell's last expression, so the inserted row count is displayed.
cols_prod = ['product_type_no', 'product_type_name', 'product_group_name']
df_prod = (
    df_articles
    .loc[:, cols_prod]
    .drop_duplicates(subset='product_type_no')
)
df_prod.to_sql('dim_product_type', engine, if_exists='append', index=False)

132

In [33]:
# --- C. Graph Appearance Dimension ---
cols_graph = ['graphical_appearance_no', 'graphical_appearance_name']

# TODO: Don't rename, work with the names that we have
# dim_graphical_appearance uses the shortened "graph_appearance_*" column names.
graph_renames = {
    'graphical_appearance_no': 'graph_appearance_no',
    'graphical_appearance_name': 'graph_appearance_name',
}
df_graph = (
    df_articles[cols_graph]
    .drop_duplicates(subset='graphical_appearance_no')
    .rename(columns=graph_renames)
)
df_graph.to_sql('dim_graphical_appearance', engine, if_exists='append', index=False)

30

In [34]:
# --- D. Color Dimension ---
# articles.csv spells "colour"; the dim_color columns spell "color".
cols_color = ['colour_group_code', 'colour_group_name', 'perceived_colour_value_name', 'perceived_colour_master_name']

# TODO: Don't rename, work with the names that we have
# Rename to match schema PDF exactly: each column only swaps "colour" for "color".
color_renames = {src: src.replace('colour', 'color') for src in cols_color}
df_color = (
    df_articles[cols_color]
    .drop_duplicates(subset='colour_group_code')
    .rename(columns=color_renames)
)
df_color.to_sql('dim_color', engine, if_exists='append', index=False)

50

In [35]:
# --- E. Article Dimension ---
cols_art = ['article_id', 'product_type_no', 'graphical_appearance_no', 'colour_group_code', 'product_code', 'prod_name']

# TODO: Don't rename, work with the names that we have
# Source column -> dim_articles column (only the names that differ).
art_renames = {
    'graphical_appearance_no': 'graph_appearance_no',
    'colour_group_code': 'color_group_code',
    'product_code': 'prod_code',
}
df_art = (
    df_articles
    .loc[:, cols_art]
    .drop_duplicates(subset='article_id')
    .rename(columns=art_renames)
)
df_art.to_sql('dim_articles', engine, if_exists='append', index=False)

542

In [36]:
# --- F. Customer Dimensions (Junk & Outrigger) ---

def _distinct_with_key(source, columns, key_name, renames):
    """Return the distinct combinations of `columns` in `source`, numbered 1..n in
    `key_name`, with columns then renamed via `renames`."""
    dim = source[columns].drop_duplicates().reset_index(drop=True)
    dim[key_name] = dim.index + 1
    return dim.rename(columns=renames)


# F1. Junk: Active, club_member_status
df_junk = _distinct_with_key(
    df_customers, ['Active', 'club_member_status'], 'junk_key', {'Active': 'active'}
)
df_junk.to_sql('dim_customer_junk', engine, if_exists='append', index=False)

# F2. Outrigger: FN, fashion_news_frequency
df_outrigger = _distinct_with_key(
    df_customers, ['FN', 'fashion_news_frequency'], 'o_key',
    {'FN': 'fn', 'fashion_news_frequency': 'fn_freq'},
)
df_outrigger.to_sql('dim_customer_outrigger', engine, if_exists='append', index=False)

8

In [37]:
# --- G. Main Customer Dimension ---
# Attach the surrogate keys of the junk, outrigger and region dimensions to each customer.
# Each lookup frame was de-duplicated on exactly its join columns, so every customer
# should match at most one row. validate='m:1' turns any violation into a MergeError
# instead of silently duplicating customers, which would otherwise only surface later
# as a dim_customers primary-key violation.
# Note: pandas merge matches null keys with null keys, so customers with missing
# attribute values pick up the key of the corresponding all-null lookup row.
junk_lookup = df_junk.rename(columns={'active': 'Active'})
outrigger_lookup = df_outrigger.rename(columns={'fn': 'FN', 'fn_freq': 'fashion_news_frequency'})
region_lookup = unique_postals[['postal_code', 'region_id']]

df_c_m = (
    df_customers
    .merge(junk_lookup, on=['Active', 'club_member_status'], how='left', validate='m:1')
    .merge(outrigger_lookup, on=['FN', 'fashion_news_frequency'], how='left', validate='m:1')
    .merge(region_lookup, on='postal_code', how='left', validate='m:1')
)

df_c_final = df_c_m[['customer_id', 'age', 'junk_key', 'o_key', 'region_id']]
df_c_final.to_sql('dim_customers', engine, if_exists='append', index=False)
print("Customers Loaded.")

Customers Loaded.


In [38]:
# --- H. Time Dimension ---
# The calendar covers every date that occurs in the transactions or in the weather
# file, because both fact_sales and dim_weather reference dim_time(t_dat).
all_days = np.concatenate((df_trans['t_dat'], df_weather['day']), axis=0)
dates = pd.DataFrame({'t_dat': pd.unique(all_days)})

t_dat_accessor = dates['t_dat'].dt
dates = dates.assign(
    day=t_dat_accessor.day,
    weekday=t_dat_accessor.weekday,  # 0=Monday
    week=t_dat_accessor.isocalendar().week,
    month=t_dat_accessor.month,
)

def get_season(m):
    """Return the season name for month number `m`.

    Dec/Jan/Feb -> 'Winter', Mar/Apr/May -> 'Spring', Jun/Jul/Aug -> 'Summer';
    any other value (including 9-11) falls through to 'Autumn'.
    """
    season_table = (
        ((12, 1, 2), 'Winter'),
        ((3, 4, 5), 'Spring'),
        ((6, 7, 8), 'Summer'),
    )
    for months, season in season_table:
        if m in months:
            return season
    return 'Autumn'

# Label every calendar date with its season, then load; the row count is the cell output.
dates['season'] = dates['month'].map(get_season)
dates.to_sql('dim_time', engine, if_exists='append', index=False)

365

In [39]:
# --- I. Weather Dimension ---
print("Processing Weather...")

# 1. Define the Aggregated Logic Function
# One description per band of ten codes, indexed by the tens digit (codes 0-99).
# NOTE(review): the bands presumably follow the WMO present-weather code table;
# confirm against the weather data source before relying on the labels.
# TODO: Change names
WEATHER_CODE_GROUPS = (
    "No precipitation (Clouds/Mist)",         # 0-9
    "No precipitation (Clouds/Mist)",         # 10-19
    "Recent precipitation/fog (Past hour)",   # 20-29
    "Duststorm/Sandstorm/Blowing Snow",       # 30-39
    "Fog or Ice Fog",                         # 40-49
    "Drizzle",                                # 50-59
    "Rain",                                   # 60-69
    "Snow",                                   # 70-79
    "Showers",                                # 80-89
    "Thunderstorm",                           # 90-99
)


def get_weather_category(code):
    """Collapse a numeric weather code into a coarse description.

    Args:
        code: the weather code; anything ``int()`` accepts (int, float, numeric string).
            Floats are truncated toward zero by ``int()``.

    Returns:
        The description of the code's band of ten (0-19 share one label), or
        "Unknown" if the code cannot be converted to int or lies outside 0-99.
    """
    try:
        c = int(code)
    except (TypeError, ValueError, OverflowError):
        # None -> TypeError; non-numeric text or NaN -> ValueError; +/-inf -> OverflowError.
        return "Unknown"

    if not 0 <= c <= 99:
        return "Unknown"
    return WEATHER_CODE_GROUPS[c // 10]

# Apply the function: attach a coarse description to every raw weather code.
df_weather['description'] = df_weather['weather_code'].map(get_weather_category)

# Load to Database (only the columns dim_weather defines)
weather_cols = ['day', 'weather_code', 'description']
df_weather_final = df_weather.loc[:, weather_cols]
df_weather_final.to_sql('dim_weather', engine, if_exists='append', index=False)

print("Weather loaded with aggregated descriptions.")

Processing Weather...
Weather loaded with aggregated descriptions.


In [40]:
# --- J. Fact Table ---
# One fact row per row of transactions.csv, restricted to the fact_sales columns.
fact_columns = ['t_dat', 'customer_id', 'article_id', 'price', 'sales_channel_id']
df_facts = df_trans.loc[:, fact_columns]
df_facts.to_sql('fact_sales', engine, if_exists='append', index=False)

print("ETL Process Complete.")

ETL Process Complete.
