In [0]:
# Render a preview of the bronze-layer customers view.
display(
    spark.read.table("example.bronze_db.customers_view_filled")
)

In [0]:
# Lazy handle on the customers view; nothing is computed until an action runs.
data = spark.table("example.bronze_db.customers_view_filled")

# Disabled pandas conversion, kept for reference by the commented-out
# inspection cells that follow.
# df = data.toPandas()

In [0]:
# df.columns

Index(['customer_id', 'tax_id', 'tax_code', 'customer_name', 'state', 'city',
       'postcode', 'street', 'number', 'unit', 'region', 'district', 'lon',
       'lat', 'ship_to_address', 'valid_from', 'units_purchased',
       'loyalty_segment'],
      dtype='object')

In [0]:
# for i in df.columns:
#     print("="*10,i,"="*10)
#     print(df[i].unique())

[11123757. 30585978.   349822. ...  3128658. 27753282. 47564070.]
['322056439' '215191847' '733090732' ... '288683016' '663850404'
 '923301135']
['A']
['SMITH,  SHIRLEY' 'STEPHENS,  GERALDINE M' 'GUZMAN,  CARMEN' ...
 'nupower inc' 'northwest state community college' 'trom enterprises']
['IN' 'OR' 'VA' 'WI' 'OH' 'NY' 'MI' 'PA' 'MA' 'CA' 'NJ' 'FL' 'GA' 'IL'
 'UT' 'WA' 'MN' 'NC' 'HI' 'AZ' 'ND' 'KS' 'ME' 'CO' 'AL' 'TN' 'MO' 'MD'
 'DC' 'SC' 'MT' 'TX' 'ID' 'NE' 'KY' 'LA' 'IA' 'VT' 'AK' 'WY' 'NM' 'RI'
 'OK' 'NV' 'AR' 'DE' 'CT' 'SD' 'MS' 'WV' 'NH']
['BREMEN' 'ADDRESS' 'VIENNA' ... 'BADGER' 'NORTHVALE' 'HASBROUCK HEIGHTS']
['46506.0' '0' '22181' ... '93603.0' '7647.0' '7604.0']
['N CENTER ST' 'NO SITUS' 'HILL RD' ... 'South Sicily Circle'
 'CREIGHTON RD' 'BELLAIRE DR']
['521.0' '0' '2860' ... '18960' '5691' '416B']
['1' 'APT 102' 'A' ... 'Unit 2208' 'A-104' 'APT B3']
['Indiana' 'NY' 'VA' 'OH' 'MA' 'FL' 'IL' 'UT' 'NJ' 'HI' 'MI' 'ND' 'PA'
 'KS' 'OR' 'ME' 'CO' 'TN' ' ' 'MO' 'MN' 'DC' 'MT' 'CA' 'N

In [0]:
# df.shape

(28813, 18)

---

# Query Execution Plans and Partitioning:

In [0]:
# Create a filtered dataframe (Lazy evaluation - nothing happens yet)
filtered_df = data.filter((data.state == "TX") & (data.loyalty_segment == 3))

# ACTION 1: See the Physical Plan
print("=== Explain Plan ===")
filtered_df.explain()

=== Explain Plan ===
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [coalesce(cast(customer_id#16981L as double), avg_customer_id#16635) AS customer_id#16665, coalesce(tax_id#16982, mode_tax_id#16637) AS tax_id#16666, coalesce(tax_code#16983, mode_tax_code#16639) AS tax_code#16667, coalesce(customer_name#16984, mode_customer_name#16641) AS customer_name#16668, coalesce(state#16985, mode_state#16643) AS state#16669, coalesce(city#16986, mode_city#16645) AS city#16670, coalesce(postcode#16987, mode_postcode#16647) AS postcode#16671, coalesce(street#16988, mode_street#16649) AS street#16672, coalesce(number#16989, mode_number#16651) AS number#16673, coalesce(unit#16990, mode_unit#16653) AS unit#16674, coalesce(region#16991, mode_region#16655) AS region#16675, coalesce(district#16992, mode_district#16657) AS district#16676, coalesce(lon#16993, avg_lon#16658) AS lon#16677, coalesce(lat#16994, avg_

### The `customers_view_filled` view has imputation logic built into it: null values are filled with the mode (most frequent value) or the average of each column. Every query against it, however simple, has to recompute all of that, which wastes compute. So I am writing the result out as a Delta table, so that Spark only performs the task it is given instead of redoing the imputation each time.

In [0]:
%sql
-- Create the volume used as the storage location for the partitioned Delta files (no-op if it already exists).
CREATE VOLUME IF NOT EXISTS example.bronze_db.customers_partitioned

In [0]:
# Materialize the view's result as a path-based Delta table, with one
# partition directory per value of `state`. Overwrite mode replaces any
# previous contents at this path.
table_path = "/Volumes/example/bronze_db/customers_partitioned"
(
    data.write
    .partitionBy("state")
    .mode("overwrite")
    .format("delta")
    .save(table_path)
)

In [0]:
# Read the materialized Delta table back from the volume path and preview it.
df = spark.read.load(table_path, format="delta")
display(df)

customer_id,tax_id,tax_code,customer_name,state,city,postcode,street,number,unit,region,district,lon,lat,ship_to_address,valid_from,units_purchased,loyalty_segment
11928169.0,322056439,A,"OUTTEN, MIA G",NJ,PENNS GROVE,8069.0,Mill Street,89,1,NY,FRA,-75.4745981,39.72958970000001,"NJ, 8069.0, Mill Street, 89",1532887809.0,6.0,1.0
10603598.0,322056439,A,"CRYLEN, TAMMIE",NJ,SEA ISLE CITY,8243.0,Central Avenue North,6509,1,NY,FRA,-74.70574920000001,39.1399165,"NJ, 8243.0, Central Avenue North, 6509",1541181492.0,4.0,0.0
10549224.0,322056439,A,"DWYER, THOMAS H",NJ,VILLAS,8251.0,East Atlantic Avenue,48,1,NY,FRA,-74.9314136,39.0278947,"NJ, 8251.0, East Atlantic Avenue, 48",1521481144.0,5.0,0.0
9722000.0,322056439,A,"POLLEY, BARBARA E",NJ,BUTLER,7405.0,LAKESIDE TRL,6,1,NJ,FRA,-74.3544995,40.9758008,"NJ, 7405.0, LAKESIDE TRL, 6",1521882767.0,10.0,2.0
11913053.0,322056439,A,"ARROYO, JORGE",NJ,WEST MILFORD,7480.0,Macopin Road,485,1,NY,FRA,-74.3758848,41.0296687,"NJ, 7480.0, Macopin Road, 485",1537780231.0,0.0,0.0
12032784.0,322056439,A,"MCGOVERN, KYLE M",NJ,WARREN,7059.0,Saw Mill Road,19,1,NY,FRA,-74.4993575,40.6422208,"NJ, 7059.0, Saw Mill Road, 19",1525221015.0,6.0,1.0
12361005.0,322056439,A,"SANTOYO, JOSE A",NJ,DOVER,7801.0,West Clinton Street,435,1,NY,FRA,-74.5672107,40.9010068,"NJ, 7801.0, West Clinton Street, 435",1521613876.0,0.0,0.0
10953050.0,322056439,A,"OSWALD, ERIC F",NJ,SEWELL,8080.0,Firethorne Court,308,1,NY,FRA,-75.05286,39.7221983,"NJ, 8080.0, Firethorne Court, 308",1530205311.0,3.0,0.0
11023716.0,322056439,A,"BLAHUSIAK IV, JOHN",NJ,COLUMBUS,7086.0,Boulevard Ea,725,1,NY,FRA,-74.01834040000001,40.7713876,"NJ, 7086.0, Boulevard Ea, 725",1529342542.0,6.0,1.0
10076086.0,322056439,A,"WEBB LAY, VANESSA F",NJ,RIDGEFIELD PARK,7660.0,Spruce Avenue,47,1,NY,FRA,-74.0260282,40.856248,"NJ, 7660.0, Spruce Avenue, 47",1518147991.0,4.0,0.0


In [0]:
# Same filter as before, now applied to the DataFrame read from the Delta
# table (`df`) instead of the view-backed one.
filtered_df = df.where(
    (df["state"] == "TX") & (df["loyalty_segment"] == 3)
)

# Print the physical plan for comparison with the plan of the view query.
print("=== Explain Plan ===")
filtered_df.explain()


=== Explain Plan ===
== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonProject [customer_id#17994, tax_id#17995, tax_code#17996, customer_name#17997, state#17998, city#17999, postcode#18000, street#18001, number#18002, unit#18003, region#18004, district#18005, lon#18006, lat#18007, ship_to_address#18008, valid_from#18009, units_purchased#18010, loyalty_segment#18011]
      +- PhotonScan parquet [customer_id#17994,tax_id#17995,tax_code#17996,customer_name#17997,city#17999,postcode#18000,street#18001,number#18002,unit#18003,region#18004,district#18005,lon#18006,lat#18007,ship_to_address#18008,valid_from#18009,units_purchased#18010,loyalty_segment#18011,state#17998] DataFilters: [isnotnull(loyalty_segment#18011), (loyalty_segment#18011 = 3.0)], DictionaryFilters: [(loyalty_segment#18011 = 3.0)], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/Volumes/example/bronze_db/customers_partitioned], OptionalDataFilters: [], PartitionFilters: [isnotnull(state

### Look at that! Compare this to your previous "monster" plan. It went from dozens of lines of complex math and joins to a single, clean scan. This is the hallmark of a high-performance Big Data pipeline.

---

# Optimize and apply Z-Order (a.k.a. co-locating related rows in the same files)

In [0]:
%sql
-- Run OPTIMIZE on the path-based Delta table, Z-ordering the data by customer_id.
OPTIMIZE delta.`/Volumes/example/bronze_db/customers_partitioned` ZORDER BY (customer_id);

path,metrics
dbfs:/Volumes/example/bronze_db/customers_partitioned,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 51, List(minCubeSize(107374182400), List(0, 0), List(51, 2837427), 0, List(0, 0), 0, null), null, 0, 0, 51, 51, false, 0, 0, 1768739117673, 1768739123130, 8, 0, null, List(0, 0), null, 18, 18, 0, 0, null)"


In [0]:
from delta.tables import DeltaTable

# Build a fresh DeltaTable handle so that the history shown below includes
# the most recent commit, then display the commit log (one row per version).
deltaTable = DeltaTable.forPath(
    spark,
    "/Volumes/example/bronze_db/customers_partitioned",
)
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2026-01-18T12:24:56.000Z,73331271719467,kalyanmistcse@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""state""])",,List(1379506041694369),0118-122226-ucqupaow-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 51, numRemovedFiles -> 51, numRemovedBytes -> 2837427, numDeletionVectorsRemoved -> 0, numOutputRows -> 28813, numOutputBytes -> 2837427)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2026-01-18T09:23:18.000Z,73331271719467,kalyanmistcse@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""state""])",,List(1379506041694369),0118-081903-a997t9ta-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 51, numRemovedFiles -> 51, numRemovedBytes -> 2837427, numDeletionVectorsRemoved -> 0, numOutputRows -> 28813, numOutputBytes -> 2837427)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-18T08:49:51.000Z,73331271719467,kalyanmistcse@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""state""])",,List(1379506041694369),0118-081903-a997t9ta-v2n,,WriteSerializable,False,"Map(numFiles -> 51, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 28813, numOutputBytes -> 2837427)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


In [0]:
# Show the top-level entries stored under the volume path.
display(
    dbutils.fs.ls("/Volumes/example/bronze_db/customers_partitioned")
)

path,name,size,modificationTime
dbfs:/Volumes/example/bronze_db/customers_partitioned/_delta_log/,_delta_log/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=AK/,state=AK/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=AL/,state=AL/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=AR/,state=AR/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=AZ/,state=AZ/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=CA/,state=CA/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=CO/,state=CO/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=CT/,state=CT/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=DC/,state=DC/,0,1768728210266
dbfs:/Volumes/example/bronze_db/customers_partitioned/state=DE/,state=DE/,0,1768728210266


In [0]:
from delta.tables import DeltaTable

# Load the path-based Delta table and display its commit history.
deltaTable = DeltaTable.forPath(spark, "/Volumes/example/bronze_db/customers_partitioned")
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2026-01-18T09:23:18.000Z,73331271719467,kalyanmistcse@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""state""])",,List(1379506041694369),0118-081903-a997t9ta-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 51, numRemovedFiles -> 51, numRemovedBytes -> 2837427, numDeletionVectorsRemoved -> 0, numOutputRows -> 28813, numOutputBytes -> 2837427)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-18T08:49:51.000Z,73331271719467,kalyanmistcse@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""state""])",,List(1379506041694369),0118-081903-a997t9ta-v2n,,WriteSerializable,False,"Map(numFiles -> 51, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 28813, numOutputBytes -> 2837427)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


---
# It's time to benchmark the improvements!

Benchmarking improvements isn't just about "seeing a faster number" — it is about understanding the reduction in resource consumption and I/O overhead.

In Spark, benchmarking measures how much work the cluster doesn't have to do thanks to your optimizations.

### 🚀 The Three Pillars of Improvement
1. **Eliminating "On-the-Fly" Computation (CPU Savings)**

Original: The original DataFrame `data` (read from the `customers_view_filled` view) contained logic to fill nulls using `coalesce` with global modes and averages. Every time you ran a query, Spark had to calculate those averages/modes across all 28,000+ rows before it could even start filtering.

Optimized: By writing the data to a Volume, you "materialized" it. The nulls are already filled on disk, so Spark's CPU usage drops because it no longer performs that math during every search.

2. **Directory Pruning (I/O Savings)**

Original: To find a customer in Texas, Spark had to scan the entire dataset, because the data was not partitioned by state.

Optimized: Because you used partitionBy("state"), Spark can apply partition pruning (also called directory pruning). It looks at your filter WHERE state = 'TX' and reads only the /state=TX/ folder, ignoring all other states.

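The Benefit: If you have about 50 states and the rows are spread roughly evenly across them, you just reduced the amount of data read by roughly 98%. (States with more customers hold more data, so the actual saving depends on which state you query.)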

3. **Data Skipping with Z-ORDER (File-Level Savings)**


Original: Even inside the Texas folder, Spark would have to read every single row to find a specific customer_id.

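Optimized: You applied Z-ORDER on customer_id, which is meant to physically cluster similar IDs together in the Parquet files. (Check the OPTIMIZE metrics and the table history to confirm that files were actually rewritten: on a table this small, with a single file per partition, there may be little or nothing to recluster.)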

The Benefit: Spark checks the metadata (min/max values) of each file. If you are looking for ID 30585978 and a file's metadata says its range is 1000 to 5000, Spark skips that file entirely without opening it.

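Efficiency gain: 0.72x — NOTE: state what this ratio measures. If it is optimized runtime ÷ original runtime, the optimized query takes about 28% less time (roughly 1.4x faster); as written, "0.72x faster" reads as a slowdown.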
