# Data Access Methods
## Lab: Spark

Authors: Jonathan Friedli and Valentin Kaelin

Spring 2022

### Spark initialisation

Nothing to modify in this section.

Here we create and configure the SparkSession and SparkContext.

In [None]:
// The package xbean-asm7-shaded is required by NotebookSparkSession.
// From version 3.2, Spark depends on xbean-asm9-shaded.
import $ivy.`org.apache.xbean:xbean-asm7-shaded:4.16`
// Import the wanted version of Spark.
import $ivy.`org.apache.spark::spark-sql:3.2.0`

[32mimport [39m[36m$ivy.$                                        
// Import the wanted version of Spark.
[39m
[32mimport [39m[36m$ivy.$                                  [39m

In [None]:
// Configure Logger to limit output
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.WARN)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

In [None]:
// Create the Spark session.
// For better integration with the notebook, we use NotebookSparkSession, a wrapper class provided by almond-spark
import org.apache.spark.sql._
import org.apache.spark.rdd._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/06/07 14:00:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.rdd._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@7c3bc851

In [None]:
// Retrieve the Spark context
def sc = spark.sparkContext

defined [32mfunction[39m [36msc[39m

### Dataset loading

Nothing to modify in this section.

In [None]:
case class Movie(id: Int, title: String, genres: Seq[String],
                 description: String, director: String, actors: Seq[String],
                 year: Int, rating: Float, votes: Int)

defined [32mclass[39m [36mMovie[39m

In [None]:
def parseRow(row: Row): Movie = {
    val id = row.getInt(0)
    val title = row.getString(1)
    val genres = row.getString(2).split(",").toList
    val description = row.getString(3)
    val director = row.getString(4)
    val actors = row.getString(5).split(",").toList
    val year = row.getInt(6)
    val rating = row.getDouble(8).toFloat
    val votes = row.getInt(9)

    Movie(id, title, genres, description, director, actors, year, rating, votes)
}

defined [32mfunction[39m [36mparseRow[39m

In [None]:
val filename = "/work/data/IMDB-Movie-Data.csv"
val moviesDF = spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(filename)
val rddMovies = moviesDF.rdd.map(parseRow)

[36mfilename[39m: [32mString[39m = [32m"/work/data/IMDB-Movie-Data.csv"[39m
[36mmoviesDF[39m: [32mDataFrame[39m = [Rank: int, Title: string ... 10 more fields]
[36mrddMovies[39m: [32mRDD[39m[[32mMovie[39m] = MapPartitionsRDD[15] at map at cmd6.sc:7

In [None]:
// Print the titles of the first 10 movies to check that they were correctly loaded.
rddMovies.take(10).map(m => m.title).foreach(println)

Guardians of the Galaxy
Prometheus
Split
Sing
Suicide Squad
The Great Wall
La La Land
Mindhorn
The Lost City of Z
Passengers


### Part 1 - Playing with the movies using RDD functions

The goal of this part is to play with the movies, i.e. to query, filter and transform the data using RDD functions.

#### Ex1 - Print the movies whose title contains "City" 

Goal: 

* use the `map()` and `filter()` methods to get the titles of the movies that contain "City" in their title
 
Output example:

```plain
City of Tiny Lights
The Mortal Instruments: City of Bones
```

Steps:

* Use `filter()` to only keep the movies that contain "City" in their title
* Use `map()` to retrieve the titles of these filtered movies
* Use `foreach()` to pretty print the results

In [None]:
rddMovies.filter(m => m.title.contains("City")).map(m => m.title).foreach(println)

The Lost City of Z
Sin City: A Dame to Kill For
City of Tiny Lights
The Mortal Instruments: City of Bones
Sex and the City
Sex and the City 2

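The cell above prints from inside `foreach`, which runs wherever the RDD's partitions live. In `local[*]` mode that is the notebook's own JVM, so the titles show up (in no guaranteed order); on a cluster they would go to the executors' stdout instead. A minimal sketch of the usual alternative, assuming a few titles taken from the output above in place of `rddMovies`: bring the small result back to the driver with `collect()` and print it there.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("collect-sketch")
  .getOrCreate()

// A few titles from the output above, standing in for rddMovies.map(_.title).
val titles = spark.sparkContext.parallelize(Seq(
  "The Lost City of Z", "Mindhorn", "City of Tiny Lights", "Split"
))

// filter() is lazy; collect() runs the job and returns the (small) result to the driver as an Array.
val cityTitles: Array[String] = titles.filter(_.contains("City")).collect()

// Printing now happens on the driver; collect() keeps the RDD's partition order.
cityTitles.foreach(println)

spark.stop()
```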

#### Ex2 - Print the title of the movies rated between `rateMin` and `rateMax`. Take the 10 worst ratings.

Goal:
    
* Take the titles of the movies that were rated between `rateMin` and `rateMax` (excluding `rateMin` and including `rateMax`).
* The list is sorted by increasing rating
    
Output example:

```plain
...
3.5 - Wrecker
3.7 - The Last Face
...
```
    
Steps:

* Use `filter()` to only keep the movies rated between `rateMin` and `rateMax`
* Sort the filtered movies by increasing rating
* Use `map()` to keep only the relevant attributes (i.e. rating and title)
* Use `foreach()` to pretty print the results

In [None]:
val rateMin = 3
val rateMax = 4

rddMovies.filter(m => m.rating > rateMin && m.rating <= rateMax)
    .sortBy(m => m.rating, true)
    .map(m => (m.rating, m.title))
    .take(10)
    .foreach { case (rating, title) => println(s"$rating - $title") }

3.2 - Tall Men
3.5 - The Intent
3.5 - Wrecker
3.7 - The Last Face
3.7 - Satanic
3.9 - The Disappointments Room
3.9 - The Black Room
3.9 - Birth of the Dragon
4.0 - 2307: Winter's Dream


[36mrateMin[39m: [32mInt[39m = [32m3[39m
[36mrateMax[39m: [32mInt[39m = [32m4[39m

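Since only the 10 lowest ratings are kept, `sortBy(...).take(10)` could also be written with Spark's `takeOrdered(num)`, which keeps the smallest `num` elements of each partition and merges them instead of sorting the whole RDD. A sketch with hypothetical `(rating, title)` pairs (not taken from the dataset), limited to 3 results so the effect is visible on a small input:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("take-ordered-sketch")
  .getOrCreate()

val rateMin = 3
val rateMax = 4

// Hypothetical (rating, title) pairs, not taken from the dataset.
val ratings = spark.sparkContext.parallelize(Seq(
  (3.0f, "Film A"), (3.5f, "Film B"), (4.0f, "Film C"),
  (4.2f, "Film D"), (3.2f, "Film E"), (2.9f, "Film F")
))

// Keep ratings in (rateMin, rateMax], then the 3 smallest.
// Tuples are compared by rating first, then by title.
val worst: Array[(Float, String)] = ratings
  .filter { case (rating, _) => rating > rateMin && rating <= rateMax }
  .takeOrdered(3)

worst.foreach { case (rating, title) => println(s"$rating - $title") }

spark.stop()
```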
#### Ex3 - Print the top 10 genres

Goals:

* Print the list of the genres that appear most often
* Use `flatMap()`

Output example:

```plain
Drama (513)
Action (303)
Comedy (279)
Adventure (259)
```

Theory:

When an operation gives you a sequence of sequences like:

```scala
Array("hello", "world").map(word => word.split(""))
res91: Array[Array[String]] = Array(Array(h, e, l, l, o), Array(w, o, r, l, d))
```

You may want to flatten this to only have a single list like:
```scala
Array("hello", "world").map(_.split("")).flatten
res93: Array[String] = Array(h, e, l, l, o, w, o, r, l, d)
```

You can achieve the same result (i.e. `map` + `flatten`) using `flatMap`:
```scala
Array("hello", "world").flatMap(_.split(""))
res95: Array[String] = Array(h, e, l, l, o, w, o, r, l, d)
```

We are going to apply this same technique with the `genres` member.

Steps:

* Use `flatMap()` to get the list with all the genres
* Make sure to trim leading and trailing whitespace
* Count the genres
* Sort them by decreasing count
* Show the top 10 genres

In [None]:
rddMovies
    .flatMap(m => m.genres)
    .map(g => (g.trim(), 1))
    .reduceByKey(_ + _)
    .sortBy(g => g._2, false)
    .take(10)
    .foreach(g => println(g._1 + " (" + g._2 + ")"))

Drama (513)
Action (303)
Comedy (279)
Adventure (259)
Thriller (195)
Crime (150)
Romance (141)
Sci-Fi (120)
Horror (119)
Mystery (106)

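The same flatMap, count and sort pipeline can be checked on a plain Scala collection, without Spark. A sketch assuming a few hypothetical genre lists with stray spaces; `groupBy` followed by `size` plays the role of `map(g => (g, 1)).reduceByKey(_ + _)` here:

```scala
// Hypothetical genre lists, as split(",") could produce them with stray spaces.
val genreLists: Seq[Seq[String]] = Seq(
  Seq("Action", " Adventure", "Sci-Fi"),
  Seq("Drama ", "Romance"),
  Seq("Action", "Drama"),
  Seq(" Drama")
)

// flatMap flattens Seq[Seq[String]] into Seq[String]; trim removes surrounding whitespace.
val allGenres: Seq[String] = genreLists.flatMap(_.map(_.trim))

// Count occurrences per genre (local equivalent of reduceByKey(_ + _)).
val counts: Seq[(String, Int)] = allGenres
  .groupBy(identity)
  .map { case (genre, occurrences) => (genre, occurrences.size) }
  .toSeq
  .sortBy { case (genre, count) => (-count, genre) } // decreasing count, ties by name

counts.take(2).foreach { case (genre, count) => println(s"$genre ($count)") }
```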

#### Ex4 - Print the average number of votes per year, ordered by decreasing average number of votes

Goal:

* Print the average votes per year
* The output is sorted by decreasing average number of votes

Output example:

```plain
...
year: 2008 average votes: 275505.3846153846
year: 2009 average votes: 255780.64705882352
year: 2010 average votes: 252782.31666666668
...
```

Theory:

We are going to use `reduceByKey()` which has the following signature `reduceByKey(func: (V, V) => V): RDD[(K, V)]`. 

`reduceByKey()` works on an RDD of key/value pairs, i.e. an `RDD[(K, V)]`.

`reduceByKey()` takes a function that combines two values into one, i.e. the `func: (V, V) => V` in the signature.
Unlike `reduce()`, which combines all the elements of the RDD into a single value, `reduceByKey()` only combines values that share the same key, producing one result per key.

For example (pseudo code):

```plain
 year, count
(2010, 2)
(2011, 3)
(2011, 4)
(2010, 8)
// use reduceByKey((count1, count2) => count1+count2)
> (2010, 10)
> (2011, 7)
```

Note: here `count` is just an `Int`, but the value type `V` can be anything, e.g. a tuple or a `Movie`.

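The pseudo-code above can be reproduced on a local Scala collection to check the semantics. This is only an analogy that does not use Spark: grouping by key and then reducing each group's values gives the same per-key result as `reduceByKey`, although Spark also combines values inside each partition before the shuffle. That is also why the function passed to `reduceByKey` should be associative and commutative, since the order in which values are combined is not guaranteed.

```scala
// (year, count) pairs from the pseudo-code example.
val pairs: Seq[(Int, Int)] = Seq((2010, 2), (2011, 3), (2011, 4), (2010, 8))

// The combining function passed to reduceByKey: (V, V) => V.
val sum: (Int, Int) => Int = (count1, count2) => count1 + count2

// Local stand-in for pairs.reduceByKey(sum): group by key, then reduce the values of each group.
val reduced: Map[Int, Int] = pairs
  .groupBy { case (year, _) => year }
  .map { case (year, yearPairs) => (year, yearPairs.map(_._2).reduce(sum)) }

reduced.toSeq.sorted.foreach(println)
```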
Steps:

* To compute the average we need the **total sum** of votes per year and the **count** of all the movies per year
* Use `map()` to create an RDD made of `(year, (votes, 1))`. Like a word count we use the `1` to be able to count the number of movies per year
* Use `reduceByKey()` to sum the votes and to count the number of movies per year. The output should look like: `(year, (totalVotes, moviePerYearCount))`
* Find a way to compute the average using the result from the last operation
* Sort by decreasing average number of votes

In [None]:
val intermediate = rddMovies
    .map(m => (m.year, (m.votes, 1)))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))

val avgVotes = intermediate.mapValues {
    case (votes, numberOfFilms) => votes / numberOfFilms.toFloat
}

avgVotes
    .sortBy(y => y._2, false)
    .foreach(y => println("year: " + y._1 + " average votes: " + y._2))

year: 2012 average votes: 285226.1
year: 2008 average votes: 275505.38
year: 2006 average votes: 269289.97
year: 2009 average votes: 255780.64
year: 2010 average votes: 252782.31
year: 2007 average votes: 244331.03
year: 2011 average votes: 240790.3
year: 2013 average votes: 219049.64
year: 2014 average votes: 203930.22
year: 2015 average votes: 115726.22
year: 2016 average votes: 48591.754


[36mintermediate[39m: [32mRDD[39m[([32mInt[39m, ([32mInt[39m, [32mInt[39m))] = ShuffledRDD[30] at reduceByKey at cmd11.sc:3
[36mavgVotes[39m: [32mRDD[39m[([32mInt[39m, [32mFloat[39m)] = MapPartitionsRDD[31] at mapValues at cmd11.sc:5

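A small worked example of the `(votes, 1)` technique on a local Scala collection, using hypothetical vote counts rather than the dataset. The division is done on a `Double`: dividing the `Int` total by the `Int` count would truncate the result (e.g. `301 / 2 == 150`), which is why the cell above converts the count with `toFloat`.

```scala
// Hypothetical (year, votes) values, not taken from the dataset.
val movies: Seq[(Int, Int)] = Seq((2010, 100), (2010, 201), (2011, 50), (2011, 70), (2011, 90))

// Step 1: (year, (votes, 1)) so that summing the second component counts the movies.
val withOnes: Seq[(Int, (Int, Int))] = movies.map { case (year, votes) => (year, (votes, 1)) }

// Step 2: local stand-in for reduceByKey, summing votes and counts component-wise.
val totals: Map[Int, (Int, Int)] = withOnes
  .groupBy(_._1)
  .map { case (year, entries) =>
    (year, entries.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2)))
  }

// Step 3: average = total votes / number of movies, computed in Double to avoid integer division.
// Step 4: sort by decreasing average.
val averages: Seq[(Int, Double)] = totals
  .map { case (year, (totalVotes, count)) => (year, totalVotes.toDouble / count) }
  .toSeq
  .sortBy { case (_, avg) => -avg }

averages.foreach { case (year, avg) => println(s"year: $year average votes: $avg") }
```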
### Part 2 - Create a basic Inverted Index

The goal of this part is to show you how to create an inverted index of the words used in all the movies' descriptions.

Goal:

Starting from the movies in `rddMovies`:

```plain
Movie(1,Guardians of the Galaxy,List(Action, Adventure, Sci-Fi),A group of intergalactic [...] of the universe.,James Gunn,List(Chris Pratt, Vin Diesel, Bradley Cooper, Zoe Saldana),2014,8.1,757074.0)
Movie(2,Prometheus,List(Adventure, Mystery, Sci-Fi),Following clues to the origin[...] not alone.,Ridley Scott,List(Noomi Rapace, Logan Marshall-Green, Michael Fassbender, Charlize Theron),2012,7.0,485820.0)
Movie(3,Split,List(Horror, Thriller),Three girls are kidnapped [...] a frightful new 24th.,M. Night Shyamalan,List(James McAvoy, Anya Taylor-Joy, Haley Lu Richardson, Jessica Sula),2016,7.3,157606.0)
...
```

extract the words of their descriptions to produce an inverted index like:

```plain
"reunion" -> (640, 697)
"runner" -> (338)
"vietnam" -> (797, 947, 983)
...
```

Steps:

* Tokenize each description, i.e. produce an RDD like `(movieId, words)`
* Normalize the words, e.g. convert them to lowercase and trim them
* Remove stopwords (ignored here)
* Apply stemming (ignored here)
* Group the movie ids by word

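Before running these steps on the RDD, they can be tried on a local Scala collection. A minimal sketch assuming two toy descriptions that mirror the `toto` example in the comments of the next cell (movie ids 120 and 121), with the same tokenizing pattern as `tokenizeDescription`; stopword removal and stemming are skipped, as in the lab. The last step groups the ids by word:

```scala
// Toy (movieId, description) pairs, mirroring the "toto" example in the next cell.
val descriptions: Seq[(Int, String)] = Seq(
  (120, "Toto mange des pommes."),
  (121, "Toto lance des photocopies, toto.")
)

// Tokenize on spaces, commas and periods (same pattern as tokenizeDescription).
def tokenize(text: String): Seq[String] =
  text.split("[ ,.]+").toSeq.filter(_.nonEmpty)

// Normalize: trim and lowercase (same as normalizeWord).
def normalize(word: String): String = word.trim.toLowerCase

val invertedIndex: Map[String, Seq[Int]] = descriptions
  .flatMap { case (id, text) => tokenize(text).map(word => (normalize(word), id)) } // (word, id) pairs
  .distinct                                                      // one pair per (word, movie)
  .groupBy { case (word, _) => word }                            // group by word
  .map { case (word, pairs) => (word, pairs.map(_._2).sorted) }  // keep the sorted movie ids

invertedIndex.toSeq.sortBy(_._1).foreach { case (word, ids) => println(s"$word: ${ids.mkString(", ")}") }
```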
In [None]:
/**
* Goal: create an inverted index that allows searching a word
* in the movies description.
* Features:
* - case insensitive
*
*/
// In this first function we tokenize and normalize the movie descriptions, then group the movie ids by word. No search is performed yet.
def createInvertedIndex(movies: RDD[Movie]): RDD[(String, Iterable[Int])] = {
    // Define helper functions directly inside this function. In Scala you can declare inner functions
    // that are only visible inside the function where they are declared, which is useful to
    // encapsulate them and prevent their use elsewhere.
    
    // Split the given string into an array of words (without any formatting), then return it.
    def tokenizeDescription(description: String): Seq[String] = {
        description.split("[ ,.]+").toSeq // Also split on commas and periods that may be attached to words
    }
    
    // Trim surrounding whitespace from the given word, convert it to lowercase, then return it.
    def normalizeWord(word: String): String = {
        word.trim().toLowerCase()
    }
    
    // For the sake of simplicity let's ignore the implementation (in a real case we would return true if w is a stopword, otherwise false).
    def isStopWord(w: String): Boolean = {
        false
    }
    
    // For the sake of simplicity let's ignore the implementation (in a real case we would apply stemming to w and return the result, e.g. w=automation -> w=automat).
    def applyStemming(w: String): String = {
        w
    }
    
    // Here we are going to work on the movies RDD, by tokenizing and normalizing the description of every 
    // movie, then by building a key-value object that contains the tokens as keys, and the IDs of 
    // the movies as values
    // (see the examples in the steps below).
    // The goal here is to do everything by chaining the possible transformations and actions of Spark.
    // Possible steps:
    //   1) First, apply the 4 previous methods to each movie's description.
    //      Be aware of the fact that we also want to keep the IDs of the movies.
    //   2) For each tokenized word, create a tuple as (word, id), where id is the current movie id
    //        [
    //          ("toto", 120), ("mange", 120), ("des", 120), ("pommes", 120),
    //          ("toto", 121), ("lance", 121), ("des", 121), ("photocopies", 121)
    //        ]
    //      Hint: you can call `map` inside a `flatMap`.
    //   3) Finally, group the pairs by key so that there is only one entry per word,
    //      with all the linked IDs as values. For example:
    //        [
    //          ("toto", [120, 121]),
    //          ("mange", [120]),
    //          ...
    //        ]

    val invertedIndex = movies
        .flatMap(m => 
            tokenizeDescription(m.description).map(w => (w, m.id))
        )
        .map(m => (normalizeWord(m._1), m._2))
        .filter(m => !isStopWord(m._1))
        .map(m => (applyStemming(m._1), m._2))
        .distinct()  // remove duplicate (word, movie id) pairs, i.e. a word repeated in the same description
        .groupByKey()

    // Return the new-built inverted index.
    invertedIndex
}

defined [32mfunction[39m [36mcreateInvertedIndex[39m

Now we use our inverted index to display the top N words, i.e. the words that appear in the largest number of movie descriptions.

In [None]:
// Compute the analytics on a given inverted index (obtained from the previous function) and display the result.
def topN(invertedIndex: RDD[(String, Iterable[Int])], N: Int): Unit = {
  // Steps applied to the given invertedIndex RDD:
  //   1) Get the number of movies in which each word appears.
  //   2) Keep only the top N words and their counts.
  val topWords = invertedIndex
    .map(m => (m._1, m._2.size))
    .sortBy(m => m._2, false)
    .take(N)
  
  // Print the words and the number of descriptions in which they appear.
  println("Top '" + N + "' most used words")
  topWords.foreach(println)
}

defined [32mfunction[39m [36mtopN[39m

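In `topN`, `sortBy(...).take(N)` sorts the whole RDD before keeping N elements. Spark's `RDD.top(num)` instead returns the `num` largest elements for a given `Ordering`, in descending order, without a full sort. A sketch assuming a tiny hypothetical inverted index built with `parallelize`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("top-n-sketch")
  .getOrCreate()

// Hypothetical inverted index: word -> ids of the movies whose description contains it.
val index = spark.sparkContext.parallelize(Seq(
  ("the", Seq(1, 2, 3, 4)),
  ("city", Seq(2, 5)),
  ("reunion", Seq(640, 697, 700)),
  ("runner", Seq(338))
))

// Document frequency: number of movies per word.
val docFrequency = index.mapValues(_.size)

// The 2 entries with the highest count (ties broken by word), in descending order.
val top2: Array[(String, Int)] = docFrequency
  .top(2)(Ordering.by[(String, Int), (Int, String)] { case (word, count) => (count, word) })

println("Top '2' most used words")
top2.foreach(println)

spark.stop()
```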
In [None]:
// Code used to test your implementation.
// Create the inverted index of the movies.
val invertedIndex = createInvertedIndex(rddMovies)

// Show what the inverted index looks like.
invertedIndex.take(3).foreach(x => println(x._1 + ": " + x._2.mkString(", ")))

// Show the top 10 most used words.
topN(invertedIndex, 10)

divorcee: 63
reunion: 697, 640
confidants: 785


Top '10' most used words
(a,837)
(the,720)
(to,628)
(of,559)
(and,553)
(in,459)
(his,327)
(an,259)
(is,256)
(with,243)


[36minvertedIndex[39m: [32mRDD[39m[([32mString[39m, [32mIterable[39m[[32mInt[39m])] = ShuffledRDD[42] at groupByKey at cmd12.sc:55

### Part 3 - Dataframe and SparkSQL

For all of the following exercises, write your queries in two different ways:

* as an SQL query string passed to `spark.sql(...)`
* using the DataFrame API (`select`, `where`, etc.)

#### Ex1 - Use the moviesDF DataFrame

* Use the DataFrame `moviesDF` created when loading the data
* Print the schema of `moviesDF`
* Show the first 10 rows of `moviesDF` as a table

In [None]:
// TODO students

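A possible sketch for Ex1, assuming a tiny hypothetical DataFrame built with `toDF` in place of `moviesDF` (in the notebook, the existing `spark` session and `moviesDF` would be used directly, without stopping the session). `printSchema()` and `show(10)` cover the DataFrame API; the SQL version needs the DataFrame to be registered as a temporary view first, here under the assumed name `movies`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex1-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF, reusing some of its column names.
val movies = Seq(
  (1, "City One", "Director A", 2010, 7.5, 1200),
  (2, "Film Two", "Director A", 2010, 5.0, 500),
  (3, "Old City Three", "Director B", 2011, 6.1, 2000)
).toDF("Rank", "Title", "Director", "Year", "Rating", "Votes")

// DataFrame API: schema as a tree, then the first 10 rows as a table.
movies.printSchema()
movies.show(10)

// SQL: register a temporary view, then query it with LIMIT.
movies.createOrReplaceTempView("movies")
val firstRows = spark.sql("SELECT * FROM movies LIMIT 10")
firstRows.show()

val columnNames: Seq[String] = movies.columns.toSeq
val firstRowCount: Long = firstRows.count()

spark.stop()
```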
#### Ex2 - Get the movies (id, title, votes, director) whose title contains "City" 

Write the query in two different ways:

* as an SQL query string
* using the DataFrame API (select, where, etc.)

In [None]:
// TODO student

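A possible sketch for Ex2, assuming a small hypothetical DataFrame in place of `moviesDF` and a temporary view named `movies` (the lab does not name it). In the API version, filtering happens before the `select` so that `Title` is still available under its original name:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex2-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF.
val movies = Seq(
  (1, "City One", "Director A", 2010, 7.5, 1200),
  (2, "Film Two", "Director A", 2010, 5.0, 500),
  (3, "Old City Three", "Director B", 2011, 6.1, 2000)
).toDF("Rank", "Title", "Director", "Year", "Rating", "Votes")
movies.createOrReplaceTempView("movies")

// 1) SQL query string; LIKE is case-sensitive.
val citySql = spark.sql(
  """SELECT Rank AS id, Title AS title, Votes AS votes, Director AS director
    |FROM movies
    |WHERE Title LIKE '%City%'
    |ORDER BY id""".stripMargin)

// 2) DataFrame API: filter first, then rename the selected columns.
val cityApi = movies
  .where($"Title".contains("City"))
  .select($"Rank".as("id"), $"Title".as("title"), $"Votes".as("votes"), $"Director".as("director"))
  .orderBy($"id")

citySql.show()
cityApi.show()

val sqlIds: Seq[Int] = citySql.collect().map(_.getInt(0)).toSeq
val apiIds: Seq[Int] = cityApi.collect().map(_.getInt(0)).toSeq

spark.stop()
```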
#### Ex3 - Get the number of movies which have a number of votes between 500 and 2000 (inclusive range)

In [None]:
// TODO student

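A sketch for Ex3 under the same assumptions (hypothetical data, temporary view named `movies`). Both SQL `BETWEEN` and `Column.between` include their bounds, which matches the inclusive range asked for:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex3-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF (only the columns needed here).
val movies = Seq(
  (1, "City One", 1200),
  (2, "Film Two", 500),
  (3, "Old City Three", 2000),
  (4, "Film Four", 150),
  (5, "Film Five", 3000)
).toDF("Rank", "Title", "Votes")
movies.createOrReplaceTempView("movies")

// 1) SQL: BETWEEN includes both bounds; COUNT(*) is returned as a Long.
val countSql: Long = spark.sql(
  "SELECT COUNT(*) FROM movies WHERE Votes BETWEEN 500 AND 2000"
).first().getLong(0)

// 2) DataFrame API: Column.between is also inclusive.
val countApi: Long = movies.where($"Votes".between(500, 2000)).count()

println(s"SQL: $countSql, API: $countApi")

spark.stop()
```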
#### Ex4 - Get the minimum, maximum and average rating of films per director. Sort the results by minimum rating.  

In [None]:
// TODO student

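A sketch for Ex4 on hypothetical data, with a temporary view named `movies`. `Director` is added as a secondary sort key (an assumption, not part of the exercise) so that directors sharing the same minimum rating come out in a stable order:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex4-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF.
val movies = Seq(
  ("City One", "Director A", 7.5),
  ("Film Two", "Director A", 5.0),
  ("Old City Three", "Director B", 6.1),
  ("Film Four", "Director B", 4.2),
  ("Film Five", "Director C", 4.2)
).toDF("Title", "Director", "Rating")
movies.createOrReplaceTempView("movies")

// 1) SQL query string.
val statsSql = spark.sql(
  """SELECT Director, MIN(Rating) AS minRating, MAX(Rating) AS maxRating, AVG(Rating) AS avgRating
    |FROM movies
    |GROUP BY Director
    |ORDER BY minRating, Director""".stripMargin)

// 2) DataFrame API.
val statsApi = movies
  .groupBy("Director")
  .agg(min("Rating").as("minRating"), max("Rating").as("maxRating"), avg("Rating").as("avgRating"))
  .orderBy("minRating", "Director")

statsSql.show()
statsApi.show()

val sqlDirectors: Seq[String] = statsSql.collect().map(_.getString(0)).toSeq
val apiRows = statsApi.collect()
val apiDirectors: Seq[String] = apiRows.map(_.getString(0)).toSeq
val avgDirectorA: Double = apiRows.find(_.getString(0) == "Director A").get.getDouble(3)

spark.stop()
```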
#### Ex5 - Find the title of the movie(s) having the minimum rating for each year. Show the title, year and the rating information in the result, order by increasing rating.

**Example output**

```plain
+--------------------+----+------+
|               title|year|rating|
+--------------------+----+------+
|      Disaster Movie|2008|   1.9|
|Don't Fuck in the...|2016|   2.7|
|Dragonball Evolution|2009|   2.7|
...
```

In [None]:
// TODO student

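One possible approach for Ex5, sketched on hypothetical data: compute the minimum rating per year, then join it back to the movies so that ties (several movies sharing the minimum) are all kept. A window function (`min(...).over(Window.partitionBy(...))`) would be another option.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.min

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex5-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF; 2011 has two movies tied for the minimum.
val movies = Seq(
  ("City One", 2010, 7.5),
  ("Film Two", 2010, 5.0),
  ("Old City Three", 2011, 6.1),
  ("Film Four", 2011, 4.2),
  ("Film Five", 2011, 4.2)
).toDF("Title", "Year", "Rating")
movies.createOrReplaceTempView("movies")

// 1) SQL: minimum per year in a subquery, joined back to keep the matching movies.
val worstSql = spark.sql(
  """SELECT m.Title AS title, m.Year AS year, m.Rating AS rating
    |FROM movies m
    |JOIN (SELECT Year, MIN(Rating) AS minRating FROM movies GROUP BY Year) y
    |  ON m.Year = y.Year AND m.Rating = y.minRating
    |ORDER BY rating, title""".stripMargin)

// 2) DataFrame API: same idea, joining on the shared Year column.
val minPerYear = movies.groupBy("Year").agg(min("Rating").as("minRating"))
val worstApi = movies
  .join(minPerYear, Seq("Year"))
  .where($"Rating" === $"minRating")
  .select($"Title".as("title"), $"Year".as("year"), $"Rating".as("rating"))
  .orderBy($"rating", $"title")

worstSql.show()
worstApi.show()

val sqlTitles: Seq[String] = worstSql.collect().map(_.getString(0)).toSeq
val apiTitles: Seq[String] = worstApi.collect().map(_.getString(0)).toSeq

spark.stop()
```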
#### Ex6 - Find pairs of movies that have the same director.

**Example output**

```plain
+------------------+--------------------+--------------------+
|         director1|              title1|              title2|
+------------------+--------------------+--------------------+
|        James Gunn|Guardians of the ...|               Super|
|        James Gunn|Guardians of the ...|             Slither|
|      Ridley Scott|          Prometheus|        Body of Lies|
|      Ridley Scott|          Prometheus|         A Good Year|
...
```

Note that when using the dataframe API:
* Use `===` to test equality of columns and `=!=` to test non-equality (the older `!==` operator is deprecated since Spark 2.0 because it does not have the same precedence as `===`)
* Use `as` to alias column names and DataFrames, e.g. for a self-join (check the reference documentation)

In [None]:
// TODO student

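A sketch for Ex6 on hypothetical data, using a self-join. Comparing `Rank` with `<` (an assumption about the desired output) keeps each pair of movies once and never pairs a movie with itself; using `=!=` instead would list every pair in both orders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ex6-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for moviesDF.
val movies = Seq(
  (1, "City One", "Director A"),
  (2, "Film Two", "Director A"),
  (3, "Old City Three", "Director B"),
  (4, "Film Four", "Director B"),
  (5, "Film Five", "Director C")
).toDF("Rank", "Title", "Director")
movies.createOrReplaceTempView("movies")

// 1) SQL self-join; m1.Rank < m2.Rank keeps each pair once.
val pairsSql = spark.sql(
  """SELECT m1.Director AS director1, m1.Title AS title1, m2.Title AS title2
    |FROM movies m1
    |JOIN movies m2 ON m1.Director = m2.Director AND m1.Rank < m2.Rank
    |ORDER BY director1, title1, title2""".stripMargin)

// 2) DataFrame API: alias both sides with as() and qualify the columns with the alias.
val pairsApi = movies.as("m1")
  .join(movies.as("m2"), $"m1.Director" === $"m2.Director" && $"m1.Rank" < $"m2.Rank")
  .select($"m1.Director".as("director1"), $"m1.Title".as("title1"), $"m2.Title".as("title2"))
  .orderBy("director1", "title1", "title2")

pairsSql.show()
pairsApi.show()

// Collect the three string columns as Scala tuples.
def toTriples(df: DataFrame): Seq[(String, String, String)] =
  df.collect().map(r => (r.getString(0), r.getString(1), r.getString(2))).toSeq

val sqlPairs = toTriples(pairsSql)
val apiPairs = toTriples(pairsApi)

spark.stop()
```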