# Mod 09: Adaptive Query Execution

### Lab 00: Setup Data sets for Labs

In [0]:
# Dynamic Partition Pruning Lab setup: Create Schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DecimalType

# Column layout used for the fire-calls extract: seven nullable fields, one per line.
fireSchema = StructType([
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('UnitType', StringType(), True),
    StructField('NeighborhoodDistrict', StringType(), True),
    StructField('ReceivedDtTmTS', StringType(), True),
])

In [0]:
# Dynamic Partition Pruning Lab setup: Apply Schema, read the CSV into a DataFrame and Display

# The schema has to be handed to the reader with .schema(); "schema" is not a CSV reader
# option, so .option("schema", "fireSchema") left fireSchema unused.
# NOTE(review): the file name says "comma" but the separator is a tab - confirm against the file.
fireDF = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .schema(fireSchema)
    .csv("/FileStore/tables/fire_callsX_comma.csv")
)
display(fireDF)

In [0]:
# Dynamic Partition Pruning Lab setup: write the fire-calls DataFrame as a Non-Partitioned Parquet object
# (an existing copy at this path is overwritten)
fireDF.write.parquet("dbfs:/FileStore/tables/fire_nonpart", mode="overwrite")

In [0]:
# Dynamic Partition Pruning Lab setup: write the same DataFrame as a Partitioned Parquet object.
# 'partitionBy' requests one sub-folder per distinct NeighborhoodDistrict value.
(
    fireDF.write
    .partitionBy("NeighborhoodDistrict")
    .mode("overwrite")
    .parquet("dbfs:/FileStore/tables/fire_part")
)

In [0]:
# Dynamic Partition Pruning Lab setup: Create Schema and Apply. Convert to Temp View
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DecimalType
# One nullable string column holding the neighborhood name
neighborSchema = StructType([StructField('Neighborhood', StringType(), True)])

# The CSV is read without a header row, so the column name comes from the schema above.
(
    spark.read
    .schema(neighborSchema)
    .option("header", False)
    .csv("dbfs:/FileStore/tables/neighbooddistricts.csv")
    .createOrReplaceTempView("neighbor_view")
)

display(spark.sql("SELECT * FROM neighbor_view"))

In [0]:
# Dynamic Partition Pruning Lab setup: Create DIM Parquet

# Read the neighborhood CSV (no header row) with the single-column schema
dimDF = (
    spark.read
    .schema(neighborSchema)
    .option("header", False)
    .csv("dbfs:/FileStore/tables/neighbooddistricts.csv")
)

# Persist it as Parquet, then read the Parquet copy back and show it
dimDF.write.parquet("dbfs:/FileStore/tables/dim01", mode="overwrite")
dimDF1 = spark.read.parquet("dbfs:/FileStore/tables/dim01")
display(dimDF1)

In [0]:
%scala
// Setup for Coalesce Shuffle Partitions: First create 2 objects we will be JOINing

// Driver ranges: ids 0..9999 for the transactions, ids 0..99 for the companies
val txDF = spark.range(0, 10000)
val companyDF = spark.range(0, 100)

/** Returns a generator that produces one random transaction tuple
  * (company_id, field1, field2, field3) per call.
  *
  * About 10% of the calls are forced onto company_id 1 (the "Very Big Company");
  * the rest draw a company_id from nextInt(100) % 99, i.e. a value in 0..98.
  */
def genRandomCompanyTx: ( () => (Int, String, String, String) ) = {
  // Five random alphanumeric characters, used as filler for field1..field3
  def randomField(): String = scala.util.Random.alphanumeric.take(5).mkString("")

  () => {
    // nextInt(100) is uniform over 0..99, so `< 10` selects exactly 10 of the 100 values.
    // (`<= 10` selected 11 of them, which did not match the stated 10%.)
    val companyId =
      if (scala.util.Random.nextInt(100) < 10) 1 // Very Big Company! (10%)
      else scala.util.Random.nextInt(100) % 99
    (companyId, randomField(), randomField(), randomField())
  }
}

/** Returns the id -> company-name mapping: id 1 is the skewed "Very Big Company",
  * every other id becomes "Company <id>".
  */
def genCompanyLookup: ( (Int) => (String) ) = {
  case 1  => "Very Big Company"
  case id => "Company " + id
}

// The transaction generator returns fresh random values on every call, so the UDF is
// marked non-deterministic. Spark assumes UDFs are deterministic by default and may then
// duplicate or eliminate invocations while optimizing.
val gen = org.apache.spark.sql.functions.udf(genRandomCompanyTx).asNondeterministic()
// The lookup is a pure function of the id, so it stays a regular (deterministic) UDF.
val lkp = org.apache.spark.sql.functions.udf(genCompanyLookup)

// One random transaction per id: the struct returned by the UDF is expanded into four columns
val sample = txDF.withColumn("newCol", gen()).select("id", "newCol.*").toDF("tx_id", "company_id", "field1", "field2", "field3")
sample.createOrReplaceTempView("sample")

// Company dimension: id plus the generated company name
val lookup = companyDF.withColumn("company", lkp(companyDF("id")))
lookup.createOrReplaceTempView("lookup")

In [0]:
%scala
// Setup for Coalesce Shuffle Partitions: persist both generated DataFrames as CSV
// (existing output at either path is overwritten)

sample.write.mode("overwrite").csv("dbfs:/FileStore/tables/sample")
lookup.write.mode("overwrite").csv("dbfs:/FileStore/tables/lookup")

In [0]:
# Setup for Coalesce Shuffle Partitions: Create Schemas for our 2 Dataframes

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

# Transactions: two integer ids followed by three string filler fields, all nullable
sampleSchema = (
    StructType()
    .add('tx_id', IntegerType(), True)
    .add('company_id', IntegerType(), True)
    .add('field1', StringType(), True)
    .add('field2', StringType(), True)
    .add('field3', StringType(), True)
)

# Company lookup: integer id plus company name, both nullable
lookupSchema = (
    StructType()
    .add('id', IntegerType(), True)
    .add('company', StringType(), True)
)

In [0]:
# Setup for Coalesce Shuffle Partitions: Read the header-less CSV output into Dataframes,
# applying the explicit schemas

sampleDF = spark.read.csv("dbfs:/FileStore/tables/sample/", schema=sampleSchema, header="false")
lookupDF = spark.read.csv("dbfs:/FileStore/tables/lookup/", schema=lookupSchema, header="false")

In [0]:
# Setup for Coalesce Shuffle Partitions: Convert CSV to Parquet
# (the typed Dataframes are written to the *_parq folders, replacing any earlier copy)

sampleDF.write.mode("overwrite").parquet("dbfs:/FileStore/tables/sample_parq")
lookupDF.write.mode("overwrite").parquet("dbfs:/FileStore/tables/lookup_parq")

In [0]:
# Setup for Coalesce Shuffle Partitions: Create 2 Dataframes from the Parquet copies

# Read the *_parq folders written by the conversion step. The 'sample/' and 'lookup/'
# folders hold CSV files and cannot be read with format="parquet".
# The 'header' option applies to CSV only, so it is not passed here.
sampleDF = spark.read.load("dbfs:/FileStore/tables/sample_parq/", format = "parquet", schema = sampleSchema)
lookupDF = spark.read.load("dbfs:/FileStore/tables/lookup_parq/", format = "parquet", schema = lookupSchema)

In [0]:
%scala
// Setup for Skew Partitions: Create Data objects

import scala.util.Random
import scala.math.BigDecimal

/** A vehicle make / model pair. */
case class MakeModel(
  make: String,
  model: String
)

/** Row type of the first skew-lab table: one registered vehicle. */
case class T1(
  registration: String,
  make: String,
  model: String,
  engine_size: BigDecimal
)

/** Row type of the second skew-lab table: one sale with its price. */
case class T2(
  make: String,
  model: String,
  engine_size: BigDecimal,
  sale_price: Double
)

    // The nine make/model combinations the generators draw from.
    // The first entry (index 0) is the one randomMakeModel() over-samples.
    val makeModelSet: Seq[MakeModel] = Seq(
      "FORD" -> "FIESTA",
      "NISSAN" -> "QASHQAI",
      "HYUNDAI" -> "I20",
      "SUZUKI" -> "SWIFT",
      "MERCEDED_BENZ" -> "E CLASS",
      "VAUXHALL" -> "CORSA",
      "FIAT" -> "500",
      "SKODA" -> "OCTAVIA",
      "KIA" -> "RIO"
    ).map { case (make, model) => MakeModel(make, model) }

    /** Picks a make/model with deliberate skew: a coin flip selects the first entry
      * outright, otherwise an entry is drawn uniformly from the whole set.
      */
    def randomMakeModel(): MakeModel = {
      val forceFirst = Random.nextBoolean()
      if (forceFirst) makeModelSet(0)
      else makeModelSet(Random.nextInt(makeModelSet.size))
    }

    /** Random engine size between 1.0 and 1.8 in steps of 0.1. */
    def randomEngineSize(): BigDecimal = BigDecimal("1." + Random.nextInt(9))

    /** Random registration: seven alphanumeric characters. */
    def randomRegistration(): String = Random.alphanumeric.take(7).mkString("")

    /** Random whole-number price in the range 500..5499. */
    def randomPrice(): Int = Random.nextInt(5000) + 500

    /** Builds one random T1 row: random registration and engine size for a skewed make/model. */
    def randomT1(): T1 = {
      val picked = randomMakeModel()
      T1(
        registration = randomRegistration(),
        make = picked.make,
        model = picked.model,
        engine_size = randomEngineSize()
      )
    }

    /** Builds one random T2 row: random engine size and sale price for a skewed make/model. */
    def randomT2(): T2 = {
      val picked = randomMakeModel()
      T2(
        make = picked.make,
        model = picked.model,
        engine_size = randomEngineSize(),
        sale_price = randomPrice()
      )
    }

    // 10,000 random T1 rows and 100,000 random T2 rows, each turned into a typed Dataset
    val t1 = Seq.fill(10 * 1000)(randomT1()).toDS()

    val t2 = Seq.fill(100 * 1000)(randomT2()).toDS()

In [0]:
%scala
// Setup for Skew Partitions: persist both Datasets as Parquet (overwriting earlier copies)

t1.write.mode("overwrite").parquet("dbfs:/FileStore/tables/t1")
t2.write.mode("overwrite").parquet("dbfs:/FileStore/tables/t2")

## Mod 09 AQE: Adaptive Query Execution

### Lab 01: Converting SortMergeJoin into a BroadcastHashJoin

In [0]:
# Lab 01a: Know the Data (Emp and Dept) - show both Parquet datasets
display(spark.read.format("parquet").load("dbfs:/FileStore/tables/emp_snappy.parquet/"))
display(spark.read.parquet("dbfs:/FileStore/tables/dept_snappy.parquet/"))

In [0]:
# Lab 01b: Create DFs from the Emp and Dept Parquet datasets
empDF = spark.read.parquet("dbfs:/FileStore/tables/emp_snappy.parquet/")
deptDF = spark.read.parquet("dbfs:/FileStore/tables/dept_snappy.parquet/")

# Expose both DFs to Spark SQL as temp views
empDF.createOrReplaceTempView("emp_view")
deptDF.createOrReplaceTempView("dept_view")

In [0]:
# Lab 01c: Turn off both AQE and BroadcastHashJoins
# AQE off: no adaptive re-planning of the join
spark.conf.set("spark.sql.adaptive.enabled", False)
# Threshold of -1: no automatic broadcast of the smaller join side
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [0]:
# Lab 01d: Do SortMergeJoin (Open any Job -> SQL -> Click on Description)
display(
    empDF.join(deptDF, on="dept")
    .select("last_name", "dept", "dept_name")
    .limit(4)
)

In [0]:
# Lab 01e: Turn on BroadcastHashJoin and AQE and execute again.
#         (Open any Job -> SQL -> Click on Description)
#         Did Performance Improve based on Clock time compared to SortMergeJoin?
# Broadcast threshold is given in bytes: 10 * 1024 * 1024 = 10485760
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
spark.conf.set("spark.sql.adaptive.enabled", True)

display(
    empDF.join(deptDF, on="dept")
    .select("last_name", "dept", "dept_name")
)

### Lab 02: Dynamic Coalesce Shuffle Partitions

In [0]:
# Lab 02a: Configure settings to not use AQE first

# AQE off for the baseline run
spark.conf.set("spark.sql.adaptive.enabled", False)

# Broadcast threshold of -1 disables BroadcastHashJoins, forcing a SortMergeJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Use 50 Shuffle Partitions instead of the default of 200
spark.conf.set("spark.sql.shuffle.partitions", 50)

In [0]:
# Lab 02b: Know the Data - register the Parquet copies as the 'sample' and 'lookup' views
spark.read.parquet("dbfs:/FileStore/tables/sample_parq/").createOrReplaceTempView("sample")
spark.read.parquet("dbfs:/FileStore/tables/lookup_parq/").createOrReplaceTempView("lookup")

# Top 10 companies by transaction count.
# Notice 'company_id' = 1 will be a large Partition when compared to rest when JOIN on 'company_id' column
display(
    spark.sql("SELECT company_id, count(tx_id) as transactions FROM sample GROUP BY company_id ORDER BY transactions DESC LIMIT 10")
)

# The other Table we will be JOINing
display(
    spark.sql("SELECT * FROM lookup ORDER BY id")
)

In [0]:
# Lab 02c: The MERGE hint forces a SortMergeJoin (join hints like this are Spark 3.x functionality)
#          From Spark UI -> SQL -> Notice Lack of 'CustomShuffleReader' in DAG

display(
    spark.sql("SELECT /*+MERGE(sample, lookup)*/sample.tx_id, lookup.company FROM sample JOIN lookup ON sample.company_id = lookup.id")
)

In [0]:
# Lab 02d: Enable both AQE and Coalesce Partitions
spark.conf.set("spark.sql.adaptive.enabled", True)

# When true and spark.sql.adaptive.enabled = true, Spark will coalesce contiguous shuffle partitions according to the target size
# (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid too many small tasks.
# The switch is the '.enabled' sub-key. The bare 'spark.sql.adaptive.coalescePartitions' key is not a
# setting Spark reads, so setting it did not turn the feature on.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)

In [0]:
# Lab 02e: Same join, still carrying the MERGE hint so it stays a SORT MERGE JOIN (instead of BroadcastHashJoin)
# With AQE now enabled, it will Coalesce Shuffle Partitions
# Spark UI -> SQL -> Look for 'CustomShuffleReader'

display(
    spark.sql("SELECT /*+MERGE(sample, lookup)*/ sample.tx_id, lookup.company, sample.field1 FROM sample JOIN lookup ON sample.company_id = lookup.id")
)

### Lab 03 AQE: Handling Skew Partitions

In [0]:
%scala
// Lab 03a: Load the Data - read both Parquet datasets and register them as temp views

val t1DF = spark.read.format("parquet").load("dbfs:/FileStore/tables/t1/")
val t2DF = spark.read.format("parquet").load("dbfs:/FileStore/tables/t2/")

t1DF.createOrReplaceTempView("t1_view")
t2DF.createOrReplaceTempView("t2_view")

In [0]:
%sql
-- Lab 03b: Know the Data (all columns of t1_view)
SELECT *
FROM t1_view

In [0]:
%sql
-- Lab 03c: Know the Data (all columns of t2_view)
SELECT *
FROM t2_view

In [0]:
%sql
-- Lab 03d: Notice Skew for 'Ford Fiesta' (row count per make/model, largest first)
SELECT
  make,
  model,
  COUNT(*) AS cnt
FROM t2_view
GROUP BY make, model
ORDER BY cnt DESC

In [0]:
%scala
// Lab 03e: View Spark UI to find Bottleneck.  It's a Skew Partition issue (2 minute query)
// NOTE(review): this cell was labelled "Lab 02" and applies the same settings and query as the
// Lab 03e cell that follows - consider keeping only one of them.
import org.apache.spark.sql.functions._
import scala.collection.Seq

// Broadcast joins and AQE are both switched off before the JOIN on 'make' and 'model';
// otherwise they would hide the Skew we want to observe
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", false)

// Skew eats up 2 Minutes in one of the Stages.  Ouch!!
display(
  t1DF
    .join(t2DF, Seq("make", "model"))
    .where(abs(t2DF("engine_size") - t1DF("engine_size")) <= BigDecimal("0.1"))
    .groupBy("registration")
    .agg(avg("sale_price").as("average_price"))
    .collect()
)

In [0]:
%scala
// Lab 03e: View Spark UI to find Bottleneck.  It's a Skew Partition issue (2 minute query)

import org.apache.spark.sql.functions._

// Broadcast joins and AQE are both switched off before the JOIN on 'make' and 'model';
// otherwise they would hide the Skew we want to observe
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", false)

// Skew eats up 2 Minutes in one of the Stages.  Ouch!!
display(
  t1DF
    .join(t2DF, Seq("make", "model"))
    .where(abs(t2DF("engine_size") - t1DF("engine_size")) <= BigDecimal("0.1"))
    .groupBy("registration")
    .agg(avg("sale_price").as("average_price"))
    .collect()
)

In [0]:
%scala
// Lab 03f: Enable AQE and let it figure out the Skew problem and fix it automatically
// First configure the Settings

import org.apache.spark.sql.functions._

// Keep Broadcast joins disabled (so the join still shuffles and the Skew can occur), and enable AQE
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", true)

// Skew-join handling is a sub-feature of AQE; switch it on explicitly rather than relying on the default
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

// Disable coalesce Partitions so Skew occurs, and use 200 shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", false)
spark.conf.set("spark.sql.shuffle.partitions", 200)

// A Partition is considered as skewed if its size is larger than this factor multiplying
// the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.
// The key lives under 'skewJoin'; 'spark.sql.adaptive.skewedPartitionFactor' is not a setting Spark reads.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 2)

// Advisory (target) size of a shuffle partition during adaptive optimization.
// Was 1KB, then 124 (1.95)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1KB")

// A Partition is considered skewed if its size in bytes is larger than this threshold and larger than
// spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplying the median partition size.
// Ideally this config should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.
// Was 4KB then 512
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","4KB")

In [0]:
%scala
// Lab 03g: Solution: Let AQE figure out the Skew problem and fix it automatically
// Keep tweaking the settings from the previous cell to get better Performance here

display(
  t1DF
    .join(t2DF, Seq("make", "model"))
    .where(abs(t2DF("engine_size") - t1DF("engine_size")) <= BigDecimal("0.1"))
    .groupBy("registration")
    .agg(avg("sale_price").as("average_price"))
)

### Lab 04: Dynamic Partition Pruning

In [0]:
# Lab 04a: First, list the Partition Folders of the dataset written with partitionBy('NeighborhoodDistrict')
display(
    dbutils.fs.ls("dbfs:/FileStore/tables/fire_part/")
)

In [0]:
# Lab 04b: Load the partitioned Parquet data as 'firePartition_view' and show 5 rows
spark.read.parquet("dbfs:/FileStore/tables/fire_part").createOrReplaceTempView("firePartition_view")
display(
    spark.sql("SELECT * FROM firePartition_view LIMIT 5")
)

In [0]:
# Lab 04c: Load the non-partitioned Parquet data as 'fireNonPart_view' and show 5 rows
#          (same Output as firePartition_view)
spark.read.parquet("dbfs:/FileStore/tables/fire_nonpart").createOrReplaceTempView("fireNonPart_view")
display(
    spark.sql("SELECT * FROM fireNonPart_view LIMIT 5")
)

In [0]:
# Lab 04d: Create DIM object from the dim01 Parquet data and register it as 'neighbor_view'
dimDF1 = spark.read.parquet("dbfs:/FileStore/tables/dim01")
dimDF1.createOrReplaceTempView("neighbor_view")
display(dimDF1)

In [0]:
# Lab 04e: First, ensure the settings Dynamic Partition Pruning relies on are enabled

spark.conf.set("spark.sql.adaptive.enabled",True)

# Earlier labs set the broadcast threshold to -1. With 'reuseBroadcastOnly' = True, pruning is only
# applied when a broadcast exchange can be reused, so restore the 10 MB threshold (10485760 bytes)
# to let the small DIM table be broadcast again.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)

# Turn Dynamic Partition Pruning itself on explicitly instead of relying on the default
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", True)
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", True)

# Cost-based optimizer and its join reordering
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)

In [0]:
%sql
-- Lab 04f: Dynamic Pruning (and FILTER) is taking place on large FACT table (firePartition_view)
--          Even though the WHERE clause only defines filtering on DIM table (neighbor_view)

SELECT
  fact.NeighborhoodDistrict,
  dim.Neighborhood,
  fact.UnitType
FROM neighbor_view AS dim
INNER JOIN firePartition_view AS fact
  ON dim.Neighborhood = fact.NeighborhoodDistrict
WHERE dim.Neighborhood IN ('Golden Gate Park', 'Twin Peaks')

In [0]:
%sql
-- Lab 04g: Dynamic Pruning won't happen if Fact Table NOT Partitioned
--          (same query as Lab 04f, but against fireNonPart_view)

SELECT
  fact.NeighborhoodDistrict,
  dim.Neighborhood,
  fact.UnitType
FROM neighbor_view AS dim
INNER JOIN fireNonPart_view AS fact
  ON dim.Neighborhood = fact.NeighborhoodDistrict
WHERE dim.Neighborhood IN ('Golden Gate Park', 'Twin Peaks')

# End of Module 09: AQE (Adaptive Query Execution)