# Bakehouse Performance Optimization

## Business Context

Welcome back to **The Bakehouse**! As the franchise continues to grow, the Data Analytics team is facing new challenges. Dashboard queries that used to complete in seconds now take minutes. The monthly reporting pipeline is timing out. And the Marketing team has discovered duplicate customer records causing confusion in email campaigns.

As a **Performance Engineer** at Bakehouse HQ, you've been tasked with diagnosing and resolving these performance bottlenecks while ensuring data quality across the organization.

## Dataset Overview

You'll continue working with the Bakehouse data from `samples.bakehouse`:

| Table | Description | Row Count | Usage |
|-------|-------------|-----------|-------|
| `sales_transactions` | Individual purchases | 3,333 | Query optimization, partitioning |
| `sales_customers` | Customer information | 300 | Base for duplicate generation |
| `sales_franchises` | Franchise locations | 48 | Join optimization |

## Learning Objectives

This comprehensive lab covers four core Apache Spark performance topics:

1. **Query Optimization** - Analyze execution plans, leverage Catalyst optimizer, implement predicate pushdown
2. **Partitioning** - Understand data distribution, use repartition vs coalesce, configure shuffle partitions
3. **De-Duplication** - Remove duplicate records, implement case-insensitive matching, standardize data formats
4. **Integration Challenge** - Combine optimization techniques into a production-ready pipeline

## Performance Journey

- **Act 1: Slow Dashboards** → Diagnose query performance issues and apply Catalyst optimizer techniques
- **Act 2: Scale Challenges** → Implement partitioning strategies for efficient data distribution
- **Act 3: Data Quality** → Clean duplicate customer records with advanced deduplication
- **Act 4: Production Pipeline** → Integrate all optimizations into a comprehensive reporting system

## Setup

In [0]:
%sql
-- Create a catalog for our performance lab
CREATE CATALOG IF NOT EXISTS bakehouse_catalog;

-- Create a schema (database) in the catalog
CREATE SCHEMA IF NOT EXISTS bakehouse_catalog.performance_lab;

-- Create a managed volume for file storage
CREATE VOLUME IF NOT EXISTS bakehouse_catalog.performance_lab.workspace;

In [0]:
# Set up working directory using Unity Catalog volume
import os

# Use Unity Catalog managed volume for file storage
working_dir = "/Volumes/bakehouse_catalog/performance_lab/workspace"

print(f"Working directory: {working_dir}")

Working directory: /Volumes/bakehouse_catalog/performance_lab/workspace


In [0]:
# Clean up working directory to account for any failed previous runs.
dbutils.fs.rm(f"{working_dir}/transactions_partitioned", recurse=True)
dbutils.fs.rm(f"{working_dir}/repartitioned_demo", recurse=True)
dbutils.fs.rm(f"{working_dir}/coalesced_demo", recurse=True)
dbutils.fs.rm(f"{working_dir}/customers_with_duplicates", recurse=True)
dbutils.fs.rm(f"{working_dir}/customers_deduplicated", recurse=True)
dbutils.fs.rm(f"{working_dir}/franchise_performance_report", recurse=True)
print(f"✅ Cleaned up working directory: {working_dir}")

✅ Cleaned up working directory: /Volumes/bakehouse_catalog/performance_lab/workspace


## Verification Utilities

These utility functions help you verify your work throughout the lab.

In [0]:
from pyspark.sql.functions import col

def verify_schema(df, expected_columns):
    """Check if DataFrame has expected columns."""
    missing = set(expected_columns) - set(df.columns)
    if missing:
        print(f"❌ Missing columns: {missing}")
        return False
    print(f"✅ Schema correct: {len(expected_columns)} columns present")
    return True

def check_partition_count(path, expected_count=None):
    """Count the 'part-' data files at the top level of a Delta table path."""
    files = dbutils.fs.ls(path)
    partition_count = len([f for f in files if f.name.startswith('part-')])
    # Compare against None explicitly so that expected_count=0 is still checked
    if expected_count is not None and partition_count != expected_count:
        print(f"⚠️ Found {partition_count} partitions, expected {expected_count}")
        return False
    print(f"✅ Partition count: {partition_count}")
    return True

def inspect_sample(df, num_rows=5, description=""):
    """Display sample rows for manual inspection."""
    print(f"\n📊 Sample Data: {description}")
    display(df.limit(num_rows))
    print(f"Total rows: {df.count():,}")

print("✅ Verification utilities loaded")

✅ Verification utilities loaded


---
# Section 1: Query Optimization Fundamentals

**Business Goal:** Dashboard queries are taking 2+ minutes to complete. Management wants sub-second response times.

In this section, you'll learn to diagnose slow queries using execution plans, understand how the Catalyst optimizer works, and implement optimization techniques like predicate pushdown and filter ordering.

## Key Concepts:
- **Catalyst Optimizer**: Spark's query optimizer that automatically improves query execution
- **Logical Plan**: High-level description of what the query does
- **Physical Plan**: Low-level description of how Spark will execute the query
- **Predicate Pushdown**: Moving filters closer to the data source to reduce data transfer

### Task 1.1: Analyze Slow Query with Explain Plans

Load the transactions data and apply multiple filters. Use the `explain()` method to view how Catalyst optimizes your query by consolidating redundant filters.

In [0]:
# TODO: Load transactions table
# Use spark.table() to load samples.bakehouse.sales_transactions

from pyspark.sql.functions import col

transactions_df = spark.table("samples.bakehouse.sales_transactions")

# Apply multiple filters (some redundant)
slow_query_df = (transactions_df
    .filter(col("totalPrice") > 20)
    .filter(col("totalPrice") > 10)  # Redundant - already filtered > 20
    .filter(col("product") != "cookies")
    .filter(col("product") != "bread")
)

# Display the logical and physical plans
slow_query_df.explain(True)

== Parsed Logical Plan ==
'Filter 'not('`==`('product, bread))
+- 'Filter 'not('`==`('product, cookies))
   +- 'Filter '`>`('totalPrice, 10)
      +- 'Filter '`>`('totalPrice, 20)
         +- 'UnresolvedRelation [samples, bakehouse, sales_transactions], [], false

== Analyzed Logical Plan ==
transactionID: bigint, customerID: bigint, franchiseID: bigint, dateTime: timestamp, product: string, quantity: bigint, unitPrice: bigint, totalPrice: bigint, paymentMethod: string, cardNumber: bigint
Filter NOT (product#16229 = bread)
+- Filter NOT (product#16229 = cookies)
   +- Filter (totalPrice#16232L > cast(10 as bigint))
      +- Filter (totalPrice#16232L > cast(20 as bigint))
         +- SubqueryAlias samples.bakehouse.sales_transactions
            +- Relation samples.bakehouse.sales_transactions[transactionID#16225L,customerID#16226L,franchiseID#16227L,dateTime#16228,product#16229,quantity#16230L,unitPrice#16231L,totalPrice#16232L,paymentMethod#16233,cardNumber#16234L] parquet

== Optimiz

In [0]:
# CHECK YOUR WORK
assert 'transactions_df' in dir(), "transactions_df should be defined"
assert transactions_df.count() == 3333, "Should load all 3,333 transactions"
print("✅ Task 1.1 complete: Execution plan displayed")
print("📝 Note: Look at the 'Optimized Logical Plan' - Catalyst consolidated the filters!")

✅ Task 1.1 complete: Execution plan displayed
📝 Note: Look at the 'Optimized Logical Plan' - Catalyst consolidated the filters!
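
The `explain(True)` output is long, so it can help to isolate one section at a time. The helper below is a minimal sketch, assuming you have already captured the plan as text (for example by redirecting standard output). `split_plan_sections` and `sample_plan` are hypothetical names, and the sample text is a shortened placeholder, not real output from this lab.

```python
import re

def split_plan_sections(plan_text):
    """Split explain(True)-style text into {section title: section body}."""
    sections = {}
    current = None
    for line in plan_text.splitlines():
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None:
            sections[current].append(line)
    return {title: "\n".join(body).strip() for title, body in sections.items()}

# Hypothetical, shortened plan text used only to exercise the helper.
sample_plan = """== Parsed Logical Plan ==
'Filter ('totalPrice > 10)
+- 'Filter ('totalPrice > 20)

== Optimized Logical Plan ==
Filter ((totalPrice > 20) AND (totalPrice > 10))
"""

sections = split_plan_sections(sample_plan)
print(sections["Optimized Logical Plan"])
```

Comparing the number of `Filter` lines in the parsed and optimized sections is a quick way to see whether stacked filters were merged into one node.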


### Task 1.2: Demonstrate Predicate Pushdown with Partitioned Delta

Write the transactions data as a partitioned Delta table (partitioned by `franchiseID`). Then read it back with a filter and observe how Spark prunes partitions in the execution plan.

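**Why This Matters**: When a filter targets the partition column, Spark can skip entire partition directories (partition pruning). Like other forms of predicate pushdown, this cuts I/O by filtering at the storage layer instead of after loading all the data.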
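
Partition pruning can be pictured as a filter over directory names that runs before any file is opened. The plain-Python sketch below is a simplified model, not how Delta resolves files internally (Delta consults its transaction log). It assumes Hive-style `column=value` directories, and `prune_partitions` and the file listing are hypothetical.

```python
def prune_partitions(file_paths, column, value):
    """Keep only files that sit under a 'column=value' directory."""
    wanted = f"{column}={value}"
    return [path for path in file_paths if wanted in path.split("/")]

# Hypothetical file listing for a table partitioned by franchiseID.
files = [
    "transactions_partitioned/franchiseID=3000033/part-0000.parquet",
    "transactions_partitioned/franchiseID=3000034/part-0000.parquet",
    "transactions_partitioned/franchiseID=3000035/part-0000.parquet",
]

to_scan = prune_partitions(files, "franchiseID", 3000033)
print(f"Scanning {len(to_scan)} of {len(files)} files")
```

The files for every other franchise are never read, which is where the I/O saving comes from.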

In [0]:
# TODO: Write partitioned Delta table and read with filter
# 1. Partition by "franchiseID" column
# 2. Filter for franchiseID == 3000033 when reading back

(transactions_df
 .write
 .partitionBy("franchiseID")  # Which column to partition by?
 .format("delta")
 .mode("overwrite")
 .save(f"{working_dir}/transactions_partitioned")
)

# Read with filter - Spark will only read relevant partitions
filtered_df = spark.read.format("delta").load(
    f"{working_dir}/transactions_partitioned"
).filter(col("franchiseID") == 3000033)  # Filter condition: col("franchiseID") == 3000033

# Display the execution plan
filtered_df.explain(True)

display(filtered_df)

== Parsed Logical Plan ==
'Filter '`==`('franchiseID, 3000033)
+- Relation [transactionID#16510L,customerID#16511L,franchiseID#16512L,dateTime#16513,product#16514,quantity#16515L,unitPrice#16516L,totalPrice#16517L,paymentMethod#16518,cardNumber#16519L] parquet

== Analyzed Logical Plan ==
transactionID: bigint, customerID: bigint, franchiseID: bigint, dateTime: timestamp, product: string, quantity: bigint, unitPrice: bigint, totalPrice: bigint, paymentMethod: string, cardNumber: bigint
Filter (franchiseID#16512L = cast(3000033 as bigint))
+- Relation [transactionID#16510L,customerID#16511L,franchiseID#16512L,dateTime#16513,product#16514,quantity#16515L,unitPrice#16516L,totalPrice#16517L,paymentMethod#16518,cardNumber#16519L] parquet

== Optimized Logical Plan ==
Filter (isnotnull(franchiseID#16512L) AND (franchiseID#16512L = 3000033))
+- Relation [transactionID#16510L,customerID#16511L,franchiseID#16512L,dateTime#16513,product#16514,quantity#16515L,unitPrice#16516L,totalPrice#16517L,pa

transactionID,customerID,franchiseID,dateTime,product,quantity,unitPrice,totalPrice,paymentMethod,cardNumber
1002920,2000119,3000033,2024-05-07T09:08:29.107Z,Pearly Pies,10,3,30,visa,4512140833802082
1002951,2000072,3000033,2024-05-08T22:12:33.980Z,Tokyo Tidbits,1,3,3,visa,4517133419504961
1002967,2000027,3000033,2024-05-10T17:09:57.932Z,Pearly Pies,2,3,6,amex,347726165286313
1002987,2000093,3000033,2024-05-06T05:01:28.491Z,Austin Almond Biscotti,6,3,18,visa,4195673612785593
1003000,2000256,3000033,2024-05-14T20:23:44.498Z,Pearly Pies,3,3,9,amex,376481000341048
1003005,2000107,3000033,2024-05-15T16:52:12.463Z,Tokyo Tidbits,9,3,27,visa,4487722735852553
1003015,2000172,3000033,2024-05-11T04:14:23.247Z,Tokyo Tidbits,8,3,24,visa,4228081889367973
1003084,2000052,3000033,2024-05-02T02:06:06.463Z,Tokyo Tidbits,2,3,6,amex,345239704624095
1003115,2000205,3000033,2024-05-08T14:23:11.412Z,Orchard Oasis,8,3,24,mastercard,2711867627801341
1003128,2000293,3000033,2024-05-03T18:18:22.974Z,Tokyo Tidbits,2,3,6,visa,4776265971833366


In [0]:
# CHECK YOUR WORK
assert filtered_df.count() > 0, "Should have results for franchiseID = 3000033"
print(f"✅ Task 1.2 complete: Filtered to {filtered_df.count()} transactions with partition pruning")
print("📝 Look at the explain plan output above - you should see PartitionFilters showing Spark pruned partitions!")

✅ Task 1.2 complete: Filtered to 90 transactions with partition pruning
📝 Look at the explain plan output above - you should see PartitionFilters showing Spark pruned partitions!


### Task 1.3: Optimize Join Query with Filter Pushdown

Compare two approaches:
1. **Inefficient**: Join first, then filter
2. **Efficient**: Filter before join

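Both approaches return the same rows. What changes is how many franchise rows enter the join: the full table versus only the filtered subset. Catalyst typically pushes a filter like this below the join on its own, so compare the two plans with `explain()` to check whether the manual rewrite changes anything.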

In [0]:
# Load franchises data
franchises_df = spark.table("samples.bakehouse.sales_franchises")

# INEFFICIENT APPROACH: Join then filter
slow_join_df = (transactions_df
    .join(franchises_df, "franchiseID")
    .filter(col("country") == "US")
)

print(f"Inefficient approach processes {transactions_df.count()} transactions")
print(f"Filtered result: {slow_join_df.count()} rows")

# TODO: Optimize join by filtering before joining
# 1. Filter franchises_df where country == "US"
# 2. Join transactions_df with filtered franchises on "franchiseID"

fast_franchises_df = franchises_df.filter(col("country") == "US")  # Keep US franchises only
fast_join_df = transactions_df.join(fast_franchises_df, "franchiseID")  # Join with filtered DataFrame, join column

print(f"\nEfficient approach joins with only {fast_franchises_df.count()} franchises")
print(f"Same filtered result: {fast_join_df.count()} rows")

Inefficient approach processes 3333 transactions
Filtered result: 1124 rows

Efficient approach joins with only 16 franchises
Same filtered result: 1124 rows


In [0]:
# CHECK YOUR WORK
assert slow_join_df.count() == fast_join_df.count(), "Both approaches should return same results"
assert fast_franchises_df.count() < franchises_df.count(), "Should filter franchises before join"
print("✅ Task 1.3 complete: Join optimization demonstrated")
print(f"📊 Efficiency gain: Reduced franchise table from {franchises_df.count()} to {fast_franchises_df.count()} rows before join")

✅ Task 1.3 complete: Join optimization demonstrated
📊 Efficiency gain: Reduced franchise table from 48 to 16 rows before join
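
The effect of filtering first can be modelled without Spark. The sketch below is a plain-Python hash join over hypothetical miniature tables. It only illustrates that both orderings give the same result while the pre-filtered version builds its lookup index from fewer rows.

```python
def inner_join(left_rows, right_rows, key):
    """Hash join: index the right side by key, then probe with the left side."""
    index = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)
    return [{**left, **right}
            for left in left_rows
            for right in index.get(left[key], [])]

# Hypothetical miniature tables.
transactions = [
    {"transactionID": 1, "franchiseID": 10},
    {"transactionID": 2, "franchiseID": 20},
    {"transactionID": 3, "franchiseID": 10},
]
franchises = [
    {"franchiseID": 10, "country": "US"},
    {"franchiseID": 20, "country": "Japan"},
]

# Join first, then filter the joined rows.
join_then_filter = [row for row in inner_join(transactions, franchises, "franchiseID")
                    if row["country"] == "US"]

# Filter first, then join: the index is built from fewer rows.
us_franchises = [f for f in franchises if f["country"] == "US"]
filter_then_join = inner_join(transactions, us_franchises, "franchiseID")

print(join_then_filter == filter_then_join)
```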


---
# Section 2: Partitioning for Performance

**Business Goal:** As data grows, queries are slowing down. We need to distribute workload efficiently across our cluster.

In this section, you'll learn to inspect partition counts, use repartition vs coalesce, configure shuffle partitions, and leverage Adaptive Query Execution (AQE).

## Key Concepts:
- **Partition**: A chunk of data distributed across the cluster
- **repartition()**: Full shuffle to create evenly balanced partitions
- **coalesce()**: Narrow transformation to reduce partitions without full shuffle
- **Shuffle Partitions**: Number of partitions created during wide transformations (joins, aggregations)
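
The difference between the two operations can be sketched in plain Python by treating a DataFrame as a list of partitions. This is a simplified model with hypothetical functions that share the Spark method names. Real `coalesce()` chooses which partitions to merge based on locality rather than the simple modulo used here.

```python
def repartition(partitions, n):
    """Full shuffle: every row may move; rows are dealt out round-robin."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]

def coalesce(partitions, n):
    """No shuffle: whole partitions are merged, so the count can only shrink."""
    n = min(n, len(partitions))
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)
    return merged

# One large partition and three tiny ones (13 rows in total).
skewed = [list(range(10)), [10], [11], [12]]

print([len(p) for p in repartition(skewed, 2)])  # [7, 6]  -> balanced
print([len(p) for p in coalesce(skewed, 2)])     # [11, 2] -> skew survives
print(len(coalesce(skewed, 8)))                  # 4       -> cannot increase
```

The model shows the trade-off: `repartition()` pays for moving every row and gets balanced output, while `coalesce()` avoids the movement and inherits whatever skew was already there.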

### Task 2.1: Repartition for Balanced Distribution

Use `repartition()` to redistribute data evenly across 8 partitions. This performs a full shuffle. We'll verify by writing the data and counting output files.

In [0]:
# TODO: Repartition to 8 partitions
# Use .repartition(8) method on transactions_df

repartitioned_df = transactions_df.repartition(8)

# Write to verify partition count through file output
(repartitioned_df
 .write
 .mode("overwrite")
 .format("delta")
 .save(f"{working_dir}/repartitioned_demo")
)

# Count the data files (each partition creates one file)
output_files = dbutils.fs.ls(f"{working_dir}/repartitioned_demo")
data_files = [f for f in output_files if f.name.endswith('.parquet')]
print(f"Output files created: {len(data_files)}")
print(f"→ Each partition writes one file, so we have {len(data_files)} partitions")

Output files created: 8
→ Each partition writes one file, so we have 8 partitions


In [0]:
# CHECK YOUR WORK
assert len(data_files) == 8, f"Should have exactly 8 files (partitions), got {len(data_files)}"
reloaded_count = spark.read.format("delta").load(f"{working_dir}/repartitioned_demo").count()
assert reloaded_count == transactions_df.count(), "Row count should remain the same"
print("✅ Task 2.1 complete: Repartitioned to 8 partitions")
print("📝 Note: repartition() triggers a full shuffle but ensures even distribution")

✅ Task 2.1 complete: Repartitioned to 8 partitions
📝 Note: repartition() triggers a full shuffle but ensures even distribution


### Task 2.2: Coalesce for Narrow Transformation

Use `coalesce()` to reduce partitions without a full shuffle. Observe the key difference: coalesce is efficient but cannot increase partition count.

In [0]:
# TODO: Coalesce to reduce partitions
# Use .coalesce(2) method to reduce to 2 partitions

# Load the repartitioned data from Task 2.1
base_df = spark.read.format("delta").load(f"{working_dir}/repartitioned_demo")

coalesced_df = base_df.coalesce(2)

# Write and verify
(coalesced_df
 .write
 .mode("overwrite")
 .format("delta")
 .save(f"{working_dir}/coalesced_demo")
)

output_files = dbutils.fs.ls(f"{working_dir}/coalesced_demo")
data_files = [f for f in output_files if f.name.endswith('.parquet')]
print(f"After coalesce(2): {len(data_files)} files (partitions)")
print(f"→ Reduced from 8 to {len(data_files)} partitions without full shuffle!")

After coalesce(2): 1 files (partitions)
→ Reduced from 8 to 1 partitions without full shuffle!


In [0]:
# CHECK YOUR WORK
assert len(data_files) <= 2, f"Should have at most 2 files (partitions), got {len(data_files)}"
assert len(data_files) < 8, f"Should have fewer files than the 8 written in Task 2.1, got {len(data_files)}"
assert coalesced_df.count() == base_df.count(), "Row count should remain the same"
print("✅ Task 2.2 complete: Coalesce behavior understood")
print("📝 Use coalesce() to reduce partitions cheaply, repartition() to increase or rebalance")
print("💡 Note: With small datasets, Delta may optimize to fewer files than the coalesce number")

✅ Task 2.2 complete: Coalesce behavior understood
📝 Use coalesce() to reduce partitions cheaply, repartition() to increase or rebalance
💡 Note: With small datasets, Delta may optimize to fewer files than the coalesce number


**Understanding Shuffle Partitions & Adaptive Query Execution:**

### Shuffle Partitions
When Spark performs wide transformations (joins, aggregations, sorts), it shuffles data across partitions. The number of partitions created during shuffles is controlled by `spark.sql.shuffle.partitions`.

**Key considerations:**
- The open-source Spark default is 200 partitions, which is too many for small datasets
- Best practice: 2-4x your core count for small data
- Too many partitions = overhead, too few = underutilization
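
The rule of thumb above can be turned into a small helper. This is a sketch only: `suggest_shuffle_partitions` is a hypothetical function, the multiplier of 3 is an assumed midpoint of the 2-4x range, and the right value for a real workload depends on data volume as well as core count.

```python
def suggest_shuffle_partitions(total_cores, multiplier=3):
    """Return a starting value for spark.sql.shuffle.partitions on small data."""
    if total_cores < 1 or multiplier < 1:
        raise ValueError("total_cores and multiplier must be positive")
    return total_cores * multiplier

suggested = suggest_shuffle_partitions(total_cores=8)
print(suggested)  # 24

# In a notebook with an active session, the value would be applied with:
# spark.conf.set("spark.sql.shuffle.partitions", str(suggested))
```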

### Adaptive Query Execution (AQE)
Databricks serverless compute has AQE automatically enabled. AQE dynamically optimizes queries at runtime by:
- **Coalescing shuffle partitions** based on actual data size
- **Optimizing join strategies** (broadcast vs shuffle)
- **Handling data skew** automatically

This means Spark automatically adjusts partition counts for optimal performance, even if you start with a high number!
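
To make the partition-coalescing idea concrete, here is a plain-Python sketch that greedily merges adjacent shuffle partitions until each group approaches a target size. It is a simplified model and not Spark's actual algorithm. `coalesce_shuffle_partitions` is a hypothetical name, and the 64 MB target mirrors the default of `spark.sql.adaptive.advisoryPartitionSizeInBytes`.

```python
def coalesce_shuffle_partitions(sizes_bytes, target_bytes):
    """Group adjacent partition indexes so each group stays within the target."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_bytes):
        # Close the current group when adding this partition would exceed the target.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

MB = 1024 * 1024
sizes = [1 * MB] * 200  # 200 tiny shuffle partitions of 1 MB each
groups = coalesce_shuffle_partitions(sizes, target_bytes=64 * MB)
print(len(groups))  # 4
```

In this model, 200 one-megabyte partitions collapse into 4 tasks, which is why a high starting partition count matters less when AQE is on.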

---
# Section 3: De-Duplicating Customer Data

**Business Goal:** Marketing reports duplicate customers receiving multiple emails. We need to clean the customer database.

In this section, you'll generate synthetic duplicates, attempt simple deduplication, implement case-insensitive matching, standardize data formats, and write optimized output.

## Key Concepts:
- **dropDuplicates()**: Remove duplicate rows based on column values
- **Case-Insensitive Matching**: "John" = "JOHN" = "john"
- **Data Standardization**: "123-45-6789" = "123456789"
- **Single File Output**: repartition(1) for consolidated results
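
The matching ideas above come down to comparing records on a normalized key instead of on the raw values. The sketch below shows this in plain Python. The records, the `id_number` field, and both function names are hypothetical. Unlike this sketch, Spark's `dropDuplicates()` does not guarantee which of the matching rows is kept.

```python
import re

def dedup_key(record):
    """Build a comparison key: lowercase names, digits-only identifier."""
    return (
        record["first_name"].strip().lower(),
        record["last_name"].strip().lower(),
        re.sub(r"[^0-9]", "", record["id_number"]),
    )

def drop_duplicates(records):
    """Keep the first record seen for each normalized key."""
    seen = {}
    for record in records:
        seen.setdefault(dedup_key(record), record)
    return list(seen.values())

# Hypothetical records: the first three describe the same person.
records = [
    {"first_name": "John", "last_name": "Smith", "id_number": "123-45-6789"},
    {"first_name": "JOHN", "last_name": "SMITH", "id_number": "123456789"},
    {"first_name": "john ", "last_name": "smith", "id_number": "123 45 6789"},
    {"first_name": "Jane", "last_name": "Smith", "id_number": "987-65-4321"},
]

unique = drop_duplicates(records)
print(len(unique))  # 2
```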

### Task 3.1: Generate Duplicate Customer Dataset

Create synthetic duplicates from the base customer data by introducing case variations in first names, last names, and email addresses.

In [0]:
# TODO
# Generate 105,000 customer records (300 customers x 350 copies) with duplicates
# Use explode() to repeat customers multiple times, then add variations

from pyspark.sql.functions import lit, concat, when, upper, lower, expr, explode

# Load base customers
base_customers_df = spark.table("samples.bakehouse.sales_customers")

print(f"Base customers: {base_customers_df.count()}")

# Create duplicates by repeating each customer 350 times with variations
# sequence(0, 349) yields an array of 350 elements; explode() turns it into 350 rows per customer
duplicates_df = (base_customers_df
    .withColumn("duplicate_copies", explode(expr("sequence(0, 349)")))

    # Use duplicate_copies as the variation seed
    .withColumn("duplicate_id", col("duplicate_copies"))

    # Add case variations to first_name
    .withColumn("first_name",
        when(col("duplicate_id") % 3 == 0, upper(col("first_name")))
        .otherwise(col("first_name"))
    )

    # TODO: Add case variations to last_name
    # Use when(col("duplicate_id") % 2 == 0, ...) to uppercase some last names
    .withColumn("last_name",
        when(col("duplicate_id") % 2 == 0, upper(col("last_name"))).otherwise(col("last_name"))
    )

    # TODO: Add case variations to email (uppercase the whole address for some rows)
    # Use when(col("duplicate_id") % 4 == 0, ...) to uppercase some emails
    .withColumn("email_address",
        when(col("duplicate_id") % 4 == 0, upper(col("email_address"))).otherwise(col("email_address"))
    )

    # Drop the temporary column (duplicate_id stays in the output)
    .drop("duplicate_copies")
)

# Write to volume
(duplicates_df
 .write
 .mode("overwrite")
 .format("delta")
 .save(f"{working_dir}/customers_with_duplicates")
)

# Check the count
dup_count = spark.read.format("delta").load(f"{working_dir}/customers_with_duplicates").count()
print(f"Generated {dup_count:,} customer records (including duplicates)")

Base customers: 300
Generated 105,000 customer records (including duplicates)


In [0]:
# CHECK YOUR WORK
assert dup_count > 50000, f"Should generate significant duplicates, got {dup_count}"
print(f"✅ Task 3.1 complete: Generated {dup_count:,} records with duplicates")

✅ Task 3.1 complete: Generated 105,000 records with duplicates


### Task 3.2: Simple Deduplication Attempt

Try using `dropDuplicates()` on the raw data. Discover that it misses duplicates that differ only in letter case.

In [0]:
# Read duplicates
dups_df = spark.read.format("delta").load(f"{working_dir}/customers_with_duplicates")

# TODO: Apply simple deduplication
# Use dropDuplicates() on key columns: customerID, first_name, last_name, email_address
# Pass columns as a list

simple_dedup_df = dups_df.dropDuplicates(["customerID", "first_name", "last_name", "email_address"])  # List of column names

print(f"Original: {dups_df.count():,}")
print(f"After simple dedup: {simple_dedup_df.count():,}")
print(f"Removed: {dups_df.count() - simple_dedup_df.count():,} records")
print("\n⚠️ Still has duplicates due to case sensitivity!")
print("   'John' != 'JOHN' in simple dropDuplicates")

Original: 105,000
After simple dedup: 1,800
Removed: 103,200 records

⚠️ Still has duplicates due to case sensitivity!
   'John' != 'JOHN' in simple dropDuplicates


In [0]:
# CHECK YOUR WORK
assert simple_dedup_df.count() < dups_df.count(), "Should remove some duplicates"
assert simple_dedup_df.count() > 300, "Should still have many duplicates due to case sensitivity"
print("✅ Task 3.2 complete: Simple deduplication attempted")

✅ Task 3.2 complete: Simple deduplication attempted
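
The figure of 1,800 remaining rows can be checked by hand. Each copy's casing depends only on `duplicate_id` modulo 3, 2, and 4, so counting the distinct combinations gives the number of surviving variants per customer. The sketch below does that count in plain Python. `variation_signature` is a hypothetical helper, and the count assumes every base name and email contains at least one lowercase letter.

```python
def variation_signature(duplicate_id):
    """Which fields Task 3.1 uppercases: (first_name, last_name, email_address)."""
    return (duplicate_id % 3 == 0, duplicate_id % 2 == 0, duplicate_id % 4 == 0)

signatures = {variation_signature(i) for i in range(350)}
base_customers = 300

print(len(signatures))                   # 6
print(base_customers * len(signatures))  # 1800
```

There are 6 combinations rather than 8 because every multiple of 4 is also a multiple of 2, so an uppercase email never appears with a mixed-case last name.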


### Task 3.3: Case-Insensitive Deduplication

Create lowercase versions of text columns and use them for deduplication, then drop the temporary columns.

In [0]:
# TODO: Case-insensitive deduplication
# 1. Create lowercase columns using lower(col(...))
# 2. Drop duplicates based on lowercase columns and customerID
# 3. Drop temporary lowercase columns

from pyspark.sql.functions import lower

normalized_df = (dups_df
    .withColumn("lcFirstName", lower(col("first_name")))
    .withColumn("lcLastName", lower(col("last_name")))
    .withColumn("lcEmail", lower(col("email_address")))
)

# Drop duplicates based on normalized columns and customerID
deduped_df = normalized_df.dropDuplicates(["customerID", "lcFirstName", "lcLastName", "lcEmail"])

# Clean up temporary columns
final_df = deduped_df.drop("lcFirstName", "lcLastName", "lcEmail")

print(f"After case-insensitive dedup: {final_df.count():,}")
print(f"Additional duplicates removed: {simple_dedup_df.count() - final_df.count():,}")

After case-insensitive dedup: 300
Additional duplicates removed: 1,500


In [0]:
# CHECK YOUR WORK
assert final_df.count() < simple_dedup_df.count(), "Should remove more duplicates than simple method"
assert "lcFirstName" not in final_df.columns, "Should drop temporary columns"
expected_count = 300  # Should be close to base customer count
tolerance = 50
assert abs(final_df.count() - expected_count) < tolerance, f"Should have ~{expected_count} unique customers"
print("✅ Task 3.3 complete: Case-insensitive deduplication successful")

✅ Task 3.3 complete: Case-insensitive deduplication successful


### Task 3.4: Data Standardization for Better Matching

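The Bakehouse customer data has no SSN field, so we'll demonstrate standardization on the `phone_number` field instead: strip every non-digit character so that formatting differences no longer hide duplicates, then include the cleaned value (along with `postal_zip_code` and `gender`) in the deduplication key.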

In [0]:
# TODO
# Demonstrate standardization concept by creating normalized columns
# In a real scenario, you'd standardize phone numbers, SSNs, etc.

from pyspark.sql.functions import col, lower, regexp_replace

# For demonstration: normalize phone numbers alongside the lowercase name/email columns
standardized_df = (dups_df
    .withColumn("lcFirstName", lower(col("first_name")))
    .withColumn("lcLastName", lower(col("last_name")))
    .withColumn("lcEmail", lower(col("email_address")))
    # Normalize phone numbers: one pass strips every non-digit character
    # (dashes, spaces, parentheses), so no second regexp_replace is needed
    .withColumn("cleanPhone", regexp_replace(col("phone_number"), "[^0-9]", ""))
)

# Dedup including standardized fields
final_standardized_df = (standardized_df
    .dropDuplicates([
        "lcFirstName", "lcLastName", "lcEmail",
        "cleanPhone", "postal_zip_code", "gender"
    ])
    .drop("lcFirstName", "lcLastName", "lcEmail", "cleanPhone")
)

print(f"After standardization: {final_standardized_df.count():,}")
print(f"Further improved matching: {final_df.count() - final_standardized_df.count():,} more duplicates removed")

After standardization: 300
Further improved matching: 0 more duplicates removed


In [0]:
# CHECK YOUR WORK
assert final_standardized_df.count() <= final_df.count(), "Should not increase count"
print("✅ Task 3.4 complete: Data standardization applied")
print("📝 Standardization catches duplicates with formatting differences")

✅ Task 3.4 complete: Data standardization applied
📝 Standardization catches duplicates with formatting differences


### Task 3.5: Write Single Partition Output

Write the deduplicated data as a single file for downstream systems.

In [0]:
# TODO: Write to single partition
# Use .repartition(1) to create a single output file

(final_standardized_df
 .repartition(1)  # How many partitions for single file?
 .write
 .mode("overwrite")
 .format("delta")
 .save(f"{working_dir}/customers_deduplicated")
)

# Verify single file
output_files = dbutils.fs.ls(f"{working_dir}/customers_deduplicated")
data_files = [f for f in output_files if f.name.endswith('.parquet')]
print(f"Output files: {len(data_files)} data file(s)")

Output files: 1 data file(s)


In [0]:
# CHECK YOUR WORK
deduped_count = spark.read.format("delta").load(f"{working_dir}/customers_deduplicated").count()
assert deduped_count == final_standardized_df.count(), "Counts should match"
print(f"✅ Task 3.5 complete: Deduplicated {deduped_count:,} clean customer records")
print(f"📊 Summary: {dups_df.count():,} → {deduped_count:,} customers ({((1 - deduped_count/dups_df.count()) * 100):.1f}% duplicates removed)")

✅ Task 3.5 complete: Deduplicated 300 clean customer records
📊 Summary: 105,000 → 300 customers (99.7% duplicates removed)


---
# Section 4: Comprehensive Performance Challenge

**Business Goal:** Build an optimized monthly reporting pipeline that combines all performance techniques.

## Requirements:
1. Load deduplicated customers
2. Join with transactions (optimize join order)
3. Join with franchises (filter before join)
4. Aggregate metrics by franchise
5. Write results with appropriate partitioning

Apply everything you've learned: predicate pushdown, filter ordering, and efficient joins!
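
Before writing the Spark version, it may help to see what the aggregation in requirement 4 computes. The plain-Python sketch below models the three metrics over hypothetical joined rows. `franchise_metrics` is an illustrative name, not part of the lab, and it counts rows where Spark's `count("transactionID")` would count non-null values.

```python
from collections import defaultdict

def franchise_metrics(rows):
    """Per franchise: total revenue, transaction count, distinct customers."""
    revenue = defaultdict(int)
    transactions = defaultdict(int)
    customers = defaultdict(set)
    for row in rows:
        fid = row["franchiseID"]
        revenue[fid] += row["totalPrice"]
        transactions[fid] += 1
        customers[fid].add(row["customerID"])
    report = [
        {
            "franchiseID": fid,
            "total_revenue": revenue[fid],
            "transaction_count": transactions[fid],
            "unique_customers": len(customers[fid]),
        }
        for fid in revenue
    ]
    # Highest revenue first, matching the report's sort order.
    return sorted(report, key=lambda r: r["total_revenue"], reverse=True)

# Hypothetical joined rows.
rows = [
    {"franchiseID": 1, "customerID": 100, "totalPrice": 30},
    {"franchiseID": 1, "customerID": 100, "totalPrice": 6},
    {"franchiseID": 2, "customerID": 101, "totalPrice": 50},
]
report = franchise_metrics(rows)
print(report[0])
```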

### Challenge: Build Optimized Reporting Pipeline

Combine query optimization, partitioning, and clean data into a production-ready pipeline.

In [0]:
# TODO: Build optimized reporting pipeline
# Apply all performance techniques learned:
# - Load deduplicated data
# - Filter before joining (predicate pushdown)
# - Proper join order

# Import the functions module under an alias so that sum() and count()
# do not shadow Python's built-ins for the rest of the notebook
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Step 1: Load deduplicated customers from f"{working_dir}/customers_deduplicated"
clean_customers_df = spark.read.format("delta").load(f"{working_dir}/customers_deduplicated")

# Step 2: Load and filter franchises to US only (predicate pushdown!)
usa_franchises_df = spark.table("samples.bakehouse.sales_franchises").filter(col("country") == "US")

# Step 3: Join transactions with clean customers, then with USA franchises
# Use "customerID" for customer join, "franchiseID" for franchise join
enriched_transactions_df = (transactions_df
    .join(clean_customers_df, "customerID")
    .join(usa_franchises_df, "franchiseID")
)

# Step 4: Calculate franchise performance metrics
# Note: After joining multiple tables, some columns (name, city, country) exist in both
# customers and franchises DataFrames. You must disambiguate these columns using
# the DataFrame reference syntax: usa_franchises_df["column_name"]
franchise_report_df = (enriched_transactions_df
    .groupBy(
        "franchiseID",
        usa_franchises_df["name"],  # Disambiguate franchise name
        usa_franchises_df["city"],  # Disambiguate franchise city
        usa_franchises_df["country"]  # Disambiguate franchise country
    )
    .agg(
        F.sum("totalPrice").alias("total_revenue"),
        F.count("transactionID").alias("transaction_count"),
        F.countDistinct("customerID").alias("unique_customers")
    )
    .orderBy(F.desc("total_revenue"))
)

display(franchise_report_df)

# Step 5: Write optimized output
(franchise_report_df
 .repartition(1)   # How many partitions for single file?
 .write
 .mode("overwrite")
 .format("delta")
 .save(f"{working_dir}/franchise_performance_report")
)


franchiseID,name,city,country,total_revenue,transaction_count,unique_customers
3000021,Butter Bliss,Honolulu,US,2514,64,57
3000002,Golden Crumbs,San Francisco,US,2502,76,69
3000011,Butter Babies,Miami,US,1404,86,73
3000030,Caramel Cravings,Boston,US,1371,77,70
3000032,Sweet Temptations,New Orleans,US,1368,83,76
3000017,Crumbly Creations,Austin,US,1356,76,68
3000036,Sugar High,Washington D.C.,US,1329,69,57
3000005,Floured Fantasies,Los Angeles,US,1272,74,66
3000028,Batter Up,Philadelphia,US,1188,64,59
3000034,Frosted Fantasies,Las Vegas,US,1155,76,64


In [0]:
# CHECK YOUR WORK
report_df = spark.read.format("delta").load(f"{working_dir}/franchise_performance_report")
assert report_df.count() > 0, "Should have franchise performance data"
assert "franchiseID" in report_df.columns, "Should include franchiseID"
print(f"✅ Challenge complete: Generated report for {report_df.count()} USA franchises")
print("🎉 Congratulations! You've mastered Spark performance optimization!")

✅ Challenge complete: Generated report for 16 USA franchises
🎉 Congratulations! You've mastered Spark performance optimization!


---
# Cleanup

Run the following cell to clean up your environment.

In [0]:
# Clean up working directory
dbutils.fs.rm(f"{working_dir}/transactions_partitioned", recurse=True)
dbutils.fs.rm(f"{working_dir}/repartitioned_demo", recurse=True)
dbutils.fs.rm(f"{working_dir}/coalesced_demo", recurse=True)
dbutils.fs.rm(f"{working_dir}/customers_with_duplicates", recurse=True)
dbutils.fs.rm(f"{working_dir}/customers_deduplicated", recurse=True)
dbutils.fs.rm(f"{working_dir}/franchise_performance_report", recurse=True)
print(f"✅ Cleaned up working directory: {working_dir}")

✅ Cleaned up working directory: /Volumes/bakehouse_catalog/performance_lab/workspace


## Congratulations!

You've completed the comprehensive Bakehouse Performance Optimization lab, covering:

✅ **Query Optimization** - Catalyst optimizer, explain plans, predicate pushdown, join optimization
✅ **Partitioning** - repartition vs coalesce, shuffle configuration, AQE
✅ **De-Duplication** - Case-insensitive matching, data standardization, efficient output
✅ **Integration** - Combined techniques in production pipeline

## Key Takeaways:

1. **Always check explain plans** - Understand what Spark is actually doing
2. **Filter early, filter often** - Push predicates close to data sources
3. **Partition wisely** - Balance parallelism with overhead
4. **Match the method to the data size** - Settings that suit big data, such as 200 shuffle partitions, add overhead on small data
5. **Standardize before deduplication** - Clean data improves matching

These performance optimization skills are essential for building scalable, production-ready data pipelines!