# Databricks

## Comprehensive Practice Notebook

This notebook provides hands-on practice for common Databricks data engineering tasks. Each section includes examples that demonstrate key concepts and best practices.

### What's Covered:
1. **Environment Setup** - Spark configuration and initialization
2. **Data Ingestion** - Multiple data sources and Delta Lake integration
3. **Data Transformations** - PySpark operations and SQL
4. **Streaming Data** - Real-time processing patterns
5. **Data Quality** - Validation and constraints
6. **Performance Optimization** - Query tuning and optimization
7. **MLflow Integration** - ML lifecycle management
8. **Unity Catalog** - Data governance and security
9. **Job Workflows** - Scheduling and monitoring
10. **Debugging & Monitoring** - Troubleshooting techniques

---

## 1. Setup Databricks Environment

### Initialize Spark Session and Configure Cluster Settings

In [None]:
# Import essential libraries for data engineering
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import pandas as pd
from datetime import datetime, timedelta
import uuid

# On Databricks the `spark` session already exists; report what we are attached to.
spark_context = spark.sparkContext
print(f"Spark Version: {spark.version}")
print(f"Python Version: {spark_context.pythonVer}")
print(f"Application ID: {spark_context.applicationId}")

# Session-level tuning: adaptive query execution plus Delta write optimizations.
performance_settings = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}
for setting_name, setting_value in performance_settings.items():
    spark.conf.set(setting_name, setting_value)

print("Spark session configured successfully!")
print("Key configurations:")
# Read two of the settings back to confirm they were applied.
reported_settings = (
    "spark.sql.adaptive.enabled",
    "spark.databricks.delta.optimizeWrite.enabled",
)
for key in reported_settings:
    print(f"  {key}: {spark.conf.get(key)}")

In [None]:
# Create sample datasets

# Schemas are declared as (column name, Spark type) pairs; every column is non-nullable.
# Note that both date columns are stored as plain strings.
customers_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in [
        ("customer_id", IntegerType()),
        ("name", StringType()),
        ("email", StringType()),
        ("registration_date", StringType()),
        ("tier", StringType()),
        ("state", StringType()),
    ]
])

orders_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in [
        ("order_id", IntegerType()),
        ("customer_id", IntegerType()),
        ("order_date", StringType()),
        ("amount", DoubleType()),
        ("category", StringType()),
    ]
])

# Five customers: (customer_id, name, email, registration_date, tier, state)
customers_data = [
    (1, "Alice Johnson", "alice@email.com", "2023-01-15", "Premium", "New York"),
    (2, "Bob Smith", "bob@email.com", "2023-02-20", "Standard", "California"),
    (3, "Carol Davis", "carol@email.com", "2023-03-10", "Premium", "Texas"),
    (4, "David Wilson", "david@email.com", "2023-04-05", "Basic", "Florida"),
    (5, "Eve Brown", "eve@email.com", "2023-05-12", "Standard", "New York"),
]

# Eight orders: (order_id, customer_id, order_date, amount, category)
orders_data = [
    (101, 1, "2023-06-01", 150.00, "Electronics"),
    (102, 2, "2023-06-02", 89.50, "Books"),
    (103, 1, "2023-06-03", 299.99, "Electronics"),
    (104, 3, "2023-06-04", 45.00, "Clothing"),
    (105, 4, "2023-06-05", 199.99, "Electronics"),
    (106, 2, "2023-06-06", 25.99, "Books"),
    (107, 5, "2023-06-07", 75.50, "Clothing"),
    (108, 1, "2023-06-08", 399.00, "Electronics"),
]

customers_df = spark.createDataFrame(customers_data, customers_schema)
orders_df = spark.createDataFrame(orders_data, orders_schema)

print("Sample datasets created:")
print(f"Customers: {customers_df.count()} records")
print(f"Orders: {orders_df.count()} records")

# Preview the first three rows of each DataFrame.
for heading, sample_df in (
    ("Sample Customer Data:", customers_df),
    ("\n Sample Orders Data:", orders_df),
):
    print(heading)
    sample_df.show(3, truncate=False)

## 2. Data Ingestion with Delta Lake

### Demonstrate reading from various data sources and writing to Delta Lake

In [None]:
# Create Delta Lake tables

# Storage locations for the path-based Delta tables used throughout the notebook.
base_path = "/tmp/delta-practice"
customers_table_path = base_path + "/customers"
orders_table_path = base_path + "/orders"

print("Writing data to Delta Lake...")

# Method 1: path-based tables written through the DataFrame API.
(
    customers_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(customers_table_path)
)

# Orders are physically partitioned by product category.
(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("category")
    .save(orders_table_path)
)

print("Delta tables created successfully!")

# Method 2: the same data registered as tables by name via saveAsTable.
(
    customers_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("customers_managed")
)

(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("category")
    .saveAsTable("orders_managed")
)

print("Managed Delta tables created!")

# List the tables visible in the current schema.
print("\n Available tables:")
spark.sql("SHOW TABLES").show()

# Round-trip check: load both path-based tables and count their rows.
print("\n Reading from Delta Lake:")
delta_reader = spark.read.format("delta")
delta_customers = delta_reader.load(customers_table_path)
delta_orders = delta_reader.load(orders_table_path)

print(f"Customers from Delta: {delta_customers.count()} records")
print(f"Orders from Delta: {delta_orders.count()} records")

In [None]:
# Delta Lake CRUD Operations

print("CRUD Operations with Delta Lake\n")

# CREATE - append one new customer row to the customers table.
new_customer_data = [(6, "Frank Miller", "frank@email.com", "2023-06-10", "Premium", "Washington")]
new_customer_df = spark.createDataFrame(new_customer_data, customers_schema)
(
    new_customer_df.write
    .format("delta")
    .mode("append")
    .save(customers_table_path)
)
print("INSERT: Added new customer")

# READ - load the table and keep only Premium-tier customers.
all_customers = spark.read.format("delta").load(customers_table_path)
premium_customers = all_customers.filter(col("tier") == "Premium")

print(f"Premium customers: {premium_customers.count()}")
premium_customers.select("name", "tier", "state").show()

# UPDATE - upsert through MERGE: customer 2 moves to the Premium tier.
print("\n UPDATE operations:")

update_data = [(2, "Bob Smith", "bob@email.com", "2023-02-20", "Premium", "California")]
updates_df = spark.createDataFrame(update_data, customers_schema)

delta_customers_table = DeltaTable.forPath(spark, customers_table_path)

# Matching ids are overwritten with the incoming row; unknown ids are inserted.
customer_upsert = (
    delta_customers_table.alias("customers")
    .merge(updates_df.alias("updates"), "customers.customer_id = updates.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
customer_upsert.execute()

print("UPDATE: Customer tier updated using MERGE")

# DELETE - remove the customer that was appended at the top of this cell.
print("\n  DELETE operations:")

delta_customers_table.delete(col("customer_id") == 6)
print("DELETE: Removed customer using Delta API")

# Verify the final row count with a fresh read of the table.
final_customer_count = spark.read.format("delta").load(customers_table_path).count()
print(f"\nFinal customer count: {final_customer_count}")

# Every operation above produced a new table version; list them.
print("\n Delta Lake Version History:")
history_columns = ["version", "timestamp", "operation", "operationParameters"]
delta_customers_table.history().select(*history_columns).show(truncate=False)

## 3. Data Transformation with PySpark

### Common transformation patterns using DataFrames and Spark SQL

In [None]:
# Common DataFrame transformations

# Load both path-based Delta tables; `customers` and `orders` are reused by later cells.
customers = spark.read.format("delta").load(customers_table_path)
orders = spark.read.format("delta").load(orders_table_path)

print("DataFrame Transformations\n")

# 1. Derived columns
print("1. Basic Transformations:")

registration_year = year(to_date(col("registration_date")))
email_domain = split(col("email"), "@")[1]  # text after the "@"
name_length = length(col("name"))

customers_enhanced = (
    customers
    .withColumn("registration_year", registration_year)
    .withColumn("email_domain", email_domain)
    .withColumn("name_length", name_length)
)

customers_enhanced.select("name", "registration_year", "email_domain", "name_length").show()

# 2. Per-customer order aggregates
print("\n 2. Aggregation Operations:")

# `sum`, `count`, `avg` and `max` here are the Spark column functions from the star import.
order_metrics = [
    sum("amount").alias("total_spent"),
    count("order_id").alias("total_orders"),
    avg("amount").alias("avg_order_value"),
    max("amount").alias("max_order"),
    collect_list("category").alias("categories_purchased"),
]
customer_summary = orders.groupBy("customer_id").agg(*order_metrics)

customer_summary.show(truncate=False)

# 3. Window functions
print("\n 3. Window Functions:")

# A single unpartitioned window ordered by spend, highest first.
window_spec = Window.orderBy(desc("total_spent"))

customer_ranking = (
    customer_summary
    .withColumn("spending_rank", row_number().over(window_spec))
    .withColumn("spending_percentile", percent_rank().over(window_spec))
)

customer_ranking.select("customer_id", "total_spent", "spending_rank", "spending_percentile").show()

# Per-customer window in order-date sequence: cumulative spend and order sequence number.
order_window = Window.partitionBy("customer_id").orderBy("order_date")

orders_with_running_total = (
    orders
    .withColumn("running_total", sum("amount").over(order_window))
    .withColumn("order_number", row_number().over(order_window))
)

print("\n Running totals by customer:")
orders_with_running_total.select("customer_id", "order_date", "amount", "running_total", "order_number").show()

In [None]:
# Joins and Complex Transformations

print("Join Operations and Complex Transformations\n")

# 1. Join types
print("1. Join Types:")

# Alias both sides once so columns can be addressed as c.* / o.* after the join.
customers_c = customers.alias("c")
orders_o = orders.alias("o")
same_customer = col("c.customer_id") == col("o.customer_id")

# Inner join: one output row per order, enriched with the customer's attributes.
customer_orders = (
    customers_c
    .join(orders_o, same_customer, "inner")
    .select("c.name", "c.tier", "c.state", "o.order_date", "o.amount", "o.category")
)

print("Inner Join - Customers with their orders:")
customer_orders.show(5)

# Left join: a customer with no matching order gets a NULL order_id, which we flag.
customers_with_orders = (
    customers_c
    .join(orders_o, same_customer, "left")
    .select(
        col("c.customer_id"),
        col("c.name"),
        col("o.order_id").isNull().alias("has_no_orders"),
    )
)

customers_without_orders = customers_with_orders.filter(col("has_no_orders"))
print(f"\nCustomers without orders: {customers_without_orders.count()}")

# 2. User Defined Functions (UDFs)
print("\n 2. User Defined Functions:")

from pyspark.sql.functions import udf

# Define UDF to categorize spending
def spending_category(total_spent):
    """Classify a customer's total spend into a spending band.

    Args:
        total_spent: Total amount spent, or None when the value is missing
            (Spark passes SQL NULL to a Python UDF as None).

    Returns:
        "High Spender" for totals above 500, "Medium Spender" for totals above
        200, "Low Spender" otherwise, or None when ``total_spent`` is None.
    """
    # Comparing None with a number raises TypeError and would fail the whole
    # Spark task, so propagate missing values as NULL instead.
    if total_spent is None:
        return None
    if total_spent > 500:
        return "High Spender"
    if total_spent > 200:
        return "Medium Spender"
    return "Low Spender"

# Wrap the Python classifier as a Spark UDF that returns a string column.
spending_category_udf = udf(spending_category, StringType())

# Label every customer in the aggregated summary with its spending band.
spending_band = spending_category_udf(col("total_spent"))
customers_categorized = customer_summary.withColumn("spending_category", spending_band)

print("Customer spending categories:")
customers_categorized.select("customer_id", "total_spent", "spending_category").show()

# 3. Pivot operations
print("\n 3. Pivot Operations:")

# One row per customer, one column per category; missing combinations become 0.
spending_by_category = (
    orders
    .groupBy("customer_id")
    .pivot("category")
    .sum("amount")
    .fillna(0)
)

print("Spending by category (pivoted):")
spending_by_category.show()

## 4. Working with Streaming Data

### Real-time data processing patterns and structured streaming

In [None]:
# Streaming Data Processing

print("Structured Streaming Examples\n")

# Create a streaming source (simulating real-time data)
# In real scenarios, this would be Kafka, Event Hubs, etc.

# Generate sample streaming data
import time
import random

def generate_streaming_orders(num_orders=20, seed=None, start_time=None):
    """Generate sample orders for the streaming demo.

    Args:
        num_orders: Number of orders to generate (default 20).
        seed: Optional seed for the random generator; pass a value to get the
            same customers, amounts and categories on every run.
        start_time: Timestamp of the first order; defaults to the current time.
            Each subsequent order is stamped one second later.

    Returns:
        A list of dicts with keys ``order_id`` (1000, 1001, ...),
        ``customer_id``, ``timestamp``, ``amount`` and ``category``.
    """
    # `from pyspark.sql.functions import *` at the top of the notebook replaces the
    # built-in `round` with Spark's column function, which rejects a plain float.
    # Reference the built-in explicitly so the amounts really are rounded floats.
    import builtins

    customers_list = [1, 2, 3, 4, 5]
    categories = ["Electronics", "Books", "Clothing", "Sports"]

    rng = random.Random(seed)
    # Read the clock once so consecutive orders are exactly one second apart.
    base_time = datetime.now() if start_time is None else start_time

    return [
        {
            "order_id": 1000 + i,
            "customer_id": rng.choice(customers_list),
            "timestamp": base_time + timedelta(seconds=i),
            "amount": builtins.round(rng.uniform(10, 500), 2),
            "category": rng.choice(categories),
        }
        for i in range(num_orders)
    ]

# Materialise the generated orders as a DataFrame (schema is inferred from the dicts).
streaming_orders = generate_streaming_orders()
streaming_df = spark.createDataFrame(streaming_orders)

# Persist them as a Delta table that the streaming reader below uses as its source.
streaming_source_path = base_path + "/streaming_orders_source"
(
    streaming_df.write
    .format("delta")
    .mode("overwrite")
    .save(streaming_source_path)
)

print("Sample streaming orders created:")
streaming_df.show(5)

print("\n Setting up streaming pipeline...")

# Streaming read over the Delta table written above.
streaming_orders_read = spark.readStream.format("delta").load(streaming_source_path)

print("Streaming source configured")

print("\n Real-time aggregations:")

# 5-minute windows sliding every minute, per category, with a 10-minute watermark.
sliding_window = window(col("timestamp"), "5 minutes", "1 minute")
sales_metrics = [
    sum("amount").alias("total_sales"),
    count("order_id").alias("order_count"),
    avg("amount").alias("avg_order_value"),
]
streaming_aggregated = (
    streaming_orders_read
    .withWatermark("timestamp", "10 minutes")
    .groupBy(sliding_window, col("category"))
    .agg(*sales_metrics)
)

# Only the schema is inspected here; no streaming query is started in this cell.
print("Streaming aggregation schema:")
streaming_aggregated.printSchema()

In [None]:
# Streaming patterns

print("Advanced Streaming Patterns\n")

# 1. Streaming Upserts (Merge)
print("1. Streaming Upserts with Delta Lake:")

def streaming_upsert_demo():
    """Seed a customer-metrics Delta table and build a micro-batch upsert callback.

    The target table at ``{base_path}/customer_metrics`` is overwritten with
    three starter rows. The returned callback takes ``(micro_batch_df, batch_id)``
    and merges per-customer batch totals into that table; it is meant to be
    passed to ``foreachBatch`` and is not attached to any stream here.

    Returns:
        The upsert callback.
    """
    metrics_table_path = f"{base_path}/customer_metrics"

    metrics_schema = StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("total_spent", DoubleType(), False),
        StructField("order_count", IntegerType(), False),
        StructField("last_updated", TimestampType(), False)
    ])

    # Starter rows: one order each for customers 1-3, stamped with the current time.
    seed_rows = [
        (customer_id, total_spent, 1, datetime.now())
        for customer_id, total_spent in ((1, 150.0), (2, 89.5), (3, 45.0))
    ]
    seed_df = spark.createDataFrame(seed_rows, metrics_schema)
    seed_df.write.format("delta").mode("overwrite").save(metrics_table_path)

    print("Target table initialized")

    # Existing customers accumulate the batch totals on top of their stored values.
    update_existing = {
        "total_spent": "target.total_spent + updates.batch_total",
        "order_count": "target.order_count + updates.batch_orders",
        "last_updated": "updates.last_updated"
    }
    # Customers seen for the first time start from the batch totals.
    insert_new = {
        "customer_id": "updates.customer_id",
        "total_spent": "updates.batch_total",
        "order_count": "updates.batch_orders",
        "last_updated": "updates.last_updated"
    }

    def upsert_function(micro_batch_df, batch_id):
        """Merge one micro-batch's per-customer totals into the metrics table."""
        per_customer = (
            micro_batch_df
            .groupBy("customer_id")
            .agg(
                sum("amount").alias("batch_total"),
                count("order_id").alias("batch_orders"),
            )
            .withColumn("last_updated", current_timestamp())
        )

        (
            DeltaTable.forPath(spark, metrics_table_path).alias("target")
            .merge(per_customer.alias("updates"), "target.customer_id = updates.customer_id")
            .whenMatchedUpdate(set=update_existing)
            .whenNotMatchedInsert(values=insert_new)
            .execute()
        )

        print(f"Batch {batch_id}: Upsert completed")

    return upsert_function

upsert_func = streaming_upsert_demo()

# 2. Watermarking for late data handling
print("\n 2. Watermarking Strategy:")

# Example of watermark configuration: events may arrive up to 15 minutes late.
watermarked_stream = streaming_orders_read \
    .withWatermark("timestamp", "15 minutes")

print("Watermark configured for late data handling")

# 3. Output modes explanation
print("\n 3. Streaming Output Modes:")
output_modes = {
    "append": "Only new rows added to result table",
    "update": "Updated rows in result table",
    "complete": "Entire result table rewritten"
}

# The loop variable is deliberately not called `mode`: that name would overwrite
# a function brought in by `from pyspark.sql.functions import *` on recent Spark versions.
for output_mode, description in output_modes.items():
    # The bullet was stored as mis-decoded bytes ("‚Ä¢"); restored to the intended "•".
    print(f"  • {output_mode}: {description}")

## 5. Data Quality and Validation

### Implement data quality checks and constraints

In [None]:
# Data Quality Framework - Critical for production systems

# The magnifying-glass emoji was stored as mis-decoded bytes; restored here.
print("🔍 Data Quality and Validation\n")

# 1. Basic data quality checks
print("1. Basic Data Quality Checks:")

def perform_data_quality_checks(df, table_name, max_null_percentage=5):
    """Build a null-value and duplicate-row report for a DataFrame.

    Args:
        df: Spark DataFrame to assess.
        table_name: Label stored in the report.
        max_null_percentage: A column passes its null check when its null
            percentage is strictly below this value (default 5).

    Returns:
        A dict with ``table_name``, ``total_rows``, ``checks`` (one entry per
        column with ``check_type``, ``column``, ``null_count``,
        ``null_percentage`` and ``status``) and ``duplicate_rows``.
    """
    # `from pyspark.sql.functions import *` replaces the built-in `round` with
    # Spark's column function, which cannot round a Python float.
    import builtins

    total_rows = df.count()

    quality_report = {
        "table_name": table_name,
        "total_rows": total_rows,
        "checks": []
    }

    # Count nulls for every column in one aggregation job rather than running a
    # separate filter + count job per column.
    columns = df.columns
    if columns:
        null_count_row = df.agg(*[
            count(when(col(column).isNull(), lit(1))).alias(f"nulls_{position}")
            for position, column in enumerate(columns)
        ]).first()
        null_counts = list(null_count_row)
    else:
        null_counts = []

    for column, null_count in zip(columns, null_counts):
        null_percentage = (null_count / total_rows) * 100 if total_rows > 0 else 0

        quality_report["checks"].append({
            "check_type": "null_check",
            "column": column,
            "null_count": null_count,
            "null_percentage": builtins.round(null_percentage, 2),
            "status": "PASS" if null_percentage < max_null_percentage else "FAIL"
        })

    # Fully identical rows; reuse the row count computed above.
    duplicate_count = total_rows - df.dropDuplicates().count()
    quality_report["duplicate_rows"] = duplicate_count

    return quality_report

# Produce a quality report for each of the two demo tables.
customers_quality = perform_data_quality_checks(customers, "customers")
orders_quality = perform_data_quality_checks(orders, "orders")

# Summarise only the first three column checks of the customers report.
report_lines = [
    f"  {check['column']}: {check['null_count']} nulls ({check['null_percentage']}%) - {check['status']}"
    for check in customers_quality["checks"][:3]
]

print("Customer Data Quality Report:")
for report_line in report_lines:
    print(report_line)

duplicate_rows = customers_quality['duplicate_rows']
print(f"\nDuplicate rows: {duplicate_rows}")

# 2. Custom validation rules
print("\n 2. Custom Validation Rules:")

def validate_email_format(df):
    """Check that every row has a syntactically valid ``email`` value.

    Args:
        df: Spark DataFrame with ``customer_id``, ``name`` and ``email`` columns.

    Returns:
        True when no row has a missing or malformed email, False otherwise.
        Offending rows are printed.
    """
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    # `rlike` on a NULL email yields NULL, and a NULL filter condition drops the
    # row, so missing emails would silently count as valid. Flag them explicitly.
    is_invalid = col("email").isNull() | ~col("email").rlike(email_pattern)

    invalid_emails = df.filter(is_invalid)
    invalid_count = invalid_emails.count()

    if invalid_count > 0:
        print(f"Found {invalid_count} invalid email addresses:")
        invalid_emails.select("customer_id", "name", "email").show()
    else:
        print("All emails have valid format")

    return invalid_count == 0

def validate_business_rules(df, customers_df=None):
    """Validate business rules on an orders DataFrame.

    Rules checked:
        1. ``amount`` must be present and strictly positive.
        2. ``order_date`` must not be later than the current date.
        3. ``customer_id`` must exist in the customers reference data.

    Args:
        df: Orders DataFrame with ``amount``, ``order_date`` and ``customer_id``.
        customers_df: Reference customers DataFrame with a ``customer_id``
            column. Defaults to the notebook-level ``customers`` DataFrame.

    Returns:
        True when all three rules hold for every row, False otherwise.
    """
    # Fall back to the notebook-level DataFrame so existing one-argument calls keep working.
    if customers_df is None:
        customers_df = customers

    # The magnifying-glass emoji was stored as mis-decoded bytes; restored here.
    print("🔍 Business Rule Validations:")

    # Rule 1: All orders must have positive amounts. Zero and NULL both violate
    # the rule, so the label says "non-positive or missing" rather than "negative".
    negative_amounts = df.filter(col("amount").isNull() | (col("amount") <= 0)).count()
    print(f"  Non-positive or missing amounts: {negative_amounts} (should be 0)")

    # Rule 2: Order dates should not be in the future. Convert explicitly so the
    # comparison is date-to-date rather than relying on an implicit cast.
    future_dates = df.filter(to_date(col("order_date")) > current_date()).count()
    print(f"  Future dates: {future_dates} (should be 0)")

    # Rule 3: Customer IDs must exist in customers table. The anti-join keeps
    # only orders without a matching customer.
    customer_ids = customers_df.select("customer_id").distinct()
    invalid_customers = df.join(customer_ids, "customer_id", "left_anti").count()
    print(f"  Invalid customer IDs: {invalid_customers} (should be 0)")

    return negative_amounts == 0 and future_dates == 0 and invalid_customers == 0

# Execute both validators and combine their verdicts into one status.
email_valid = validate_email_format(customers)
business_rules_valid = validate_business_rules(orders)

if email_valid and business_rules_valid:
    overall_status = 'PASS'
else:
    overall_status = 'FAIL'

print(f"\n Overall data quality: {overall_status}")

## 6. Performance Optimization Techniques

### Query optimization, caching, and performance tuning

In [None]:
# Performance Optimization

print("Performance Optimization Techniques\n")

# 1. Caching strategies
print("1. Caching Strategies:")

# Cache frequently accessed DataFrames. cache() only marks a DataFrame for
# caching; the data is stored when an action runs, so trigger one for each.
customers.cache()
orders.cache()
customers.count()
orders.count()

print("DataFrames cached in memory")

# Check what's cached. SHOW TABLES lists tables and temporary views, not cached
# DataFrames, so report each DataFrame's own cache flag and storage level instead.
cached_tables = spark.createDataFrame(
    [
        (df_name, cached_df.is_cached, str(cached_df.storageLevel))
        for df_name, cached_df in [("customers", customers), ("orders", orders)]
    ],
    ["dataframe", "is_cached", "storage_level"],
)
print("Cached tables:")
cached_tables.show(truncate=False)

# 2. Broadcast joins for small tables
print("\n 2. Broadcast Joins:")

from pyspark.sql.functions import broadcast

# Example: Broadcasting smaller customer table
large_orders_df = orders.union(orders).union(orders)  # Simulate larger dataset
print(f"Large orders dataset: {large_orders_df.count()} records")

# Regular join vs broadcast join comparison.
# NOTE(review): with tables this small Spark may choose a broadcast join on its
# own for the "regular" case as well — compare the two plans with explain() to confirm.
print("\n Regular join (without broadcast):")
regular_join = large_orders_df.join(customers, "customer_id")
print(f"Regular join result: {regular_join.count()} records")

print("\n Broadcast join (optimized for small tables):")
broadcast_join = large_orders_df.join(broadcast(customers), "customer_id")
print(f"Broadcast join result: {broadcast_join.count()} records")

# 3. Partitioning optimization
print("\n 3. Partitioning Optimization:")

# Check current partitioning
print(f"Orders RDD partitions: {orders.rdd.getNumPartitions()}")

# Repartition into 4 partitions hashed on category (this shuffles the data)
optimized_orders = orders.repartition(4, "category")
print(f"Optimized partitions: {optimized_orders.rdd.getNumPartitions()}")

# Coalesce for reducing partitions
coalesced_orders = orders.coalesce(2)
print(f"Coalesced partitions: {coalesced_orders.rdd.getNumPartitions()}")

# 4. Query execution plan analysis
print("\n 4. Query Execution Plan Analysis:")

# Create a complex query to analyze: sales per (state, category), highest first
complex_query = customers.alias("c") \
    .join(orders.alias("o"), "customer_id") \
    .groupBy("c.state", "o.category") \
    .agg(
        sum("o.amount").alias("total_sales"),
        count("o.order_id").alias("order_count")
    ) \
    .orderBy(desc("total_sales"))

print("Complex query execution plan:")
complex_query.explain(mode="simple")

In [None]:
# Delta Lake Optimization

# The racing-car emoji was stored as mis-decoded bytes; restored here.
print("🏎️ Delta Lake Performance Optimization\n")

# 1. OPTIMIZE command
print(" 1. Delta Lake OPTIMIZE:")

# Check table details before optimization
print("Table details before optimization:")
spark.sql(f"DESCRIBE DETAIL delta.`{orders_table_path}`").select(
    "numFiles", "sizeInBytes", "minReaderVersion", "minWriterVersion"
).show()

# Run OPTIMIZE to compact files
print("Running OPTIMIZE command...")
spark.sql(f"OPTIMIZE delta.`{orders_table_path}`")

print("OPTIMIZE completed!")

# 2. Z-ORDER optimization
print("\n 2. Z-ORDER Optimization:")

# Z-ORDER by a frequently queried data column. The orders table is partitioned
# by `category`, and Delta rejects partition columns in ZORDER BY, so only
# `customer_id` is listed; `category` filters are served by partition pruning.
spark.sql(f"""
    OPTIMIZE delta.`{orders_table_path}`
    ZORDER BY (customer_id)
""")

print("Z-ORDER optimization completed!")
print("Z-ORDER improves data skipping for range queries")

# 3. VACUUM for cleanup
print("\n 3. VACUUM Operations:")

# Check history before vacuum
print("Delta table history:")
delta_orders_table = DeltaTable.forPath(spark, orders_table_path)
delta_orders_table.history().select("version", "timestamp", "operation").show(5)

# VACUUM is only described here, not executed (note: default retention is 7 days)
print("Running VACUUM (simulated - normally retains 7 days):")
# spark.sql(f"VACUUM delta.`{orders_table_path}` RETAIN 0 HOURS")  # Don't run in production!
print("VACUUM removes old files - be careful with retention period!")

# 4. Statistics and data skipping
print("\n 4. Data Skipping Statistics:")

# Filtering on the partition column lets the reader skip the other categories' directories
filtered_query = spark.read.format("delta").load(orders_table_path) \
    .filter(col("category") == "Electronics")

print("Query with data skipping (partition elimination):")
print(f"Electronics orders: {filtered_query.count()}")

# Show table statistics
print("\n Delta table statistics:")
spark.sql(f"DESCRIBE EXTENDED delta.`{orders_table_path}`").show(truncate=False)

In [None]:
# Slowly Changing Dimensions (SCD)

def implement_scd_type2():
    """Implement SCD Type 2 for customer data.

    Rebuilds the SCD table at ``{base_path}/customer_scd`` from the notebook's
    ``customers`` DataFrame, then applies a small batch of customer changes.
    For every customer whose email or tier changed, the current row is closed
    (``end_date`` set, ``is_current`` false) and a new current row is inserted
    with ``record_version`` incremented. Unchanged customers are left alone;
    customers not yet in the table are inserted as version 1.

    Closing and inserting happen in a single MERGE so the table never exposes
    a half-applied change. Prints the resulting table.
    """
    # Create SCD table structure
    scd_path = f"{base_path}/customer_scd"

    # Initial customer data with SCD fields
    initial_customers = customers.withColumn("effective_date", current_date()) \
        .withColumn("end_date", lit(None).cast("date")) \
        .withColumn("is_current", lit(True)) \
        .withColumn("record_version", lit(1))

    initial_customers.write.format("delta").mode("overwrite").save(scd_path)
    # The check-mark emoji was stored as mis-decoded bytes; restored here.
    print("✅ Initial SCD table created")

    # Simulate customer data changes
    customer_updates = [
        (2, "Bob Smith", "bob.smith@newemail.com", "2023-02-20", "Premium", "California"),  # Email change
        (3, "Carol Davis", "carol@email.com", "2023-03-10", "Standard", "Texas")  # Tier change
    ]

    updates_df = spark.createDataFrame(customer_updates, customers_schema)

    scd_table = DeltaTable.forPath(spark, scd_path)

    # Null-safe comparison: a NULL on either side must count as a change rather
    # than making the whole condition NULL.
    change_condition = "NOT (scd.email <=> updates.email) OR NOT (scd.tier <=> updates.tier)"

    # Rows that will become the NEW current version of customers that really changed.
    # Their merge key is NULL so they can never match and are always inserted.
    new_versions = updates_df.alias("updates") \
        .join(
            scd_table.toDF().alias("scd"),
            (col("updates.customer_id") == col("scd.customer_id")) & col("scd.is_current")
        ) \
        .where(change_condition) \
        .selectExpr(
            "CAST(NULL AS INT) AS merge_key",
            "updates.*",
            "scd.record_version + 1 AS record_version"
        )

    # Rows keyed by customer_id: they close the matching current row when it
    # changed, or are inserted as version 1 when the customer is not in the table.
    staged_updates = new_versions.unionByName(
        updates_df.selectExpr("customer_id AS merge_key", "*", "1 AS record_version")
    )

    scd_table.alias("scd").merge(
        staged_updates.alias("updates"),
        "scd.customer_id = updates.merge_key AND scd.is_current = true"
    ).whenMatchedUpdate(
        condition=change_condition,
        set={
            "end_date": "current_date()",
            "is_current": "false"
        }
    ).whenNotMatchedInsert(
        values={
            "customer_id": "updates.customer_id",
            "name": "updates.name",
            "email": "updates.email",
            "registration_date": "updates.registration_date",
            "tier": "updates.tier",
            "state": "updates.state",
            "effective_date": "current_date()",
            "end_date": "CAST(NULL AS DATE)",
            "is_current": "true",
            "record_version": "updates.record_version"
        }
    ).execute()

    print("SCD Type 2 updates completed")

    # Show results
    print("\n SCD Table Results:")
    spark.read.format("delta").load(scd_path) \
        .orderBy("customer_id", "record_version") \
        .select("customer_id", "name", "email", "tier", "effective_date", "end_date", "is_current") \
        .show(truncate=False)

implement_scd_type2()

# Data Deduplication
def handle_duplicates():
    """Show three ways of removing duplicate orders from a small sample batch.

    The batch holds one exact duplicate (order 201) and one pair that shares
    the business key but differs in amount (order 202). Row counts after each
    method are printed, together with the final de-duplicated rows.
    """
    business_key = ["order_id", "customer_id", "order_date"]

    sample_rows = [
        (201, 1, "2023-06-15", 100.0, "Electronics"),
        (201, 1, "2023-06-15", 100.0, "Electronics"),  # identical to the row above
        (202, 2, "2023-06-16", 150.0, "Books"),
        (202, 2, "2023-06-16", 155.0, "Books"),  # same key, different amount
    ]
    orders_with_dupes = spark.createDataFrame(sample_rows, orders_schema)

    print(f"Orders with duplicates: {orders_with_dupes.count()} records")
    orders_with_dupes.show()

    # Method 1: drop rows that are identical in every column.
    fully_distinct = orders_with_dupes.dropDuplicates()
    print(f"\n After exact deduplication: {fully_distinct.count()} records")

    # Method 2: keep one row per business key (which one survives is up to Spark).
    one_per_key = orders_with_dupes.dropDuplicates(business_key)
    print(f"After key-based deduplication: {one_per_key.count()} records")

    # Method 3: rank rows within each key by amount and keep the highest.
    highest_amount_first = Window.partitionBy(*business_key).orderBy(desc("amount"))
    ranked = orders_with_dupes.withColumn("row_number", row_number().over(highest_amount_first))
    top_ranked = ranked.filter(col("row_number") == 1).drop("row_number")

    print(f"After advanced deduplication: {top_ranked.count()} records")
    top_ranked.show()

handle_duplicates()

In [None]:
# Error Handling and Monitoring
def robust_etl_pipeline():
    """Demonstrate error handling in an ETL pipeline.

    Builds a small batch of orders containing deliberately bad rows, validates
    each row on the driver, and appends the rejected rows with their error
    messages to the Delta error log at ``{base_path}/error_logs``.

    Returns:
        A tuple ``(valid_record_count, error_record_count)``.
    """
    import json

    # Create some problematic data
    problematic_data = [
        (301, 1, "2023-06-20", 200.0, "Electronics"),      # Good record
        (302, None, "2023-06-21", 150.0, "Books"),         # Null customer_id
        (303, 2, "invalid-date", 100.0, "Clothing"),       # Invalid date
        (304, 3, "2023-06-22", -50.0, "Electronics"),      # Negative amount
        (305, 999, "2023-06-23", 75.0, "Books")            # Non-existent customer
    ]

    # `orders_schema` declares every column non-nullable, so createDataFrame would
    # reject the row with a None customer_id before validation ever runs. Raw input
    # is loaded with a nullable copy of the schema instead.
    raw_orders_schema = StructType([
        StructField(field.name, field.dataType, True) for field in orders_schema.fields
    ])
    problematic_df = spark.createDataFrame(problematic_data, raw_orders_schema)

    print("Processing data with potential errors...")

    # One identifier for the whole run, so every error row can be traced back to it.
    pipeline_run_id = str(uuid.uuid4())

    valid_records = []
    error_records = []

    def validate_record(row):
        """Return ``(is_valid, errors)`` for a single order row.

        Only row-level checks are made; whether customer_id refers to an
        existing customer is not verified here.
        """
        errors = []

        # Check for null customer_id
        if row.customer_id is None:
            errors.append("Null customer_id")

        # Amount must be present and strictly positive
        if row.amount is None or row.amount <= 0:
            errors.append("Invalid amount")

        # Check date format (simplified). TypeError covers a missing date,
        # ValueError a malformed one; anything else should surface normally.
        try:
            datetime.strptime(row.order_date, "%Y-%m-%d")
        except (TypeError, ValueError):
            errors.append("Invalid date format")

        return not errors, errors

    # Process each record (driver-side loop; acceptable only for a tiny demo batch)
    for row in problematic_df.collect():
        is_valid, errors = validate_record(row)

        if is_valid:
            valid_records.append(row)
        else:
            error_records.append({
                # The row mixes ints, strings and floats, which schema inference
                # cannot represent as a single map type; store it as JSON text.
                "original_data": json.dumps(row.asDict(), default=str),
                "errors": errors,
                "timestamp": datetime.now(),
                "pipeline_run_id": pipeline_run_id
            })

    print(f"Valid records: {len(valid_records)}")
    print(f"Error records: {len(error_records)}")

    # Create error log
    if error_records:
        error_log_schema = StructType([
            StructField("original_data", StringType(), False),
            StructField("errors", ArrayType(StringType(), False), False),
            StructField("timestamp", TimestampType(), False),
            StructField("pipeline_run_id", StringType(), False)
        ])
        error_df = spark.createDataFrame(error_records, error_log_schema)
        error_path = f"{base_path}/error_logs"
        # NOTE(review): an error log already written with a different schema at
        # this path will reject the append — confirm or clear it before running.
        error_df.write.format("delta").mode("append").save(error_path)

        print("\n Error Log:")
        error_df.select("original_data", "errors").show(truncate=False)

    return len(valid_records), len(error_records)

valid_count, error_count = robust_etl_pipeline()

# Monitoring and alerting patterns
def create_pipeline_metrics(records_successful=None, records_failed=None):
    """Create monitoring metrics for a pipeline run and print them.

    Args:
        records_successful: Number of records that passed validation. Defaults
            to the notebook-level ``valid_count``.
        records_failed: Number of records that were rejected. Defaults to the
            notebook-level ``error_count``.

    Returns:
        A dict with ``pipeline_name``, ``run_timestamp``, ``records_processed``,
        ``records_successful``, ``records_failed``, ``success_rate`` (percent,
        0 when nothing was processed) and ``pipeline_status`` ("SUCCESS" when
        no record failed, otherwise "PARTIAL_FAILURE").
    """
    # Explicit arguments make the function usable without the notebook globals;
    # the fallback keeps the original zero-argument call working.
    if records_successful is None:
        records_successful = valid_count
    if records_failed is None:
        records_failed = error_count

    records_processed = records_successful + records_failed

    pipeline_metrics = {
        "pipeline_name": "customer_orders_etl",
        "run_timestamp": datetime.now(),
        "records_processed": records_processed,
        "records_successful": records_successful,
        "records_failed": records_failed,
        "success_rate": (records_successful / records_processed) * 100 if records_processed > 0 else 0,
        "pipeline_status": "SUCCESS" if records_failed == 0 else "PARTIAL_FAILURE"
    }

    print("\n Pipeline Metrics:")
    for key, value in pipeline_metrics.items():
        print(f"  {key}: {value}")

    # In production, this would be sent to monitoring system
    # (DataDog, CloudWatch, etc.)

    return pipeline_metrics

# Called without arguments, so the metrics are built from the notebook-level
# valid_count / error_count set by the robust_etl_pipeline() call.
metrics = create_pipeline_metrics()

## 8. SQL Analysis and Complex Queries

In [None]:
-- SQL
-- Expose both path-based Delta tables as session-scoped temporary views.
CREATE OR REPLACE TEMPORARY VIEW orders_view AS
SELECT * FROM delta.`/tmp/delta-practice/orders`;

CREATE OR REPLACE TEMPORARY VIEW customers_view AS
SELECT * FROM delta.`/tmp/delta-practice/customers`;

-- Top 3 customers by total spend, with order count and rounded average order value.
-- The inner join means customers without any order are not considered.
SELECT
    cust.name,
    cust.tier,
    SUM(ord.amount)           AS total_spent,
    COUNT(ord.order_id)       AS total_orders,
    ROUND(AVG(ord.amount), 2) AS avg_order_value
FROM customers_view AS cust
INNER JOIN orders_view AS ord
    ON ord.customer_id = cust.customer_id
GROUP BY
    cust.customer_id,
    cust.name,
    cust.tier
ORDER BY total_spent DESC
LIMIT 3;

In [None]:
-- Per-customer window analytics over orders:
--   order_rank            position of the order by amount, largest first
--   running_total         cumulative spend in order-date sequence
--   previous/next amount  neighbouring orders in order-date sequence (NULL at the ends)
SELECT
    customer_id,
    order_id,
    order_date,
    amount,
    ROW_NUMBER() OVER by_amount AS order_rank,
    SUM(amount) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_total,
    LAG(amount, 1)  OVER by_date AS previous_order_amount,
    LEAD(amount, 1) OVER by_date AS next_order_amount
FROM orders_view
WINDOW
    by_amount AS (PARTITION BY customer_id ORDER BY amount DESC),
    by_date   AS (PARTITION BY customer_id ORDER BY order_date)
ORDER BY customer_id, order_date;

In [None]:
-- Complex analytics - Customer behavior analysis
-- customer_metrics keeps customers without orders (LEFT JOIN). For those rows
-- order_count is 0 and total_spent, avg_order_value, the order dates and
-- customer_lifetime_days are all NULL.
WITH customer_metrics AS (
    SELECT 
        c.customer_id,
        c.name,
        c.tier,
        c.state,
        COUNT(o.order_id) as order_count,
        SUM(o.amount) as total_spent,
        AVG(o.amount) as avg_order_value,
        MIN(o.order_date) as first_order_date,
        MAX(o.order_date) as last_order_date,
        DATEDIFF(MAX(o.order_date), MIN(o.order_date)) as customer_lifetime_days
    FROM customers_view c
    LEFT JOIN orders_view o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, c.name, c.tier, c.state
),
-- Average and median total spend per tier (NULL totals are ignored by both aggregates).
tier_stats AS (
    SELECT 
        tier,
        AVG(total_spent) as tier_avg_spending,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_spent) as tier_median_spending
    FROM customer_metrics
    GROUP BY tier
)
SELECT 
    cm.*,
    ts.tier_avg_spending,
    ts.tier_median_spending,
    -- Without the explicit no-orders branch, a NULL total_spent fails both
    -- comparisons and falls through to 'Average'.
    CASE 
        WHEN cm.order_count = 0 THEN 'No Orders'
        WHEN cm.total_spent > ts.tier_avg_spending THEN 'Above Average'
        WHEN cm.total_spent < ts.tier_avg_spending THEN 'Below Average'
        ELSE 'Average'
    END as spending_vs_tier_avg,
    -- Likewise a NULL customer_lifetime_days would fall through to 'Long-term Customer'.
    CASE 
        WHEN cm.order_count = 0 THEN 'No Orders'
        WHEN cm.customer_lifetime_days = 0 THEN 'New Customer'
        WHEN cm.customer_lifetime_days <= 30 THEN 'Recent Customer' 
        WHEN cm.customer_lifetime_days <= 90 THEN 'Regular Customer'
        ELSE 'Long-term Customer'
    END as customer_segment
FROM customer_metrics cm
JOIN tier_stats ts ON cm.tier = ts.tier
ORDER BY cm.total_spent DESC;