In [1]:
import sys
!{sys.executable} -m pip install pyspark delta-spark

Collecting pyspark
  Downloading pyspark-4.1.1.tar.gz (455.4 MB)
     ---------------------------------------- 0.0/455.4 MB ? eta -:--:--
      -------------------------------------- 7.6/455.4 MB 41.5 MB/s eta 0:00:11
     - ------------------------------------ 18.9/455.4 MB 48.9 MB/s eta 0:00:09
     -- ----------------------------------- 30.9/455.4 MB 51.7 MB/s eta 0:00:09
     --- ---------------------------------- 43.3/455.4 MB 53.0 MB/s eta 0:00:08
     ---- --------------------------------- 56.1/455.4 MB 55.1 MB/s eta 0:00:08
     ----- -------------------------------- 66.8/455.4 MB 53.8 MB/s eta 0:00:08
     ------ ------------------------------- 79.2/455.4 MB 54.3 MB/s eta 0:00:07
     ------- ------------------------------ 91.8/455.4 MB 55.2 MB/s eta 0:00:07
     -------- ---------------------------- 105.1/455.4 MB 55.9 MB/s eta 0:00:07
     --------- --------------------------- 117.2/455.4 MB 56.0 MB/s eta 0:00:07
     ---------- -------------------------- 129.0/455.4 MB 56.2

  DEPRECATION: Building 'pyspark' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'pyspark'. Discussion can be found at https://github.com/pypa/pip/issues/6334


In [2]:
# Core imports (no Java required!)
import os
import random
import re
import shutil
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
# NOTE(review): this silences every warning category for the rest of the
# session, including deprecation notices from pandas/numpy. Consider narrowing
# it to the specific category that was noisy.
warnings.filterwarnings('ignore')

# One print call, one line per argument; the leading "\n" on the fourth
# argument produces the blank separator line.
print(
    "‚úÖ Libraries imported successfully!",
    f"üì¶ Pandas version: {pd.__version__}",
    f"üì¶ NumPy version: {np.__version__}",
    "\nüí° Using Pandas to simulate Medallion Architecture",
    "   (Same concepts as PySpark/Databricks, portable without Java)",
    sep="\n",
)

‚úÖ Libraries imported successfully!
üì¶ Pandas version: 2.2.3
üì¶ NumPy version: 2.1.3

üí° Using Pandas to simulate Medallion Architecture
   (Same concepts as PySpark/Databricks, portable without Java)


In [3]:
# Generate sample raw transaction data (simulating messy source data)

def generate_raw_transactions(num_records=1000, seed=42):
    """Generate deliberately messy raw transaction data.

    Every row mixes inconsistent casing and separators in ``customer_id``,
    ``product_name``, ``region`` and ``status``, uses one of four date layouts
    in ``transaction_date``, and may have a missing ``discount_pct`` or
    ``notes`` value, so that downstream cleaning has something to standardize.

    Args:
        num_records: Number of rows to generate. Zero (or a negative number)
            yields an empty frame that still carries the full column set.
        seed: Seed applied to ``random`` and ``np.random`` before generating.
            The default of 42 reproduces the rows this function has always
            returned.

    Returns:
        pandas.DataFrame with one row per transaction and the columns listed
        in ``columns`` below, in that order.
    """
    # The global generators are reseeded (as before) so repeated calls with
    # the same seed return identical frames.
    random.seed(seed)
    np.random.seed(seed)

    customers = ['CUST001', 'cust002', 'CUST-003', 'cust_004', 'CUST005',
                 'CUST006', 'cust007', 'CUST-008', 'cust_009', 'CUST010']

    products = ['Widget A', 'WIDGET A', 'widget a', 'Widget B', 'WIDGET B',
                'Gadget X', 'GADGET X', 'Gadget Y', 'gadget y', 'Tool Z']

    regions = ['Northeast', 'NORTHEAST', 'northeast', 'Southeast', 'SOUTHEAST',
               'Midwest', 'MIDWEST', 'Southwest', 'SOUTHWEST', 'West', 'WEST']

    statuses = ['completed', 'COMPLETED', 'Completed', 'pending', 'PENDING',
                'cancelled', 'CANCELLED', 'Cancelled']

    # Pick the layout first and format once, rather than rendering all four
    # layouts for every row and discarding three of them.
    date_patterns = ['%Y-%m-%d', '%m/%d/%Y', '%d-%m-%Y', '%Y/%m/%d']

    # Explicit schema: without it an empty record list would produce a frame
    # with no columns at all, and every downstream column lookup would fail.
    columns = ['transaction_id', 'customer_id', 'product_name', 'quantity',
               'unit_price', 'transaction_date', 'region', 'status',
               'discount_pct', 'notes']

    records = []
    base_date = datetime(2024, 1, 1)

    for i in range(num_records):
        # Day offsets 0..364 from 2024-01-01.
        trans_date = base_date + timedelta(days=random.randint(0, 364))

        # The order of the random draws below is part of the contract: it
        # determines which values a given seed produces.
        records.append({
            'transaction_id': f'TXN{i + 1:06d}',
            'customer_id': random.choice(customers),
            'product_name': random.choice(products),
            'quantity': random.randint(1, 100),
            'unit_price': round(random.uniform(10.0, 500.0), 2),
            'transaction_date': trans_date.strftime(random.choice(date_patterns)),
            'region': random.choice(regions),
            'status': random.choice(statuses),
            'discount_pct': random.choice([0, 5, 10, 15, 20, None]),
            'notes': random.choice(['', 'Rush order', 'PRIORITY', 'standard', None])
        })

    return pd.DataFrame(records, columns=columns)

raw_df = generate_raw_transactions(1000)

# Report the real row count rather than a literal, so the message cannot
# drift from the data if the argument above is changed.
print(f"‚úÖ Generated {len(raw_df)} raw transaction records")
print("\nüìä Sample Raw Data (first 5 rows):")
display(raw_df.head())

# Dates are still plain strings (object dtype) at this stage.
print("\nüìã Data Types:")
print(raw_df.dtypes)

‚úÖ Generated 1000 raw transaction records

üìä Sample Raw Data (first 5 rows):


Unnamed: 0,transaction_id,customer_id,product_name,quantity,unit_price,transaction_date,region,status,discount_pct,notes
0,TXN000001,cust002,Widget A,95,144.76,11/23/2024,northeast,COMPLETED,,
1,TXN000002,CUST010,GADGET X,5,24.6,02/14/2024,Southeast,completed,20.0,Rush order
2,TXN000003,cust_009,GADGET X,29,230.11,28-11-2024,Northeast,Completed,,standard
3,TXN000004,CUST005,widget a,28,479.03,23-06-2024,NORTHEAST,COMPLETED,15.0,
4,TXN000005,CUST006,Tool Z,34,405.49,2024/07/02,SOUTHWEST,COMPLETED,15.0,



üìã Data Types:
transaction_id       object
customer_id          object
product_name         object
quantity              int64
unit_price          float64
transaction_date     object
region               object
status               object
discount_pct        float64
notes                object
dtype: object


In [4]:
# Setup Medallion Architecture Directory Structure

# Root of the demo lakehouse plus one location per layer.
base_path = "./medallion_lakehouse"
bronze_path = base_path + "/bronze/transactions"
silver_path = base_path + "/silver/transactions"
gold_path = base_path + "/gold"

# Start from a clean slate: remove whatever an earlier run left behind.
if os.path.exists(base_path):
    shutil.rmtree(base_path)

# Gold holds one folder per aggregate table.
gold_tables = ("daily_summary", "customer_metrics", "product_metrics")
layer_dirs = [bronze_path, silver_path] + [f"{gold_path}/{table}" for table in gold_tables]
for path in layer_dirs:
    os.makedirs(path, exist_ok=True)

print("‚úÖ Medallion Lakehouse Directory Structure Created!")
print(f"""
üìÅ {base_path}/
   ‚îú‚îÄ‚îÄ ü•â bronze/
   ‚îÇ      ‚îî‚îÄ‚îÄ transactions/     (Raw ingested data)
   ‚îú‚îÄ‚îÄ ü•à silver/
   ‚îÇ      ‚îî‚îÄ‚îÄ transactions/     (Cleaned & standardized)
   ‚îî‚îÄ‚îÄ ü•á gold/
          ‚îú‚îÄ‚îÄ daily_summary/    (Daily aggregations)
          ‚îú‚îÄ‚îÄ customer_metrics/ (Customer analytics)
          ‚îî‚îÄ‚îÄ product_metrics/  (Product analytics)
""")

‚úÖ Medallion Lakehouse Directory Structure Created!

üìÅ ./medallion_lakehouse/
   ‚îú‚îÄ‚îÄ ü•â bronze/
   ‚îÇ      ‚îî‚îÄ‚îÄ transactions/     (Raw ingested data)
   ‚îú‚îÄ‚îÄ ü•à silver/
   ‚îÇ      ‚îî‚îÄ‚îÄ transactions/     (Cleaned & standardized)
   ‚îî‚îÄ‚îÄ ü•á gold/
          ‚îú‚îÄ‚îÄ daily_summary/    (Daily aggregations)
          ‚îú‚îÄ‚îÄ customer_metrics/ (Customer analytics)
          ‚îî‚îÄ‚îÄ product_metrics/  (Product analytics)



In [5]:
# ============================================================
# 🥉 BRONZE LAYER - Raw Data Ingestion
# ============================================================

print("=" * 60)
print("ü•â BRONZE LAYER - Raw Data Ingestion")
print("=" * 60)

# Bronze keeps the source rows exactly as received; the only additions are
# underscore-prefixed lineage columns recording when and from where they came.
bronze_df = raw_df.assign(
    _ingestion_timestamp=datetime.now(),
    _source_system='transaction_system',
    _file_name='raw_transactions_2024.csv',
)

print("\nüìã Bronze Layer Schema:")
print(bronze_df.dtypes)

print("\nüìä Bronze Layer Sample Data:")
display(bronze_df.head())

# Persist the layer as a single Parquet file under the bronze folder.
bronze_file = f"{bronze_path}/transactions.parquet"
bronze_df.to_parquet(bronze_file, index=False)

bronze_count = len(bronze_df)
print("\n‚úÖ Bronze Layer Complete!")
print(f"üìä Records ingested: {bronze_count}")
print(f"üìÅ Saved to: {bronze_file}")

ü•â BRONZE LAYER - Raw Data Ingestion

üìã Bronze Layer Schema:
transaction_id                  object
customer_id                     object
product_name                    object
quantity                         int64
unit_price                     float64
transaction_date                object
region                          object
status                          object
discount_pct                   float64
notes                           object
_ingestion_timestamp    datetime64[us]
_source_system                  object
_file_name                      object
dtype: object

üìä Bronze Layer Sample Data:


Unnamed: 0,transaction_id,customer_id,product_name,quantity,unit_price,transaction_date,region,status,discount_pct,notes,_ingestion_timestamp,_source_system,_file_name
0,TXN000001,cust002,Widget A,95,144.76,11/23/2024,northeast,COMPLETED,,,2026-01-22 22:31:24.173827,transaction_system,raw_transactions_2024.csv
1,TXN000002,CUST010,GADGET X,5,24.6,02/14/2024,Southeast,completed,20.0,Rush order,2026-01-22 22:31:24.173827,transaction_system,raw_transactions_2024.csv
2,TXN000003,cust_009,GADGET X,29,230.11,28-11-2024,Northeast,Completed,,standard,2026-01-22 22:31:24.173827,transaction_system,raw_transactions_2024.csv
3,TXN000004,CUST005,widget a,28,479.03,23-06-2024,NORTHEAST,COMPLETED,15.0,,2026-01-22 22:31:24.173827,transaction_system,raw_transactions_2024.csv
4,TXN000005,CUST006,Tool Z,34,405.49,2024/07/02,SOUTHWEST,COMPLETED,15.0,,2026-01-22 22:31:24.173827,transaction_system,raw_transactions_2024.csv



‚úÖ Bronze Layer Complete!
üìä Records ingested: 1000
üìÅ Saved to: ./medallion_lakehouse/bronze/transactions/transactions.parquet


In [6]:
# ============================================================
# 🥈 SILVER LAYER - Data Cleaning & Standardization
# ============================================================

print("=" * 60)
print("ü•à SILVER LAYER - Data Cleaning & Standardization")
print("=" * 60)

# Silver starts from what Bronze persisted to disk, not from in-memory state.
bronze_raw = pd.read_parquet(f"{bronze_path}/transactions.parquet")

silver_df = bronze_raw.copy()

# 1. Standardize customer_id (uppercase, remove special characters).
#    Missing ids are kept missing: stringifying them first would turn None/NaN
#    into the fake customers 'NONE' / 'NAN', which a null check cannot see.
raw_customer_ids = silver_df['customer_id']
cleaned_customer_ids = (
    raw_customer_ids.astype(str)
    .str.replace(r'[^A-Za-z0-9]', '', regex=True)
    .str.upper()
)
silver_df['customer_id'] = cleaned_customer_ids.where(raw_customer_ids.notna(), raw_customer_ids)

# 2. Standardize product_name (title case)
silver_df['product_name'] = silver_df['product_name'].str.title()

# 3. Standardize region (title case)
silver_df['region'] = silver_df['region'].str.title()

# 4. Standardize status (lowercase)
silver_df['status'] = silver_df['status'].str.lower()
# 5. Parse and standardize dates
def parse_date(date_str):
    """Parse a transaction date written in any of the known source layouts.

    Four layouts are tried in order with an explicit format: ``YYYY-MM-DD``,
    ``MM/DD/YYYY``, ``DD-MM-YYYY`` and ``YYYY/MM/DD``. Separator and year
    position make them mutually exclusive, so the order does not change which
    one matches. Anything else falls through to pandas' own format inference.

    Args:
        date_str: A date string in one of the layouts above, or a missing
            value (``None`` / ``NaN``).

    Returns:
        The parsed ``pandas.Timestamp``, or ``pandas.NaT`` for a missing value.

    Raises:
        Whatever ``pd.to_datetime`` raises when the final, format-free attempt
        cannot interpret the input either.
    """
    # Missing scalars short-circuit to NaT instead of relying on how each
    # to_datetime call happens to treat them.
    if pd.api.types.is_scalar(date_str) and pd.isna(date_str):
        return pd.NaT

    formats = ('%Y-%m-%d', '%m/%d/%Y', '%d-%m-%Y', '%Y/%m/%d')
    for fmt in formats:
        try:
            return pd.to_datetime(date_str, format=fmt)
        except (ValueError, TypeError, OverflowError):
            # Only "this layout does not match" is expected here; a bare
            # except would also swallow KeyboardInterrupt and SystemExit.
            continue
    return pd.to_datetime(date_str)

# Steps 5-9 as one ordered assign chain. Each entry sees the columns produced
# by the entries above it, so the order matters:
#   5. parse transaction_date into real timestamps
#   6. treat a missing discount as 0 %
#   7. derive gross / discount / net amounts
#   8. split out year / month / day for partitioning and filtering
#   9. stamp the processing time
silver_df = silver_df.assign(
    transaction_date=lambda frame: frame['transaction_date'].apply(parse_date),
    discount_pct=lambda frame: frame['discount_pct'].fillna(0),
    gross_amount=lambda frame: frame['quantity'] * frame['unit_price'],
    discount_amount=lambda frame: frame['gross_amount'] * (frame['discount_pct'] / 100),
    net_amount=lambda frame: frame['gross_amount'] - frame['discount_amount'],
    year=lambda frame: frame['transaction_date'].dt.year,
    month=lambda frame: frame['transaction_date'].dt.month,
    day=lambda frame: frame['transaction_date'].dt.day,
    _processing_timestamp=datetime.now(),
)

print("\nüìã Silver Layer Schema:")
print(silver_df.dtypes)

# Show only the business columns; lineage and derived helper columns are omitted.
preview_columns = [
    'transaction_id', 'customer_id', 'product_name',
    'quantity', 'unit_price', 'transaction_date',
    'region', 'status', 'net_amount',
]
print("\nüìä Silver Layer Sample Data (cleaned):")
display(silver_df[preview_columns].head(10))

silver_file = f"{silver_path}/transactions.parquet"
silver_df.to_parquet(silver_file, index=False)

silver_count = len(silver_df)
print("\n‚úÖ Silver Layer Complete!")
print(f"üìä Records processed: {silver_count}")
print(f"üìÅ Saved to: {silver_file}")

ü•à SILVER LAYER - Data Cleaning & Standardization

üìã Silver Layer Schema:
transaction_id                   object
customer_id                      object
product_name                     object
quantity                          int64
unit_price                      float64
transaction_date         datetime64[ns]
region                           object
status                           object
discount_pct                    float64
notes                            object
_ingestion_timestamp     datetime64[us]
_source_system                   object
_file_name                       object
gross_amount                    float64
discount_amount                 float64
net_amount                      float64
year                              int32
month                             int32
day                               int32
_processing_timestamp    datetime64[us]
dtype: object

üìä Silver Layer Sample Data (cleaned):


Unnamed: 0,transaction_id,customer_id,product_name,quantity,unit_price,transaction_date,region,status,net_amount
0,TXN000001,CUST002,Widget A,95,144.76,2024-11-23,Northeast,completed,13752.2
1,TXN000002,CUST010,Gadget X,5,24.6,2024-02-14,Southeast,completed,98.4
2,TXN000003,CUST009,Gadget X,29,230.11,2024-11-28,Northeast,completed,6673.19
3,TXN000004,CUST005,Widget A,28,479.03,2024-06-23,Northeast,completed,11400.914
4,TXN000005,CUST006,Tool Z,34,405.49,2024-07-02,Southwest,completed,11718.661
5,TXN000006,CUST005,Tool Z,47,292.9,2024-10-09,Northeast,pending,12389.67
6,TXN000007,CUST002,Gadget X,36,232.17,2024-04-29,Northeast,cancelled,7522.308
7,TXN000008,CUST005,Widget A,78,321.14,2024-12-09,Northeast,cancelled,21291.582
8,TXN000009,CUST009,Widget B,88,168.9,2024-11-23,Southeast,completed,13376.88
9,TXN000010,CUST002,Widget B,73,439.42,2024-05-17,Southeast,cancelled,27266.011



‚úÖ Silver Layer Complete!
üìä Records processed: 1000
üìÅ Saved to: ./medallion_lakehouse/silver/transactions/transactions.parquet


In [7]:
# ============================================================
# 📊 DATA QUALITY CHECKS - Silver Layer Validation
# ============================================================

print("=" * 60)
print("üìä DATA QUALITY CHECKS")
print("=" * 60)

# Validate what was actually written, not the in-memory frame.
silver_data = pd.read_parquet(f"{silver_path}/transactions.parquet")

section_rule = "-" * 40

print("\n1Ô∏è‚É£ NULL VALUE CHECK:")
print(section_rule)
null_counts = silver_data.isna().sum()
# Report only the columns that actually contain nulls.
columns_with_nulls = null_counts[null_counts > 0]
for col, count in columns_with_nulls.items():
    pct = count / len(silver_data) * 100
    print(f"   ‚ö†Ô∏è {col}: {count} nulls ({pct:.2f}%)")
if columns_with_nulls.empty:
    print("   ‚úÖ No null values found!")

print("\n2Ô∏è‚É£ UNIQUE VALUE CHECK:")
print(section_rule)
# (label in the report, column counted)
cardinality_checks = [
    ("customers", "customer_id"),
    ("products", "product_name"),
    ("regions", "region"),
    ("statuses", "status"),
]
for label, column in cardinality_checks:
    print(f"   ‚Ä¢ Unique {label}: {silver_data[column].nunique()}")

print("\n3Ô∏è‚É£ DATE RANGE CHECK:")
print(section_rule)
transaction_dates = silver_data['transaction_date']
print(f"   ‚Ä¢ Date range: {transaction_dates.min()} to {transaction_dates.max()}")

print("\n4Ô∏è‚É£ NUMERIC RANGE CHECK:")
print(section_rule)
net_amounts = silver_data['net_amount']
print(f"   ‚Ä¢ Net amount range: ${net_amounts.min():.2f} to ${net_amounts.max():.2f}")
print(f"   ‚Ä¢ Average net amount: ${net_amounts.mean():.2f}")

print("\n5Ô∏è‚É£ STANDARDIZATION CHECK:")
print(section_rule)
print("   Unique regions after standardization:")
print(f"   {silver_data['region'].unique().tolist()}")
print("\n   Unique statuses after standardization:")
print(f"   {silver_data['status'].unique().tolist()}")

print("\n‚úÖ Data Quality Checks Complete!")


üìä DATA QUALITY CHECKS

1Ô∏è‚É£ NULL VALUE CHECK:
----------------------------------------
   ‚ö†Ô∏è notes: 203 nulls (20.30%)

2Ô∏è‚É£ UNIQUE VALUE CHECK:
----------------------------------------
   ‚Ä¢ Unique customers: 10
   ‚Ä¢ Unique products: 5
   ‚Ä¢ Unique regions: 5
   ‚Ä¢ Unique statuses: 3

3Ô∏è‚É£ DATE RANGE CHECK:
----------------------------------------
   ‚Ä¢ Date range: 2024-01-01 00:00:00 to 2024-12-30 00:00:00

4Ô∏è‚É£ NUMERIC RANGE CHECK:
----------------------------------------
   ‚Ä¢ Net amount range: $13.85 to $48808.90
   ‚Ä¢ Average net amount: $12481.21

5Ô∏è‚É£ STANDARDIZATION CHECK:
----------------------------------------
   Unique regions after standardization:
   ['Northeast', 'Southeast', 'Southwest', 'West', 'Midwest']

   Unique statuses after standardization:
   ['completed', 'pending', 'cancelled']

‚úÖ Data Quality Checks Complete!


In [8]:
# ============================================================
# 🥇 GOLD LAYER - Business Aggregations
# ============================================================

print("=" * 60)
print("ü•á GOLD LAYER - Business Aggregations")
print("=" * 60)

silver_data = pd.read_parquet(f"{silver_path}/transactions.parquet")

# ------------------------------------------------------------
# Gold table 1: daily sales summary, one row per (date, region)
# ------------------------------------------------------------
print("\nüìä Creating Gold Table: Daily Sales Summary")
print("-" * 40)

# Only completed orders count as revenue. The later Gold tables reuse
# this filtered frame.
completed_data = silver_data.loc[silver_data['status'].eq('completed')]

daily_keys = ['transaction_date', 'region']
# output column -> (source column, aggregation)
daily_aggregations = {
    'total_transactions': ('transaction_id', 'count'),
    'total_units': ('quantity', 'sum'),
    'gross_revenue': ('gross_amount', 'sum'),
    'total_discounts': ('discount_amount', 'sum'),
    'net_revenue': ('net_amount', 'sum'),
    'avg_order_value': ('net_amount', 'mean'),
}

daily_summary = (
    completed_data
    .groupby(daily_keys)
    .agg(**daily_aggregations)
    .reset_index()
    .assign(_aggregation_timestamp=datetime.now())
    .sort_values(daily_keys)
)

print("\nüìà Daily Sales Summary (sample):")
display(daily_summary.head(10))

daily_summary_file = f"{gold_path}/daily_summary/summary.parquet"
daily_summary.to_parquet(daily_summary_file, index=False)
print(f"‚úÖ Saved to: {daily_summary_file}")

ü•á GOLD LAYER - Business Aggregations

üìä Creating Gold Table: Daily Sales Summary
----------------------------------------

üìà Daily Sales Summary (sample):


Unnamed: 0,transaction_date,region,total_transactions,total_units,gross_revenue,total_discounts,net_revenue,avg_order_value,_aggregation_timestamp
0,2024-01-03,Midwest,1,53,14964.02,0.0,14964.02,14964.02,2026-01-22 22:32:03.595716
1,2024-01-05,Midwest,1,89,21616.32,1080.816,20535.504,20535.504,2026-01-22 22:32:03.595716
2,2024-01-07,Northeast,1,74,24998.68,0.0,24998.68,24998.68,2026-01-22 22:32:03.595716
3,2024-01-09,Southeast,1,57,13320.33,0.0,13320.33,13320.33,2026-01-22 22:32:03.595716
4,2024-01-10,Southwest,1,58,11446.88,572.344,10874.536,10874.536,2026-01-22 22:32:03.595716
5,2024-01-11,West,1,50,19597.5,0.0,19597.5,19597.5,2026-01-22 22:32:03.595716
6,2024-01-14,Midwest,1,51,13054.98,1305.498,11749.482,11749.482,2026-01-22 22:32:03.595716
7,2024-01-14,Southeast,1,98,48808.9,0.0,48808.9,48808.9,2026-01-22 22:32:03.595716
8,2024-01-15,Midwest,1,60,14870.4,743.52,14126.88,14126.88,2026-01-22 22:32:03.595716
9,2024-01-15,Northeast,1,92,30220.16,3022.016,27198.144,27198.144,2026-01-22 22:32:03.595716


‚úÖ Saved to: ./medallion_lakehouse/gold/daily_summary/summary.parquet


In [9]:
# ============================================================
# GOLD TABLE 2: Customer Metrics
# ============================================================
print("\nüìä Creating Gold Table: Customer Metrics")
print("-" * 40)

# One row per customer, built from completed orders only.
# output column -> (source column, aggregation)
customer_aggregations = {
    'total_orders': ('transaction_id', 'count'),
    'total_units_purchased': ('quantity', 'sum'),
    'total_spend': ('net_amount', 'sum'),
    'avg_order_value': ('net_amount', 'mean'),
    'first_purchase_date': ('transaction_date', 'min'),
    'last_purchase_date': ('transaction_date', 'max'),
    # an order counts as discounted when its discount percentage is above zero
    'orders_with_discount': ('discount_pct', lambda pct: pct.gt(0).sum()),
}

customer_metrics = (
    completed_data
    .groupby('customer_id')
    .agg(**customer_aggregations)
    .reset_index()
    .assign(
        _aggregation_timestamp=datetime.now(),
        # dense rank: 1 = highest spender, ties share a rank with no gaps
        spend_rank=lambda frame: frame['total_spend'].rank(ascending=False, method='dense').astype(int),
    )
    .sort_values('total_spend', ascending=False)
)

print("\nüë• Customer Metrics (Top 10 by spend):")
display(customer_metrics.head(10))

customer_metrics_file = f"{gold_path}/customer_metrics/metrics.parquet"
customer_metrics.to_parquet(customer_metrics_file, index=False)
print(f"‚úÖ Saved to: {customer_metrics_file}")


üìä Creating Gold Table: Customer Metrics
----------------------------------------

üë• Customer Metrics (Top 10 by spend):


Unnamed: 0,customer_id,total_orders,total_units_purchased,total_spend,avg_order_value,first_purchase_date,last_purchase_date,orders_with_discount,_aggregation_timestamp,spend_rank
6,CUST007,41,2433,620597.474,15136.523756,2024-01-14,2024-12-30,29,2026-01-22 22:32:17.156913,1
1,CUST002,43,2344,525508.8505,12221.136058,2024-01-07,2024-12-29,22,2026-01-22 22:32:17.156913,2
9,CUST010,37,1919,509997.654,13783.720378,2024-01-10,2024-11-29,27,2026-01-22 22:32:17.156913,3
0,CUST001,43,2124,498107.3305,11583.891407,2024-01-03,2024-12-28,24,2026-01-22 22:32:17.156913,4
7,CUST008,31,1574,436013.4185,14064.948984,2024-02-10,2024-12-17,21,2026-01-22 22:32:17.156913,5
8,CUST009,36,1632,431231.6225,11978.656181,2024-01-27,2024-12-22,27,2026-01-22 22:32:17.156913,6
5,CUST006,32,1747,419126.1095,13097.690922,2024-01-15,2024-12-19,21,2026-01-22 22:32:17.156913,7
2,CUST003,27,1472,401172.293,14858.233074,2024-01-27,2024-12-21,18,2026-01-22 22:32:17.156913,8
3,CUST004,43,1993,386955.3045,8998.96057,2024-01-11,2024-12-30,24,2026-01-22 22:32:17.156913,9
4,CUST005,30,1549,317075.005,10569.166833,2024-02-07,2024-12-23,19,2026-01-22 22:32:17.156913,10


‚úÖ Saved to: ./medallion_lakehouse/gold/customer_metrics/metrics.parquet


In [10]:
# ============================================================
# GOLD TABLE 2: Customer Metrics
# ============================================================
# NOTE(review): this cell repeats the "GOLD TABLE 2: Customer Metrics" cell
# directly above it: same aggregation, same variable name, same output path.
# Running it again only refreshes _aggregation_timestamp and rewrites the same
# Parquet file. It looks like an accidental copy; consider deleting it.
print("\nüìä Creating Gold Table: Customer Metrics")
print("-" * 40)

# One row per customer, aggregated over completed orders only.
customer_metrics = completed_data.groupby('customer_id').agg(
    total_orders=('transaction_id', 'count'),
    total_units_purchased=('quantity', 'sum'),
    total_spend=('net_amount', 'sum'),
    avg_order_value=('net_amount', 'mean'),
    first_purchase_date=('transaction_date', 'min'),
    last_purchase_date=('transaction_date', 'max'),
    orders_with_discount=('discount_pct', lambda x: (x > 0).sum())
).reset_index()

customer_metrics['_aggregation_timestamp'] = datetime.now()

# Dense rank on spend: 1 = highest spender, ties share a rank with no gaps.
customer_metrics['spend_rank'] = customer_metrics['total_spend'].rank(ascending=False, method='dense').astype(int)

customer_metrics = customer_metrics.sort_values('total_spend', ascending=False)

print("\nüë• Customer Metrics (Top 10 by spend):")
display(customer_metrics.head(10))

customer_metrics.to_parquet(f"{gold_path}/customer_metrics/metrics.parquet", index=False)
print(f"‚úÖ Saved to: {gold_path}/customer_metrics/metrics.parquet")


üìä Creating Gold Table: Customer Metrics
----------------------------------------

üë• Customer Metrics (Top 10 by spend):


Unnamed: 0,customer_id,total_orders,total_units_purchased,total_spend,avg_order_value,first_purchase_date,last_purchase_date,orders_with_discount,_aggregation_timestamp,spend_rank
6,CUST007,41,2433,620597.474,15136.523756,2024-01-14,2024-12-30,29,2026-01-22 22:32:28.103531,1
1,CUST002,43,2344,525508.8505,12221.136058,2024-01-07,2024-12-29,22,2026-01-22 22:32:28.103531,2
9,CUST010,37,1919,509997.654,13783.720378,2024-01-10,2024-11-29,27,2026-01-22 22:32:28.103531,3
0,CUST001,43,2124,498107.3305,11583.891407,2024-01-03,2024-12-28,24,2026-01-22 22:32:28.103531,4
7,CUST008,31,1574,436013.4185,14064.948984,2024-02-10,2024-12-17,21,2026-01-22 22:32:28.103531,5
8,CUST009,36,1632,431231.6225,11978.656181,2024-01-27,2024-12-22,27,2026-01-22 22:32:28.103531,6
5,CUST006,32,1747,419126.1095,13097.690922,2024-01-15,2024-12-19,21,2026-01-22 22:32:28.103531,7
2,CUST003,27,1472,401172.293,14858.233074,2024-01-27,2024-12-21,18,2026-01-22 22:32:28.103531,8
3,CUST004,43,1993,386955.3045,8998.96057,2024-01-11,2024-12-30,24,2026-01-22 22:32:28.103531,9
4,CUST005,30,1549,317075.005,10569.166833,2024-02-07,2024-12-23,19,2026-01-22 22:32:28.103531,10


‚úÖ Saved to: ./medallion_lakehouse/gold/customer_metrics/metrics.parquet


In [12]:
# ============================================================
# GOLD TABLE 3: Product Metrics
# ============================================================
print("\nüìä Creating Gold Table: Product Metrics")
print("-" * 40)

# One row per product, built from completed orders only.
# output column -> (source column, aggregation)
product_aggregations = {
    'total_orders': ('transaction_id', 'count'),
    'total_units_sold': ('quantity', 'sum'),
    'total_revenue': ('net_amount', 'sum'),
    'avg_unit_price': ('unit_price', 'mean'),
    'avg_discount_pct': ('discount_pct', 'mean'),
    'customer_count': ('customer_id', 'nunique'),
}

product_metrics = (
    completed_data
    .groupby('product_name')
    .agg(**product_aggregations)
    .reset_index()
    .assign(
        _aggregation_timestamp=datetime.now(),
        # dense rank: 1 = highest-revenue product, ties share a rank
        revenue_rank=lambda frame: frame['total_revenue'].rank(ascending=False, method='dense').astype(int),
    )
    .sort_values('total_revenue', ascending=False)
)

print("\nüì¶ Product Metrics (by revenue):")
display(product_metrics)

product_metrics_file = f"{gold_path}/product_metrics/metrics.parquet"
product_metrics.to_parquet(product_metrics_file, index=False)
print(f"‚úÖ Saved to: {product_metrics_file}")


üìä Creating Gold Table: Product Metrics
----------------------------------------

üì¶ Product Metrics (by revenue):


Unnamed: 0,product_name,total_orders,total_units_sold,total_revenue,avg_unit_price,avg_discount_pct,customer_count,_aggregation_timestamp,revenue_rank
3,Widget A,100,5056,1223387.0,256.4918,7.75,10,2026-01-22 22:33:13.302214,1
0,Gadget X,79,4222,1019434.0,244.777215,8.291139,10,2026-01-22 22:33:13.302214,2
4,Widget B,76,4072,960589.4,248.117237,7.828947,10,2026-01-22 22:33:13.302214,3
1,Gadget Y,70,3425,859284.0,258.856857,8.5,10,2026-01-22 22:33:13.302214,4
2,Tool Z,38,2012,483090.1,262.636579,7.894737,10,2026-01-22 22:33:13.302214,5


‚úÖ Saved to: ./medallion_lakehouse/gold/product_metrics/metrics.parquet


In [13]:
# ============================================================
# 📋 MEDALLION PIPELINE SUMMARY
# ============================================================

print("=" * 60)
print("üìã MEDALLION PIPELINE SUMMARY")
print("=" * 60)

# Re-read every layer from disk so the summary reflects what was persisted.
bronze_final, silver_final = (
    pd.read_parquet(f"{layer_path}/transactions.parquet")
    for layer_path in (bronze_path, silver_path)
)
gold_daily = pd.read_parquet(f"{gold_path}/daily_summary/summary.parquet")
gold_customer, gold_product = (
    pd.read_parquet(f"{gold_path}/{table}/metrics.parquet")
    for table in ("customer_metrics", "product_metrics")
)

print(f"""
ü•â BRONZE LAYER (Raw Data)
   ‚îú‚îÄ‚îÄ Records: {len(bronze_final)}
   ‚îú‚îÄ‚îÄ Columns: {len(bronze_final.columns)}
   ‚îî‚îÄ‚îÄ Path: {bronze_path}

ü•à SILVER LAYER (Cleaned Data)
   ‚îú‚îÄ‚îÄ Records: {len(silver_final)}
   ‚îú‚îÄ‚îÄ Columns: {len(silver_final.columns)}
   ‚îî‚îÄ‚îÄ Path: {silver_path}

ü•á GOLD LAYER (Business Aggregations)
   ‚îú‚îÄ‚îÄ Daily Summary
   ‚îÇ      ‚îú‚îÄ‚îÄ Records: {len(gold_daily)}
   ‚îÇ      ‚îî‚îÄ‚îÄ Path: {gold_path}/daily_summary
   ‚îú‚îÄ‚îÄ Customer Metrics
   ‚îÇ      ‚îú‚îÄ‚îÄ Records: {len(gold_customer)}
   ‚îÇ      ‚îî‚îÄ‚îÄ Path: {gold_path}/customer_metrics
   ‚îî‚îÄ‚îÄ Product Metrics
          ‚îú‚îÄ‚îÄ Records: {len(gold_product)}
          ‚îî‚îÄ‚îÄ Path: {gold_path}/product_metrics
""")

# Headline figures use completed orders only, the same filter the Gold tables apply.
completed_orders = silver_final.loc[silver_final['status'].eq('completed')]
total_revenue = completed_orders['net_amount'].sum()

print(f"""
üìà KEY BUSINESS METRICS
   ‚îú‚îÄ‚îÄ Total Completed Orders: {len(completed_orders)}
   ‚îú‚îÄ‚îÄ Total Revenue: ${total_revenue:,.2f}
   ‚îú‚îÄ‚îÄ Unique Customers: {len(gold_customer)}
   ‚îî‚îÄ‚îÄ Unique Products: {len(gold_product)}
""")

print("=" * 60)
print("üéâ MEDALLION PIPELINE COMPLETE!")
print("=" * 60)

üìã MEDALLION PIPELINE SUMMARY

ü•â BRONZE LAYER (Raw Data)
   ‚îú‚îÄ‚îÄ Records: 1000
   ‚îú‚îÄ‚îÄ Columns: 13
   ‚îî‚îÄ‚îÄ Path: ./medallion_lakehouse/bronze/transactions

ü•à SILVER LAYER (Cleaned Data)
   ‚îú‚îÄ‚îÄ Records: 1000
   ‚îú‚îÄ‚îÄ Columns: 20
   ‚îî‚îÄ‚îÄ Path: ./medallion_lakehouse/silver/transactions

ü•á GOLD LAYER (Business Aggregations)
   ‚îú‚îÄ‚îÄ Daily Summary
   ‚îÇ      ‚îú‚îÄ‚îÄ Records: 341
   ‚îÇ      ‚îî‚îÄ‚îÄ Path: ./medallion_lakehouse/gold/daily_summary
   ‚îú‚îÄ‚îÄ Customer Metrics
   ‚îÇ      ‚îú‚îÄ‚îÄ Records: 10
   ‚îÇ      ‚îî‚îÄ‚îÄ Path: ./medallion_lakehouse/gold/customer_metrics
   ‚îî‚îÄ‚îÄ Product Metrics
          ‚îú‚îÄ‚îÄ Records: 5
          ‚îî‚îÄ‚îÄ Path: ./medallion_lakehouse/gold/product_metrics


üìà KEY BUSINESS METRICS
   ‚îú‚îÄ‚îÄ Total Completed Orders: 363
   ‚îú‚îÄ‚îÄ Total Revenue: $4,545,785.06
   ‚îú‚îÄ‚îÄ Unique Customers: 10
   ‚îî‚îÄ‚îÄ Unique Products: 5

üéâ MEDALLION PIPELINE COMPLETE!


In [14]:
# ============================================================
# 💾 EXPORT GOLD TABLES TO CSV
# ============================================================

print("üíæ Exporting Gold tables to CSV...")

# File name -> Gold table. The same mapping drives the export and the listing
# below, so the two cannot disagree.
csv_exports = {
    "gold_daily_summary.csv": gold_daily,
    "gold_customer_metrics.csv": gold_customer,
    "gold_product_metrics.csv": gold_product,
}
for csv_name, gold_table in csv_exports.items():
    gold_table.to_csv(csv_name, index=False)

print("‚úÖ Exported to CSV files:")
for csv_name in csv_exports:
    print(f"   ‚Ä¢ {csv_name}")

print("""
============================================================
üéâ MEDALLION PIPELINE PROJECT COMPLETE!
============================================================

This pipeline demonstrated:
  ‚úÖ Bronze Layer - Raw data ingestion with metadata
  ‚úÖ Silver Layer - Data cleaning & standardization
  ‚úÖ Gold Layer - Business aggregations
  ‚úÖ Data Quality Checks
  ‚úÖ Parquet file storage (columnar format)
  ‚úÖ Customer & Product analytics

Skills Showcased:
  ‚Ä¢ Medallion Architecture design (Bronze ‚Üí Silver ‚Üí Gold)
  ‚Ä¢ Data transformation & cleaning
  ‚Ä¢ Aggregations & window functions
  ‚Ä¢ Data quality validation
  ‚Ä¢ Lakehouse patterns
  ‚Ä¢ Parquet file format

Relevant for:
  ‚Ä¢ Databricks Engineer roles
  ‚Ä¢ Data Engineer positions
  ‚Ä¢ Analytics Engineering
  
Note: This implementation uses Pandas to demonstrate the concepts.
In production, this would run on PySpark/Databricks at scale.
============================================================
""")

üíæ Exporting Gold tables to CSV...
‚úÖ Exported to CSV files:
   ‚Ä¢ gold_daily_summary.csv
   ‚Ä¢ gold_customer_metrics.csv
   ‚Ä¢ gold_product_metrics.csv

üéâ MEDALLION PIPELINE PROJECT COMPLETE!

This pipeline demonstrated:
  ‚úÖ Bronze Layer - Raw data ingestion with metadata
  ‚úÖ Silver Layer - Data cleaning & standardization
  ‚úÖ Gold Layer - Business aggregations
  ‚úÖ Data Quality Checks
  ‚úÖ Parquet file storage (columnar format)
  ‚úÖ Customer & Product analytics

Skills Showcased:
  ‚Ä¢ Medallion Architecture design (Bronze ‚Üí Silver ‚Üí Gold)
  ‚Ä¢ Data transformation & cleaning
  ‚Ä¢ Aggregations & window functions
  ‚Ä¢ Data quality validation
  ‚Ä¢ Lakehouse patterns
  ‚Ä¢ Parquet file format

Relevant for:
  ‚Ä¢ Databricks Engineer roles
  ‚Ä¢ Data Engineer positions
  ‚Ä¢ Analytics Engineering
  
Note: This implementation uses Pandas to demonstrate the concepts.
In production, this would run on PySpark/Databricks at scale.

