# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://LAPTOP-RRADKAOL:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1730907170520)
SparkSession available as 'spark'


import org.apache.spark


In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

In [2]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    def getDouble(str:String) : Double = {
        if (str.isEmpty)
            return 0
        else
            return str.toDouble
    }
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [3]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample10.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[2] at map at <console>:29
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[5] at map at <console>:32


## 103-1 Simple job optimization

Optimize the two jobs (avg temperature and max temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
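
To make the difference between the three repartitioning methods concrete, here is a minimal sketch on toy data. The names `ctx`, `pairs`, `repartitioned`, `coalesced`, and `hashed` are illustrative assumptions and not part of the lab; `SparkContext.getOrCreate` reuses the notebook's context when one already exists.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("partitioning-sketch"))

// Toy (month, temperature) pairs standing in for the weather data
val pairs = ctx.parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 8)), 4)

val repartitioned = pairs.repartition(2)                // shuffles, but records no partitioner
val coalesced     = pairs.coalesce(2)                   // merges partitions without a shuffle
val hashed        = pairs.partitionBy(new HashPartitioner(2)) // shuffles by key and records the partitioner

println(repartitioned.partitioner)     // None
println(coalesced.partitioner)         // None
println(hashed.partitioner.isDefined)  // true

// The lineage shows whether a by-key operation on the partitioned RDD adds another shuffle stage
println(hashed.reduceByKey(_ + _).toDebugString)
```

Only `partitionBy` leaves the RDD with a known partitioner, which is what later by-key operations and joins can exploit.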

In [4]:
// Average temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

res0: Array[(String, Double)] = Array((10,13.32), (11,8.15), (12,4.08), (01,3.06), (02,5.5), (03,8.31), (04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24))


In [None]:
// Maximum temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [5]:
// 10 partitions: close to the number of distinct keys (the 12 months), which limits data skew without creating many tiny partitions

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(10)

// The filtered and mapped RDD is cached, so the same filter/map is not computed twice.
// Hash partitioning by key lets the by-key aggregations run inside each partition, so they need no further shuffle, which can improve performance.
// No sorting is needed, which also helps performance.

val cashedRdd = rddWeather.filter(_._6<999).map(x => (x._4, x._6)).partitionBy(p).cache()

val avgTemps = cashedRdd
                .aggregateByKey((0.0, 0.0))((a, v) => (a._1 + v, a._2 + 1), (a1, a2) => (a1._1 + a2._1, a1._2 + a2._2))
                .map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)})
                .collect()

val minMax = cashedRdd
                .reduceByKey((x, y) => if (x < y) y else x)
                .collect()

minMax

import org.apache.spark.HashPartitioner
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@a
cashedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at partitionBy at <console>:35
avgTemps: Array[(String, Double)] = Array((04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24), (01,3.06), (10,13.32), (02,5.5), (11,8.15), (03,8.31), (12,4.08))
minMax: Array[(String, Int)] = Array((04,48), (05,49), (06,56), (07,56), (08,56), (09,55), (01,55), (10,55), (02,47), (11,43), (03,44), (12,47))
res1: Array[(String, Int)] = Array((04,48), (05,49), (06,56), (07,56), (08,56), (09,55), (01,55), (10,55), (02,47), (11,43), (03,44), (12,47))


## 103-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [None]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()

In [None]:
// The correct solutions are rddS4 and rddS5, with a preference for rddS4 because it is more readable.
// The first two are clearly wrong, because the last operation should be cache.
// rddS3 is wrong because it calls partitionBy before map: a map could in principle change the keys, so the result loses its partitioner. The map must therefore always come before partitionBy.
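
The effect of `map` on the partitioner can be checked directly. The sketch below uses toy data; `ctx`, `keyed`, `viaMap`, and `viaMapValues` are hypothetical names introduced only for illustration. It also shows `mapValues`, which cannot touch the keys, as a contrast.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("partitioner-sketch"))

// Toy keyed records, partitioned the same way as the station RDDs above
val keyed = ctx.
  parallelize(Seq(("a", (1, "x")), ("b", (2, "y")))).
  partitionBy(new HashPartitioner(8))

// map may change the key, so Spark forgets the partitioner
val viaMap = keyed.map({ case (k, v) => (k, v._1) })

// mapValues leaves the key untouched, so the partitioner is kept
val viaMapValues = keyed.mapValues(_._1)

println(viaMap.partitioner)                 // None
println(viaMapValues.partitioner.isDefined) // true
```

This is why the order of `map` and `partitionBy` matters in the five variants: a cached RDD without a partitioner gives no benefit to later joins or by-key operations.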

## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` to invert key with value and vice versa

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
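
The hints above can be sketched end to end on toy data. Everything here (`ctx`, `pJoin`, `temps`, `stations`, the station keys and cities) is made up for illustration and only mirrors the shape of the real RDDs.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("join-sketch"))

// One partitioner instance shared by both sides of the join
val pJoin = new HashPartitioner(4)

// (stationKey, temperature)
val temps = ctx.
  parallelize(Seq(("S1", 20), ("S1", 25), ("S2", 30), ("S3", 10))).
  partitionBy(pJoin)

// (stationKey, (city, country))
val stations = ctx.
  parallelize(Seq(("S1", ("London", "UK")), ("S2", ("Rome", "IT")), ("S3", ("Leeds", "UK")))).
  partitionBy(pJoin)

// (stationKey, (temperature, (city, country)))
val joined = temps.join(stations)

val maxTempUkSorted = joined.
  filter({ case (_, (_, (_, country))) => country == "UK" }).
  map({ case (_, (t, (city, _))) => (city, t) }).
  reduceByKey((x, y) => math.max(x, y)).
  map({ case (k, v) => (v, k) }).   // invert key and value to sort by temperature
  sortByKey(ascending = false).
  collect()
// Array((25,London), (10,Leeds))
```

Because both inputs carry the same partitioner, the join stage should appear in the web UI without an extra shuffle of either side.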

In [14]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)

val rdd1 = rddWeather.filter(_._6<999)
    .keyBy(x => x._1 + x._2)
    .map({case (k, v) => (k, v._6)})
    .partitionBy(p)

val rdd2 = rddStation
    .keyBy(x => x._1 + x._2)
    .map({case (k,v) => (k, (v._3, v._4))})
    .partitionBy(p)

// (key, (temperature, (city, country)))
val joinedRdd = rdd1.join(rdd2)

val flatJoinedRdd = joinedRdd.map { case (key, (temperature, (city, country))) =>
  (key,temperature, city, country)
}.cache()

val maxTempPerCity = flatJoinedRdd.map(x => (x._3, x._2)).reduceByKey((x, y) => if (x > y) x else y)
val maxTempUK = flatJoinedRdd.filter(x => x._4 == "UK").map(x => (x._3, x._2)).reduceByKey((x, y) => if (x > y) x else y)
maxTempUK.collect()

import org.apache.spark.HashPartitioner
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[81] at partitionBy at <console>:49
rdd2: org.apache.spark.rdd.RDD[(String, (String, String))] = ShuffledRDD[84] at partitionBy at <console>:54
joinedRdd: org.apache.spark.rdd.RDD[(String, (Int, (String, String)))] = MapPartitionsRDD[87] at join at <console>:57
flatJoinedRdd: org.apache.spark.rdd.RDD[(String, Int, String, String)] = MapPartitionsRDD[88] at map at <console>:59
maxTempPerCity: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[90] at reduceByKey at <console>:63
maxTempUK: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[93] at reduceByKey at <console>:64
res7: Array[(String, Int)] = Arra...


## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [None]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)
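
Besides the Storage tab of the web UI, the same figures can be read programmatically. The sketch below is an assumption-laden illustration on synthetic data: `ctx`, `nums`, `inMemory`, `serialized`, and `onDisk` are hypothetical names, and `getRDDStorageInfo` is a developer API of `SparkContext` whose output also lists any other RDD currently persisted in the session.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("storage-sketch"))

// Synthetic records, only used to have something to persist
val nums = ctx.parallelize(1 to 100000).map(i => (i, i.toString))

val inMemory   = nums.map(x => x).setName("inMemory").persist(StorageLevel.MEMORY_ONLY)
val serialized = nums.map(x => x).setName("serialized").persist(StorageLevel.MEMORY_ONLY_SER)
val onDisk     = nums.map(x => x).setName("onDisk").persist(StorageLevel.DISK_ONLY)

// Persistence is lazy: an action is needed before anything is stored
Seq(inMemory, serialized, onDisk).foreach(_.count())

// Print memory and disk footprint (in bytes) of every persisted RDD
ctx.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: memory=${info.memSize} B, disk=${info.diskSize} B")
}
```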

## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
// No: rddW has no partitioner, so the join has to shuffle it, which hurts performance
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect

In [None]:
// ..to enforce on rddW the same partitioner of rddS..
// Better than before: once rddW shares the partitioner of rddS, the join itself needs less shuffling. However,
// partitionBy still shuffles rddW once and does not fix data skew, so performance is still not optimal.
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..or to exploit broadcast variables?
// This approach is very efficient when the dataset is small.
// It turns the small dataset (here rddS) into a broadcast variable, which is sent to the tasks that process the other RDD.

val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  map({case (k,v) => (bRddS.value.get(k),v._6)}).
  filter(_._1!=None)
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()
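
As a variation on the broadcast approach, the lookup and the removal of unmatched keys can be done in a single `flatMap`, so the resulting keys are plain strings rather than `Option` values. This is a sketch on toy data; `ctx`, `stationCity`, `bCity`, and `temps` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("broadcast-sketch"))

// Small lookup table (stationKey -> city), shipped once to every executor
val stationCity = Map("S1" -> "London", "S2" -> "Rome")
val bCity = ctx.broadcast(stationCity)

// (stationKey, temperature); "S9" has no matching station
val temps = ctx.parallelize(Seq(("S1", 20), ("S1", 25), ("S2", 30), ("S9", 99)))

val maxPerCity = temps.
  // Look up the city locally; records without a match produce no output
  flatMap({ case (k, t) => bCity.value.get(k).map(city => (city, t)).iterator }).
  reduceByKey((x, y) => math.max(x, y)).
  collect().
  toMap
// Map(London -> 25, Rome -> 30)
```

The large RDD is never shuffled for the join itself; only the final `reduceByKey` moves data, and by then each partition has already been combined locally.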

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country
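
One possible reading of the hint, sketched on toy data: since a station belongs to exactly one country, the stations can be filtered before the join, and the weather data can be reduced to one maximum per station before it is joined. The names `ctx`, `part`, `stations`, `temps`, `ukStations`, and `maxPerStation` are assumptions for illustration, not the lab's reference solution.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: reuse the running context, or start a local one outside the notebook
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("prejoin-sketch"))

val part = new HashPartitioner(4)

// (stationKey, (city, country))
val stations = ctx.
  parallelize(Seq(("S1", ("London", "UK")), ("S2", ("Rome", "IT")), ("S3", ("Leeds", "UK")))).
  partitionBy(part)

// (stationKey, temperature)
val temps = ctx.
  parallelize(Seq(("S1", 20), ("S1", 25), ("S2", 30), ("S3", 10), ("S3", 12))).
  partitionBy(part)

// filter keeps the partitioner, so the join stays co-partitioned
val ukStations = stations.filter({ case (_, (_, country)) => country == "UK" })

// temps is already partitioned by station key, so this reduction needs no shuffle
val maxPerStation = temps.reduceByKey((x, y) => math.max(x, y))

val result = maxPerStation.
  join(ukStations).                 // at most one record per station on each side
  map({ case (_, (t, (city, _))) => (city, t) }).
  reduceByKey((x, y) => math.max(x, y)).
  map({ case (k, v) => (v, k) }).
  sortByKey(ascending = false).
  collect()
// Array((25,London), (12,Leeds))
```

The join then handles one record per station instead of one record per measurement, which is where most of the saving would be expected to come from.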

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()