# Caching in SQL -- Part 2
Understand Spark SQL caching


In [1]:
# initialize Spark Session
import os
import sys

# Put the parent of the current working directory on the import path so the
# `init_spark` module (presumably located there -- confirm) can be imported.
top_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if top_dir not in sys.path:
    sys.path.append(top_dir)

from init_spark import init_spark

# `spark` is the session object every later cell relies on
spark = init_spark()

Initializing Spark...
Spark found in :  /home/ubuntu/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=local[*]
	executor.memory=2g
	spark.sql.warehouse.dir=/tmp/tmpdcgl02or
	some_property=some_value
Spark UI running on port 4044


## Step 1 : Read JSON data

In [2]:
import time

# Time only the read call itself
read_started = time.perf_counter()
clickstreamDF = spark.read.json("/data/click-stream/json")
read_finished = time.perf_counter()

read_ms = (read_finished - read_started) * 1000
print ("Read JSON in {:,.2f} ms ".format(read_ms))

# Make the DataFrame queryable from SQL under the name 'clickstream'
clickstreamDF.createOrReplaceTempView("clickstream")
print ("registered temp table clickstream")

# Last expression of the cell: display what the catalog now contains
spark.catalog.listTables()

Read JSON in 13,921.91 ms 
registered temp table clickstream


[Table(name='clickstream', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [3]:
## peek at the first 10 rows of the 'clickstream' view
(
    spark
    .sql("select * from clickstream limit 10")
    .show()
)

+-------+-----------+----+-----------------+------+----------+-------------+----------+
| action|   campaign|cost|           domain|    ip|   session|    timestamp|      user|
+-------+-----------+----+-----------------+------+----------+-------------+----------+
|blocked|campaign_12| 108|   funnyordie.com|ip_301|session_95|1420329600000|user_83559|
| viewed| campaign_5|  74|         hulu.com|ip_791|session_62|1420329600043|user_89610|
|blocked| campaign_4| 127|sf.craigslist.org|ip_998|session_90|1420329600086|user_82694|
|clicked|campaign_14|  57|        bbc.co.uk|ip_514|session_84|1420329600129|user_43516|
|blocked|campaign_18|   2|   funnyordie.com|ip_799|session_46|1420329600172|user_85431|
| viewed|campaign_12|   7|sf.craigslist.org|ip_101|session_89|1420329600215|user_48910|
| viewed|campaign_12|  79|          npr.org|ip_535|session_49|1420329600258|user_43425|
| viewed| campaign_6|  14|        yahoo.com| ip_26|session_45|1420329600301|user_52671|
|blocked|campaign_19| 143|comedy

## Step 2 : Query without caching


In [4]:
import time

# Drop anything cached so this run measures the uncached path
spark.catalog.clearCache()

# Top 10 domains by number of rows
top_domains_sql = """
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

query_started = time.perf_counter()
top10_domains = spark.sql(top_domains_sql)
top10_domains.show()   # show() is the action that actually runs the query
query_finished = time.perf_counter()

query_ms = (query_finished - query_started) * 1000
print ("query took {:,.2f} ms ".format(query_ms))



+-----------------+------+
|           domain| total|
+-----------------+------+
|      youtube.com|402597|
|      nytimes.com|402569|
|          npr.org|402521|
|sf.craigslist.org|402374|
|        zynga.com|402372|
|       sfgate.com|402333|
|     facebook.com|402101|
|       google.com|402044|
|comedycentral.com|402003|
|       flickr.com|401926|
+-----------------+------+

query took 8,812.81 ms 


## Step 3 : Explain Query

In [5]:
# Print every planning stage (parsed, analyzed, optimized, physical).
# Calling top10_domains.explain() with no argument prints the physical plan only.
top10_domains.explain(True)

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Sort ['total DESC NULLS LAST], true
      +- 'Aggregate ['domain], ['domain, 'count(1) AS total#74]
         +- 'UnresolvedRelation `clickstream`

== Analyzed Logical Plan ==
domain: string, total: bigint
GlobalLimit 10
+- LocalLimit 10
   +- Sort [total#74L DESC NULLS LAST], true
      +- Aggregate [domain#9], [domain#9, count(1) AS total#74L]
         +- SubqueryAlias `clickstream`
            +- Relation[action#6,campaign#7,cost#8L,domain#9,ip#10,session#11,timestamp#12L,user#13] json

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Sort [total#74L DESC NULLS LAST], true
      +- Aggregate [domain#9], [domain#9, count(1) AS total#74L]
         +- Project [domain#9]
            +- Relation[action#6,campaign#7,cost#8L,domain#9,ip#10,session#11,timestamp#12L,user#13] json

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[total#74L DESC NULLS LAST], output=[domain#9,total#74L])
+- *(2) Hash

## Step 4 : Cache

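There are 3 ways to cache:

1. `dataframe.cache()` : non blocking (lazy -- the data is cached when the first action runs)
2. `spark.sql("cache table TABLE_NAME")` : blocking (eager -- the table is cached before the call returns)
3. `spark.catalog.cacheTable('tableName')` : non blocking (lazy -- the data is cached when the first action runs)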

Try all these options and see the performance implications.

In [6]:
import time

# Reset: drop every cached table before experimenting.
# (spark.catalog.uncacheTable("clickstream") would drop just this one table.)
spark.catalog.clearCache()

cached_before = spark.catalog.isCached('clickstream')
print ("is 'clickstream' cached : " , cached_before)

## Enable exactly one of the three caching options below.
## They are numbered to match the "3 ways to cache" list in the text.
cache_started = time.perf_counter()
#clickstreamDF.cache()                    ## 1 : DataFrame API
#spark.sql("cache table clickstream")     ## 2 : SQL statement
spark.catalog.cacheTable("clickstream")   ## 3 : catalog API
cache_finished = time.perf_counter()

cache_ms = (cache_finished - cache_started) * 1000
print ("caching took {:,.2f} ms ".format(cache_ms))

cached_after = spark.catalog.isCached('clickstream')
print ("is 'clickstream' cached : " , cached_after)

is 'clickstream' cached :  False
caching took 20.28 ms 
is 'clickstream' cached :  True


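## Step 5 : Query after caching
Run the following cell to measure query time after caching.
If you chose a non-blocking (lazy) caching option, the first query also has to populate the cache, so it can be slower than the uncached run; the second query shows the real benefit.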

In [7]:
## Query1 after caching
## Note the time taken

import time

# Same top-10-domains query as the uncached run, for a like-for-like timing
top_domains_sql = """
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

first_run_started = time.perf_counter()
top10_domains = spark.sql(top_domains_sql)
top10_domains.show()
first_run_finished = time.perf_counter()

first_run_ms = (first_run_finished - first_run_started) * 1000
print ("query took {:,.2f} ms ".format(first_run_ms))

+-----------------+------+
|           domain| total|
+-----------------+------+
|      youtube.com|402597|
|      nytimes.com|402569|
|          npr.org|402521|
|sf.craigslist.org|402374|
|        zynga.com|402372|
|       sfgate.com|402333|
|     facebook.com|402101|
|       google.com|402044|
|comedycentral.com|402003|
|       flickr.com|401926|
+-----------------+------+

query took 29,379.45 ms 


In [8]:
## Note the time for second query
import time

# Identical query again; compare its time with the previous cell
top_domains_sql = """
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

second_run_started = time.perf_counter()
top10_domains = spark.sql(top_domains_sql)
top10_domains.show()
second_run_finished = time.perf_counter()

second_run_ms = (second_run_finished - second_run_started) * 1000
print ("query took {:,.2f} ms ".format(second_run_ms))

+-----------------+------+
|           domain| total|
+-----------------+------+
|      youtube.com|402597|
|      nytimes.com|402569|
|          npr.org|402521|
|sf.craigslist.org|402374|
|        zynga.com|402372|
|       sfgate.com|402333|
|     facebook.com|402101|
|       google.com|402044|
|comedycentral.com|402003|
|       flickr.com|401926|
+-----------------+------+

query took 1,524.31 ms 


## Step 6 : Explain Query
You will see caching in effect: look for `InMemoryTableScan` and `InMemoryRelation` in the physical plan.

In [9]:
# Print the physical plan of the query that ran after caching;
# an InMemoryTableScan node in the plan indicates the cached data is being read.
top10_domains.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[total#215L DESC NULLS LAST], output=[domain#9,total#215L])
+- *(2) HashAggregate(keys=[domain#9], functions=[count(1)])
   +- Exchange hashpartitioning(domain#9, 200)
      +- *(1) HashAggregate(keys=[domain#9], functions=[partial_count(1)])
         +- InMemoryTableScan [domain#9]
               +- InMemoryRelation [action#6, campaign#7, cost#8L, domain#9, ip#10, session#11, timestamp#12L, user#13], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) FileScan json [action#6,campaign#7,cost#8L,domain#9,ip#10,session#11,timestamp#12L,user#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/data/click-stream/json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<action:string,campaign:string,cost:bigint,domain:string,ip:string,session:string,timestamp...


## Clear Cache
Try the following ways to clear cache

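1. `spark.sql("CLEAR CACHE")` - removes all cached tables
2. `spark.sql("UNCACHE TABLE tableName")` - removes one cached table (`CLEAR CACHE` does not accept a table name)
3. `spark.catalog.uncacheTable('tableName')` - removes one cached table
4. `spark.catalog.clearCache()` - clears all caches
5. `dataframe.unpersist()` - removes the cache held for that DataFrame (call it on the DataFrame that was cached)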

In [11]:
## Enable exactly one of the following ways to release the cache.
#spark.sql("CLEAR CACHE")                    # removes every cached table
#spark.sql("UNCACHE TABLE clickstream")      # removes one table (CLEAR CACHE takes no table name)
spark.catalog.uncacheTable('clickstream')    # removes one table via the catalog API
#spark.catalog.clearCache()                  # removes every cached table
#clickstreamDF.unpersist()                   # releases the cache through the DataFrame behind the view

# Verify the result, mirroring the check done when the table was cached
print ("is 'clickstream' cached : " , spark.catalog.isCached('clickstream'))