# 102 Spark optimizations

The goal of this lab is to understand some of the optimization mechanisms of Spark.

- Scala
    - [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
    - [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
    - [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)
- Python
    - [Spark programming guide](https://spark.apache.org/docs/3.5.0/rdd-programming-guide.html)
    - [All RDD APIs](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.RDD.html)

Use `Tab` for autocompletion, `Shift+Tab` for documentation.

In [1]:
from pyspark.sql import SparkSession

# .master("local[N]") <-- ask for N cores on the driver
spark = SparkSession.builder \
  .master("local[4]") \
  .appName("Local Spark") \
  .config('spark.ui.port', '4040') \
  .getOrCreate()
sc = spark.sparkContext

sc

## The weather dataset

Download the following ZIP files and unzip them inside the "datasets/big" folder of this repo (which is not committed).
- [weather-sample1.txt](https://big.csr.unibo.it/downloads/bigdata/weather-datasets-s1.zip) <-- start from this!
- [weather-sample10.txt](https://big.csr.unibo.it/downloads/bigdata/weather-datasets-s10.zip)
- [weather-full.txt](https://big.csr.unibo.it/downloads/bigdata/weather-datasets-full.zip)
  
The weather datasets are text files with weather data from all over the world in the year 2000 (collected from the [National Climatic Data Center](ftp://ftp.ncdc.noaa.gov/pub/data/noaa/) of the USA). The full one weighs 13GB; the others are samples of 10% (1.3GB) and 1% (130MB), respectively.
  - Sample row: 005733213099999**19580101**03004+51317+028783FM-12+017199999V0203201N00721004501CN0100001N9 **-0021**1-01391102681
  - The date in YYYYMMDD format is located at 0-based positions 15-23 (end exclusive)
  - The temperature in tenths of a degree Celsius is located at 0-based positions 87-92 (end exclusive)
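
The offsets above can be checked on the sample row with plain Python slicing, no Spark needed. This is a minimal sketch: the helper name `extract_date_and_temperature` is made up for illustration, and the space that precedes the temperature field in the rendered sample is assumed to be a formatting artifact, so it is left out of the string.

```python
# Sample row from the text above, split into pieces to make the offsets visible.
# Assumption: the space shown before the temperature field is not part of the data.
SAMPLE_ROW = (
    "005733213099999"   # positions 0-14: record prefix, including the USAF and WBAN ids
    "19580101"          # positions 15-22: date (YYYYMMDD)
    "03004+51317+028783FM-12+017199999V0203201N00721004501CN0100001N9"
    "-0021"             # positions 87-91: temperature in tenths of a degree Celsius
    "1-01391102681"     # position 92: temperature quality flag, then the rest of the row
)

def extract_date_and_temperature(row):
    """Return (date, temperature in Celsius) using the fixed-width offsets."""
    date = row[15:23]                   # slice end is exclusive
    temperature = int(row[87:92]) / 10  # stored as tenths of a degree
    return date, temperature

print(extract_date_and_temperature(SAMPLE_ROW))  # ('19580101', -2.1)
```

The same slices are what `parseWeather` applies to every line of the dataset.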

In the dataset folder you also have *weather-stations.csv*; it is a structured file with the description of weather stations collecting the weather data.

In [2]:
# WEATHER structure: (usaf,wban,year,month,day,airTemperature,airTemperatureQuality)
def parseWeather(row):
    usaf = row[4:10]
    wban = row[10:15]
    year = row[15:19]
    month = row[19:21]
    day = row[21:23]
    airTemperature = row[87:92]
    airTemperatureQuality = row[92]

    return (usaf,wban,year,month,day,int(airTemperature)/10,airTemperatureQuality == '1')

# STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
def parseStation(row):
    def getDouble(value):
        return 0 if len(value)==0 else float(value)
    
    columns = [ x.replace("\"","") for x in row.split(",") ]
    latitude = getDouble(columns[6])
    longitude = getDouble(columns[7])
    elevation = getDouble(columns[8])
    return (columns[0],columns[1],columns[2],columns[3],columns[4],latitude,longitude,elevation,columns[9],columns[10])  

In [14]:
rddWeather = sc.\
  textFile("../../../../datasets/big/weather-sample1.txt").\
  map(lambda x: parseWeather(x))
rddStation = sc.\
  textFile("../../../../datasets/weather-stations.csv").\
  map(lambda x: parseStation(x))

## 102-1 Simple job optimization

Optimize the two jobs (avg temperature and max temperature) by avoiding the repetition of the same computations and by enforcing a partitioning criterion.
- There are multiple methods to repartition an RDD: check the ```coalesce```, ```partitionBy```, and ```repartition``` methods in the documentation and choose the best one.
- Verify your persisted data in the web UI
- Verify the execution plan of your RDDs with ```rdd.toDebugString``` (shell only) or on the web UI
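
To see why partitioning by key helps a per-key aggregation, here is a plain-Python simulation of hash partitioning. It is only a sketch: `partition_of` and `partition_by` are hypothetical helpers, the records are invented, and `zlib.crc32` is a deterministic stand-in, not the hash function PySpark actually uses.

```python
import zlib
from collections import defaultdict

def partition_of(key, num_partitions):
    """Stand-in for a hash partitioner: deterministic hash modulo partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_by(records, num_partitions):
    """Group (key, value) records into partitions, as a hash partitioner would."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return partitions

# Hypothetical (month, (temperature, 1)) records, shaped like the ones in this exercise
records = [
    ("01", (0.5, 1)), ("02", (1.0, 1)), ("01", (-2.0, 1)),
    ("07", (14.0, 1)), ("02", (3.5, 1)),
]
partitions = partition_by(records, 8)

# Every record of a given month ends up in exactly one partition
for index in sorted(partitions):
    print(index, partitions[index])
```

Because all records with the same key sit in the same partition, a later aggregation by that key can in principle be completed partition by partition, without moving data again. With only 12 distinct month keys, some of the 8 partitions may stay empty or unbalanced.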

In [4]:
# Average temperature for every month
rddCached=rddWeather.\
  filter(lambda x: x[5]<999).\
  map(lambda x: (x[3], (x[5],1))).\
  partitionBy(8).\
  persist()
# Use partitionBy(8): 2 partitions for each of the 4 local cores
rddCached.reduceByKey(lambda v1, v2: (v1[0]+v2[0], v1[1]+v2[1])).\
  mapValues(lambda v: round(v[0]/v[1],2)).\
  collect()

[('10', 8.58),
 ('02', 0.41),
 ('06', 11.95),
 ('07', 14.1),
 ('08', 13.78),
 ('11', 4.38),
 ('09', 10.62),
 ('01', 0.31),
 ('05', 9.64),
 ('12', 1.74),
 ('04', 4.91),
 ('03', 1.89)]

In [5]:
# Maximum temperature for every month
# Reuse the cached, partitioned RDD instead of filtering and mapping rddWeather again;
# its values are (temperature, 1) tuples, which compare by temperature first
rddCached.reduceByKey(lambda x, y: y if x<y else x).\
  collect()

[('10', (20.0, 1)),
 ('02', (13.2, 1)),
 ('06', (31.4, 1)),
 ('07', (29.2, 1)),
 ('08', (23.0, 1)),
 ('11', (14.0, 1)),
 ('09', (30.0, 1)),
 ('01', (12.0, 1)),
 ('05', (34.2, 1)),
 ('12', (14.0, 1)),
 ('04', (23.0, 1)),
 ('03', (15.2, 1))]
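
The results above carry `(temperature, 1)` tuples because the reducer runs on the values of the cached pair RDD. The small sketch below, on invented values, shows why this still yields the maximum temperature: Python compares tuples element by element, so the first element decides.

```python
from functools import reduce

# Hypothetical (temperature, 1) values for a single month key
values = [(12.5, 1), (20.0, 1), (-3.1, 1)]

# Same reducer as in the cell above: the tuple with the highest temperature wins
max_value = reduce(lambda x, y: y if x < y else x, values)

print(max_value)     # (20.0, 1)
print(max_value[0])  # 20.0
```

If only the bare temperature is wanted, a final `mapValues(lambda v: v[0])` on the reduced RDD is one possible way to drop the counter.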

## 102-2 RDD preparation

Check the five possibilities to prepare the Station RDD for subsequent processing and identify the best one.

In [6]:
num_partitions = 8

# [0] and [1] are the fields composing the key; [3] and [7] are country and elevation, respectively
rddS1 = rddStation.\
  keyBy(lambda x: x[0] + x[1]).\
  partitionBy(num_partitions).\
  cache().\
  map(lambda kv: (kv[0],(kv[1][3],kv[1][7])))
rddS2 = rddStation.\
  keyBy(lambda x: x[0] + x[1]).\
  map(lambda kv: (kv[0],(kv[1][3],kv[1][7]))).\
  cache().\
  partitionBy(num_partitions)
rddS3 = rddStation.\
  keyBy(lambda x: x[0] + x[1]).\
  partitionBy(num_partitions).\
  map(lambda kv: (kv[0],(kv[1][3],kv[1][7]))).\
  cache()
rddS4 = rddStation.\
  keyBy(lambda x: x[0] + x[1]).\
  map(lambda kv: (kv[0],(kv[1][3],kv[1][7]))).\
  partitionBy(num_partitions).\
  cache()
rddS5 = rddStation.\
  map(lambda x: (x[0] + x[1], (x[3],x[7]))).\
  partitionBy(num_partitions).\
  cache()
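
When comparing the five variants, one useful question is how much data each record carries at the moment it is shuffled by ```partitionBy``` and stored by ```cache```. The sketch below uses `pickle` (which PySpark relies on by default to serialize Python objects) on a single station record taken from this notebook's output. Single-record sizes are only a rough proxy, since Spark batches and may compress records.

```python
import pickle

# Station record taken from the sample output of this notebook
# STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end)
station = ("010016", "99999", "RORVIK/RYUM", "NO", "", 64.85, 11.233, 14.0, "19870116", "19910806")

key = station[0] + station[1]
full_record = (key, station)                        # what keyBy alone produces
projected_record = (key, (station[3], station[7]))  # key plus (country, elevation)

full_size = len(pickle.dumps(full_record))
projected_size = len(pickle.dumps(projected_record))

# The projected record is smaller, so projecting first moves and stores less data
print(full_size, projected_size)
```

Two more points are worth checking in the documentation while evaluating the variants: ```map``` does not preserve the partitioner of its input, and only the RDD on which ```cache``` is called is kept, not the transformations chained after it.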

## 102-3 Joining RDDs

Define the join between rddWeather and rddStation and compute:
- The maximum temperature for every city
- The maximum temperature for every city in the UK: 
  - ```StationData.country == "UK"```
- Sort the results by descending temperature
  - ```map(lambda kv: (kv[1],kv[0]))``` to swap key and value

Hints & considerations:
- Keep only temperature values <999
- Join syntax: ```rdd1.join(rdd2)```
  - Both RDDs should be structured as key-value RDDs with the same key: usaf + wban
- Consider partitioning and caching to optimize the join
  - [Scala only] Careful: it is not enough for the two RDDs to have the same number of partitions; they must have the same partitioner! To create a partitioning function, you must ```import org.apache.spark.HashPartitioner``` and then define ```p = new HashPartitioner(n)``` where ```n``` is the number of partitions to create.
- Verify the execution plan of the join in the web UI
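
The hint about sharing the same partitioner can be illustrated without Spark. In this sketch, which uses hypothetical helper names, invented records, and `zlib.crc32` as a stand-in hash, both datasets are split with the same function and the same number of partitions, so matching keys always land at the same partition index and the join can proceed partition by partition.

```python
import zlib
from collections import defaultdict

def hash_partition(records, num_partitions):
    """Split (key, value) records into num_partitions buckets by a deterministic hash."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in records:
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        buckets[index][key].append(value)
    return buckets

def copartitioned_join(left_buckets, right_buckets):
    """Inner join bucket by bucket; both sides must use the same partitioning."""
    assert len(left_buckets) == len(right_buckets)
    joined = []
    for left, right in zip(left_buckets, right_buckets):
        for key, left_values in left.items():
            for left_value in left_values:
                for right_value in right.get(key, []):
                    joined.append((key, (left_value, right_value)))
    return joined

# Hypothetical miniature inputs keyed by usaf + wban
weather = [("02869099999", 9.9), ("03026099999", 12.0),
           ("02869099999", -4.0), ("99999999999", 1.0)]
stations = [("02869099999", ("KUUSAMO", "FI")), ("03026099999", ("STORNOWAY", "UK"))]

result = copartitioned_join(hash_partition(weather, 8), hash_partition(stations, 8))
print(sorted(result))
```

If the two sides used different hash functions or partition counts, a key could sit at different indexes on each side, and the bucket-by-bucket join would miss matches. That is the reason an equal number of partitions alone is not enough.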

In [62]:
# STATION structure: (usaf,wban,city,country,state,latitude,longitude,elevation,date_begin,date_end) 
# Both RDDs must share the same key (usaf + wban); keep only (city, country) from the stations
rddStationKey = rddStation.map(lambda x: (x[0] + x[1], (x[2], x[3])))

rddJoined = rddWeather.\
  keyBy(lambda x: x[0] + x[1]).\
  filter(lambda x: x[1][5] < 999).\
  join(rddStationKey).\
  cache()
rddJoined.take(5)
# Record structure: (key, ((usaf, wban, year, month, day, temperature, temperatureQuality), (city, country)))
# e.g. ('02869099999', (('028690', '99999', '2000', '04', '01', 9.9, True), ('KUUSAMO', 'FI')))
# Maximum temperature for every city
rddJoined.map(lambda x: (x[1][1][0], x[1][0][5])).\
  reduceByKey(lambda v1, v2: max(v1, v2)).\
  collect()
# Maximum temperature for every city in the UK
rddUk = rddJoined.\
  filter(lambda x: x[1][1][1] == "UK").\
  map(lambda x: (x[1][1][0], x[1][0][5])).\
  reduceByKey(lambda v1, v2: max(v1, v2))
# Sorted by descending temperature (alternatively, use sortByKey(False))
rddUk.map(lambda kv: (kv[1], kv[0])).sortBy(lambda x: x[0], False).collect()

[(34.2, 'SOUTH UIST RANGE'),
 (30.0, 'SUMBURGH'),
 (26.0, 'TULLOCH BRIDGE'),
 (24.1, 'ALTNAHARRA NO2'),
 (24.0, 'KINLOSS'),
 (23.5, 'AVIEMORE'),
 (23.5, 'FOYERS'),
 (23.4, 'SKYE/LUSA'),
 (23.4, 'LOSSIEMOUTH'),
 (23.0, 'INVERNESS'),
 (23.0, 'AULTBEA NO2'),
 (23.0, 'GLENLIVET'),
 (22.9, 'LOCH GLASCARNOCH'),
 (22.1, 'TAIN RANGE (SAWS)'),
 (21.7, 'NORTH RONA ISLAND'),
 (21.2, 'KILMORY'),
 (21.0, 'WATERSTEIN'),
 (21.0, 'LOCHBOISDALE'),
 (21.0, 'BARRA ISLAND'),
 (20.8, 'INVERGORDON HARBOUR'),
 (20.0, 'STORNOWAY'),
 (20.0, 'BENBECULA'),
 (20.0, 'SCATSTA'),
 (19.6, 'RACKWICK'),
 (19.1, 'SELLA NESS'),
 (19.0, 'KIRKWALL'),
 (18.0, 'BALTASOUND NO.2'),
 (17.9, 'LERWICK'),
 (17.4, 'AONACH MOR'),
 (17.4, 'SULE SKERRY'),
 (16.0, 'MUCKLE HOLM'),
 (15.9, 'FOULA NO2'),
 (15.9, 'FOULA'),
 (15.7, 'FAIR ISLE'),
 (14.9, 'CAIRNGORM SUMMIT'),
 (8.8, 'BUTT OF LEWIS (LH)'),
 (6.9, 'NORTH RONALDSAY ISL')]

## 102-4 Memory occupation

Use Spark's web UI to verify the space occupied by the provided RDDs.

*Warning*: in PySpark, StorageLevels use serialization by default (see [documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html)).

In [71]:
from pyspark import StorageLevel

# Clear the cache
for (id, rdd) in sc._jsc.getPersistentRDDs().items():         
    rdd.unpersist()

memSerRdd = rddWeather.cache()
memRdd = memSerRdd.map(lambda x: x).persist(StorageLevel.MEMORY_AND_DISK_DESER)
diskRdd = memSerRdd.map(lambda x: x).persist(StorageLevel.DISK_ONLY)

## 102-5 Evaluating different join methods

Consider the following scenario:
- We have a disposable RDD of Weather data (i.e., it is used only once): ```rddW```
- And we have an RDD of Station data that is used many times: ```rddS```
- Both RDDs are cached (```collect()``` is called to enforce caching)

We want to join the two RDDs. Which option is best?
- Simply join the two RDDs
- Enforce on ```rddW``` the same partitioner of ```rddS``` (and then join)
- Exploit broadcast variables
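
The broadcast option amounts to a map-side join: the small dataset is shipped whole to every task and looked up locally, so the large dataset does not need to be shuffled for the join itself. The plain-Python sketch below mimics that idea with a dictionary standing in for the broadcast value. The records and the helper names `map_side_join` and `min_by_key` are invented for illustration.

```python
# Hypothetical miniature inputs; keys follow the usaf + wban convention
weather = [
    ("02869099999", 9.9),
    ("02869099999", -33.0),
    ("03026099999", 12.0),
    ("99999999999", 1.0),   # no matching station
]
# Small side collected into a plain dict, standing in for a broadcast variable's value
station_city = {"02869099999": "KUUSAMO", "03026099999": "STORNOWAY"}

def map_side_join(records, lookup):
    """Replace each key with the looked-up city; drop records without a match."""
    for key, temperature in records:
        city = lookup.get(key)
        if city is not None:
            yield (city, temperature)

def min_by_key(pairs):
    """Keep the minimum value per key, like reduceByKey with min."""
    result = {}
    for key, value in pairs:
        result[key] = min(value, result.get(key, value))
    return result

print(min_by_key(map_side_join(weather, station_city)))
# {'KUUSAMO': -33.0, 'STORNOWAY': 12.0}
```

This approach only makes sense when the small side fits comfortably in the memory of every executor, which is plausible for a station list but not for the weather data.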

In [64]:
num_partitions = 8

rddW = rddWeather.\
  filter(lambda w: w[5]<999).\
  keyBy(lambda w: w[0]+w[1]).\
  cache()

rddS = rddStation.\
  keyBy(lambda s: s[0]+s[1]).\
  partitionBy(num_partitions).\
  cache()

# Collect to enforce caching
rddW.collect()
rddS.collect()

[('00702699999',
  ('007026',
   '99999',
   'WXPOD 7026',
   'AF',
   '',
   0.0,
   0.0,
   7026.0,
   '20120713',
   '20170715')),
 ('00705999999',
  ('007059', '99999', 'CWOS 07059', '', '', 0, 0, 0, '20120314', '20120828')),
 ('00706499999',
  ('007064', '99999', 'CWOS 07064', '', '', 0, 0, 0, '20121218', '20121219')),
 ('00841599999',
  ('008415', '99999', 'XM21', '', '', 0, 0, 0, '20131002', '20160217')),
 ('01001399999',
  ('010013', '99999', 'ROST', 'NO', '', 0, 0, 0, '19861120', '19880105')),
 ('01001699999',
  ('010016',
   '99999',
   'RORVIK/RYUM',
   'NO',
   '',
   64.85,
   11.233,
   14.0,
   '19870116',
   '19910806')),
 ('01010099999',
  ('010100',
   '99999',
   'ANDOYA',
   'NO',
   '',
   69.293,
   16.144,
   13.1,
   '19310103',
   '20170715')),
 ('01016099999',
  ('010160',
   '99999',
   'KONGSOYA',
   'NO',
   '',
   78.933,
   28.9,
   20.0,
   '19930501',
   '20170715')),
 ('01030099999',
  ('010300',
   '99999',
   'KISTEFJELL',
   'NO',
   '',
   69.283,


In [65]:
# Is it better to simply join the two RDDs..
rddX = rddW.\
  join(rddS).\
  map(lambda kv: (kv[1][1][2],kv[1][0][5])).\
  reduceByKey(lambda x,y: min(x,y),1)
print(rddX.toDebugString().decode("unicode_escape"))

(1) PythonRDD[739] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[738] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[737] at partitionBy at <unknown>:0 []
 +-(12) PairwiseRDD[736] at reduceByKey at /tmp/ipykernel_269/683189098.py:5 []
    |   PythonRDD[735] at reduceByKey at /tmp/ipykernel_269/683189098.py:5 []
    |   MapPartitionsRDD[734] at mapPartitions at PythonRDD.scala:160 []
    |   ShuffledRDD[733] at partitionBy at <unknown>:0 []
    +-(12) PairwiseRDD[732] at join at /tmp/ipykernel_269/683189098.py:3 []
       |   PythonRDD[731] at join at /tmp/ipykernel_269/683189098.py:3 []
       |   UnionRDD[730] at union at <unknown>:0 []
       |   PythonRDD[728] at RDD at PythonRDD.scala:53 []
       |   PythonRDD[723] at RDD at PythonRDD.scala:53 []
       |       CachedPartitions: 4; MemorySize: 2.5 MiB; DiskSize: 0.0 B
       |   PythonRDD[720] at RDD at PythonRDD.scala:53 []
       |       CachedPartitions: 4; MemorySize: 2.6 MiB; DiskSize: 0.0 B
       |   ../.

In [66]:
rddX.collect()

[('HALLI', -25.4),
 ('MIKKELI', -26.0),
 ('SAVONLINNA', -29.0),
 ('KOTKA RANKKI', -16.4),
 ('ISOSAARI', -14.7),
 ('NORTH RONA ISLAND', -3.5),
 ('SUOMUSJARVI', -22.8),
 ('HANKO RUSSARO', -9.4),
 ('BENBECULA', -4.0),
 ('SOUTH UIST RANGE', -3.1),
 ('BARRA ISLAND', -2.5),
 ('SKYE/LUSA', -3.4),
 ('KRUUNUPYY', -21.0),
 ('VIITASAARI', -22.0),
 ('LAPPEENRANTA HIEKKAPAKKA', -28.2),
 ('SULE SKERRY', 1.4),
 ('NIVALA', -22.8),
 ('AVIEMORE', -11.1),
 ('OULU', -23.6),
 ('MARIEHAMN', -19.0),
 ('SCATSTA', -7.0),
 ('FOULA NO2', -2.8),
 ('STORNOWAY', -5.0),
 ('WATERSTEIN', -4.9),
 ('AULTBEA NO2', -3.4),
 ('CAIRNGORM SUMMIT', -12.1),
 ('VAASA', -20.0),
 ('KAUHAVA', -22.0),
 ('KANKAANPAA NIINISALO PUOLVOIM', -22.2),
 ('TAMPERE PIRKKALA', -22.2),
 ('PORI', -20.9),
 ('HELSINKI VANTAA', -20.3),
 ('BALTASOUND NO.2', -9.4),
 ('FAIR ISLE', -2.0),
 ('FOYERS', -4.4),
 ('TAIN RANGE (SAWS)', -6.9),
 ('KUUSAMO', -33.0),
 ('VARKAUS', -28.0),
 ('INKOO BAGASKAR', -14.8),
 ('KAJAANI', -30.0),
 ('JOENSUU', -26.3),
 ('JOK

In [67]:
# ..to enforce on rddW the same partitioner of rddS..
rddX = rddW.\
  partitionBy(num_partitions).\
  join(rddS).\
  map(lambda kv: (kv[1][1][2],kv[1][0][5])).\
  reduceByKey(lambda x,y: min(x,y),1)
print(rddX.toDebugString().decode("unicode_escape"))

(1) PythonRDD[751] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[750] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[749] at partitionBy at <unknown>:0 []
 +-(8) PairwiseRDD[748] at reduceByKey at /tmp/ipykernel_269/958411475.py:6 []
    |  PythonRDD[747] at reduceByKey at /tmp/ipykernel_269/958411475.py:6 []
    |  PartitionerAwareUnionRDD[746] at union at <unknown>:0 []
    |  PythonRDD[744] at RDD at PythonRDD.scala:53 []
    |  MapPartitionsRDD[743] at mapPartitions at PythonRDD.scala:160 []
    |  ShuffledRDD[742] at partitionBy at <unknown>:0 []
    +-(4) PairwiseRDD[741] at partitionBy at /tmp/ipykernel_269/958411475.py:3 []
       |  PythonRDD[740] at partitionBy at /tmp/ipykernel_269/958411475.py:3 []
       |  PythonRDD[723] at RDD at PythonRDD.scala:53 []
       |      CachedPartitions: 4; MemorySize: 2.5 MiB; DiskSize: 0.0 B
       |  PythonRDD[720] at RDD at PythonRDD.scala:53 []
       |      CachedPartitions: 4; MemorySize: 2.6 MiB; DiskSize: 0.0 B
   

In [68]:
rddX.collect()

[('OULU', -23.6),
 ('MUSTASAARI VALASSAARET', -11.2),
 ('MIKKELI', -26.0),
 ('SAVONLINNA', -29.0),
 ('KOTKA RANKKI', -16.4),
 ('ISOSAARI', -14.7),
 ('SCATSTA', -7.0),
 ('MUCKLE HOLM', -2.8),
 ('NORTH RONA ISLAND', -3.5),
 ('STORNOWAY', -5.0),
 ('LOCH GLASCARNOCH', -7.0),
 ('AULTBEA NO2', -3.4),
 ('HAILUOTO ISLAND', -22.8),
 ('VAASA', -20.0),
 ('AHTARI MYLLYMAKI', -26.4),
 ('KANKAANPAA NIINISALO PUOLVOIM', -22.2),
 ('HELSINKI VANTAA', -20.3),
 ('LEMLAND NYHAMN', -5.4),
 ('KUMLINGE ISLAND', -10.7),
 ('BENBECULA', -4.0),
 ('SOUTH UIST RANGE', -3.1),
 ('BARRA ISLAND', -2.5),
 ('KINLOSS', -6.0),
 ('GLENLIVET', -5.3),
 ('KRUUNUPYY', -21.0),
 ('INKOO BAGASKAR', -14.8),
 ('NIVALA', -22.8),
 ('SEINAJOKI', -21.0),
 ('JOENSUU', -26.3),
 ('TURKU', -20.0),
 ('PARAINEN UTO', -6.5),
 ('SUMBURGH', -5.0),
 ('SELLA NESS', -5.3),
 ('INVERGORDON HARBOUR', -3.8),
 ('AVIEMORE', -11.1),
 ('HALLI', -25.4),
 ('MARIEHAMN', -19.0),
 ('LERWICK', -5.5),
 ('FOULA', 0.0),
 ('FOULA NO2', -2.8),
 ('WATERSTEIN', -4.9),

In [69]:
# ..or to exploit broadcast variables?
bRddS = sc.broadcast(rddS.map(lambda s: (s[0], s[1][2])).collectAsMap())
rddJ = rddW.\
  map(lambda kv: (bRddS.value.get(kv[0]), kv[1][5])).\
  filter(lambda x: x[0] is not None)

rddX = rddJ.\
  reduceByKey(lambda x,y: min(x,y),1)
print(rddX.toDebugString().decode("unicode_escape"))

(1) PythonRDD[757] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[756] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[755] at partitionBy at <unknown>:0 []
 +-(4) PairwiseRDD[754] at reduceByKey at /tmp/ipykernel_269/805349340.py:8 []
    |  PythonRDD[753] at reduceByKey at /tmp/ipykernel_269/805349340.py:8 []
    |  PythonRDD[723] at RDD at PythonRDD.scala:53 []
    |      CachedPartitions: 4; MemorySize: 2.5 MiB; DiskSize: 0.0 B
    |  PythonRDD[720] at RDD at PythonRDD.scala:53 []
    |      CachedPartitions: 4; MemorySize: 2.6 MiB; DiskSize: 0.0 B
    |  ../../../../datasets/big/weather-sample1.txt MapPartitionsRDD[89] at textFile at NativeMethodAccessorImpl.java:0 []
    |  ../../../../datasets/big/weather-sample1.txt HadoopRDD[88] at textFile at NativeMethodAccessorImpl.java:0 []


In [70]:
rddX.collect()

[('KUUSAMO', -33.0),
 ('HAILUOTO ISLAND', -22.8),
 ('OULU', -23.6),
 ('SUOMUSSALMI', -30.3),
 ('KAJAANI', -30.0),
 ('NIVALA', -22.8),
 ('MUSTASAARI VALASSAARET', -11.2),
 ('VAASA', -20.0),
 ('KAUHAVA', -22.0),
 ('KRUUNUPYY', -21.0),
 ('VIITASAARI', -22.0),
 ('KUOPIO', -26.0),
 ('LAPPEENRANTA HIEKKAPAKKA', -28.2),
 ('AHTARI MYLLYMAKI', -26.4),
 ('SEINAJOKI', -21.0),
 ('JOENSUU', -26.3),
 ('JYVASKYLA', -27.0),
 ('KANKAANPAA NIINISALO PUOLVOIM', -22.2),
 ('TAMPERE PIRKKALA', -22.2),
 ('HALLI', -25.4),
 ('MIKKELI', -26.0),
 ('VARKAUS', -28.0),
 ('SAVONLINNA', -29.0),
 ('PORI', -20.9),
 ('LAPPEENRANTA', -23.0),
 ('JOKIOINEN', -23.4),
 ('LAHTI LAUNE', -22.9),
 ('UTTI', -23.0),
 ('MARIEHAMN', -19.0),
 ('JOMALA', -16.9),
 ('TURKU', -20.0),
 ('SUOMUSJARVI', -22.8),
 ('HELSINKI VANTAA', -20.3),
 ('HELSINKI MALMI', -21.0),
 ('KOTKA RANKKI', -16.4),
 ('LEMLAND NYHAMN', -5.4),
 ('PARAINEN UTO', -6.5),
 ('HANKO RUSSARO', -9.4),
 ('INKOO BAGASKAR', -14.8),
 ('ISOSAARI', -14.7),
 ('KUMLINGE ISLAND', -