# @author: Varun CK

### Sample DataFrame

In [1]:
var df1 = Seq((1,2),(3,4)).toDF("col1","col2")

Intitializing Scala interpreter ...

Spark Web UI available at http://Varun-CK:4040
SparkContext available as 'sc' (version = 2.3.0, master = local[*], app id = local-1583833980777)
SparkSession available as 'spark'


df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]


### 1) show:

**`show(n)`** displays the first n rows (default: 20).

**Example:**

In [2]:
df1.show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+



### 2) collect:

**`collect()`** returns all rows (or elements) to the driver as an array. Use it only on data small enough to fit in driver memory.

**Example:**

In [3]:
df1.collect()

res1: Array[org.apache.spark.sql.Row] = Array([1,2], [3,4])


### 3) cache:

**`cache()`** is shorthand for **`persist()`** with the default storage level (`MEMORY_AND_DISK` for a DataFrame/Dataset, `MEMORY_ONLY` for an RDD).

**Example:**

In [4]:
df1.cache()

res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [col1: int, col2: int]


### 4) persist:

The `persist` method additionally accepts a **storage level**.

Storage levels let us control:
 - Storage location (memory or disk)
 - In-memory format (serialized or deserialized)
 - Partition replication
 
**Example:**

In [5]:
import org.apache.spark.storage.StorageLevel

import org.apache.spark.storage.StorageLevel


In [6]:
df1.persist(StorageLevel.MEMORY_AND_DISK)

2020-03-10 15:23:34 WARN  CacheManager:66 - Asked to cache already cached data.


res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [col1: int, col2: int]


Below are the Storage Level Options:

 - StorageLevel.DISK_ONLY
 - StorageLevel.DISK_ONLY_2
 - StorageLevel.MEMORY_AND_DISK
 - StorageLevel.MEMORY_AND_DISK_2
 - StorageLevel.MEMORY_AND_DISK_SER
 - StorageLevel.MEMORY_AND_DISK_SER_2
 - StorageLevel.MEMORY_ONLY
 - StorageLevel.MEMORY_ONLY_2
 - StorageLevel.MEMORY_ONLY_SER
 - StorageLevel.MEMORY_ONLY_SER_2
 - StorageLevel.NONE
 - StorageLevel.OFF_HEAP

#### MEMORY_ONLY

At this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed each time they are needed. Space usage is high and CPU cost is low; all data is held in memory and the disk is not used.

#### MEMORY_AND_DISK

At this level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that do not fit are stored on disk and read from there when needed. Space usage is high and CPU cost is medium; both memory and disk are used.

#### MEMORY_ONLY_SER

This level stores the RDD as serialized Java objects (one byte array per partition). It is more space-efficient than deserialized objects, especially with a fast serializer, but reading the data costs more CPU. Space usage is low and CPU cost is high; all data is held in memory and the disk is not used.

#### MEMORY_AND_DISK_SER

Similar to MEMORY_ONLY_SER, but partitions that do not fit in memory are spilled to disk instead of being recomputed each time they are needed. Space usage is low and CPU cost is high; both memory and disk are used.

#### DISK_ONLY

At this storage level, the RDD partitions are stored only on disk. Space usage is low and CPU cost is high; only disk storage is used.
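
The storage level assigned to an RDD can be inspected and released again. The following is a minimal sketch, assuming the `sc` SparkContext of this notebook session; the RDD name `numbers` is illustrative and not part of the original notebook.

```scala
import org.apache.spark.storage.StorageLevel

// Assumes the SparkContext `sc` provided by the notebook session.
// `numbers` is an illustrative RDD, not part of the original notebook.
val numbers = sc.parallelize(1 to 10)

// Mark the RDD for caching as serialized bytes, in memory only.
numbers.persist(StorageLevel.MEMORY_ONLY_SER)
val levelWhileCached = numbers.getStorageLevel    // StorageLevel.MEMORY_ONLY_SER

// Release the cached blocks; the level goes back to NONE.
numbers.unpersist()
val levelAfterUnpersist = numbers.getStorageLevel // StorageLevel.NONE
```

Once a storage level has been assigned to an RDD, it has to be unpersisted before a different level can be assigned.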

### 5) reduce:

`reduce` is a Spark action that aggregates the elements of an RDD using a function.
That function takes two arguments and returns one; it should be commutative and associative so that it can be computed correctly in parallel.

**Example:**

In [7]:
var rdd = sc.parallelize(Seq(1, 2, 3)) 

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:26


In [8]:
rdd.reduce((a, b) => a * b)

res4: Int = 6


### 6) reduceByKey:

The function passed to `reduceByKey` merges the values that share the same key. The function must be binary (two values in, one value out) and should be associative and commutative.

**Note:**
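`reduceByKey` produces a result of the same type as the values. If you need all values of a key collected together (for example, string values that you want to keep as a group), you can use `groupByKey()` instead.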

**Example:**

In [9]:
var rdd = sc.parallelize(Seq((1,2), (3,4), (3,6)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:26


In [10]:
var output = rdd.reduceByKey((a,b) => (a+b))

output: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:28


In [11]:
output.collect()

res5: Array[(Int, Int)] = Array((1,2), (3,10))


### 7) groupByKey:

**`groupByKey()`** groups all the values for each key in an RDD into a single sequence.

It shuffles the data unless the RDD is already partitioned by key.

**Example:**

In [12]:
var rdd2 = sc.parallelize(Seq((1,'a'), (2,'c'), (1,'b')))

rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[6] at parallelize at <console>:26


In [13]:
var output2 = rdd2.groupByKey()

output2: org.apache.spark.rdd.RDD[(Int, Iterable[Char])] = ShuffledRDD[7] at groupByKey at <console>:28


In [14]:
output2.collect()

res6: Array[(Int, Iterable[Char])] = Array((1,CompactBuffer(a, b)), (2,CompactBuffer(c)))


##### Note:

 - Be careful with `groupByKey()`: it can move a lot of data across the network and build large Iterables on the workers.

 - Imagine an RDD with 1 million pairs that all have the key 1. With `groupByKey()`, all of those values have to fit on a single worker. Consider `reduceByKey` or a different key-value transformation instead.
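
To make the comparison concrete, here is a small sketch that computes the same per-key sum both ways. It assumes the notebook's `sc`; the names `pairs`, `viaGroup`, and `viaReduce` are illustrative. With `reduceByKey`, values are combined within each partition before the shuffle, while `groupByKey` ships every value across the network first.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
val pairs = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))

// groupByKey: gather every value per key, then sum the resulting Iterable.
val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().sortBy(_._1)

// reduceByKey: combine values pairwise, partially on each partition before the shuffle.
val viaReduce = pairs.reduceByKey(_ + _).collect().sortBy(_._1)

// Both produce Array((1,2), (3,10)); sortBy on the driver only fixes the output order.
```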

### 8) aggregateByKey
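
`aggregateByKey` aggregates the values of each key starting from a zero value, using one function to fold a value into an accumulator within a partition and a second function to merge accumulators across partitions. The accumulator type may differ from the value type. The following is a minimal sketch, assuming the notebook's `sc`; the names `scores` and `sumAndCount` are illustrative.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
// `scores` and `sumAndCount` are illustrative names, not from the original notebook.
val scores = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 3), ("b", 6), ("a", 5)))

// The zero value (0, 0) is a (sum, count) accumulator; its type differs from the Int values.
val sumAndCount = scores.aggregateByKey((0, 0))(
  // seqOp: fold one value into the accumulator inside a partition
  (acc, value) => (acc._1 + value, acc._2 + 1),
  // combOp: merge accumulators that come from different partitions
  (left, right) => (left._1 + right._1, left._2 + right._2)
)

// Sort on the driver because the order of collect() after a shuffle is not guaranteed.
val aggregated = sumAndCount.collect().sortBy(_._1)
// aggregated: Array((a,(9,3)), (b,(10,2)))
```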

### 9) combineByKey

See: [combineByKey](http://www.hadoopexam.com/adi/index.php/spark-blog/90-how-combinebykey-works-in-spark-step-by-step-explaination)
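
As a small illustration, `combineByKey` takes three functions: one that creates an accumulator from the first value seen for a key, one that merges a further value into it, and one that merges two accumulators. The sketch below computes a per-key average; it assumes the notebook's `sc`, and the names `readings`, `sumCount`, and `averages` are illustrative.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
val readings = sc.parallelize(Seq(("a", 2.0), ("b", 4.0), ("a", 6.0)))

val sumCount = readings.combineByKey(
  // createCombiner: first value seen for a key in a partition
  (v: Double) => (v, 1),
  // mergeValue: fold another value into the (sum, count) accumulator
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),
  // mergeCombiners: merge accumulators from different partitions
  (x: (Double, Int), y: (Double, Int)) => (x._1 + y._1, x._2 + y._2)
)

// Turn each (sum, count) into an average and sort on the driver for a stable order.
val averages = sumCount.mapValues { case (sum, count) => sum / count }.collect().sortBy(_._1)
// averages: Array((a,4.0), (b,4.0))
```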

### 10) count

`count()` returns the number of elements.

**Example:**

In [15]:
df1.count()

res7: Long = 2


### 11) countByKey

**`countByKey`** counts the number of elements for each distinct key in an RDD of two-element (key, value) tuples and returns the result to the driver as a local Map of **(key, count)** pairs.

In [16]:
val rdd3 = sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))

rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:26


In [17]:
var output3 = rdd3.countByKey()

output3: scala.collection.Map[String,Long] = Map(Hive -> 1, BigData -> 1, HBase -> 1, spark -> 3, Spark -> 1)


In [18]:
output3

res8: scala.collection.Map[String,Long] = Map(Hive -> 1, BigData -> 1, HBase -> 1, spark -> 3, Spark -> 1)


### 12) countByValue

**`countByValue()`** is an action that counts each unique value in an RDD and returns the result to the driver program as a local Map of `(value, count)` pairs.

**Example 1:**

In [19]:
val rdd4 = sc.parallelize{ Seq(10, 4, 3, 3) }

rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:26


In [20]:
rdd4.collect()

res9: Array[Int] = Array(10, 4, 3, 3)


In [21]:
rdd4.countByValue()

res10: scala.collection.Map[Int,Long] = Map(4 -> 1, 10 -> 1, 3 -> 2)


**Example 2:**

In [22]:
val rdd5 = sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8)))

rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:26


In [23]:
rdd5.countByValue()

res11: scala.collection.Map[(String, Int),Long] = Map((HR,5) -> 1, (RD,4) -> 1, (SALES,4) -> 1, (ADMIN,5) -> 1, (MAN,8) -> 1, (SER,6) -> 1)


### 13) subtractByKey

Similar to `subtract`, but it compares only keys: every pair in the first RDD whose key also appears in the second RDD is removed, regardless of its value.

**Example:**

In [24]:
val r1 = sc.parallelize(Seq((1,2), (3,4), (3,6)))

r1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:26


In [25]:
val r2 = sc.parallelize(Seq((3,9)))

r2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:26


In [26]:
var r3 = r1.subtractByKey(r2)

r3: org.apache.spark.rdd.RDD[(Int, Int)] = SubtractedRDD[28] at subtractByKey at <console>:30


In [27]:
r1.collect()

res12: Array[(Int, Int)] = Array((1,2), (3,4), (3,6))


In [28]:
r2.collect()

res13: Array[(Int, Int)] = Array((3,9))


In [29]:
r3.collect()

res14: Array[(Int, Int)] = Array((1,2))


### 14) sort

#### RDD:

**`sortBy(f)`** returns the RDD sorted by the key that the function `f` computes for each element.
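
For RDDs, the method that sorts by a key function is `sortBy`. A minimal sketch, assuming the notebook's `sc`; the RDD `fruits` is illustrative and uses words of distinct lengths so the order is unambiguous.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
val fruits = sc.parallelize(Seq("pear", "fig", "banana", "apple"))

// Sort by word length, ascending (the default).
val shortestFirst = fruits.sortBy(word => word.length).collect()
// shortestFirst: Array(fig, pear, apple, banana)

// Same key function, descending order.
val longestFirst = fruits.sortBy(word => word.length, ascending = false).collect()
// longestFirst: Array(banana, apple, pear, fig)
```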

#### DataFrame:

**`sort`** sorts a DataFrame by one or more columns.

**Example:**

In [30]:
var df2 = Seq(3,8,6,10,1,5,9,4,7,2).toDF("col1")

df2: org.apache.spark.sql.DataFrame = [col1: int]


In [31]:
df2.show()

+----+
|col1|
+----+
|   3|
|   8|
|   6|
|  10|
|   1|
|   5|
|   9|
|   4|
|   7|
|   2|
+----+



##### Ascending

In [32]:
df2.sort("col1").show()

+----+
|col1|
+----+
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
+----+



##### Descending

In [33]:
df2.sort($"col1".desc).show()

+----+
|col1|
+----+
|  10|
|   9|
|   8|
|   7|
|   6|
|   5|
|   4|
|   3|
|   2|
|   1|
+----+



### 15) sortByKey

**`sortByKey`** sorts a `Pair RDD` by key in `ascending` or `descending` order.

**Example:**

In [34]:
val rdd6 = sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))

rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:26


In [35]:
rdd6.collect()

res18: Array[(String, Int)] = Array((India,91), (USA,1), (Brazil,55), (Greece,30), (China,86), (Sweden,46), (Turkey,90), (Nepal,977))


##### Ascending

In [36]:
val output_6_1 = rdd6.sortByKey(true)

output_6_1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[40] at sortByKey at <console>:28


In [37]:
output_6_1.collect()

res19: Array[(String, Int)] = Array((Brazil,55), (China,86), (Greece,30), (India,91), (Nepal,977), (Sweden,46), (Turkey,90), (USA,1))


##### Descending

In [38]:
val output_6_2 = rdd6.sortByKey(false)

output_6_2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at sortByKey at <console>:28


In [39]:
output_6_2.collect()

res20: Array[(String, Int)] = Array((USA,1), (Turkey,90), (Sweden,46), (Nepal,977), (India,91), (Greece,30), (China,86), (Brazil,55))


### 16) fold

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value".
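
A minimal sketch of `fold`, assuming the notebook's `sc`; the RDD `nums` is illustrative. The last line shows why the zero value should be neutral: it is applied once per partition and once more when the partition results are merged.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
// Two partitions: [1, 2] and [3, 4].
val nums = sc.parallelize(Seq(1, 2, 3, 4), 2)

val total = nums.fold(0)(_ + _)    // 10, since 0 is neutral for addition
val product = nums.fold(1)(_ * _)  // 24, since 1 is neutral for multiplication

// A non-neutral zero value is counted once per partition plus once for the final merge:
// 10 + (10 + 1 + 2) + (10 + 3 + 4) = 40
val skewed = nums.fold(10)(_ + _)
```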

### 17) foldByKey

We can use foldByKey operation to aggregate values based on keys.

**Example:**
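
A minimal sketch of `foldByKey`, assuming the notebook's `sc`; the RDD `sales` is illustrative. It works like `reduceByKey`, but starts each key's aggregation from a neutral zero value.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
val sales = sc.parallelize(Seq(("fruit", 3), ("veg", 2), ("fruit", 7), ("veg", 5)))

// Sum the values of each key, starting from the neutral value 0.
val totals = sales.foldByKey(0)(_ + _).collect().sortBy(_._1)
// totals: Array((fruit,10), (veg,7))
```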

[Reference-1](https://www.quora.com/What-is-the-difference-between-fold-and-foldByKey-in-Spark-What-are-some-real-use-cases)

[Reference-2](http://timepasstechies.com/spark-pair-rdd-reducebykey-foldbykey-flatmap-aggregation-function-example-scala-java/)

### 18) foldLeft

The **`foldLeft`** method takes an initial value and a binary operator function and uses them to collapse the elements of a collection into a single result. Elements are traversed from left to right, hence the name `foldLeft`. Unlike `fold`, the operator does not need to be associative, and the result type may differ from the element type.

`foldLeft` can often replace a hand-written recursive function; the standard collections implement it iteratively, which helps avoid stack-overflow exceptions on large collections.

In Scala 2.11 and 2.12, the `foldLeft` method is a member of the `TraversableOnce` trait.
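
A short sketch of `foldLeft` on a plain Scala `List` (no Spark needed); the value names are illustrative.

```scala
val numbers = List(1, 2, 3, 4)

// Sum, starting from the initial value 0.
val sum = numbers.foldLeft(0)(_ + _)                      // 10

// The accumulator type (String) may differ from the element type (Int).
val joined = numbers.foldLeft("")((acc, n) => acc + n)    // "1234"

// With a non-associative operator, the left-to-right order matters:
// ((((0 - 1) - 2) - 3) - 4) = -10
val difference = numbers.foldLeft(0)(_ - _)
```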

[Reference](http://allaboutscala.com/tutorials/chapter-8-beginner-tutorial-using-scala-collection-functions/scala-foldleft-example/)

### Differences between fold, foldLeft and foldRight

[Ref](https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright)

### 19) top

**`top(n)`** returns the n largest elements of an RDD in descending order, using their natural ordering (or a custom `Ordering`).

**Example:**

In [40]:
rdd2.collect()

res21: Array[(Int, Char)] = Array((1,a), (2,c), (1,b))


In [41]:
rdd2.top(2)

res22: Array[(Int, Char)] = Array((2,c), (1,b))


### 20) first

**`first`** returns the first element of the RDD.

**Example:**

In [42]:
rdd2.collect()

res23: Array[(Int, Char)] = Array((1,a), (2,c), (1,b))


In [43]:
rdd2.first()

res24: (Int, Char) = (1,a)


### 21) head

**`head`** returns the first row of a dataframe.

**Example:**

In [44]:
df1.show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+



In [45]:
df1.head()

res26: org.apache.spark.sql.Row = [1,2]


### 22) head(n)

**`head(n)`** returns the first n rows of a dataframe.

**Example:**

In [46]:
df2.show(5)

+----+
|col1|
+----+
|   3|
|   8|
|   6|
|  10|
|   1|
+----+
only showing top 5 rows



In [47]:
df2.head(3)

res28: Array[org.apache.spark.sql.Row] = Array([3], [8], [6])


### 23) take(n)

#### RDD

**`take(n)`** returns an array of the first n elements.

**Example:**

In [48]:
rdd2.collect()

res29: Array[(Int, Char)] = Array((1,a), (2,c), (1,b))


In [49]:
rdd2.take(2)

res30: Array[(Int, Char)] = Array((1,a), (2,c))


#### Dataframe

Returns the first n rows in the DataFrame.

In [50]:
df2.show()

+----+
|col1|
+----+
|   3|
|   8|
|   6|
|  10|
|   1|
|   5|
|   9|
|   4|
|   7|
|   2|
+----+



In [51]:
df2.take(4)

res32: Array[org.apache.spark.sql.Row] = Array([3], [8], [6], [10])


### 24) map

#### RDD

**`map(function)`** creates a new RDD by applying a function to each record of the base RDD.

**Example:**

In [52]:
val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"))

x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[55] at parallelize at <console>:26


In [53]:
x.collect()

res33: Array[String] = Array(spark, rdd, example, sample, example)


In [54]:
val y = x.map(x => (x.toUpperCase()))

y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at map at <console>:28


In [55]:
y.collect()

res34: Array[String] = Array(SPARK, RDD, EXAMPLE, SAMPLE, EXAMPLE)


#### DataFrame

In [56]:
df1.show

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+



In [57]:
df1.select("col1").map(x => (x+"_a")).show()

+-----+
|value|
+-----+
|[1]_a|
|[3]_a|
+-----+



Or

In [58]:
df1.map(r => (r.getInt(0) + 1,r.getInt(1) + 5)).toDF("col1","col2").show()

+----+----+
|col1|col2|
+----+----+
|   2|   7|
|   4|   9|
+----+----+



#### Dataset

In [59]:
df1.printSchema

root
 |-- col1: integer (nullable = false)
 |-- col2: integer (nullable = false)



In [60]:
case class DatasetExample(col1: Int, col2: Int)

defined class DatasetExample


In [61]:
import org.apache.spark.sql.Dataset

import org.apache.spark.sql.Dataset


In [62]:
val ds1: Dataset[DatasetExample] = df1.as[DatasetExample]

ds1: org.apache.spark.sql.Dataset[DatasetExample] = [col1: int, col2: int]


In [63]:
ds1.select("col1").map(x => (x+"_a")).show()

+-----+
|value|
+-----+
|[1]_a|
|[3]_a|
+-----+



### 25) flatMap

#### RDD

Returns a new RDD by first applying a function to each element of the RDD and then flattening the results.

**Example 1:**

In [64]:
val x1 = sc.parallelize(List("spark", "Scala", "java helps",  "hello world"))

x1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[77] at parallelize at <console>:27


In [65]:
x1.collect()

res40: Array[String] = Array(spark, Scala, java helps, hello world)


##### map

In [66]:
x1.map(z => z.split(" ")).collect()

res41: Array[Array[String]] = Array(Array(spark), Array(Scala), Array(java, helps), Array(hello, world))


##### flatMap

In [67]:
x1.flatMap(z => z.split(" ")).collect()

res42: Array[String] = Array(spark, Scala, java, helps, hello, world)


**Example 2:**

In [68]:
val rdd7 = sc.textFile("..\\Resources\\word_count_ex1.txt")

rdd7: org.apache.spark.rdd.RDD[String] = ..\Resources\word_count_ex1.txt MapPartitionsRDD[81] at textFile at <console>:27


In [69]:
rdd7.collect()

res43: Array[String] = Array(the cat sat on the mat, the aardvark sat on the sofa)


##### map

In [70]:
rdd7.map(line => line.split(" ")).collect()

res44: Array[Array[String]] = Array(Array(the, cat, sat, on, the, mat), Array(the, aardvark, sat, on, the, sofa))


##### flatMap

In [71]:
rdd7.flatMap(line => line.split(" ")).collect()

res45: Array[String] = Array(the, cat, sat, on, the, mat, the, aardvark, sat, on, the, sofa)


#### DataFrame

Returns a new Dataset by first applying a function to all rows of the DataFrame and then flattening the results.

#### Dataset

In [72]:
val df3 = Seq(("A","B","x","D"),("A","B","y","D"),("A","B","z","D")).toDF("col1","col2","col3","col4")

df3: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 2 more fields]


In [73]:
df3.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   B|   x|   D|
|   A|   B|   y|   D|
|   A|   B|   z|   D|
+----+----+----+----+



In [74]:
val ds3 = df3.as[(String, String, String, String)]
ds3.flatMap { 
  case (x1, x2, x3, x4) => x3.split(",").map((x1, x2, _, x4))
}.toDF()

ds3: org.apache.spark.sql.Dataset[(String, String, String, String)] = [col1: string, col2: string ... 2 more fields]
res47: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 2 more fields]


In [75]:
ds3.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   B|   x|   D|
|   A|   B|   y|   D|
|   A|   B|   z|   D|
+----+----+----+----+



### 26) mapPartitions

**`mapPartitions`** returns a new RDD by applying a function to each partition of the source RDD.

`mapPartitions` is similar to the `map()` transformation, but the function runs once per partition (block) of the RDD instead of once per element. This makes `mapPartitions` useful when you are looking for a performance gain, because your function is called once per partition rather than once per element.

 - Suppose the elements 1 to 100 are distributed over 10 partitions, i.e. 10 elements per partition. `map()` calls the function 100 times to process these 100 elements, whereas `mapPartitions()` calls it once per partition, i.e. 10 times.


 - The function receives the whole partition as an iterator. If it materializes the partition (for example with `toList`) or buffers its results, that data is held in memory until all elements of the partition have been processed.


 - The function must return an iterator over the results for the partition. Because iterators are lazy, results can still be produced element by element unless the function builds them eagerly.


 - `mapPartitions()` takes a function from iterator to iterator, unlike the `map()` transformation.

**What is an Iterator?**

An iterator is a way to access the elements of a collection one by one. It resembles collections such as `List` or `Array` in some ways, but an iterator does not need to load the whole collection into memory at once; it produces elements one after another. In Scala you access these elements with the `hasNext` and `next` operations.
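
A short sketch of the two iterator operations in plain Scala (no Spark needed); the names `months` and `seen` are illustrative.

```scala
import scala.collection.mutable.ArrayBuffer

val months = Iterator("jan", "feb", "mar")
val seen = ArrayBuffer.empty[String]

// hasNext checks whether another element is available; next() returns it and advances.
while (months.hasNext) {
  seen += months.next()
}

// The iterator is now exhausted and cannot be rewound.
val exhausted = !months.hasNext
```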

**Example:**

##### Map

In [76]:
sc.parallelize(1 to 9, 3).map(x=>(x, "Hello")).collect()

res49: Array[(Int, String)] = Array((1,Hello), (2,Hello), (3,Hello), (4,Hello), (5,Hello), (6,Hello), (7,Hello), (8,Hello), (9,Hello))


In [77]:
sc.parallelize(1 to 9, 3).partitions.size

res50: Int = 3


##### mapPartitions

In [78]:
sc.parallelize(1 to 9, 3).mapPartitions(x=>(Array("Hello").iterator)).collect()

res51: Array[String] = Array(Hello, Hello, Hello)


##### mapPartitions with iterator

In [79]:
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next).iterator)).collect()

res52: Array[Int] = Array(1, 4, 7)


In the first example, `map()` is applied to a dataset distributed over 3 partitions, so the function is called 9 times, once per element.

In the second example, `mapPartitions()` calls the function 3 times, once per partition. The string "Hello" is wrapped in an array and converted to an iterator because the function passed to `mapPartitions()` must return an iterator.

In the third example, `next` retrieves the first element of each partition's iterator. Note that an iterator only moves forward, so you can't step back.
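
As a further sketch, the function can consume the whole partition iterator and emit a single value per partition. This assumes the notebook's `sc`; `partitionSums` is an illustrative name.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
// 1 to 9 in 3 partitions: [1, 2, 3], [4, 5, 6], [7, 8, 9].
val partitionSums = sc.parallelize(1 to 9, 3)
  .mapPartitions(iter => Iterator(iter.sum)) // one output element per partition
  .collect()
// partitionSums: Array(6, 15, 24)
```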

[Reference](https://www.dataneb.com/forum/apache-spark/what-is-mappartitions-in-spark-example)

### 27) partition

A **`partition`** in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster.

Partitions are the basic units of parallelism in Apache Spark.

RDDs in Apache Spark are collections of partitions.

<img src="../Resources/Data+Partitioning+in+Spark.jpg">

#### Example:

In [80]:
var rdd8 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)

rdd8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[91] at parallelize at <console>:26


In [81]:
rdd8.partitions.size

res53: Int = 3


### 28) repartition

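`repartition(numPartitions)` returns a new RDD/DataFrame with exactly `numPartitions` partitions. On a DataFrame you can also pass partitioning expressions (columns), in which case the result is hash partitioned by those expressions.

Repartitioning a DataFrame by expressions is the same operation as `"DISTRIBUTE BY"` in `SQL (Hive QL)`.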

In [82]:
rdd8 = rdd8.repartition(5)

rdd8: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[95] at repartition at <console>:28


In [83]:
rdd8.partitions.size

res54: Int = 5


### 29) coalesce

The **`coalesce`** method reduces the number of partitions in an RDD/DataFrame.

In [84]:
rdd8 = rdd8.coalesce(2)

rdd8: org.apache.spark.rdd.RDD[String] = CoalescedRDD[96] at coalesce at <console>:28


In [85]:
rdd8.partitions.size

res55: Int = 2


### Differences between repartition and coalesce:

The **`repartition`** algorithm does a full shuffle of the data and creates roughly equal-sized partitions.

The **`coalesce`** method combines existing partitions to avoid a full shuffle.


[Reference](https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4)

#### Note:

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of `repartition()` called `coalesce()` that avoids data movement, but only if you are decreasing the number of RDD partitions.

**`coalesce`** avoids a full shuffle. If the number of partitions is known to be decreasing, the executor can keep data on the minimum number of partitions, moving only the data off the extra nodes onto the nodes that are kept.


So, it would go something like this:

    Node 1 = 1,2,3
    Node 2 = 4,5,6
    Node 3 = 7,8,9
    Node 4 = 10,11,12

Then coalesce down to 2 partitions:

    Node 1 = 1,2,3 + (10,11,12)
    Node 3 = 7,8,9 + (4,5,6)

*Notice that the original data on Node 1 and Node 3 did not have to move.*

### 30) shared variables

**`Shared variables`** are variables that many functions and methods need to use in parallel operations.

Spark breaks a job into tasks that run as closures on different nodes, and each task gets its own copy of every variable the closure refers to.

Changes made to these copies are not reflected in the driver program. To overcome this limitation, Spark provides two special types of shared variables: **`Broadcast Variables`** and **`Accumulators`**.

### 31) Broadcast Variables

Broadcast variables cache a value in memory on all nodes. A single read-only copy per node is shared by all tasks running there, instead of a copy being shipped with every task.

Spark sends the broadcast variable to each node that runs a related task. Each node then caches it locally in serialized form.

Before executing each task, the executor reads the value from this local cache instead of fetching it from the driver.

Broadcast variables are:
 - Immutable (Unchangeable)
 - Distributed i.e. broadcasted to the cluster
 - Fit in memory

**Syntax to create Broadcast variable:**

`SparkContext.broadcast(Value)`

[Reference](https://supergloo.com/spark-scala/spark-broadcast-accumulator-examples-scala/)

**Example:**

In [86]:
val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(59)


In [87]:
broadcastVar.value

res56: Array[Int] = Array(1, 2, 3)
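
Broadcast variables are typically read inside a transformation through `.value`. A minimal sketch, assuming the notebook's `sc`; the lookup table `countryNames` and the RDD `codes` are illustrative.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
// A small lookup table that every task needs to read.
val countryNames = sc.broadcast(Map("IN" -> "India", "US" -> "USA"))

val codes = sc.parallelize(Seq("IN", "US", "IN"))

// Each task reads the broadcast value from its local cache via .value.
val resolved = codes.map(code => countryNames.value.getOrElse(code, "unknown")).collect()
// resolved: Array(India, USA, India)
```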


### 32) Accumulators

As the name suggests, the main role of **`Accumulators`** is to accumulate values. Accumulators are variables used to implement counters and sums. Spark natively supports accumulators of numeric types.

The user can create named or unnamed accumulators.

Unlike broadcast variables, accumulators are writable. However, the written values can only be read in the driver program.

That is why accumulators work well as data aggregators.

Syntax to create an accumulator (deprecated since Spark 2.0 in favor of `SparkContext.longAccumulator` and `SparkContext.doubleAccumulator`):

`SparkContext.accumulator(initialValue)`

[Reference](https://supergloo.com/spark-scala/spark-broadcast-accumulator-examples-scala/)

**Example:**

In [88]:
val accum = sc.accumulator(0, "Accumulator Example")

accum: org.apache.spark.Accumulator[Int] = 0


In [89]:
sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)

In [90]:
accum.value

res58: Int = 6
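
`sc.accumulator` has been deprecated since Spark 2.0. A sketch of the same sum with the newer `longAccumulator` API, assuming the notebook's `sc`; the accumulator name "total" is illustrative.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
// Create a named accumulator that starts at 0.
val total = sc.longAccumulator("total")

// Tasks can only add to the accumulator.
sc.parallelize(Seq(1, 2, 3)).foreach(x => total.add(x))

// Only the driver can read the accumulated value.
val accumulated = total.value // 6
```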


### 33) DAG

A **`DAG (Directed Acyclic Graph)`** in Apache Spark is a set of vertices and edges, where the vertices represent RDDs and the edges represent the operations to be applied to them.

In a Spark DAG, every edge points from an earlier step to a later one. When an action is called, the DAG is submitted to the DAG Scheduler, which splits the graph into stages of tasks.

<img src="../Resources/DAG-Visualisation-01.jpg" style="height:300px">

**Example:**

<img src="../Resources/DAG-2.png" style="height:500px">

### 34) Physical execution plan

In Apache Spark, a stage is a physical unit of execution; it is a step in a physical execution plan.

Actions trigger the translation of the logical DAG into a physical execution plan. The Spark Catalyst query optimizer creates the physical execution plan for DataFrames.

The physical plan identifies resources, such as memory partitions and compute tasks, that will execute the plan, as shown in the diagram below:

<img src="../Resources/physical_execution_plan.png" style="height:200px">

[Reference](https://mapr.com/blog/how-spark-runs-your-applications/)

**Example:**

In [91]:
var file = "..\\Resources\\flights20170102.json"

file: String = ..\Resources\flights20170102.json


In [92]:
case class Flight(_id: String, dofW: Long, carrier: String, origin: String, dest: String, crsdephour: Long, crsdeptime: Double,
                  depdelay: Double,crsarrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Double)
extends Serializable

defined class Flight


In [93]:
val df4 = spark.read.format("json").option("inferSchema", "true").load(file).as[Flight]

df4: org.apache.spark.sql.Dataset[Flight] = [_id: string, arrdelay: double ... 10 more fields]


In [94]:
val df5 = df4.filter($"depdelay" > 40)

df5: org.apache.spark.sql.Dataset[Flight] = [_id: string, arrdelay: double ... 10 more fields]


In [95]:
df5.take(1)

res59: Array[Flight] = Array(Flight(ATL_BOS_2017-01-02_16_DL_1210,1,DL,ATL,BOS,16,1616.0,68.0,1852.0,49.0,156.0,946.0))


In [96]:
df5.explain(true)

== Parsed Logical Plan ==
'Filter ('depdelay > 40)
+- AnalysisBarrier
      +- Relation[_id#271,arrdelay#272,carrier#273,crsarrtime#274,crsdephour#275L,crsdeptime#276,crselapsedtime#277,depdelay#278,dest#279,dist#280,dofW#281L,origin#282] json

== Analyzed Logical Plan ==
_id: string, arrdelay: double, carrier: string, crsarrtime: double, crsdephour: bigint, crsdeptime: double, crselapsedtime: double, depdelay: double, dest: string, dist: double, dofW: bigint, origin: string
Filter (depdelay#278 > cast(40 as double))
+- Relation[_id#271,arrdelay#272,carrier#273,crsarrtime#274,crsdephour#275L,crsdeptime#276,crselapsedtime#277,depdelay#278,dest#279,dist#280,dofW#281L,origin#282] json

== Optimized Logical Plan ==
Filter (isnotnull(depdelay#278) && (depdelay#278 > 40.0))
+- Relation[_id#271,arrdelay#272,carrier#273,crsarrtime#274,crsdephour#275L,crsdeptime#276,crselapsedtime#277,depdelay#278,dest#279,dist#280,dofW#281L,origin#282] json

== Physical Plan ==
*(1) Project [_id#271, arrdelay#

<img src="../Resources/physical_execution_plan_result.png" style="width:250px">

### 35) sparse vector

A **`sparse vector`** is represented by two parallel arrays: indices and values. Zero entries are not stored.

**Example:**

#### Vector

In [97]:
var v = Vector(1, 0, 0, 0, 0, 0, 3)

v: scala.collection.immutable.Vector[Int] = Vector(1, 0, 0, 0, 0, 0, 3)


#### Sparse Vector

In [98]:
import org.apache.spark.ml.linalg.Vectors

import org.apache.spark.ml.linalg.Vectors


In [99]:
var sv = Vectors.sparse(7, Array(0,6), Array(1,3))

sv: org.apache.spark.ml.linalg.Vector = (7,[0,6],[1.0,3.0])


### 36) dense vector

A **`dense vector`** is backed by a double array representing its entry values.

**Example:**

In [100]:
var dv = sv.toDense

dv: org.apache.spark.ml.linalg.DenseVector = [1.0,0.0,0.0,0.0,0.0,0.0,3.0]


### 37) pair RDD

 - **`Pair RDDs`** are a special form of RDD
 - Each element must be a key-value pair (a two-element tuple)
 - Keys and values can be of any type

**Example:**

In [101]:
val users = sc.textFile("..\\Resources\\users.txt")

users: org.apache.spark.rdd.RDD[String] = ..\Resources\users.txt MapPartitionsRDD[107] at textFile at <console>:28


In [102]:
val pairRdd = users.map(line => line.split("\t")).map(fields => (fields(0), fields(1)))

pairRdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[109] at map at <console>:30


In [103]:
pairRdd.collect()

res61: Array[(String, String)] = Array((user001,Varun CK), (user022,Fred Flintstone), (user090,Bugs Bunny), (user111,Harry Potter))


### 38) filter

#### RDD

**`filter(function)`** creates a new RDD by including or excluding each record in the base RDD according to a Boolean function.

**Example:**

In [104]:
var rdd9 = sc.textFile("..\\Resources\\mydata.txt")

rdd9: org.apache.spark.rdd.RDD[String] = ..\Resources\mydata.txt MapPartitionsRDD[111] at textFile at <console>:28


In [105]:
rdd9.collect()

res62: Array[String] = Array(I've never seen a purple cow., I hope never to see one., But I can tell you, anyhow,, I'd rather see than one.)


In [106]:
rdd9.filter(line => line.startsWith("I")).collect()

res63: Array[String] = Array(I've never seen a purple cow., I hope never to see one., I'd rather see than one.)


#### DataFrame

The **`filter()`** function filters the rows of a DataFrame or Dataset based on a given condition or SQL expression. The **`where()`** operator is an alias for `filter()`.

**Example:**

In [107]:
df2.show()

+----+
|col1|
+----+
|   3|
|   8|
|   6|
|  10|
|   1|
|   5|
|   9|
|   4|
|   7|
|   2|
+----+



In [108]:
df2.filter($"col1" > 5).show()

+----+
|col1|
+----+
|   8|
|   6|
|  10|
|   9|
|   7|
+----+



Or

In [109]:
df2.filter("col1 > 5").show()

+----+
|col1|
+----+
|   8|
|   6|
|  10|
|   9|
|   7|
+----+



Or

In [110]:
df2.where("col1 > 5").show()

+----+
|col1|
+----+
|   8|
|   6|
|  10|
|   9|
|   7|
+----+



### 39) sample

**`sample`** creates a new RDD with a sampling of elements.

Spark's sampling functions let you draw a random sample covering a given fraction of the data, or take just a fixed number of elements.

In Spark, there are two sampling operations, the transformation **sample** and the action **takeSample**.

With the transformation, we can tell Spark to apply successive transformations to a sample of a given RDD.

With the action, we retrieve a sample into local memory, where any other standard library can use it.

The `sample` transformation takes up to three parameters:

 - First, whether the sampling is done with replacement or not.
 - Second, the sample size as a fraction.
 - Finally, an optional random seed.

In [111]:
val rawData = sc.textFile("..\\Resources\\kddcup.data.gz")

rawData: org.apache.spark.rdd.RDD[String] = ..\Resources\kddcup.data.gz MapPartitionsRDD[129] at textFile at <console>:28


In [112]:
val sampledData = rawData.sample(false, 0.1, 1234)

sampledData: org.apache.spark.rdd.RDD[String] = PartitionwiseSampledRDD[130] at sample at <console>:30


In [113]:
val sampleDataSize = sampledData.count()

sampleDataSize: Long = 490191


In [114]:
val rawDataSize = rawData.count()

rawDataSize: Long = 4898431


In [115]:
println(rawDataSize + " and after the sampling: " + sampleDataSize)

4898431 and after the sampling: 490191
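
The action counterpart, `takeSample`, returns a fixed number of elements to the driver as a local array. A minimal sketch, assuming the notebook's `sc`; the RDD `population` and the seed value are illustrative.

```scala
// Assumes the SparkContext `sc` provided by the notebook session.
val population = sc.parallelize(1 to 100)

// Draw exactly 5 distinct elements (no replacement) into a local array on the driver.
val picked = population.takeSample(withReplacement = false, num = 5, seed = 42L)
```

Unlike `sample`, which takes a fraction and returns an RDD of approximately that size, `takeSample` takes an exact count.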


### 40) Narrow transformation in RDD

 - Each partition in the child RDD depends on just one partition of the parent RDD
 - No shuffle required between executors
 - Can be collapsed into a single stage
 
**Examples**: `map`, `filter`, and `union`

### 41) Wide Transformation in RDD

 - Child partitions depend on multiple partitions in the parent RDD
 - A wide transformation defines a new stage

**Examples**: `reduceByKey`, `join`

**Note:**

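Wide transformations are also called `shuffle transformations` because they usually require a shuffle. The shuffle can be skipped when the data is already partitioned as needed, for example when joining RDDs that share a partitioner.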
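
One way to see the difference is to inspect the dependencies that Spark records on an RDD. The following is a sketch under the assumption of a local session and a tiny word list (`words`, `pairs` and `counts` are illustrative names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.{OneToOneDependency, ShuffleDependency}

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("dependency-sketch").getOrCreate()
val sc = spark.sparkContext

val words = sc.parallelize(Seq("lion", "tiger", "tiger", "horse"), 2)

// map is narrow: each child partition reads exactly one parent partition.
val pairs = words.map(w => (w, 1))
val narrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]

// reduceByKey is wide here: the parent has no partitioner, so a shuffle is planned.
val counts = pairs.reduceByKey(_ + _)
val wide = counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]

println(s"map is narrow: $narrow, reduceByKey is wide: $wide")
// The lineage printout marks the stage boundary introduced by the shuffle.
println(counts.toDebugString)
```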

### 42) union

Returns an RDD containing data from both sources

**Note:**

 - Unlike the mathematical union, duplicates are not removed.
 - The element type must be the same in both RDDs.

**Example:**

In [116]:
val rdd11 = sc.parallelize(List("lion", "tiger", "tiger", "peacock", "horse"))

rdd11: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[131] at parallelize at <console>:28


In [117]:
val rdd12 = sc.parallelize(List("lion", "tiger"))

rdd12: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[132] at parallelize at <console>:28


In [118]:
rdd11.union(rdd12).collect()

res69: Array[String] = Array(lion, tiger, tiger, peacock, horse, lion, tiger)


### 43) intersection

- Returns the elements that are common to both RDDs.
- Also removes duplicates.

**Warning:**

Involves a shuffle, which makes it considerably more expensive than `union`.

**Example:**

In [119]:
rdd11.collect()

res70: Array[String] = Array(lion, tiger, tiger, peacock, horse)


In [120]:
rdd12.collect()

res71: Array[String] = Array(lion, tiger)


In [121]:
rdd11.intersection(rdd12).collect()

res72: Array[String] = Array(lion, tiger)


### 44) distinct

Returns distinct elements in the RDD

**Warning**:

Involves shuffling data over the network

**Example:**

In [122]:
rdd11.collect()

res73: Array[String] = Array(lion, tiger, tiger, peacock, horse)


In [123]:
rdd11.distinct().collect()

res74: Array[String] = Array(peacock, lion, horse, tiger)


### 45) subtract

- Returns the elements of the first RDD that are not present in the second RDD.
- Preserves duplicates among the elements that remain.

**Example:**

In [124]:
rdd11.collect()

res75: Array[String] = Array(lion, tiger, tiger, peacock, horse)


In [125]:
rdd12.collect()

res76: Array[String] = Array(lion, tiger)


In [126]:
rdd11.subtract(rdd12).collect()

res77: Array[String] = Array(peacock, horse)


### 46) cartesian

Provides the cartesian product of 2 RDDs

**Warning:**

Is very expensive for large RDDs

**Example:**

In [127]:
rdd11.collect()

res78: Array[String] = Array(lion, tiger, tiger, peacock, horse)


In [128]:
rdd12.collect()

res79: Array[String] = Array(lion, tiger)


In [129]:
rdd11.cartesian(rdd12).collect()

res80: Array[(String, String)] = Array((lion,lion), (lion,tiger), (tiger,lion), (tiger,tiger), (tiger,lion), (tiger,tiger), (peacock,lion), (horse,lion), (peacock,tiger), (horse,tiger))


## Joins (SQL and Core)

Joining data is an important part of many of our pipelines, and both Spark Core and SQL support the same fundamental types of joins. While joins are very common and powerful, they warrant special performance consideration as they may require large network transfers or even create datasets beyond our capability to handle. In core Spark it can be more important to think about the ordering of operations, since the DAG optimizer, unlike the SQL optimizer, isn’t able to re-order or push down filters.

### Core Spark Joins

In this section we will go over the RDD type joins. Joins in general are expensive since they require that corresponding keys from each RDD are located at the same partition so that they can be combined locally. If the RDDs do not have known partitioners, they will need to be shuffled so that both RDDs share a partitioner, and data with the same keys lives in the same partitions, as shown in Figure 4-1. If they have the same partitioner, the data may be colocated, as in Figure 4-3, so as to avoid network transfer. Regardless of whether the partitioners are the same, if one (or both) of the RDDs have a known partitioner only a narrow dependency is created, as in Figure 4-2. As with most key/value operations, the cost of the join increases with the number of keys and the distance the records have to travel in order to get to their correct partition.

<img src="../Resources/RDD_Joins1.png" style="height:200px">

___

<img src="../Resources/RDD_Joins2.png" style="height:230px">

___

<img src="../Resources/RDD_Joins3.png" style="height:250px">

___

### Choosing an Execution Plan

In order to join data, Spark needs the data that is to be joined (i.e., the data based on each key) to live on the same partition. The default implementation of a join in Spark is a shuffled hash join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. While this approach always works, it can be more expensive than necessary because it requires a shuffle. The shuffle can be avoided if:

 - Both RDDs have a known partitioner.
 - One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join.
 
Note that if the RDDs are colocated the network transfer can be avoided, along with the shuffle.
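
As a sketch of the first condition, both sides can be pre-partitioned with the same partitioner before the join. The data and the names `scores`, `addresses` and `sharedPartitioner` are made up for illustration, and the local session is an assumption:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("copartition-sketch").getOrCreate()
val sc = spark.sparkContext

val sharedPartitioner = new HashPartitioner(4)

// Partition both sides once with the same partitioner and cache the result,
// so that later joins can reuse the layout instead of shuffling again.
val scores = sc.parallelize(Seq((1L, 0.9), (2L, 0.4), (3L, 0.7)))
  .partitionBy(sharedPartitioner).cache()
val addresses = sc.parallelize(Seq((1L, "a st"), (2L, "b st")))
  .partitionBy(sharedPartitioner).cache()

val joined = scores.join(addresses)

// The join keeps the shared partitioner rather than introducing a new one.
println(joined.partitioner)
println(joined.toDebugString)
```

The `partitionBy` calls still shuffle each RDD once, so this pays off mainly when the partitioned RDDs are reused across several joins.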

### Speeding up joins by assigning a known partitioner

If you have to do an operation before the join that requires a shuffle, such as `aggregateByKey` or `reduceByKey`, you can prevent the join from triggering another shuffle by passing a hash partitioner with the same number of partitions as an explicit argument to that first operation. For example, when the best score per key is joined with address data, the partitioner of the address data can be passed as an argument to the `reduceByKey` step, as in Example 4-4 and Figure 4-4.

In [130]:
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner

import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner


In [131]:
def joinScoresWithAddress3(scoreRDD: RDD[(Long, Double)],
   addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, String))]= {
    // If addressRDD has a known partitioner we should use that,
    // otherwise it has a default hash partitioner, which we can reconstruct by
    // getting the number of partitions.
    val addressDataPartitioner = addressRDD.partitioner match {
      case (Some(p)) => p
      case (None) => new HashPartitioner(addressRDD.partitions.length)
    }
    val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
      (x, y) => if(x > y) x else y)
    bestScoreData.join(addressRDD)
  }

joinScoresWithAddress3: (scoreRDD: org.apache.spark.rdd.RDD[(Long, Double)], addressRDD: org.apache.spark.rdd.RDD[(Long, String)])org.apache.spark.rdd.RDD[(Long, (Double, String))]


<img src="../Resources/RDD_Joins4.png" style="height:250px">

___

### Speeding up joins using a broadcast hash join

A broadcast hash join pushes one of the RDDs (the smaller one) to each of the worker nodes. Then it does a map-side combine with each partition of the larger RDD. If one of your RDDs can fit in memory or can be made to fit in memory it is always beneficial to do a broadcast hash join, since it doesn’t require a shuffle. Sometimes (but not always) Spark SQL will be smart enough to configure the broadcast join itself; in Spark SQL this is controlled with `spark.sql.autoBroadcastJoinThreshold` and `spark.sql.broadcastTimeout`. This is illustrated in Figure 4-5.

<img src="../Resources/RDD_Joins5.png" style="height:250px">

___

Spark Core does not have an implementation of the broadcast hash join. Instead, we can manually implement a version of the broadcast hash join by collecting the smaller RDD to the driver as a map, then broadcasting the result, and using mapPartitions to combine the elements.
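
The steps just described can be sketched as follows. `manualBroadcastJoin` is a hypothetical helper, not part of Spark, and it assumes that the smaller RDD fits in memory and has unique keys (`collectAsMap` keeps only one value per key). The local session and sample data are also assumptions:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.reflect.ClassTag

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("manual-broadcast-join").getOrCreate()
val sc = spark.sparkContext

// Hypothetical helper: inner join of `large` with `small` without shuffling `large`.
def manualBroadcastJoin[K: ClassTag, V1: ClassTag, V2: ClassTag](
    large: RDD[(K, V1)],
    small: RDD[(K, V2)]): RDD[(K, (V1, V2))] = {
  // Collect the smaller RDD to the driver as a map and ship one copy to each executor.
  val smallMap = large.sparkContext.broadcast(small.collectAsMap())
  large.mapPartitions(
    // Look every key up locally; keys missing from the map are dropped (inner join).
    iter => iter.flatMap { case (k, v1) => smallMap.value.get(k).map(v2 => (k, (v1, v2))) },
    preservesPartitioning = true)
}

val paymentsByCustomer = sc.parallelize(Seq((101, 2500), (102, 1110), (104, 400)))
val namesByCustomer = sc.parallelize(Seq((101, "Jon"), (102, "Aron")))

val broadcastJoined = manualBroadcastJoin(paymentsByCustomer, namesByCustomer).collect().toSet
println(broadcastJoined)
```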

### Partial manual broadcast hash join

Sometimes not all of our smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can’t fit on a single partition. In this case you can use `countByKeyApprox` on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a `HashMap`. Using `sc.broadcast` you can broadcast the `HashMap` so that each worker only has one copy and manually perform the join against the `HashMap`. Using the same `HashMap` you can then filter your large RDD down to not include the large number of duplicate keys and perform your standard join, unioning it with the result of your manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn’t otherwise process.
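
A rough sketch of this idea is shown below. To keep it short it swaps in the exact `countByKey` for `countByKeyApprox`, uses an arbitrary threshold of 10 occurrences to decide which keys are "hot", and runs on made-up data in an assumed local session; all names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("partial-broadcast-join").getOrCreate()
val sc = spark.sparkContext

// Key 1 is heavily overrepresented in the large RDD.
val large = sc.parallelize(Seq.fill(50)((1, "hot")) ++ Seq((2, "x"), (3, "y")))
val small = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")))

// Simplification: exact counts instead of countByKeyApprox; the threshold is arbitrary.
val hotKeys: Set[Int] = large.countByKey().filter { case (_, n) => n >= 10 }.keySet.toSet

// Broadcast only the rows of the small RDD that belong to hot keys.
val hotSmall = sc.broadcast(small.filter { case (k, _) => hotKeys.contains(k) }.collectAsMap())

// Map-side join for the hot keys, with no shuffle of the skewed rows.
val hotJoined = large
  .filter { case (k, _) => hotKeys.contains(k) }
  .flatMap { case (k, v) => hotSmall.value.get(k).map(w => (k, (v, w))) }

// Standard shuffle join for everything else, then union the two results.
val restJoined = large.filter { case (k, _) => !hotKeys.contains(k) }.join(small)
val skewJoined = hotJoined.union(restJoined)

println(skewJoined.count())
```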

### Spark SQL Joins

Spark SQL supports the same basic join types as core Spark, but the optimizer is able to do more of the heavy lifting for you—although you also give up some of your control. For example, Spark SQL can sometimes push down or reorder operations to make your joins more efficient. On the other hand, you don’t control the partitioner for DataFrames or Datasets, so you can’t manually avoid shuffles as you did with core Spark joins.

### DataFrame Joins

Joining data between DataFrames is one of the most common multi-DataFrame transformations. The standard SQL join types are all supported and can be specified as the joinType in df.join(otherDf, sqlCondition, joinType) when performing a join. As with joins between RDDs, joining with nonunique keys will result in the cross product (so if the left table has R1 and R2 with key1 and the right table has R3 and R5 with key1 you will get (R1, R3), (R1, R5), (R2, R3), (R2, R5)) in the output.
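
The cross-product behaviour for non-unique keys can be checked on a tiny example. This is a sketch on made-up frames named after the rows in the paragraph above, in an assumed local session:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("nonunique-keys-sketch").getOrCreate()
import spark.implicits._

// Both tables contain key1 twice.
val leftTable = Seq(("key1", "R1"), ("key1", "R2")).toDF("key", "leftRow")
val rightTable = Seq(("key1", "R3"), ("key1", "R5")).toDF("key", "rightRow")

// Every left row with key1 is paired with every right row with key1.
val rowPairs = leftTable.join(rightTable, Seq("key"), "inner")
  .select("leftRow", "rightRow")
  .as[(String, String)]
  .collect()
  .toSet

println(rowPairs)
```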

Spark’s supported join types are “inner,” “left_outer” (aliased as “left”), “left_anti,” “right_outer,” “full_outer” (aliased as “outer”), and “left_semi.” With the exception of “left_semi” and “left_anti,” these join types all join the two tables, but they behave differently when handling rows that do not have keys in both tables.

### Broadcast hash joins

In Spark SQL you can see the type of join being performed by calling `queryExecution.executedPlan`. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for join by calling `broadcast` on the DataFrame before joining it (e.g., `df1.join(broadcast(df2), "key")`). Spark also automatically uses `spark.sql.autoBroadcastJoinThreshold` to determine if a table should be broadcast.
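
A minimal sketch of the hint and the plan inspection, assuming a local session and two made-up frames (`big` and `lookup`). Tables this small may be broadcast automatically even without the hint, so the example only shows where to look:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint-sketch").getOrCreate()
import spark.implicits._

val big = (1 to 1000).map(i => (i % 10, i)).toDF("key", "value")
val lookup = (0 until 10).map(i => (i, s"name$i")).toDF("key", "name")

// Hint that `lookup` should be shipped to every executor.
val hinted = big.join(broadcast(lookup), "key")

// The physical plan should mention a broadcast join operator.
val plan = hinted.queryExecution.executedPlan.toString
println(plan)

// The size threshold used for automatic broadcasting.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```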

### Dataset Joins

Joining Datasets is done with joinWith, and this behaves similarly to a regular relational join, except the result is a tuple of the different record types as shown in Example 4-11. This is somewhat more awkward to work with after the join, but also does make self joins, as shown in Example 4-12, much easier, as you don’t need to alias the columns first.

##### Example 4-11. Joining two Datasets

`
val result: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops,
      $"zip" === $"zip")
`

##### Example 4-12. Self join a Dataset

`
val result: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas,
      $"zip" === $"zip")
`

___
___

### DataFrame Joins

<img src="../Resources/SQL_Joins.png" style="height:500px">

Currently, Spark offers:

- Inner-Join
- Left-Join
- Right-Join
- Outer-Join
- Cross-Join
- Left-Semi-Join
- Left-Anti-Semi-Join

#### Example Data:

In [132]:
val payment = sc.parallelize(Seq(
  (1, 101,2500), (2,102,1110), (3,103,500), (4 ,104,400), (5 ,105, 150), (6 ,106, 450)
)).toDF("paymentId", "customerId","amount")

payment: org.apache.spark.sql.DataFrame = [paymentId: int, customerId: int ... 1 more field]


In [133]:
payment.show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        2|       102|  1110|
|        3|       103|   500|
|        4|       104|   400|
|        5|       105|   150|
|        6|       106|   450|
+---------+----------+------+



In [134]:
val customer = sc.parallelize(Seq((101,"Jon") , (102,"Aron") ,(103,"Sam"))).toDF("customerId", "name")

customer: org.apache.spark.sql.DataFrame = [customerId: int, name: string]


In [135]:
customer.show()

+----------+----+
|customerId|name|
+----------+----+
|       101| Jon|
|       102|Aron|
|       103| Sam|
+----------+----+



### 47) Inner-Join

 - This is the default join in Spark. An inner join drops every row whose join key is not present in both tables.
 - It returns all the data that has a match on the join condition from both sides.
 - If you visualize it as a Venn diagram, it is the intersection of the two sets on the join column.

In [136]:
val innerJoinDf = customer.join(payment,"customerId")

innerJoinDf: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 2 more fields]


In [137]:
innerJoinDf.show()

+----------+----+---------+------+
|customerId|name|paymentId|amount|
+----------+----+---------+------+
|       101| Jon|        1|  2500|
|       103| Sam|        3|   500|
|       102|Aron|        2|  1110|
+----------+----+---------+------+



So in the example, only customerId 101,102,103 have entries in both the tables hence inner join returns only those.

Note that the result can contain duplicate ids when an id occurs more than once in either table (which is not the case here).


There are some other extra things that spark gives us out of the box:

- Spark automatically removes the duplicated “customerId” column, so column names are unique (when we use the above-mentioned syntax; more on this later).
- You don’t have to prefix the column with the table name in the join clause, which keeps the code short.
- We did not specify that we wanted an “inner-join”; by default Spark performs an inner join if no join type is given.

##### Some gotchas:

In [138]:
val innerJoinDf = customer.join(payment,"customerId", "inner")

<console>: 34: error: overloaded method value join with alternatives:

This does not work and errors out as shown above. Either skip the join type, or wrap the column name in a Scala `Seq` if you pass a join type.

In [139]:
val innerJoinDf1 = customer.join(payment,Seq("customerId"), "inner")

innerJoinDf1: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 2 more fields]


In [140]:
innerJoinDf1.show()

+----------+----+---------+------+
|customerId|name|paymentId|amount|
+----------+----+---------+------+
|       101| Jon|        1|  2500|
|       103| Sam|        3|   500|
|       102|Aron|        2|  1110|
+----------+----+---------+------+



### 48) Left Join:

- In a left join, all the rows from the left table are returned irrespective of whether there is a match in the right side table.
- If a matching id is found in the right table, its data is returned; otherwise a null is appended.
- We use a left join when nulls matter, for example to make sense of the data when there were no sales. Say we need all the days when there was no payment activity in the Rental Store.

`
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" => LeftSemi
    case "leftanti" => LeftAnti
    case "cross" => Cross
`

Also, as you can see from this excerpt of the Spark source code, the left and left outer joins are the same. `left` is just an alias in the Spark code.

In [141]:
val leftJoinDf = payment.join(customer,Seq("customerId"), "left")

leftJoinDf: org.apache.spark.sql.DataFrame = [customerId: int, paymentId: int ... 2 more fields]


In [142]:
leftJoinDf.show()

+----------+---------+------+----+
|customerId|paymentId|amount|name|
+----------+---------+------+----+
|       101|        1|  2500| Jon|
|       103|        3|   500| Sam|
|       102|        2|  1110|Aron|
|       105|        5|   150|null|
|       106|        6|   450|null|
|       104|        4|   400|null|
+----------+---------+------+----+



Or

In [143]:
payment.join(customer,Seq("customerId"), "left_outer").show()

+----------+---------+------+----+
|customerId|paymentId|amount|name|
+----------+---------+------+----+
|       101|        1|  2500| Jon|
|       103|        3|   500| Sam|
|       102|        2|  1110|Aron|
|       105|        5|   150|null|
|       106|        6|   450|null|
|       104|        4|   400|null|
+----------+---------+------+----+



Here we are joining on `customerId`, and as you can see the resulting dataframe has an entry for every row in the payment table. It is populated with customer data wherever a matching record is found in the right side Customer table; otherwise `nulls` are returned.

### 49) Right Join:

This is similar to Left join. In Right join, all the rows from the Right table are returned irrespective of whether there is a match in the left side table.

In [144]:
val rightJoinDf = payment.join(customer,Seq("customerId"), "right")

rightJoinDf: org.apache.spark.sql.DataFrame = [customerId: int, paymentId: int ... 2 more fields]


In [145]:
rightJoinDf.show()

+----------+---------+------+----+
|customerId|paymentId|amount|name|
+----------+---------+------+----+
|       101|        1|  2500| Jon|
|       103|        3|   500| Sam|
|       102|        2|  1110|Aron|
+----------+---------+------+----+



Or

In [146]:
payment.join(customer,Seq("customerId"), "right_outer").show()

+----------+---------+------+----+
|customerId|paymentId|amount|name|
+----------+---------+------+----+
|       101|        1|  2500| Jon|
|       103|        3|   500| Sam|
|       102|        2|  1110|Aron|
+----------+---------+------+----+



Here the right side table is the customer, hence all the data from the customer table is returned. Since every customer has a matching row in the left side payment table, no nulls are appended to the output.

Also, the right join and right outer join yield the same output. Theoretically, anything that can be achieved with a right join can also be achieved with a left join by swapping the tables, but there are a few scenarios where a right join comes in handy.

### 50) Outer Join:

- We use full outer joins to keep records from both tables, with nulls filled in where the other table has no match.
- It is fairly rare, but it is generally used for exception reports or in situations where you need data from both tables.
- For example, you want to find a department which doesn’t have an employee and also find an employee who doesn’t have a department and also find a department which has an employee.

<code>
val fullJoinDf = employees.join(departments, Seq("departmentID"), "outer")
fullJoinDf: org.apache.spark.sql.DataFrame = [departmentId: int, name: string ... 1 more field]

fullJoinDf.show
+------------+------+--------------+
|departmentId|  name|departmentName|
+------------+------+--------------+
|          31|   Amy|            IT|
|          34|   Rob|       Medical|
|          34| Billy|       Medical|
|          27|Larson|          null|
|          35|  null|         Sales|
|          33| Bobby|           Law|
|          33| Richy|           Law|
+------------+------+--------------+
</code>

In this example, you can see that there is no employee in the sales dept and also Larson is not associated with any department.

**Example 2:**

In [147]:
val fullJoinDf = payment.join(customer,Seq("customerId"), "outer")

fullJoinDf: org.apache.spark.sql.DataFrame = [customerId: int, paymentId: int ... 2 more fields]


In [148]:
fullJoinDf.show()

+----------+---------+------+----+
|customerId|paymentId|amount|name|
+----------+---------+------+----+
|       101|        1|  2500| Jon|
|       103|        3|   500| Sam|
|       102|        2|  1110|Aron|
|       105|        5|   150|null|
|       106|        6|   450|null|
|       104|        4|   400|null|
+----------+---------+------+----+



### 51) Cross Join:

As the saying goes, the cross product of big data and big data is an out-of-memory exception (from Holden’s High-Performance Spark).

Before reading any further: avoid this with big tables in production unless it is the only way to get the result. A cross join computes the cartesian product of 2 tables.

Say you have m rows in one table and n rows in another; the cross join gives (m*n) rows. So even a small customer table of 10,000 records joined with a products table of 1,000 records would explode into 10,000,000 records!
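
The m*n growth is easy to confirm on small inputs. The sketch below uses two made-up single-column frames of 30 and 20 rows in an assumed local session:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("cross-join-size-sketch").getOrCreate()
import spark.implicits._

val smallA = (1 to 30).toDF("a")
val smallB = (1 to 20).toDF("b")

// Every row of smallA is paired with every row of smallB.
val crossed = smallA.crossJoin(smallB)
val expectedRows = smallA.count() * smallB.count()

println(s"cross join produced ${crossed.count()} rows, m*n = $expectedRows")
```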

In [149]:
val crossJoinDf = customer.crossJoin(payment)

crossJoinDf: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 3 more fields]


In [150]:
crossJoinDf.show()

+----------+----+---------+----------+------+
|customerId|name|paymentId|customerId|amount|
+----------+----+---------+----------+------+
|       101| Jon|        1|       101|  2500|
|       101| Jon|        2|       102|  1110|
|       101| Jon|        3|       103|   500|
|       101| Jon|        4|       104|   400|
|       101| Jon|        5|       105|   150|
|       101| Jon|        6|       106|   450|
|       102|Aron|        1|       101|  2500|
|       102|Aron|        2|       102|  1110|
|       102|Aron|        3|       103|   500|
|       102|Aron|        4|       104|   400|
|       102|Aron|        5|       105|   150|
|       102|Aron|        6|       106|   450|
|       103| Sam|        1|       101|  2500|
|       103| Sam|        2|       102|  1110|
|       103| Sam|        3|       103|   500|
|       103| Sam|        4|       104|   400|
|       103| Sam|        5|       105|   150|
|       103| Sam|        6|       106|   450|
+----------+----+---------+----------+------+

Spark guards against accidentally triggering a cartesian join when no join condition is specified. Prior to Spark 2.1, `customer.join(payment)` would trigger a cross join. Now Spark throws an `AnalysisException` when the user forgets to give a condition on the join. As the cells below show, the exception is raised when the query is executed, not when the DataFrame is defined.

In [151]:
customer.join(payment)

res91: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 3 more fields]


In [152]:
customer.join(payment).show()

org.apache.spark.sql.AnalysisException:  Detected cartesian product for INNER join between logical plans

Also, to bypass this AnalysisException we have to either set `spark.sql.crossJoin.enabled=true` in our Spark session builder object or pass it to spark-shell: `spark-shell --conf spark.sql.crossJoin.enabled=true`. We can verify whether this property is set by reading it back from the session configuration.

From Spark 2.1, a dedicated function for Cross join has been added to support Cartesian joins.

In [153]:
val crossJoin = customer.crossJoin(payment)

crossJoin: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 3 more fields]


In [154]:
customer.crossJoin(payment).show()

+----------+----+---------+----------+------+
|customerId|name|paymentId|customerId|amount|
+----------+----+---------+----------+------+
|       101| Jon|        1|       101|  2500|
|       101| Jon|        2|       102|  1110|
|       101| Jon|        3|       103|   500|
|       101| Jon|        4|       104|   400|
|       101| Jon|        5|       105|   150|
|       101| Jon|        6|       106|   450|
|       102|Aron|        1|       101|  2500|
|       102|Aron|        2|       102|  1110|
|       102|Aron|        3|       103|   500|
|       102|Aron|        4|       104|   400|
|       102|Aron|        5|       105|   150|
|       102|Aron|        6|       106|   450|
|       103| Sam|        1|       101|  2500|
|       103| Sam|        2|       102|  1110|
|       103| Sam|        3|       103|   500|
|       103| Sam|        4|       104|   400|
|       103| Sam|        5|       105|   150|
|       103| Sam|        6|       106|   450|
+----------+----+---------+----------+------+

### 52) Left-Semi-Join

This returns only the data from the left side that has a match on the right side based on the condition provided for the join statement.

In contrast to a left join, where the columns of the right side table are also present in the output, a left semi join includes no right side table data in the output.

The same result can be achieved in SQL with a subquery in conjunction with IN/EXISTS, but using a semi join restricts the amount of data that is read from the right side table.
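
To make the comparison with `EXISTS` concrete, here is a sketch that runs both forms side by side. It uses small copies of the data under different names (`pay`, `cust`, and the temp views `pay_view`, `cust_view`, all illustrative) so that the notebook's `payment` and `customer` frames are left untouched; the local session is an assumption:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
import spark.implicits._

val pay = Seq((1, 101, 2500), (2, 102, 1110), (4, 104, 400)).toDF("paymentId", "customerId", "amount")
val cust = Seq((101, "Jon"), (102, "Aron")).toDF("customerId", "name")

// DataFrame API: left semi join.
val viaJoin = pay.join(cust, pay("customerId") === cust("customerId"), "leftsemi")

// SQL: the same question phrased with a correlated EXISTS subquery.
pay.createOrReplaceTempView("pay_view")
cust.createOrReplaceTempView("cust_view")
val viaExists = spark.sql(
  """SELECT p.* FROM pay_view p
    |WHERE EXISTS (SELECT 1 FROM cust_view c WHERE c.customerId = p.customerId)""".stripMargin)

val joinIds = viaJoin.select("paymentId").as[Int].collect().toSet
val existsIds = viaExists.select("paymentId").as[Int].collect().toSet
println(s"semi join: $joinIds, EXISTS: $existsIds")
```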

In [155]:
payment.show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        2|       102|  1110|
|        3|       103|   500|
|        4|       104|   400|
|        5|       105|   150|
|        6|       106|   450|
+---------+----------+------+



In [156]:
customer.show()

+----------+----+
|customerId|name|
+----------+----+
|       101| Jon|
|       102|Aron|
|       103| Sam|
+----------+----+



In [157]:
payment.join(customer, payment("customerId") === customer("customerId"), "leftsemi").show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        3|       103|   500|
|        2|       102|  1110|
+---------+----------+------+



If you look closely at the output, the joined output only consists of data from the Payment (left) table which has a match in the Customer (right) table. Everything else is ignored. Also, note that the `name` column from the `Customer` table is not returned even for the matching `customerId`. This is really useful when you want to extract only the data in the left table that has a match on the right.

A semi join can feel similar to an inner join, but the difference is that `Left Semi Join` returns only the columns of the left-hand table, whereas `Inner Join` returns the `columns` from both tables.

### 53) Left-anti-Join

As the name suggests, it does exactly the opposite of a left semi join. The output contains only the rows of the left table that do not have a match in the right side table. Only the columns of the left side table are included in the result. It is the data filtered by a NOT IN condition.

In [158]:
payment.show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        2|       102|  1110|
|        3|       103|   500|
|        4|       104|   400|
|        5|       105|   150|
|        6|       106|   450|
+---------+----------+------+



In [159]:
customer.show()

+----------+----+
|customerId|name|
+----------+----+
|       101| Jon|
|       102|Aron|
|       103| Sam|
+----------+----+



In [160]:
payment.join(customer, payment("customerId") === customer("customerId"), "leftanti").show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        5|       105|   150|
|        6|       106|   450|
|        4|       104|   400|
+---------+----------+------+



### 54) Self Join

In self join, we join the dataframe with itself. We have to make sure we are aliasing the dataframe so that we can access the individual columns without name collisions.

Traditionally, self-joins are used to query hierarchical data or to compare 2 attributes of the same table. In this example, we have the `employee` table, which holds each employee's **employeeId** and **managerId**. A self join lets us show each employee next to their manager as if they were 2 columns of the same table.

In [161]:
val employee1 = spark.createDataFrame(Seq(
  (1,"ceo",None),
  (2,"manager1",Some(1)),
  (3,"manager2",Some(1)),
  (101,"Amy",Some(2)),
  (102,"Sam",Some(2)),
  (103,"Aron",Some(3)),
  (104,"Bobby",Some(3)),
  (105,"Jon", Some(3))
)).toDF("employeeId","employeeName","managerId")

employee1: org.apache.spark.sql.DataFrame = [employeeId: int, employeeName: string ... 1 more field]


In [162]:
employee1.show()

+----------+------------+---------+
|employeeId|employeeName|managerId|
+----------+------------+---------+
|         1|         ceo|     null|
|         2|    manager1|        1|
|         3|    manager2|        1|
|       101|         Amy|        2|
|       102|         Sam|        2|
|       103|        Aron|        3|
|       104|       Bobby|        3|
|       105|         Jon|        3|
+----------+------------+---------+



In [163]:
val selfJoinedEmp = employee1.as("e").join(employee1.as("m"),$"m.employeeId" === $"e.managerId")

selfJoinedEmp: org.apache.spark.sql.DataFrame = [employeeId: int, employeeName: string ... 4 more fields]


In [164]:
selfJoinedEmp.show()

+----------+------------+---------+----------+------------+---------+
|employeeId|employeeName|managerId|employeeId|employeeName|managerId|
+----------+------------+---------+----------+------------+---------+
|         2|    manager1|        1|         1|         ceo|     null|
|         3|    manager2|        1|         1|         ceo|     null|
|       101|         Amy|        2|         2|    manager1|        1|
|       102|         Sam|        2|         2|    manager1|        1|
|       103|        Aron|        3|         3|    manager2|        1|
|       104|       Bobby|        3|         3|    manager2|        1|
|       105|         Jon|        3|         3|    manager2|        1|
+----------+------------+---------+----------+------------+---------+



This joined dataset has a lot of redundant data and doesn’t give us a clear picture of what our requirement is.

In [165]:
selfJoinedEmp
.select($"e.employeeName".as("employee"),$"m.employeeName".as("managerName"))
.show()

+--------+-----------+
|employee|managerName|
+--------+-----------+
|manager1|        ceo|
|manager2|        ceo|
|     Amy|   manager1|
|     Sam|   manager1|
|    Aron|   manager2|
|   Bobby|   manager2|
|     Jon|   manager2|
+--------+-----------+



We can select the required columns and alias them to make the output more understandable.

### 55) Joins on columns with nulls

Let’s say we want to join on columns that contain nulls. By default, Spark skips the rows whose join key is null.

Say we want to join `dftest1` and `dftest2` on the `id` column, which has `nulls` in it. The result will not contain the rows with null keys.

In [166]:
val dftest1 = Seq((Some(123),"name1"),(Some(456),"name3"),(None,"name2")).toDF("id","name")

dftest1: org.apache.spark.sql.DataFrame = [id: int, name: string]


In [167]:
val dftest2 = Seq((None,"sales"),(Some(223),"Legal"),(Some(456),"IT")).toDF("id","dept")

dftest2: org.apache.spark.sql.DataFrame = [id: int, dept: string]


In [168]:
dftest1.show()

+----+-----+
|  id| name|
+----+-----+
| 123|name1|
| 456|name3|
|null|name2|
+----+-----+



In [169]:
dftest2.show()

+----+-----+
|  id| dept|
+----+-----+
|null|sales|
| 223|Legal|
| 456|   IT|
+----+-----+



In [170]:
dftest1.join(dftest2, Seq("id")).show()

+---+-----+----+
| id| name|dept|
+---+-----+----+
|456|name3|  IT|
+---+-----+----+



But let’s say we don’t want to lose that data:

In [171]:
dftest1.join(dftest2, dftest1("id") <=> dftest2("id")).drop(dftest1("id")).show()

+-----+----+-----+
| name|  id| dept|
+-----+----+-----+
|name3| 456|   IT|
|name2|null|sales|
+-----+----+-----+



The `<=>` operator is a null-safe equality test, unlike `===`. `<=>` returns the same result as `===` for non-null operands, but returns true if both are null and false if only one of them is null.
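
The difference between the two operators can be tabulated directly. The sketch below compares them on four made-up pairs of values (equal, different, one null, both null) in an assumed local session:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("null-safe-eq-sketch").getOrCreate()
import spark.implicits._

// Four pairs covering: equal, different, one null, both null.
val operands = Seq[(Option[Int], Option[Int])](
  (Some(1), Some(1)), (Some(1), Some(2)), (Some(1), None), (None, None)
).toDF("a", "b")

val compared = operands.select(
  $"a", $"b",
  ($"a" === $"b").as("eq"),          // null whenever either operand is null
  ($"a" <=> $"b").as("nullSafeEq"))  // always true or false

compared.show()
```

A join condition only keeps rows for which it evaluates to true, which is why the `===` join drops rows with null keys while the `<=>` join keeps the pair where both keys are null.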

**Gotchas:**

When we join on 2 columns with an explicit condition, as in the signature below, we generally end up with at least 1 duplicated column. Here `id` appears twice. We either have to drop that column or use a more elegant way.

In [172]:
dftest1.join(dftest2, dftest1.col("id") === dftest2.col("id")).show()

+---+-----+---+----+
| id| name| id|dept|
+---+-----+---+----+
|456|name3|456|  IT|
+---+-----+---+----+



The `join` method in Spark has an overload that takes `usingColumns` as one of its parameters. When we use this overload, Spark prevents duplicated columns when joining 2 dataframes.

In [173]:
dftest1.join(dftest2 , Seq("id")).show()

+---+-----+----+
| id| name|dept|
+---+-----+----+
|456|name3|  IT|
+---+-----+----+



The only thing to take care of is that every column named in `usingColumns` must exist on both sides under the same name.

### 56) Joins in Datasets

We have the `joinWith` function to join two Datasets. It is similar to the joins discussed above, except that `joinWith` preserves the type information of the result: it returns a `Dataset[(T, U)]` rather than a `DataFrame`, which is valuable if type safety matters to you.

In [174]:
case class Payments(paymentId: Int, customerId: Int, amount:Int)

defined class Payments


In [175]:
case class Customer(customerId: Int, name: String)

defined class Customer


In [176]:
val paymentDs: Dataset[Payments] = payment.as[Payments]

paymentDs: org.apache.spark.sql.Dataset[Payments] = [paymentId: int, customerId: int ... 1 more field]


In [177]:
val customerDs: Dataset[Customer] = customer.as[Customer]

customerDs: org.apache.spark.sql.Dataset[Customer] = [customerId: int, name: string]


In [178]:
paymentDs.show()

+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        2|       102|  1110|
|        3|       103|   500|
|        4|       104|   400|
|        5|       105|   150|
|        6|       106|   450|
+---------+----------+------+



In [179]:
customerDs.show()

+----------+----+
|customerId|name|
+----------+----+
|       101| Jon|
|       102|Aron|
|       103| Sam|
+----------+----+



In [180]:
val x: Dataset[(Customer, Payments)] =
  customerDs.joinWith(paymentDs, paymentDs.col("customerId") === customerDs.col("customerId"))

x: org.apache.spark.sql.Dataset[(Customer, Payments)] = [_1: struct<customerId: int, name: string>, _2: struct<paymentId: int, customerId: int ... 1 more field>]


In [181]:
x.show()

+-----------+--------------+
|         _1|            _2|
+-----------+--------------+
| [101, Jon]|[1, 101, 2500]|
| [103, Sam]| [3, 103, 500]|
|[102, Aron]|[2, 102, 1110]|
+-----------+--------------+



As shown above, `joinWith` returns a `Dataset[(Customer, Payments)]`, whereas the join operations in the previous examples return a `DataFrame`. `joinWith` supports all the join types that are available for DataFrames, as discussed above. One gotcha is that the returned object is a Dataset of `(Customer, Payments)` tuples, which can be a little tricky to work with after the join. Also note that for left or right outer joins, the side with no matching record is returned as null.
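One way to make the tuple result easier to handle is to unpack it with a pattern match and wrap the possibly-null side in an `Option`. The following is a minimal sketch under a few assumptions: it builds its own small Datasets so it can run on its own, `CustomerPayment` is a hypothetical result type introduced only for this example, and the unmatched side of the outer join is assumed to come back as null, as described above.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// In the notebook the existing `spark` session can be reused instead.
val spark = SparkSession.builder().master("local[*]").appName("joinWith-sketch").getOrCreate()
import spark.implicits._

case class Customer(customerId: Int, name: String)
case class Payments(paymentId: Int, customerId: Int, amount: Int)
// Hypothetical flat result type, introduced only for this sketch.
case class CustomerPayment(name: String, amount: Option[Int])

val customerDs: Dataset[Customer] =
  Seq(Customer(101, "Jon"), Customer(107, "Zed")).toDS()
val paymentDs: Dataset[Payments] =
  Seq(Payments(1, 101, 2500)).toDS()

// Left outer joinWith: the Payments side is null for customers without a payment.
val pairs: Dataset[(Customer, Payments)] =
  customerDs.joinWith(paymentDs, customerDs("customerId") === paymentDs("customerId"), "left_outer")

// Unpack the tuple with a pattern match and guard against the null side.
val flat: Dataset[CustomerPayment] = pairs.map { case (c, p) =>
  CustomerPayment(c.name, Option(p).map(_.amount))
}

flat.show()
```

Mapping to a flat case class keeps the type safety of `joinWith` while giving downstream code named fields instead of `_1` and `_2`.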

### 57) Broadcast Hash Join

When one of the DataFrames is small enough to fit in memory, it is broadcast to all the executors where the larger dataset resides, and a hash join is performed.

This has two phases:

- **Broadcast:** the smaller dataset is broadcast to the executors in the cluster where the larger table is located.
- **Hash join:** a standard hash join is performed on each executor.

No shuffle of the larger table is involved, so this can be much quicker.
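The two phases can be sketched with plain Scala collections. This is a toy model rather than Spark's implementation: the data and all names are illustrative, and the small side is assumed to have unique keys.

```scala
// The small side: every "executor" receives a full copy of it as a hash map.
val customers: Seq[(Int, String)] = Seq((101, "Jon"), (102, "Aron"), (103, "Sam"))
val broadcastTable: Map[Int, String] = customers.toMap

// The large side stays where it is, split into partitions: (customerId, amount).
val paymentPartitions: Seq[Seq[(Int, Int)]] = Seq(
  Seq((101, 2500), (102, 1110)),
  Seq((103, 500), (104, 400)),
  Seq((105, 150), (106, 450))
)

// Each partition probes its local copy of the hash map; no payment row moves.
val joined: Seq[(Int, String, Int)] = paymentPartitions.flatMap { partition =>
  partition.flatMap { case (customerId, amount) =>
    broadcastTable.get(customerId).map(name => (customerId, name, amount))
  }
}

joined.foreach(println)
```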

<code>
spark.sql.autoBroadcastJoinThreshold
This can be configured to set the Maximum size in bytes for a dataframe to be broadcasted.
-1 will disable broadcast join
Default is 10485760 i.e., 10MB
</code>

In [182]:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

res112: String = 10485760
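The threshold can also be changed at runtime. A minimal sketch, assuming a local session; the 50 MB figure is an arbitrary value chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In the notebook the existing `spark` session can be reused instead.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-threshold").getOrCreate()

// Raise the threshold to 50 MB (the value is in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Disable automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```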


- `spark.broadcast.compress` controls whether broadcast data is compressed before it is sent. It is `true` by default.
- The codec is taken from the `spark.io.compression.codec` config, whose default is `lz4`. Other supported codecs include `lzf`, `snappy`, and `zstd`.

Inspect the physical plan to see whether this type of join is used.

In [183]:
customer.join(payment,Seq("customerId")).queryExecution.executedPlan

res113: org.apache.spark.sql.execution.SparkPlan =
*(5) Project [customerId#378, name#379, paymentId#355, amount#357]
+- *(5) SortMergeJoin [customerId#378], [customerId#356], Inner
   :- *(2) Sort [customerId#378 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(customerId#378, 200)
   :     +- *(1) Project [_1#375 AS customerId#378, _2#376 AS name#379]
   :        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#375, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#376]
   :           +- Scan ExternalRDDScan[obj#374]
   +- *(4) Sort [customerId#356 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(customerId#356, 200)...

Notice that the plan uses a `SortMergeJoin`: even though my DataFrames are small, Spark sometimes does not recognize that a DataFrame is smaller than 10 MB. To force a broadcast join we can use the `broadcast hint`.

In [184]:
customer.join(broadcast(payment),Seq("customerId"))

res114: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 2 more fields]


In [185]:
customer.join(broadcast(payment),Seq("customerId")).queryExecution.executedPlan

res115: org.apache.spark.sql.execution.SparkPlan =
*(2) Project [customerId#378, name#379, paymentId#355, amount#357]
+- *(2) BroadcastHashJoin [customerId#378], [customerId#356], Inner, BuildRight
   :- *(2) Project [_1#375 AS customerId#378, _2#376 AS name#379]
   :  +- *(2) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#375, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#376]
   :     +- Scan ExternalRDDScan[obj#374]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)))
      +- *(1) Project [_1#351 AS paymentId#355, _2#352 AS customerId#356, _3#353 AS amount#357]
         +- *(1) SerializeFromObject [ass...

If a broadcast hint is specified, the side of the join with the hint is broadcast irrespective of `spark.sql.autoBroadcastJoinThreshold`. If both sides have a broadcast hint, the side with the smaller `estimated physical size` is broadcast. If there is no hint and the estimated physical size of a table is below `spark.sql.autoBroadcastJoinThreshold`, that table is broadcast to all the executor nodes.

Spark uses a BitTorrent-like implementation to perform the broadcast, so that the driver does not become the bottleneck when sending data to many executors. The table being broadcast is serialized and divided into small chunks by the driver, and the chunks are stored in the driver's BlockManager. Each executor first tries to find the object in its own BlockManager; if it is not there, it fetches the chunks remotely from the driver and/or from other executors that already hold them in their BlockManager.

Once an executor has fetched a chunk, it stores the chunk in its own BlockManager so that other executors can fetch it from there.

Normally, a broadcast hash join (BHJ) is faster than other join algorithms when the broadcast side is small enough. However, broadcasting a table is a network-intensive operation, and when the broadcast table is large it can cause out-of-memory errors or perform worse than other algorithms.

BHJ is not supported for a full outer join. For a right outer join, only the left side can be broadcast, and for left joins (left outer, left semi, left anti) only the right side can be broadcast.

### 58) Shuffle Hash Join

Shuffle hash join has 2 phases:

 1. `Shuffle phase`: the data from both tables is partitioned by the join key and shuffled across the partitions. The idea is that rows with the same key from the two tables end up in the same partition, so the data required for the join is available locally.

 2. `Hash join phase`: a classic single-node hash join is performed on the data in each partition.

Shuffle hash join breaks one big join of two tables into many smaller, localized joins. The shuffle is a very expensive operation, as it moves a lot of data over the network between the nodes in the cluster. Building the hash tables is also expensive and memory bound, so this strategy is not well suited to joins whose build side does not fit in memory.
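The shuffle and hash join phases can be sketched on plain Scala collections. This is a toy model, not Spark's implementation; the data, the partition count, and the helper names `partitionOf` and `shuffle` are illustrative.

```scala
val numPartitions = 3

// Shuffle phase: route every row to a partition chosen from the hash of its key,
// so equal keys from both tables land in the same partition.
def partitionOf(key: Int): Int = math.abs(key.hashCode % numPartitions)

def shuffle[V](rows: Seq[(Int, V)]): Map[Int, Seq[(Int, V)]] =
  rows.groupBy { case (key, _) => partitionOf(key) }

val employees = Seq((1, "Ann"), (2, "Bob"), (3, "Cid"), (4, "Dee"))
val salaries  = Seq((1, 1000), (3, 3000), (4, 4000), (5, 5000))

val leftParts  = shuffle(employees)
val rightParts = shuffle(salaries)

// Hash join phase: inside each partition, build a hash table on one side
// and probe it with the rows of the other side.
val joined = (0 until numPartitions).flatMap { p =>
  val buildSide = rightParts.getOrElse(p, Seq.empty).groupBy(_._1)
  leftParts.getOrElse(p, Seq.empty).flatMap { case (id, name) =>
    buildSide.getOrElse(id, Seq.empty).map { case (_, salary) => (id, name, salary) }
  }
}

println(joined.sortBy(_._1))
```

Each partition only ever holds the hash table for its own share of the keys, which is what makes the per-partition joins small.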

The performance also depends on the `distribution of keys` in the dataset. The more unique join keys there are, the better the data distribution we get. The maximum parallelism we can achieve is limited by the `number of unique keys`.

For example, a nearly unique column such as `empId` is a better join key than something like `DepartmentName`, which has few distinct values and would limit the maximum parallelism we could achieve.

**Gotchas:**

We need to watch out for situations in which a single partition (or a small subset of the partitions) receives far more data than the others. This happens when there is skew in our dataset. Ideally, the data is distributed uniformly across the cluster.

Say we are joining stock data on the company's stock ticker, and each partition gets its data based on the company's name. The partition that holds companies starting with Z might have very little data, while the partition that holds companies starting with A, such as Apple and Amazon, might have far too much. In these situations we can use a technique called `salting`, which adds some randomness to non-unique keys. Say our dataset has 1 million Amazon rows and only 100 Zillow rows. We selectively append a random integer to the Amazon key so that the Amazon rows hash to several different partitions instead of one.
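Salting can be sketched on plain Scala collections. This is a toy model with illustrative data and names. For simplicity it salts every key rather than only the hot one, and it replicates the other side of the join once per salt value, which is needed so that each salted key still finds its match.

```scala
import scala.util.Random

val numSalts = 4
val rng = new Random(42) // fixed seed so the sketch is repeatable

// Skewed "large" side: many AMZN rows, few ZG rows. Rows are (ticker, quantity).
val trades: Seq[(String, Int)] =
  Seq.fill(1000)(("AMZN", 1)) ++ Seq.fill(10)(("ZG", 1))

// "Small" side: one row per ticker.
val companies: Seq[(String, String)] = Seq(("AMZN", "Amazon"), ("ZG", "Zillow"))

// Add a random salt to the key of every row on the skewed side.
val saltedTrades: Seq[((String, Int), Int)] =
  trades.map { case (ticker, qty) => ((ticker, rng.nextInt(numSalts)), qty) }

// Replicate each row of the other side once per salt value.
val saltedCompanies: Map[(String, Int), String] =
  (for {
    (ticker, name) <- companies
    salt <- 0 until numSalts
  } yield ((ticker, salt), name)).toMap

// The hot key is now spread over several distinct keys.
val rowsPerSaltedKey: Map[(String, Int), Int] =
  saltedTrades.groupBy(_._1).map { case (k, rows) => (k, rows.size) }

// Joining on the salted key yields the same rows as joining on the ticker.
val joined: Seq[(String, String, Int)] =
  saltedTrades.map { case ((ticker, salt), qty) =>
    (ticker, saltedCompanies((ticker, salt)), qty)
  }

println(rowsPerSaltedKey.filter { case ((ticker, _), _) => ticker == "AMZN" })
```

The trade-off is that the non-skewed side grows by a factor of `numSalts`, so the salt range should stay small.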

*The main thing to note here is that shuffle hash join is used as the join strategy only when **`spark.sql.join.preferSortMergeJoin`** is set to `false` and the cost of building a hash map is less than the cost of sorting the data. By default, sort-merge join is preferred over shuffle hash join.*

ShuffledHashJoin is still useful when:

* every partition of the build side can `fit in memory`
* one table is much smaller than the other, so the `cost to build a hash table` on the smaller table is lower than the cost of sorting the larger table.

### 59) Sort merge join

**Sort merge join is the default join strategy** when the join keys are sortable and the join is not eligible for a broadcast join or a shuffle hash join. It is a very scalable approach and performs better than other joins most of the time. It inherits its traits from classic MapReduce programs. What makes it scalable is that it can spill data to disk and does not require the entire dataset to fit in memory.

It has 3 phases:

* `Shuffle Phase`: the two large tables are repartitioned by the join key across the partitions in the cluster.

* `Sort Phase`: the data within each partition is sorted in parallel.

* `Merge Phase`: the two sorted, partitioned datasets are joined by iterating over the elements and matching the rows that have the same value for the join key.
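The sort and merge phases for a single partition can be sketched in plain Scala. This is a toy, in-memory model (the real operator can spill to disk), and `sortMergeJoin` and the sample data are illustrative.

```scala
def sortMergeJoin[A, B](left: Seq[(Int, A)], right: Seq[(Int, B)]): Seq[(Int, A, B)] = {
  // Sort phase: order both inputs by the join key.
  val l = left.sortBy(_._1).toVector
  val r = right.sortBy(_._1).toVector

  val out = Vector.newBuilder[(Int, A, B)]
  var i = 0
  var j = 0
  // Merge phase: advance the side with the smaller key; on equal keys,
  // emit every combination of the two runs that share that key.
  while (i < l.length && j < r.length) {
    val lk = l(i)._1
    val rk = r(j)._1
    if (lk < rk) i += 1
    else if (lk > rk) j += 1
    else {
      val iEnd = l.indexWhere(_._1 != lk, i) match { case -1 => l.length; case n => n }
      val jEnd = r.indexWhere(_._1 != rk, j) match { case -1 => r.length; case n => n }
      for (a <- i until iEnd; b <- j until jEnd) out += ((lk, l(a)._2, r(b)._2))
      i = iEnd
      j = jEnd
    }
  }
  out.result()
}

val customers = Seq((103, "Sam"), (101, "Jon"), (102, "Aron"))
val payments  = Seq((102, 1110), (101, 2500), (101, 300), (104, 400))

println(sortMergeJoin(customers, payments))
```

Because both inputs are sorted, the merge only walks each side once and never needs a hash table for the whole partition.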

Also note that Spark sometimes chooses sort-merge join by default when it is not the ideal choice.

Let's say we have a customer DataFrame and a payment DataFrame.

In [186]:
import org.apache.spark.util.SizeEstimator

import org.apache.spark.util.SizeEstimator


In [187]:
println(SizeEstimator.estimate(payment)/1000000.00)

47.767576


In [188]:
println(SizeEstimator.estimate(customer)/1000000.00)

47.770096


According to `SizeEstimator`, both DataFrames are about 47.8 MB. By default, Spark chooses sort-merge join, but sometimes a broadcast join can do better.

In [189]:
customer.join(payment,"customerId").queryExecution.executedPlan

res118: org.apache.spark.sql.execution.SparkPlan =
*(5) Project [customerId#378, name#379, paymentId#355, amount#357]
+- *(5) SortMergeJoin [customerId#378], [customerId#356], Inner
   :- *(2) Sort [customerId#378 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(customerId#378, 200)
   :     +- *(1) Project [_1#375 AS customerId#378, _2#376 AS name#379]
   :        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#375, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#376]
   :           +- Scan ExternalRDDScan[obj#374]
   +- *(4) Sort [customerId#356 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(customerId#356, 200)...

In [190]:
spark.time(customer.join(payment,"customerId").show)

+----------+----+---------+------+
|customerId|name|paymentId|amount|
+----------+----+---------+------+
|       101| Jon|        1|  2500|
|       103| Sam|        3|   500|
|       102|Aron|        2|  1110|
+----------+----+---------+------+

Time taken: 637 ms


When we provide the broadcast hint,

In [191]:
customer.join(broadcast(payment),"customerId").queryExecution.executedPlan

res120: org.apache.spark.sql.execution.SparkPlan =
*(2) Project [customerId#378, name#379, paymentId#355, amount#357]
+- *(2) BroadcastHashJoin [customerId#378], [customerId#356], Inner, BuildRight
   :- *(2) Project [_1#375 AS customerId#378, _2#376 AS name#379]
   :  +- *(2) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#375, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#376]
   :     +- Scan ExternalRDDScan[obj#374]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)))
      +- *(1) Project [_1#351 AS paymentId#355, _2#352 AS customerId#356, _3#353 AS amount#357]
         +- *(1) SerializeFromObject [ass...

In [192]:
spark.time(customer.join(broadcast(payment),"customerId").show)

+----------+----+---------+------+
|customerId|name|paymentId|amount|
+----------+----+---------+------+
|       101| Jon|        1|  2500|
|       102|Aron|        2|  1110|
|       103| Sam|        3|   500|
+----------+----+---------+------+

Time taken: 129 ms


We have to remember that there is no single right answer to a problem. The right answer is always `it depends`. We should choose among Spark’s features according to our data.

Note: `SizeEstimator.estimate` gives an estimate of how much space the object takes up on the JVM heap. This will often be higher than the typical serialized size of the object.

**Gotchas:**

For ideal performance of a sort-merge join, all rows with the same value for the join key must be available in the same partition. This is what requires the infamous partition exchange (shuffle) between executors.

Co-located partitions can avoid an unnecessary data shuffle. The data needs to be evenly distributed on the join keys, and the join keys should be unique enough that rows can be spread equally across the cluster to achieve maximum parallelism from the available partitions.

### Cartesian Join

When no join keys are specified and the join type is an inner join, CartesianProduct is chosen. This is an inherently very expensive operation and should not be chosen unless necessary. 

### 60) BroadcastNestedLoopJoin

This type of join is chosen when no join keys are specified and either one side has a broadcast hint or one side can be broadcast because it is smaller than **spark.sql.autoBroadcastJoinThreshold**.

It is also the final fallback when no join keys are specified and the join is not an inner join. It can be very slow and can cause **out-of-memory errors** if the broadcast dataset is large.
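A join whose condition is not an equality is a typical case with no join keys. The nested loop idea can be sketched in plain Scala; this is a toy model with illustrative data and names, not Spark's operator.

```scala
// Large side: (paymentId, amount).
val payments: Seq[(Int, Int)] = Seq((1, 2500), (2, 1110), (3, 500))

// Small side, copied to every "executor": (tier, minimum amount).
val tiers: Seq[(String, Int)] = Seq(("gold", 2000), ("silver", 1000))

// With no join key there is nothing to hash or sort on, so every pair of rows
// is compared: the cost grows with payments.size * tiers.size.
val matched: Seq[(Int, String)] =
  for {
    (paymentId, amount) <- payments
    (tier, minAmount) <- tiers
    if amount >= minAmount
  } yield (paymentId, tier)

println(matched)
```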

### Key Takeaways:

- Sort-merge join is the default join and performs well in most scenarios. If you are confident that shuffle hash join is better for a particular case, set `spark.sql.join.preferSortMergeJoin` to `false` for it. Shuffle hash join tends to do better when the build side is much smaller than the stream side.


- Tune **spark.sql.autoBroadcastJoinThreshold** if necessary. Use broadcast joins wherever possible, and filter out rows that are irrelevant to the join before joining to avoid unnecessary data shuffling.


- Joins on keys that are far from unique, or with **no join keys** at all, can often be **very expensive** and should be avoided.

## Joins in RDD

In [193]:
val myrdd1 = sc.parallelize(Seq(("math",55),("math",56),("english",57),("english",58),("science",59),("science",54)))

myrdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[412] at parallelize at <console>:31


In [194]:
val myrdd2 = sc.parallelize(Seq(("math",60),("math",65),("science",61),("science",62),("history",63),("history",64)))

myrdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[413] at parallelize at <console>:31


In [195]:
myrdd1.collect()

res122: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))


In [196]:
myrdd2.collect()

res123: Array[(String, Int)] = Array((math,60), (math,65), (science,61), (science,62), (history,63), (history,64))


### 61) Join (RDD)

- `join()` is a transformation.
- It is defined in the class `org.apache.spark.rdd.PairRDDFunctions`.

Returns an RDD containing all pairs of elements with matching keys in `this` and `other`.

Each pair of elements is returned as a `(k, (v1, v2))` tuple, where `(k, v1)` is in `this` and `(k, v2)` is in `other`.

Performs a `hash join` across the cluster.

**Example:**

In [197]:
val joined = myrdd1.join(myrdd2)

joined: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[416] at join at <console>:35


In [198]:
joined.collect()

res124: Array[(String, (Int, Int))] = Array((math,(55,60)), (math,(55,65)), (math,(56,60)), (math,(56,65)), (science,(59,61)), (science,(59,62)), (science,(54,61)), (science,(54,62)))


### 62) leftOuterJoin

- `leftOuterJoin()` is a transformation.
- It is defined in the class `org.apache.spark.rdd.PairRDDFunctions`.

Performs a left outer join of `this` and `other`.

For each element `(k, v)` in `this`, the resulting RDD will either contain all pairs `(k, (v, Some(w)))` for `w` in `other`, or the pair `(k, (v, None))` if no elements in `other` have key `k`.

`Hash-partitions` the output using the existing partitioner/parallelism level.

`leftOuterJoin()` keeps every key of the first RDD, whether or not the key is present in the second RDD.

**Example:**

In [199]:
myrdd1.collect()

res125: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))


In [200]:
myrdd2.collect()

res126: Array[(String, Int)] = Array((math,60), (math,65), (science,61), (science,62), (history,63), (history,64))


In [201]:
val leftJoined = myrdd1.leftOuterJoin(myrdd2)

leftJoined: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[419] at leftOuterJoin at <console>:35


In [202]:
leftJoined.collect()

res127: Array[(String, (Int, Option[Int]))] = Array((math,(55,Some(60))), (math,(55,Some(65))), (math,(56,Some(60))), (math,(56,Some(65))), (science,(59,Some(61))), (science,(59,Some(62))), (science,(54,Some(61))), (science,(54,Some(62))), (english,(57,None)), (english,(58,None)))


### 63) rightOuterJoin

- `rightOuterJoin()` is a transformation.
- It is defined in the class `org.apache.spark.rdd.PairRDDFunctions`.

Performs a right outer join of `this` and `other`.

For each element `(k, w)` in `other`, the resulting RDD will either contain all pairs `(k, (Some(v), w))` for `v` in `this`, or the pair `(k, (None, w))` if no elements in `this` have key `k`.

`Hash-partitions` the resulting RDD using the existing partitioner/parallelism level.

`rightOuterJoin()` keeps every key of the other RDD, whether or not the key is present in the first RDD.

**Example:**

In [203]:
myrdd1.collect()

res128: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))


In [204]:
myrdd2.collect()

res129: Array[(String, Int)] = Array((math,60), (math,65), (science,61), (science,62), (history,63), (history,64))


In [205]:
val rightJoined = myrdd1.rightOuterJoin(myrdd2)

rightJoined: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] = MapPartitionsRDD[422] at rightOuterJoin at <console>:35


In [206]:
rightJoined.collect()

res130: Array[(String, (Option[Int], Int))] = Array((math,(Some(55),60)), (math,(Some(55),65)), (math,(Some(56),60)), (math,(Some(56),65)), (history,(None,63)), (history,(None,64)), (science,(Some(59),61)), (science,(Some(59),62)), (science,(Some(54),61)), (science,(Some(54),62)))


### 64) Cartesian

Provides the Cartesian product of two RDDs.

**Example:**

In [207]:
myrdd1.collect()

res131: Array[(String, Int)] = Array((math,55), (math,56), (english,57), (english,58), (science,59), (science,54))


In [208]:
myrdd2.collect()

res132: Array[(String, Int)] = Array((math,60), (math,65), (science,61), (science,62), (history,63), (history,64))


In [209]:
val cartesianJoined = myrdd1.cartesian(myrdd2)

cartesianJoined: org.apache.spark.rdd.RDD[((String, Int), (String, Int))] = CartesianRDD[423] at cartesian at <console>:35


In [210]:
cartesianJoined.collect()

res133: Array[((String, Int), (String, Int))] = Array(((math,55),(math,60)), ((math,55),(math,65)), ((math,55),(science,61)), ((math,55),(science,62)), ((math,55),(history,63)), ((math,55),(history,64)), ((math,56),(math,60)), ((english,57),(math,60)), ((math,56),(math,65)), ((math,56),(science,61)), ((english,57),(math,65)), ((english,57),(science,61)), ((math,56),(science,62)), ((english,57),(science,62)), ((math,56),(history,63)), ((math,56),(history,64)), ((english,57),(history,63)), ((english,57),(history,64)), ((english,58),(math,60)), ((english,58),(math,65)), ((english,58),(science,61)), ((english,58),(science,62)), ((english,58),(history,63)), ((english,58),(history,64)), ((science,59),(math,60)), ((science,54),(math,60)), ((science,59),(math,65)), ((science,59),(science,61)), ...

### 65) Tungsten Framework purpose

**`Tungsten`** is the codename for the umbrella project to make changes to Apache Spark’s execution engine that focuses on substantially improving the efficiency of memory and CPU for Spark applications, to push performance closer to the limits of modern hardware. 

#### Scope

Tungsten focuses on the hardware architecture of the platform Spark runs on, including but not limited to JVM, LLVM, GPU, NVRAM, etc.

<img src="../Resources/Tungsten.png" style="height:230px">

This effort includes the following initiatives:

 - **Memory Management and Binary Processing:** leveraging application semantics to manage memory explicitly and eliminate the overhead of JVM object model and garbage collection
 
 
 - **Cache-aware computation:** algorithms and data structures to exploit memory hierarchy
 
 
 - **Code generation:** using code generation to exploit modern compilers and CPUs
 
 
 - **No virtual function dispatches:** this reduces multiple CPU calls which can have a profound impact on performance when dispatching billions of times.
 
 
 - **Intermediate data in memory vs CPU registers:** Tungsten Phase 2 places intermediate data into CPU registers. This reduces, by orders of magnitude, the number of cycles needed to obtain the data compared with reading it from memory.
 
 
 - **Loop unrolling and SIMD:** Optimize Apache Spark’s execution engine to take advantage of modern compilers and CPUs’ ability to efficiently compile and execute simple for loops (as opposed to complex function call graphs).

The focus on CPU efficiency is motivated by the fact that Spark workloads are increasingly bottlenecked by CPU and memory use rather than IO and network communication. The trend is shown by recent research on the performance of big data workloads.

**References:**

[Reference-1](https://community.cloudera.com/t5/Community-Articles/What-is-Tungsten-for-Apache-Spark/ta-p/248445)

[Reference-2](https://databricks.com/glossary/tungsten)

[Reference-3](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)

### 66) Catalyst Framework purpose

At the core of Spark SQL is the **`Catalyst optimizer`**, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasi quotes) in a novel way to build an extensible query optimizer.

Catalyst is based on functional programming constructs in Scala and was designed with two key purposes:

* Easily add new optimization techniques and features to Spark SQL

* Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)


<img src="../Resources/Catalyst-Optimizer-diagram.png" style="height:150px">

Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, it has libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode. For the latter, it uses another Scala feature, quasiquotes, that makes it easy to generate code at runtime from composable expressions. Catalyst also offers several public extension points, including external data sources and user-defined types. As well, Catalyst supports both rule-based and cost-based optimization.
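The idea of trees plus pattern-matching rules can be sketched with a toy expression tree and a constant-folding rule. These classes and the `transformUp` helper are illustrative stand-ins written for this example; they are not Catalyst's actual classes.

```scala
// A tiny expression tree.
sealed trait Expr
case class Literal(value: Int) extends Expr
case class Attribute(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Apply a rule bottom-up to every node of the tree.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  // Nodes the rule does not match are returned unchanged.
  rule.applyOrElse(withNewChildren, (x: Expr) => x)
}

// Constant folding expressed as a pattern match.
val constantFolding: PartialFunction[Expr, Expr] = {
  case Add(Literal(a), Literal(b)) => Literal(a + b)
}

// x + (1 + 2) is rewritten to x + 3.
val tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
val optimized = transformUp(tree)(constantFolding)
println(optimized)
```

A rule only has to describe the shape it cares about; the traversal takes care of applying it everywhere, which is what makes it cheap to add new optimizations.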

**References:**

[Reference-1](https://data-flair.training/blogs/spark-sql-optimization/)

[Reference-2](https://databricks.com/glossary/catalyst-optimizer)

[Reference-3](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)

### 67) Akka Framework purpose

Akka is a general-purpose framework for building reactive, distributed, parallel, and resilient concurrent applications in Scala or Java. Akka uses the Actor model to hide all the thread-related code and gives you simple, helpful interfaces for implementing a scalable and fault-tolerant system. A good example of an Akka use case is a real-time application that consumes and processes data coming from mobile phones and sends it to some kind of storage.

Earlier versions of Spark relied on the Akka toolkit for communication between nodes. Spark 1.6 switched its default RPC layer to Netty, and Spark 2.0 removed the Akka dependency entirely.

Akka is an actor framework for the JVM. It is inspired by Erlang and supports actor-based distributed concurrency. The Actor model provides a higher level of abstraction for writing concurrent and distributed applications, and it relieves the developer from dealing with explicit locking and thread management. Akka makes it easier to write correct concurrent and parallel applications.

Major use cases :

* Transaction processing
* Concurrency/parallelism
* Simulation
* Batch processing
* Gaming and Betting
* Complex Event Stream Processing

[Reference](https://intellipaat.com/community/7403/apache-spark-vs-akka)

### 68) Difference between RDD and Dataframe ?

#### RDD:

The RDD can be termed the building block of Spark. No matter which abstraction is used, DataFrame or Dataset, the final computation is always done on RDDs internally. One of the biggest advantages of the RDD is its simplicity: it provides familiar OOP-style APIs. An RDD can also easily be cached if some data is going to be re-evaluated.

#### DataFrame:

A DataFrame can be defined as an abstraction that gives a schema view of the data. We can think of the data in a DataFrame as a table in a database. It works only on structured and semi-structured data, but it offers a huge performance improvement over RDDs because of features such as custom memory management and optimized execution plans.

#### Differences:

* RDD provides a more familiar OOP-style programming model with compile-time safety, while DataFrame detects attribute errors only at runtime.


* No built-in optimization engine is available for RDDs, while DataFrame queries are optimized by the Catalyst optimizer.


* With RDDs, whenever data needs to be distributed within the cluster or written to disk, it is serialized with Java serialization by default. DataFrames do not need Java serialization to encode the data.


* RDDs are less efficient than DataFrames because serialization has to be performed on each object individually, which takes more time.


* RDDs are slower than DataFrames at simple grouping and aggregation operations.
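To make the comparison concrete, here is the same per-key total computed with both APIs. This is a minimal sketch with illustrative data; it shows the two programming styles only and does not measure performance.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// In the notebook the existing `spark` session can be reused instead.
val spark = SparkSession.builder().master("local[*]").appName("rdd-vs-df").getOrCreate()
import spark.implicits._

val scores = Seq(("math", 55), ("math", 56), ("english", 57))

// RDD style: the aggregation is an opaque Scala function that Spark cannot inspect.
val rddTotals: Map[String, Int] =
  spark.sparkContext.parallelize(scores).reduceByKey(_ + _).collect().toMap

// DataFrame style: the aggregation is a declarative expression that Catalyst can optimize.
val dfTotals: Map[String, Long] =
  scores.toDF("subject", "score")
    .groupBy("subject")
    .agg(sum("score").as("total"))
    .as[(String, Long)]
    .collect()
    .toMap

println(rddTotals)
println(dfTotals)
```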

[Reference](https://intellipaat.com/community/141/what-is-the-difference-between-rdd-and-dataframes-in-apache-spark)

### 69) Difference between Parquet,  AVRO and ORC file ?

In the simplest terms, these are all file formats.

Large-scale storage and data processing ecosystems such as Hadoop need data formats that are optimized for read and write performance.

<img src="../Resources/data-formats-300x145.png" style="height:150px">

#### AVRO:

* It is row major format.
* Its primary design goal was schema evolution.
* In the avro format, we store schema separately from data. Generally avro schema file `(.avsc)` is maintained.

#### ORC:

* Column oriented storage format.
* It originated as Hive's `Row Columnar (RC)` file format and was later improved as `Optimized Row Columnar (ORC)`.
* Schema is with the data, but as a part of footer.
* Data is stored as row groups and stripes.
* Each stripe maintains indexes and stats about data it stores.

<img src="../Resources/orc-file-structure.png" style="width:450px">

#### Parquet:

* Similar to ORC. Based on google dremel
* Schema stored in footer
* Column oriented storage format
* Has integrated compression and indexes

A commonly cited difference between Parquet and ORC is compression: ORC files are often reported to be smaller than the equivalent Parquet or Avro files, although the result depends on the codec in use (for example Snappy) and on the data.

ORC is therefore often described as better than Parquet for compressing and retrieving data.

<img src="../Resources/Nexla-File-Format.png" style="width:450px">

**References:**

[Reference-1](https://www.datanami.com/2018/05/16/big-data-file-formats-demystified/)

[Reference-2](https://www.quora.com/What-are-the-differences-between-ORC-Avro-and-Parquet-File-Formats-in-Hadoop-in-terms-of-compression-and-speed)

### 70) Same operation can be done through both RDD and Dataframe. Which one you will prefer and why ?

**DataFrame**

Reasons:

* With RDDs, whenever data needs to be distributed within the cluster or written to disk, it is serialized with Java serialization by default. DataFrames do not need Java serialization to encode the data.


* RDDs are less efficient than DataFrames because serialization has to be performed on each object individually, which takes more time.


* RDDs are slower than DataFrames at simple grouping and aggregation operations.

### 71) How Spark provides high availability?

We can call a system highly available if its downtime is tolerable; how much downtime is tolerable depends on how critical the application is. Zero downtime is not realistic for any system. Suppose a machine has an uptime of 97.7%, so the probability that it is down is 0.023. If we have two such machines, the probability that both are down at the same time is (0.023*0.023). Most high-availability environments use three machines, in which case the probability that all of them are down is (0.023*0.023*0.023), i.e. 0.000012167. This gives a system uptime of 99.9987833%, which is a highly acceptable uptime guarantee.
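The arithmetic above can be reproduced with a few lines of Scala. The sketch assumes that machine failures are independent of each other, which is what multiplying the probabilities implies; `combinedUptimePercent` is an illustrative helper name.

```scala
// Per-machine probability of being down, taken from the text (97.7% uptime).
val downProbability = 0.023

// The system is down only if all `n` machines are down at the same time.
def combinedUptimePercent(n: Int): Double =
  (1.0 - math.pow(downProbability, n)) * 100.0

(1 to 3).foreach { n =>
  println(f"$n machine(s): ${combinedUptimePercent(n)}%.7f%% uptime")
}
```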

#### Note:

To set up high availability in Spark:

#### Components in play

As a reminder, here are the components in play to run an application:

The cluster:

* **Spark Master:** coordinates the resources
* **Spark Workers:** offer resources to run the applications

The application:

* **Driver:** the part of the application that coordinates the processing
* **Executors:** the distributed part of the application that process the data

When the *driver* is run in *cluster mode*, it runs on a *worker*.

Notice that each component runs in its own JVM: the *workers* spawn separate JVMs to run the *driver* and the *executors*.


#### Fault tolerance:

With a Spark *standalone* cluster, here is what happens if the JVM running a component dies:

* Master: ❌ can no longer run new applications and the UI becomes unavailable.
* Worker: ✅ not a problem, the cluster simply has fewer resources.
* Driver: ❌ the application dies.
* Executor: ✅ not a problem, the partitions being processed are sent to another executor.

Notice that losing a JVM or losing the whole EC2 instance has the same effect.

Here is how to deal with these problems:

*Master* -> set up a *standby master*.

*Driver* -> run the application in *supervised mode*.

#### Setting up a standby master

Since the Master is a single point of failure, Spark offers the ability to start another master instance that stays in standby until the active master disappears. When the standby master becomes the active master, the workers reconnect to it and existing applications continue running without problems.

This functionality relies on ZooKeeper to perform master election.


#### Running the application in supervised mode

The supervised mode allows the driver to be restarted on a different node if it dies. Enabling this functionality simply requires adding the `--supervise` flag when running `spark-submit`:

`spark-submit --supervise ...`

[Reference - 1](https://gist.github.com/aseigneurin/3af6b228490a8deab519c6aea2c209bc)

[Reference - 2](https://data-flair.training/blogs/fault-tolerance-in-apache-spark/)

### 72) schemaRDD

- **`SchemaRDDs`** are composed of Row objects along with a schema that describes the data types of each column in the row.

- A SchemaRDD is similar to a table in a traditional relational database.

- A SchemaRDD can be created from an existing RDD, Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.

<code>
// One way to define the schema of an RDD is to create a case class with the desired column
// names and types.
import org.apache.spark.sql.{SQLContext, SchemaRDD}

case class Record(key: Int, value: String)

// `sc` is an existing SparkContext (for example, the one provided by spark-shell).
val sqlContext = new SQLContext(sc)

// Importing the members of the SQL context gives access to the SQL functions and implicit conversions.
import sqlContext._

val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
// Any RDD of case classes can be registered as a table (Spark 1.1/1.2 API). The schema of the
// table is inferred automatically using Scala reflection.
rdd.registerTempTable("records")

val results: SchemaRDD = sql("SELECT * FROM records")
</code>

### 73) What is the difference between schemaRDD and Dataframe ?

`SchemaRDD` (Spark 1.2 and earlier) was renamed to `DataFrame` in Spark 1.3, so a DataFrame is the successor of a SchemaRDD rather than a separate concept.
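For comparison, here is a rough sketch of how the same "records" example might look with the DataFrame API in Spark 2.x. It assumes a local `SparkSession`; the application name is arbitrary, and in `spark-shell` the `spark` value already exists, so the builder line is only needed in a standalone program.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Case class describing the columns; the schema is inferred from it by reflection.
case class Record(key: Int, value: String)

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("schemardd-to-dataframe")
  .getOrCreate()
import spark.implicits._

// Build a DataFrame from a local collection of case class instances.
val records: DataFrame = (1 to 100).map(i => Record(i, s"val_$i")).toDF()

// Register it as a temporary view so that it can be queried with SQL.
records.createOrReplaceTempView("records")

val results: DataFrame = spark.sql("SELECT * FROM records")
results.printSchema()
```

Where the Spark 1.2 snippet declares its result as a `SchemaRDD`, the result here is a `DataFrame`.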

### 74) Lazy evaluation

As the name suggests, **`lazy evaluation`** in Spark means that execution does not start until an action is triggered.

In Spark, lazy evaluation comes into the picture when Spark `transformations` are applied.

<img src="../Resources/lazy-evaluation-in-apache-spark-768x402-1.jpg" style="height:300px">

**Transformations** are lazy in nature, meaning that when we call an operation on an RDD, it does not execute immediately.

Spark keeps a record of which operations have been called (through the DAG). We can think of a Spark RDD as the data that we build up through transformations.

Since transformations are lazy, the operations are executed only when we call an action on the data. Hence, with lazy evaluation, data is not loaded until it is needed.
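A small experiment can make this visible. The sketch below assumes a local `SparkSession` and uses an accumulator (named `mapCalls` here purely for illustration) to count how often the function passed to `map` really runs. Accumulator updates made inside transformations are not guaranteed to be applied exactly once if tasks are retried, so this is a teaching aid rather than a reliable counting technique.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lazy-evaluation")
  .getOrCreate()
val sc = spark.sparkContext

// Counts how many times the map function is actually executed.
val mapCalls = sc.longAccumulator("mapCalls")

// Transformation only: Spark records it in the lineage, nothing runs yet.
val doubled = sc.parallelize(1 to 5).map { x => mapCalls.add(1); x * 2 }
val callsBeforeAction = mapCalls.sum

// Action: triggers the job, so the map function now runs for every element.
val result = doubled.collect()
val callsAfterAction = mapCalls.sum

println(s"map calls before the action: $callsBeforeAction")
println(s"map calls after the action:  $callsAfterAction")
```

The counter stays at zero until `collect()` is called, which is the behaviour described above.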

<img src="../Resources/apache-spark-lazy-evaluation.gif" style="height:300px">

### 75) Advantages of Lazy Evaluation in Spark Transformation

Lazy evaluation in Apache Spark has several benefits:

- **Increases manageability:**
With lazy evaluation, users can organize their Apache Spark program into smaller operations. Spark reduces the number of passes over the data by grouping operations.


- **Saves computation and increases speed:**
Lazy evaluation plays a key role in saving computation overhead, since only the necessary values get computed. It also saves round trips between the driver and the cluster, which speeds up the process.


- **Reduces complexity:**
The two main complexities of any operation are time and space complexity. Lazy evaluation helps with both. Since not every operation is executed, time is saved. It also lets us work with an infinite data structure. Because an action is triggered only when the data is required, overhead is reduced.


- **Optimization:**
It enables optimization by reducing the number of queries.

Hence, lazy evaluation enhances the power of Apache Spark by reducing the execution time of RDD operations. Spark maintains the lineage graph to remember the operations on an RDD. As a result, it optimizes performance and achieves fault tolerance.

### 76) Fault tolerance in Apache Spark – Reliable Spark Streaming

This section covers fault-tolerant stream processing with Spark Streaming and Spark RDD fault tolerance. It also looks at Spark Streaming write ahead logs, driver failure, and worker failure to explain how fault tolerance is achieved in Apache Spark.

<img src="../Resources/Apache-Spark-Fault-Tolerance-01.jpg" style="height:250px">

#### Fault:

A fault is a failure, so fault tolerance in Apache Spark is the capability to keep operating and to recover lost data after a failure occurs.

If we want our system to be fault tolerant, it must be redundant, because we need a redundant component to recover the lost data. Lost data is recovered from the redundant copy.

#### Spark RDD Fault Tolerance:

Spark operates on data in fault-tolerant file systems like HDFS or S3, so all RDDs generated from fault-tolerant data are themselves fault tolerant.

But this does not hold true for streaming/live data (data received over the network). So the key need for fault tolerance in Spark is for this kind of data.

The basic fault-tolerance semantics of Spark are:

* Since Apache Spark RDD is an immutable dataset, each Spark RDD remembers the lineage of the deterministic operation that was used on fault-tolerant input dataset to create it.


* If due to a worker node failure any partition of an RDD is lost, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.


* Assuming that all of the RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.
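The lineage that these semantics rely on can be inspected with `toDebugString`, which exists on every RDD. The following is a minimal sketch that assumes a local `SparkSession`; the RDD names are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rdd-lineage")
  .getOrCreate()
val sc = spark.sparkContext

// Source dataset followed by two deterministic transformations.
val base = sc.parallelize(1 to 10)
val multiplesOfFour = base.map(_ * 2).filter(_ % 4 == 0)

// Textual description of the chain of parent RDDs. Spark uses this chain
// to recompute a lost partition from the original data.
val lineage = multiplesOfFour.toDebugString
println(lineage)
```

The printed lineage lists the `filter` and `map` steps on top of the parallelized collection, which is the recipe Spark replays when a partition has to be rebuilt.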

To achieve fault tolerance for all the generated RDDs, the received data is replicated among multiple Spark executors on worker nodes in the cluster.

This results in two types of data that need to be recovered in the event of a failure:

* **Data received and replicated** – The data has been replicated to one of the other nodes, so it can be retrieved after a failure.


* **Data received but buffered for replication** – The data has not been replicated yet, so the only way to recover it is to retrieve it again from the source.

<img src="../Resources/Fault-Tolerance-in-Apache-Spark-min-1.jpg" style="height:300px">

Failures can also occur in worker nodes as well as driver nodes:

* **Failure of worker node** – The worker nodes are the nodes that run the application code on the Spark cluster. These are the slave nodes. Any worker node running an executor can fail, resulting in the loss of its in-memory data. If any receivers were running on the failed nodes, their buffered data will be lost.


* **Failure of driver node** – If the driver node that is running the Spark Streaming application fails, the SparkContext is lost and all executors lose their in-memory data.

`Apache Mesos` helps make the Spark master fault tolerant by maintaining backup masters.

It is open source software that sits between the application layer and the operating system.

It makes it easier to deploy and manage applications in large-scale clustered environments.

Executors are relaunched if they fail.

After a failure, executors are relaunched automatically and Spark Streaming performs parallel recovery by recomputing RDDs from the input data.

Receivers are restarted by the workers when they fail.

#### Fault Tolerance with Receiver-based sources:

For input sources based on receivers, fault tolerance depends on both the **failure scenario** and the **type of receiver**.

There are two types of receiver:

* **Reliable receiver** – The receiver acknowledges the source only after ensuring that the received data has been replicated. If the receiver fails, the source will not receive an acknowledgment for the buffered data. So, the next time the receiver restarts, the source will resend the data. Hence, no data is lost due to the failure.


* **Unreliable receiver** – The receiver does not send an acknowledgment, so data can be lost when a worker or driver fails.

**If the worker node fails** and the receiver is reliable, there will be no data loss. With an unreliable receiver, however, data that was received but not yet replicated can be lost.

#### Spark Streaming write ahead logs:

**If the driver node fails**, all the data that was received and replicated in memory will be lost. This affects the results of stateful transformations. To avoid this data loss, Spark 1.2 introduced **write ahead logs**, which save received data to fault-tolerant storage. All received data is written to the write ahead logs before it is processed by Spark Streaming.
Write ahead logs are used in databases and file systems to ensure the durability of data operations. The intention of an operation is first written to a durable log, and only then is the operation applied to the data. If the system fails in the middle of applying the operation, it can recover by reading the log and reapplying the operations it had intended to perform.
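The general "log first, apply second" idea can be illustrated with a toy key-value store. This is not how Spark Streaming implements its write ahead log; `ToyStore` and the in-memory buffer standing in for durable storage are assumptions made purely for the sketch.

```scala
import scala.collection.mutable

// Toy key-value store guarded by a write-ahead log (illustration only).
final class ToyStore(log: mutable.ArrayBuffer[(String, Int)]) {
  private val state = mutable.Map.empty[String, Int]

  // On start-up, rebuild the state by replaying everything already in the log.
  log.foreach { case (k, v) => state(k) = v }

  def put(key: String, value: Int): Unit = {
    log += ((key, value)) // 1. record the intention in the durable log
    state(key) = value    // 2. only then apply it to the in-memory state
  }

  def get(key: String): Option[Int] = state.get(key)
}

// The buffer stands in for durable storage such as a file on HDFS.
val durableLog = mutable.ArrayBuffer.empty[(String, Int)]
val store = new ToyStore(durableLog)
store.put("a", 1)
store.put("b", 2)

// Simulate a crash: the in-memory store is gone, only the log survives.
val recovered = new ToyStore(durableLog)
println(recovered.get("a"))
println(recovered.get("b"))
```

Because every write reaches the log before it reaches the state, a fresh instance can always rebuild the state by replaying the log.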

<table style="width:70%">
<thead>
<tr><th>Deployment Scenario</th><th>Worker Failure</th><th>Driver Failure</th></tr>
</thead>
<tbody>
<tr><td>Spark 1.1 or earlier</td><td>Buffered data lost with unreliable receivers</td><td>Buffered data lost with the unreliable receiver.</td></tr>
<tr><td>Spark 1.2 or later without write ahead logs</td><td>Zero data loss with reliable receivers, At-least-once semantics</td><td>Past data lost with all receivers, Undefined semantics</td></tr>
<tr><td>Spark 1.2 or later with write ahead logs</td><td>Zero data loss with reliable receivers, At-least-once semantics</td><td>Zero data loss with reliable receivers and files, At-least-once semantics</td></tr>
</tbody>
</table>


[Reference](https://data-flair.training/blogs/fault-tolerance-in-apache-spark/)

### SPARK DEFINITIONS:

#### 77) Node:

A server

#### 78) Worker Node:

A server that is part of the cluster and is available to run Spark jobs.

#### 79) Master Node:

The server that coordinates the Worker nodes.

#### 80) Executor:

A sort of virtual machine inside a node. **One Node can have multiple Executors**.

#### 81) Driver Node:

The Node that initiates the Spark session.

#### 82) Driver (Executor):

The Driver Node will also show up in the Executor list.

### 83) Driver Memory

The Spark driver is the main program that declares the transformations and actions on RDDs and submits these requests to the master. This is the program where the SparkContext is created.

The `--driver-memory` flag controls the amount of memory allocated to the driver. It is `1GB` by default and should be increased if you call a `collect()` or `take(N)` action on a large RDD inside your application.

### 84) Executor Memory

The workers are where the tasks are executed, inside the `executors`. They should have sufficient resources and network connectivity to perform the transformations and actions on the RDDs defined in the main program.

The `--executor-memory` flag controls the executor heap size (similarly for YARN and Slurm); the default value is 1 GB per executor.

In the legacy memory model (Spark 1.5 and earlier, `spark.storage.memoryFraction`), Spark by default uses `60%` of the configured `executor memory (--executor-memory)` to cache RDDs. The remaining `40%` of memory is available for any objects created during task execution. If your tasks slow down due to frequent garbage collection in the JVM, or if the JVM is running out of memory, lowering this fraction will help reduce the memory consumption. Since Spark 1.6, execution and storage share a unified memory region controlled by `spark.memory.fraction`.

### 85) What is the impact of `spark.cores.max` ?

`spark.cores.max` is the upper limit on the total number of cores that an application can be allocated across the cluster (it applies to standalone and Mesos coarse-grained deployments). If the `spark.executor.cores` parameter is not set, `spark.cores.max` effectively controls how many executors are allocated. This parameter lets a cluster admin restrict resource usage and thereby avoid the scenario where Spark allocates too many cores. The parameter is optional when `spark.executor.cores` has been set.

### 86) Number of executors

The `--num-executors` flag defines the number of executors, that is, the total number of executor processes that will be launched for the application.

The executors run on the NodeManagers (you can think of these as the workers in Spark standalone). A number of containers (each including vCPU, memory, network, disk, etc.) equal to the number of executors specified will be allocated for your Spark application on YARN. These executor containers run on multiple NodeManagers; their placement depends on the CapacityScheduler (the default scheduler in HDP).

So to sum up, the total number of executors is the number of `resource containers` you request for your application.

The `--num-executors` command-line flag or the `spark.executor.instances` configuration property controls the number of executors requested.

Note that `--num-executors` is the total number of executors for the application, not the number of executors per node.

### 87) Number of cores per executor

Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the `--executor-cores` flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the `spark.executor.cores` property in the `spark-defaults.conf` file or on a `SparkConf` object.

Similarly, the heap size can be controlled with the `--executor-memory` flag or the `spark.executor.memory` property. The cores property controls the number of concurrent tasks an executor can run.

`--executor-cores 5` means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.
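As a sketch of the `SparkConf` route mentioned above, the snippet below sets the same properties programmatically. The application name and the concrete values (4 executors, 5 cores, 4g) are arbitrary choices for illustration, and the task-slot calculation assumes the default of one CPU core per task.

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --num-executors 4 --executor-cores 5 --executor-memory 4g
// to spark-submit (the values are illustrative only).
val conf = new SparkConf()
  .setAppName("executor-sizing-example") // hypothetical application name
  .set("spark.executor.instances", "4")
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "4g")

// Each executor runs at most `spark.executor.cores` tasks at once, so the
// application as a whole has executors * cores task slots.
val maxConcurrentTasks =
  conf.getInt("spark.executor.instances", 1) * conf.getInt("spark.executor.cores", 1)

println(s"maximum concurrent tasks: $maxConcurrentTasks")
```

With these values the application can run at most 20 tasks at the same time, regardless of how many partitions the data has.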

### Note:

 - `--executor-memory`/`spark.executor.memory` controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the `spark.yarn.executor.memoryOverhead` property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384 MB, 0.07 * spark.executor.memory) in older Spark releases; newer releases use a factor of 0.10.


 - YARN may round the requested memory up a little. YARN's `yarn.scheduler.minimum-allocation-mb` and `yarn.scheduler.increment-allocation-mb` properties control the minimum and increment request values respectively.

The following (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:

<img src="../Resources/spark-tuning2-f1.png" style="height:150px">

### 88) Memory overhead (off-heap)

- `Memory overhead` is the amount of `off-heap memory` allocated to `each executor`.


- By default, memory overhead is set to either `10% of executor memory` or `384 MB`, whichever is higher.


- Memory overhead is used for `Java NIO direct buffers`, `thread stacks`, `shared native libraries`, or `memory mapped files`.
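The default rule above translates into a short calculation. The function names below are hypothetical, all sizes are in megabytes, and the sketch ignores the rounding that YARN may apply to the final request.

```scala
// Off-heap overhead in MB: a fraction of the executor heap, with a 384 MB floor.
def memoryOverheadMb(executorMemoryMb: Int, factor: Double = 0.10): Int =
  math.max(384, (executorMemoryMb * factor).toInt)

// Total memory requested for one executor container: heap plus overhead.
def containerRequestMb(executorMemoryMb: Int, factor: Double = 0.10): Int =
  executorMemoryMb + memoryOverheadMb(executorMemoryMb, factor)

val smallExecutor = containerRequestMb(2048) // 10% is below 384 MB, so the floor applies
val largeExecutor = containerRequestMb(8192) // 10% of the heap exceeds the floor

println(s"2 GB heap -> $smallExecutor MB requested")
println(s"8 GB heap -> $largeExecutor MB requested")
```

For small heaps the 384 MB floor dominates, which is why a 2 GB executor asks for noticeably more than 10% extra.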

### 89) Cluster Manager

An external service for acquiring resources on the cluster (e.g. `standalone manager`, `Mesos`, `YARN`).

Spark is agnostic to the cluster manager as long as it can acquire executor processes and those processes can communicate with each other. We are primarily interested in YARN as the cluster manager. A Spark application can run on YARN in either `yarn-cluster` or `yarn-client` mode:

### 90) yarn-client mode

The driver runs in the client process; the Application Master is only used for requesting resources from YARN.

### 91) yarn-cluster mode

The driver runs inside the Application Master process; the client can go away once the application is initialized.

### 92) Cores:

A core is the basic computation unit of a CPU, and a CPU may have one or more cores to perform tasks at a given time. The more cores we have, the more work we can do. In Spark, the number of cores controls the number of parallel tasks an executor can run.

<img src="../Resources/Nodes.png" style="height:250px">

### 93) Steps involved in cluster mode for a Spark Job:

1. From the driver code, SparkContext connects to cluster manager (standalone/Mesos/YARN).
2. Cluster Manager allocates resources across the other applications. Any cluster manager can be used as long as the executor processes are running and they communicate with each other.
3. Spark acquires executors on nodes in cluster. Here each application will get its own executor processes.
4. Application code (jar/python files/python egg files) is sent to executors
5. Tasks are sent by SparkContext to the executors.

From the above steps, it is clear that the number of executors and their memory settings play a major role in a Spark job. `Running executors with too much memory` often results in `excessive garbage collection delays`.

[Reference](https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html)

### 94) Garbage collection

Spark runs on the Java Virtual Machine (JVM). Because Spark can store large amounts of data in memory, it has a major reliance on Java’s memory management and garbage collection (GC). Therefore, garbage collection (GC) can be a major issue that can affect many Spark applications.

Common symptoms of excessive GC in Spark are:

* Reduced application speed.
* Executor heartbeat timeout.
* GC overhead limit exceeded error.

### 95) Memory Management in Spark

[Why Your Spark Applications Are Slow or Failing, Part 1: Memory Management](https://dzone.com/articles/common-reasons-your-spark-applications-are-slow-or)

[Why Your Spark Apps Are Slow Or Failing, Part II: Data Skew and Garbage Collection](https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da)

### 96) Data Skew

In an ideal Spark application run, when Spark wants to perform a join, for example, join keys would be evenly distributed and each partition that needed processing would be nicely organized. However, real business data is rarely so neat and cooperative. We often end up with less than ideal data organization across the Spark cluster that results in degraded performance due to `data skew`.

Data skew is not an issue with Spark per se, rather it is a data problem. The cause of the data skew problem is the uneven distribution of the underlying data. Uneven partitioning is sometimes unavoidable in the overall data layout or the nature of the query.

For joins and aggregations, Spark needs to co-locate all records of a single key in a single partition. Records of other keys are distributed across the other partitions. If one key has far more records than the rest, its partition becomes very large and causes data skew, which is problematic for any query engine if no special handling is done.

Common symptoms of data skew are:

- Frozen stages and tasks.
- Low utilization of CPU.
- Out of memory errors.
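The effect of co-locating every record of a key in one partition can be simulated without a cluster. The sketch below uses a non-negative modulo of the key's hash code, which mirrors the idea behind hash partitioning; the dataset, key names and partition count are made up for illustration.

```scala
// Partition index for a key: non-negative modulo of its hash code.
def partitionFor(key: String, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// Hypothetical skewed dataset: one "hot" key owns 90% of the records.
val keys: Seq[String] =
  Seq.fill(9000)("hot_customer") ++ (1 to 1000).map(i => s"customer_$i")

val numPartitions = 4

// Number of records that land in each partition.
val sizes: Map[Int, Int] =
  keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) => p -> ks.size }

val hotPartition = partitionFor("hot_customer", numPartitions)

sizes.toSeq.sortBy(_._1).foreach { case (p, n) =>
  val marker = if (p == hotPartition) "  <- contains the hot key" else ""
  println(s"partition $p: $n records$marker")
}
```

One partition ends up with at least 9000 of the 10000 records, so the task that processes it runs far longer than the others while the remaining cores sit idle.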

### 97) Spark Internals:

[Overview](https://github.com/VarunCK25/SparkInternals/blob/master/EnglishVersion/1-Overview.md)

[JobLogicalPlan.](https://github.com/VarunCK25/SparkInternals/blob/master/EnglishVersion/2-JobLogicalPlan.md)

[JobPhysicalPlan](https://github.com/VarunCK25/SparkInternals/blob/master/EnglishVersion/3-JobPhysicalPlan.md)

[shuffleDetails](https://github.com/VarunCK25/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md)

[Architecture](https://github.com/VarunCK25/SparkInternals/blob/master/markdown/english/5-Architecture.md)

[CacheAndCheckpoint](https://github.com/VarunCK25/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md)

[Broadcast](https://github.com/VarunCK25/SparkInternals/blob/master/markdown/english/7-Broadcast.md)

<img src="../Resources/spark_job.png" style="height:500px">

### 98) Scenario 1:

<code>
Specifications:

cores - 8/per machine
no of machines in cluster - 10
Ram memory - 64 GB / per machine
</code>


**When I run a job on this cluster, it completes in 1 minute.**

#### a) How much time will it take to complete the same process if the cluster has 20 machines? Justify the answer.

#### b) How much time will it take to complete the same process if the cluster has 100 machines? Justify the answer.

#### c) How much time will it take to complete the same process if the cluster has 1000 machines? Justify the answer.

___

### 99) Scenario 2:

#### a) How is your data split across the worker nodes in a cluster?

#### b) What is the default partition size of the cluster?

___

### 100) Scenario 3:

#### a) What is the relation between process execution time and repartitioning?

#### b) What happens to the process execution time if I add `repartition(100)`? Will it increase or decrease? Justify the answer.

___

## Thank You !