# 101 Spark basics

The goal of this lab is to get familiar with Spark programming.

- Scala
    - [Spark programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
    - [RDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html)
    - [PairRDD APIs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html)
- Python
    - [Spark programming guide](https://spark.apache.org/docs/3.5.0/rdd-programming-guide.html)
    - [All RDD APIs](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.RDD.html)

Use `Tab` for autocompletion, `Shift+Tab` for documentation.

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Local Spark") \
    .config('spark.ui.port', '4040') \
    .getOrCreate()
sc = spark.sparkContext

sc

## 101-1 Spark warm-up

Load the `capra` and `divinacommedia` datasets and try the following actions:
- Show their content (`collect`)
- Count their rows (`count`)
- Split lines into words (`map` or `flatMap`; what's the difference?)
- Check the results (remember: transformations are evaluated lazily, only when an action such as `collect` or `count` runs)
- Try the `toDebugString` function to inspect the RDD lineage (the chain of dependencies Spark will execute)
    - In PySpark, use `toDebugString().decode("unicode_escape")`
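
As a rough intuition for the `map`/`flatMap` difference, the sketch below mimics both on a plain Python list (no Spark involved; the two sample lines are made up for illustration). `map` yields exactly one output element per input line (here, a list of words), while `flatMap` flattens those lists into a single sequence of words, so the two counts differ.

```python
# Plain-Python analogue of RDD.map vs RDD.flatMap (illustrative only, not Spark).
# The sample lines are hypothetical, not read from capra.txt.
lines = ["sopra la panca", "la capra campa"]

# map: one output element per input element -> a list of lists
mapped = [line.split(" ") for line in lines]

# flatMap: each input element may produce zero or more outputs, which are flattened
flat_mapped = [word for line in lines for word in line.split(" ")]

print(mapped)            # [['sopra', 'la', 'panca'], ['la', 'capra', 'campa']]
print(len(mapped))       # 2
print(flat_mapped)       # ['sopra', 'la', 'panca', 'la', 'capra', 'campa']
print(len(flat_mapped))  # 6
```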

In [None]:
rddCapra = sc.textFile("../../../../datasets/capra.txt")
rddDC = sc.textFile("../../../../datasets/divinacommedia.txt")

In [None]:
rddCapraWords1 = rddCapra.map(lambda x : x.split(" ") )
rddCapraWords1.collect()

In [None]:
rddCapraWords1.count()

In [None]:
rddCapraWords2 = rddCapra.flatMap(lambda x : x.split(" ") )
rddCapraWords2.collect()

In [None]:
rddCapraWords2.count()

In [None]:
rddL = rddCapra. \
   flatMap(lambda x : x.split(" ") ). \
   map(lambda x : (x,1)). \
   reduceByKey(lambda x,y : x+y)
print(rddL.toDebugString().decode("unicode_escape"))

## 101-2 Basic Spark jobs

Implement the following jobs in Spark and test them on both the capra and divinacommedia datasets.

- **Word count**: count the number of occurrences of each word
  - Result: (sopra, 1), (la, 4), …
- **Word length count**: count how many words have each length
  - Result: (2, 4), (5, 8)
- **Average word length by initial**: compute the average length of words grouped by their first letter (e.g., words that begin with "s" have an average length of 5)
  - Result: (s, 5), (l, 2), …
- **Inverted index**: for each word, list the numbers of the lines in which it appears
  - Result: (sopra, (0)), (la, (0, 1)), …
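
To check Spark results on a small input, a local reference computation can help. The sketch below is plain Python using `collections.Counter`, not Spark. Its two input lines are made up, though they happen to produce the example results listed above for the first two jobs.

```python
from collections import Counter

# Local cross-check for the first two jobs (plain Python, not Spark).
# Two hypothetical lines; replace them with a small dataset's lines to compare results.
lines = ["sopra la panca la capra campa", "sotto la panca la capra crepa"]
words = [w for line in lines for w in line.split(" ")]

word_count = Counter(words)                     # word -> number of occurrences
length_count = Counter(len(w) for w in words)   # word length -> number of words

print(word_count["la"], word_count["sopra"])    # 4 1
print(sorted(length_count.items()))             # [(2, 4), (5, 8)]
```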

Also, check how sorting works and try to sort key-value RDDs by descending values.
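
The Word count cell below sorts by descending count by swapping each pair to (count, word) and calling `sortByKey(False)`. PySpark's `RDD.sortBy(keyfunc, ascending=...)` can sort by an arbitrary key without swapping, e.g. `sortBy(lambda kv: kv[1], ascending=False)`. The plain-Python analogue below (hypothetical counts, no Spark) shows that both approaches give the same order. Only the shape of the resulting pairs differs.

```python
# Plain-Python analogue of two ways to sort (word, count) pairs by descending count.
pairs = [("sopra", 1), ("la", 4), ("panca", 2)]  # hypothetical counts

# Approach 1: swap to (count, word), then sort on the new key (like map + sortByKey(False))
swapped = sorted(((c, w) for w, c in pairs), key=lambda kv: kv[0], reverse=True)

# Approach 2: sort on the value directly (like sortBy(lambda kv: kv[1], ascending=False))
by_value = sorted(pairs, key=lambda kv: kv[1], reverse=True)

print(swapped)   # [(4, 'la'), (2, 'panca'), (1, 'sopra')]
print(by_value)  # [('la', 4), ('panca', 2), ('sopra', 1)]
```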

In [None]:
# Word count: swap to (count, word) so that sortByKey(False) orders by descending count
rddCapra. \
  flatMap(lambda x : x.split(" ") ). \
  map(lambda x : (x,1)). \
  reduceByKey(lambda x,y : x + y). \
  map(lambda kv: (kv[1],kv[0])). \
  sortByKey(False). \
  collect()
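
`reduceByKey` merges the values of each key locally on every partition before the shuffle, then merges those partial results. Because the order and grouping of these merges is not fixed, the function passed to it should be associative and commutative (as `x + y` is). The sketch below is a simplified plain-Python model of that two-step behaviour, not Spark's actual implementation. The partition contents are hypothetical.

```python
# Simplified plain-Python model of reduceByKey(lambda x, y: x + y); not Spark's code.
# Hypothetical (word, 1) pairs split across two "partitions".
partitions = [
    [("la", 1), ("capra", 1), ("la", 1)],
    [("la", 1), ("panca", 1)],
]

def combine_partition(pairs, func):
    """Merge values with the same key inside one partition (map-side combine)."""
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return acc

def reduce_by_key(parts, func):
    """Combine each partition locally, then merge the partial results across partitions."""
    merged = {}
    for partial in (combine_partition(p, func) for p in parts):
        for k, v in partial.items():
            merged[k] = func(merged[k], v) if k in merged else v
    return merged

counts = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(counts.items()))  # [('capra', 1), ('la', 3), ('panca', 1)]
```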

In [None]:
# Word length count
rddCapra. \
  flatMap(lambda x : x.split(" ") ). \
  map(lambda x : (len(x),1)). \
  reduceByKey(lambda x,y : x + y). \
  collect()

In [None]:
# Average word length by initial
rddCapra. \
  flatMap(lambda x : x.split(" ") ). \
  filter(lambda x : len(x)>0 ). \
  map(lambda x : (x[0:1].lower(), (1,len(x)))). \
  reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1])). \
  mapValues(lambda v : v[1]/v[0]). \
  collect()
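
The cells above carry `(count, total_length)` pairs through `reduceByKey` and divide only at the end. This matters because averaging is not associative: averaging per-partition averages gives the wrong result when partitions hold different numbers of words. A small plain-Python illustration follows. The word lengths are hypothetical.

```python
# Why carry (count, total_length) pairs instead of partial averages.
# Hypothetical lengths of words sharing the same initial, split across two partitions.
part_a = [5, 5, 5]   # three words in one partition
part_b = [2]         # one word in another partition

# Wrong: average of the per-partition averages
avg_of_avgs = (sum(part_a) / len(part_a) + sum(part_b) / len(part_b)) / 2

# Right: merge (count, total) pairs as in the reduceByKey lambda, divide at the end
def merge(x, y):
    return (x[0] + y[0], x[1] + y[1])

count, total = merge((len(part_a), sum(part_a)), (len(part_b), sum(part_b)))
true_avg = total / count

print(avg_of_avgs)  # 3.5
print(true_avg)     # 4.25
```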

In [None]:
# Average word length by initial (alternative on the final map)
rddCapra. \
  flatMap(lambda x : x.split(" ") ). \
  filter(lambda x : len(x)>0 ). \
  map(lambda x : (x[0:1].lower(), (1,len(x)))). \
  reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1])). \
  map(lambda kv : (kv[0], kv[1][1]/kv[1][0])). \
  collect()

In [None]:
# Inverted index (word-based offset)
rddCapra. \
  flatMap(lambda x : x.split(" ") ). \
  zipWithIndex()
# ... continue from here
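
`zipWithIndex()` pairs each element with its position, producing `(element, index)` tuples. After the `flatMap` above, each tuple is therefore `(word, word_offset)`. Building the index then means grouping offsets by word. In PySpark one possible route is `groupByKey()` followed by `mapValues(list)`. The sketch below only models the grouping step in plain Python on a hypothetical line.

```python
from collections import defaultdict

# Plain-Python analogue of: flatMap(split) -> zipWithIndex() -> group offsets by word.
words = "sopra la panca la capra campa".split(" ")   # hypothetical input
indexed = list(zip(words, range(len(words))))        # like zipWithIndex: (word, offset)

index = defaultdict(list)
for word, offset in indexed:
    index[word].append(offset)

print(dict(index))
# {'sopra': [0], 'la': [1, 3], 'panca': [2], 'capra': [4], 'campa': [5]}
```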

In [None]:
# Inverted index (sentence-based offset)
rddCapra. \
  zipWithIndex()
# ... continue from here
# Hint: attaching sentence-based offsets to words can be done either with a Python list comprehension or with the same Spark transformations you have already used
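
Following the list-comprehension hint, one possible approach is to attach each line's offset to every word of that line. In PySpark the comprehension could be the body of a `flatMap`, e.g. `flatMap(lambda kv: [(w, kv[1]) for w in kv[0].split(" ")])`, followed by `distinct()` and `groupByKey()`. The plain-Python sketch below models those steps on two hypothetical lines. A set plays the role of `distinct()` so that a word repeated within a line is listed once.

```python
from collections import defaultdict

# Hypothetical (line, line_offset) pairs, shaped like the output of rddCapra.zipWithIndex().
lines_with_index = [("sopra la panca la capra campa", 0),
                    ("sotto la panca la capra crepa", 1)]

# List comprehension: attach each line's offset to every word of that line
word_line_pairs = [(w, i) for line, i in lines_with_index for w in line.split(" ")]

index = defaultdict(set)   # the set drops duplicates when a word repeats within a line
for w, i in word_line_pairs:
    index[w].add(i)
inverted_index = {w: sorted(ix) for w, ix in index.items()}

print(inverted_index["la"])     # [0, 1]
print(inverted_index["sopra"])  # [0]
```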