In [1]:
%%init_spark
launcher.driver_memory = '10G'

# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [2]:
import org.apache.spark
import org.apache.spark.HashPartitioner

Initializing Scala interpreter ...

Spark Web UI available at http://BigData:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1730283176661)
SparkSession available as 'spark'


import org.apache.spark
import org.apache.spark.HashPartitioner


In [3]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    // Empty fields default to 0.0
    def getDouble(str:String) : Double =
        if (str.isEmpty) 0.0 else str.toDouble
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [7]:
val rddWeather = sc
  .textFile("../../../../datasets/big/weather-sample10.txt")
  .map(x => parseWeather(x))
val rddStation = sc
  .textFile("../../../../datasets/weather-stations.csv")
  .map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[8] at map at <console>:30
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[11] at map at <console>:33


## 103-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
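
A minimal sketch of how the three methods differ, assuming the notebook's `sc` is available. The RDD `pairs` and the partition counts are made up for illustration and are not part of the lab data. Of the three, only ```partitionBy``` attaches a partitioner that later key-based operations can reuse.

```scala
import org.apache.spark.HashPartitioner

// Toy key-value data (hypothetical), spread over 8 partitions
val pairs = sc.parallelize(Seq(("01", 3), ("02", 5), ("01", 7), ("03", 1)), 8)

// coalesce: merges existing partitions without a shuffle; no partitioner is set
val coalesced = pairs.coalesce(2)

// repartition: full shuffle to rebalance the data; still no partitioner
val repartitioned = pairs.repartition(4)

// partitionBy: shuffle by key; the resulting RDD remembers its partitioner
val byKey = pairs.partitionBy(new HashPartitioner(4))

println(coalesced.getNumPartitions)
println(repartitioned.partitioner)
println(byKey.partitioner)

// A key-based aggregation on byKey can reuse the partitioner;
// the lineage printed here should show no additional shuffle stage
println(byKey.reduceByKey(_ + _).toDebugString)
```

Because persistence is lazy, calling an action (for example ```count```) on a cached RDD is what makes it appear in the Storage tab of the web UI.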

In [8]:
// Average temperature for every month
rddWeather
  .filter(_._6<999)
  .map(x => (x._4, x._6))
  .aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
  .map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)})
  .collect()

res0: Array[(String, Double)] = Array((10,13.32), (11,8.15), (12,4.08), (01,3.06), (02,5.5), (03,8.31), (04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24))


In [9]:
// Maximum temperature for every month
rddWeather
  .filter(_._6<999)
  .map(x => (x._4, x._6))
  .reduceByKey((x,y)=>{if(x<y) y else x})
  .collect()

res1: Array[(String, Int)] = Array((10,55), (11,43), (12,47), (01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (08,56), (09,55))


In [10]:
val partitionedRddWeather = rddWeather
    .filter(_._6<999)
    .map(x => (x._4, x._6))
    .partitionBy(new HashPartitioner(12))
    .cache()

partitionedRddWeather.count

partitionedRddWeather: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at partitionBy at <console>:29
res2: Long = 4185148


In [11]:
partitionedRddWeather
    .aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
    .map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)})
    .collect()

partitionedRddWeather
    .reduceByKey((x,y)=>{if(x<y) y else x})
    .collect()

res3: Array[(String, Int)] = Array((01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (10,55), (11,43), (08,56), (09,55), (12,47))


In [12]:
partitionedRddWeather.unpersist(true)

res4: partitionedRddWeather.type = ShuffledRDD[21] at partitionBy at <console>:29


## 103-2 RDD preparation

Compare the following five ways of preparing the Station RDD for subsequent processing and identify the best one.

In [None]:
// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
// p2 is a partitioner that must be defined beforehand (e.g., val p2 = new HashPartitioner(n))
/*
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()
*/
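
When comparing the alternatives, two properties are worth checking: whether the final RDD still carries a partitioner, and which RDD in the chain is the one that is actually cached. The sketch below illustrates both on made-up data. It assumes the notebook's `sc`; `p2` is a hypothetical 4-partition partitioner and `keyed` is a toy RDD, neither taken from the lab.

```scala
import org.apache.spark.HashPartitioner

val p2 = new HashPartitioner(4) // hypothetical partitioner for this sketch
val keyed = sc.parallelize(Seq(("k1", ("IT", 10.0)), ("k2", ("UK", 25.0))))

// map may change the key, so Spark drops the partitioner
val afterMap = keyed.partitionBy(p2).map { case (k, v) => (k, v._1) }

// mapValues cannot touch the key, so the partitioner is preserved
val afterMapValues = keyed.partitionBy(p2).mapValues(_._1)

// cache() marks the RDD it is called on;
// transformations applied afterwards produce a new, non-cached RDD
val cachedParent = keyed.partitionBy(p2).cache()
val child = cachedParent.map { case (k, v) => (k, v._1) }

println(afterMap.partitioner)
println(afterMapValues.partitioner)
println(cachedParent.getStorageLevel.useMemory)
println(child.getStorageLevel.useMemory)
```

The same checks (```rdd.partitioner``` and ```rdd.getStorageLevel```) can be applied to each of the five candidates once `p2` is defined.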

## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` swaps key and value

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
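
The partitioner requirement can be checked on a small example. This is a sketch on made-up data, assuming the notebook's `sc`; the names `part`, `stationsKv`, and `weatherKv` are hypothetical. Two ```HashPartitioner``` instances with the same number of partitions compare as equal, so either sharing one instance or creating two with the same size satisfies the requirement.

```scala
import org.apache.spark.HashPartitioner

val part = new HashPartitioner(4) // partitioner shared by both sides

// Toy data (hypothetical): key = usaf + wban
val stationsKv = sc
  .parallelize(Seq(("A1", ("ROME", "IT")), ("B2", ("OBAN", "UK"))))
  .partitionBy(part)
  .cache()
val weatherKv = sc
  .parallelize(Seq(("A1", 30), ("B2", 22), ("B2", 19)))
  .partitionBy(part)
  .cache()

// Both inputs carry the same partitioner, so the join reuses it
val joined = weatherKv.join(stationsKv)

// mapValues reshapes the value while keeping the partitioner
val flat = joined.mapValues { case (temp, (city, country)) => (temp, city, country) }

println(joined.partitioner == Some(part))
println(flat.toDebugString)
```

Reshaping the joined values with ```mapValues``` rather than ```map``` keeps the partitioner, which avoids the need for a further ```partitionBy``` on the join result.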

In [13]:
val nPartitions = 12

// STATION structure: (usaf, wban,city,country,state,latitude,longitude,elevation,date_begin,date_end)
val rddS = rddStation
  .map(x => (x._1 + x._2, (x._3, x._4)))
  .partitionBy(new HashPartitioner(nPartitions))
  .setName("RDD_Stations")
  .cache()

// WEATHER structure: (usaf, wban,year,month,day,airTemperature,airTemperatureQuality)
val rddW = rddWeather
  .filter(_._6<999)
  .map(x => (x._1 + x._2, x._6))
  .partitionBy(new HashPartitioner(nPartitions))
  .setName("RDD_Weather")
  .cache()

// JOIN structure: (key, (temperature, city, country))
val rddJ = rddW
  .join(rddS)
  .map({case (key, (temperature, (city, country))) => (key, (temperature, city, country))})
  .partitionBy(new HashPartitioner(nPartitions)).setName("RDD_Join")
  .cache()

nPartitions: Int = 12
rddS: org.apache.spark.rdd.RDD[(String, (String, String))] = RDD_Stations ShuffledRDD[26] at partitionBy at <console>:32
rddW: org.apache.spark.rdd.RDD[(String, Int)] = RDD_Weather ShuffledRDD[29] at partitionBy at <console>:40
rddJ: org.apache.spark.rdd.RDD[(String, (Int, String, String))] = RDD_Join ShuffledRDD[34] at partitionBy at <console>:48


In [14]:
// max temperature per city
val maxTemperaturePerCity = rddJ
    .map({case (k, (temp, city, country)) => (city, temp)})
    .reduceByKey((x, y) => x.max(y))

maxTemperaturePerCity.collect

maxTemperaturePerCity: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[36] at reduceByKey at <console>:29
res5: Array[(String, Int)] = Array((RYBINSK,28), (DIKILI,31), (MISCOU ISLAND (AUT),23), (TURI,23), (MORARESTI,28), (ILIAMNA AIRPORT,20), (SINAIA-1500,26), (KALBARRI,38), (SHELDON MUNI,35), (ORLANDO SANFORD,36), (SPADEADAM,23), (GANDER INTL,29), (KRUSEVAC,40), (OBAN,22), (PLATFORM NO. 62125,21), (FEIRA DE SANTANA,21), (MARTIN STATE,34), (LUSAKA INTL,35), (FUKUOKA,35), (LJUBLJANA/BEZIGRAD,34), (BOLSOJ USKAN ISLAND,1), (BATUMI,27), (HAMILTON/RAVALLI CO,33), (ATHENS/BEN EPPS AIRPORT,37), (GLEBA CELESTE,36), (GARISSA,37), (COUNCIL BLUFFS MUNI,37), (NARRANDERA GOLF CLUB,36), (ARGYLE AERODROME,40), (MARTHAS VINEYARD,28), (NETAJI SUBHASH CHANDRA BOSE INTL,37), (ROSEBURG RGNL,35), (SAI...


In [15]:
// max temperature per city in UK
val maxTemperaturePerCityInUK = rddJ
    .filter({case (key, (temp, city, country)) => country == "UK"})
    .map({case (k, (temp, city, country)) => (city, temp)})
    .reduceByKey((x, y) => x.max(y))

maxTemperaturePerCityInUK.collect

maxTemperaturePerCityInUK: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[39] at reduceByKey at <console>:30
res6: Array[(String, Int)] = Array((CHURCH LAWFORD,29), (SPADEADAM,23), (LYNEHAM,28), (GATWICK,28), (WALTON-ON-THE-NAZE,23), (SHAP,23), (SEA LION ISLAND,12), (PENDENNIS POINT,19), (MANCHESTER,28), (FYLINGDALES,22), (OBAN,22), (PEMBRY SANDS,23), (GLOUCESTERSHIRE,29), (BIRMINGHAM,29), (KENLEY AIRFIELD,26), (BOLTSHOPE PARK,20), (TEST SAMOS 03761,28), (ASPATRIA,25), (CHURCH FENTON,25), (LAKENHEATH,33), (BALLYPATRICK FOREST,20), (SENNYBRIDGE NO2,26), (CAIRNWELL,17), (GREENOCK MRCC,24), (PLATFORM NO. 63101,15), (DATA BUOY 63117,17), (HIGH WYCOMBE HQAIR,25), (ALTNAHARRA NO2,24), (ALDERGROVE,23), (SHAWBURY,26), (SNOWDON SUMMIT,17), (LEUCHARS,24), (CARDIFF,24), (PORTRUSH,21), (RACK...


In [16]:
// results sorting
val sortedMaxTempCity = maxTemperaturePerCity
    .map({case (k, v) => (v, k)})
    .sortByKey(false)
    .collect

val sortedMaxTempCityUK = maxTemperaturePerCityInUK
    .map({case (k, v) => (v, k)})
    .sortByKey(false)
    .collect

sortedMaxTempCity: Array[(Int, String)] = Array((56,TIN CITY AFS), (56,TATALINA LRRS), (56,GALENA A.), (56,INDIAN MOUNTAIN AFS), (56,POINT LAY), (56,SPARREVOHN AFS), (56,CAPE LISBURNE AFS), (55,AKJOUJT), (55,PUERTO CARRENO), (55,CAPE NEWENHAM AFS), (55,BAHAWALPUR), (54,CAPE ROMANZOFF AFS), (53,ALI AL SALEM), (52,""), (50,SUFFOLK EXECUTIVE), (50,AHWAZ), (50,LZ BULL / EXERCISE), (50,AGHAJARI), (50,SHAHID ASYAEE), (49,SAFI-ABAD DEZFUL), (49,INDIANA CO), (49,KUWAIT INTL), (49,ABADAN), (48,TIMIMOUN), (48,SIBI), (48,K.F.I.A. (KING FAHAD INT. AIRPORT) DAMMA), (48,OWATONNA DEGNER RGNL), (48,ASWAN INTL), (48,NEWCASTLE), (48,IMPERIAL CO), (48,AL AHSA), (48,HASSAKAH), (48,TOUAT CHEIKH SIDI MOHAMED BELKEBIR), (48,DOHA INTL), (48,NEEDLES AIRPORT), (47,RAFHA), (47,ABU DHABI INTL), (47,DESERT RESORTS ...


## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

In [None]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)
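
Persistence is lazy, so an RDD shows up in the Storage tab only after an action has materialized it. The sketch below, on a made-up RDD and assuming the notebook's `sc`, triggers the caching and then reads the sizes programmatically through ```sc.getRDDStorageInfo``` (a developer API) as a complement to the web UI. The names `base`, `inMemory`, `inMemorySer`, and `onDisk` are hypothetical.

```scala
import org.apache.spark.storage.StorageLevel

// Toy RDD (hypothetical); in the lab this role is played by rddWeather
val base = sc.parallelize(1 to 100000).map(i => (i.toString, i))

val inMemory = base.persist(StorageLevel.MEMORY_ONLY).setName("sketch_mem")
val inMemorySer = base.map(x => x).persist(StorageLevel.MEMORY_ONLY_SER).setName("sketch_mem_ser")
val onDisk = base.map(x => x).persist(StorageLevel.DISK_ONLY).setName("sketch_disk")

// Run an action on each RDD so that its partitions are actually stored
Seq(inMemory, inMemorySer, onDisk).foreach(_.count())

// Sizes are reported in bytes
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: cached partitions=${info.numCachedPartitions}, " +
    s"memory=${info.memSize}, disk=${info.diskSize}")
}
```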

## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner as ```rddS``` (and then join)
- Exploit broadcast variables
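
The broadcast option amounts to a map-side join: the small RDD is collected into a map on the driver, shipped once to every executor, and looked up while scanning the large RDD, so the large RDD is never shuffled for the join. A minimal sketch on made-up data, assuming the notebook's `sc` (the names `stations`, `weather`, and `bStations` are hypothetical):

```scala
// Toy data (hypothetical): station key -> city, and station key -> temperature
val stations = sc.parallelize(Seq(("A1", "ROME"), ("B2", "OSLO")))
val weather = sc.parallelize(Seq(("A1", 30), ("A1", 35), ("B2", 12), ("C3", 99)))

// Collect the small side on the driver and broadcast it to the executors
val bStations = sc.broadcast(stations.collectAsMap())

// flatMap over the Option drops weather rows without a matching station
val maxPerCity = weather
  .flatMap { case (k, temp) => bStations.value.get(k).map(city => (city, temp)) }
  .reduceByKey((x, y) => math.max(x, y))
  .collectAsMap()

println(maxPerCity)
```

This approach is viable only when the collected map fits comfortably in the memory of the driver and of each executor.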

In [None]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect

In [None]:
// ..to enforce on rddW the same partitioner as rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

In [None]:
// ..or to exploit broadcast variables?
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  map({case (k,v) => (bRddS.value.get(k),v._6)}).
  filter(_._1!=None)
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country
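
One possible direction suggested by the hint, sketched on made-up data and assuming the notebook's `sc`: since the country depends only on the station, the stations can be filtered before the join, and the weather data can be reduced to one maximum per station, so the join handles far fewer records. The names and values below are hypothetical, and this is not necessarily the intended solution.

```scala
import org.apache.spark.HashPartitioner

val part6 = new HashPartitioner(4) // hypothetical partitioner for this sketch

// Toy data (hypothetical): key -> (city, country) and key -> temperature
val stationsKv = sc.parallelize(Seq(
  ("S1", ("LONDON", "UK")), ("S2", ("LONDON", "UK")), ("S3", ("PARIS", "FR"))
)).partitionBy(part6)
val weatherKv = sc.parallelize(Seq(
  ("S1", 20), ("S1", 25), ("S2", 28), ("S3", 31)
)).partitionBy(part6)

// filter preserves the partitioner, so the UK stations stay co-partitioned
val ukStations = stationsKv.filter { case (_, (_, country)) => country == "UK" }

// One record per station; the existing partitioner is reused
val maxPerStation = weatherKv.reduceByKey((x, y) => math.max(x, y))

val result = maxPerStation
  .join(ukStations)
  .map { case (_, (temp, (city, _))) => (city, temp) }
  .reduceByKey((x, y) => math.max(x, y))
  .map { case (city, temp) => (temp, city) }
  .sortByKey(false)
  .collect()

result.foreach(println)
```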

In [None]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
val p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

val rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
val rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

In [None]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()