# Lab : Spark AQE

Experiment with Spark Adaptive Query Engine

Refereces:
- http://blog.madhukaraphatak.com/spark-aqe-part-2/
- https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
- https://docs.databricks.com/spark/latest/spark-sql/aqe.html
- https://docs.databricks.com/_static/notebooks/aqe-demo.html
- https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution

In [1]:
import findspark
findspark.init()  # uses SPARK_HOME
print("Spark found in : ", findspark.find())

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession



# use a unique tmep dir for warehouse dir, so we can run multiple spark sessions in one dir
import tempfile
tmpdir = tempfile.TemporaryDirectory()

config = ( SparkConf()
         .setAppName("TestApp")
         .setMaster("local[*]")
         #.setMaster("spark://f96e0987354e:7077")
         .set('executor.memory', '2g')
         .set('spark.sql.warehouse.dir', tmpdir.name)
         .set('spark.sql.adaptive.enabled', 'true')
         .set('spark.sql.adaptive.coalescePartitions.enabled', 'true')
         )

print("Spark config:\n\t", config.toDebugString().replace("\n", "\n\t"))
spark = SparkSession.builder.config(conf=config).getOrCreate()
print('Spark UI running on port ' + spark.sparkContext.uiWebUrl.split(':')[2])

Spark found in :  /home/ubuntu/apps/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=spark://f96e0987354e:7077
	executor.memory=2g
	spark.sql.warehouse.dir=/tmp/tmp98ehcraj
	spark.sql.adaptive.enabled=true
	spark.sql.adaptive.coalescePartitions.enabled=true
Spark UI running on port 4042


In [2]:
# check if AQE is enabled
spark.conf.get('spark.sql.adaptive.enabled')

# spark.conf.set('spark.sql.adaptive.coalescePartitions.minPartitionNum', 1)

'true'

## AQE

In [3]:
%%time 
# generate large clickstream data


! [ ! -d /data/click-stream/json/ ] && cd /data/click-stream  && python gen-clickstream-json.py 

! ls -lh  /data/click-stream/json/

total 1.4G
-rw-r--r-- 1 ubuntu ubuntu 338M Nov  6 19:09 clickstream-2015-01-01.json
-rw-r--r-- 1 ubuntu ubuntu 338M Nov  6 19:10 clickstream-2015-01-02.json
-rw-r--r-- 1 ubuntu ubuntu 338M Nov  6 19:11 clickstream-2015-01-03.json
-rw-r--r-- 1 ubuntu ubuntu 338M Nov  6 19:12 clickstream-2015-01-04.json
CPU times: user 8.57 ms, sys: 12.9 ms, total: 21.5 ms
Wall time: 2.59 s


In [4]:
%%time

# load clickstream json -- this is a large table about 1.4 GB in size
clickstream = spark.read.json("/data/click-stream/json/").repartition(500)
clickstream.rdd.getNumPartitions()

CPU times: user 16.5 ms, sys: 9.04 ms, total: 25.5 ms
Wall time: 1min 25s


500

In [5]:
count = clickstream.filter('cost > 100').groupBy('domain').count()

In [6]:
count.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['domain], [unresolvedalias('domain, None), count(1) AS count#40L]
+- Filter (cost#9L > cast(100 as bigint))
   +- Repartition 500, true
      +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Analyzed Logical Plan ==
domain: string, count: bigint
Aggregate [domain#10], [domain#10, count(1) AS count#40L]
+- Filter (cost#9L > cast(100 as bigint))
   +- Repartition 500, true
      +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Optimized Logical Plan ==
Aggregate [domain#10], [domain#10, count(1) AS count#40L]
+- Repartition 500, true
   +- Project [domain#10]
      +- Filter (isnotnull(cost#9L) AND (cost#9L > 100))
         +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[domain#10], functions=[count(1)], output=[domain#10, coun

In [7]:
count.show()

+-----------------+------+
|           domain| count|
+-----------------+------+
|      nytimes.com|160021|
|      youtube.com|160692|
|        zynga.com|161032|
|       google.com|160864|
|     usatoday.com|160163|
|        yahoo.com|160920|
|     facebook.com|160479|
|          cnn.com|161130|
|    wikipedia.org|160861|
|        bbc.co.uk|159884|
|      foxnews.com|161440|
|       sfgate.com|160476|
|          npr.org|160417|
|         hulu.com|161072|
|      twitter.com|160381|
|       amazon.com|160824|
|   funnyordie.com|161536|
|sf.craigslist.org|160693|
|       flickr.com|160627|
|comedycentral.com|160263|
+-----------------+------+



In [8]:
count.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['domain], [unresolvedalias('domain, None), count(1) AS count#40L]
+- Filter (cost#9L > cast(100 as bigint))
   +- Repartition 500, true
      +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Analyzed Logical Plan ==
domain: string, count: bigint
Aggregate [domain#10], [domain#10, count(1) AS count#40L]
+- Filter (cost#9L > cast(100 as bigint))
   +- Repartition 500, true
      +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Optimized Logical Plan ==
Aggregate [domain#10], [domain#10, count(1) AS count#40L]
+- Repartition 500, true
   +- Project [domain#10]
      +- Filter (isnotnull(cost#9L) AND (cost#9L > 100))
         +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[domain#10], functions=[count(1)], output=[domain#10, coun