# 103 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://lab42-04-01.campusfc.dir.unibo.it:4040
SparkContext available as 'sc' (version = 3.5.2, master = local[*], app id = local-1730889143654)
SparkSession available as 'spark'


import org.apache.spark


In [2]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@fcfcd9e


In [2]:
// WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row:String) = {
    val usaf = row.substring(4,10)
    val wban = row.substring(10,15)
    val year = row.substring(15,19)
    val month = row.substring(19,21)
    val day = row.substring(21,23)
    val airTemperature = row.substring(87,92)
    val airTemperatureQuality = row.charAt(92)

    (usaf,wban,year,month,day,airTemperature.toInt/10,airTemperatureQuality == '1')
}

// STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row:String) = {
    // Empty fields are mapped to 0.0; everything else is parsed as a Double
    def getDouble(str:String) : Double = {
        if (str.isEmpty) 0.0 else str.toDouble
    }
    val columns = row.split(",").map(_.replaceAll("\"",""))
    val latitude = getDouble(columns(6))
    val longitude = getDouble(columns(7))
    val elevation = getDouble(columns(8))
    (columns(0),columns(1),columns(2),columns(3),columns(4),latitude,longitude,elevation,columns(9),columns(10))  
}

parseWeather: (row: String)(String, String, String, String, String, Int, Boolean)
parseStation: (row: String)(String, String, String, String, String, Double, Double, Double, String, String)


In [3]:
val rddWeather = sc.
  textFile("../../../../datasets/big/weather-sample1.txt").
  map(x => parseWeather(x))
val rddStation = sc.
  textFile("../../../../datasets/weather-stations.csv").
  map(x => parseStation(x))

rddWeather: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[2] at map at <console>:29
rddStation: org.apache.spark.rdd.RDD[(String, String, String, String, String, Double, Double, Double, String, String)] = MapPartitionsRDD[5] at map at <console>:32


## 103-1 Simple job optimization

Optimize the two jobs (average temperature and maximum temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
  - To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```val p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
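
The points above can be sketched as follows. This is a minimal illustration on toy data, not the reference solution: the shared filter step is computed once, hash-partitioned by month, and cached, so that both jobs reuse it. The name `sketchSc`, the toy records, and the choice of 4 partitions are assumptions made for the example.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the running context if there is one, otherwise starts a local one (assumption)
val sketchSc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sketch-103-1"))

// Toy (month, temperature) pairs standing in for the parsed weather records
val toy = sketchSc.parallelize(Seq(("01", 3), ("01", 5), ("02", 7), ("02", 999), ("03", 9)))

// Shared prefix: filter once, partition by key, keep the result in memory
val prepared = toy.
  filter(_._2 < 999).
  partitionBy(new HashPartitioner(4)).
  cache()

// Both aggregations find the data already partitioned by month,
// so they should not need a further shuffle
val avgByMonth = prepared.
  aggregateByKey((0.0, 0.0))((a, v) => (a._1 + v, a._2 + 1), (a, b) => (a._1 + b._1, a._2 + b._2)).
  mapValues(s => s._1 / s._2).
  collectAsMap()
val maxByMonth = prepared.
  reduceByKey((x, y) => math.max(x, y)).
  collectAsMap()
```

After the first action, `prepared` should appear in the Storage tab of the web UI, and `prepared.toDebugString` should show a single `ShuffledRDD` shared by both jobs.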

In [4]:
// Average temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).
  map({case(k,v)=>(k,Math.round(v._1*100/v._2)/100.0)}).
  collect()

res0: Array[(String, Double)] = Array((10,13.32), (11,8.15), (12,4.08), (01,3.06), (02,5.5), (03,8.31), (04,11.75), (05,15.83), (06,18.53), (07,19.96), (08,20.31), (09,17.24))


In [5]:
// Maximum temperature for every month
rddWeather.
  filter(_._6<999).
  map(x => (x._4, x._6)).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res1: Array[(String, Int)] = Array((10,55), (11,43), (12,47), (01,55), (02,47), (03,44), (04,48), (05,49), (06,56), (07,56), (08,56), (09,55))


## 103-2 RDD preparation

Compare the five ways of preparing the Station RDD for subsequent processing and identify the best one.

In [6]:
import org.apache.spark.HashPartitioner
val p2 = new HashPartitioner(8)

// _1 and _2 are the fields composing the key; _4 and _8 are country and elevation, respectively
val rddS1 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  cache().
  map({case (k,v) => (k,(v._4,v._8))})
val rddS2 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache().
  partitionBy(p2)
val rddS3 = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p2).
  map({case (k,v) => (k,(v._4,v._8))}).
  cache()
val rddS4 = rddStation.
  keyBy(x => x._1 + x._2).
  map({case (k,v) => (k,(v._4,v._8))}).
  partitionBy(p2).
  cache()
val rddS5 = rddStation.
  map(x => (x._1 + x._2, (x._4,x._8))).
  partitionBy(p2).
  cache()

import org.apache.spark.HashPartitioner
p2: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddS1: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[15] at map at <console>:33
rddS2: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[18] at partitionBy at <console>:38
rddS3: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[21] at map at <console>:42
rddS4: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[24] at partitionBy at <console>:47
rddS5: org.apache.spark.rdd.RDD[(String, (String, Double))] = ShuffledRDD[26] at partitionBy at <console>:51
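
One way to compare the variants is to inspect the `partitioner` of each resulting RDD. The sketch below, on toy data, shows that `map` drops the partitioner of its parent while `mapValues` keeps it; the name `sketchSc` and the records are assumptions made for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the running context if there is one, otherwise starts a local one (assumption)
val sketchSc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sketch-103-2"))

// Toy (key, (country, elevation)) pairs standing in for the keyed station records
val keyed = sketchSc.parallelize(Seq(("a1", ("UK", 10.0)), ("b2", ("IT", 20.0))))
val partitioned = keyed.partitionBy(new HashPartitioner(8))

// map may change the key, so Spark discards the parent's partitioner
val afterMap = partitioned.map({ case (k, v) => (k, v._1) })
// mapValues cannot touch the key, so the partitioner is kept
val afterMapValues = partitioned.mapValues(_._1)

println(afterMap.partitioner)                  // None
println(afterMapValues.partitioner.isDefined)  // true
```

The same check can be run on `rddS1` to `rddS5`; note also that only the RDD on which `cache()` is called directly is the one that gets stored.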


## 103-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map({case(k,v)=>(v,k)})``` to swap key and value

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner!
- Verify the execution plan of the join in the web UI
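
To see why the number of partitions alone is not enough, the plain-Scala sketch below compares two assignment functions with the same partition count. `hashPartition` mirrors, to the best of my understanding, how a hash partitioner places a key (non-negative modulo of its hash code); `lengthPartition` is a purely hypothetical alternative invented for the comparison.

```scala
// Non-negative modulo of the key's hash code
def hashPartition(key: String, n: Int): Int = {
  val m = key.hashCode % n
  if (m < 0) m + n else m
}

// Hypothetical alternative with the same number of partitions: based on key length
def lengthPartition(key: String, n: Int): Int = key.length % n

// Station keys (usaf + wban) taken from the outputs of this notebook
val keys = Seq("03772099999", "03583099999", "99522099999")
val n = 8

val byHash = keys.map(k => k -> hashPartition(k, n)).toMap
val byLength = keys.map(k => k -> lengthPartition(k, n)).toMap

// Keys that the two functions send to different partitions
val disagreeing = keys.filter(k => byHash(k) != byLength(k))
println(disagreeing)
```

If two RDDs placed the same key in different partitions, matching records would not be co-located, which is why Spark has to shuffle at least one side of a join unless both sides share the same partitioner.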

In [4]:
val rddWeatherKV = rddWeather
  .map(x => (x._1 + x._2, x._6))
  .filter({case (k,v) => v < 999})
val rddStationKV = rddStation
  .map(x => (x._1 + x._2, (x._3, x._4)))

val maxTemperatureForCity = 
  rddStationKV.join(rddWeatherKV)
  .reduceByKey({ case ((c1, t1), (c2, t2)) => if (t1 >= t2) {(c1, t1)} else {(c2, t2)}})
  .cache()

maxTemperatureForCity
  .sortBy({case (_, ((city, _), _)) => city})
  .collect()

rddWeatherKV: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at filter at <console>:28
rddStationKV: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[8] at map at <console>:30
maxTemperatureForCity: org.apache.spark.rdd.RDD[(String, ((String, String), Int))] = MapPartitionsRDD[12] at reduceByKey at <console>:34
res0: Array[(String, ((String, String), Int))] = Array((69055499999,(("",""),25)), (69800499999,(("",""),41)), (69167499999,(("",""),37)), (69373499999,(("",""),19)), (69061499999,(("",""),38)), (69378499999,(("",""),32)), (69703499999,(("",""),33)), (69375499999,(("",""),21)), (69708499999,(("",""),-17)), (69379499999,(("",""),35)), (69372499999,(("",""),11)), (69170499999,(("",""),14)), (69680499999,(("",""),36)), (69689499999,(("",""),3...


In [5]:
val maxTemperatureForCityUK = maxTemperatureForCity
  .filter({case (_,((city, country), _)) => country == "UK"})
  .cache()

maxTemperatureForCityUK
  .collect()

maxTemperatureForCityUK: org.apache.spark.rdd.RDD[(String, ((String, String), Int))] = MapPartitionsRDD[18] at filter at <console>:26
res1: Array[(String, ((String, String), Int))] = Array((03815099999,((LIZARD LIGHTHOUSE,UK),19)), (99609099999,((ENVIRONM BUOY 62146,UK),14)), (03305099999,((CAPEL CURIG NO3,UK),28)), (03075099999,((WICK,UK),18)), (03482099999,((MARHAM,UK),31)), (03507099999,((SENNYBRIDGE NO2,UK),26)), (03174099999,((FIFE NESS,UK),19)), (99502099999,((PLATFORM NO. 62101,UK),21)), (03408099999,((CYNWYD,UK),19)), (88986099999,((SOUTH THULE IS.,UK),8)), (03766399999,((BIGGIN HILL,UK),27)), (03281099999,((FYLINGDALES,UK),22)), (03302099999,((VALLEY,UK),27)), (03072099999,((CAIRNWELL,UK),17)), (03717099999,((CARDIFF WEATHER CENTRE,UK),26)), (03384099999,((LECONFIELD (AUT),UK)...


In [6]:
val sortedMaxTemperatureForCityUK = maxTemperatureForCityUK
  .sortBy({ case (_,(_, temp)) => temp },ascending = false)
  .cache()

sortedMaxTemperatureForCityUK.collect()

sortedMaxTemperatureForCityUK: org.apache.spark.rdd.RDD[(String, ((String, String), Int))] = MapPartitionsRDD[23] at sortBy at <console>:26
res2: Array[(String, ((String, String), Int))] = Array((99522099999,((PLATFORM 62120,UK),40)), (03583099999,((LAKENHEATH,UK),33)), (03482099999,((MARHAM,UK),31)), (03492099999,((NORWICH,UK),31)), (03772099999,((HEATHROW,UK),31)), (03577099999,((MILDENHALL,UK),31)), (03590099999,((WATTISHAM,UK),30)), (03571599999,((CAMBRIDGE,UK),30)), (03263599999,((DURHAM TEES VALLEY AIRPORT,UK),30)), (03495099999,((COLTISHALL,UK),30)), (03373599999,((HUMBERSIDE,UK),30)), (03414599999,((COSFORD,UK),30)), (03418599999,((NOTTINGHAM EAST MIDLANDS,UK),30)), (03347099999,((LEEDS WEATHER CTR,UK),30)), (03658099999,((BENSON,UK),30)), (03347599999,((SHEFFIELD CITY,UK),30))...


In [7]:
sc.getPersistentRDDs.foreach(_._2.unpersist())

## 103-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.
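
As a complement to the web UI, the same figures can be read programmatically. The sketch below is only an illustration on toy data: `sketchSc` and the `toy-*` names are assumptions, and `getRDDStorageInfo` is a developer API whose numbers may lag slightly behind the action that populated the cache.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuses the running context if there is one, otherwise starts a local one (assumption)
val sketchSc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sketch-103-4"))

val toy = sketchSc.parallelize(1 to 100000).map(i => (i.toString, i))
val inMemory = toy.map(x => x).setName("toy-memory").persist(StorageLevel.MEMORY_ONLY)
val serialized = toy.map(x => x).setName("toy-memory-ser").persist(StorageLevel.MEMORY_ONLY_SER)

// Persistence is lazy: an action is needed before anything is stored
inMemory.count()
serialized.count()

// Print the memory and disk footprint of the toy RDDs
sketchSc.getRDDStorageInfo.
  filter(_.name.startsWith("toy-")).
  foreach(info => println(s"${info.name}: memory=${info.memSize} B, disk=${info.diskSize} B"))
```

Naming the RDDs with `setName` also makes them easier to find in the Storage tab.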

In [4]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist())

val memRdd = rddWeather.cache()
val memSerRdd = memRdd.map(x=>x).persist(MEMORY_ONLY_SER)
val diskRdd = memRdd.map(x=>x).persist(DISK_ONLY)

import org.apache.spark.storage.StorageLevel._
memRdd: rddWeather.type = MapPartitionsRDD[2] at map at <console>:29
memSerRdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[6] at map at <console>:31
diskRdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, Int, Boolean)] = MapPartitionsRDD[7] at map at <console>:32


In [5]:
memRdd.collect()
memSerRdd.collect()
diskRdd.collect()

res0: Array[(String, String, String, String, String, Int, Boolean)] = Array((028690,99999,2000,04,01,999,false), (028690,99999,2000,04,01,999,false), (028690,99999,2000,04,01,-8,true), (028690,99999,2000,04,01,999,false), (028690,99999,2000,04,01,999,false), (028690,99999,2000,04,02,-9,true), (028690,99999,2000,04,02,999,false), (028690,99999,2000,04,02,999,false), (028690,99999,2000,04,02,-10,true), (028690,99999,2000,04,02,999,false), (028690,99999,2000,04,02,-11,true), (028690,99999,2000,04,02,-11,true), (028690,99999,2000,04,02,-9,true), (028690,99999,2000,04,02,-9,true), (028690,99999,2000,04,02,-6,true), (028690,99999,2000,04,02,-6,true), (028690,99999,2000,04,02,-5,true), (028690,99999,2000,04,02,-4,true), (028690,99999,2000,04,02,-4,true), (028690,99999,2000,04,02,-4,true), (028...


## 103-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables

In [4]:
import org.apache.spark.HashPartitioner
var p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

var rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  persist()
var rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()

// Collect to enforce caching
rddW.collect()
rddS.collect()

import org.apache.spark.HashPartitioner
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddW: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, Int, Boolean))] = MapPartitionsRDD[7] at keyBy at <console>:33
rddS: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, Double, Double, Double, String, String))] = ShuffledRDD[9] at partitionBy at <console>:37
res0: Array[(String, (String, String, String, String, String, Double, Double, Double, String, String))] = Array((00701199999,(007011,99999,CWOS 07011,"","",0.0,0.0,0.0,20120101,20121129)), (00704499999,(007044,99999,CWOS 07044,"","",0.0,0.0,0.0,20120127,20120127)), (00840599999,(008405,99999,XM14,"","",0.0,0.0,0.0,20120101,20120827)), (00841699999,(008416,99999,X...


In [5]:
// Is it better to simply join the two RDDs..
rddW.
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res1: Array[(String, Int)] = Array((TAIN RANGE (SAWS),22), (LOSSIEMOUTH,23), (HALLI,29), (SUOMUSJARVI,26), (BALTASOUND NO.2,18), (SELLA NESS,19), (KANKAANPAA NIINISALO PUOLVOIM,29), (KIRKWALL,19), (FOULA,15), (SULE SKERRY,17), (BUTT OF LEWIS (LH),8), (KUOPIO,30), (KUMLINGE ISLAND,22), (RACKWICK,19), (STORNOWAY,20), (NORTH RONALDSAY ISL,6), (GLENLIVET,23), (WATERSTEIN,21), (KRUUNUPYY,28), (JOKIOINEN,27), (SKYE/LUSA,23), (NORTH RONA ISLAND,21), (FOULA NO2,15), (INVERGORDON HARBOUR,20), (HELSINKI VANTAA,28), (AHTARI MYLLYMAKI,28), (LERWICK,17), (HELSINKI MALMI,28), (BARRA ISLAND,21), (KUUSAMO,29), (KAUHAVA,29), (TURKU,27), (MUSTASAARI VALASSAARET,20), (CAIRNGORM SUMMIT,14), (KINLOSS,24), (VARKAUS,29), (LOCH GLASCARNOCH,22), (AULTBEA NO2,23), (LAPPEENRANTA,29), (NIVALA,28), (JYVASKYLA,29), ...


In [6]:
// ..to enforce on rddW the same partitioner of rddS..
rddW.
  partitionBy(p).
  join(rddS).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

res2: Array[(String, Int)] = Array((TAIN RANGE (SAWS),22), (LOSSIEMOUTH,23), (HALLI,29), (SUOMUSJARVI,26), (BALTASOUND NO.2,18), (SELLA NESS,19), (KANKAANPAA NIINISALO PUOLVOIM,29), (KIRKWALL,19), (FOULA,15), (SULE SKERRY,17), (BUTT OF LEWIS (LH),8), (KUOPIO,30), (KUMLINGE ISLAND,22), (RACKWICK,19), (STORNOWAY,20), (NORTH RONALDSAY ISL,6), (GLENLIVET,23), (WATERSTEIN,21), (KRUUNUPYY,28), (JOKIOINEN,27), (SKYE/LUSA,23), (NORTH RONA ISLAND,21), (FOULA NO2,15), (INVERGORDON HARBOUR,20), (HELSINKI VANTAA,28), (AHTARI MYLLYMAKI,28), (LERWICK,17), (HELSINKI MALMI,28), (BARRA ISLAND,21), (KUUSAMO,29), (KAUHAVA,29), (TURKU,27), (MUSTASAARI VALASSAARET,20), (CAIRNGORM SUMMIT,14), (KINLOSS,24), (VARKAUS,29), (LOCH GLASCARNOCH,22), (AULTBEA NO2,23), (LAPPEENRANTA,29), (NIVALA,28), (JYVASKYLA,29), ...


In [7]:
// ..or to exploit broadcast variables?
val bRddS = sc.broadcast(rddS.map(x => (x._1, x._2._3)).collectAsMap())
val rddJ = rddW.
  map({case (k,v) => (bRddS.value.get(k),v._6)}).
  filter(_._1!=None)
rddJ.
  reduceByKey((x,y)=>{if(x<y) y else x}).
  collect()

bRddS: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(12)
rddJ: org.apache.spark.rdd.RDD[(Option[String], Int)] = MapPartitionsRDD[23] at filter at <console>:32
res3: Array[(Option[String], Int)] = Array((Some(FOYERS),23), (Some(KRUUNUPYY),28), (Some(KILMORY),21), (Some(KINLOSS),24), (Some(SUOMUSJARVI),26), (Some(TURKU),27), (Some(AULTBEA NO2),23), (Some(KUMLINGE ISLAND),22), (Some(CAIRNGORM SUMMIT),14), (Some(MARIEHAMN),23), (Some(JOMALA),22), (Some(BENBECULA),20), (Some(HANKO RUSSARO),22), (Some(SELLA NESS),19), (Some(OULU),31), (Some(AONACH MOR),17), (Some(PORI),29), (Some(JYVASKYLA),29), (Some(TULLOCH BRIDGE),26), (Some(ISOSAARI),21), (Some(STORNOWAY),20), (Some(HALLI),29), (Some(SKYE/LUSA),23), (Some(SUOMUSSALMI),29), (Some(GLENLIVET),23), (...


## 103-6 Optimizing Exercise 3

Start from the result of the last job of Exercise 3; is there a more efficient way to compute the same result?
- Try it on weather-sample10
- Hint: consider that each station is located in only one country
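
One possible reading of the hint, sketched on toy data: because a station belongs to exactly one country, the maximum can be computed per station key before the join, so the weather side shrinks to one record per station. The names `sketchSc`, `part`, and the records are assumptions; the sketch also assumes one result row per station key, which differs from grouping by station name if two stations share a name.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the running context if there is one, otherwise starts a local one (assumption)
val sketchSc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sketch-103-6"))
val part = new HashPartitioner(4)

// Toy (stationKey, temperature) and (stationKey, (name, country)) pairs
val weather = sketchSc.parallelize(
  Seq(("s1", 20), ("s1", 25), ("s2", 30), ("s3", 18), ("s3", 12)))
val stations = sketchSc.parallelize(
  Seq(("s1", ("WICK", "UK")), ("s2", ("OULU", "FI")), ("s3", ("MARHAM", "UK"))))

// Aggregate first: one record per station, already placed by the join's partitioner
val maxPerStation = weather.reduceByKey(part, (x, y) => math.max(x, y))
// Filter the small side before joining and give it the same partitioner
val ukStations = stations.
  filter(_._2._2 == "UK").
  mapValues(_._1).
  partitionBy(part)

val result = maxPerStation.
  join(ukStations).
  map({ case (_, (temp, name)) => (temp, name) }).
  sortByKey(ascending = false).
  collect()
```

On this toy input, `result` contains `(25,WICK)` followed by `(18,MARHAM)`; whether the rewrite pays off on weather-sample10 is best checked by comparing the stages in the web UI.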

In [8]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
p = new HashPartitioner(8)
sc.getPersistentRDDs.foreach(_._2.unpersist())

rddS = rddStation.
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  cache()
rddW = rddWeather.
  filter(_._6<999).
  keyBy(x => x._1 + x._2).
  partitionBy(p).
  persist(MEMORY_AND_DISK_SER)

// Collect to enforce caching
rddW.collect()
rddS.collect()

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel._
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
rddS: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, Double, Double, Double, String, String))] = ShuffledRDD[26] at partitionBy at <console>:38
rddW: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, Int, Boolean))] = ShuffledRDD[29] at partitionBy at <console>:43
res4: Array[(String, (String, String, String, String, String, Double, Double, Double, String, String))] = Array((00701199999,(007011,99999,CWOS 07011,"","",0.0,0.0,0.0,20120101,20121129)), (00704499999,(007044,99999,CWOS 07044,"","",0.0,0.0,0.0,20120127,20120127)), (00840599999,(008405,99999,XM14,"","",0.0,0.0,0....


In [9]:
// First version
rddW.
  join(rddS).
  filter(_._2._2._4=="UK").
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()

res5: Array[(Int, String)] = Array((34,SOUTH UIST RANGE), (30,SUMBURGH), (26,TULLOCH BRIDGE), (24,KINLOSS), (24,ALTNAHARRA NO2), (23,LOSSIEMOUTH), (23,GLENLIVET), (23,SKYE/LUSA), (23,AULTBEA NO2), (23,INVERNESS), (23,FOYERS), (23,AVIEMORE), (22,TAIN RANGE (SAWS)), (22,LOCH GLASCARNOCH), (21,WATERSTEIN), (21,NORTH RONA ISLAND), (21,BARRA ISLAND), (21,LOCHBOISDALE), (21,KILMORY), (20,STORNOWAY), (20,INVERGORDON HARBOUR), (20,SCATSTA), (20,BENBECULA), (19,SELLA NESS), (19,KIRKWALL), (19,RACKWICK), (18,BALTASOUND NO.2), (17,SULE SKERRY), (17,LERWICK), (17,AONACH MOR), (16,MUCKLE HOLM), (15,FOULA), (15,FOULA NO2), (15,FAIR ISLE), (14,CAIRNGORM SUMMIT), (8,BUTT OF LEWIS (LH)), (6,NORTH RONALDSAY ISL))


In [10]:
// Second version (filter moved before joining)
rddW.
  join(rddS.filter({case (k, s) => s._4 == "UK"})).
  map({case(k,v)=>(v._2._3,v._1._6)}).
  reduceByKey((x,y)=>{if(x<y) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()

res6: Array[(Int, String)] = Array((34,SOUTH UIST RANGE), (30,SUMBURGH), (26,TULLOCH BRIDGE), (24,KINLOSS), (24,ALTNAHARRA NO2), (23,LOSSIEMOUTH), (23,GLENLIVET), (23,SKYE/LUSA), (23,AULTBEA NO2), (23,INVERNESS), (23,FOYERS), (23,AVIEMORE), (22,TAIN RANGE (SAWS)), (22,LOCH GLASCARNOCH), (21,WATERSTEIN), (21,NORTH RONA ISLAND), (21,BARRA ISLAND), (21,LOCHBOISDALE), (21,KILMORY), (20,STORNOWAY), (20,INVERGORDON HARBOUR), (20,SCATSTA), (20,BENBECULA), (19,SELLA NESS), (19,KIRKWALL), (19,RACKWICK), (18,BALTASOUND NO.2), (17,SULE SKERRY), (17,LERWICK), (17,AONACH MOR), (16,MUCKLE HOLM), (15,FOULA), (15,FOULA NO2), (15,FAIR ISLE), (14,CAIRNGORM SUMMIT), (8,BUTT OF LEWIS (LH)), (6,NORTH RONALDSAY ISL))


In [14]:
// Third version (filter moved before joining, and only the needed fields carried through the join)
rddW.
  map({case(k, w) => (k, w._6)}).
  join(rddS.map({case (k, s) => (k, (s._3, s._4))}).filter({case (k, s) => s._2 == "UK"})).
  map({case(k,(w, s))=>(k, (w, s._1))}).
  reduceByKey((x,y)=>{if(x._1<y._1) y else x}).
  map({case(k,v)=>(v,k)}).
  sortByKey(false).
  collect()

res10: Array[((Int, String), String)] = Array(((34,SOUTH UIST RANGE),03023099999), ((30,SUMBURGH),03003099999), ((26,TULLOCH BRIDGE),03047099999), ((24,KINLOSS),03066099999), ((24,ALTNAHARRA NO2),03044099999), ((23,SKYE/LUSA),03037099999), ((23,LOSSIEMOUTH),03068099999), ((23,INVERNESS),03059099999), ((23,GLENLIVET),03070099999), ((23,FOYERS),03057099999), ((23,AVIEMORE),03063099999), ((23,AULTBEA NO2),03034099999), ((22,TAIN RANGE (SAWS)),03062099999), ((22,LOCH GLASCARNOCH),03031099999), ((21,WATERSTEIN),03027099999), ((21,NORTH RONA ISLAND),03011099999), ((21,LOCHBOISDALE),03021099999), ((21,KILMORY),03040099999), ((21,BARRA ISLAND),03035099999), ((20,STORNOWAY),03026099999), ((20,SCATSTA),03006499999), ((20,INVERGORDON HARBOUR),03058099999), ((20,BENBECULA),03022099999), ((19,SELLA ...
