In [2]:
# %% Imports and Configuration
import os
from pathlib import Path

import pandas as pd
from IPython.display import display
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.engine import URL

# PostgreSQL Configuration
# The password is taken from the PGPASSWORD environment variable when it is
# set, so real credentials do not have to be stored in the notebook.  The
# literal fallback is kept only so an existing local setup keeps working.
PG_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'dbname': 'olap',
    'user': 'postgres',
    'password': os.environ.get('PGPASSWORD', 'aa')
}
CSV_PATH = '../data/row_data.csv'

# Create SQLAlchemy engine
# URL.create() escapes each component, so a user name or password containing
# characters such as '@', ':' or '/' no longer corrupts the connection URL
# the way plain f-string interpolation did.
db_url = URL.create(
    drivername='postgresql',
    username=PG_CONFIG['user'],
    password=PG_CONFIG['password'],
    host=PG_CONFIG['host'],
    port=PG_CONFIG['port'],
    database=PG_CONFIG['dbname'],
)
# conn_str remains a plain string for any code that still expects one.
conn_str = db_url.render_as_string(hide_password=False)
engine = create_engine(db_url)

print('‚úÖ Configuration loaded')
print(f'üìÅ CSV Path: {CSV_PATH}')
print(f"üóÑÔ∏è  Database: {PG_CONFIG['dbname']} @ {PG_CONFIG['host']}")

‚úÖ Configuration loaded
üìÅ CSV Path: ../data/row_data.csv
üóÑÔ∏è  Database: olap @ localhost


## Step 1: Drop Existing Tables

In [3]:
# %% Drop tables in dependency order (fact first, then dimensions)
# Each statement uses IF EXISTS, so the cell also succeeds when a table is
# missing; CASCADE additionally drops objects that depend on the table.
ddl_drop = """
DROP TABLE IF EXISTS fact_sales CASCADE;
DROP TABLE IF EXISTS dim_product CASCADE;
DROP TABLE IF EXISTS dim_geography CASCADE;
DROP TABLE IF EXISTS dim_customer CASCADE;
DROP TABLE IF EXISTS dim_time CASCADE;
DROP TABLE IF EXISTS stg_sales CASCADE;
"""

print('üßπ Dropping existing tables...')
drop_script = text(ddl_drop)
# The whole script is sent inside a single engine.begin() transaction.
with engine.begin() as txn:
    txn.execute(drop_script)
print('‚úÖ Tables dropped successfully')

üßπ Dropping existing tables...
‚úÖ Tables dropped successfully


## Step 2: Create Tables with Exact DDL Schema

In [4]:
# %% Create tables with SERIAL PRIMARY KEYS using SQL DDL
# Defines the star schema in one script:
#   * four dimension tables (time, customer, geography, product), each with
#     a SERIAL surrogate primary key;
#   * fact_sales, whose *_key columns REFERENCE those surrogate keys;
#   * stg_sales, a staging table for the raw rows with no keys or constraints.
# The dimensions are declared before fact_sales because its REFERENCES
# clauses point at them.
# Note that only dim_customer.customer_id and dim_product.product_id carry a
# UNIQUE constraint; dim_time and dim_geography have no natural-key constraint.
ddl_create = """
-- TIME DIMENSION
CREATE TABLE dim_time (
    time_key SERIAL PRIMARY KEY,
    date_full DATE NOT NULL,
    year SMALLINT NOT NULL,
    quarter SMALLINT NOT NULL,
    month SMALLINT NOT NULL,
    week SMALLINT NOT NULL,
    day SMALLINT NOT NULL,
    day_name VARCHAR(10),
    quarter_name VARCHAR(20),
    month_name VARCHAR(20)
);

-- CUSTOMER DIMENSION
CREATE TABLE dim_customer (
    customer_key SERIAL PRIMARY KEY,
    customer_id VARCHAR(20) UNIQUE NOT NULL,
    customer_name VARCHAR(100),
    segment VARCHAR(50)
);

-- GEOGRAPHY DIMENSION
CREATE TABLE dim_geography (
    geo_key SERIAL PRIMARY KEY,
    region VARCHAR(50),
    state VARCHAR(100),
    city VARCHAR(100),
    country VARCHAR(100) DEFAULT 'United States',
    postal_code INTEGER
);

-- PRODUCT DIMENSION
CREATE TABLE dim_product (
    product_key SERIAL PRIMARY KEY,
    product_id VARCHAR(20) UNIQUE,
    category VARCHAR(50),
    sub_category VARCHAR(50),
    product_name VARCHAR(255)
);

-- FACT TABLE
CREATE TABLE fact_sales (
    fact_key SERIAL PRIMARY KEY,
    time_key INTEGER REFERENCES dim_time(time_key),
    customer_key INTEGER REFERENCES dim_customer(customer_key),
    geo_key INTEGER REFERENCES dim_geography(geo_key),
    product_key INTEGER REFERENCES dim_product(product_key),
    order_id VARCHAR(20),
    ship_mode VARCHAR(50),
    sales DECIMAL(10,2) NOT NULL,
    quantity INTEGER DEFAULT 1,
    load_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- STAGING TABLE for raw data
CREATE TABLE stg_sales (
    order_date DATE,
    customer_id VARCHAR(20),
    region VARCHAR(50),
    state VARCHAR(100),
    city VARCHAR(100),
    postal_code INTEGER,
    product_id VARCHAR(20),
    order_id VARCHAR(20),
    ship_mode VARCHAR(50),
    sales DECIMAL(10,2)
);
"""

print('üèóÔ∏è Creating tables with exact DDL schema...')
# All CREATE TABLE statements are sent as one script inside a single
# engine.begin() block.  The statements have no IF NOT EXISTS, so this cell
# expects the tables to be absent (the drop cell above takes care of that).
with engine.begin() as conn:
    conn.execute(text(ddl_create))
print('‚úÖ All tables created successfully')

üèóÔ∏è Creating tables with exact DDL schema...
‚úÖ All tables created successfully


## Step 3: Load CSV Data

In [5]:
# %% Load and prepare CSV data
print('üì• Loading CSV data...')
df = pd.read_csv(CSV_PATH)

# Clean column names: trim, lower-case, then replace every character outside
# [a-z0-9_] with an underscore.
df.columns = df.columns.str.strip().str.lower().str.replace(r'[^a-z0-9_]', '_', regex=True)

# Parse dates (day-first).  errors='coerce' turns unparseable values into NaT
# instead of raising, so count how many non-empty values were lost that way.
order_date_raw = df['order_date']
order_date_parsed = pd.to_datetime(order_date_raw, errors='coerce', dayfirst=True)
n_bad_dates = int((order_date_raw.notna() & order_date_parsed.isna()).sum())
df['order_date'] = order_date_parsed

# Convert postal_code to a nullable integer (handle NaN).  Values that were
# present but not numeric are counted separately from values that were
# already missing in the file.
postal_raw = df['postal_code']
postal_parsed = pd.to_numeric(postal_raw, errors='coerce').astype('Int64')
n_bad_postal = int((postal_raw.notna() & postal_parsed.isna()).sum())
df['postal_code'] = postal_parsed

# Ensure country column exists
if 'country' not in df.columns:
    df['country'] = 'United States'

print(f'‚úÖ Loaded {len(df):,} rows')
# Surface silent coercion: such rows would otherwise end up with a NULL
# order_date / postal_code further down the pipeline without any notice.
if n_bad_dates or n_bad_postal:
    print(f'WARNING: coerced {n_bad_dates:,} unparseable order_date value(s) '
          f'and {n_bad_postal:,} non-numeric postal_code value(s) to missing')
print(f'\nüìã Columns: {list(df.columns)}')
print('\nüîç Sample Data:')
display(df.head())

üì• Loading CSV data...
‚úÖ Loaded 9,800 rows

üìã Columns: ['row_id', 'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_id', 'customer_name', 'segment', 'country', 'city', 'state', 'postal_code', 'region', 'product_id', 'category', 'sub_category', 'product_name', 'sales']

üîç Sample Data:


Unnamed: 0,row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales
0,1,CA-2017-152156,2017-11-08,11/11/2017,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96
1,2,CA-2017-152156,2017-11-08,11/11/2017,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",731.94
2,3,CA-2017-138688,2017-06-12,16/06/2017,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,14.62
3,4,US-2016-108966,2016-10-11,18/10/2016,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775
4,5,US-2016-108966,2016-10-11,18/10/2016,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368


## Step 4: Load Dimension Tables

In [6]:
# %% Load DIM_TIME
print('üìÖ Loading DIM_TIME...')

# One row per distinct, non-null order date; every calendar attribute is
# derived from that date in a single assign() chain.  `week` is the ISO week
# number taken from isocalendar().
time_df = (
    df[['order_date']]
    .dropna()
    .drop_duplicates()
    .rename(columns={'order_date': 'date_full'})
    .assign(
        year=lambda d: d['date_full'].dt.year.astype('int16'),
        quarter=lambda d: d['date_full'].dt.quarter.astype('int16'),
        month=lambda d: d['date_full'].dt.month.astype('int16'),
        week=lambda d: d['date_full'].dt.isocalendar().week.astype('int16'),
        day=lambda d: d['date_full'].dt.day.astype('int16'),
        day_name=lambda d: d['date_full'].dt.day_name().str[:10],
        quarter_name=lambda d: 'Q' + d['quarter'].astype(str),
        month_name=lambda d: d['date_full'].dt.month_name().str[:20],
    )
)

# Append to the existing table; time_key is not supplied by the frame.
time_df.to_sql(
    name='dim_time',
    con=engine,
    if_exists='append',
    index=False,
    method='multi',
)

print(f'‚úÖ Inserted {len(time_df):,} rows into dim_time')
display(time_df.head())

üìÖ Loading DIM_TIME...
‚úÖ Inserted 1,230 rows into dim_time
‚úÖ Inserted 1,230 rows into dim_time


Unnamed: 0,date_full,year,quarter,month,week,day,day_name,quarter_name,month_name
0,2017-11-08,2017,4,11,45,8,Wednesday,Q4,November
2,2017-06-12,2017,2,6,24,12,Monday,Q2,June
3,2016-10-11,2016,4,10,41,11,Tuesday,Q4,October
5,2015-06-09,2015,2,6,24,9,Tuesday,Q2,June
12,2018-04-15,2018,2,4,15,15,Sunday,Q2,April


In [7]:
# %% Load DIM_CUSTOMER
print('üë• Loading DIM_CUSTOMER...')

# dim_customer.customer_id is declared UNIQUE NOT NULL, so keep exactly one
# row per customer_id (first occurrence), the same way the product dimension
# is de-duplicated on product_id.  De-duplicating on the whole row would let
# a customer whose name or segment is spelled two ways through twice and
# abort the insert with a unique-constraint violation.
customer_df = (
    df[['customer_id', 'customer_name', 'segment']]
    .dropna(subset=['customer_id'])
    .drop_duplicates(subset=['customer_id'])
)

# Insert using to_sql with append
customer_df.to_sql('dim_customer', engine, if_exists='append', index=False, method='multi')

print(f'‚úÖ Inserted {len(customer_df):,} rows into dim_customer')
print(f'\nüìä Segments:')
print(customer_df['segment'].value_counts())
display(customer_df.head())

üë• Loading DIM_CUSTOMER...
‚úÖ Inserted 793 rows into dim_customer

üìä Segments:
segment
Consumer       409
Corporate      236
Home Office    148
Name: count, dtype: int64


Unnamed: 0,customer_id,customer_name,segment
0,CG-12520,Claire Gute,Consumer
2,DV-13045,Darrin Van Huff,Corporate
3,SO-20335,Sean O'Donnell,Consumer
5,BH-11710,Brosina Hoffman,Consumer
12,AA-10480,Andrew Allen,Consumer


In [8]:
# %% Load DIM_GEOGRAPHY
print('üåç Loading DIM_GEOGRAPHY...')

# One row per distinct (region, state, city, country, postal_code) combination.
geo_columns = ['region', 'state', 'city', 'country', 'postal_code']
geo_df = df.loc[:, geo_columns].drop_duplicates()

# Append to the existing table; geo_key is not supplied by the frame.
geo_df.to_sql(
    name='dim_geography',
    con=engine,
    if_exists='append',
    index=False,
    method='multi',
)

print(f'‚úÖ Inserted {len(geo_df):,} rows into dim_geography')
print('\nüìä Regions:')
region_counts = geo_df['region'].value_counts()
print(region_counts)
display(geo_df.head())

üåç Loading DIM_GEOGRAPHY...
‚úÖ Inserted 628 rows into dim_geography

üìä Regions:
region
Central    194
West       180
South      135
East       119
Name: count, dtype: int64


Unnamed: 0,region,state,city,country,postal_code
0,South,Kentucky,Henderson,United States,42420
2,West,California,Los Angeles,United States,90036
3,South,Florida,Fort Lauderdale,United States,33311
5,West,California,Los Angeles,United States,90032
12,South,North Carolina,Concord,United States,28027


In [9]:
# %% Load DIM_PRODUCT
print('üì¶ Loading DIM_PRODUCT...')

# Keep the first row seen for each product_id, so the attributes stored for
# a product are those of its first occurrence in the source frame.
product_columns = ['product_id', 'category', 'sub_category', 'product_name']
product_df = df.loc[:, product_columns].drop_duplicates(subset=['product_id'], keep='first')

# Append to the existing table; product_key is not supplied by the frame.
product_df.to_sql(
    name='dim_product',
    con=engine,
    if_exists='append',
    index=False,
    method='multi',
)

print(f'‚úÖ Inserted {len(product_df):,} rows into dim_product')
print('\nüìä Categories:')
category_counts = product_df['category'].value_counts()
print(category_counts)
display(product_df.head())

üì¶ Loading DIM_PRODUCT...
‚úÖ Inserted 1,861 rows into dim_product

üìä Categories:
category
Office Supplies    1082
Technology          404
Furniture           375
Name: count, dtype: int64
‚úÖ Inserted 1,861 rows into dim_product

üìä Categories:
category
Office Supplies    1082
Technology          404
Furniture           375
Name: count, dtype: int64


Unnamed: 0,product_id,category,sub_category,product_name
0,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase
1,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,..."
2,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...
3,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table
4,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System


## Step 5: Load Staging Table and Insert Fact via JOINs

In [10]:
# %% Load staging table
print('üì§ Loading staging table...')

# Columns copied into stg_sales, in the order the staging table declares them.
stg_cols = [
    'order_date', 'customer_id', 'region', 'state', 'city', 'postal_code',
    'product_id', 'order_id', 'ship_mode', 'sales',
]

# Take the staging columns and force `sales` to numeric in one step;
# values that cannot be converted become NaN (errors='coerce').
stg_df = df[stg_cols].assign(
    sales=lambda frame: pd.to_numeric(frame['sales'], errors='coerce')
)

# Append every source row to the staging table.
stg_df.to_sql(
    name='stg_sales',
    con=engine,
    if_exists='append',
    index=False,
    method='multi',
)

print(f'‚úÖ Inserted {len(stg_df):,} rows into stg_sales')

üì§ Loading staging table...
‚úÖ Inserted 9,800 rows into stg_sales
‚úÖ Inserted 9,800 rows into stg_sales


In [11]:
# %% Insert into fact_sales using SQL JOINs to map foreign keys
# Builds the fact table from stg_sales by looking up each dimension's
# surrogate key:
#   * dim_time      matched on order_date = date_full
#   * dim_customer  matched on customer_id
#   * dim_geography matched on region/state/city/postal_code; COALESCE maps
#     NULL to '' (or 0 for postal_code) so that two NULLs compare as equal
#   * dim_product   matched on product_id
# Every join is a LEFT JOIN, so a staging row without a matching dimension
# row is still inserted, with NULL in the corresponding *_key column.
# quantity is written as the constant 1 for every row.
#
# NOTE(review): the statement is a plain INSERT ... SELECT, so executing this
# cell again without recreating the tables appends the same rows a second time.
# NOTE(review): dim_geography is de-duplicated on country as well, but the
# geography join does not compare country (stg_sales has no such column).  If
# one location ever appeared with two country values the join would emit
# duplicate fact rows; confirm the data is single-country.
print('üîó Inserting into fact_sales via SQL JOINs...')

insert_fact_sql = text("""
INSERT INTO fact_sales (
    time_key, customer_key, geo_key, product_key,
    order_id, ship_mode, sales, quantity
)
SELECT
    dt.time_key,
    dc.customer_key,
    dg.geo_key,
    dp.product_key,
    s.order_id,
    s.ship_mode,
    s.sales,
    1 AS quantity
FROM stg_sales s
LEFT JOIN dim_time dt ON dt.date_full = s.order_date
LEFT JOIN dim_customer dc ON dc.customer_id = s.customer_id
LEFT JOIN dim_geography dg ON 
    COALESCE(dg.region, '') = COALESCE(s.region, '') AND
    COALESCE(dg.state, '') = COALESCE(s.state, '') AND
    COALESCE(dg.city, '') = COALESCE(s.city, '') AND
    COALESCE(dg.postal_code, 0) = COALESCE(s.postal_code, 0)
LEFT JOIN dim_product dp ON dp.product_id = s.product_id
;""")

# Run the insert inside one engine.begin() block; `result` is not read again
# in this cell.
with engine.begin() as conn:
    result = conn.execute(insert_fact_sql)
    
# fact_count is the total number of rows now in fact_sales (a fresh COUNT(*)),
# not the number of rows added by the statement above.
fact_count = pd.read_sql("SELECT COUNT(*) as cnt FROM fact_sales", engine)['cnt'].iloc[0]
print(f'‚úÖ Inserted {fact_count:,} rows into fact_sales')

üîó Inserting into fact_sales via SQL JOINs...
‚úÖ Inserted 9,800 rows into fact_sales
‚úÖ Inserted 9,800 rows into fact_sales


## Step 6: Create Indexes

In [12]:
# %% Create indexes for performance
# One index per foreign-key column of fact_sales.  IF NOT EXISTS makes the
# cell safe to run again.
print('üß± Creating indexes...')

ddl_indexes = """
CREATE INDEX IF NOT EXISTS idx_fact_time ON fact_sales(time_key);
CREATE INDEX IF NOT EXISTS idx_fact_geo ON fact_sales(geo_key);
CREATE INDEX IF NOT EXISTS idx_fact_customer ON fact_sales(customer_key);
CREATE INDEX IF NOT EXISTS idx_fact_product ON fact_sales(product_key);
"""

index_script = text(ddl_indexes)
# All four statements are sent inside a single engine.begin() transaction.
with engine.begin() as txn:
    txn.execute(index_script)

print('‚úÖ Indexes created successfully')

üß± Creating indexes...
‚úÖ Indexes created successfully


## Step 7: Verification

In [13]:
# %% Row count verification
banner = '=' * 80
print(banner)
print('üìä ROW COUNT VERIFICATION')
print(banner)

# Tables that make up the star schema (the staging table is not included).
tables = ['dim_time', 'dim_customer', 'dim_geography', 'dim_product', 'fact_sales']


def _row_total(t):
    """Return COUNT(*) of table `t`, read through pandas."""
    return pd.read_sql(f"SELECT COUNT(*) as cnt FROM {t}", engine)['cnt'].iloc[0]


# One record per table; the count is rendered with thousands separators.
counts = [{'Table': t, 'Row Count': f'{_row_total(t):,}'} for t in tables]

summary_df = pd.DataFrame(counts)
display(summary_df)

üìä ROW COUNT VERIFICATION


Unnamed: 0,Table,Row Count
0,dim_time,1230
1,dim_customer,793
2,dim_geography,628
3,dim_product,1861
4,fact_sales,9800


In [14]:
# %% Foreign key NULL check
banner = '=' * 80
print('\n' + banner)
print('üîó FOREIGN KEY NULL CHECK')
print(banner)

# A single pass over fact_sales: the total row count plus, for each
# foreign-key column, how many rows hold NULL in it.
fk_null_query = """
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN time_key IS NULL THEN 1 ELSE 0 END) AS null_time_key,
    SUM(CASE WHEN customer_key IS NULL THEN 1 ELSE 0 END) AS null_customer_key,
    SUM(CASE WHEN geo_key IS NULL THEN 1 ELSE 0 END) AS null_geo_key,
    SUM(CASE WHEN product_key IS NULL THEN 1 ELSE 0 END) AS null_product_key
FROM fact_sales
"""

fk_nulls = pd.read_sql(sql=fk_null_query, con=engine)
print('\nüîç NULL foreign keys in fact_sales:')
display(fk_nulls)


üîó FOREIGN KEY NULL CHECK

üîç NULL foreign keys in fact_sales:


Unnamed: 0,total_rows,null_time_key,null_customer_key,null_geo_key,null_product_key
0,9800,0,0,0,0


In [15]:
# %% Sample query: Sales by Category
banner = '=' * 80
print('\n' + banner)
print('üìà SAMPLE QUERY: Sales by Category')
print(banner)

# Distinct order count and rounded sales total per product category,
# highest total first.
sample_query = """
SELECT 
    p.category,
    COUNT(DISTINCT f.order_id) AS orders,
    ROUND(SUM(f.sales)::numeric, 2) AS total_sales
FROM fact_sales f
LEFT JOIN dim_product p ON p.product_key = f.product_key
GROUP BY p.category
ORDER BY total_sales DESC NULLS LAST
"""

result = pd.read_sql(sql=sample_query, con=engine)
display(result)


üìà SAMPLE QUERY: Sales by Category


Unnamed: 0,category,orders,total_sales
0,Technology,1519,827455.94
1,Furniture,1727,728658.75
2,Office Supplies,3676,705422.28


In [16]:
# %% Sample query: Sales by Region and Year
banner = '=' * 80
print('\n' + banner)
print('üìà SAMPLE QUERY: Sales by Region and Year')
print(banner)

# Distinct order count and rounded sales total per (region, year),
# ordered by year and then by total sales, highest first.
region_query = """
SELECT 
    g.region,
    t.year,
    COUNT(DISTINCT f.order_id) AS orders,
    ROUND(SUM(f.sales)::numeric, 2) AS total_sales
FROM fact_sales f
LEFT JOIN dim_geography g ON g.geo_key = f.geo_key
LEFT JOIN dim_time t ON t.time_key = f.time_key
GROUP BY g.region, t.year
ORDER BY t.year, total_sales DESC
"""

result2 = pd.read_sql(sql=region_query, con=engine)
display(result2)


üìà SAMPLE QUERY: Sales by Region and Year


Unnamed: 0,region,year,orders,total_sales
0,West,2015,308,145907.99
1,East,2015,255,127652.82
2,South,2015,161,103374.94
3,Central,2015,223,102920.52
4,East,2016,287,153225.12
5,West,2016,332,133709.56
6,Central,2016,233,102425.19
7,South,2016,167,70076.07
8,West,2017,417,182471.3
9,East,2017,365,178511.67


In [17]:
# %% Schema verification
banner = '=' * 80
print('\n' + banner)
print('üìê SCHEMA STRUCTURE')
print(banner)

# For every table in `tables`, show the column names and types reported by
# the SQLAlchemy inspector.
inspector = inspect(engine)
divider = '-' * 40
for table in tables:
    print(f'\nüìã {table.upper()}', divider, sep='\n')
    column_rows = [
        {'Column': meta['name'], 'Type': str(meta['type'])}
        for meta in inspector.get_columns(table)
    ]
    col_df = pd.DataFrame(column_rows)
    display(col_df)


üìê SCHEMA STRUCTURE

üìã DIM_TIME
----------------------------------------


Unnamed: 0,Column,Type
0,time_key,INTEGER
1,date_full,DATE
2,year,SMALLINT
3,quarter,SMALLINT
4,month,SMALLINT
5,week,SMALLINT
6,day,SMALLINT
7,day_name,VARCHAR(10)
8,quarter_name,VARCHAR(20)
9,month_name,VARCHAR(20)



üìã DIM_CUSTOMER
----------------------------------------


Unnamed: 0,Column,Type
0,customer_key,INTEGER
1,customer_id,VARCHAR(20)
2,customer_name,VARCHAR(100)
3,segment,VARCHAR(50)



üìã DIM_GEOGRAPHY
----------------------------------------


Unnamed: 0,Column,Type
0,geo_key,INTEGER
1,region,VARCHAR(50)
2,state,VARCHAR(100)
3,city,VARCHAR(100)
4,country,VARCHAR(100)
5,postal_code,INTEGER



üìã DIM_PRODUCT
----------------------------------------


Unnamed: 0,Column,Type
0,product_key,INTEGER
1,product_id,VARCHAR(20)
2,category,VARCHAR(50)
3,sub_category,VARCHAR(50)
4,product_name,VARCHAR(255)



üìã FACT_SALES
----------------------------------------


Unnamed: 0,Column,Type
0,fact_key,INTEGER
1,time_key,INTEGER
2,customer_key,INTEGER
3,geo_key,INTEGER
4,product_key,INTEGER
5,order_id,VARCHAR(20)
6,ship_mode,VARCHAR(50)
7,sales,"NUMERIC(10, 2)"
8,quantity,INTEGER
9,load_date,TIMESTAMP


In [18]:
# %% Final summary
# Closing banner followed by a short checklist; each entry is printed on its
# own line, exactly as separate print() calls would.
banner = '=' * 80
summary_lines = (
    '\n' + banner,
    'üéâ ETL PIPELINE COMPLETE',
    banner,
    '\n‚úÖ All dimension and fact tables created with SERIAL primary keys',
    '‚úÖ Foreign keys mapped via SQL JOINs',
    '‚úÖ Indexes created for query performance',
    '\nüåü Star schema is ready for OLAP queries!',
)
for summary_line in summary_lines:
    print(summary_line)


üéâ ETL PIPELINE COMPLETE

‚úÖ All dimension and fact tables created with SERIAL primary keys
‚úÖ Foreign keys mapped via SQL JOINs
‚úÖ Indexes created for query performance

üåü Star schema is ready for OLAP queries!


## Optional: Cleanup Staging Table

In [19]:
# %% Drop staging table (optional)
# stg_sales is no longer read once fact_sales has been populated from it;
# IF EXISTS keeps the cell harmless when the table is already gone.
print('üßπ Dropping staging table...')

drop_staging = text('DROP TABLE IF EXISTS stg_sales;')
with engine.begin() as txn:
    txn.execute(drop_staging)

print('‚úÖ Staging table dropped')

üßπ Dropping staging table...
‚úÖ Staging table dropped
‚úÖ Staging table dropped
