# Caching in SQL -- Part 2
Understand Spark SQL caching


In [1]:
%%time 

# If the json/ data directory does not exist yet, run gen-clickstream-json.py from
# /data/click-stream (presumably this creates the JSON files listed below -- confirm).
! [ ! -d /data/click-stream/json/ ] && cd /data/click-stream  && python gen-clickstream-json.py 

# List the input files with human-readable sizes.
! ls -lh  /data/click-stream/json/

total 1.4G
-rw-r--r-- 1 ubuntu ubuntu 338M Oct 30 06:46 clickstream-2015-01-01.json
-rw-r--r-- 1 ubuntu ubuntu 338M Oct 30 06:47 clickstream-2015-01-02.json
-rw-r--r-- 1 ubuntu ubuntu 338M Oct 30 06:48 clickstream-2015-01-03.json
-rw-r--r-- 1 ubuntu ubuntu 338M Oct 30 06:48 clickstream-2015-01-04.json
CPU times: user 702 µs, sys: 12.2 ms, total: 13 ms
Wall time: 663 ms


In [2]:
# Initialize the Spark session used by every cell below.
import os
import sys
# Put the parent of the current working directory on sys.path (once) so that the
# import of init_spark below can be resolved -- presumably init_spark.py lives there.
top_dir = os.path.abspath(os.path.join(os.getcwd(), "../"))
if top_dir not in sys.path:
    sys.path.append(top_dir)

# Must come after the sys.path change above.
from init_spark import init_spark
spark = init_spark()

Initializing Spark...
Spark found in :  /home/ubuntu/apps/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=local[*]
	executor.memory=2g
	spark.sql.warehouse.dir=/tmp/tmp2iiupuv6
	some_property=some_value
Spark UI running on port 4040


## Step 1 : Read JSON data

In [3]:
import time

# Measure how long it takes to create the DataFrame from the JSON directory.
read_start = time.perf_counter()
clickstreamDF = spark.read.json("../data/click-stream/json")
read_ms = (time.perf_counter() - read_start) * 1000
print ("Read JSON in {:,.2f} ms ".format(read_ms))

# Make the DataFrame queryable from Spark SQL under the name "clickstream".
clickstreamDF.createOrReplaceTempView("clickstream")
print ("registered temp table clickstream")

# Last expression of the cell: shows the tables the catalog knows about.
spark.catalog.listTables()

Read JSON in 3,768.10 ms 
registered temp table clickstream


[Table(name='clickstream', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [4]:
# Peek at 10 rows of the 'clickstream' view to check its columns and values.
spark.sql("select * from clickstream limit 10").show()

+-------+-----------+----+-----------------+------+----------+-------------+----------+
| action|   campaign|cost|           domain|    ip|   session|    timestamp|      user|
+-------+-----------+----+-----------------+------+----------+-------------+----------+
|clicked|campaign_15| 175|          cnn.com|ip_983|session_15|1420156800000|user_52636|
|blocked|campaign_17| 146|       google.com|ip_254|session_91|1420156800043|user_60446|
|clicked|campaign_12|   0|   funnyordie.com|ip_410|session_11|1420156800086| user_1981|
| viewed|campaign_15|   3|comedycentral.com|ip_296| session_2|1420156800129|user_95346|
|blocked|campaign_15|  46|comedycentral.com|ip_409|session_70|1420156800172|user_79791|
|blocked| campaign_4|  27|       flickr.com| ip_40|session_39|1420156800215|user_23461|
|clicked| campaign_5|  11|       google.com|ip_902|session_17|1420156800258|user_58470|
|clicked|campaign_18| 102|sf.craigslist.org|ip_212|session_49|1420156800301| user_5382|
|blocked|campaign_15|  37|     u

## Step 2 : Query without caching


In [5]:
import time

# Drop anything cached earlier so this run is the no-cache baseline.
spark.catalog.clearCache()

# Top 10 domains by number of click-stream records.
sql="""
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

# Time building the query plus the show() action that actually executes it.
query_start = time.perf_counter()
top10_domains = spark.sql(sql)
top10_domains.show()
query_ms = (time.perf_counter() - query_start) * 1000
print ("query took {:,.2f} ms ".format(query_ms))



+-----------------+------+
|           domain| total|
+-----------------+------+
|sf.craigslist.org|402978|
|       google.com|402750|
|     facebook.com|402632|
|        bbc.co.uk|402438|
|      foxnews.com|402305|
|       amazon.com|402290|
|      youtube.com|402262|
|       flickr.com|402061|
|     usatoday.com|402054|
|         hulu.com|401965|
+-----------------+------+

query took 2,258.81 ms 


## Step 3 : Explain Query

In [6]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan.
# For the physical plan only, call top10_domains.explain() instead.

top10_domains.explain(extended=True)

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Sort ['total DESC NULLS LAST], true
      +- 'Aggregate ['domain], ['domain, 'count(1) AS total#88]
         +- 'UnresolvedRelation [clickstream]

== Analyzed Logical Plan ==
domain: string, total: bigint
GlobalLimit 10
+- LocalLimit 10
   +- Sort [total#88L DESC NULLS LAST], true
      +- Aggregate [domain#10], [domain#10, count(1) AS total#88L]
         +- SubqueryAlias clickstream
            +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Sort [total#88L DESC NULLS LAST], true
      +- Aggregate [domain#10], [domain#10, count(1) AS total#88L]
         +- Project [domain#10]
            +- Relation[action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] json

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[total#88L DESC NULLS LAST], output=[domain#10,total#88L])
+- *(2

## Step 4 : Cache

There are 3 ways to cache
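1. dataframe.cache()  : non-blocking (lazy) -- the call returns immediately; the data is cached the first time a query runs on it
2. spark.sql("cache table TABLE_NAME") : blocking (eager) -- the call returns only after the table has been cached
3. spark.catalog.cacheTable('tableName') : non-blocking (lazy)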

Try all these options and see the performance implications.

In [7]:
import time

# Start with nothing cached.
spark.catalog.clearCache()  # drops every cached table
# spark.catalog.uncacheTable("clickstream")  # alternative: drop just this one table

print ("is 'clickstream' cached : " , spark.catalog.isCached('clickstream'))

# Enable exactly ONE of the three options below (numbered as in the list above this cell).
# Options 1 and 3 are described above as non-blocking, so the time printed here
# may not include actually loading the data.
cache_start = time.perf_counter()
#clickstreamDF.cache()                   ## 1 : DataFrame API
#spark.sql("cache table clickstream")    ## 2 : SQL statement
spark.catalog.cacheTable("clickstream")  ## 3 : catalog API
cache_ms = (time.perf_counter() - cache_start) * 1000
print ("caching took {:,.2f} ms ".format(cache_ms))

print ("is 'clickstream' cached : " , spark.catalog.isCached('clickstream'))

is 'clickstream' cached :  False
caching took 13.89 ms 
is 'clickstream' cached :  True


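## Step 5 : Query after caching
Run the following two cells to measure query time after caching. With a non-blocking (lazy) caching option, the first query also has to load the data into the cache, so it can be slower than the uncached query; the second query reads from the cache.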

In [8]:
## First query after caching -- note the time taken

import time

# Same top-10-domains query as the uncached baseline.
sql="""
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

# Time building the query plus the show() action that executes it.
query_start = time.perf_counter()
top10_domains = spark.sql(sql)
top10_domains.show()
query_ms = (time.perf_counter() - query_start) * 1000
print ("query took {:,.2f} ms ".format(query_ms))

+-----------------+------+
|           domain| total|
+-----------------+------+
|sf.craigslist.org|402978|
|       google.com|402750|
|     facebook.com|402632|
|        bbc.co.uk|402438|
|      foxnews.com|402305|
|       amazon.com|402290|
|      youtube.com|402262|
|       flickr.com|402061|
|     usatoday.com|402054|
|         hulu.com|401965|
+-----------------+------+

query took 8,105.17 ms 


In [9]:
## Second query after caching -- compare this time with the previous cell
import time

# Same top-10-domains query again, unchanged.
sql="""
select domain, count(*) as total from clickstream
group by domain 
order by total desc
limit 10
"""

# Time building the query plus the show() action that executes it.
query_start = time.perf_counter()
top10_domains = spark.sql(sql)
top10_domains.show()
query_ms = (time.perf_counter() - query_start) * 1000
print ("query took {:,.2f} ms ".format(query_ms))

+-----------------+------+
|           domain| total|
+-----------------+------+
|sf.craigslist.org|402978|
|       google.com|402750|
|     facebook.com|402632|
|        bbc.co.uk|402438|
|      foxnews.com|402305|
|       amazon.com|402290|
|      youtube.com|402262|
|       flickr.com|402061|
|     usatoday.com|402054|
|         hulu.com|401965|
+-----------------+------+

query took 332.10 ms 


## Step 6 : Explain Query
You will see caching in effect! Look for `InMemoryRelation` and `Scan In-memory table` in the physical plan.

In [10]:
# Print the physical plan of the query that ran after caching.
top10_domains.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[total#313L DESC NULLS LAST], output=[domain#10,total#313L])
+- *(2) HashAggregate(keys=[domain#10], functions=[count(1)])
   +- Exchange hashpartitioning(domain#10, 200), true, [id=#200]
      +- *(1) HashAggregate(keys=[domain#10], functions=[partial_count(1)])
         +- Scan In-memory table clickstream [domain#10]
               +- InMemoryRelation [action#7, campaign#8, cost#9L, domain#10, ip#11, session#12, timestamp#13L, user#14], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- FileScan json [action#7,campaign#8,cost#9L,domain#10,ip#11,session#12,timestamp#13L,user#14] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/data/click-stream/json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<action:string,campaign:string,cost:bigint,domain:string,ip:string,session:string,timestamp...




## Clear Cache
Try the following ways to clear cache

1. spark.sql ("CLEAR CACHE")  - removes all cached tables
2. spark.sql ("UNCACHE TABLE tableName") - removes one cached table
3. spark.catalog.uncacheTable('tableName') - removes one cached table
4. spark.catalog.clearCache() - removes all cached tables
5. dataframe.unpersist() - removes the cache of that one dataframe

In [11]:
# Release the cached 'clickstream' table. The commented lines are alternatives;
# enable one at a time to try them.
#spark.sql("CLEAR CACHE")                  # drops every cached table
#spark.sql("UNCACHE TABLE clickstream")    # drops one table (CLEAR CACHE accepts no table name)
spark.catalog.uncacheTable('clickstream')  # drops one table
#spark.catalog.clearCache()                # drops every cached table
#clickstreamDF.unpersist()                 # drops the cache held for this DataFrame