# 301 Spark basics

The goal of this lab is to get familiar with Spark programming.

- [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
- [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)

## 301-2 Running a sample Spark job

Goal: calculate the average temperature for every month; dataset is ```weather-sample1```.

In [1]:
val bucketname = "unibo-bd2324-dcohen"

val rddWeather = sc.textFile("s3a://"+bucketname+"/datasets/weather-sample1.txt")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1696445664495_0001,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2324-dcohen
rddWeather: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2324-dcohen/datasets/weather-sample1.txt MapPartitionsRDD[1] at textFile at <console>:25


In [2]:
// 
def parseWeatherLine(line:String):(String,Double) = {
  val year = line.substring(15,19)
  val month = line.substring(19,21)
  val day = line.substring(21,23)
  var temp = line.substring(87,92).toInt
  (month, temp/10)
}

// Parse records (for each item apply the function key-value)
// In order to enable the operation to allow aggregration you need that rdd is structure in kv
val rddWeatherKv = rddWeather.map(x => parseWeatherLine(x))
// Aggregate by key (i.e., month) to compute the sum and the count of temperature values
// specify two combinitation before shuffling and after
// The behaviour is the same in this case but is specified differently
// Its important because spark always try to do some combining even with reducebykey always try to adopt this to the map size.
val rddTempDataPerMonth = rddWeatherKv.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
// Calculate the average temperature in each record
// Map operation wherete the is the touple divide by the sum and count
val rddAvgTempPerMonth = rddTempDataPerMonth.map({case(k,v) => (k, v._1/v._2)})
// Sort, coalesce and cache the result (because it is used twice)
val rddCached = rddAvgTempPerMonth.sortByKey().coalesce(1).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

parseWeatherLine: (line: String)(String, Double)
rddWeatherKv: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[2] at map at <console>:27
rddTempDataPerMonth: org.apache.spark.rdd.RDD[(String, (Double, Double))] = ShuffledRDD[3] at aggregateByKey at <console>:27
rddAvgTempPerMonth: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[4] at map at <console>:25
rddCached: org.apache.spark.rdd.RDD[(String, Double)] = CoalescedRDD[8] at coalesce at <console>:24


In [3]:
// Show all the records
rddCached.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res14: Array[(String, Double)] = Array((01,29.764781644286497), (02,52.831468961278425), (03,49.43499927074724), (04,61.3592872169286), (05,55.82656), (06,55.45816479125297), (07,86.90952392350223), (08,79.250958082407), (09,80.51662117371808), (10,106.26454490168254), (11,113.49704495968224), (12,63.9184413544602))


In [4]:
rddCached.saveAsTextFile("s3a://"+bucketname+"/spark/301-2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://unibo-bd2324-dcohen/spark/301-2 already exists
  at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
  at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
  at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
  at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
  at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRD

## 301-3 Spark warm-up

Load the ```capra``` and ```divinacommedia``` datasets and try the following actions:
- Show their content (```collect```)
- Count their rows (```count```)
- Split phrases into words (```map``` or ```flatMap```; what’s the difference?)
- Check the results (remember: evaluation is lazy)
- Try the ```toDebugString``` function to check the execution plan

In [5]:
val rddCapra = sc.textFile("s3a://"+bucketname+"/datasets/capra.txt")
val rddDC = sc.textFile("s3a://"+bucketname+"/datasets/divinacommedia.txt")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapra: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2324-dcohen/datasets/capra.txt MapPartitionsRDD[11] at textFile at <console>:24
rddDC: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2324-dcohen/datasets/divinacommedia.txt MapPartitionsRDD[13] at textFile at <console>:24


In [6]:
rddCapra.collect()
rddDC.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res16: Array[String] = Array(sopra la panca la capra campa, sotto la panca la capra crepa)
res17: Array[String] = Array(LA DIVINA COMMEDIA, di Dante Alighieri, INFERNO, "", "", "", Inferno: Canto I, "", "  Nel mezzo del cammin di nostra vita", mi ritrovai per una selva oscura, ch? la diritta via era smarrita., "  Ahi quanto a dir qual era ? cosa dura", esta selva selvaggia e aspra e forte, che nel pensier rinova la paura!, "  Tant'? amara che poco ? pi? morte;", ma per trattar del ben ch'i' vi trovai,, dir? de l'altre cose ch'i' v'ho scorte., "  Io non so ben ridir com'i' v'intrai,", tant'era pien di sonno a quel punto, che la verace via abbandonai., "  Ma poi ch'i' fui al pi? d'un colle giunto,", l? dove terminava quella valle, che m'avea di paura il cor compunto,, "  guardai in alto, e vidi le sue spalle", vestite gi? de' raggi del pianeta, che mena dritto altrui per ogne c...


In [7]:
rddCapra.count()
rddDC.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res18: Long = 2
res19: Long = 14753


In [10]:
val rddCapraSplitted = rddCapra.flatMap(_.split(" "))
// rddCapraSplitted.toDebugString
rddCapraSplitted.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraSplitted: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at flatMap at <console>:23
res29: Array[String] = Array(sopra, la, panca, la, capra, campa, sotto, la, panca, la, capra, crepa)


In [11]:
val rddDCSplitted = rddDC.flatMap(elem => elem.split(" "))
// rddDCSplitted.toDebugString
rddDCSplitted.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddDCSplitted: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at flatMap at <console>:23
res31: Array[String] = Array(LA, DIVINA, COMMEDIA, di, Dante, Alighieri, INFERNO, "", "", "", Inferno:, Canto, I, "", "", "", Nel, mezzo, del, cammin, di, nostra, vita, mi, ritrovai, per, una, selva, oscura, ch?, la, diritta, via, era, smarrita., "", "", Ahi, quanto, a, dir, qual, era, ?, cosa, dura, esta, selva, selvaggia, e, aspra, e, forte, che, nel, pensier, rinova, la, paura!, "", "", Tant'?, amara, che, poco, ?, pi?, morte;, ma, per, trattar, del, ben, ch'i', vi, trovai,, dir?, de, l'altre, cose, ch'i', v'ho, scorte., "", "", Io, non, so, ben, ridir, com'i', v'intrai,, tant'era, pien, di, sonno, a, quel, punto, che, la, verace, via, abbandonai., "", "", Ma, poi, ch'i', fui, al, pi?, d'un, colle, giunto,, l?, dove, terminava, quella, valle, che, m'avea, di, paura, il, cor, compunto...


## 301-4 From MapReduce to Spark

Reproduce on Spark the exercises seen on Hadoop MapReduce on the capra and divinacommedia datasets.

- Jobs:
  - Count the number of occurrences of each word
    - Result: (sopra, 1), (la, 4), …
  - Count the number of occurrences of words of given lengths
    - Result: (2, 4), (5, 8)
  - Count the average length of words given their first letter (hint: check the example in 301-1)
    - Result: (s, 5), (l, 2), …
  - Return the list of offsets of each words
    - Result: (sopra, (0)), (la, (0, 1)), ...
- How does Spark compare with respect to MapReduce? (performance, ease of use)
- How is the output sorted? How can you sort by value?

In [19]:
// Occurrences Count

val rddCapraCount = rddCapra.
   flatMap( _.split(" ") ).
   map((_,1)).
   reduceByKey((x,y)=>x+y)

rddCapraCount.collect()
// rddCapraCount.toDebugString

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at reduceByKey at <console>:28
res47: String =
(2) ShuffledRDD[37] at reduceByKey at <console>:28 []
 +-(2) MapPartitionsRDD[36] at map at <console>:27 []
    |  MapPartitionsRDD[35] at flatMap at <console>:26 []
    |  s3a://unibo-bd2324-dcohen/datasets/capra.txt MapPartitionsRDD[11] at textFile at <console>:24 []
    |  s3a://unibo-bd2324-dcohen/datasets/capra.txt HadoopRDD[10] at textFile at <console>:24 []


In [26]:
// Word length count

val rddCapraOccurrences = rddCapra.
    flatMap( _.split(" ") ).
    map(word => (word.length, 1)).
    reduceByKey((x,y) => x + y)

rddCapraOccurrences.collect()
// rddCapraOccurrences.toDebugString

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraOccurrences: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[51] at reduceByKey at <console>:28
res70: Array[(Int, Int)] = Array((2,4), (5,8))


In [29]:
// Average word Length by initial

val rddCapraAvgLenMap = rddCapra.flatMap( _.split(" ") ).
    filter ( _.length > 0).
    map (w => (w.take(1),w.length))

rddCapraAvgLenMap.collect()
// rddCapraAvgLength.toDebugString

val rddCapraAvgLenReduce = rddCapraAvgLenMap.
    aggregateByKey((0,0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))  //reduce

rddCapraAvgLenReduce.collect()
// rddCapraAvgLength.toDebugString

val rddCapraAvgLenFinal = rddCapraAvgLenReduce.
    mapValues(v => v._1/v._2)

rddCapraAvgLenFinal.collect()
// rddCapraAvgLength.toDebugString

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraAvgLenMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[63] at map at <console>:27
res92: Array[(String, Int)] = Array((s,5), (l,2), (p,5), (l,2), (c,5), (c,5), (s,5), (l,2), (p,5), (l,2), (c,5), (c,5))
rddCapraAvgLenReduce: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[64] at aggregateByKey at <console>:26
res96: Array[(String, (Int, Int))] = Array((p,(10,2)), (l,(8,4)), (s,(10,2)), (c,(20,4)))
rddCapraAvgLenFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[65] at mapValues at <console>:26
res100: Array[(String, Int)] = Array((p,5), (l,2), (s,5), (c,5))


In [31]:
// Inverted index (word-based offset)

val rddCapraMap = rddCapra.
    flatMap( _.split(" ") ).
    zipWithIndex()

rddMap.collect()
// rddMap.toDebugString

// CompactBuffer isan alternative to ArrayBuffer that 
// results in better performance because it allocates less memory.

val rddGroup = rddMap.groupByKey()
rddGroup.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCapraMap: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[70] at zipWithIndex at <console>:26
rddGroup: org.apache.spark.rdd.RDD[(String, Iterable[Long])] = ShuffledRDD[71] at groupByKey at <console>:27
res114: Array[(String, Iterable[Long])] = Array((campa,CompactBuffer(5)), (la,CompactBuffer(1, 3, 7, 9)), (panca,CompactBuffer(2, 8)), (sotto,CompactBuffer(6)), (crepa,CompactBuffer(11)), (sopra,CompactBuffer(0)), (capra,CompactBuffer(4, 10)))


In [32]:
// Inverted index (sentence-based offset) alternative
val rddMap = rddCapra.zipWithIndex().
    map({case (k,v)=>(v,k)}).
    flatMapValues( x => x.split(" ") ).
    map({case (k,v)=>(v,k)}).
    distinct()
rddMap.collect()

val rddGroup = rddMap.groupByKey()
rddGroup.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddMap: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[78] at distinct at <console>:27
res116: Array[(String, Long)] = Array((la,0), (capra,0), (la,1), (sotto,1), (sopra,0), (capra,1), (panca,1), (crepa,1), (campa,0), (panca,0))
rddGroup: org.apache.spark.rdd.RDD[(String, Iterable[Long])] = ShuffledRDD[79] at groupByKey at <console>:24
res118: Array[(String, Iterable[Long])] = Array((campa,CompactBuffer(0)), (la,CompactBuffer(0, 1)), (panca,CompactBuffer(1, 0)), (sotto,CompactBuffer(1)), (crepa,CompactBuffer(1)), (sopra,CompactBuffer(0)), (capra,CompactBuffer(0, 1)))


In [35]:
// Sort an RDD by key

// rddCapra.sortByKey()

// Sort an RDD by value

// rddCapra.map({case(k,v) => (v,k)}).sortByKey()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
<console>:30: error: constructor cannot be instantiated to expected type;
 found   : (T1, T2)
 required: String
       rddCapra.map({case(k,v) => (v,k)}).sortByKey()
                         ^

