# 02807 - Computational Tools for Data Science


# Week 9

# <center> Today: Optimising and Tuning Spark Applications </center>

Goals for today's lecture:

* Understand the performance impact of **data caching** in Spark.
* Learn how to cache your data for increased performance, and how to read the **storage tab** of the Spark UI.
* Optimise Spark jobs by **configuring Shuffle Partitions**.
* Optimise UDF performance using **vectorised/Pandas UDFs**.

# Readings:


* [*Learning Spark*, Ch. 7](https://pages.databricks.com/rs/094-YMS-629/images/LearningSpark2.0.pdf). *Optimizing and Tuning Spark Applications*


# San Francisco Fire Calls

We will illustrate today's topics with an example from [Chapter 3 of Learning Spark](https://pages.databricks.com/rs/094-YMS-629/images/LearningSpark2.0.pdf), using the [San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3) dataset.
* There is a nice [code repository](https://github.com/databricks/LearningSparkV2) for the textbook. 
* It includes notebooks for several chapters; some of today's code comes from them.

# Setting up

If you are running this on **Colab**, set up PySpark by running the following cell. If you are running this in **databricks**, skip the following code cell:

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!pip install pyarrow --upgrade
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Let's import the libraries we will need
import pyspark
from pyspark.sql import *
from pyspark import SparkContext, SparkConf

# create the Spark session
conf = SparkConf().set("spark.ui.port", "4050")
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

openjdk-8-jdk-headless is already the newest version (8u265-b01-0ubuntu2~18.04).
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
Requirement already up-to-date: pyarrow in /usr/local/lib/python3.6/dist-packages (2.0.0)


Let's import some modules.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

If you are running this on **databricks**, define the location of the public dataset as follows:

In [None]:
sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

If you are running this on **Colab**, grab a sample of the data and load it:

In [None]:
from google.colab import drive
# This will prompt for authorization.

drive.mount('/content/drive')

import os
import urllib.request
sf=urllib.request.urlretrieve("https://raw.githubusercontent.com/databricks/LearningSparkV2/master/chapter3/data/sf-fire-calls.csv",'/content/sample_data/sf-fire-calls.csv')

sf_fire_file = "/content/sample_data/sf-fire-calls.csv"

!ls '/content/sample_data'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
anscombe.json		      mnist_test.csv	     sf-fire-calls.csv
california_housing_test.csv   mnist_train_small.csv
california_housing_train.csv  README.md


The full file has 4 million records. Inferring the schema is expensive for large files, so we will define it instead.

In [None]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [None]:
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

# First look at the data

Let's have a first look at the data using `show()`:

In [None]:
fire_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

If you want a prettier visualisation, you can use `toPandas()`:

In [None]:
fire_df.limit(10).toPandas().head(5)

Unnamed: 0,CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
0,20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,TRUCK,2,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-T13,2.95
1,20110022,M17,2003241,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 03:01:18 AM,0 Block of SILVERVIEW DR,SF,94124,B10,42,6495,3,3,3,True,,1,MEDIC,1,10,10,Bayview Hunters Point,"(37.7337623673897, -122.396113802632)",020110022-M17,4.7
2,20110023,M41,2003242,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:39:50 AM,MARKET ST/MCALLISTER ST,SF,94102,B03,1,1455,3,3,3,True,,1,MEDIC,2,3,6,Tenderloin,"(37.7811772186856, -122.411699931232)",020110023-M41,2.433333
3,20110032,E11,2003250,Vehicle Fire,01/11/2002,01/10/2002,Other,01/11/2002 04:16:46 AM,APPLETON AV/MISSION ST,SF,94110,B06,32,5626,3,3,3,False,,1,ENGINE,1,6,9,Bernal Heights,"(37.7388432849018, -122.423948785199)",020110032-E11,1.5
4,20110043,B04,2003259,Alarms,01/11/2002,01/10/2002,Other,01/11/2002 06:01:58 AM,1400 Block of SUTTER ST,SF,94109,B04,3,3223,3,3,3,False,,1,CHIEF,2,4,2,Western Addition,"(37.7872890372638, -122.424236212664)",020110043-B04,3.483333


In **databricks**, you can also use `display`.

In [None]:
display(fire_df.limit(5))

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float]

# Count rows

Let's see how long it takes to count the rows in our dataset. 

From here onwards, I'll report results for the entire 4-million-row dataset. If you are running this on Colab with the sample data, the numbers will be different, but the general points hold regardless.

In [None]:
fire_df.count()
%timeit fire_df.count()

1 loop, best of 3: 279 ms per loop


* On the full dataset, it takes around 20 seconds (the output above comes from the much smaller Colab sample).
* If we keep re-running this query, it will keep taking roughly 20 seconds. Why?
* **RDDs are not cached in memory by default**. When an uncached RDD is needed again, it must be recomputed. 
* When you run the ``count`` a second time, the file will be read, partitioned, loaded by the executors, and counted again. 
* **Caching/persisting data keeps it in the executors' memory**, ensuring that processing done previously is preserved and **can be reused**.
* Important when the data will be accessed repeatedly, e.g., in iterative algorithms (ML).
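A toy model in plain Python may help make the recompute-versus-reuse distinction concrete. It is only a sketch: `ToyDataset`, `load_rows` and the `loads` counter are hypothetical names invented for this illustration, and nothing here reflects how Spark implements caching internally. It also mimics the fact that `cache()` only marks the data for caching, and that the first action afterwards does the actual work.

```python
class ToyDataset:
    """Toy stand-in for a DataFrame: recomputes its rows on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute      # function that (expensively) produces the rows
        self._want_cache = False     # set by cache(); nothing is stored yet
        self._cached_rows = None     # filled in by the first action after cache()

    def cache(self):
        # Lazy: only marks the dataset for caching.
        self._want_cache = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows          # reuse the earlier work
        rows = self._compute()                # (re)compute from the source
        if self._want_cache:
            self._cached_rows = rows          # materialise the cache on first action
        return rows

    def count(self):
        return len(self._rows())


loads = 0


def load_rows():
    """Pretend to read and parse a file; counts how often that happens."""
    global loads
    loads += 1
    return list(range(1000))


ds = ToyDataset(load_rows)
ds.count()
ds.count()
uncached_loads = loads            # two actions, two full reads

ds.cache()                        # no read happens here
loads_after_cache_call = loads
ds.count()                        # reads once more and fills the cache
ds.count()                        # served from the cache
total_loads = loads

print(uncached_loads, loads_after_cache_call, total_loads)  # prints: 2 2 3
```

The third read is the price of filling the cache; every action after that is served without touching the source.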

# Caching

Let's cache the DataFrame since we will be performing several operations on it.

In [None]:
fire_df.cache()

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float]

* Cache is a lazy operation. 
* The DataFrame is not fully cached until you invoke an action that goes through every row.
* The common idiom to trigger the caching of the entire dataset is to execute count since it's fairly cheap.

In [None]:
fire_df.count()

175296

* That's taking quite a while!
* What's happening, as we cache this table, is that we're loading all of our data into the memory of the executors. It's around 1 GB of data.
* That actually takes much longer than just counting the data itself.
* Let's see how long it takes to perform the count now.

In [None]:
fire_df.count()
%timeit fire_df.count()

10 loops, best of 3: 69.9 ms per loop


* That was significantly faster.
* Let's have a look at the Spark UI storage tab.
  * If you are running this on Colab and want to check the actual Spark UI, set up a tunnel like we did last week.
  * Otherwise, check out the screenshots provided below.

Some things to notice:

* The cached data **takes up less space** than our file on disk! That is due to **Project Tungsten**, Spark's execution engine, which keeps data in a compact binary format rather than as regular JVM objects. You can learn more about Tungsten from Josh Rosen's [talk](https://www.youtube.com/watch?v=5ajs8EIPWGI&ab_channel=SparkSummit) at Spark Summit.
* The data is 100% cached, broken down into **9 partitions**.
* The data is **deserialised**. This makes it much faster to query later on because we don't pay the cost of deserialising it. But it will take up more space than the corresponding serialised data. (Trade-off between access speed and memory-efficiency)
<img src="https://i.ibb.co/w4VZt2g/ui-storage.png" alt="ui-storage" width="1190" height="770" data-load="full" style="">

## Benefits of caching

* Reading data from source storage systems (hdfs://, s3://) is time-consuming.
* If you don't have enough memory, data will be cached on the local disk of the executors, which will still be faster than reading from the source.
* Caching acts as a checkpoint in your Spark application: if a task fails further down the execution, the lost RDD partition can be recovered from the cache rather than recomputed from the source.

## Best practices when caching
* **Unpersist the DataFrame after it is no longer needed using ``df.unpersist()``**. 
 * If your storage memory becomes full, Spark will start evicting data from memory using the LRU (least recently used) strategy. 
 * To stay in control of what gets evicted, use ``unpersist()``. 
 * Moreover, the more free space you have in memory, the more Spark can use for execution, as opposed to storing cached data.
* **Only cache if you will use the data repeatedly. If you do, make sure you are caching only what you will need in your queries.**
 * For example, if one query will use ``(col1, col2, col3)`` and the second query will use ``(col2, col3, col4)``, select and cache these columns only: ``cached_df = df.select(col1, col2, col3, col4).cache()``
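To see why an explicit `unpersist()` can beat leaving the decision to LRU eviction, here is a toy model in plain Python. It is a sketch under simplifying assumptions: `ToyStorageMemory`, the block names and the sizes are all hypothetical, and Spark's real memory manager works on partition blocks and is considerably more involved.

```python
from collections import OrderedDict


class ToyStorageMemory:
    """Toy fixed-size cache with LRU eviction (sizes in arbitrary units)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()   # name -> size, least recently used first

    def used(self):
        return sum(self.blocks.values())

    def cache(self, name, size):
        """Store a block, evicting least recently used blocks until it fits."""
        evicted = []
        while self.blocks and self.used() + size > self.capacity:
            victim, _ = self.blocks.popitem(last=False)
            evicted.append(victim)
        self.blocks[name] = size
        return evicted

    def touch(self, name):
        # Reading a block makes it the most recently used one.
        self.blocks.move_to_end(name)

    def unpersist(self, name):
        self.blocks.pop(name, None)


def fill(mem):
    mem.cache("scratch", 4)        # intermediate result, dead after its last read
    mem.cache("lookup_table", 4)   # still needed by later queries
    mem.touch("scratch")           # last read of the intermediate result


# Without unpersist: LRU evicts the block that is still needed.
mem = ToyStorageMemory(capacity=10)
fill(mem)
evicted_by_lru = mem.cache("features", 4)

# With unpersist: the dead block is released first, so nothing is evicted.
mem = ToyStorageMemory(capacity=10)
fill(mem)
mem.unpersist("scratch")
evicted_after_unpersist = mem.cache("features", 4)

print(evicted_by_lru, evicted_after_unpersist)  # prints: ['lookup_table'] []
```

LRU only knows what was used recently, not what will be used next, which is why releasing data you know to be dead is the safer habit.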

## Comparison between different storage levels

* The test was run on a local machine with 12 GB of driver memory and 14 GB of input data. [Source](https://towardsdatascience.com/apache-spark-caching-603154173c48)
* The graph below shows the run time of a `filter` + `count` query executed 10 times in a loop.



<img src="https://miro.medium.com/max/700/1*bW7VO52Ht2kDvrdosEBpwQ.png" alt="performance" width="690" height="350" data-load="full" style="">

# Let's run some queries

Let's recall what the data looks like:

In [None]:
# use show() if you are running on Colab
display(fire_df)
#fire_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

**Q-1) How many distinct types of calls were made to the Fire Department?**

To be sure, let's not count "null" strings in that column.

In [None]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().count()

30

**Q-2) What distinct types of calls were made to the Fire Department?**

These are all the distinct types of calls to the SF Fire Department:

In [None]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(32, False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

**Q-3) What were the most common call types?**

List them in descending order:

In [None]:
(fire_df
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [None]:
%timeit -n1 -r1 (fire_df.select("CallType").where(col("CallType").isNotNull()).groupBy("CallType").count().orderBy("count", ascending=False).show(n=10, truncate=False))

* The spawned job involves a data shuffle.
* Let's look at the two stages on the Spark UI. In particular, let's inspect the second stage.
* **By default, Spark uses 200 tasks for a shuffle read**. So the resulting number of partitions is 200. 
* It doesn't matter if your dataset is very large or very small. 
* Notice that some of the tasks are **not reading in any shuffle data**. Some partitions will contain no data.
<img src="https://i.ibb.co/hspm1sb/ui-partitions.png" alt="ui-partitions" width="1193" height="795" data-load="full" style="">
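The empty tasks are easy to explain with a small simulation: rows with the same key must land in the same shuffle partition, so a grouping with only 30 distinct keys can fill at most 30 of the 200 partitions. The sketch below uses plain Python with CRC32 as a stand-in hash and made-up key names; Spark uses its own hash function, so the exact assignment will differ, but the counting argument is the same.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    # Deterministic stand-in hash (CRC32); not the hash function Spark uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def non_empty_partitions(keys, num_partitions):
    """Number of partitions that receive at least one key."""
    return len(Counter(partition_of(k, num_partitions) for k in keys))


# 30 hypothetical grouping keys, matching the 30 distinct call types found above.
keys = [f"call-type-{i}" for i in range(30)]

for n in (200, 8):
    filled = non_empty_partitions(keys, n)
    print(f"{n} partitions: {filled} non-empty, {n - filled} empty")
```

With 200 partitions at least 170 tasks have nothing to read, yet each one still has to be scheduled and launched.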

# Optimising Shuffle Partitions

* The `spark.sql.shuffle.partitions` parameter controls how many partitions there are after a shuffle (wide transformation). 
* By default, this value is 200,  no matter  how large or small your dataset is, or your cluster configuration.
* 200 may be too small for huge datasets, and too large for smaller ones.
* One of the most commonly tuned parameters in Spark.
* *Learning Spark*, ch. 7: "*There is no magic formula for the number of shuffle partitions [...] the number may vary depending on your use case, data set, number of cores, and the amount of executor memory available - it's a trial-and-error approach*".

Generally speaking, the default value is too high for smaller or streaming workloads. Let's change this parameter to be 8.

If you are using **databricks**, run the following:



In [None]:
# Set number of shuffle partitions to 8
sqlContext.setConf("spark.sql.shuffle.partitions", "8")
# Verify the setting 
sqlContext.getConf("spark.sql.shuffle.partitions")

If you are using **Colab**, run the following:

In [None]:
# Set number of shuffle partitions to 8
spark.conf.set("spark.sql.shuffle.partitions", 8)
# Verify the setting 
spark.conf.get("spark.sql.shuffle.partitions")

'8'

Let's re-execute our last query:

In [None]:
(fire_df
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [None]:
%timeit -n1 -r1 (fire_df.select("CallType").where(col("CallType").isNotNull()).groupBy("CallType").count().orderBy("count", ascending=False).show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows

1 loop, best of 1: 1.27 s per loop


* This was significantly faster (about 8x faster). 
* You may want to try changing the parameter to different values (e.g. 2, 64, 400) and see how it impacts performance.

# Optimising User Defined Functions

* Spark gives you the flexibility to define your own functions (UDFs).
* Standard Python UDFs operate **one-row-at-a-time** (slow performance).
* Spark 2.3 introduced **vectorised/Pandas UDFs**. 
 * These UDFs use pandas to work with the data. 
 * Data is passed to the function as a `pandas.Series`, which is processed with vectorised operations.
 * Can increase performance up to 100x compared to row-at-a-time Python UDFs.
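One source of the difference is simply how often the Python function gets invoked. The toy model below, in plain Python, counts invocations for a row-at-a-time function versus a function that receives a whole chunk of values. It is a sketch only: the chunk size and function names are hypothetical, and a real Pandas UDF receives a `pandas.Series` and also benefits from vectorised operations, which a list comprehension does not capture.

```python
calls = {"row": 0, "batch": 0}


def plus_one_row(v):
    """Row-at-a-time style: invoked once per value."""
    calls["row"] += 1
    return v + 1


def plus_one_batch(values):
    """Batch style: invoked once per chunk of values."""
    calls["batch"] += 1
    return [v + 1 for v in values]


data = [float(i) for i in range(10_000)]
batch_size = 1_000   # hypothetical chunk size for this sketch

# One Python function call per row.
row_result = [plus_one_row(v) for v in data]

# One Python function call per chunk.
batch_result = []
for start in range(0, len(data), batch_size):
    batch_result.extend(plus_one_batch(data[start:start + batch_size]))

print(calls)  # prints: {'row': 10000, 'batch': 10}
```

Both approaches produce the same values; the batch version just pays the per-call overhead a thousand times less often.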

* Let's illustrate the difference with a simple **plus one** function.
* This is just for illustration. Built-in operators can perform faster in this scenario.
* Start with some dummy data.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, pandas_udf

df = (spark
      .range(0, 10 * 1000 * 1000)
      .withColumn('id', (col('id') / 1000).cast('integer'))
      .withColumn('v', rand()))

df.cache()
df.count()

10000000

In [None]:
df.show()

+---+-------------------+
| id|                  v|
+---+-------------------+
|  0| 0.8941021336476704|
|  0| 0.8444526738723864|
|  0|0.11665741281281539|
|  0| 0.7234779987711553|
|  0| 0.7745724436175354|
|  0| 0.4195287123329897|
|  0| 0.8899211963386082|
|  0|0.09779443719344982|
|  0|0.30079979615793595|
|  0|0.18549639156320197|
|  0|0.12371822037611913|
|  0| 0.9565676160904173|
|  0| 0.9033709921721853|
|  0| 0.8634992221716727|
|  0|0.18040124595202378|
|  0| 0.5515451914794307|
|  0| 0.2789977056382631|
|  0| 0.5030959847311234|
|  0|0.28745408566386954|
|  0|0.38908030835233853|
+---+-------------------+
only showing top 20 rows



## Incrementing the `v` column by one

## PySpark UDF

Using row-at-a-time UDF:

In [None]:
from pyspark.sql.functions import udf
@udf("double")
def plus_one(v):
    return v + 1

%timeit -n1 -r1 df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

+--------+
|count(v)|
+--------+
|10000000|
+--------+

1 loop, best of 1: 22.7 s per loop


## Vectorised UDF

In [None]:
from pyspark.sql.functions import pandas_udf
@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

%timeit -n1 -r1 df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

+--------+
|count(v)|
+--------+
|10000000|
+--------+

1 loop, best of 1: 4.73 s per loop


* That was significantly faster! (about 5x faster)
* Performance gains can be much larger, e.g., 100x. See [this article](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html).

## Best practices with UDFs 

* Use vectorised versions...
* ...But, generally **avoid UDFs** if built-in functions can do the job.
* UDFs are a black box for Spark: the Catalyst optimiser cannot look inside them, so it won't try to optimise them.

## Built-in method

Let's check the performance of the built-in method that increments by one.

In [None]:
from pyspark.sql.functions import lit

%timeit -n1 -r1 df.withColumn('v', df.v + lit(1)).agg(count(col('v'))).show()

+--------+
|count(v)|
+--------+
|10000000|
+--------+

1 loop, best of 1: 245 ms per loop


That's about **20x** faster than the vectorised UDF, and about **90x** faster than the Python UDF.
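The ratios can be recomputed from the timings printed by the three cells above. The snippet below is just that arithmetic; the dictionary keys are labels chosen for this sketch, and the timings come from single runs (`-n1 -r1`), so they should be read as rough indications.

```python
# Wall-clock timings in seconds, copied from the %timeit outputs above.
timings_s = {
    "python_udf": 22.7,
    "pandas_udf": 4.73,
    "built_in": 0.245,
}


def speedup(slow, fast):
    """How many times faster `fast` is than `slow`."""
    return timings_s[slow] / timings_s[fast]


print(round(speedup("python_udf", "pandas_udf"), 1))  # prints: 4.8
print(round(speedup("pandas_udf", "built_in"), 1))    # prints: 19.3
print(round(speedup("python_udf", "built_in"), 1))    # prints: 92.7
```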