# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://LAPTOP-PSTRJPQO:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1732288272044)
SparkSession available as 'spark'


import org.apache.spark


In [2]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6ed06efe


In [9]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    // Empty fields are mapped to 0.0
    def getDouble(str:String) : Double =
        if (str.isEmpty) 0.0 else str.toDouble
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [10]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample10.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[26] at map at <console>:33
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[29] at map at <console>:36


## 103-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)```, where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
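
As a rough illustration of how the three repartitioning methods differ, the sketch below applies each one to a toy pair RDD and checks whether a partitioner is recorded afterwards. The toy data and the names `session`, `ctx`, and `toy` are illustrative assumptions and are not part of the lab datasets.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("partitioning-sketch").getOrCreate()
val ctx = session.sparkContext

// Toy (month, temperature) pairs spread over 4 partitions
val toy = ctx.parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 8)), 4)

val byKey = toy.partitionBy(new HashPartitioner(2)) // shuffles by key and records the partitioner
val reshuffled = toy.repartition(2)                 // shuffles, but records no partitioner
val merged = toy.coalesce(2)                        // merges partitions without a shuffle

println(byKey.partitioner.isDefined)      // a HashPartitioner is attached
println(reshuffled.partitioner.isDefined) // no partitioner
println(merged.partitioner.isDefined)     // no partitioner

// Print the lineage of an aggregation that starts from the cached, partitioned RDD
val cached = byKey.cache()
println(cached.reduceByKey((x, y) => math.max(x, y)).toDebugString)
```

Only `partitionBy` leaves key-based placement information that later key-based operations can reuse, which is why it is usually the relevant choice when several aggregations share the same key.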

In [11]:
// Average temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

res4: Array[(String, Double)] = Array((10,13.32), (11,8.15), (12,4.08), (01,3.06), (02,5.5), (03,8.31), (04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24))


In [12]:
// Maximum temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res5: Array[(String, Int)] = Array((10,55), (11,43), (12,47), (01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (08,56), (09,55))


Exercise 1 solution: cache the operations that both jobs repeat

In [7]:
import org.apache.spark.HashPartitioner
val partitioner = new HashPartitioner(8)
val cachedRddWeather = rddWeather.
  filter(_._6 < 999).
  map(x => (x._4, x._6)).
  partitionBy(partitioner).
  cache()

// Average temperature for every month
cachedRddWeather.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

// Maximum temperature for every month
cachedRddWeather.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

import org.apache.spark.HashPartitioner
partitioner: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
cachedRddWeather: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at partitionBy at <console>:33
res3: Array[(String, Int)] = Array((11,43), (08,56), (09,55), (12,47), (01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (10,55))


## 103-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [None]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()

Solution: keyBy() and map() do not preserve the partitioner, so they must be applied before partitionBy(). Anything that happens after cache() is not stored and must be recomputed every time, so cache() should be called as late as possible.

====>>>> rddS4 and rddS5 are the best options
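
The effect of `map()` on the partitioner can be checked directly through the `partitioner` field of an RDD. The following is a minimal sketch on toy data (the names `session`, `ctx`, and `keyed` are assumptions made for illustration); it also shows `mapValues`, which keeps the partitioner because it cannot change the keys.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("partitioner-sketch").getOrCreate()
val ctx = session.sparkContext

// Toy pair RDD with an explicit partitioner
val keyed = ctx.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(new HashPartitioner(2))

// map() may change the key, so Spark discards the partitioner
val viaMap = keyed.map({ case (k, v) => (k, v * 10) })

// mapValues() cannot touch the key, so the partitioner is preserved
val viaMapValues = keyed.mapValues(_ * 10)

println(viaMap.partitioner.isDefined)       // false
println(viaMapValues.partitioner.isDefined) // true
```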

## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` swaps key and value

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
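
The point about partitioners can be illustrated on toy data. In the sketch below (all names and values are assumptions made for illustration), two RDDs are partitioned with the same `HashPartitioner`, while a third has the same number of partitions but no partitioner at all.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
val ctx = session.sparkContext

val p = new HashPartitioner(4)

// Both sides are keyed by a toy station id and share the partitioner p
val left = ctx.parallelize(Seq(("s1", "OSLO"), ("s2", "ROME"))).partitionBy(p)
val right = ctx.parallelize(Seq(("s1", 12), ("s1", 15), ("s2", 30))).partitionBy(p)

// Same number of partitions, but repartition() records no partitioner
val sameCountOnly = ctx.parallelize(Seq(("s1", 12), ("s2", 30))).repartition(4)

println(left.partitioner == right.partitioner) // true: co-partitioned
println(sameCountOnly.partitioner)             // None: a join would need to shuffle this side

// Joining co-partitioned RDDs; inspect the lineage to see which shuffles remain
val joined = left.join(right)
println(joined.toDebugString)
```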

In [14]:
// Clear the cache
sc.getPersistentRDDs.foreach(_._2.unpersist())

In [23]:
val p3 = new HashPartitioner(8)

val rddSt = rddStation.
  map( r => (r._1 + r._2, (r._3, r._4))).
  partitionBy(p3)
val rddWe = rddWeather.
  filter(_._6 < 999).
  map (r => (r._1 + r._2, r._6)).
  partitionBy(p3)

// JOIN
val rddJoined = rddSt.join(rddWe).cache()


p3: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddSt: org.apache.spark.rdd.RDD[(String, (String, String))] = ShuffledRDD[75] at partitionBy at <console>:38
rddWe: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[78] at partitionBy at <console>:42
rddJoined: org.apache.spark.rdd.RDD[(String, ((String, String), Int))] = MapPartitionsRDD[81] at join at <console>:45


In [26]:
//MAXIMUM TEMPERATURE FOR EVERY CITY
rddJoined.
  map(r => (r._2._1._1, r._2._2)).
  reduceByKey((x,y) => {if(x>y) x else y}).
  collect()

res10: Array[(String, Int)] = Array((RYBINSK,28), (POWRANNA (TASMANIA FEEDLOT),31), (GUNNEDAH POOL,33), (HUICHON,34), (NJURBA,40), (RICHMOND OPERATION CENTRE,7), (TURI,23), (KARSHI,39), (ENVIRONM BUOY 62425,-10), (ILIAMNA AIRPORT,20), (SINAIA-1500,26), (UFA,36), (DIKILI,31), (HART ISLAND (AUT)  NS,22), (CAHOKIA/ST. LOUIS,36), (BOJNORD,36), (RONG-SHUI,25), (ONEGA,29), (ANJU,32), (GANDER INTL,29), (KRUSEVAC,40), (LES ESCALDES,19), (UST-KARENGA,0), (HINOJOSA DEL DUQUE,26), (OBAN,22), (MAREEBA AIRPORT,32), (FEIRA DE SANTANA,21), (PLATFORM NO. 62125,21), (SCHOOLCRAFT CO,27), (EL BORMA,45), (LUSAKA INTL,35), (SURABAYA/PERAK,33), (PEMBERTON AIRPORT  BC,34), (ALLENDALE,35), (TED STEVENS ANCHORAGE INTL,21), (HAMILTON/RAVALLI CO,33), (ATHENS/BEN EPPS AIRPORT,37), (GLEBA CELESTE,36), (SURABAYA/GED...


In [25]:
// The maximum temperature for every city in the UK
rddJoined.
  filter(r => r._2._1._2 == "UK").
  map(r => (r._2._1._1, r._2._2)).
  reduceByKey((x,y) => {if(x>y) x else y}).
  collect()

res9: Array[(String, Int)] = Array((WATTISHAM,30), (CAMBRIDGE,30), (NORWICH,31), (PRESTWICK RNAS,21), (LYNEHAM,28), (WAINFLEET (AUT),24), (GREAT MALVERN,28), (TAIN RANGE (SAWS),21), (COVENTRY,28), (LOSSIEMOUTH,23), (LEEDS WEATHER CTR,30), (SEA LION ISLAND,12), (ISLE OF PORTLAND,20), (ENVIRONM BUOY 62128,17), (OBAN,22), (PEMBRY SANDS,23), (NEWCASTLE,23), (GLASGOW,26), (HONINGTON,26), (PERSHORE,26), (ST MAWGAN,23), (BARKSTON HEATH,13), (MADLEY,23), (BALTASOUND NO.2,17), (BOLTSHOPE PARK,20), (SELLA NESS,17), (ODIHAM,27), (LOCHRANZA NO3,21), (KIRKWALL,19), (ASPATRIA,25), (FOULA,13), (LAKENHEATH,33), (SULE SKERRY,16), (SENNYBRIDGE NO2,26), (CULDROSE,22), (GREENOCK MRCC,24), (NOTTINGHAM/WATNALL,29), (BUTT OF LEWIS (LH),8), (GRAVESEND-BROADNESS,27), (SAUGHALL,21), (LEEMING,28), (PLATFORM NO. 6...


In [27]:
//Sort the results by descending temperature
// Start from the per-city maxima for the UK, swap to (temperature, city),
// sort by key in descending order, then swap back to (city, temperature)
rddJoined.
  filter(r => r._2._1._2 == "UK").
  map(r => (r._2._1._1, r._2._2)).
  reduceByKey((x,y) => {if(x>y) x else y}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  map({case(k,v)=>(v,k)}).
  collect()


## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [None]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)
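
Besides the Storage tab of the web UI, the same figures can be read programmatically. The sketch below is an illustration on toy data, assuming `SparkContext.getRDDStorageInfo` (a developer API) is available in the Spark version in use; the RDD names passed to `setName` are hypothetical labels chosen for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("storage-sketch").getOrCreate()
val ctx = session.sparkContext

// Toy data persisted with three different storage levels
val numbers = ctx.parallelize(1 to 100000).map(i => (i, i.toString))
val inMemory = numbers.persist(StorageLevel.MEMORY_ONLY).setName("toy-memory")
val serialized = numbers.map(x => x).persist(StorageLevel.MEMORY_ONLY_SER).setName("toy-memory-ser")
val onDisk = numbers.map(x => x).persist(StorageLevel.DISK_ONLY).setName("toy-disk")

// Persistence is lazy: an action is needed before anything is stored
inMemory.count()
serialized.count()
onDisk.count()

// Report the size of each cached RDD; values may lag slightly behind the actions above
ctx.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: memory=${info.memSize} B, disk=${info.diskSize} B, " +
    s"cached partitions=${info.numCachedPartitions}")
}
```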

## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..to enforce on rddW the same partitioner of rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
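// ..or to exploit broadcast variables?
// Broadcast a (station key -> city) map to every executor
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
// Look up each reading's city locally; readings without a matching station are dropped
val rddJ = rddW.
  flatMap({case (k,v) => bRddS.value.get(k).map(city => (city, v._6))})
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()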
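
To compare the options beyond what the web UI reports, wall-clock time can also be measured. The sketch below uses a hypothetical helper named `timed` (not part of Spark) and toy data to run a shuffle join and a broadcast-based join that produce the same result. Timings on such a small input are not meaningful; the point is only how a measurement could be set up.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("timing-sketch").getOrCreate()
val ctx = session.sparkContext

// Hypothetical helper: runs a block once, prints and returns the elapsed milliseconds
def timed[T](label: String)(block: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = block
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$label: $elapsedMs ms")
  (result, elapsedMs)
}

// Toy stations (key -> city) and readings (key -> temperature); "s3" has no station
val stations = ctx.parallelize(Seq(("s1", "OSLO"), ("s2", "ROME")))
val readings = ctx.parallelize(Seq(("s1", 12), ("s1", 15), ("s2", 30), ("s3", 99)))

val (viaJoin, _) = timed("shuffle join") {
  readings.join(stations).
    map({ case (_, (temp, city)) => (city, temp) }).
    reduceByKey((x, y) => math.max(x, y)).
    collect().toMap
}

// The small side is collected on the driver and shipped once to each executor
val lookup = ctx.broadcast(stations.collectAsMap())
val (viaBroadcast, _) = timed("broadcast join") {
  readings.
    flatMap({ case (k, temp) => lookup.value.get(k).map(city => (city, temp)) }).
    reduceByKey((x, y) => math.max(x, y)).
    collect().toMap
}
```

The broadcast approach only applies when the smaller dataset fits comfortably in the memory of the driver and of every executor.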

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country
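
One possible direction suggested by the hint, sketched here on toy data rather than on the lab datasets: since a station belongs to exactly one country (and city), the stations can be filtered before the join, and the readings can be reduced to one maximum per station before the join instead of after it. All names and values below are illustrative assumptions, and the actual gain depends on the data.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists; otherwise starts a local one
val session = SparkSession.builder().master("local[*]").appName("exercise3-sketch").getOrCreate()
val ctx = session.sparkContext

// Toy stations (key -> (city, country)) and readings (key -> temperature)
val stations = ctx.parallelize(Seq(
  ("s1", ("LEEDS", "UK")), ("s2", ("OBAN", "UK")), ("s3", ("ROME", "IT"))))
val readings = ctx.parallelize(Seq(("s1", 20), ("s1", 30), ("s2", 22), ("s3", 35), ("s2", 18)))

val p = new HashPartitioner(4)

// Filter by country before the join so that fewer stations take part in it
val ukStations = stations.filter(_._2._2 == "UK").mapValues(_._1).partitionBy(p)

// Pre-aggregate: one maximum per station, already partitioned like the stations
val maxPerStation = readings.reduceByKey(p, (x: Int, y: Int) => math.max(x, y))

val result = maxPerStation.join(ukStations).
  map({ case (_, (temp, city)) => (city, temp) }).
  reduceByKey((x, y) => math.max(x, y)). // several stations may share a city
  map({ case (city, temp) => (temp, city) }).
  sortByKey(false).
  collect()

result.foreach(println)
```

With this ordering the join handles at most one record per station, and the final `reduceByKey` only merges stations that share a city.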

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()