In [0]:
# Cell 1: Setup
%load_ext autoreload
%autoreload 2

import sys
from config.table_config import PROJECT_PATH

if PROJECT_PATH not in sys.path:
    sys.path.append(PROJECT_PATH)

from config import *
from utils.logger import log_info, log_metric
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import StringType, DoubleType, TimestampType

configure_spark(spark)

In [0]:
# Cell 2: Transform and append orders_new
from utils.data_quality_check import run_all_checks

log_info("=" * 60)
log_info("APPEND ORDERS_NEW TO SILVER.ORDERS")
log_info("=" * 60)

# Read orders_new
log_info("Reading orders_new from bronze...")
orders_new_df = spark.table(BRONZE_ORDERS_NEW_TABLE)

# DQA on orders_new (1% sample)
log_info("Running DQA on orders_new (1% sample)...")
sample = orders_new_df.sample(fraction=0.01, seed=42)
run_all_checks(
    df=sample,
    table_name=BRONZE_ORDERS_NEW_TABLE,
    key_columns=["order_id"],
    partition_col=None
)

# Transform: Normalize schema
log_info("Normalizing orders_new schema...")
orders_new_normalized = orders_new_df \
    .withColumn("order_id", col("order_id").cast(StringType())) \
    .withColumn("order_date", col("order_date").cast(TimestampType())) \
    .withColumn("order_total_price", lit(None).cast(DoubleType())) \
    .withColumn("source_system", lit("new_system")) \
    .drop("order_year", "order_month") \
    .repartition(32)

# Deduplicate
log_info("Deduplicating orders_new...")
orders_new_clean = orders_new_normalized.dropDuplicates(["order_id"])

# Add metadata
orders_new_clean = orders_new_clean \
    .withColumn("ingested_at", current_timestamp()) \
    .withColumn("data_source", lit("ecommerce.bronze.orders_new"))

log_info("Orders_new schema:")
orders_new_clean.printSchema()

# APPEND to existing silver.orders table
log_info("Appending to existing silver.orders table...")

spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"USE SCHEMA {SILVER_SCHEMA}")

# Append using INSERT INTO (Iceberg supports this)
orders_new_clean.writeTo("orders") \
    .using("iceberg") \
    .append()

log_info("Orders_new appended to silver.orders")

# Show final counts
final_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {SILVER_ORDERS_TABLE}").collect()[0]['cnt']
log_metric("Total orders in silver", f"{final_count:,}")
log_metric("Expected", "~1,900,000,000 (700M legacy + 1.2B new)")

log_info("=" * 60)

In [0]:
# Get final silver.orders metadata
final_detail = spark.sql("DESCRIBE DETAIL ecommerce.silver.orders").collect()[0]

final_gb = final_detail['sizeInBytes'] / (1024**3)
final_files = final_detail['numFiles']
final_rows = 1_532_110_121

print("=" * 60)
print("FINAL SILVER.ORDERS METRICS (MERGED)")
print("=" * 60)
print(f"Total rows:  {final_rows:,}")
print(f"Total size:  {final_gb:.2f} GB")
print(f"Total files: {final_files}")
print()

# Calculate breakdown
legacy_rows = 700_000_000
orders_new_rows = final_rows - legacy_rows

print("BREAKDOWN:")
print(f"Legacy orders:     {legacy_rows:,} rows")
print(f"Orders_new clean:  {orders_new_rows:,} rows")
print(f"Orders_new removed: ~6,380,000 duplicates (from 1.2B)")
print()

# Calculate size contribution
legacy_gb = 25.56
orders_new_gb = final_gb - legacy_gb

print("SIZE BREAKDOWN:")
print(f"Legacy contribution:     {legacy_gb:.2f} GB")
print(f"Orders_new contribution: {orders_new_gb:.2f} GB")
print(f"Total:                   {final_gb:.2f} GB")
print()

# Verify source distribution
print("SOURCE SYSTEM DISTRIBUTION:")
spark.sql("""
    SELECT source_system, COUNT(*) as count
    FROM ecommerce.silver.orders
    GROUP BY source_system
    ORDER BY source_system
""").show()

print("=" * 60)