# Databricks Performance Pitfalls Workshop
## Common Mistakes and How to Fix Them (70 mins)

**Workshop Objectives:**
- Identify and fix common Spark performance issues
- Learn to read and interpret Spark UI and execution plans
- Understand job scheduling and alerting strategies
- Apply best practices for production workloads

**Prerequisites:**
- Basic Python/PySpark knowledge
- Access to Databricks workspace

**Datasets Used:**
- Synthetic transaction and customer data, generated in the setup cells below

---

## Setup: Create Sample Data

Let's start by creating some sample data we'll use throughout the workshop.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # NOTE: shadows Python built-ins such as sum, max, min and round
from pyspark.sql.types import *
import random
from datetime import datetime, timedelta

# In Databricks, spark session is already available
# Set some initial configs
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

print(f"Spark version: {spark.version}")
print(f"Adaptive Query Execution: {spark.conf.get('spark.sql.adaptive.enabled')}")

# Setup workspace path for this user's workshop files
username = spark.sql("SELECT current_user()").collect()[0][0].split('@')[0].replace('.', '_')
base_path = f"dbfs:/user/{username}/workshop_data"

# Alternative if the path above is not writable in your workspace: FileStore.
# Keep the dbfs:/ scheme; the /dbfs/... form is the local FUSE mount, not a Spark path.
# base_path = f"dbfs:/FileStore/workshop_data/{username}"

print(f"\n📁 Workshop data will be stored at: {base_path}")
print("   (If writes to this path fail, switch to the FileStore alternative above)")

In [None]:
# Create a large customer transactions dataset
# This will be used for demonstrating various performance issues

from pyspark.sql import Row

# Generate 1 million transaction records (built in driver-side Python for simplicity; this gets slow for much larger sizes)
num_records = 1000000

# Create skewed data (some customers have way more transactions)
def generate_customer_id():
    rand = random.random()
    if rand < 0.3:  # 30% of transactions belong to just 10 customers (SKEW!)
        return random.randint(1, 10)
    else:
        return random.randint(11, 10000)

transactions_data = [
    (
        i,
        generate_customer_id(),
        random.randint(1000, 500000) / 100,  # amount in [10.00, 5000.00]; avoids the shadowed round()
        random.choice(['Electronics', 'Clothing', 'Food', 'Books', 'Home']),
        (datetime.now() - timedelta(days=random.randint(0, 365))).strftime('%Y-%m-%d %H:%M:%S'),
        random.choice(['US/Pacific', 'US/Eastern', 'Europe/London', 'Asia/Tokyo'])
    )
    for i in range(num_records)
]

transactions_df = spark.createDataFrame(
    transactions_data,
    ['transaction_id', 'customer_id', 'amount', 'category', 'timestamp_str', 'timezone']
)

# Cache for reuse
transactions_df.cache()
print(f"Created {transactions_df.count():,} transaction records")
transactions_df.show(5)

In [None]:
# Create customer dimension table
customers_data = [
    (i, f"Customer_{i}", random.choice(['Gold', 'Silver', 'Bronze', 'Platinum']),
     random.choice(['USA', 'UK', 'Japan', 'Germany', 'France']))
    for i in range(1, 10001)
]

customers_df = spark.createDataFrame(
    customers_data,
    ['customer_id', 'customer_name', 'tier', 'country']
)

customers_df.cache()
print(f"Created {customers_df.count():,} customer records")
customers_df.show(5)

---
## Pitfall #1: Shuffle Explosion (Wide Transformations)

**What is it?**  
Shuffle operations move data across executors and are expensive. They occur during operations like `groupBy`, `join`, `repartition`, and `distinct`.

**Problem:** Unnecessary or excessive shuffles can kill performance.

**Time: 7 minutes**

In [None]:
# ❌ BAD: Multiple unnecessary shuffles
print("BAD APPROACH: Multiple shuffles")

bad_result = transactions_df \
    .repartition(200) \
    .groupBy('customer_id').agg(sum('amount').alias('total_1')) \
    .repartition(100) \
    .groupBy('customer_id').agg(sum('total_1').alias('total_2')) \
    .repartition(50)

# Look at the execution plan - notice all the "Exchange" operations (shuffles)
bad_result.explain()

In [None]:
# ✅ GOOD: Minimize shuffles
print("GOOD APPROACH: Optimized shuffles")

good_result = transactions_df \
    .groupBy('customer_id').agg(
        sum('amount').alias('total_amount'),
        count('transaction_id').alias('transaction_count'),
        avg('amount').alias('avg_amount')
    )

# Much cleaner execution plan with only necessary shuffles
good_result.explain()
good_result.show(5)

**💡 Key Takeaways:**
- Use `explain()` to see execution plans and identify shuffles (look for "Exchange")
- Combine aggregations into single operations
- Avoid unnecessary `repartition()` calls
- Let Adaptive Query Execution handle partition sizing when possible
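
To compare the two plans above without eyeballing them, the printed plan can be captured and the shuffle operators counted. A minimal sketch in plain Python, assuming (as in current PySpark) that `DataFrame.explain()` writes the plan with Python's `print()`; `plan_text` and `count_operators` are hypothetical helper names. Treat the counts as a rough signal: after an AQE query has run, the simple plan lists both the final and the initial plan, so operators can appear twice.

```python
import io
import re
from contextlib import redirect_stdout


def plan_text(df, mode="simple"):
    """Return what df.explain(mode=...) prints, as a string.

    Assumes DataFrame.explain() writes the plan via Python's print(),
    so redirect_stdout can capture it.
    """
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def count_operators(plan, names=("Exchange", "Sort", "SortMergeJoin", "BroadcastHashJoin")):
    """Count whole-word occurrences of selected physical operators in a plan string.

    Whole-word matching keeps "Exchange" from also matching "BroadcastExchange"
    and "Sort" from matching "SortMergeJoin".
    """
    return {name: len(re.findall(rf"\b{name}\b", plan)) for name in names}


# Usage in the notebook (bad_result and good_result come from the cells above):
# print("bad: ", count_operators(plan_text(bad_result)))
# print("good:", count_operators(plan_text(good_result)))
```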

---
## Pitfall #2: Skewed Keys

**What is it?**  
When data is unevenly distributed across keys, some partitions get overwhelmed while others sit idle.

**Problem:** One executor does all the work, causing stragglers and potential OOM errors.

**Time: 7 minutes**

In [None]:
# First, let's identify the skew
print("Checking for skewed data distribution...")

skew_check = transactions_df \
    .groupBy('customer_id') \
    .agg(count('*').alias('transaction_count')) \
    .orderBy(col('transaction_count').desc())

skew_check.show(20)

# Notice: Top customers have WAY more transactions than others

In [None]:
# ‚ùå BAD: Direct join with skewed data
print("BAD APPROACH: Join with skewed keys")

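# A shuffle-based (sort-merge) join sends all rows for a hot customer_id to one task.
# Note: customers_df is small, so Spark may choose a broadcast join here on its own,
# which avoids join-side skew; skew hurts most when both sides are large.
bad_join = transactions_df \
    .join(customers_df, 'customer_id') \
    .groupBy('customer_name', 'tier') \
    .agg(sum('amount').alias('total_spent'))

# explain() shows the plan without running it; run an action (e.g. bad_join.collect())
# and compare task durations in the Spark UI Stages tab to see uneven partitions
bad_join.explain()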

In [None]:
# ✅ GOOD: Let Adaptive Query Execution handle skewed joins
print("GOOD APPROACH: AQE skew join optimization")

# Enable skew join optimization (Spark 3.0+; defaults to true, requires AQE)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")  # Spark's default

# AQE splits oversized partitions of shuffle-based joins at runtime. It only acts on
# partitions above its size thresholds, so on this small dataset it may not kick in.
good_join = transactions_df \
    .join(customers_df, 'customer_id') \
    .groupBy('customer_name', 'tier') \
    .agg(sum('amount').alias('total_spent'))

good_join.show(10)
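
AQE's skew detection can be sketched in plain Python. Per the Spark configuration docs, a partition counts as skewed only when it is larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` times the median partition size *and* larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. The sketch below assumes the defaults of 5 and 256MB and simplifies Spark's internal logic (`skewed_partitions` is a hypothetical helper); it shows why a small dataset like this one may never trigger the optimization.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Return indexes of partitions that satisfy both skew conditions.

    Simplified sketch of the AQE rule: size > factor * median AND
    size > threshold_bytes. Not Spark's actual implementation.
    """
    if not sizes_bytes:
        return []
    med = median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * med and size > threshold_bytes]


# One 1GB partition among 10MB partitions: large relative to the median and above 256MB
print(skewed_partitions([10 * MB] * 9 + [1024 * MB]))  # [9]

# 60MB is 6x the median but below 256MB, so it is not treated as skewed
print(skewed_partitions([10 * MB] * 9 + [60 * MB]))    # []
```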

In [None]:
# Alternative: Manual salting for extreme cases
print("ALTERNATIVE: Manual salting technique")

salt_size = 10

# Spread each customer's rows across salt_size sub-keys (salt = 0..salt_size-1)
transactions_salted = transactions_df \
    .withColumn('salt', (rand() * salt_size).cast('int'))

# Replicate every dimension row once per salt value so each sub-key finds its match
customers_exploded = customers_df \
    .withColumn('salt', explode(array([lit(i) for i in range(salt_size)])))

# Join on the original key plus the salt (no string concatenation needed)
salted_join = transactions_salted \
    .join(customers_exploded, ['customer_id', 'salt']) \
    .groupBy('customer_name', 'tier') \
    .agg(sum('amount').alias('total_spent'))

salted_join.show(10)
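
Why salting helps can be checked without a cluster. The sketch below uses hypothetical toy keys in plain Python (not the workshop data): one hot key holds about 30% of the rows, similar in spirit to the skewed generator in the setup, and the largest per-key group stands in for the largest join task. Salting spreads the hot key over `SALT_SIZE` sub-keys, at the cost of replicating each dimension row `SALT_SIZE` times.

```python
import random
from collections import Counter


def largest_group(keys):
    """Size of the biggest group of identical keys (a proxy for the biggest task)."""
    return Counter(keys).most_common(1)[0][1]


rng = random.Random(42)  # fixed seed so the run is repeatable
SALT_SIZE = 10

# Hypothetical toy data: key 1 is hot (~30% of rows); the rest spread over 1,000 keys
keys = [1 if rng.random() < 0.3 else rng.randint(2, 1001) for _ in range(100_000)]

# Salted join key: the original key paired with a random salt value
salted_keys = [(k, rng.randrange(SALT_SIZE)) for k in keys]

print("largest group without salt:", largest_group(keys))
print("largest group with salt:   ", largest_group(salted_keys))
```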

**💡 Key Takeaways:**
- Always check data distribution before joins/aggregations
- Enable Adaptive Query Execution and skew join optimization
- For extreme skew, use salting techniques
- Monitor Spark UI to identify straggler tasks

---
## Pitfall #3: Join Strategy Not Optimized

**What is it?**  
Spark offers different join strategies (Broadcast, Sort-Merge, Shuffle Hash). Wrong choice = slow performance.

**Problem:** Large tables get broadcast, small tables don't, causing unnecessary shuffles.

**Time: 6 minutes**

In [None]:
# Check current broadcast threshold
print(f"Current broadcast threshold: {spark.conf.get('spark.sql.autoBroadcastJoinThreshold')}")

# ‚ùå BAD: Not using broadcast for small tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # Disable auto broadcast

bad_join_strategy = transactions_df \
    .join(customers_df, 'customer_id') \
    .select('transaction_id', 'customer_name', 'amount')

print("\nBAD: Sort-Merge Join (when broadcast would be better)")
bad_join_strategy.explain()

In [None]:
# ‚úÖ GOOD: Use broadcast hint for small tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10MB (the Apache Spark default; restore your original value if it differed)

from pyspark.sql.functions import broadcast

good_join_strategy = transactions_df \
    .join(broadcast(customers_df), 'customer_id') \
    .select('transaction_id', 'customer_name', 'amount')

print("\nGOOD: Broadcast Hash Join")
good_join_strategy.explain()
good_join_strategy.show(5)

**💡 Key Takeaways:**
- Broadcast small tables (< 10MB) to avoid shuffles
- Use `broadcast()` hint to force broadcast joins
- Check `explain()` output to verify join strategy
- **Join types in order of preference:**
  1. Broadcast Hash Join (fastest, no shuffle)
  2. Shuffle Hash Join
  3. Sort-Merge Join (default for large tables)

---
## Pitfall #4: Python UDF Slowness

**What is it?**  
Python UDFs serialize data between JVM and Python, losing Spark's optimizations.

**Problem:** Can be 10-100x slower than equivalent native Spark operations, depending on the workload.

**Time: 7 minutes**

In [None]:
# ❌ BAD: Python UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import time

@udf(StringType())
def categorize_amount_udf(amount):
    """Slow Python UDF"""
    if amount < 100:
        return 'Small'
    elif amount < 1000:
        return 'Medium'
    else:
        return 'Large'

print("BAD: Using Python UDF")
start = time.time()

bad_udf_result = transactions_df \
    .withColumn('amount_category', categorize_amount_udf(col('amount'))) \
    .select('transaction_id', 'amount', 'amount_category')

bad_udf_result.write.mode('overwrite').format('noop').save()  # Trigger execution
bad_time = time.time() - start

print(f"Python UDF execution time: {bad_time:.2f} seconds")
bad_udf_result.show(5)

In [None]:
# ✅ GOOD: Native Spark SQL functions
print("GOOD: Using native Spark functions")
start = time.time()

good_native_result = transactions_df \
    .withColumn('amount_category', 
                when(col('amount') < 100, 'Small')
                .when(col('amount') < 1000, 'Medium')
                .otherwise('Large')) \
    .select('transaction_id', 'amount', 'amount_category')

good_native_result.write.mode('overwrite').format('noop').save()  # Trigger execution
good_time = time.time() - start

print(f"Native Spark execution time: {good_time:.2f} seconds")
print(f"Speedup: {bad_time/good_time:.1f}x faster!")
good_native_result.show(5)

In [None]:
# ✅ BETTER: Pandas UDF (if you must use Python)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def categorize_amount_pandas(amounts: pd.Series) -> pd.Series:
    """Faster Pandas UDF - vectorized processing"""
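    # right=False gives bins [-inf, 100), [100, 1000), [1000, inf), matching the
    # strict "<" checks used above; astype(str) returns plain strings, not a Categorical
    return pd.cut(amounts,
                  bins=[-float('inf'), 100, 1000, float('inf')],
                  labels=['Small', 'Medium', 'Large'],
                  right=False).astype(str)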

print("BETTER: Using Pandas UDF")
start = time.time()

pandas_udf_result = transactions_df \
    .withColumn('amount_category', categorize_amount_pandas(col('amount'))) \
    .select('transaction_id', 'amount', 'amount_category')

pandas_udf_result.write.mode('overwrite').format('noop').save()
pandas_time = time.time() - start

print(f"Pandas UDF execution time: {pandas_time:.2f} seconds")
print(f"Pandas UDF is {bad_time/pandas_time:.1f}x faster than Python UDF")
pandas_udf_result.show(5)
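
One detail to verify when porting `if`/`elif` logic to `pd.cut`: by default `pd.cut` closes bins on the right (`right=True`), so an amount of exactly 100 or 1000 lands in the lower bucket, while the `when()` chain and the Python UDF put it in the upper one. A quick local check with pandas only (no Spark); `categorize_row` and `categorize_vectorized` are hypothetical names for this sketch.

```python
import pandas as pd


def categorize_row(amount):
    """Row-at-a-time logic, same thresholds as the Python UDF above."""
    if amount < 100:
        return 'Small'
    elif amount < 1000:
        return 'Medium'
    return 'Large'


def categorize_vectorized(amounts: pd.Series, right: bool = False) -> pd.Series:
    """Vectorized binning; right=False gives [-inf, 100), [100, 1000), [1000, inf)."""
    return pd.cut(amounts,
                  bins=[-float('inf'), 100, 1000, float('inf')],
                  labels=['Small', 'Medium', 'Large'],
                  right=right).astype(str)


boundaries = pd.Series([99.99, 100.0, 999.99, 1000.0, 5000.0])
expected = [categorize_row(a) for a in boundaries]

print(expected)  # ['Small', 'Medium', 'Medium', 'Large', 'Large']
print(categorize_vectorized(boundaries).tolist())  # matches expected
print(categorize_vectorized(boundaries, right=True).tolist())  # 100.0 and 1000.0 drop a bucket
```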

**💡 Key Takeaways:**
- **Avoid Python UDFs whenever possible!**
- Use native Spark SQL functions (`when()`/`otherwise()`, or SQL `CASE WHEN` via `expr()`)
- If you need Python, use Pandas UDFs (vectorized)
- Performance hierarchy: Native Spark > Pandas UDF > Python UDF

---
## Pitfall #5: Ineffective Caching/Persistence

**What is it?**  
Caching stores DataFrames in memory for reuse, but misuse wastes memory or doesn't help.

**Problem:** Cache too much = OOM. Cache wrong things = no benefit.

**Time: 6 minutes**

In [None]:
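# ❌ BAD: Caching everything or caching before filtering
print("BAD: Caching before filtering (wastes memory)")

# Don't do this - caches the full dataset! (cache() returns the same DataFrame,
# so bad_cache is transactions_df itself, already cached in the setup cell)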
bad_cache = transactions_df.cache()
filtered_data = bad_cache.filter(col('amount') > 1000)  # Only need this subset!

print(f"Cached full dataset: {bad_cache.storageLevel}")

In [None]:
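# Clean up bad cache. bad_cache is the same object as transactions_df, so this also
# drops the setup cache; re-cache transactions_df because later cells reuse it.
bad_cache.unpersist()
transactions_df.cache().count()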

# ✅ GOOD: Cache after filtering, only when reusing
print("GOOD: Cache after filtering, when data is reused multiple times")

# Filter first, then cache
high_value_transactions = transactions_df \
    .filter(col('amount') > 1000) \
    .cache()

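# Trigger caching (avoid naming this `count`: that would shadow the
# pyspark.sql.functions.count used in the aggregations below)
n_high_value = high_value_transactions.count()
print(f"Cached {n_high_value:,} high-value transactions")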

# Now reuse the cached data multiple times (this is when cache helps!)
result1 = high_value_transactions.groupBy('category').agg(sum('amount'))
result2 = high_value_transactions.groupBy('customer_id').agg(count('*'))
result3 = high_value_transactions.agg(avg('amount'), max('amount'))

result1.show()
result2.show(5)
result3.show()

In [None]:
# Different storage levels for different use cases
from pyspark import StorageLevel

# Note: DataFrame.cache() defaults to a memory-and-disk level, not MEMORY_ONLY, and
# PySpark's StorageLevel does not define the *_SER constants found in Scala.
# Persisting an already-cached DataFrame with a new level is ignored, so each level
# is shown on a derived DataFrame and unpersisted before trying the next one.
demo_df = transactions_df.select('transaction_id', 'amount', 'category')

# MEMORY_ONLY     - partitions that don't fit are not cached and get recomputed
# MEMORY_AND_DISK - partitions that don't fit spill to disk (safer)
# DISK_ONLY       - keeps executor memory free at the cost of slower reads
for level_name in ['MEMORY_ONLY', 'MEMORY_AND_DISK', 'DISK_ONLY']:
    demo_df.persist(getattr(StorageLevel, level_name))
    demo_df.count()  # materialize the cache
    print(f"{level_name}: {demo_df.storageLevel}")
    demo_df.unpersist(blocking=True)

print("\n✅ Best Practices:")
print("- Cache AFTER filtering/transformation")
print("- Only cache if you'll reuse the data 2+ times")
print("- Use MEMORY_AND_DISK for production (reduces OOM risk)")
print("- Always unpersist() when done to free memory")

# Clean up
high_value_transactions.unpersist()

**💡 Key Takeaways:**
- Only cache data that will be reused multiple times
- Cache AFTER filtering/transformations to reduce memory usage
- Use `MEMORY_AND_DISK` for production workloads
- Always `unpersist()` when done to free up memory
- Check Spark UI → Storage tab to monitor cached data

---
## Pitfall #6: Timezone / Timestamp Parsing Issues

**What is it?**  
Timestamp parsing and timezone handling can cause incorrect results and performance issues.

**Problem:** Wrong timestamps, daylight saving bugs, slow parsing.

**Time: 6 minutes**

In [None]:
# ❌ BAD: Parsing timestamps without timezone awareness
print("BAD: Naive timestamp parsing")

# to_timestamp() interprets the string in the session time zone and ignores the per-row timezone column
bad_timestamp = transactions_df \
    .withColumn('parsed_time', to_timestamp(col('timestamp_str'), 'yyyy-MM-dd HH:mm:ss'))

bad_timestamp.select('timestamp_str', 'timezone', 'parsed_time').show(5, truncate=False)

# Problem: All timestamps are treated as if they're in the same timezone!

In [None]:
# ✅ GOOD: Timezone-aware timestamp handling
print("GOOD: Timezone-aware parsing")

# Set session timezone
spark.conf.set("spark.sql.session.timeZone", "UTC")

good_timestamp = transactions_df \
    .withColumn('parsed_time', to_timestamp(col('timestamp_str'), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn('utc_time', 
                to_utc_timestamp(col('parsed_time'), col('timezone')))

good_timestamp.select('timestamp_str', 'timezone', 'parsed_time', 'utc_time').show(5, truncate=False)

In [None]:
# ❌ BAD: Using Python UDF for date parsing (slow!)
from datetime import datetime as dt

@udf(StringType())
def extract_hour_bad(timestamp_str):
    return dt.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').strftime('%H')

print("BAD: Python UDF for date operations")
bad_hour = transactions_df.withColumn('hour', extract_hour_bad(col('timestamp_str')))
bad_hour.select('timestamp_str', 'hour').show(5)

In [None]:
# ✅ GOOD: Native Spark date/time functions
print("GOOD: Native Spark date functions")

good_date_ops = transactions_df \
    .withColumn('parsed_time', to_timestamp(col('timestamp_str'), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn('hour', hour(col('parsed_time'))) \
    .withColumn('day_of_week', dayofweek(col('parsed_time'))) \
    .withColumn('quarter', quarter(col('parsed_time'))) \
    .withColumn('date', to_date(col('parsed_time')))

good_date_ops.select('timestamp_str', 'hour', 'day_of_week', 'quarter', 'date').show(5)

**💡 Key Takeaways:**
- Always set `spark.sql.session.timeZone` explicitly (prefer UTC)
- Use `to_utc_timestamp()` and `from_utc_timestamp()` for timezone conversions
- Use native Spark date functions: `hour()`, `dayofweek()`, `date_format()`, etc.
- Store timestamps in UTC in your data lake
- Watch for daylight saving time: the same local clock time has a different UTC offset in winter and summer, and some local times are skipped or repeated at the transitions
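
The idea behind `to_utc_timestamp(ts, tz)` (read a zone-less wall-clock time as local time in `tz`, then express it in UTC) can be sketched with Python's standard `zoneinfo` module. This is a local illustration, not Spark's implementation; it assumes Python 3.9+ with the IANA time zone database available, and `wall_clock_to_utc` is a hypothetical helper. The January/July pair shows the daylight saving effect: the same local clock time maps to different UTC hours.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def wall_clock_to_utc(ts_str, tz_name, fmt="%Y-%m-%d %H:%M:%S"):
    """Interpret a naive 'yyyy-MM-dd HH:mm:ss' string as local time in tz_name, return UTC."""
    local = datetime.strptime(ts_str, fmt).replace(tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


# Same wall-clock string, different zones (the zones used in the synthetic data)
for tz in ["US/Pacific", "US/Eastern", "Europe/London", "Asia/Tokyo"]:
    print(f"{tz:<14} {wall_clock_to_utc('2024-01-15 12:00:00', tz).isoformat()}")

# Daylight saving: noon in US/Eastern is 17:00 UTC in January but 16:00 UTC in July
print(wall_clock_to_utc("2024-01-15 12:00:00", "US/Eastern").hour)  # 17
print(wall_clock_to_utc("2024-07-15 12:00:00", "US/Eastern").hour)  # 16
```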

---
## Pitfall #7: Bad Partitioning Strategy

**What is it?**  
Data partitioning affects how data is organized on disk and in memory.

**Problem:** Too many partitions = overhead. Too few = no parallelism. Wrong key = slow queries.

**Time: 7 minutes**

In [None]:
# Check current partitioning
print(f"Current DataFrame partitions: {transactions_df.rdd.getNumPartitions()}")
print(f"Current DataFrame records: {transactions_df.count():,}")
print(f"Records per partition: {transactions_df.count() / transactions_df.rdd.getNumPartitions():,.0f}")

In [None]:
# ❌ BAD: Too many small partitions
print("BAD: Too many partitions (overhead!)")

bad_partitioning = transactions_df.repartition(5000)  # Way too many!
print(f"Partitions: {bad_partitioning.rdd.getNumPartitions()}")
print(f"Records per partition: {transactions_df.count() / bad_partitioning.rdd.getNumPartitions():.0f}")
print("⚠️ Only 200 records per partition - massive overhead!")

In [None]:
# ❌ BAD: Too few large partitions
print("BAD: Too few partitions (no parallelism!)")

bad_partitioning2 = transactions_df.coalesce(2)  # Only 2 partitions!
print(f"Partitions: {bad_partitioning2.rdd.getNumPartitions()}")
print(f"Records per partition: {transactions_df.count() / bad_partitioning2.rdd.getNumPartitions():,.0f}")
print("⚠️ 500k records per partition - can't utilize cluster parallelism!")

In [None]:
# ✅ GOOD: Appropriate partition sizing
print("GOOD: Right-sized partitions")

# Rule of thumb for large data: 128MB - 1GB per partition.
# This 1M-row demo dataset is small, so the size rule alone would suggest very few
# partitions; 100 is a moderate count chosen for the demo (~10k rows each).

good_partitioning = transactions_df.repartition(100)
print(f"Partitions: {good_partitioning.rdd.getNumPartitions()}")
print(f"Records per partition: {transactions_df.count() / good_partitioning.rdd.getNumPartitions():,.0f}")
print("✅ ~10k records per partition - a reasonable balance for this dataset")

In [None]:
# ✅ BETTER: Partition by logical keys (for disk storage)
print("BETTER: Partitioning by logical columns for queries")

# Add date column for partitioning
partitioned_data = transactions_df \
    .withColumn('date', to_date(to_timestamp(col('timestamp_str'), 'yyyy-MM-dd HH:mm:ss'))) \
    .withColumn('year', year(col('date'))) \
    .withColumn('month', month(col('date')))

# Write partitioned by year/month (common for time-series data)
output_path = f"{base_path}/transactions_partitioned"

partitioned_data.write \
    .mode('overwrite') \
    .partitionBy('year', 'month') \
    .parquet(output_path)

print("✅ Data written partitioned by year/month")
print("Benefits: Query pruning when filtering by date!")

# Read back and show partition pruning
partitioned_read = spark.read.parquet(output_path)
filtered_query = partitioned_read.filter((col('year') == 2025) & (col('month') == 11))
print("\nExecution plan (notice partition pruning):")
filtered_query.explain()

**💡 Key Takeaways:**
- **Target 128MB - 1GB per partition**
- Use `repartition()` to increase partitions (full shuffle)
- Use `coalesce()` to decrease partitions (avoids a full shuffle, but can leave partitions uneven)
- Partition by query patterns (e.g., date for time-series)
- Avoid high-cardinality partition keys
- **Formula:** `partitions = data_size_MB / 128`
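
The formula above, worked through for a few data sizes. A small sketch assuming the 128MB target; `suggest_partitions` is a hypothetical helper that rounds up so no partition exceeds the target and never returns fewer than one partition.

```python
import math


def suggest_partitions(data_size_mb, target_mb=128):
    """partitions = data_size_MB / target_MB, rounded up, and never less than 1."""
    if data_size_mb <= 0:
        return 1
    return math.ceil(data_size_mb / target_mb)


for size_mb in [50, 1_000, 10_000, 100_000]:
    print(f"{size_mb:>7,} MB -> {suggest_partitions(size_mb):>4} partitions")
```

The result is a starting point: the actual input size is best read from the Spark UI or the source files, and skewed keys can still leave some partitions far larger than the target.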

---
## Pitfall #8: Not Reading Spark UI and Query Plans

**What is it?**  
Spark UI and execution plans provide crucial insights into what Spark is actually doing.

**Problem:** Flying blind leads to mystery performance issues.

**Time: 7 minutes**

In [None]:
# Create a complex query to demonstrate plan reading
complex_query = transactions_df \
    .filter(col('amount') > 100) \
    .join(broadcast(customers_df), 'customer_id') \
    .groupBy('tier', 'category') \
    .agg(
        sum('amount').alias('total_amount'),
        count('*').alias('transaction_count'),
        avg('amount').alias('avg_amount')
    ) \
    .orderBy(col('total_amount').desc())

print("=" * 80)
print("SIMPLE EXPLAIN - High level overview")
print("=" * 80)
complex_query.explain()

print("\n" + "=" * 80)
print("EXTENDED EXPLAIN - Shows parsed, analyzed, optimized, and physical plans")
print("=" * 80)
complex_query.explain(extended=True)

print("\n" + "=" * 80)
print("FORMATTED EXPLAIN - Easier to read tree structure")
print("=" * 80)
complex_query.explain(mode='formatted')

In [None]:
# Key things to look for in execution plans
print("""
📊 HOW TO READ EXECUTION PLANS:

1. READ BOTTOM-TO-TOP (data flows upward)

2. LOOK FOR THESE OPERATIONS:
   ✅ BroadcastHashJoin - Good! Small table broadcast
   ⚠️  SortMergeJoin - OK for large-large joins
   ⚠️  Exchange - Shuffle operation (expensive)
   ⚠️  Sort - Full sort (expensive)
   ❌ CartesianProduct - BAD! Cross join
   
3. PARTITION INFORMATION:
   - Look for partition counts in Exchange operations
   - After execution, AQE's final plan marks split skewed joins as SortMergeJoin(skew=true)
   
4. FILTER PUSHDOWN:
   - Filters should appear early (bottom of plan)
   - PushedFilters means optimization worked!
   
5. ADAPTIVE QUERY EXECUTION:
   - AdaptiveSparkPlan shows AQE is active
   - Look for dynamic optimizations
""")

# Execute to show in Spark UI
result = complex_query.collect()
print("\n✅ Query executed - Now check Spark UI!")

In [None]:
# Demonstrate how to identify problems in plans
print("""
🔍 SPARK UI CHECKLIST:

📍 Jobs Tab:
   - Which stages took longest?
   - Any failed tasks?
   - Task distribution across executors

📍 Stages Tab:
   - Task timing (look for stragglers)
   - Shuffle read/write volumes
   - GC time (high = memory issues)
   - Spill to disk (bad - not enough memory)

📍 Storage Tab:
   - What's cached?
   - How much memory used?
   - Which partitions are cached?

📍 SQL Tab:
   - Query execution timeline
   - Physical plan visualization
   - Click on SQL query to see DAG

🚨 RED FLAGS:
   - Tasks with 10x+ longer duration than median (skew!)
   - Large shuffle writes (> 1GB)
   - High GC time (> 10% of task time)
   - Spill to disk (means insufficient memory)
   - Many small tasks (< 1s each = too many partitions)
""")

**💡 Key Takeaways:**
- **Always run `explain()` before executing expensive queries**
- Learn to read execution plans (bottom-to-top)
- Check Spark UI for every production job
- Focus on: shuffles, joins, task skew, and GC time
- Use `explain(mode='formatted')` for easier reading
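
The first red flag in the Spark UI checklist (tasks running 10x+ longer than the median) is easy to check once you have task durations, for example copied from a stage's task table in the Spark UI. A plain-Python sketch; `find_stragglers` is a hypothetical helper, the 10x ratio is a rule of thumb rather than a Spark setting, and the durations below are made up for illustration.

```python
from statistics import median


def find_stragglers(task_seconds, ratio=10.0):
    """Return (task index, duration) for tasks at least `ratio` times the median duration."""
    if not task_seconds:
        return []
    med = median(task_seconds)
    if med <= 0:
        return []
    return [(i, t) for i, t in enumerate(task_seconds) if t >= ratio * med]


# Illustrative durations (seconds) for one stage: most tasks ~2s, one task 45s
durations = [2.1, 1.9, 2.0, 2.3, 1.8, 45.0, 2.2, 2.0]
print(find_stragglers(durations))  # [(5, 45.0)]
```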

---
## Real-World Example: Building a Production Pipeline

**Time: 8 minutes**

Let's put it all together and build a production-quality ETL pipeline that applies all best practices.

In [None]:
# Production-ready pipeline incorporating all best practices
def customer_analytics_pipeline():
    """
    A production-grade pipeline that calculates customer analytics
    with proper error handling, optimization, and monitoring
    """
    
    print("🚀 Starting Customer Analytics Pipeline...")
    
    # Step 1: Load and validate data
    print("\n📥 Step 1: Loading data...")
    transactions = transactions_df.filter(col('amount').isNotNull())  # Filter early!
    customers = customers_df
    
    # Step 2: Parse timestamps correctly
    print("\n⏰ Step 2: Processing timestamps...")
    transactions_with_date = transactions \
        .withColumn('parsed_time', to_timestamp(col('timestamp_str'), 'yyyy-MM-dd HH:mm:ss')) \
        .withColumn('date', to_date(col('parsed_time'))) \
        .withColumn('hour', hour(col('parsed_time'))) \
        .drop('timestamp_str')  # Drop unused columns to save memory
    
    # Step 3: Make sure AQE skew-join handling is on (the skew itself was checked in Pitfall #2)
    print("\n⚖️  Step 3: Enabling skew-join handling...")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    
    # Step 4: Optimized join with broadcast hint
    print("\n🔗 Step 4: Joining with customers (broadcast)...")
    enriched = transactions_with_date \
        .join(broadcast(customers), 'customer_id')  # Broadcast small table
    
    # Step 5: Cache intermediate result (reused multiple times)
    print("\n💾 Step 5: Caching enriched data...")
    enriched.persist(StorageLevel.MEMORY_AND_DISK)  # Safe caching
    enriched.count()  # Materialize cache
    
    # Step 6: Calculate metrics using native Spark functions (no UDFs!)
    print("\n📊 Step 6: Calculating customer metrics...")
    customer_metrics = enriched.groupBy('customer_id', 'customer_name', 'tier', 'country').agg(
        sum('amount').alias('total_spent'),
        count('transaction_id').alias('transaction_count'),
        avg('amount').alias('avg_transaction'),
        max('amount').alias('max_transaction'),
        countDistinct('category').alias('categories_purchased'),
        min('date').alias('first_purchase'),
        max('date').alias('last_purchase')
    )
    
    # Step 7: Category analysis
    print("\n📈 Step 7: Analyzing by category and tier...")
    category_metrics = enriched.groupBy('tier', 'category').agg(
        sum('amount').alias('total_revenue'),
        count('*').alias('transaction_count'),
        countDistinct('customer_id').alias('unique_customers')
    )
    
    # Step 8: Write results with proper partitioning
    print("\n💾 Step 8: Writing results...")
    
    customer_metrics \
        .repartition(10) \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'snappy') \
        .save(f'{base_path}/customer_metrics')
    
    category_metrics \
        .write \
        .mode('overwrite') \
        .partitionBy('tier') \
        .format('parquet') \
        .save(f'{base_path}/category_metrics')
    
    # Step 9: Clean up
    print("\n🧹 Step 9: Cleaning up...")
    enriched.unpersist()
    
    print("\n✅ Pipeline completed successfully!")
    return customer_metrics, category_metrics

# Run the pipeline
customer_results, category_results = customer_analytics_pipeline()

print("\n" + "="*80)
print("CUSTOMER METRICS SAMPLE:")
customer_results.orderBy(col('total_spent').desc()).show(10)

print("\n" + "="*80)
print("CATEGORY METRICS SAMPLE:")
category_results.orderBy(col('total_revenue').desc()).show(10)

---
## Job Scheduling & Monitoring in Production

**Time: 10 minutes**

Now let's see how to schedule and monitor this pipeline in production.

### Databricks Jobs Configuration

To schedule this notebook as a Databricks Job:

1. **Go to Workflows → Jobs → Create Job**

2. **Configure Job:**
   - **Name:** `customer_analytics_daily`
   - **Task Type:** Notebook
   - **Notebook Path:** Path to this notebook
   - **Cluster:** Choose cluster or create new

3. **Schedule:**
   - **Trigger:** Scheduled
   - **Cron (Quartz syntax, with a leading seconds field):** `0 0 2 * * ?` (runs at 2 AM daily)
   - **Timezone:** UTC

4. **Advanced Settings:**
   - **Timeout:** 3600 seconds (1 hour)
   - **Retries:** 2
   - **Retry interval:** 300 seconds (5 minutes)
   - **Max concurrent runs:** 1 (prevent overlaps)

5. **Alerts:**
   - Email on failure
   - Slack/PagerDuty integration
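
The same settings can be kept in code as a Jobs API 2.1 `jobs/create` request body. A sketch, assuming a Python dict that you would send to `/api/2.1/jobs/create` yourself (for example with the Databricks SDK or an HTTP client); the notebook path, cluster ID and email address are hypothetical placeholders to replace, and Slack or PagerDuty alerts (configured as notification destinations) are not shown.

```python
import json

# Hypothetical placeholders: replace with your notebook path, cluster and alert address
NOTEBOOK_PATH = "<path-to-this-notebook>"
CLUSTER_ID = "<existing-cluster-id>"
ALERT_EMAIL = "<team-email@example.com>"

job_settings = {
    "name": "customer_analytics_daily",
    "tasks": [
        {
            "task_key": "customer_analytics",
            "notebook_task": {"notebook_path": NOTEBOOK_PATH},
            "existing_cluster_id": CLUSTER_ID,
            "timeout_seconds": 3600,               # 1 hour
            "max_retries": 2,
            "min_retry_interval_millis": 300_000,  # 5 minutes
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # sec min hour day-of-month month day-of-week
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
    "max_concurrent_runs": 1,  # prevent overlapping runs
    "email_notifications": {"on_failure": [ALERT_EMAIL]},
}

print(json.dumps(job_settings, indent=2))
```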

---

In [None]:
# Example: Adding monitoring and alerting to your pipeline
import json
from datetime import datetime

def run_with_monitoring():
    """
    Production pipeline with monitoring, error handling, and alerting
    """
    
    # Track metrics
    metrics = {
        'job_name': 'customer_analytics_pipeline',
        'start_time': datetime.now().isoformat(),
        'status': 'running',
        'records_processed': 0,
        'errors': []
    }
    
    try:
        # Run pipeline
        customer_results, category_results = customer_analytics_pipeline()
        
        # Collect metrics
        metrics['records_processed'] = customer_results.count()
        metrics['status'] = 'success'
        
        # Data quality checks
        print("\n🔍 Running data quality checks...")
        
        # Check 1: No null customer IDs
        null_customers = customer_results.filter(col('customer_id').isNull()).count()
        if null_customers > 0:
            metrics['errors'].append(f"Found {null_customers} null customer IDs")
        
        # Check 2: All amounts are positive
        negative_amounts = customer_results.filter(col('total_spent') < 0).count()
        if negative_amounts > 0:
            metrics['errors'].append(f"Found {negative_amounts} negative amounts")
        
        # Check 3: Record count within expected range
        expected_min_customers = 1000
        actual_customers = customer_results.count()
        if actual_customers < expected_min_customers:
            metrics['errors'].append(
                f"Customer count {actual_customers} below threshold {expected_min_customers}"
            )
        
        metrics['data_quality_checks'] = {
            'null_customers': null_customers,
            'negative_amounts': negative_amounts,
            'total_customers': actual_customers
        }
        
    except Exception as e:
        metrics['status'] = 'failed'
        metrics['errors'].append(str(e))
        print(f"\n❌ Pipeline failed: {e}")
        raise
    
    finally:
        metrics['end_time'] = datetime.now().isoformat()
        
        # Log metrics (in production, send to monitoring system)
        print("\n📊 Pipeline Metrics:")
        print(json.dumps(metrics, indent=2))
        
        # In production, you would:
        # - Send metrics to CloudWatch/Datadog/etc.
        # - Trigger alerts if errors exist
        # - Update job status dashboard
        
        if metrics['errors']:
            print("\n⚠️  ALERTS TRIGGERED:")
            for error in metrics['errors']:
                print(f"   - {error}")
    
    return metrics

# Run with monitoring
job_metrics = run_with_monitoring()
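
As written, `run_with_monitoring()` only prints data-quality problems, so the run still ends successfully and the job's on-failure alerts never fire. One option, sketched below with a hypothetical `enforce_quality` helper, is to raise when any error was recorded so the task fails. Be aware that a failed task is also retried according to the job's retry settings, which reruns the pipeline but usually won't fix a data problem.

```python
def enforce_quality(metrics):
    """Raise if the metrics dict recorded any data-quality errors; otherwise return it.

    Raising makes the notebook task fail, which is what triggers
    job-level on-failure notifications.
    """
    errors = metrics.get('errors', [])
    if errors:
        raise RuntimeError(f"{len(errors)} data quality check(s) failed: " + "; ".join(errors))
    return metrics


# Usage in the notebook, after run_with_monitoring():
# enforce_quality(job_metrics)
```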

### Production Job Checklist

✅ **Before Deploying:**

1. **Performance:**
   - [ ] Reviewed execution plans with `explain()`
   - [ ] Checked Spark UI for bottlenecks
   - [ ] Optimized joins (broadcast where appropriate)
   - [ ] Validated partition sizing
   - [ ] Removed unnecessary caching
   - [ ] Used native Spark functions (no Python UDFs)

2. **Reliability:**
   - [ ] Added error handling and retries
   - [ ] Implemented data quality checks
   - [ ] Set appropriate timeouts
   - [ ] Configured alerts (email/Slack)
   - [ ] Prevented concurrent runs
   - [ ] Added logging and metrics

3. **Monitoring:**
   - [ ] Set up success/failure alerts
   - [ ] Track job duration trends
   - [ ] Monitor data volume changes
   - [ ] Track data quality metrics
   - [ ] Set up dashboard for key metrics

4. **Cost Optimization:**
   - [ ] Right-sized cluster for workload
   - [ ] Using spot instances where appropriate
   - [ ] Auto-scaling enabled
   - [ ] Job timeouts to prevent runaway costs

---

## Summary & Best Practices Recap

**Time: 5 minutes**

### Top 10 Performance Rules:

1. **Always use `explain()` before executing expensive queries**
2. **Minimize shuffles** - combine operations, avoid unnecessary repartitions
3. **Broadcast small tables** (< 10MB) in joins
4. **Avoid plain Python UDFs** - use native Spark functions or Pandas UDFs
5. **Enable Adaptive Query Execution** and skew join optimization
6. **Cache intelligently** - only reused data, after filtering, with MEMORY_AND_DISK
7. **Partition smartly** - target 128MB-1GB per partition, use logical keys
8. **Handle timezones properly** - store in UTC, use native date functions
9. **Monitor Spark UI** - check for skew, shuffles, GC time, spills
10. **Implement proper job scheduling** - retries, alerts, quality checks

### Production Checklist:
- ✅ Execution plan reviewed
- ✅ Spark UI analyzed
- ✅ Joins optimized
- ✅ Caching appropriate
- ✅ Error handling added
- ✅ Quality checks implemented
- ✅ Alerts configured
- ✅ Monitoring enabled
- ✅ Documentation complete

### Resources:
- Databricks Documentation: https://docs.databricks.com
- Spark UI Guide: https://spark.apache.org/docs/latest/web-ui.html
- Performance Tuning: https://spark.apache.org/docs/latest/tuning.html

---

## 🎓 Workshop Complete!

**Next Steps:**
1. Try these techniques on your own data
2. Schedule this notebook as a Databricks Job
3. Set up monitoring and alerts
4. Experiment with the other advanced topics (see companion notebook)

**Questions?** Check out the companion notebooks for:
- Delta Lake best practices
- Advanced partitioning strategies
- File format comparisons
- Complex job orchestration
