# 301 Spark basics

The goal of this lab is to get familiar with Spark programming.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

## 301-2 Running a sample Spark job

Goal: compute the average temperature for each month in the ```weather-sample1``` dataset.

In [1]:
val bucketname = "unibo-bd2223-emarcantognini"

val rddWeather = sc.textFile("s3a://"+bucketname+"/datasets/weather-sample1.txt")

SparkSession available as 'spark'.


bucketname: String = unibo-bd2223-emarcantognini
rddWeather: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2223-emarcantognini/datasets/weather-sample1.txt MapPartitionsRDD[1] at textFile at <console>:25


In [2]:
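def parseWeatherLine(line:String):(String,Double) = {
  // Fixed-width record: the fields are read by character position
  val year = line.substring(15,19)
  val month = line.substring(19,21)
  val day = line.substring(21,23)
  val temp = line.substring(87,92).toInt
  // Note: temp/10 is an integer division, so the decimal part is dropped before the
  // result is widened to Double; use temp/10.0 to keep it
  (month, temp/10)
}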

// Parse records
val rddWeatherKv = rddWeather.map(x => parseWeatherLine(x))
// Aggregate by key (i.e., month) to compute the sum and the count of temperature values
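val rddTempDataPerMonth = rddWeatherKv.aggregateByKey((0.0,0.0))(
  (a,v) => (a._1+v, a._2+1),             // within a partition: add the value, increment the count
  (a1,a2) => (a1._1+a2._1, a1._2+a2._2)  // across partitions: merge partial (sum, count) pairs
)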
// Calculate the average temperature in each record
val rddAvgTempPerMonth = rddTempDataPerMonth.map({case(k,v) => (k, v._1/v._2)})
// Sort, coalesce and cache the result (because it is used twice)
val rddCached = rddAvgTempPerMonth.sortByKey().coalesce(1).cache()

parseWeatherLine: (line: String)(String, Double)
rddWeatherKv: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[2] at map at <console>:26
rddTempDataPerMonth: org.apache.spark.rdd.RDD[(String, (Double, Double))] = ShuffledRDD[3] at aggregateByKey at <console>:24
rddAvgTempPerMonth: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[4] at map at <console>:24
rddCached: org.apache.spark.rdd.RDD[(String, Double)] = CoalescedRDD[8] at coalesce at <console>:24
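The two functions passed to ```aggregateByKey``` above play different roles: the first folds values into an accumulator inside one partition, the second merges accumulators coming from different partitions. The sketch below mimics this on plain Scala collections so that it runs without a cluster; the records and the two "partitions" are made up for illustration and are not taken from ```weather-sample1```.

```scala
// Hypothetical (month, temperature) records, split into two pretend partitions
val partition1 = List(("01", 2.0), ("01", 4.0), ("02", 10.0))
val partition2 = List(("01", 6.0), ("02", 20.0))

type Acc = (Double, Double) // (sum, count)
val seqOp: (Acc, Double) => Acc = (a, v) => (a._1 + v, a._2 + 1)
val combOp: (Acc, Acc) => Acc = (a1, a2) => (a1._1 + a2._1, a1._2 + a2._2)

// Step 1: fold each partition independently, starting every key from the zero value
def foldPartition(p: List[(String, Double)]): Map[String, Acc] =
  p.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).foldLeft((0.0, 0.0))(seqOp)) }

// Step 2: merge the partial accumulators of the same key across partitions
val merged: Map[String, Acc] =
  (foldPartition(partition1).toList ++ foldPartition(partition2).toList)
    .groupBy(_._1)
    .map { case (k, kvs) => (k, kvs.map(_._2).reduce(combOp)) }

// Step 3: divide sum by count only once, at the very end
val avg: Map[String, Double] = merged.map { case (k, (s, n)) => (k, s / n) }
```

Carrying a (sum, count) pair and dividing at the end is what makes the result independent of how the data is partitioned; averaging per-partition averages would generally give a different number.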


In [3]:
// Show all the records
rddCached.collect()

res8: Array[(String, Double)] = Array((01,29.764781644286497), (02,52.831468961278425), (03,49.43499927074724), (04,61.3592872169286), (05,55.82656), (06,55.45816479125297), (07,86.90952392350223), (08,79.250958082407), (09,80.51662117371808), (10,106.26454490168254), (11,113.49704495968224), (12,63.9184413544602))


In [5]:
rddCached.saveAsTextFile("s3a://"+bucketname+"/spark/301-2_1")

## 301-3 Spark warm-up

Load the ```capra``` and ```divinacommedia``` datasets and try the following actions:
- Show their content (```collect```)
- Count their rows (```count```)
- Split phrases into words (```map``` or ```flatMap```; what’s the difference?)
- Check the results (remember: evaluation is lazy)
- Try the ```toDebugString``` function to check the execution plan
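As a rough illustration of the ```map``` versus ```flatMap``` question and of lazy evaluation, the sketch below uses plain Scala collections instead of RDDs, so it runs without a Spark session. The ```lines``` list is a local stand-in for ```rddCapra```, and the ```Iterator``` is only an analogy for how transformations wait for an action; it is not how Spark is implemented.

```scala
// Local stand-in for rddCapra: the two lines of the capra dataset
val lines = List("sopra la panca la capra campa", "sotto la panca la capra crepa")

// map: exactly one output element per input element (here, one array of words per line)
val nested: List[Array[String]] = lines.map(_.split(" "))

// flatMap: the per-line arrays are concatenated into a single flat list of words
val words: List[String] = lines.flatMap(_.split(" "))

// Laziness analogy: an Iterator, like an RDD transformation, does no work until consumed
var evaluated = 0
val lazyWords = lines.iterator.flatMap { l => evaluated += 1; l.split(" ").toList }
val before = evaluated        // nothing has been evaluated yet
val forced = lazyWords.toList // consuming plays the role of an action such as collect()
val after = evaluated         // one evaluation per input line
```

With ```map``` the result keeps the line structure (2 elements), while ```flatMap``` yields the 12 individual words. On an RDD, defining the ```flatMap``` likewise triggers no job until an action such as ```collect``` or ```count``` is called.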

In [6]:
val rddCapra = sc.textFile("s3a://"+bucketname+"/datasets/capra.txt")
val rddDC = sc.textFile("s3a://"+bucketname+"/datasets/divinacommedia.txt")

rddCapra: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2223-emarcantognini/datasets/capra.txt MapPartitionsRDD[12] at textFile at <console>:24
rddDC: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2223-emarcantognini/datasets/divinacommedia.txt MapPartitionsRDD[14] at textFile at <console>:24


In [7]:
rddCapra.collect()

res11: Array[String] = Array(sopra la panca la capra campa, sotto la panca la capra crepa)


In [8]:
rddCapra.count()

res12: Long = 2


In [10]:
rddCapra.flatMap(x => x.split(" ")).collect()

res14: Array[String] = Array(sopra, la, panca, la, capra, campa, sotto, la, panca, la, capra, crepa)


In [14]:
rddDC.collect()

res18: Array[String] = Array(LA DIVINA COMMEDIA, di Dante Alighieri, INFERNO, "", "", "", Inferno: Canto I, "", "  Nel mezzo del cammin di nostra vita", mi ritrovai per una selva oscura, ch? la diritta via era smarrita., "  Ahi quanto a dir qual era ? cosa dura", esta selva selvaggia e aspra e forte, che nel pensier rinova la paura!, "  Tant'? amara che poco ? pi? morte;", ma per trattar del ben ch'i' vi trovai,, dir? de l'altre cose ch'i' v'ho scorte., "  Io non so ben ridir com'i' v'intrai,", tant'era pien di sonno a quel punto, che la verace via abbandonai., "  Ma poi ch'i' fui al pi? d'un colle giunto,", l? dove terminava quella valle, che m'avea di paura il cor compunto,, "  guardai in alto, e vidi le sue spalle", vestite gi? de' raggi del pianeta, che mena dritto altrui per ogne c...


In [11]:
rddDC.count()

res15: Long = 14753


In [16]:
rddDC.flatMap(x => x.split(" ")).collect()

res20: Array[String] = Array(LA, DIVINA, COMMEDIA, di, Dante, Alighieri, INFERNO, "", "", "", Inferno:, Canto, I, "", "", "", Nel, mezzo, del, cammin, di, nostra, vita, mi, ritrovai, per, una, selva, oscura, ch?, la, diritta, via, era, smarrita., "", "", Ahi, quanto, a, dir, qual, era, ?, cosa, dura, esta, selva, selvaggia, e, aspra, e, forte, che, nel, pensier, rinova, la, paura!, "", "", Tant'?, amara, che, poco, ?, pi?, morte;, ma, per, trattar, del, ben, ch'i', vi, trovai,, dir?, de, l'altre, cose, ch'i', v'ho, scorte., "", "", Io, non, so, ben, ridir, com'i', v'intrai,, tant'era, pien, di, sonno, a, quel, punto, che, la, verace, via, abbandonai., "", "", Ma, poi, ch'i', fui, al, pi?, d'un, colle, giunto,, l?, dove, terminava, quella, valle, che, m'avea, di, paura, il, cor, compunto...


## 301-4 From MapReduce to Spark

Reproduce in Spark the exercises previously done with Hadoop MapReduce on the ```capra``` and ```divinacommedia``` datasets.

- Jobs:
  - Count the number of occurrences of each word
    - Result: (sopra, 1), (la, 4), …
  - Count the number of words of each length
    - Result: (2, 4), (5, 8)
  - Compute the average length of words grouped by their first letter (hint: check the example in 301-1)
    - Result: (s, 5), (l, 2), …
  - Return the inverted index of words
    - Result: (sopra, (0)), (la, (0, 1)), ...
- How does Spark compare to MapReduce? (performance, ease of use)
- How is the output sorted? How can you sort by value?
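On the sorting question: the output of ```reduceByKey``` comes back in no guaranteed order, so any ordering has to be requested explicitly. The sketch below shows the idea on a local Scala collection (a stand-in for the word RDD, so it runs without Spark); the alphabetical tie-break is an assumption added here to make the order deterministic.

```scala
// Local stand-in for the words of the capra dataset
val words = "sopra la panca la capra campa sotto la panca la capra crepa".split(" ").toList

// Word count on a local collection; stands in for map(x => (x, 1)).reduceByKey(_ + _)
val counts: Map[String, Int] = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }

// Sort by descending count, breaking ties alphabetically so the order is deterministic
val byValue: List[(String, Int)] = counts.toList.sortBy { case (w, n) => (-n, w) }
```

On an RDD of pairs, the corresponding call would be ```sortBy(_._2, ascending = false)```, whereas ```sortByKey()``` orders by the key only.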

In [66]:
val rddCapraWords = rddCapra.flatMap(_.split(" ")).cache()

rddCapraWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[118] at flatMap at <console>:23


In [67]:
val rddDCWords = rddDC.flatMap(_.split(" ")).cache()

rddDCWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[119] at flatMap at <console>:23


In [69]:
// 1: count the occurrences of each word (capra)
rddCapraWords.map(x => (x, 1)).reduceByKey((x, y) => x + y).collect()

res92: Array[(String, Int)] = Array((campa,1), (la,4), (panca,2), (sotto,1), (crepa,1), (sopra,1), (capra,2))


In [70]:
// 1: count the occurrences of each word (divinacommedia)
rddDCWords.map(x => (x, 1)).reduceByKey((x, y) => x + y).collect()

res94: Array[(String, Int)] = Array((grand'avello,,1), (diseta,1), (vane.,1), (tonda,3), (blandimenti;,1), (sapore,1), (dando,3), (Verrucchio,,1), (Mantua,1), (m'apparvero,1), (disiderate,1), (dole,1), (moventi,1), (rincalzi,1), (freni,,1), (Voglia,1), (focina,1), (tormento,5), (s?:,2), (marino,,1), (scalz?,1), (pensassi,1), (esser,,2), (rade,2), (prava".,1), (Forese,,1), (forti,4), (rossi:,1), (richiuso".,1), ("Segnor,1), (rota.,1), ("ver',1), (pronti,1), (tr'ambo,2), (ch'ode,1), (chiari,,1), (lontana?".,1), (rinovelle.,1), (perdonasse,1), (Pluto,,1), (falsai,2), (nova,,3), (sparito,,1), (stampa,,1), (doglia,7), (regina,3), (pianto;,2), (Alto,2), (giovinezza.,1), (quell'albor,1), (asciutto,,1), (ch'ello,1), (padre,,14), (coperchio,3), (vite,4), (Anassagora,1), (dormia,,1), (consum?,1),...


In [62]:
// 2: count the words of each length (capra)
rddCapraWords.map(x => (x.size, 1)).reduceByKey((x,y) => x+y).collect()

res84: Array[(Int, Int)] = Array((2,4), (5,8))


In [63]:
// 2: drop empty tokens and lone punctuation, then count the words of each length (divinacommedia)
rddDCWords.filter(x => !(x == "" || x == "!" || x == ".")).map(x => (x.size, 1)).reduceByKey((x,y) => x+y).collect()

res86: Array[(Int, Int)] = Array((4,9111), (16,3), (14,50), (6,11775), (8,5363), (12,370), (10,1741), (2,19258), (13,154), (15,18), (11,933), (1,6988), (17,1), (3,16887), (7,7379), (9,3231), (5,13504))


In [59]:
// 3 (partial): this counts the words per first letter; the average length additionally needs a (sum, count) pair as in 301-2
rddCapraWords.map(x => (x(0), 1)).reduceByKey((x,y) => x+y).collect()

res79: Array[(Char, Int)] = Array((p,2), (l,4), (s,2), (c,4))


In [58]:
// 3 (partial): same count per first letter on divinacommedia, after dropping empty tokens and lone punctuation
rddDCWords.filter(x => !(x == "" || x == "!" || x == ".")).map(x => (x(0), 1)).reduceByKey((x,y) => x+y).collect()

res77: Array[(Char, Int)] = Array((T,234), (d,7909), (z,18), (",1118), (L,351), (p,7331), (R,106), (B,210), (P,475), (t,4099), (b,1130), (h,187), (n,3518), (f,3797), (j,2), (v,3984), ((,3), (Z,7), (F,140), (V,193), (:,5), (,,20), (X,73), (N,216), (r,1952), (l,8268), (D,356), (',1441), (s,9368), (e,4746), (Q,267), (G,185), (M,354), (a,5176), (O,212), (;,1), (A,268), (u,769), (I,333), (o,1363), (i,3203), (!,9), (q,2669), (-,18), (S,347), (?,546), (C,580), (E,544), (?,1), (U,35), (g,2054), (m,5186), (c,11389))
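The two cells above count words per first letter, while the exercise asks for the average word length. One possible way to get there, sketched on a local Scala collection so that it runs without Spark, is to carry a (sum of lengths, count) pair per letter and divide at the end. The variable names are illustrative only.

```scala
// Local stand-in for rddCapraWords
val words = "sopra la panca la capra campa sotto la panca la capra crepa".split(" ").toList

// Key each word by its first letter and carry a (sum of lengths, count) pair.
// w.head fails on empty strings, so empty tokens must be filtered out beforehand.
val sumCount: Map[Char, (Int, Int)] = words
  .map(w => (w.head, (w.length, 1)))
  .groupBy(_._1)
  .map { case (c, kvs) =>
    (c, kvs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2)))
  }

// Divide only at the end, once all partial sums and counts are merged
val avgLen: Map[Char, Double] = sumCount.map { case (c, (s, n)) => (c, s.toDouble / n) }
```

On the RDD the same shape would be ```map(w => (w(0), (w.length, 1)))```, then ```reduceByKey``` with the pairwise sum, then ```mapValues``` for the division.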


In [64]:
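// 4 (first step): pair each word with its position; an inverted index needs the index of the line instead
rddCapraWords.zipWithIndex().collect()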

res88: Array[(String, Long)] = Array((sopra,0), (la,1), (panca,2), (la,3), (capra,4), (campa,5), (sotto,6), (la,7), (panca,8), (la,9), (capra,10), (crepa,11))
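The cell above numbers the individual words, whereas the expected result, (sopra, (0)), (la, (0, 1)), maps each word to the lines that contain it. A minimal sketch of that inverted index on a local Scala collection (a stand-in for ```rddCapra```, so it runs without Spark) could look as follows; indexing the lines before splitting them is the key step.

```scala
// Local stand-in for rddCapra: index the lines first, then split them into words
val lines = List("sopra la panca la capra campa", "sotto la panca la capra crepa")

val invertedIndex: Map[String, List[Int]] = lines.zipWithIndex
  .flatMap { case (line, i) => line.split(" ").toList.map(w => (w, i)) } // (word, line index)
  .distinct                                                               // one entry per word and line
  .groupBy(_._1)
  .map { case (w, pairs) => (w, pairs.map(_._2).sorted) }
```

On the RDD, a comparable pipeline would be ```rddCapra.zipWithIndex()``` followed by ```flatMap```, ```distinct()``` and ```groupByKey()```; note that ```zipWithIndex``` on an RDD produces ```Long``` indices.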
