# 101 Spark basics

The goal of this lab is to get familiar with Spark programming.

- Scala
    - [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
    - [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
    - [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)
- Python
    - [Spark programming guide](https://spark.apache.org/docs/3.5.0/rdd-programming-guide.html)
    - [All RDD APIs](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.RDD.html)

Use `Tab` for autocompletion, `Shift+Tab` for documentation.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Local Spark") \
    .config('spark.ui.port', '4040') \
    .getOrCreate()
sc = spark.sparkContext

sc

## 101-1 Spark warm-up

Load the ```capra``` and ```divinacommedia``` datasets and try the following actions:
- Show their content (```collect```)
- Count their rows (```count```)
- Split phrases into words (```map``` or ```flatMap```; what’s the difference?)
- Check the results (remember: evaluation is lazy)
- Try the ```toDebugString``` function to check the execution plan
    - In PySpark, use ```toDebugString().decode("unicode_escape")```
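
Lazy evaluation can be previewed without Spark, as a rough analogy: Python's built-in `map` is also lazy and only runs its function when the result is consumed. In Spark, transformations such as `map` and `flatMap` likewise only record a step in the lineage, and an action such as `collect` or `count` triggers the computation. The helper `split_words` and the `calls` list below are illustrative names, not part of the lab.

```python
# Analogy only: Python's built-in map() is lazy, like a Spark transformation.
calls = []  # records every time split_words actually runs

def split_words(line):
    calls.append(line)
    return line.split(" ")

lines = ["sopra la panca la capra campa",
         "sotto la panca la capra crepa"]

lazy_words = map(split_words, lines)  # "transformation": nothing is computed yet
print(len(calls))                     # 0

words = list(lazy_words)              # "action": forces the computation
print(len(calls))                     # 2
```

One difference: a Python iterator can be consumed only once, whereas an RDD can serve several actions, and Spark recomputes it from its lineage each time unless it is cached (e.g. with `cache()`).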

In [28]:
rddCapra = sc.textFile("../../../../datasets/capra.txt")
rddDC = sc.textFile("../../../../datasets/divinacommedia.txt")

rddCapra.collect()
#rddDC.collect()

rddCapra.count()
#rddDC.count()

rddCapra.map(lambda x: x.split(" ")).collect()      # map: one list of words per line
#rddCapra.flatMap(lambda x: x.split(" ")).collect() # flatMap: a single flat list of words

#print(rddCapra.toDebugString().decode("unicode_escape"))

[['sopra', 'la', 'panca', 'la', 'capra', 'campa'],
 ['sotto', 'la', 'panca', 'la', 'capra', 'crepa']]
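
The difference between `map` and `flatMap` can be reproduced with plain Python on the two capra lines shown above: `map` emits exactly one output element per input line (here, one list of words), while `flatMap` expects a collection per input and flattens it into the result. This is a local model of the semantics, not Spark code; `split_words` is an illustrative name.

```python
from itertools import chain

lines = ["sopra la panca la capra campa",
         "sotto la panca la capra crepa"]

def split_words(line):
    """Function passed to map/flatMap: one line -> list of words."""
    return line.split(" ")

# map: one output element per input element -> a list of 2 lists
mapped = [split_words(line) for line in lines]

# flatMap: the lists returned per line are concatenated -> a list of 12 words
flat_mapped = list(chain.from_iterable(split_words(line) for line in lines))

print(len(mapped), len(flat_mapped))  # 2 12
```

Accordingly, on the capra dataset `count()` returns 2 after `map` (one element per line) and 12 after `flatMap` (one element per word).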

## 101-2 Basic Spark jobs

Implement the following jobs in Spark and test them on both the capra and divinacommedia datasets.

- **Word count**: count the number of occurrences of each word
  - Result: (sopra, 1), (la, 4), …
- **Word length count**: count the number of occurrences of words of each length
  - Result: (2, 4), (5, 8)
- Compute the average length of words grouped by their first letter (e.g., words that begin with "s" have an average length of 5)
  - Result: (s, 5), (l, 2), …
- Return the inverted index of words (i.e., for each word, list the numbers of the lines in which it appears)
  - Result: (sopra, (0)), (la, (0, 1)), ...
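
One possible approach to the inverted index, sketched under the assumption that line numbers start at 0 (as in the expected result): number the lines with `zipWithIndex()`, emit a `(word, line_number)` pair for every distinct word of each line, then group by word. `word_line_pairs` is a hypothetical helper, and the loop at the bottom is a local stand-in for the Spark pipeline so the logic can be checked without a cluster.

```python
from collections import defaultdict

lines = ["sopra la panca la capra campa",
         "sotto la panca la capra crepa"]

def word_line_pairs(line_and_index):
    """flatMap function for zipWithIndex() output: (line, idx) -> [(word, idx), ...]."""
    line, idx = line_and_index
    # set(): a word repeated in a line ("la") is listed only once for that line
    return [(word, idx) for word in set(line.split(" "))]

# Local stand-in for: rdd.zipWithIndex().flatMap(word_line_pairs).groupByKey()
grouped = defaultdict(list)
for idx, line in enumerate(lines):  # enumerate gives (idx, line); zipWithIndex gives (line, idx)
    for word, i in word_line_pairs((line, idx)):
        grouped[word].append(i)

# Equivalent of .mapValues(sorted): sorted line numbers per word
inverted_index = {word: sorted(ids) for word, ids in grouped.items()}
print(inverted_index["sopra"], inverted_index["la"])  # [0] [0, 1]
```

In the notebook, the corresponding chain would be `rddCapra.zipWithIndex().flatMap(word_line_pairs).groupByKey().mapValues(sorted).collect()`.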

Also, check how sorting works and try to sort key-value RDDs by descending values.

In [54]:
rddCapra.flatMap(lambda x: x.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).sortByKey(True).collect()

[('campa', 1),
 ('capra', 2),
 ('crepa', 1),
 ('la', 4),
 ('panca', 2),
 ('sopra', 1),
 ('sotto', 1)]
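
To sort by descending values instead of by key, one option is `sortBy` with a key function on the count, e.g. `.sortBy(lambda kv: kv[1], ascending=False)` in place of `.sortByKey(True)`; another is to swap each pair with `map(lambda kv: (kv[1], kv[0]))` and then call `sortByKey(False)`. To make the order among equal counts explicit, the key can include the word as a tie-breaker. The sketch below models the word count locally with a `Counter` on the capra lines and applies that composite key.

```python
from collections import Counter

lines = ["sopra la panca la capra campa",
         "sotto la panca la capra crepa"]

# flatMap + map + reduceByKey, modelled locally with a Counter
counts = Counter(word for line in lines for word in line.split(" "))

# Equivalent of .sortBy(lambda kv: (-kv[1], kv[0])):
# descending count, then alphabetical order for ties
by_count_desc = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(by_count_desc[:3])  # [('la', 4), ('capra', 2), ('panca', 2)]
```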

In [63]:
rddCapra.flatMap(lambda x: x.split(" ")).map(lambda x: (len(x),1)).reduceByKey(lambda x,y: x+y).collect()

[(5, 8), (2, 4)]

In [79]:
rddCapra.flatMap(lambda x: x.split(" ")).map(lambda x: (x[0],(len(x),1))).reduceByKey(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).mapValues(lambda x: (x[0]/x[1])).collect()

[('s', 5.0), ('l', 2.0), ('p', 5.0), ('c', 5.0)]
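
The function passed to `reduceByKey` receives two partial `(sum_of_lengths, word_count)` pairs and must add both components of both arguments, since Spark may merge values in any grouping, within and across partitions. The plain-Python check below uses the four `(2, 1)` values produced for the words starting with "l" in capra (four occurrences of "la") and contrasts a correct combiner with one that adds the first argument's count to itself (`acc1[1] + acc1[1]`), which here inflates the count and yields 1.0 instead of 2.0. `combine` and `combine_wrong` are illustrative names.

```python
from functools import reduce

# (length, 1) values for the words starting with "l" in capra: "la" appears 4 times
values = [(2, 1)] * 4

def combine(acc1, acc2):
    """Merge two partial (sum_of_lengths, word_count) pairs."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

def combine_wrong(acc1, acc2):
    # Wrong: ignores acc2's count and doubles acc1's count instead
    return (acc1[0] + acc2[0], acc1[1] + acc1[1])

total, n = reduce(combine, values)
avg = total / n
print(avg)        # 2.0

total_w, n_w = reduce(combine_wrong, values)
avg_wrong = total_w / n_w
print(avg_wrong)  # 1.0
```

Because the pairs are merged by element-wise addition, the correct combiner is associative and commutative, which is what `reduceByKey` expects of its function.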

## 101-3 Extra Spark jobs

Implement the following job.

- Co-occurrence count: count the number of co-occurrences in the text. A co-occurrence is defined as "two distinct words appearing in the same line".
  - In the first line of the *capra* dataset, co-occurrences are:
     - (sopra, la), (sopra, panca), (sopra, capra), (sopra, campa)
     - (la, sopra), (la, panca), (la, capra), (la, campa) 
     - (panca, sopra), (panca, la), (panca, capra), (panca, campa)
     - (capra, sopra), (capra, la), (capra, panca), (capra, campa)
     - (campa, sopra), (campa, la), (campa, panca), (campa, capra)
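
A possible sketch of the co-occurrence count, assuming (as the list above suggests) that pairs are ordered, that a word is never paired with itself, and that each pair is counted once per line even when a word repeats, as "la" does. `cooccurrences` is a hypothetical helper meant to be passed to `flatMap`; the `Counter` is a local stand-in for `map(lambda p: (p, 1)).reduceByKey(lambda a, b: a + b)`.

```python
from collections import Counter
from itertools import permutations

lines = ["sopra la panca la capra campa",
         "sotto la panca la capra crepa"]

def cooccurrences(line):
    """flatMap function: all ordered pairs of distinct words in one line."""
    words = sorted(set(line.split(" ")))  # distinct words, sorted for a stable output order
    return list(permutations(words, 2))   # both (a, b) and (b, a), never (a, a)

first_line_pairs = cooccurrences(lines[0])
print(len(first_line_pairs))  # 20 = 5 distinct words * 4 partners each

# Local stand-in for rdd.flatMap(cooccurrences).map(lambda p: (p, 1)).reduceByKey(...)
cooc_counts = Counter(pair for line in lines for pair in cooccurrences(line))
print(cooc_counts[("la", "panca")], cooc_counts[("sopra", "campa")])  # 2 1
```

In the notebook, the Spark version would be `rddCapra.flatMap(cooccurrences).map(lambda p: (p, 1)).reduceByKey(lambda a, b: a + b)`. A line with k distinct words produces k(k-1) pairs.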