# Lesson 5 - Performance and Optimization

### 1. What is a Partition?
- Huge datasets cannot fit on a single node, so they have to be partitioned across different nodes
- A partition in Spark is an atomic chunk of data stored on a node in a cluster. Partitions are the basic unit of parallelism
- One partition cannot span multiple machines
- Spark automatically partitions RDDs/DataFrames and distributes the partitions across different nodes
- We can configure the number of partitions. Too few partitions leave cores idle; too many add scheduling overhead for tiny tasks


`How Spark does the default partitioning of data`

--> Spark checks the HDFS block size (128MB for Hadoop 2.0/YARN) --> It creates one partition per block (e.g. a file of 500MB will have 4 partitions)


`Why is it necessary to change partitions then?`
- Spark splits a file of size 2.6GB (2.6 * 1024MB = 2662.4MB; 2662.4/128 = 20.8) into 21 partitions.
- If you apply a filter on that dataframe which reduces it to 1MB of data, Spark will still keep 21 partitions, most of them nearly empty.
- In this case, it makes sense to reduce the number of partitions to free up resources and speed up data processing
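
The partition arithmetic above can be checked with a short calculation. This is a minimal sketch, assuming one partition per 128MB block; the helper name `estimate_partitions` is made up for illustration and is not a Spark API.

```python
import math

BLOCK_SIZE_MB = 128  # HDFS block size assumed in the notes above


def estimate_partitions(file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """Hypothetical helper: one partition per (possibly partial) block."""
    return math.ceil(file_size_mb / block_size_mb)


print(estimate_partitions(500))         # 500 / 128 = 3.9     -> 4
print(estimate_partitions(2.6 * 1024))  # 2662.4 / 128 = 20.8 -> 21
```

The last block is usually only partly filled, which is why the result is rounded up rather than down.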

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
import pyspark.sql.window as W

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark Training - DF APIs") \
    .getOrCreate()

In [3]:
df = spark.read.csv('PracticeFiles/IMDB-Movie-Data.csv', header=True)
df.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Runtime (Minutes): string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Votes: string (nullable = true)
 |-- Revenue (Millions): string (nullable = true)
 |-- Metascore: string (nullable = true)



In [4]:
df.show(5)

+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|2014|              121|   8.1|757074|            333.13|       76|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|2012|              124|     7|485820|            126.46|       65|
|   3|               Split|     Horror,Thriller|Three girls are k...|  M. Night Shyamalan|James McAvoy, Any...

### 1.2 repartition(numPartitions, *cols):
- *cols: optional
- without columns, rows are distributed round-robin; with columns, the dataframe is hash partitioned on those columns
- creates almost equal sized partitions (when partitioning on columns, sizes depend on how the keys are distributed)
- can increase or decrease the level of parallelism
- Internally, this redistributes data from all partitions (a full shuffle), which is very expensive. So avoid it if not required
- `Note:` Spark performs better with equal sized partitions. If huge data needs further processing, the cost of repartitioning into equal sized partitions can be worth it
- If you are decreasing the number of partitions, consider using `coalesce`, since it minimises movement of data across partitions and doesn't try to create equal sized partitions

<img src="repartition.png" style="width:400px; height:400px">

In [17]:
# check number of partitions
df.rdd.getNumPartitions()

1

In [20]:
df.count()

1000

In [22]:
# Task 1: create new dataframe with more partitions
df_new = df.repartition(5)

In [24]:
df_new.rdd.getNumPartitions()

5

In [28]:
# how many rows per partition
df_new.rdd.glom().map(len).collect()
# Notice it creates equal sized partitions

[200, 200, 200, 200, 200]

In [30]:
# Task 2: Create a new dataframe partitioned on Year
df_partitioned_on_yrs = df.repartition('Year')

In [31]:
# no partition count was given, so Spark uses spark.sql.shuffle.partitions (200 by default)
df_partitioned_on_yrs.rdd.getNumPartitions()

200

In [32]:
# let's partition by Year and give it 4 partitions
df_partitioned_on_yrs = df.repartition(4, 'Year')

In [33]:
df_partitioned_on_yrs.rdd.getNumPartitions()

4
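
To see why partitioning on a column keeps all rows with the same key together, here is a pure-Python sketch of hash partitioning. It is an illustration only: Spark uses its own hash function, while this sketch assumes a plain modulo on the integer year, and `hash_partition` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Hypothetical helper: send each row to partition key_value % num_partitions."""
    partitions = defaultdict(list)
    for row in rows:
        # Stand-in for Spark's hash function: a plain modulo on the key.
        partitions[row[key] % num_partitions].append(row)
    return [partitions[i] for i in range(num_partitions)]


movies = [
    {"Title": "A", "Year": 2012},
    {"Title": "B", "Year": 2014},
    {"Title": "C", "Year": 2012},
    {"Title": "D", "Year": 2016},
    {"Title": "E", "Year": 2015},
]

parts = hash_partition(movies, "Year", 4)
for i, part in enumerate(parts):
    print(i, [(r["Title"], r["Year"]) for r in part])
```

Every row for a given year lands in the same partition, but the partitions are no longer guaranteed to be equal in size; some can even be empty.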

### 1.3 coalesce(numPartitions)
- Returns a new dataframe that is reduced to numPartitions partitions
- optimised alternative to repartition() for reducing the number of partitions
- no full shuffle: existing partitions are merged instead of redistributing every row
- Results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there is no shuffle; each of the 100 new partitions claims 10 of the current partitions

<img src="coalesce.png" style="width:400px; height:400px">

From the above diagram, we see that coalesce, when used to reduce partitions e.g. from 4 to 2:
- does not create a new partition
- uses existing partitions p1 and p3, and moves data from partition p2 into p1, and from partition p4 into p3
- p1 now contains data for p1 and p2
- p3 now contains data for p3 and p4

In [34]:
# check the number of rows per partition in df_new
df_new.rdd.glom().map(len).collect()
# Notice the partitions are equal in size

[200, 200, 200, 200, 200]

##### repartition vs coalesce to reduce df_new

In [36]:
# using repartition
df_new_repart = df_new.repartition(3)

df_new_repart.rdd.glom().map(len).collect()

[333, 334, 333]

In [37]:
# using coalesce
df_new_coalesce = df_new.coalesce(3)

df_new_coalesce.rdd.glom().map(len).collect()

[200, 400, 400]

`Notice:`
- repartition creates equal sized partitions
- coalesce creates unequal sized partitions

`Also notice (shown next):`
- with coalesce you can't increase the number of partitions; the count just stays unchanged
- you can only decrease the number of partitions with coalesce

In [39]:
# let's try to increase the number of partitions from 5 to 6
df_new3 = df_new.coalesce(6)

df_new3.rdd.glom().map(len).collect()
# There'll be no change: still 5 partitions

[200, 200, 200, 200, 200]
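
The two coalesce results above can be reproduced with a small pure-Python model. This is a simplified sketch, not Spark's implementation: it assumes coalesce merges contiguous runs of existing partitions and ignores data locality, and `coalesce_sizes` is a hypothetical helper.

```python
def coalesce_sizes(partition_sizes, num_partitions):
    """Hypothetical model: merge contiguous partitions, never split one."""
    current = len(partition_sizes)
    if num_partitions >= current:
        # coalesce cannot increase the partition count
        return list(partition_sizes)
    merged = []
    for i in range(num_partitions):
        start = i * current // num_partitions
        end = (i + 1) * current // num_partitions
        merged.append(sum(partition_sizes[start:end]))
    return merged


sizes = [200, 200, 200, 200, 200]
print(coalesce_sizes(sizes, 3))  # [200, 400, 400]
print(coalesce_sizes(sizes, 6))  # [200, 200, 200, 200, 200]
```

Because whole partitions are merged and never split, the sizes come out uneven whenever the old count is not a multiple of the new one.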

### repartition vs coalesce

<img src="comparison.png">


## 2. Performance Tuning


### 2.1 Join Strategies
Spark has the below important join strategies
- Broadcast join (hint - `BROADCAST`)
- Shuffle Hash join (hint - `SHUFFLE_HASH`)
- Sort Merge join (hint - `MERGE`)
- Cartesian Product join (hint - `SHUFFLE_REPLICATE_NL`)
- Broadcast Nested Loop join (no dedicated hint; Spark can fall back to it when a `BROADCAST` hint is given but the join has no equality condition)

#### i. Broadcast join
- This is one of the most powerful performance optimization techniques we can use
- Performs a join in 2 steps:

<img src='broadcast.png'>

Let's say we are trying to join a massive table (table A) to a small table (table B).
- `Step 1:` The driver sends table B's data to all executors where table A's partitions are present, using a BitTorrent-like protocol (basically the driver sends chunks to some worker nodes, which then pass copies on to the other worker nodes)
- `Step 2:` Each executor then performs a hash join between its partitions of table A and its local copy of table B

`Notes`
- In this way, every executor has all the information required to perform the join locally, without needing to redistribute and shuffle table A
- A broadcast join can be a very efficient join between a large table (fact table) and a relatively small table (dimension table), as in a star-schema join
- The size of the smaller table should be less than `spark.sql.autoBroadcastJoinThreshold`, which is configurable. Default: 10MB (10485760 bytes)
- Spark increased the max size for a broadcast table from 2GB to 8GB. It's not possible to broadcast tables greater than 8GB in size, so 8GB is the upper limit when raising the threshold
- aka replicated join, since the smaller df is replicated to all the executors
- use the hint `BROADCAST` to force the Spark optimizer to perform a broadcast join
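
The two steps above can be sketched in plain Python. This is only an illustration of the idea, assuming tables are lists of dicts and the large table is already split into partitions; `broadcast_hash_join` is a hypothetical helper, not how Spark implements the join internally.

```python
def broadcast_hash_join(large_partitions, small_table, key):
    """Hypothetical sketch of the two steps of a broadcast hash join."""
    # Step 1: build a hash table from the small table once; in Spark a copy
    # of it would be shipped to every executor.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[key], []).append(row)

    # Step 2: each partition of the large table probes its local copy,
    # so no rows of the large table move between partitions.
    joined_partitions = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
        joined_partitions.append(joined)
    return joined_partitions


table_a = [  # large table, already split into two partitions
    [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}],
    [{"id": 3, "amount": 30}, {"id": 4, "amount": 40}],
]
table_b = [{"id": 1, "name": "a"}, {"id": 3, "name": "c"}]  # small table

result = broadcast_hash_join(table_a, table_b, "id")
print(result)
```

The output keeps the partitioning of the large table, which is the point of the technique: only the small table is copied around.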

#### 2.11 Auto Detection
- In many cases, Spark can automatically detect whether to use a broadcast join or not, depending on the size of the data.
- If Spark detects that one of the joined dataframes is small (below 10 MB by default), it will automatically broadcast it for us

`Note`: Spark will only perform autodetection in the following instances:

1. Spark constructs the dataframe from scratch e.g. using spark.range
2. It reads from files with schema and/or size info e.g. Parquet, Avro

The reason is that Spark can easily estimate the size of the data in these cases
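
The auto detection rule can be summarised as a small decision function. This is a rough sketch under stated assumptions: `should_auto_broadcast` is a hypothetical helper, an unknown size is modelled as `None`, and a negative threshold stands for the `-1` setting that switches auto broadcast off.

```python
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # 10MB default from the notes above


def should_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Hypothetical helper mirroring the auto detection rule described above."""
    if threshold_bytes < 0:
        # a threshold of -1 switches auto broadcast off
        return False
    if estimated_size_bytes is None:
        # size unknown, so Spark cannot tell that the table is small
        return False
    return estimated_size_bytes <= threshold_bytes


print(should_auto_broadcast(2 * 1024 * 1024))                      # small table
print(should_auto_broadcast(500 * 1024 * 1024))                    # too large
print(should_auto_broadcast(2 * 1024 * 1024, threshold_bytes=-1))  # switched off
print(should_auto_broadcast(None))                                 # size unknown
```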

#### 2.12 Testing
Follow these steps to test your joins:
1. Run the join with and without auto optimisation; turn it off by setting `spark.sql.autoBroadcastJoinThreshold` to -1
2. Compare run times

#### 2.13 Further Notes:
- broadcast joins support all join types (inner, left, right) except full outer join
- Usually faster than the other join strategies when one side is small, because the large table is not shuffled
- Only supported for '=' (equi) joins
- broadcast table needs to be < 10MB in size (default). The threshold can be increased up to 8GB

#### 2.14 Examples

In [12]:
# let's check the default value (in bytes) of the broadcast join threshold
# the raw setting can carry a trailing 'b' (bytes) suffix, so strip it
spark.conf.get('spark.sql.autoBroadcastJoinThreshold').rstrip('b')

'10485760'

In [13]:
# let's convert that to MB
int(spark.conf.get('spark.sql.autoBroadcastJoinThreshold').rstrip('b'))/1024/1024

10.0

In [20]:
ord = spark.read.load('PracticeFiles/Orders', sep=',', format='csv', schema=('order_id int,order_date timestamp, order_customer_id int, order_status string'))
ord.show(3)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
+--------+-------------------+-----------------+---------------+
only showing top 3 rows



In [35]:
order_item = spark.read.load('PracticeFiles/Order_items', sep=',', format='csv', schema=('order_item_id int, order_item_order_id int, order_item_product_id int, quantity tinyint, subtotal float, price float'))
order_item.show(3)

+-------------+-------------------+---------------------+--------+--------+------+
|order_item_id|order_item_order_id|order_item_product_id|quantity|subtotal| price|
+-------------+-------------------+---------------------+--------+--------+------+
|            1|                  1|                  957|       1|  299.98|299.98|
|            2|                  2|                 1073|       1|  199.99|199.99|
|            3|                  2|                  502|       5|   250.0|  50.0|
+-------------+-------------------+---------------------+--------+--------+------+
only showing top 3 rows



In [37]:
joined = ord.join(order_item, ord.order_id == order_item.order_item_order_id)

In [38]:
joined.show(3)

+--------+-------------------+-----------------+---------------+-------------+-------------------+---------------------+--------+--------+------+
|order_id|         order_date|order_customer_id|   order_status|order_item_id|order_item_order_id|order_item_product_id|quantity|subtotal| price|
+--------+-------------------+-----------------+---------------+-------------+-------------------+---------------------+--------+--------+------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|            1|                  1|                  957|       1|  299.98|299.98|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|            2|                  2|                 1073|       1|  199.99|199.99|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|            3|                  2|                  502|       5|   250.0|  50.0|
+--------+-------------------+-----------------+---------------+-------------+-------------------+---------------------+----

In [39]:
# let's explain the join plan
joined.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [order_id#190], [order_item_order_id#448], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#265]
:  +- *(1) Filter isnotnull(order_id#190)
:     +- FileScan csv [order_id#190,order_date#191,order_customer_id#192,order_status#193] Batched: false, DataFilters: [isnotnull(order_id#190)], Format: CSV, Location: InMemoryFileIndex[file:/Users/paulfru/Desktop/projects/pyspark/PracticeFiles/Orders], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:int,order_date:timestamp,order_customer_id:int,order_status:string>
+- *(2) Filter isnotnull(order_item_order_id#448)
   +- FileScan csv [order_item_id#447,order_item_order_id#448,order_item_product_id#449,quantity#450,subtotal#451,price#452] Batched: false, DataFilters: [isnotnull(order_item_order_id#448)], Format: CSV, Location: InMemoryFileIndex[file:/Users/paulfru/Desktop/projects/pyspar

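- The top node of the plan is a BroadcastHashJoin with `BuildLeft`: Spark broadcasts the orders table (the left side) and joins it against the scan of the order items. So auto detection worked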

`Let's turn auto optimisation off`

In [40]:
# turn auto broadcast off by setting the threshold to -1
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)

In [43]:
# let's re-run the join and check the join plan
joined = ord.join(order_item, ord.order_id == order_item.order_item_order_id)

joined.explain()

== Physical Plan ==
*(5) SortMergeJoin [order_id#190], [order_item_order_id#448], Inner
:- *(2) Sort [order_id#190 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(order_id#190, 200), ENSURE_REQUIREMENTS, [id=#300]
:     +- *(1) Filter isnotnull(order_id#190)
:        +- FileScan csv [order_id#190,order_date#191,order_customer_id#192,order_status#193] Batched: false, DataFilters: [isnotnull(order_id#190)], Format: CSV, Location: InMemoryFileIndex[file:/Users/paulfru/Desktop/projects/pyspark/PracticeFiles/Orders], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:int,order_date:timestamp,order_customer_id:int,order_status:string>
+- *(4) Sort [order_item_order_id#448 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(order_item_order_id#448, 200), ENSURE_REQUIREMENTS, [id=#308]
      +- *(3) Filter isnotnull(order_item_order_id#448)
         +- FileScan csv [order_item_id#447,order_item_order_id#448,order_item_product_id#449,quanti

#### Example 2: Let's create two dataframes and see if Spark can figure out whether to broadcast

In [44]:
largDF = spark.range(1, 1_000_000_000)
data = [(1, 'a'),(2, 'b'), (3, 'c')]
schema = ['id', 'col2']
smallDF = spark.createDataFrame(data, schema)

- In theory, if the optimizer can figure out the sizes of both the large and small df, it will apply a broadcast join
- But if the small dataframe is created on top of a local collection, Spark won't know its size, so it won't know whether to apply a broadcast join

In [46]:
# restore the default threshold (10MB) that was switched off above
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10485760)

In [49]:
joindf = largDF.join(smallDF, 'id')
# Notice broadcast join isn't used
joindf.explain()

== Physical Plan ==
*(5) Project [id#601L, col2#604]
+- *(5) SortMergeJoin [id#601L], [id#603L], Inner
   :- *(2) Sort [id#601L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#601L, 200), ENSURE_REQUIREMENTS, [id=#342]
   :     +- *(1) Range (1, 1000000000, step=1, splits=8)
   +- *(4) Sort [id#603L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#603L, 200), ENSURE_REQUIREMENTS, [id=#348]
         +- *(3) Filter isnotnull(id#603L)
            +- *(3) Scan ExistingRDD[id#603L,col2#604]




In [51]:
# We can instruct the optimizer to use a broadcast join here
joindf = smallDF.hint("BROADCAST").join(largDF, 'id')
# the hint applies to the dataframe it is called on, so call it on the small one
joindf.explain()
# now it uses a broadcast join

== Physical Plan ==
*(2) Project [id#603L, col2#604]
+- *(2) BroadcastHashJoin [id#603L], [id#601L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#379]
   :  +- *(1) Filter isnotnull(id#603L)
   :     +- *(1) Scan ExistingRDD[id#603L,col2#604]
   +- *(2) Range (1, 1000000000, step=1, splits=8)




## 3. Driver Configurations

### 3.1 Driver Options
- When we apply collect() or take() operations on datasets, the data has to be moved to the driver. If we do so on huge datasets, it can crash the driver process with Out of Memory (OOM) errors.
- Most of the computational work of a Spark job happens in the executors, so we rarely need to do any performance tuning for the driver
- However, sometimes the job might fail if we collect too much data to the driver.
- Setting a proper limit can protect the driver from out of memory errors.

### 3.2 Spark-submit Options
1. `driver-memory:` Memory for the driver (e.g. 1000M, 2G). Default = 1024M
- Driver memory is the amount of memory to use for the driver process i.e. the process running the main() function of the application and where SparkContext is instantiated
2. `driver-cores:`
- Number of cores used by the driver, only in cluster mode (default: 1)
- Generally not required unless you want to perform some local computations in parallel
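
As an illustration of how these options fit together on the command line, the sketch below assembles a `spark-submit` argument list in Python. The helper `build_spark_submit` and the script name `my_job.py` are hypothetical, and the example assumes a cluster manager that supports cluster deploy mode for Python applications (for example YARN).

```python
def build_spark_submit(app, driver_memory="1g", driver_cores=1, conf=None):
    """Hypothetical helper: assemble a spark-submit argument list."""
    cmd = [
        "spark-submit",
        "--deploy-mode", "cluster",  # --driver-cores only applies in cluster mode
        "--driver-memory", driver_memory,
        "--driver-cores", str(driver_cores),
    ]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)
    return cmd


cmd = build_spark_submit(
    "my_job.py",  # hypothetical application script
    driver_memory="2g",
    driver_cores=2,
    conf={"spark.driver.maxResultSize": "1g"},
)
print(" ".join(cmd))
```

Building the command as a list keeps each flag next to its value and can be passed directly to `subprocess.run` if the job is launched from Python.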

### 3.3 spark.driver.maxResultSize
- Limit on the total size of serialized results of all partitions for each Spark action (e.g. collect), in bytes. Default = 1g
- Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit
- Having a high limit may cause out-of-memory errors in the driver (depends on spark.driver.memory and the memory overhead of objects in the JVM)
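
The behaviour of this limit can be modelled in a few lines. This is a simplified sketch, not Spark's code: `check_result_size` is a hypothetical helper that adds up the result size of every partition and aborts when the total exceeds the limit, with 0 meaning unlimited.

```python
ONE_GIB = 1024 ** 3
MIB = 1024 ** 2


def check_result_size(partition_result_bytes, max_result_size=ONE_GIB):
    """Hypothetical model of the spark.driver.maxResultSize check."""
    total = sum(partition_result_bytes)
    if max_result_size != 0 and total > max_result_size:  # 0 means unlimited
        raise RuntimeError(
            f"total result size {total} bytes exceeds limit {max_result_size} bytes"
        )
    return total


# 700 MiB in total: under the 1 GiB limit
print(check_result_size([300 * MIB, 400 * MIB]))

# 1200 MiB in total: over the limit, so the job would be aborted
try:
    check_result_size([600 * MIB, 600 * MIB])
except RuntimeError as err:
    print("job aborted:", err)
```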

### 3.4 spark.driver.memoryOverhead
- Amount of overhead (non-heap) memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified
- This is memory that accounts for things like VM overheads, interned strings, other native overheads etc
- This tends to grow with the container size (typically 6-10%)

### 3.5 Summary plus other properties
- spark.driver.memory: Default 1024M (1g)
- spark.driver.cores: Default 1
- spark.driver.maxResultSize: Default 1g
- spark.driver.memoryOverhead: Default driver memory * 0.10, with a minimum of 384MiB
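
The default overhead rule in the summary can be worked through with a quick calculation. This is a minimal sketch, assuming the 0.10 factor and 384MiB minimum listed above and rounding down to whole MiB; `memory_overhead_mib` is a hypothetical helper.

```python
def memory_overhead_mib(memory_mib, factor=0.10, minimum_mib=384):
    """Hypothetical helper: default overhead = max(memory * 0.10, 384) in MiB."""
    return max(int(memory_mib * factor), minimum_mib)


for driver_memory in (1024, 4096, 8192):
    overhead = memory_overhead_mib(driver_memory)
    # columns: driver memory, overhead, total memory requested for the driver
    print(driver_memory, overhead, driver_memory + overhead)
```

For small drivers the 384MiB minimum dominates; the 10% rule only takes over once driver memory is above roughly 3.75GiB.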

## 4. Executor Configurations

<img src='driverprog.png'>

<img src='executors.png'>

### Executors and Cores:
- Executors are created on worker/data nodes and they are in charge of running the tasks of a given job
- Each executor is a JVM process (one JVM process is created per executor). Executors are launched at the beginning of a Spark application and run for its entire lifetime
- After they run their assigned tasks, they send the results to the driver
- They also provide in-memory storage for RDDs that are cached by user programs
- Each worker node can have multiple cores
- To run tasks in parallel, we can launch executors with multiple cores
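
To connect executors and cores back to partitions, the sketch below estimates how many tasks can run at once and how many rounds ("waves") of tasks a stage needs. It assumes one task per partition and one core per task; `task_slots` and `task_waves` are hypothetical helpers.

```python
import math


def task_slots(num_executors, cores_per_executor):
    """Tasks that can run at the same time, assuming one core per task."""
    return num_executors * cores_per_executor


def task_waves(num_partitions, num_executors, cores_per_executor):
    """Rounds of tasks needed to process every partition once."""
    return math.ceil(num_partitions / task_slots(num_executors, cores_per_executor))


print(task_slots(4, 2))       # 8 tasks in parallel
print(task_waves(200, 4, 2))  # 200 partitions / 8 slots -> 25 waves
print(task_waves(5, 4, 2))    # fewer partitions than slots -> 1 wave, 3 cores idle
```

This is one reason the number of partitions matters: with fewer partitions than task slots, some cores have nothing to do.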


### How do we configure them
Below are some helpful configs for executors
- `spark.executor.memory:` Default = 1G
- `spark.executor.cores:` Default = 1 on YARN; all available cores on the worker in standalone mode
- `spark.executor.memoryOverhead:` 10% of executor memory or 384MB (whichever is higher)
--> The amount of additional (non-heap) memory to be allocated per executor, in MiB unless otherwise specified.
--> This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
--> This tends to grow with the executor size (typically 6-10%).
--> When we plan performance tuning, we need to consider this as well

#### Spark runtime components in cluster deploy mode
<img src='clusterdeploymode.png'>


#### Spark runtime components in client deploy mode
<img src='clientdeploymode.png'>

#### Node manager
<img src='nodemanager.png'>