# 🚀 **Lab 4 - Performance Tuning, Optimization & Scaling**
In this lab, you'll explore key concepts and techniques for tuning performance, selecting optimal Delta configs, and other critical concepts to maintain a scalable lakehouse architecture. Let's dive in!  

## 🎯 What You'll Learn
- The Native Execution Engine
- Optimizing tables with zone and use case appropriate table features
- Table clustering
- Compute sizing
- Query optimization
- Table maintenance

## **4.1 - Why Spark & Delta Performance Tuning Matters** | 🕑 2:00 – 2:05 PM
#### 🧠 Why Performance Tuning?
Performance tuning is not just about speed—it’s about:
- 💰 **Reducing cost** (fewer resources, faster jobs)
- 📈 **Improving scalability** (handle more data, more users)
- 📊 **Delivering consistent SLAs** (avoid those surprise slowdowns)
- 🧘 **Creating a smoother dev & user experience** (debug less, wait less)

In Spark + Delta workloads, **every layer matters**, from the execution engine and data layout to the shape of your compute and how queries are written. Getting performance right means **pulling the right levers at the right time**.

**PLACEHOLDER FOR OVERVIEW IMAGE**

## **4.2 - Going Native for Big Gains** | 🕑 2:05 – 2:15 PM

**PLACEHOLDER FOR NEE vs. JVM DIAGRAM**

### 💪 **4.2.1 - The Power of the Native Execution Engine**

In [24]:
# This will be added to the GitHub repo post FabCon. For the Workshop we'll hit a copy of this data in OneLake to provide better perf (this data resides in East US).
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"

blob_sas_token = r""

spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)

wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")

# Read the Parquet file from the WASBS path and write to LH
df = spark.read.parquet(wasbs_path)
df.createOrReplaceTempView("nyc")


#### **4.2.1.1 - Create benchmark function**

In [1]:
def benchmark_native_engine(query):
    import time
    spark.conf.set("spark.synapse.vegas.useCache", "false")
    spark.conf.set("spark.native.enabled", "false")
    # Set description of the job
    spark.sparkContext.setJobDescription(f"Spark Query: {query}")
    # Record start time
    start_time_spark = time.time()
    # Execute query
    q = spark.sql(query)
    # Collect results (this triggers the query execution)
    if query.split()[0].lower() in ('with', 'select'):
        q.collect()
    # Record end time
    end_time_spark = time.time()
    duration_spark = (end_time_spark - start_time_spark) * 1000  # Convert to milliseconds
    print(f"Execution time w/ Spark: {duration_spark:.2f} ms")

    spark.conf.set("spark.native.enabled", "true")
    # Set description of the job
    spark.sparkContext.setJobDescription(f"Native Query: {query}")
    # Record start time
    start_time_native = time.time()
    # Execute query
    q = spark.sql(query)
    # Collect results (this triggers the query execution)
    if query.split()[0].lower() in ('with', 'select'):
        q.collect()
    # Record end time
    end_time_native = time.time()
    spark.sparkContext.setJobDescription(None)
    duration_native = (end_time_native - start_time_native) * 1000  # Convert to milliseconds

    # Re-enable the cache before asserting so a failed assertion doesn't leave it disabled
    spark.conf.set("spark.synapse.vegas.useCache", "true")

    # Get the execution plan
    execution_plan = q._jdf.queryExecution().executedPlan().toString()

    # Assert that the plan contains "Velox" to make sure that NEE was enabled and had at least 1 compatible operation
    assert "Velox" in execution_plan, f"Plan did not contain Velox: {execution_plan}"

    print(f"Execution time w/ Native: {duration_native:.2f} ms")
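    # Report the speedup, or say so when native execution was not faster
    if duration_native < duration_spark:
        times_faster = duration_spark / duration_native
        print(f"Native was \033[1;34m{times_faster:.1f}x faster\033[0m!!!")
    else:
        print("Native was not faster for this query.")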


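The benchmark function flips session configs by hand, so an exception partway through can leave the session in an unexpected state. One way to guard against that is a small context manager that restores whatever was set before. This is a minimal sketch, not part of the lab: `temporary_conf` is a hypothetical helper name, and it assumes `spark` is the session object that Fabric notebooks predefine.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(spark, overrides):
    """Apply Spark session configs inside a `with` block, then restore them."""
    # Remember the current value of each key (None means "not explicitly set").
    previous = {key: spark.conf.get(key, None) for key in overrides}
    try:
        for key, value in overrides.items():
            spark.conf.set(key, value)
        yield
    finally:
        # Restore the earlier state even if the query or an assertion raised.
        for key, value in previous.items():
            if value is None:
                spark.conf.unset(key)
            else:
                spark.conf.set(key, value)


# Example usage in a notebook cell (needs a live Spark session):
# with temporary_conf(spark, {"spark.native.enabled": "false"}):
#     spark.sql("SELECT COUNT(*) FROM nyc_yellow_taxi").collect()
```

Keys that had no explicit value are unset on exit rather than set to a guessed default, so the runtime's own default applies again.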

#### **4.2.1.2 - Run Benchmark on 1.5B row parquet dataset**

In [2]:
taxi_df = spark.read.parquet('abfss://d24c0aa4-e6b2-4571-8e9a-6ae82ebf926d@msit-onelake.dfs.fabric.microsoft.com/9308f974-121a-4491-9d8a-8dde47a9ce9b/Files/nyctlc/yellow')
taxi_df.createOrReplaceTempView('nyc_yellow_taxi')


In [3]:
benchmark_native_engine("""
    SELECT 
        AVG(tripDistance), 
        VendorID 
    FROM nyc_yellow_taxi
    GROUP BY ALL
""")


Execution time w/ Spark: 47844.29 ms
Execution time w/ Native: 17661.24 ms
Native was 2.7x faster!!!
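For reference, the reported factor is simply the ratio of the two timings. The sketch below redoes that arithmetic with the numbers printed above and also expresses the result as wall-clock time saved. These are single-run timings, so treat them as indicative rather than a rigorous benchmark.

```python
duration_spark_ms = 47844.29   # JVM-based Spark run, from the output above
duration_native_ms = 17661.24  # Native Execution Engine run, from the output above

# Speedup is the ratio of the slower time to the faster time.
speedup = duration_spark_ms / duration_native_ms

# The same result expressed as the share of wall-clock time saved.
time_saved_pct = (1 - duration_native_ms / duration_spark_ms) * 100

print(f"Speedup: {speedup:.1f}x")
print(f"Wall-clock time saved: {time_saved_pct:.0f}%")
```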


Now write your own query and execute it with the benchmark function, `benchmark_native_engine()`. You can call `taxi_df.printSchema()` to see which columns are available in the DataFrame. Reference the DataFrame via the temp view `nyc_yellow_taxi`, just as you would any other Lakehouse table or view.

> ℹ️ _Unhide the below two cells if you need help with the above task._

In [None]:
taxi_df.printSchema()

In [None]:
benchmark_native_engine("""
    SELECT *
    FROM nyc_yellow_taxi
    LIMIT 1000
""")

#### **4.2.1.3 - Compare Final Execution Plans**
- **Numbering (`*(1)`, `*(2)`) Represents Execution Order**
- **Caret symbol (`^`) Represents a Stage Executed via the Velox engine (NEE)**
- Graphical plans show Native execution in <span style="color:green">green</span> and Spark in <span style="color:blue">blue</span>.


### **4.2.2 - Enable the Native Execution Engine**
We can enable any mutable Spark configuration via PySpark, Scala, or SparkSQL.

In [4]:
# PySpark / Scala
spark.conf.set("spark.native.enabled", "true")

# SparkSQL
spark.sql("SET spark.native.enabled = True")


DataFrame[key: string, value: string]

#### **4.2.2.1 - Audit Session Configs**

In [5]:
# Verify that Spark config is set
spark.conf.get("spark.native.enabled")


'True'
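Checking one key by eye works, but auditing several at once is easier with a small helper. The following is a sketch under stated assumptions: `find_conf_mismatches` is a hypothetical name, and `spark` is the notebook's predefined session. It compares values case-insensitively because, as the output above shows, the stored value keeps whatever casing it was set with (`'True'` here).

```python
def find_conf_mismatches(spark, expected):
    """Return {key: (expected, actual)} for every config that differs.

    Unset keys are reported with an actual value of None.
    """
    mismatches = {}
    for key, want in expected.items():
        actual = spark.conf.get(key, None)
        # Compare case-insensitively: "True" and "true" mean the same thing here.
        if actual is None or actual.lower() != want.lower():
            mismatches[key] = (want, actual)
    return mismatches


# Example usage in a notebook cell (needs a live Spark session):
# print(find_conf_mismatches(spark, {"spark.native.enabled": "true"}))
```

An empty dictionary means every audited key has the expected value.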

#### 🚨 Key Takeaways

###### ❌ **Not Using the Native Execution Engine is a Missed Opportunity**

- If you aren't using the Native Execution Engine, and are therefore running on the traditional JVM-based Spark execution engine, you're **missing out on huge performance gains**: 2x–5x improvements in many cases.
- ✅ *Fix*: Validate engine usage via the Spark UI or logs. Ensure compatible functions and workloads are used to **unlock native execution**.

## **4.3 - Smart Feature Selection by Zone & Workload** | 🕑 2:15 – 2:30 PM
While default Spark configurations should benefit the most common use cases, sometimes we need to modify them to optimize for specific workloads. Here we will set Spark configurations to optimize typical bronze and silver zones.

![OptimizeWrite](https://milescole.dev/assets/img/posts/Optimized-Writes/optimized-write.excalidraw.png)

![Deletion Vectors](https://milescole.dev/assets/img/posts/Deletion-Vectors/deletion-vectors-tldr2.excalidraw.png)

----------------
V-Order - "ELI5"
**NEEDS TO BE CREATED**

In [3]:
# TO BE REMOVED, ONLY USED FOR PRE-LAB TESTING TO RESET TABLE BACK TO DEFAULT CONFIGS
spark.sql(f"""
    CREATE OR REPLACE TABLE golddenormalized.dbo.patientobservations_gold
    AS SELECT * FROM golddenormalized.dbo.patientobservations_gold
""")


DataFrame[]

#### **4.3.1.1 - Get Baseline Perf Before Config Modification**

In [36]:
%%sql
UPDATE golddenormalized.dbo.patientobservations_gold SET deceasedDateTime = NULL WHERE first_name = 'Dion244' and last_name = 'Bergnaum523'


DataFrame[num_affected_rows: bigint]

### **4.3.2 - Enable Session Configs**

In [4]:
# optimizeWrite is generally beneficial for partitioned tables
spark.conf.set("spark.microsoft.delta.optimizeWrite.partitioned.enabled", "true")
# unset so that `optimizeWrite.partitioned.enabled` is not overridden
spark.conf.unset("spark.databricks.delta.optimizeWrite.enabled")
# appropriate target file size for small tables up to 10GB in size
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "128m")
# unset so that V-Order is not enabled by default at the session level and can be controlled at the table level as needed
spark.conf.unset("spark.sql.parquet.vorder.default")
# improves the performance of updates and deletes
spark.conf.set("spark.databricks.delta.properties.defaults.enableDeletionVectors", "true")
# appropriate target file size for small tables up to 10GB in size
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", "128m")


#### **4.3.2.1 - Enable Delta Features on an Existing Table**
Now that we've tailored our Spark configurations to our specific workload, newly created Delta tables will inherit them. We can modify the V-Order and Deletion Vector properties on existing tables by altering the tables:

In [5]:
%%sql
ALTER TABLE golddenormalized.dbo.patientobservations_gold
SET TBLPROPERTIES (
    'delta.parquet.vorder.enabled' = 'false',
    'delta.enableDeletionVectors' = 'true'
)


DataFrame[]

#### **4.3.2.2 - Audit Delta Table Features**
To verify that our table has the desired features, we can run `DESCRIBE DETAIL` in SparkSQL and check which features are enabled. Use `DESCRIBE DETAIL` to verify that Deletion Vectors are enabled and that V-Order is either unset or disabled on `{gold_lakehouse}.dbo.patientobservations_gold`.

> ℹ️ _If you use a `%%sql` cell you will have to statically type the name of your gold lakehouse. You can always run `print(gold_lakehouse)` in a PySpark cell if you don't remember the name of it._

In [None]:
%%sql

> ℹ️ _Unhide the below cell if you need help with the above task._

In [None]:
%%sql
DESCRIBE DETAIL golddenormalized.dbo.patientobservations_gold
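The same check can be made programmatically against the `properties` map that `DESCRIBE DETAIL` returns. This is a sketch only: `check_write_optimized` is a hypothetical helper, and it inspects just the two property names set by the `ALTER TABLE` statement earlier in this section.

```python
def check_write_optimized(properties):
    """Return a list of problems found in a Delta table's `properties` map."""
    problems = []
    # Deletion Vectors must be explicitly enabled.
    if properties.get("delta.enableDeletionVectors", "false").lower() != "true":
        problems.append("Deletion Vectors are not enabled")
    # V-Order may be absent or 'false', but must not be 'true'.
    if properties.get("delta.parquet.vorder.enabled", "false").lower() == "true":
        problems.append("V-Order is enabled")
    return problems


# Example usage in a notebook cell (needs a live Spark session):
# detail = spark.sql("DESCRIBE DETAIL golddenormalized.dbo.patientobservations_gold").first()
# print(check_write_optimized(detail["properties"]))
```

An empty list means the table matches the write-optimized settings used in this lab.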

#### **4.3.2.3 - Run Statement with Write Optimized Delta Features**

In [6]:
%%sql
UPDATE golddenormalized.dbo.patientobservations_gold SET deceasedDateTime = NULL WHERE first_name = 'Dion244' and last_name = 'Bergnaum523'


DataFrame[num_affected_rows: bigint]

The `UPDATE` statement should have run about 2x faster, largely because Deletion Vectors are enabled. Use `DESCRIBE HISTORY` on the table and inspect the `operationMetrics` column for the two `UPDATE` operations to see the difference.

In [None]:
%%sql


> ℹ️ _You can query Delta table history by creating a temp view and then querying it just like any table. Run the below to get a cleaner display comparing key write operation metrics._

In [None]:
spark.sql("DESCRIBE HISTORY golddenormalized.dbo.patientobservations_gold").createOrReplaceTempView("po_history")
history_df = spark.sql("SELECT version, operationMetrics, operationMetrics.numUpdatedRows, operationMetrics.numCopiedRows, operationMetrics.numDeletionVectorsAdded, operationMetrics.rewriteTimeMs from po_history where operation = 'UPDATE'")
display(history_df)

## **4.4 - Getting the Most Out of Table Clustering** | 🕑 2:30 – 2:35 PM

**PLACEHOLDER FOR TABLE CLUSTERING DIAGRAMS**

- Partitioning is generally only beneficial for tables > 1 TB in compressed size. Partitioning below 1 TB generally hurts read performance and causes unnecessary write overhead.
- Partitioning on high-cardinality columns quickly results in **small-file problems**.
- ✅ *Fix*: Choose partition columns based on **query patterns**, **cardinality**, and **evolution over time**. Evaluate regularly.
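A quick back-of-the-envelope calculation shows why high-cardinality partition columns lead to small files. The sketch below divides table size by partition count, assuming data is spread evenly across partitions. The table sizes and partition counts are hypothetical, chosen only for illustration.

```python
def avg_partition_size_mib(table_size_gib, num_partitions):
    """Average data volume per partition in MiB, assuming an even spread."""
    return table_size_gib * 1024 / num_partitions


# Hypothetical tables, for illustration only.
high_cardinality = avg_partition_size_mib(500, 100_000)  # 500 GiB, 100,000 distinct values
daily_on_large = avg_partition_size_mib(2048, 365)       # 2 TiB, one partition per day

print(f"High-cardinality column: {high_cardinality:.1f} MiB per partition")
print(f"Daily partitions on a large table: {daily_on_large:.1f} MiB per partition")
```

The first case leaves only about 5 MiB per partition, far below the 128 MB file-size targets used earlier in this lab, while the second leaves several GiB per partition.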

## **4.5 - Right-Sizing Spark for the Job** | 🕑 2:35 – 2:40 PM
**PENDING**

---
<br>

###### 🐢 **Underpowered or Poorly Sized Compute**

- **Small clusters** may save money up front—but if they cause excessive **shuffling, spilling, or GC**, they end up costing more in time and dollars.
- **Too many small executors** can be worse than **fewer large ones**, especially for shuffles and joins.
- ✅ *Fix*: Right-size your compute for the workload. Use **metrics like shuffle spill, memory usage, and GC time** to guide tuning.

<br>

---
<br>

###### 🔄 **Too Much Caching / Wrong Things Cached**

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.

<br>

## **4.6 - Writing Smarter Queries** | 🕑 2:45 – 2:55 PM



### 🎲 **4.6.1 - Avoid Relying on Schema Inference in Production**

- Schema inference slows down reads, **increases job execution time**, and can result in **unexpected schema drift**.
- Particularly painful in high-scale scenarios.
- ✅ *Fix*: Define schemas explicitly in production jobs, especially for semi-structured data like JSON and any structured files without a schema header (CSV, Excel, etc.).

👉 _Let's explore how this impacts execution times. First, let's start by allowing Spark to automatically infer the schema._

In [None]:
from pyspark.sql.functions import from_json,col
from pyspark.sql.types import *

observations_shortcut_path="Files/observationsraw_1k/"
# Load JSON data
observations_raw_df = spark.read.json(observations_shortcut_path)

display(observations_raw_df)

We can get the underlying schema of any DataFrame by calling `.schema` on it. Use the following cell to return the schema, then create a variable named `schema` that holds the schema that was returned.


> ℹ️ _Unhide the below cell if you need help with the above task._

In [66]:
observations_raw_df.schema
schema = StructType([StructField('category', ArrayType(StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True)]), True), True), StructField('code', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('component', ArrayType(StructType([StructField('code', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueCodeableConcept', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueQuantity', StructType([StructField('code', StringType(), True), StructField('system', StringType(), True), StructField('unit', StringType(), True), StructField('value', DoubleType(), True)]), True), StructField('valueString', StringType(), True)]), True), True), StructField('effectiveDateTime', StringType(), True), StructField('encounter', StructType([StructField('reference', StringType(), True)]), True), StructField('id', StringType(), True), StructField('issued', StringType(), True), StructField('meta', StructType([StructField('profile', ArrayType(StringType(), True), True)]), True), StructField('resourceType', StringType(), True), StructField('status', StringType(), True), StructField('subject', StructType([StructField('reference', StringType(), True)]), True), StructField('valueCodeableConcept', 
StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueQuantity', StructType([StructField('code', StringType(), True), StructField('system', StringType(), True), StructField('unit', StringType(), True), StructField('value', DoubleType(), True)]), True)])


Now run the code below to read the JSON files into a DataFrame, but with the statically defined schema to see the performance improvement. You should see much better performance.

In [10]:
from pyspark.sql.functions import from_json,col
observations_shortcut_path="Files/observationsraw_1k/"
# Load JSON data
observations_raw_df = spark.read.json(observations_shortcut_path, schema)

display(observations_raw_df)


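To put numbers on the difference between the two reads, a small timing helper is enough. This is a sketch, not part of the lab: `time_ms` is a hypothetical name, and it measures a single run, so expect some noise between executions.

```python
import time


def time_ms(fn):
    """Run `fn()` once and return (result, elapsed milliseconds)."""
    start = time.perf_counter()
    result = fn()
    elapsed_ms = (time.perf_counter() - start) * 1000
    return result, elapsed_ms


# Example usage in a notebook cell (needs a live Spark session and the variables defined above):
# _, inferred_ms = time_ms(lambda: spark.read.json(observations_shortcut_path).count())
# _, explicit_ms = time_ms(lambda: spark.read.json(observations_shortcut_path, schema).count())
# print(f"Inferred: {inferred_ms:.0f} ms, explicit: {explicit_ms:.0f} ms")
```

Each lambda wraps both the read and an action, so the inferred-schema timing includes the extra pass Spark makes over the files to work out the schema.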

### ❓ **4.6.2 - Avoid Overusing Repartition or Coalesce Without Understanding**

- Over-partitioning increases task overhead and shuffle volume. Under-partitioning can lead to skew and stragglers.
- `repartition()` is rarely needed but can be extremely helpful when used intentionally and in the right scenarios.
- ✅ *Fix*: Base partitioning decisions on **data size** and **number of output files**. Use the **Spark UI** to see partition/task counts. If you don't know what you are doing, don't repartition.
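When a repartition is justified, the partition count can be derived from data size instead of guessed. The sketch below assumes a 128 MiB target per partition, borrowed from the file-size settings earlier in this lab. That target and the helper name `target_partition_count` are illustrative assumptions, not recommendations from the Spark documentation.

```python
import math


def target_partition_count(total_bytes, target_partition_bytes=128 * 1024**2):
    """Number of partitions needed so each holds roughly `target_partition_bytes`."""
    # Always return at least one partition, even for tiny inputs.
    return max(1, math.ceil(total_bytes / target_partition_bytes))


ten_gib = 10 * 1024**3
print(target_partition_count(ten_gib))  # 10 GiB at 128 MiB per partition -> 80

# Example usage in a notebook cell (estimated_bytes is a value you supply):
# df = df.repartition(target_partition_count(estimated_bytes))
```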


### 🔄 **4.6.3 - Avoid Too Much Caching / Wrong Things Cached**

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.
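One way to make "unpersist when done" hard to forget is to scope the cache with a context manager. This is a minimal sketch: `cached` is a hypothetical helper, and it relies only on the `cache()` and `unpersist()` methods of a PySpark DataFrame.

```python
from contextlib import contextmanager


@contextmanager
def cached(df):
    """Cache `df` for the duration of a `with` block, then release it."""
    df.cache()  # lazy: the data is materialized by the first action
    try:
        yield df
    finally:
        # Free executor memory even if the block raised.
        df.unpersist()


# Example usage in a notebook cell (needs a live Spark session):
# with cached(spark.table("nyc_yellow_taxi").filter("tripDistance > 10")) as long_trips:
#     long_trips.count()                       # first action populates the cache
#     long_trips.groupBy("VendorID").count()   # later work reuses it
```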


### 🧊 **4.6.4 - Avoid Using Expensive UDFs in Hot Paths**

- Python, Scala, or Java UDFs are black boxes to the Catalyst optimizer.
- They **disable query optimizations**, and often run **single-threaded** per executor task.
- ✅ *Fix*: Rewrite logic using Spark SQL functions or **pandas_udf** (if applicable). Avoid UDFs in `WHERE`, `JOIN`, and `GROUP BY` clauses.


### 📥 **4.6.5 - Avoid Data Skew**

- When a small number of keys have a disproportionate number of rows, tasks processing those keys become **stragglers**, slowing down the job.
- ✅ *Fix*: Detect skew via Spark UI or metrics. Use **salting**, **adaptive skew join**, or data bucketing for heavily skewed keys.
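The idea behind salting can be shown without Spark. The sketch below counts rows per bucket before and after appending a salt to each key. It is a conceptual illustration on made-up data, not a Spark API: the salt here comes from the row position, whereas a Spark job would more commonly generate a random salt.

```python
from collections import Counter


def rows_per_bucket(keys, num_salts=1):
    """Count rows per (key, salt) bucket; the salt is derived from the row position."""
    return Counter((key, i % num_salts) for i, key in enumerate(keys))


# Hypothetical skewed data: one hot key with 1,000 rows, ten keys with 10 rows each.
keys = ["hot"] * 1000 + [f"k{i}" for i in range(10) for _ in range(10)]

largest_unsalted = max(rows_per_bucket(keys).values())
largest_salted = max(rows_per_bucket(keys, num_salts=8).values())

print(largest_unsalted)  # 1000
print(largest_salted)    # 125
```

With 8 salts the hot key is spread over 8 buckets, so the largest task handles 125 rows instead of 1,000. In a real join, the other side has to be replicated once per salt value, which is the cost of this technique. Adaptive Query Execution can split skewed join partitions automatically when `spark.sql.adaptive.skewJoin.enabled` is on.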


## **4.7 - Keeping Tables Healthy** | 🕑 2:55 – 3:00 PM

### 📉 **4.7.1 - Avoiding Small File Problems**
- Delta Lake performance relies heavily on **file pruning** and **partition pruning**.
- If your table has thousands of tiny files (or large unpartitioned ones), query planning and scanning become inefficient.
- ✅ *Fix*: Use **Auto Compaction**, combined with **Optimized Write** where appropriate.

<br>

![Auto Compaction](https://milescole.dev/assets/img/posts/Compaction/auto-compaction.excalidraw.png)
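As a rough sketch, Auto Compaction can be switched on for the session in the same way as the other settings in this lab. The config key below is the Delta Lake session setting as I understand it, so treat it as an assumption and confirm it against your runtime's documentation. `enable_auto_compaction` is a hypothetical helper name.

```python
# Assumed Delta Lake session config for Auto Compaction; verify for your runtime.
AUTO_COMPACT_KEY = "spark.databricks.delta.autoCompact.enabled"


def enable_auto_compaction(spark):
    """Turn on Auto Compaction for the current session and return the stored value."""
    spark.conf.set(AUTO_COMPACT_KEY, "true")
    return spark.conf.get(AUTO_COMPACT_KEY)


# Example usage in a notebook cell (needs a live Spark session):
# enable_auto_compaction(spark)
```

Optimized Write is left to the session settings from Section 4.3.2, so this helper does not touch it.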

### 🔁 **4.7.2 - Preventing Excessive Storage Costs**
- Delta tables **accumulate metadata and files over time**. Without regular maintenance, the file count keeps growing. Running `OPTIMIZE` prevents the performance issues caused by accumulating many files, while `VACUUM` helps prevent unnecessary storage costs.
- ✅ *Fix*: Set up recurring jobs to:
  - Clean up obsolete data (`VACUUM`) -> Prevents excessive storage costs


#### **4.7.2.1 - Run Vacuum to Clean Up Old Files**
Delta has a safety check that prevents `VACUUM` from deleting data that a concurrent, long-running write operation has written but not yet committed. If you are certain that no operation on this table takes longer than the retention interval you plan to specify, you can turn off this safety check by setting the Spark configuration property `spark.databricks.delta.retentionDurationCheck.enabled` to `false`.

In [10]:
# allow low retention vacuum
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")


Before running `VACUUM`, let's get a count of Parquet files in the table's data directory:

In [8]:
files = notebookutils.fs.ls(f"abfss://{notebookutils.runtime.context['currentWorkspaceName']}@msit-onelake.dfs.fabric.microsoft.com/golddenormalized.Lakehouse/Tables/dbo/patientobservations_gold/")
all_files = [f.path for f in files if f.path.endswith('.parquet')]
print(len(all_files))


3


In [11]:
%%sql
VACUUM golddenormalized.dbo.patientobservations_gold RETAIN 0 HOURS


DataFrame[path: string]

#### **4.7.2.2 - Verify Only Active Files Remain**
Now that `VACUUM` has run, let's verify that files belonging to older table versions have been cleaned up.

In [12]:
files = notebookutils.fs.ls(f"abfss://{notebookutils.runtime.context['currentWorkspaceName']}@msit-onelake.dfs.fabric.microsoft.com/golddenormalized.Lakehouse/Tables/dbo/patientobservations_gold/")
all_files = [f.path for f in files if f.path.endswith('.parquet')]
print(len(all_files))


2


To verify that the remaining files belong to the active Delta version, we can use `df.inputFiles()` to list the files that a query against the table reads.

In [23]:
active_files = spark.sql(f"SELECT * FROM golddenormalized.dbo.patientobservations_gold").inputFiles()
print(active_files)


['abfss://d24c0aa4-e6b2-4571-8e9a-6ae82ebf926d@msit-onelake.dfs.fabric.microsoft.com/d92e236c-12ca-4560-9e87-4b526cc19972/Tables/dbo/patientobservations_gold/part-00003-69f38967-0677-4a61-addb-234282d54b92-c000.snappy.parquet', 'abfss://d24c0aa4-e6b2-4571-8e9a-6ae82ebf926d@msit-onelake.dfs.fabric.microsoft.com/d92e236c-12ca-4560-9e87-4b526cc19972/Tables/dbo/patientobservations_gold/part-00000-997a6306-b40c-4c0c-a469-dd21708a7e73-c000.snappy.parquet']


Now we'll convert the two lists to sets and subtract one from the other to verify that all files from old versions have been vacuumed.

In [24]:
all_files = [f.path.split('/')[-1] for f in files if f.path.endswith('.parquet')]
active_files = [f.split('/')[-1] for f in active_files]
result = list(set(all_files) - set(active_files))
assert len(result) == 0


## **BONUS: Cheat Sheet - Reasons for Sub-Optimal Performance**
---
<br>

###### ❌ **1. Not Using the Native Execution Engine**

- If you aren't using the Native Execution Engine, and are therefore running on the traditional JVM-based Spark execution engine, you're **missing out on huge performance gains**: 2x–5x improvements in some cases.
- ✅ *Fix*: Validate engine usage via the Spark UI or logs. Ensure compatible functions and workloads are used to **unlock native execution**.
- 👉 *Deep dive in Section 4.2*

<br>

---
<br>

###### ⚠️ **2. Using Features Without a Use Case**

- Features like **Deletion Vectors**, **Change Data Feed**, **Optimized Writes**, and **V-Order** are powerful—but they **add overhead**.
- When enabled on high-ingest or read-intensive tables **without a clear need**, they increase metadata size, write cost, and memory usage.
- ✅ *Fix*: Evaluate features **per table and per workload zone** based on workload requirements.
- 👉 *Best practices are covered in Section 4.3*

<br>

---
<br>

###### 🌀 **3. Mixing Execution Contexts**

- Combining **Spark, Pandas, Polars, DuckDB**, etc. in a single pipeline adds **serialization overhead**, breaks optimizations, and introduces **data duplication in memory**.
- While flexible, it reduces the optimizer’s ability to plan holistically.
- ✅ *Fix*: Stick to one processing engine per stage of the pipeline. Use cross-context conversions **sparingly and intentionally**.

<br>

---
<br>

###### 🐢 **4. Underpowered or Poorly Sized Compute**

- **Small clusters** may save money up front—but if they cause excessive **shuffling, spilling, or GC**, they end up costing more in time and dollars.
- **Too many small executors** can be worse than **fewer large ones**, especially for shuffles and joins.
- ✅ *Fix*: Right-size your compute for the workload. Use **metrics like shuffle spill, memory usage, and GC time** to guide tuning.
- 👉 *Discussed in Section 4.5*

<br>

---
<br>

###### 🎲 **5. Relying on Schema Inference in Production**

- Schema inference slows down reads, increases job startup time, and can result in **unexpected schema drift**.
- Particularly painful in high-scale scenarios.
- ✅ *Fix*: Define schemas explicitly in production jobs, especially for semi-structured data like JSON and any structured files without a schema header (CSV, Excel, etc.).
- 👉 *Discussed in Section 4.6*

<br>

---
<br>

###### 📉 **6. Poor File Layout and Small File Problem**

- Delta Lake performance relies heavily on **file pruning** and **partition pruning**.
- If your table has thousands of tiny files (or large unpartitioned ones), query planning and scanning become inefficient.
- ✅ *Fix*: Use **Auto Compaction**, combined with **Optimized Write** where appropriate.

<br>

---
<br>

###### 🔁 **7. No Table Maintenance Strategy**
- Delta tables **accumulate metadata and file churn**. Without regular maintenance, performance **gradually decays**.
- ✅ *Fix*: Set up recurring jobs to:
  - Compact files (`OPTIMIZE`) -> Maintains performance
  - Clean up obsolete data (`VACUUM`) -> Prevents excessive storage costs

<br>

---
<br>

###### ❓ **8. Overusing Repartition or Coalesce Without Understanding**

- Over-partitioning increases task overhead and shuffle volume. Under-partitioning can lead to skew and stragglers.
- `repartition()` is rarely needed but can be extremely helpful when used intentionally and in the right scenarios.
- ✅ *Fix*: Base partitioning decisions on **data size** and **number of output files**. Use the **Spark UI** to see partition/task counts. If you don't know what you are doing, don't repartition.

<br>

---
<br>

###### 📊 **9. Poor Partitioning Strategy**

- Partitioning is generally only beneficial for tables > 1 TB in compressed size. Partitioning below 1 TB generally hurts read performance and causes unnecessary write overhead.
- Partitioning on high-cardinality columns quickly results in **small-file problems**.
- ✅ *Fix*: Choose partition columns based on **query patterns**, **cardinality**, and **evolution over time**. Evaluate regularly.

<br>

---
<br>

###### 🔄 **10. Too Much Caching / Wrong Things Cached**

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.

<br>

---
<br>

###### 🧊 **11. Using Expensive UDFs in Hot Paths**

- Python, Scala, or Java UDFs are black boxes to the Catalyst optimizer.
- They **disable query optimizations**, and often run **single-threaded** per executor task.
- ✅ *Fix*: Rewrite logic using Spark SQL functions or **pandas_udf** (if applicable). Avoid UDFs in `WHERE`, `JOIN`, and `GROUP BY` clauses.

<br>

---
<br>

###### 📥 **12. Lack of Data Skew Handling**

- When a small number of keys have a disproportionate number of rows, tasks processing those keys become **stragglers**, slowing down the job.
- ✅ *Fix*: Detect skew via Spark UI or metrics. Use **salting**, **adaptive skew join**, or data bucketing for heavily skewed keys.

<br>
