# Star Schema Transformation
**Transform Staging Tables to Star Schema for Analytics**

## Pipeline Overview
This notebook transforms staging tables from Delta Lake into a dimensional star schema:
- **Input**: 9 staging tables in `staging` database (with CDC metadata)
- **Process**: Filter deleted records, deduplicate by ID, create fact and dimension tables
- **Output**: 5 star schema tables ready for Snowflake export

**Star Schema Structure:**
- `FACT_ORDERS` - Central fact table with all measures
- `DIM_CUSTOMERS` - Customer dimension (enriched with geolocation)
- `DIM_SELLERS` - Seller dimension (enriched with geolocation)
- `DIM_PRODUCTS` - Product dimension (with Portuguese & English categories)
- `DIM_DATE` - Date dimension (generated from order dates)

In [0]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import *
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("star-schema-transformation") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("=" * 100)
print("STAR SCHEMA TRANSFORMATION PIPELINE")
print("=" * 100)
print(f"Spark Version: {spark.version}")
print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 100)

STAR SCHEMA TRANSFORMATION PIPELINE
Spark Version: 3.5.2
Timestamp: 2025-11-09 18:03:39


## SECTION 1: Load Staging Tables from Delta Lake
Load all nine staging tables together with their CDC metadata columns (`_fivetran_synced`, `_fivetran_deleted`).

In [0]:
# Load all staging tables (with CDC metadata)
print("\nLoading staging tables from Delta Lake...\n")

stg_customers = spark.read.table("staging.stg_customers")
stg_orders = spark.read.table("staging.stg_orders")
stg_order_items = spark.read.table("staging.stg_order_items")
stg_payments = spark.read.table("staging.stg_payments")
stg_reviews = spark.read.table("staging.stg_reviews")
stg_products = spark.read.table("staging.stg_products")
stg_sellers = spark.read.table("staging.stg_sellers")
stg_product_category_translation = spark.read.table("staging.stg_product_category_translation")
stg_geolocation = spark.read.table("staging.stg_geolocation")

# Display row counts (includes deleted and duplicates)
print("Staging Tables Loaded (with CDC metadata):\n")
print(f"✓ stg_customers:                        {stg_customers.count():>12,} rows")
print(f"✓ stg_orders:                           {stg_orders.count():>12,} rows")
print(f"✓ stg_order_items:                      {stg_order_items.count():>12,} rows")
print(f"✓ stg_payments:                         {stg_payments.count():>12,} rows")
print(f"✓ stg_reviews:                          {stg_reviews.count():>12,} rows")
print(f"✓ stg_products:                         {stg_products.count():>12,} rows")
print(f"✓ stg_sellers:                          {stg_sellers.count():>12,} rows")
print(f"✓ stg_product_category_translation:     {stg_product_category_translation.count():>12,} rows")
print(f"✓ stg_geolocation:                      {stg_geolocation.count():>12,} rows")

print("✓ CDC deduplication will be applied in next section")


Loading staging tables from Delta Lake...

Staging Tables Loaded (with CDC metadata):

✓ stg_customers:                              99,441 rows
✓ stg_orders:                                 99,441 rows
✓ stg_order_items:                           112,650 rows
✓ stg_payments:                              103,886 rows
✓ stg_reviews:                                99,224 rows
✓ stg_products:                               32,951 rows
✓ stg_sellers:                                 3,095 rows
✓ stg_product_category_translation:               71 rows
✓ stg_geolocation:                           738,332 rows
✓ CDC deduplication will be applied in next section


## SECTION 2: CDC Deduplication & Deletion Filtering
Filter out soft-deleted records, then deduplicate the tables that have a unique identifier.

**Deduplication Rules:**
- Tables WITH a unique ID: filter deleted → deduplicate by ID (keep the latest row by `_fivetran_synced`)
- Tables WITHOUT a unique ID, or where repeated keys are expected: filter deleted only (NO deduplication)
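
The first rule can be illustrated on a handful of in-memory records. This is a plain-Python sketch of the logic only, not the Spark code used in this notebook; the helper name `latest_active_by_id` and the toy records are assumptions made for the example.

```python
from datetime import datetime

def latest_active_by_id(rows, id_key):
    """Drop soft-deleted rows, then keep the most recently synced row per id."""
    latest = {}
    for row in rows:
        if row["_fivetran_deleted"]:
            continue  # rule step 1: filter deleted records
        current = latest.get(row[id_key])
        if current is None or row["_fivetran_synced"] > current["_fivetran_synced"]:
            latest[row[id_key]] = row  # rule step 2: keep the latest by sync time
    return list(latest.values())

# Toy records (hypothetical values, not taken from the staging tables)
reviews = [
    {"review_id": "r1", "review_score": 3,
     "_fivetran_synced": datetime(2025, 1, 1), "_fivetran_deleted": False},
    {"review_id": "r1", "review_score": 5,
     "_fivetran_synced": datetime(2025, 1, 2), "_fivetran_deleted": False},
    {"review_id": "r2", "review_score": 4,
     "_fivetran_synced": datetime(2025, 1, 1), "_fivetran_deleted": True},
]

deduped = latest_active_by_id(reviews, "review_id")
print(deduped)
```

Only the later version of `r1` survives: the earlier version is superseded and `r2` is soft-deleted.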

In [0]:
def deduplicate_by_id(df, id_column, table_name):
    """
    Filter deleted records and deduplicate by ID column
    
    Steps:
    1. Filter _fivetran_deleted = FALSE
    2. Deduplicate by id_column, keep row with MAX(_fivetran_synced)
    3. Drop CDC metadata columns
    """
    print(f"\n{table_name}:")
    initial_count = df.count()
    
    # Step 1: Filter deleted records (rows where the flag is NULL are dropped too)
    df_active = df.filter(F.col("_fivetran_deleted") == False)
    active_count = df_active.count()
    print(f"  Initial rows: {initial_count:,}")
    print(f"  Deleted rows filtered: {initial_count - active_count:,}")

    # Step 2: Deduplicate by ID (keep latest by _fivetran_synced; ties are broken arbitrarily)
    window_spec = Window.partitionBy(id_column).orderBy(F.col("_fivetran_synced").desc())
    df_dedup = df_active.withColumn("_rn", F.row_number().over(window_spec)) \
        .filter(F.col("_rn") == 1) \
        .drop("_rn")

    final_count = df_dedup.count()
    print(f"  Duplicates removed (by {id_column}): {active_count - final_count:,}")

    # Step 3: Drop CDC metadata columns (the row count is unchanged)
    df_clean = df_dedup.drop("_fivetran_synced", "_fivetran_deleted", "_fivetran_id")

    print(f"  Final rows: {final_count:,}")
    
    return df_clean

def filter_deleted_only(df, table_name):
    """
    Filter deleted records ONLY (NO deduplication)
    
    For tables without unique ID or where duplicates are normal
    """
    print(f"\n{table_name}:")
    initial_count = df.count()
    
    # Filter deleted records only (rows where the flag is NULL are dropped too)
    df_active = df.filter(F.col("_fivetran_deleted") == False)
    final_count = df_active.count()
    print(f"  Initial rows: {initial_count:,}")
    print(f"  Deleted rows filtered: {initial_count - final_count:,}")
    print("  ✓ NO deduplication (duplicates are normal or no unique ID)")

    # Drop CDC metadata columns (the row count is unchanged)
    df_clean = df_active.drop("_fivetran_synced", "_fivetran_deleted", "_fivetran_id")

    print(f"  Final rows: {final_count:,}")
    
    return df_clean

# ============================================================================
# TABLES WITH UNIQUE ID - Deduplicate by ID
# ============================================================================
print("\nTables with unique ID (applying deduplication):")
print("─" * 100)

customers_dedup = deduplicate_by_id(stg_customers, "customer_id", "CUSTOMERS")
orders_dedup = deduplicate_by_id(stg_orders, "order_id", "ORDERS")
products_dedup = deduplicate_by_id(stg_products, "product_id", "PRODUCTS")
sellers_dedup = deduplicate_by_id(stg_sellers, "seller_id", "SELLERS")
reviews_dedup = deduplicate_by_id(stg_reviews, "review_id", "REVIEWS")
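# geolocation_zip_code_prefix is not a true unique ID: this keeps one row per zip prefix
# so that the dimension joins match at most one geolocation row per customer or seller
geolocation_dedup = deduplicate_by_id(stg_geolocation, "geolocation_zip_code_prefix", "GEOLOCATION ")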

# ============================================================================
# TABLES WITHOUT UNIQUE ID OR NORMAL DUPLICATES - Filter deleted only
# ============================================================================
print("\n\nTables without unique ID or with normal duplicates (NO deduplication):")
print("─" * 100)

order_items_active = filter_deleted_only(stg_order_items, "ORDER_ITEMS (duplicates are normal)")
payments_active = filter_deleted_only(stg_payments, "PAYMENTS (multiple payments per order)")
translation_active = filter_deleted_only(stg_product_category_translation, "PRODUCT_CATEGORY_TRANSLATION (no ID column)")

print("\n✓ CDC deduplication and deletion filtering completed")


Tables with unique ID (applying deduplication):
────────────────────────────────────────────────────────────────────────────────────────────────────

CUSTOMERS:
  Initial rows: 99,441
  Deleted rows filtered: 0
  Duplicates removed (by customer_id): 0
  Final rows: 99,441

ORDERS:
  Initial rows: 99,441
  Deleted rows filtered: 0
  Duplicates removed (by order_id): 0
  Final rows: 99,441

PRODUCTS:
  Initial rows: 32,951
  Deleted rows filtered: 0
  Duplicates removed (by product_id): 0
  Final rows: 32,951

SELLERS:
  Initial rows: 3,095
  Deleted rows filtered: 0
  Duplicates removed (by seller_id): 0
  Final rows: 3,095

REVIEWS:
  Initial rows: 99,224
  Deleted rows filtered: 0
  Duplicates removed (by review_id): 814
  Final rows: 98,410

GEOLOCATION :
  Initial rows: 738,332
  Deleted rows filtered: 0
  Duplicates removed (by geolocation_zip_code_prefix): 719,317
  Final rows: 19,015


Tables without unique ID or with normal duplicates (NO deduplication):
───────────────────────

## SECTION 3: Create Orders Enrichment
Calculate delivery metrics (`delivery_days`, `delivery_delay_flag`) from the deduplicated orders table.
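
The two metrics follow a small rule set that is easy to check on single orders. The function below is a plain-Python sketch of that rule, assuming timestamps arrive as `datetime` objects; `delivery_metrics` is a hypothetical helper and the dates are made up.

```python
from datetime import datetime

def delivery_metrics(purchased, delivered, estimated):
    """Return (delivery_days, delivery_delay_flag) for one order."""
    if delivered is None:
        return -1, "pending"  # -1 is a sentinel for "not delivered yet"
    # Whole calendar days between purchase and delivery (time of day is ignored)
    days = (delivered.date() - purchased.date()).days
    flag = "delayed" if delivered > estimated else "on_time"
    return days, flag

purchased = datetime(2018, 1, 1, 10, 0)
estimated = datetime(2018, 1, 10)

on_time = delivery_metrics(purchased, datetime(2018, 1, 5, 9, 0), estimated)
delayed = delivery_metrics(purchased, datetime(2018, 1, 12, 8, 0), estimated)
pending = delivery_metrics(purchased, None, estimated)
print(on_time, delayed, pending)
```

Because `-1` marks undelivered orders, any average of `delivery_days` should exclude those rows first.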

In [0]:
print("\nCalculating delivery metrics...")

# Add delivery_days and delivery_delay_flag to orders
orders_enriched = orders_dedup.withColumn(
    "delivery_days",
    F.when(
        F.col("order_delivered_customer_date").isNull(),
        -1
    ).otherwise(
        F.datediff(
            F.col("order_delivered_customer_date").cast("date"),
            F.col("order_purchase_timestamp").cast("date")
        )
    )
).withColumn(
    "delivery_delay_flag",
    F.when(
        F.col("order_delivered_customer_date").isNull(),
        "pending"
    ).when(
        F.col("order_delivered_customer_date") > F.col("order_estimated_delivery_date"),
        "delayed"
    ).otherwise("on_time")
)

# Statistics
delayed_count = orders_enriched.filter(F.col("delivery_delay_flag") == "delayed").count()
pending_count = orders_enriched.filter(F.col("delivery_delay_flag") == "pending").count()
on_time_count = orders_enriched.filter(F.col("delivery_delay_flag") == "on_time").count()

print(f"  Total orders: {orders_enriched.count():,}")
print(f"  On-time: {on_time_count:,} | Delayed: {delayed_count:,} | Pending: {pending_count:,}")

print("\n✓ Orders enrichment completed")


Calculating delivery metrics...
  Total orders: 99,441
  On-time: 88,649 | Delayed: 7,827 | Pending: 2,965

✓ Orders enrichment completed


## SECTION 4: Aggregate Payments and Reviews
Aggregate payments (deletion-filtered) and reviews (deduplicated) to one row per `order_id`.
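
Collapsing payments to one row per order before any join keeps the later fact-table joins from multiplying rows. A plain-Python sketch of the payment aggregation on toy records (the values and the helper name `aggregate_payments` are assumptions):

```python
from collections import defaultdict

# Toy payments: order o1 was paid in two parts (hypothetical values)
payments = [
    {"order_id": "o1", "payment_value": 60.0, "payment_installments": 1},
    {"order_id": "o1", "payment_value": 40.0, "payment_installments": 3},
    {"order_id": "o2", "payment_value": 25.0, "payment_installments": 1},
]

def aggregate_payments(rows):
    """Collapse payment rows to one summary per order_id."""
    totals = defaultdict(
        lambda: {"total_payment_value": 0.0, "payment_count": 0, "max_installments": 0}
    )
    for row in rows:
        agg = totals[row["order_id"]]
        agg["total_payment_value"] += row["payment_value"]
        agg["payment_count"] += 1
        agg["max_installments"] = max(agg["max_installments"], row["payment_installments"])
    return dict(totals)

payments_by_order = aggregate_payments(payments)
print(payments_by_order)
```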

In [0]:
print("\n" + "=" * 100)
print("SECTION 4: AGGREGATE PAYMENTS AND REVIEWS")
print("=" * 100)

# Aggregate payments by order_id
print("\nAggregating payments by order_id...")

payments_agg = payments_active.groupBy("order_id").agg(
    F.sum("payment_value").alias("total_payment_value"),
    F.count("*").alias("payment_count"),
    F.max("payment_installments").alias("max_installments")
)

print(f"  ✓ Payments aggregated: {payments_agg.count():,} orders")

# Aggregate reviews by order_id
print("\nAggregating reviews by order_id...")

reviews_agg = reviews_dedup.groupBy("order_id").agg(
    F.avg("review_score").alias("avg_review_score"),
    F.count("*").alias("total_reviews")
)

print(f"  ✓ Reviews aggregated: {reviews_agg.count():,} orders")

print("\n✓ Aggregations completed")


SECTION 4: AGGREGATE PAYMENTS AND REVIEWS

Aggregating payments by order_id...
  ✓ Payments aggregated: 99,440 orders

Aggregating reviews by order_id...
  ✓ Reviews aggregated: 98,129 orders

✓ Aggregations completed


## SECTION 5: Enrich Products with English Category Names
Left-join the deduplicated products with the translation table to add English category names; categories without a translation fall back to the original name.
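
The fallback behaviour has one consequence worth seeing on a small example: once the original name has been substituted, a missing translation no longer shows up as a null, so any count of untranslated categories has to be taken before the fallback. A plain-Python sketch, where `english_category` is a hypothetical helper and `categoria_nova` is a made-up category:

```python
# Two translations taken from the DIM_PRODUCTS sample output in this notebook
translations = {"perfumaria": "perfumery", "automotivo": "auto"}

def english_category(category, translations):
    """Return the English name, or the original name when no translation exists."""
    return translations.get(category, category)

# "categoria_nova" is invented here only to exercise the fallback path
categories = ["perfumaria", "automotivo", "categoria_nova"]

# Count missing translations BEFORE applying the fallback
untranslated = [c for c in categories if c not in translations]
english = [english_category(c, translations) for c in categories]
print(untranslated, english)
```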

In [0]:
print("\nJoining products with translation table...")

# Join products (deduplicated) with translations (deletion-filtered; CDC columns already dropped)
products_enriched = products_dedup.join(
    translation_active,
    on="product_category_name",
    how="left"
).withColumn(
    "product_category_name_english",
    # Fall back to the original category name when no translation exists
    F.coalesce(F.col("product_category_name_english"), F.col("product_category_name"))
)

print(f"  ✓ Products enriched: {products_enriched.count():,}")

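# Check for categories that are still NULL after the fallback above
# (untranslated categories already carry the original name, so they are not counted here)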
null_english = products_enriched.filter(F.col("product_category_name_english").isNull()).count()
if null_english > 0:
    print(f"  ⚠ Warning: {null_english:,} products have no English translation")
else:
    print(f"  ✓ All products have English translations")

print("\n✓ Product enrichment completed")


Joining products with translation table...
  ✓ Products enriched: 32,951
  ✓ All products have English translations

✓ Product enrichment completed


## SECTION 6: Create DIM_CUSTOMERS (Enriched with Geolocation)
Embed geolocation data (city, state, latitude, longitude) into the customer dimension with a left join on the zip code prefix.
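
The join relies on the geolocation table holding one row per zip code prefix. The plain-Python sketch below shows why: with repeated prefixes on the lookup side, a left join repeats the customer row once per match. The `left_join` helper and all records are assumptions made for the illustration.

```python
def left_join(left_rows, lookup_rows, left_key, lookup_key):
    """Left join: every left row is kept and repeats once per matching lookup row."""
    matches_by_key = {}
    for row in lookup_rows:
        matches_by_key.setdefault(row[lookup_key], []).append(row)
    joined = []
    for left in left_rows:
        for match in matches_by_key.get(left[left_key], [None]):
            joined.append({**left, **(match or {})})
    return joined

# Hypothetical records: three geolocation rows share one zip prefix
customers = [
    {"customer_id": "c1", "customer_zip_code_prefix": "01001"},
    {"customer_id": "c2", "customer_zip_code_prefix": "99999"},
]
geolocation_raw = [
    {"geolocation_zip_code_prefix": "01001", "geolocation_city": "sao paulo"},
    {"geolocation_zip_code_prefix": "01001", "geolocation_city": "sao paulo"},
    {"geolocation_zip_code_prefix": "01001", "geolocation_city": "são paulo"},
]
geolocation_one_per_zip = geolocation_raw[:1]

fanned_out = left_join(customers, geolocation_raw,
                       "customer_zip_code_prefix", "geolocation_zip_code_prefix")
one_to_one = left_join(customers, geolocation_one_per_zip,
                       "customer_zip_code_prefix", "geolocation_zip_code_prefix")
print(len(fanned_out), len(one_to_one))
```

With the raw lookup, customer `c1` appears three times; with one row per prefix, the output has exactly one row per customer, and `c2` is kept without geolocation fields.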

In [0]:
print("\nJoining customers with geolocation...")

dim_customers = customers_dedup.join(
    geolocation_dedup,
    customers_dedup.customer_zip_code_prefix == geolocation_dedup.geolocation_zip_code_prefix,
    how="left"
).select(
    customers_dedup.customer_id,
    customers_dedup.customer_unique_id,
    customers_dedup.customer_zip_code_prefix,
    geolocation_dedup.geolocation_city.alias("customer_city"),
    geolocation_dedup.geolocation_state.alias("customer_state"),
    geolocation_dedup.geolocation_lat,
    geolocation_dedup.geolocation_lng
)

print(f"  ✓ DIM_CUSTOMERS created: {dim_customers.count():,} rows")

# Check for missing geolocation
null_geo = dim_customers.filter(F.col("customer_city").isNull()).count()
if null_geo > 0:
    print(f"  ⚠ Warning: {null_geo:,} customers have no geolocation data")
else:
    print(f"  ✓ All customers have geolocation data")

print("\nDIM_CUSTOMERS columns:", dim_customers.columns)


Joining customers with geolocation...
  ✓ DIM_CUSTOMERS created: 99,441 rows
  ✓ All customers have geolocation data

DIM_CUSTOMERS columns: ['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state', 'geolocation_lat', 'geolocation_lng']


## SECTION 7: Create DIM_SELLERS (Enriched with Geolocation)
Embed geolocation data (city, state, latitude, longitude) into the seller dimension with a left join on the zip code prefix.

In [0]:
print("\nJoining sellers with geolocation...")

dim_sellers = sellers_dedup.join(
    geolocation_dedup,
    sellers_dedup.seller_zip_code_prefix == geolocation_dedup.geolocation_zip_code_prefix,
    how="left"
).select(
    sellers_dedup.seller_id,
    sellers_dedup.seller_zip_code_prefix,
    geolocation_dedup.geolocation_city.alias("seller_city"),
    geolocation_dedup.geolocation_state.alias("seller_state"),
    geolocation_dedup.geolocation_lat,
    geolocation_dedup.geolocation_lng
)

print(f"  ✓ DIM_SELLERS created: {dim_sellers.count():,} rows")

# Check for missing geolocation
null_geo = dim_sellers.filter(F.col("seller_city").isNull()).count()
if null_geo > 0:
    print(f"  ⚠ Warning: {null_geo:,} sellers have no geolocation data")
else:
    print(f"  ✓ All sellers have geolocation data")

print("\nDIM_SELLERS columns:", dim_sellers.columns)


Joining sellers with geolocation...
  ✓ DIM_SELLERS created: 3,095 rows
  ✓ All sellers have geolocation data

DIM_SELLERS columns: ['seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state', 'geolocation_lat', 'geolocation_lng']


## SECTION 8: Create DIM_PRODUCTS (with Portuguese & English Categories)
Create the product dimension with both the original Portuguese and the English category names.

In [0]:
print("\nCreating DIM_PRODUCTS with both category names...")

dim_products = products_enriched.select(
    "product_id",
    "product_category_name",
    "product_category_name_english",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm"
)

print(f"  ✓ DIM_PRODUCTS created: {dim_products.count():,} rows")

print("\nDIM_PRODUCTS columns:", dim_products.columns)
print("\nSample:")
dim_products.select("product_id", "product_category_name", "product_category_name_english").show(5, truncate=False)


Creating DIM_PRODUCTS with both category names...
  ✓ DIM_PRODUCTS created: 32,951 rows

DIM_PRODUCTS columns: ['product_id', 'product_category_name', 'product_category_name_english', 'product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']

Sample:
+--------------------------------+---------------------+-----------------------------+
|product_id                      |product_category_name|product_category_name_english|
+--------------------------------+---------------------+-----------------------------+
|00066f42aeeb9f3007548bb9d3f33c38|perfumaria           |perfumery                    |
|00088930e925c41fd95ebfe695fd2655|automotivo           |auto                         |
|0009406fd7479715e4bef61dd91f2462|cama_mesa_banho      |bed_bath_table               |
|000b8f95fcb9e0096488278317764d19|utilidades_domesticas|housewares                   |
|000d9be29b5207b54e86aa1b1ac54872|relogi

## SECTION 9: Create DIM_DATE
Generate the date dimension with one row per day, from the earliest order purchase timestamp to the latest estimated delivery date.
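
The run in this section reports timestamp bounds (`2016-09-04 21:15:19` to `2018-11-12 00:00:00`). Stepping from a timestamp carries its time of day into every row and stops one step short of the final calendar day. The plain-Python sketch below shows a variant that truncates both bounds to calendar dates first. It is an illustration under that assumption, not the notebook's code; `build_date_rows` is a hypothetical helper and only a subset of the dimension's columns is derived.

```python
from datetime import date, datetime, timedelta

def build_date_rows(min_ts, max_ts):
    """Build one row per calendar day between two timestamps, inclusive."""
    start, end = min_ts.date(), max_ts.date()  # drop the time of day
    rows = []
    current = start
    while current <= end:
        rows.append({
            "date": current,
            "date_id": int(current.strftime("%Y%m%d")),
            "year": current.year,
            "month": current.month,
            "day": current.day,
            "quarter": (current.month - 1) // 3 + 1,
            # Python's weekday(): Monday=0 ... Saturday=5, Sunday=6
            "is_weekend": 1 if current.weekday() >= 5 else 0,
        })
        current += timedelta(days=1)
    return rows

# Bounds copied from the output printed in this section
date_rows = build_date_rows(datetime(2016, 9, 4, 21, 15, 19), datetime(2018, 11, 12))
print(len(date_rows), date_rows[0]["date_id"], date_rows[-1]["date_id"])
```

For these bounds the date-based variant yields 800 rows and includes 2018-11-12, the last estimated delivery date.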

In [0]:
# Get date range from orders
print("\nCalculating date range...")

date_range = orders_enriched.agg(
    F.min("order_purchase_timestamp").alias("min_date"),
    F.max("order_estimated_delivery_date").alias("max_date")
).collect()[0]

min_date = date_range["min_date"]
max_date = date_range["max_date"]

print(f"  Min date: {min_date}")
print(f"  Max date: {max_date}")

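# Generate one entry per day. min_date and max_date are timestamps, so every entry
# keeps min_date's time of day and the loop stops at the last step that is <= max_date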
date_diff = (max_date - min_date).days
print(f"  Date range: {date_diff:,} days")

date_list = []
current_date = min_date
while current_date <= max_date:
    date_list.append((current_date,))
    current_date += timedelta(days=1)

print(f"  ✓ Generated {len(date_list):,} dates")

# Create date dimension
dim_date = spark.createDataFrame(date_list, ["date"])

dim_date = dim_date.withColumn(
    "date_id", 
    F.date_format(F.col("date"), "yyyyMMdd").cast("int")
).withColumn(
    "year", 
    F.year(F.col("date"))
).withColumn(
    "month", 
    F.month(F.col("date"))
).withColumn(
    "day", 
    F.dayofmonth(F.col("date"))
).withColumn(
    "quarter", 
    F.quarter(F.col("date"))
).withColumn(
    "week_of_year", 
    F.weekofyear(F.col("date"))
).withColumn(
    "day_of_week", 
    F.dayofweek(F.col("date"))
).withColumn(
    "day_name", 
    F.date_format(F.col("date"), "EEEE")
).withColumn(
    "month_name", 
    F.date_format(F.col("date"), "MMMM")
).withColumn(
    "is_weekend",
    F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0)
)

print(f"  ✓ DIM_DATE created: {dim_date.count():,} rows")

print("\nSample:")
dim_date.orderBy("date").show(5, truncate=False)


Calculating date range...
  Min date: 2016-09-04 21:15:19
  Max date: 2018-11-12 00:00:00
  Date range: 798 days
  ✓ Generated 799 dates
  ✓ DIM_DATE created: 799 rows

Sample:
+-------------------+--------+----+-----+---+-------+------------+-----------+---------+----------+----------+
|date               |date_id |year|month|day|quarter|week_of_year|day_of_week|day_name |month_name|is_weekend|
+-------------------+--------+----+-----+---+-------+------------+-----------+---------+----------+----------+
|2016-09-04 21:15:19|20160904|2016|9    |4  |3      |35          |1          |Sunday   |September |1         |
|2016-09-05 21:15:19|20160905|2016|9    |5  |3      |36          |2          |Monday   |September |0         |
|2016-09-06 21:15:19|20160906|2016|9    |6  |3      |36          |3          |Tuesday  |September |0         |
|2016-09-07 21:15:19|20160907|2016|9    |7  |3      |36          |4          |Wednesday|September |0         |
|2016-09-08 21:15:19|20160908|2016|9    |8  |

## SECTION 10: Create FACT_ORDERS
Build the central fact table at order-item grain (one row per order item), with all measures and dimension foreign keys.
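
Because the grain is one row per order item, order-level measures such as `total_payment_value` repeat on every item of the same order. The plain-Python sketch below, on made-up rows, shows how a plain sum over fact rows differs from a sum over distinct orders.

```python
# Toy fact rows (hypothetical values): order o1 has two items, so its
# order-level payment total appears twice
fact_rows = [
    {"order_id": "o1", "order_item_id": 1, "item_price": 60.0, "total_payment_value": 100.0},
    {"order_id": "o1", "order_item_id": 2, "item_price": 30.0, "total_payment_value": 100.0},
    {"order_id": "o2", "order_item_id": 1, "item_price": 20.0, "total_payment_value": 25.0},
]

# Summing the order-level column over item rows counts o1's payment twice
naive_revenue = sum(row["total_payment_value"] for row in fact_rows)

# Taking one value per order first gives the order-level total
payment_by_order = {row["order_id"]: row["total_payment_value"] for row in fact_rows}
order_revenue = sum(payment_by_order.values())

print(naive_revenue, order_revenue)
```

Item-level measures (`item_price`, `item_freight_value`, `total_item_amount`) can be summed directly; order-level ones need a distinct-order step like the one above.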

In [0]:
# Start with order items (NO deduplication applied)
print("\nBuilding fact table from order items...")

fact_orders = order_items_active.select(
    "order_id",
    "order_item_id",
    "product_id",
    "seller_id",
    F.col("price").alias("item_price"),
    F.col("freight_value").alias("item_freight_value")
)

print(f"  ✓ Base: {fact_orders.count():,} rows")

# Join with enriched orders
print("\nJoining with orders...")

fact_orders = fact_orders.join(
    orders_enriched.select(
        "order_id",
        "customer_id",
        "order_purchase_timestamp",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
        "order_status",
        "delivery_days",
        "delivery_delay_flag"
    ),
    on="order_id",
    how="left"
)

print(f"  ✓ Orders joined: {fact_orders.count():,} rows")

# Join with aggregated payments
print("\nJoining with payments...")

fact_orders = fact_orders.join(
    payments_agg,
    on="order_id",
    how="left"
)

print(f"  ✓ Payments joined: {fact_orders.count():,} rows")

# Join with aggregated reviews
print("\nJoining with reviews...")

fact_orders = fact_orders.join(
    reviews_agg,
    on="order_id",
    how="left"
)

print(f"  ✓ Reviews joined: {fact_orders.count():,} rows")

# Create date dimension keys
print("\nCreating date dimension keys...")

fact_orders = fact_orders.withColumn(
    "order_purchase_date_id",
    F.date_format(F.col("order_purchase_timestamp"), "yyyyMMdd").cast("int")
).withColumn(
    "order_delivery_date_id",
    F.when(
        F.col("order_delivered_customer_date").isNotNull(),
        F.date_format(F.col("order_delivered_customer_date"), "yyyyMMdd").cast("int")
    ).otherwise(None)
)

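# Create calculated measures
# Note: profit subtracts this item's price and freight from the order-level payment total,
# so for multi-item orders the other items' amounts remain in the result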
print("\nCreating calculated measures...")

fact_orders = fact_orders.withColumn(
    "total_item_amount",
    F.col("item_price") + F.col("item_freight_value")
).withColumn(
    "is_delivered",
    F.when(F.col("order_delivered_customer_date").isNotNull(), 1).otherwise(0)
).withColumn(
    "is_on_time",
    F.when(F.col("delivery_delay_flag") == "on_time", 1).otherwise(0)
).withColumn(
    "profit",
    F.when(
        F.col("order_delivered_customer_date").isNotNull(),
        F.col("total_payment_value") - F.col("item_price") - F.col("item_freight_value")
    ).otherwise(0)
)

# Select final columns
fact_orders = fact_orders.select(
    # Keys
    "order_id",
    "order_item_id",
    # Dimension Foreign Keys
    "customer_id",
    "seller_id",
    "product_id",
    "order_purchase_date_id",
    "order_delivery_date_id",
    # Order Details
    "order_status",
    "delivery_delay_flag",
    # Measures
    "item_price",
    "item_freight_value",
    "total_item_amount",
    "total_payment_value",
    "payment_count",
    "max_installments",
    "avg_review_score",
    "total_reviews",
    "delivery_days",
    "is_delivered",
    "is_on_time",
    "profit"
)

print(f"\n✓ FACT_ORDERS created: {fact_orders.count():,} rows")

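# Statistics. The grain is one row per order item, so order-level columns
# (total_payment_value, is_delivered, is_on_time) repeat on every item row and the
# sums below count item rows rather than distinct orders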
print("\nFACT_ORDERS Statistics:")
fact_orders.select(
    F.count("*").alias("total_rows"),
    F.countDistinct("order_id").alias("unique_orders"),
    F.countDistinct("customer_id").alias("unique_customers"),
    F.countDistinct("product_id").alias("unique_products"),
    F.countDistinct("seller_id").alias("unique_sellers"),
    F.sum("is_delivered").alias("delivered_orders"),
    F.sum("is_on_time").alias("on_time_deliveries"),
    F.round(F.sum("total_payment_value"), 2).alias("total_revenue"),
    F.round(F.sum("profit"), 2).alias("total_profit")
).show(vertical=True)


Building fact table from order items...
  ✓ Base: 112,650 rows

Joining with orders...
  ✓ Orders joined: 112,650 rows

Joining with payments...
  ✓ Payments joined: 112,650 rows

Joining with reviews...
  ✓ Reviews joined: 112,650 rows

Creating date dimension keys...

Creating calculated measures...

✓ FACT_ORDERS created: 112,650 rows

FACT_ORDERS Statistics:
-RECORD 0---------------------------
 total_rows         | 112650        
 unique_orders      | 98666         
 unique_customers   | 98666         
 unique_products    | 32951         
 unique_sellers     | 3095          
 delivered_orders   | 110196        
 on_time_deliveries | 101481        
 total_revenue      | 2.030813471E7 
 total_profit       | 4356670.37    



## SECTION 11: Star Schema Summary
Display final row counts for all star schema tables.

In [0]:
print("\n📊 STAR SCHEMA TABLES CREATED:\n")

# Dimension Tables
print("DIMENSION TABLES:")
print(f"  ✓ DIM_CUSTOMERS:                        {dim_customers.count():>12,} rows")
print(f"  ✓ DIM_SELLERS:                          {dim_sellers.count():>12,} rows")
print(f"  ✓ DIM_PRODUCTS:                         {dim_products.count():>12,} rows")
print(f"  ✓ DIM_DATE:                             {dim_date.count():>12,} rows")

# Fact Table
print("\nFACT TABLE:")
print(f"  ✓ FACT_ORDERS:                          {fact_orders.count():>12,} rows")

# Total
total_rows = (
    dim_customers.count() + 
    dim_sellers.count() + 
    dim_products.count() + 
    dim_date.count() + 
    fact_orders.count()
)
print(f"\nTOTAL ROWS ACROSS ALL TABLES:             {total_rows:>12,}")

print("\n" + "=" * 100)
print("✓ Star schema transformation completed successfully!")
print("=" * 100)
print("""
These tables are ready to be exported to Snowflake:
  1. DIM_CUSTOMERS (deduplicated, with geolocation)
  2. DIM_SELLERS (deduplicated, with geolocation)
  3. DIM_PRODUCTS (deduplicated, with Spanish & English categories)
  4. DIM_DATE (generated from order dates)
  5. FACT_ORDERS (all measures and dimension keys)

All tables are clean and ready for analytics!
""")


📊 STAR SCHEMA TABLES CREATED:

DIMENSION TABLES:
  ✓ DIM_CUSTOMERS:                              99,441 rows
  ✓ DIM_SELLERS:                                 3,095 rows
  ✓ DIM_PRODUCTS:                               32,951 rows
  ✓ DIM_DATE:                                      799 rows

FACT TABLE:
  ✓ FACT_ORDERS:                               112,650 rows

TOTAL ROWS ACROSS ALL TABLES:                  248,936

✓ Star schema transformation completed successfully!

These tables are ready to be exported to Snowflake:
  1. DIM_CUSTOMERS (deduplicated, with geolocation)
  2. DIM_SELLERS (deduplicated, with geolocation)
  3. DIM_PRODUCTS (deduplicated, with Spanish & English categories)
  4. DIM_DATE (generated from order dates)
  5. FACT_ORDERS (all measures and dimension keys)

All tables are clean and ready for analytics!



## SECTION 12: Export Star Schema Tables to Snowflake
Write all star schema tables to Snowflake using the Snowflake Connector for Spark.
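
The connection settings in this section are read from environment variables, and `os.getenv` returns `None` for any that are unset. A small pre-flight check can catch that before a write is attempted. The sketch below is an assumption-laden illustration: `missing_settings` is a hypothetical helper and the sample values are made up; only the variable names come from this notebook.

```python
import os

# Environment variable names used by the configuration cell in this section
REQUIRED_ENV_VARS = ("sfURL", "sfUser", "sfPassword")

def missing_settings(env=os.environ):
    """Return the names of required settings that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]

# Example with an explicit mapping instead of the real environment
missing = missing_settings({"sfURL": "account.example", "sfUser": "etl_user"})
print(missing)
```

Calling `missing_settings()` with no argument checks the real environment; a non-empty result is a reason to stop before writing.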

In [0]:
import os
# Snowflake Connection Configuration
print("\nConfiguring Snowflake connection...\n")

SNOWFLAKE_CONFIG = {
    "sfUrl": os.getenv("sfURL"),
    "sfUser": os.getenv("sfUser"),
    "sfPassword": os.getenv("sfPassword"),
    "sfDatabase": "ECOMMERCE_DB",
    "sfSchema": "my_schema",
    "sfWarehouse": "ECOMMERCE_DWH",
    "sfRole": "ACCOUNTADMIN"
}

print("✓ Snowflake configuration loaded")
print(f"  Database: {SNOWFLAKE_CONFIG['sfDatabase']}")
print(f"  Schema: {SNOWFLAKE_CONFIG['sfSchema']}")
print(f"  Warehouse: {SNOWFLAKE_CONFIG['sfWarehouse']}")


Configuring Snowflake connection...

✓ Snowflake configuration loaded
  Database: ECOMMERCE_DB
  Schema: my_schema
  Warehouse: ECOMMERCE_DWH


### Write Dimension Tables
Export all dimension tables (DIM_CUSTOMERS, DIM_SELLERS, DIM_PRODUCTS, DIM_DATE) to Snowflake.

In [0]:
def write_to_snowflake(df, table_name, mode="overwrite"):
    """
    Write DataFrame to Snowflake table
    
    Args:
        df: Spark DataFrame to write
        table_name: Snowflake table name 
        mode: write mode - "overwrite"
    """
    try:
        print(f"  Writing {table_name}...", end=" ")
        
        df.write \
            .format("snowflake") \
            .options(**SNOWFLAKE_CONFIG) \
            .option("dbtable", table_name) \
            .option("truncate_table", "on") \
            .mode(mode) \
            .save()
        
        row_count = df.count()
        print(f"✓ Success ({row_count:,} rows)")
        return True
    except Exception as e:
        print(f"✗ Failed")
        print(f"    Error: {str(e)}")
        return False

print("\n" + "=" * 100)
print("WRITING DIMENSION TABLES TO SNOWFLAKE")
print("=" * 100 + "\n")

# Write dimension tables
results = {}

print("Dimension Tables:")
results["DIM_CUSTOMERS"] = write_to_snowflake(dim_customers, "DIM_CUSTOMERS", mode="overwrite")
results["DIM_SELLERS"] = write_to_snowflake(dim_sellers, "DIM_SELLERS", mode="overwrite")
results["DIM_PRODUCTS"] = write_to_snowflake(dim_products, "DIM_PRODUCTS", mode="overwrite")
results["DIM_DATE"] = write_to_snowflake(dim_date, "DIM_DATE", mode="overwrite")

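if all(results.values()):
    print("\n✓ All dimension tables written to Snowflake")
else:
    print("\n⚠ Some dimension tables failed to write; see the errors above")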


WRITING DIMENSION TABLES TO SNOWFLAKE

Dimension Tables:
  Writing DIM_CUSTOMERS... ✓ Success (99,441 rows)
  Writing DIM_SELLERS... ✓ Success (3,095 rows)
  Writing DIM_PRODUCTS... ✓ Success (32,951 rows)
  Writing DIM_DATE... ✓ Success (799 rows)

✓ All dimension tables written to Snowflake


### Write Fact Table
Export the central FACT_ORDERS table (contains all measures and dimension foreign keys).

In [0]:
print("\n" + "=" * 100)
print("WRITING FACT TABLE TO SNOWFLAKE")
print("=" * 100 + "\n")

# Write fact table
print("Fact Table:")
results["FACT_ORDERS"] = write_to_snowflake(fact_orders, "FACT_ORDERS", mode="overwrite")

if results["FACT_ORDERS"]:
    print("\n✓ Fact table written to Snowflake")
else:
    print("\n⚠ Fact table failed to write; see the error above")


WRITING FACT TABLE TO SNOWFLAKE

Fact Table:
  Writing FACT_ORDERS... ✓ Success (112,650 rows)

✓ Fact table written to Snowflake


### Export Summary & Validation
Summarize the write results and confirm that every table was exported to Snowflake.
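
The summary cell tallies the success flags returned by `write_to_snowflake`. A stricter validation would also compare the row counts written with counts read back from Snowflake. The sketch below covers only the comparison step: `reconcile` is a hypothetical helper, and the read-back counts are invented (how they are obtained, for example a `COUNT(*)` per table, is left out).

```python
def reconcile(expected, actual):
    """Return {table: (expected, actual)} for tables whose counts differ or are missing."""
    return {
        table: (count, actual.get(table))
        for table, count in expected.items()
        if actual.get(table) != count
    }

# Row counts reported by this notebook's run
expected_counts = {
    "DIM_CUSTOMERS": 99441,
    "DIM_SELLERS": 3095,
    "DIM_PRODUCTS": 32951,
    "DIM_DATE": 799,
    "FACT_ORDERS": 112650,
}

# Hypothetical read-back counts with one deliberate mismatch
readback_counts = dict(expected_counts, FACT_ORDERS=112000)

mismatches = reconcile(expected_counts, readback_counts)
print(mismatches)
```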

In [0]:
print("\n" + "=" * 100)
print("SNOWFLAKE EXPORT SUMMARY")
print("=" * 100 + "\n")

# Summary
success_count = sum(1 for v in results.values() if v)
total_count = len(results)

print("Export Results:")
for table_name, success in results.items():
    status = "✓ SUCCESS" if success else "✗ FAILED"
    print(f"  {status}: {table_name}")

print(f"\nTotal: {success_count}/{total_count} tables exported successfully\n")

if success_count == total_count:
    print("✅ ALL STAR SCHEMA TABLES SUCCESSFULLY EXPORTED TO SNOWFLAKE!")
    print(f"\nSnowflake Location:")
    print(f"  Database: {SNOWFLAKE_CONFIG['sfDatabase']}")
    print(f"  Schema: {SNOWFLAKE_CONFIG['sfSchema']}")
    print(f"\nTables created:")
    print(f"  • DIM_CUSTOMERS ({dim_customers.count():,} rows)")
    print(f"  • DIM_SELLERS ({dim_sellers.count():,} rows)")
    print(f"  • DIM_PRODUCTS ({dim_products.count():,} rows)")
    print(f"  • DIM_DATE ({dim_date.count():,} rows)")
    print(f"  • FACT_ORDERS ({fact_orders.count():,} rows)")
else:
    print("⚠️  SOME TABLES FAILED TO EXPORT")
    print("Please check the error messages above and verify your Snowflake credentials.")

print("\n" + "=" * 100)
print("Export Complete")
print("=" * 100)


SNOWFLAKE EXPORT SUMMARY

Export Results:
  ✓ SUCCESS: DIM_CUSTOMERS
  ✓ SUCCESS: DIM_SELLERS
  ✓ SUCCESS: DIM_PRODUCTS
  ✓ SUCCESS: DIM_DATE
  ✓ SUCCESS: FACT_ORDERS

Total: 5/5 tables exported successfully

✅ ALL STAR SCHEMA TABLES SUCCESSFULLY EXPORTED TO SNOWFLAKE!

Snowflake Location:
  Database: ECOMMERCE_DB
  Schema: my_schema

Tables created:
  • DIM_CUSTOMERS (99,441 rows)
  • DIM_SELLERS (3,095 rows)
  • DIM_PRODUCTS (32,951 rows)
  • DIM_DATE (799 rows)
  • FACT_ORDERS (112,650 rows)

Export Complete
