In [0]:
# ============================================================================
# ETL PIPELINE: RAW DATA TO UNITY CATALOG
# ============================================================================
# Simple ETL demonstrating: Extract → Transform → Load pattern
# Author: Portfolio Project
# Last Updated: 2026-02-02
# ============================================================================

import re
from datetime import datetime

# NOTE: `round` imported here is Spark's column function and shadows the Python
# builtin `round` for the rest of the notebook.
from pyspark.sql.functions import (
    col, trim, upper, to_date, year, month, round, current_timestamp
)

# Record the wall-clock start; the final summary cell uses it for total runtime.
start_time = datetime.now()
started_at = start_time.strftime('%Y-%m-%d %H:%M:%S')
print(f"ETL Pipeline Started: {started_at}")
print(f"Spark Version: {spark.version}\n")

In [0]:
# ============================================================================
# 1. PARAMETERS
# ============================================================================

# Define notebook parameters (can be passed from jobs/workflows).
# RAW_CSV_PATH defaults to the previously hard-coded Volume location, so runs
# that don't pass it behave exactly as before, but it can now be overridden.
dbutils.widgets.text("CATALOG_NAME", "workspace")
dbutils.widgets.text("SCHEMA_NAME", "portfolio_project")
dbutils.widgets.text("TABLE_NAME", "sales_clean")
dbutils.widgets.text(
    "RAW_CSV_PATH",
    "/Volumes/workspace/portfolio_projects/volume_portfolio_projects/simple_etl_project_raw_data_csv/",
)

# Get parameter values. Surrounding whitespace is trimmed because these values
# are pasted straight into SQL statements and table paths below.
CATALOG_NAME = dbutils.widgets.get("CATALOG_NAME").strip()
SCHEMA_NAME = dbutils.widgets.get("SCHEMA_NAME").strip()
TABLE_NAME = dbutils.widgets.get("TABLE_NAME").strip()

# The names are interpolated *unquoted* into SQL (USE CATALOG, CREATE SCHEMA,
# SELECT ... FROM, OPTIMIZE, ...). Only plain identifiers work there, so reject
# anything else now with a clear message instead of a confusing parse error
# (or injected SQL) further down the pipeline.
_IDENTIFIER_RE = re.compile(r"[A-Za-z0-9_]+")
for _param_name, _param_value in (
    ("CATALOG_NAME", CATALOG_NAME),
    ("SCHEMA_NAME", SCHEMA_NAME),
    ("TABLE_NAME", TABLE_NAME),
):
    if not _IDENTIFIER_RE.fullmatch(_param_value):
        raise ValueError(
            f"Invalid {_param_name} {_param_value!r}: expected a non-empty name "
            "of letters, digits and underscores"
        )

FULL_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}"

# Data paths
RAW_CSV_PATH = dbutils.widgets.get("RAW_CSV_PATH").strip()
if not RAW_CSV_PATH:
    raise ValueError("RAW_CSV_PATH must not be empty")

print(" Configuration:")
print(f"  Catalog: {CATALOG_NAME}")
print(f"  Schema: {SCHEMA_NAME}")
print(f"  Table: {TABLE_NAME}")
print(f"  Source: {RAW_CSV_PATH}\n")

In [0]:
# ============================================================================
# 2. SETUP - CREATE SCHEMA
# ============================================================================

# Point the session at the target catalog, then ensure the schema exists.
# IF NOT EXISTS makes this cell safe to re-run.
schema_fqn = f"{CATALOG_NAME}.{SCHEMA_NAME}"
create_schema_sql = f"""
    CREATE SCHEMA IF NOT EXISTS {schema_fqn}
    COMMENT 'Portfolio ETL project schema'
"""

spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(create_schema_sql)
print(f"✓ Schema ready: {schema_fqn}\n")

In [0]:
# ============================================================================
# 3. EXTRACT - READ RAW DATA
# ============================================================================

print("EXTRACT: Loading raw data...")

# CSV reader settings: first line is the header, column types are inferred,
# dates are expected as yyyy-MM-dd, and PERMISSIVE mode tolerates malformed
# records instead of failing the whole read.
csv_reader_options = {
    "header": "true",
    "inferSchema": "true",
    "dateFormat": "yyyy-MM-dd",
    "mode": "PERMISSIVE",
}

df_raw = (
    spark.read
    .format("csv")
    .options(**csv_reader_options)
    .load(RAW_CSV_PATH)
)

raw_count = df_raw.count()
raw_column_total = len(df_raw.columns)
print(f"✓ Loaded {raw_count:,} rows with {raw_column_total} columns\n")

In [0]:
# ============================================================================
# 4. TRANSFORM - CLEAN AND ENRICH DATA
# ============================================================================

# Largest quantity accepted for a single record; rows above it are dropped by
# the validation step below.
MAX_QUANTITY = 1000

print("TRANSFORM: Cleaning and enriching data...")

# Remove exact duplicate rows. NOTE: this runs on the raw values, so rows that
# differ only in case/whitespace of text fields are not treated as duplicates.
df_clean = df_raw.dropDuplicates()
duplicates_removed = raw_count - df_clean.count()

# Handle missing values: numeric fields default to 0, missing category/region
# get an explicit placeholder label instead of dropping the row
df_clean = df_clean.na.fill({
    "quantity": 0,
    "unit_price": 0.0,
    "discount": 0.0,
    "category": "Unknown",
    "region": "Unspecified"
})

# Remove rows with null critical fields
df_clean = df_clean \
    .filter(col("transaction_id").isNotNull()) \
    .filter(col("transaction_date").isNotNull()) \
    .filter(col("customer_id").isNotNull())

# Standardize text fields (upper-case, surrounding whitespace trimmed)
df_clean = df_clean \
    .withColumn("customer_name", trim(upper(col("customer_name")))) \
    .withColumn("product_name", trim(upper(col("product_name")))) \
    .withColumn("category", trim(upper(col("category")))) \
    .withColumn("region", trim(upper(col("region")))) \
    .withColumn("sales_person", trim(upper(col("sales_person"))))

# Convert date and add derived columns (year/month become the table's partition
# columns in the LOAD step)
df_clean = df_clean \
    .withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd")) \
    .withColumn("year", year(col("transaction_date"))) \
    .withColumn("month", month(col("transaction_date"))) \
    .withColumn("total_amount", round((col("quantity") * col("unit_price")) * (1 - col("discount")), 2))

# The null check above ran on the raw value: a non-null string that does not
# match yyyy-MM-dd becomes NULL after to_date (with ANSI mode enabled, to_date
# raises instead). Such rows would get NULL year/month, land in a NULL partition
# and break the replaceWhere predicate built in LOAD, so drop them here.
df_clean = df_clean.filter(col("transaction_date").isNotNull())

# Data validation
df_clean = df_clean \
    .filter(col("quantity") >= 0) \
    .filter(col("unit_price") >= 0) \
    .filter(col("discount").between(0, 1)) \
    .filter(col("quantity") <= MAX_QUANTITY)

clean_count = df_clean.count()
rows_removed = raw_count - clean_count
# An empty source (raw_count == 0) would otherwise raise ZeroDivisionError
retention_pct = clean_count / raw_count * 100 if raw_count else 0.0

print(f"✓ Duplicates removed: {duplicates_removed:,}")
print(f"✓ Invalid records removed: {rows_removed - duplicates_removed:,}")
print(f"✓ Clean records: {clean_count:,} ({retention_pct:.1f}% retention)\n")

In [0]:
# ============================================================================
# 5. LOAD - WRITE TO UNITY CATALOG
# ============================================================================

print("LOAD: Writing to Unity Catalog...")


def _partition_equals(column_name, value):
    """Return a SQL predicate matching ``value`` in partition column ``column_name``.

    A NULL partition value needs ``IS NULL``; formatting Python ``None`` into
    ``column = None`` would produce an invalid predicate.
    """
    if value is None:
        return f"{column_name} IS NULL"
    return f"{column_name} = {value}"


# Detect the (year, month) partitions present in this batch. Only rows matching
# these partitions are overwritten (replaceWhere); other partitions are kept.
partitions = df_clean.select("year", "month").distinct().collect()
if not partitions:
    # No rows means no partitions, and the OR-joined predicate below would be an
    # empty string, which is not a valid replaceWhere condition.
    raise ValueError(f"LOAD aborted: no clean records to write to {FULL_TABLE_PATH}")

partition_conditions = [
    f"({_partition_equals('year', p.year)} AND {_partition_equals('month', p.month)})"
    for p in partitions
]
replace_where = " OR ".join(partition_conditions)

print(f"  Partitions to write: {len(partitions)}")

# Check if table exists; the pre-write row count is used to report the net change
table_exists = spark.catalog.tableExists(FULL_TABLE_PATH)
if table_exists:
    pre_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {FULL_TABLE_PATH}").collect()[0].cnt
    print(f"  Existing rows: {pre_count:,}")
else:
    pre_count = 0
    print("  Creating new table")

# Write data
write_start = datetime.now()

df_clean.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .option("replaceWhere", replace_where) \
    .option("overwriteSchema", "false") \
    .option("delta.columnMapping.mode", "name") \
    .saveAsTable(FULL_TABLE_PATH)
# NOTE(review): "delta.columnMapping.mode" is passed as a writer option rather
# than via TBLPROPERTIES; confirm with SHOW TBLPROPERTIES that it is applied.

write_duration = (datetime.now() - write_start).total_seconds()

# Post-write validation: total rows now in the table (all partitions, not only
# the ones written by this batch)
post_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {FULL_TABLE_PATH}").collect()[0].cnt

print(f"✓ Write completed in {write_duration:.2f}s")
print(f"✓ Throughput: {clean_count/write_duration:,.0f} rows/sec")
print(f"✓ Final row count: {post_count:,}")
if table_exists:
    print(f"✓ Net change: {post_count - pre_count:+,} rows\n")
else:
    print()

In [0]:
# ============================================================================
# 6. OPTIMIZE TABLE
# ============================================================================

print("OPTIMIZE: Improving query performance...")

# Columns used to co-locate data within files (shared by the SQL and the report)
zorder_columns = "category, region"

# Turn on optimized writes and auto-compaction as table properties
table_properties_sql = f"""
    ALTER TABLE {FULL_TABLE_PATH} SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
"""
spark.sql(table_properties_sql)

# Compact files and Z-ORDER them; the duration is reported below
optimize_start = datetime.now()
spark.sql(f"OPTIMIZE {FULL_TABLE_PATH} ZORDER BY ({zorder_columns})")
optimize_duration = (datetime.now() - optimize_start).total_seconds()

# Compute column statistics for every column
spark.sql(f"ANALYZE TABLE {FULL_TABLE_PATH} COMPUTE STATISTICS FOR ALL COLUMNS")

for report_line in (
    f"✓ OPTIMIZE completed in {optimize_duration:.2f}s",
    f"✓ Z-ORDER applied on: {zorder_columns}",
    "✓ Statistics computed\n",
):
    print(report_line)

In [0]:
# ============================================================================
# 7. FINAL SUMMARY
# ============================================================================

end_time = datetime.now()
total_duration = (end_time - start_time).total_seconds()
minutes, seconds = divmod(total_duration, 60)

# Get table details from the first row of DESCRIBE DETAIL; fall back to 0 if a
# field is missing
table_details = spark.sql(f"DESCRIBE DETAIL {FULL_TABLE_PATH}").collect()[0]
table_size_mb = table_details.sizeInBytes / (1024**2) if hasattr(table_details, 'sizeInBytes') else 0
num_files = table_details.numFiles if hasattr(table_details, 'numFiles') else 0

# Retention is undefined for an empty source; report 0% instead of raising
retention_pct = clean_count / raw_count * 100 if raw_count else 0.0

# Example query uses the latest year written in this run rather than a
# hard-coded year that may not exist in the data
loaded_years = [p.year for p in partitions if p.year is not None]
example_year = max(loaded_years) if loaded_years else end_time.year

print("=" * 80)
print("ETL PIPELINE COMPLETED SUCCESSFULLY")
print("=" * 80)

print("\nPipeline Summary:")
print(f"  • Source records: {raw_count:,}")
# clean_count is what this run wrote; post_count is the whole table, including
# partitions this run did not touch
print(f"  • Records loaded: {clean_count:,}")
print(f"  • Table total rows: {post_count:,}")
print(f"  • Data quality: {retention_pct:.1f}% retention")
print(f"  • Execution time: {int(minutes)}m {seconds:.1f}s")

print("\nTable Details:")
print(f"  • Location: {FULL_TABLE_PATH}")
print(f"  • Size: {table_size_mb:.2f} MB ({num_files} files)")
print("  • Format: Delta Lake with Unity Catalog")

print("\nOptimizations:")
print("  ✓ Partitioned by year/month")
print("  ✓ Z-ORDERed by category/region")
print("  ✓ Auto-optimize enabled")
print("  ✓ Statistics computed")

print("\nNext Steps:")
print(f"  • Query: SELECT * FROM {FULL_TABLE_PATH} WHERE year = {example_year}")
print("  • Dashboard: Create visualizations in Databricks SQL")
print("  • Schedule: Set up automated daily runs")

print("\n" + "=" * 80)
print(f" Started: {start_time.strftime('%H:%M:%S')}")
print(f" Ended: {end_time.strftime('%H:%M:%S')}")
print("=" * 80)