# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [20]:
import org.apache.spark

import org.apache.spark


In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

In [3]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

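    // airTemperature is stored in tenths of a degree; integer division truncates it,
    // and the missing-value code 9999 becomes 999 (hence the later filters on < 999)
    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')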
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    // Empty fields are read as 0.0
    def getDouble(str:String) : Double =
        if (str.isEmpty) 0.0 else str.toDouble
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [46]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample1.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[141] at map at <console>:41
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[144] at map at <console>:44


## 103-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding repeated computation and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioner, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)```, where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
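
The difference between the three methods can be checked on a toy RDD. The following is a minimal sketch, assuming a local Spark runtime; the `(month, temperature)` pairs and all names in it are hypothetical stand-ins, not part of the lab datasets.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In the notebook `sc` already exists; getOrCreate then simply returns it
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partitioning-sketch"))

// Hypothetical (month, temperature) pairs standing in for the weather data
val pairs = sc.parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 8)), 4)

val byKey = pairs.partitionBy(new HashPartitioner(4)) // shuffle; records are placed by key hash
val reshuffled = pairs.repartition(4)                 // shuffle, but no partitioner is recorded
val merged = pairs.coalesce(2)                        // merges partitions without a shuffle

println(byKey.partitioner.isDefined) // true
println(reshuffled.partitioner)      // None
println(merged.getNumPartitions)     // 2

// A by-key operation on byKey reuses its partitioner instead of shuffling again
val maxByMonth = byKey.reduceByKey((x, y) => math.max(x, y))
println(maxByMonth.toDebugString)
```

Only ```partitionBy``` leaves a partitioner on the RDD, which is what lets later by-key operations skip their own shuffle.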

In [6]:
import org.apache.spark.HashPartitioner
val n = 10
val p = new HashPartitioner(n)

import org.apache.spark.HashPartitioner
n: Int = 10
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@a


In [34]:
// Average temperature for every month
def AverageTemperatureByMonth(rdd:org.apache.spark.rdd.RDD[(String,String,String,String,String,Int,Boolean)]) = {
    rdd.
        filter(_._6<999).
        map(x => (x._4, x._6)).
        aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
        map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
        collect()
}

AverageTemperatureByMonth(rddWeather)

AverageTemperatureByMonth: (rdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)])Array[(String, Double)]
res9: Array[(String, Double)] = Array((10,13.32), (11,8.15), (12,4.08), (01,3.06), (02,5.5), (03,8.31), (04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24))


In [8]:
// Optimized average temperature: filter and map once, partition by key, and cache for reuse
val partitionedRDD = rddWeather
  .filter(_._6 < 999)
  .map(x => (x._4, x._6))
  .partitionBy(p) // unlike repartition, partitionBy records the partitioner, so by-key operations need no further shuffle
  .cache()

val avgTempByMonth = partitionedRDD
  .aggregateByKey((0.0, 0.0))(
    (a: (Double, Double), v: Int) => (a._1 + v, a._2 + 1), // the values are Int temperatures
    (a1: (Double, Double), a2: (Double, Double)) => (a1._1 + a2._1, a1._2 + a2._2)
  )
  .map { case (k, v) => (k, Math.round(v._1 * 100 / v._2) / 100.0) }
  .collect()

In [35]:
// Maximum temperature for every month
def MaxTemperatureByMonth(rdd:org.apache.spark.rdd.RDD[(String,String,String,String,String,Int,Boolean)]) = {
    rdd.
        filter(_._6<999).
        map(x => (x._4, x._6)).
        reduceByKey((x,y)=>{if(x<y) y else x}).
        collect()
}

MaxTemperatureByMonth(rddWeather)

MaxTemperatureByMonth: (rdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)])Array[(String, Int)]
res10: Array[(String, Int)] = Array((10,55), (11,43), (12,47), (01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (08,56), (09,55))


In [42]:
// Optimized maximum temperature for every month: reuse the cached, partitioned pair RDD
// (partitionedRDD), so that the filter, the map, and the shuffle are not repeated
def OptimizedMaxTemperatureByMonth(rdd:org.apache.spark.rdd.RDD[(String,Int)]) = {
    rdd
      .reduceByKey((x,y)=>{if(x<y) y else x})
      .collect()
}

OptimizedMaxTemperatureByMonth: (rdd: org.apache.spark.rdd.RDD[(String, Int)])Array[(String, Int)]


In [None]:
OptimizedMaxTemperatureByMonth(partitionedRDD)

## 103-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [12]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))}) // map runs after cache, so its result is not cached, and map discards the partitioner
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2) // partitioning after cache: the cached data is unpartitioned and the shuffle is repeated on every action
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}). // map after partitionBy discards the partitioner, so the cached RDD has none
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2). // right order (map, partition, cache), but with one transformation more than rddS5
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2). // single map, then partition, then cache: the cached RDD is slim and keeps its partitioner
  cache()

import org.apache.spark.HashPartitioner
p2: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddS1: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[29] at map at <console>:37
rddS2: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[32] at partitionBy at <console>:42
rddS3: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[35] at map at <console>:46
rddS4: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[38] at partitionBy at <console>:51
rddS5: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[40] at partitionBy at <console>:55
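
Which of the five variants still knows its partitioner depends on whether a ```map``` follows ```partitionBy```. The sketch below, assuming a local Spark runtime and hypothetical `(stationId, (country, elevation))` records, shows that ```map``` drops the partitioner while ```mapValues``` keeps it.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In the notebook `sc` already exists; getOrCreate then simply returns it
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mapvalues-sketch"))

// Hypothetical (stationId, (country, elevation)) records
val stations = sc.parallelize(Seq(
  ("A1", ("UK", 12.0)), ("B2", ("FI", 40.0)), ("C3", ("UK", 3.0))))
val partitioned = stations.partitionBy(new HashPartitioner(8))

// map may change the key, so Spark discards the partitioner
val viaMap = partitioned.map { case (k, v) => (k, v._1) }
// mapValues cannot touch the key, so the partitioner is kept
val viaMapValues = partitioned.mapValues(_._1)

println(viaMap.partitioner)                 // None
println(viaMapValues.partitioner.isDefined) // true
```

When a projection has to come after ```partitionBy```, ```mapValues``` is therefore the safer choice; otherwise, project first and partition afterwards.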


In [17]:
// Time each collect() separately; every measurement needs its own start and end timestamp
val t1 = System.nanoTime()
rddS1.collect()
val t2 = System.nanoTime()
val t3 = System.nanoTime()
rddS2.collect()
val t4 = System.nanoTime()
val t5 = System.nanoTime()
rddS3.collect()
val t6 = System.nanoTime()
val t7 = System.nanoTime()
rddS4.collect()
val t8 = System.nanoTime()
val t9 = System.nanoTime()
rddS5.collect()
val t10 = System.nanoTime()

val times = List(t2-t1,t4-t3,t6-t5,t8-t7,t10-t9)
times.zipWithIndex.min // (shortest time in ns, zero-based index of the fastest RDD)
// One recorded run took 161660500, 388322800, 667602100, and 353238000 ns for rddS1 to rddS4; rddS5 was not measured


## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` swaps key and value, so that ```sortByKey``` can sort by temperature

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
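
The point about sharing the same partitioner can be sketched on toy data. This is only an illustration under assumptions: it needs a local Spark runtime, and the station keys and temperatures are hypothetical (only the city names are borrowed from the dataset).

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In the notebook `sc` already exists; getOrCreate then simply returns it
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("join-sketch"))
val samePartitioner = new HashPartitioner(4)

// Hypothetical stand-ins: (usaf + wban, temperature) and (usaf + wban, city)
val weather = sc.parallelize(Seq(("S1", 20), ("S1", 25), ("S2", 18)))
  .partitionBy(samePartitioner)
val stations = sc.parallelize(Seq(("S1", "KINLOSS"), ("S2", "FOULA")))
  .partitionBy(samePartitioner)
  .cache() // worth caching when the station side is joined more than once

// Both sides share the partitioner, so the join itself adds no shuffle
val joined = weather.join(stations)

val maxByCity = joined
  .map { case (_, (temp, city)) => (city, temp) }
  .reduceByKey((x, y) => math.max(x, y))
  .collect()
  .toMap
```

In the web UI, such a join should appear inside the stage that reads the two partitioned inputs rather than opening a new shuffle stage.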

In [85]:
sc.getPersistentRDDs.foreach(_._2.unpersist()) // this command cleans the cache

In [104]:
val rddWeatherPairs = rddWeather
  .filter(r => r._6 < 999) // no cache here: this RDD is traversed only once; cache only data that several actions reuse
  .map(r => (r._1 + r._2, r._6)) // key is usaf + wban, value is airTemperature
val rddStationPairs = rddStation
  .map(r => (r._1 + r._2, (r._3, r._4))) // key is usaf + wban, value is (city, country)

val rddCached = rddWeatherPairs
  .join(rddStationPairs)
  .map(r => ((r._2._2._1, r._2._2._2), r._2._1)) // key is (city, country), value is airTemperature
  // (city, country) -> airTemperature is more space efficient than (city) -> airTemperature, country
  .reduceByKey((x,y) => {if(x < y) y else x}) // maximum temperature for every city
  .cache() // cache the per-city maxima (the reduced join result) for the queries below

rddWeatherPairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at map at <console>:45
rddStationPairs: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[329] at map at <console>:47
rddCached: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[334] at reduceByKey at <console>:53


In [105]:
rddCached.collect()

res53: Array[((String, String), Int)] = Array(((MUSTASAARI VALASSAARET,FI),20), ((LOCH GLASCARNOCH,UK),22), ((KAJAANI,FI),30), ((FOULA,UK),15), ((TULLOCH BRIDGE,UK),26), ((RACKWICK,UK),19), ((INVERGORDON HARBOUR,UK),20), ((BUTT OF LEWIS (LH),UK),8), ((KIRKWALL,UK),19), ((VARKAUS,FI),29), ((SELLA NESS,UK),19), ((LOCHBOISDALE,UK),21), ((HAILUOTO ISLAND,FI),28), ((LEMLAND NYHAMN,FI),19), ((INKOO BAGASKAR,FI),22), ((BARRA ISLAND,UK),21), ((FOULA NO2,UK),15), ((BENBECULA,UK),20), ((KAUHAVA,FI),29), ((GLENLIVET,UK),23), ((AHTARI MYLLYMAKI,FI),28), ((JOMALA,FI),22), ((KRUUNUPYY,FI),28), ((WATERSTEIN,UK),21), ((KINLOSS,UK),24), ((JYVASKYLA,FI),29), ((HANKO RUSSARO,FI),22), ((SUMBURGH,UK),30), ((HALLI,FI),29), ((CAIRNGORM SUMMIT,UK),14), ((NORTH RONA ISLAND,UK),21), ((LAPPEENRANTA HIEKKAPAKKA,FI...


In [110]:
rddCached
  .filter(r => r._1._2 == "UK")
  .map({case (k,v) => (v,k)}) // swap key and value so that sortByKey can sort by temperature
  .sortByKey(false) // sort by descending temperature
  .collect()

res56: Array[(Int, (String, String))] = Array((34,(SOUTH UIST RANGE,UK)), (30,(SUMBURGH,UK)), (26,(TULLOCH BRIDGE,UK)), (24,(KINLOSS,UK)), (24,(ALTNAHARRA NO2,UK)), (23,(GLENLIVET,UK)), (23,(LOSSIEMOUTH,UK)), (23,(AULTBEA NO2,UK)), (23,(FOYERS,UK)), (23,(AVIEMORE,UK)), (23,(INVERNESS,UK)), (23,(SKYE/LUSA,UK)), (22,(LOCH GLASCARNOCH,UK)), (22,(TAIN RANGE (SAWS),UK)), (21,(LOCHBOISDALE,UK)), (21,(BARRA ISLAND,UK)), (21,(WATERSTEIN,UK)), (21,(NORTH RONA ISLAND,UK)), (21,(KILMORY,UK)), (20,(INVERGORDON HARBOUR,UK)), (20,(BENBECULA,UK)), (20,(STORNOWAY,UK)), (20,(SCATSTA,UK)), (19,(RACKWICK,UK)), (19,(KIRKWALL,UK)), (19,(SELLA NESS,UK)), (18,(BALTASOUND NO.2,UK)), (17,(SULE SKERRY,UK)), (17,(AONACH MOR,UK)), (17,(LERWICK,UK)), (16,(MUCKLE HOLM,UK)), (15,(FOULA,UK)), (15,(FOULA NO2,UK)), (15,...


## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [117]:
// simply running this does not show anything on "Storage", because cache is lazy
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)


In [114]:
memRdd.first()
memSerRdd.first()
diskRdd.first()

res60: (String, String, String, String, String, Int, Boolean) = (028690,99999,2000,04,01,999,false)
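
Since ```first()``` only computes as many partitions as it needs, the Storage tab may show the RDDs as partially cached. A minimal sketch of the difference, assuming a local Spark runtime and a hypothetical integer RDD in place of ```rddWeather```:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// In the notebook `sc` already exists; getOrCreate then simply returns it
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("storage-sketch"))

// Hypothetical data; in the lab this would be rddWeather
val data = sc.parallelize(1 to 100000, 4)

val memSer = data.map(x => x)
  .setName("memSer") // the name makes the RDD easy to find in the Storage tab
  .persist(StorageLevel.MEMORY_ONLY_SER)

// persist only records the requested level; nothing is stored yet
println(memSer.getStorageLevel.description)

memSer.first()              // computes only the partitions needed for one element
val total = memSer.count()  // touches every partition, so all of them get stored
```

After an action such as ```count()```, the Storage tab should list every partition, which makes the sizes of the different storage levels comparable.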


## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner as ```rddS``` (and then join)
- Exploit broadcast variables

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..to enforce on rddW the same partitioner as rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
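// ..or to exploit broadcast variables?
// Broadcast a (station key -> city) map and look up each weather record locally, so the join needs no shuffle
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  flatMap({case (k,v) => bRddS.value.get(k).map(city => (city, v._6))}) // records without a matching station are dropped
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()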

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()
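
One possible direction, following the hint, is to shrink both sides before joining: reduce the weather data to one maximum per station and discard non-UK stations first. The sketch below is not the reference solution; it assumes a local Spark runtime and uses hypothetical, simplified records (`(stationId, temperature)` and `(stationId, (city, country))`) instead of the full tuples.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// In the notebook `sc` already exists; getOrCreate then simply returns it
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("uk-max-sketch"))
val part = new HashPartitioner(8)

// Hypothetical simplified records, both partitioned by station id
val weatherByStation = sc.parallelize(Seq(
  ("S1", 20), ("S1", 24), ("S2", 15), ("S3", 30), ("S3", 29))).partitionBy(part)
val stationInfo = sc.parallelize(Seq(
  ("S1", ("KINLOSS", "UK")), ("S2", ("FOULA", "UK")), ("S3", ("KAJAANI", "FI")))).partitionBy(part)

// 1. One maximum per station; the input is already partitioned by station, so no shuffle
val maxPerStation = weatherByStation.reduceByKey((x, y) => math.max(x, y))
// 2. Drop non-UK stations before the join; filter keeps the partitioner
val ukStations = stationInfo.filter { case (_, (_, country)) => country == "UK" }
// 3. Join one row per station, then reduce per city and sort by descending temperature
val ukMaxByCity = maxPerStation
  .join(ukStations)
  .map { case (_, (temp, (city, _))) => (city, temp) }
  .reduceByKey((x, y) => math.max(x, y))
  .map { case (city, temp) => (temp, city) }
  .sortByKey(ascending = false)
  .collect()
```

Because each station belongs to exactly one country, taking the maximum per station before the join cannot change the final per-city maxima; it only reduces the number of rows that reach the join.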