Author: Mohamed-Amine Baazizi
Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
Email: mohamed-amine.baazizi@lip6.fr

# Prise en main de Scala (30 mn)

Cet exercice illustre les différentes structures de contrôle de Scala présentées en cours. 
Il permet de comprendre le paradigme fonctionnel : seules les fonctions `map, reduce, flatten, filter, flatMap` sont autorisées.

Temps consillé : 30 mn

## Question 1

Définir trois fonctions qui prennent en entrée une liste d'entiers `liste` et réalisent les opérations suivantes:
* `maxEntiers` retourne le plus grand des entiers de `liste` 
* `scEntiers` retourne la somme des carrés des entiers de `liste`
* `moyEntiers` retourne la moyenne des entiers de `liste`

Tester votre réponses en invoquant ces fonctions sur `listeEntiers` définie comme suit

In [4]:
val listeEntiers = List.range(1,11)

listeEntiers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


In [5]:
def maxEntiers(liste: List[Int] ): Int = liste.reduce((x,y)=>if (x>y) x else y)
maxEntiers(listeEntiers)

maxEntiers: (liste: List[Int])Int
res2: Int = 10


In [6]:
def scEntiers(liste: List[Int] ): Int = liste.map(x => x*x).reduce((x,y)=>x+y)
scEntiers(listeEntiers)

scEntiers: (liste: List[Int])Int
res3: Int = 385


In [7]:
def moyEntiers(liste: List[Int] ): Float = (liste.reduce((a,b)=>a+b).toFloat / liste.length)
moyEntiers(listeEntiers)

moyEntiers: (liste: List[Int])Float
res4: Float = 5.5


## Question 2

Soit une liste chaine de caractères construite à l'aide de l'instruction suivante.

In [1]:
val listeTemp = List("7,2010,04,27,75", "12,2009,01,31,78", "41,2009,03,25,95", "2,2008,04,28,76", "7,2010,02,32,91")


Intitializing Scala interpreter ...

Spark Web UI available at http://10.64.16.105:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1570785244399)
SparkSession available as 'spark'


listeTemp: List[String] = List(7,2010,04,27,75, 12,2009,01,31,78, 41,2009,03,25,95, 2,2008,04,28,76, 7,2010,02,32,91)


Chaque élément représente un enregistrement fictif de températures avec le format (station, année, mois, température, code_département). On voudrais calculer pour, l'année 2009, le maximum et la moyenne de ses températures.
Compléter l'instruction suivante qui permet les transformations et les conversions de type nécessaires pour l'évaluation de ces deux calculs. 


In [8]:
val temp2009 = listeTemp.map(x=>x.split(",")).filter(x=>x(1)=="2009").map(x=>x(3).toInt)

println("max des temps " + maxEntiers(temp2009))

println("moy des temps " + moyEntiers(temp2009))

max des temps 31
moy des temps 28.0


temp2009: List[Int] = List(31, 25)


## Question 3

Soit une liste chaine de caractères construite à l'aide de l'instruction suivante.

In [9]:
val melange = List("1233,100,3,20171010", "1224,22,4,20171009", "100,lala,comedie", "22,loup,documentaire")

melange: List[String] = List(1233,100,3,20171010, 1224,22,4,20171009, 100,lala,comedie, 22,loup,documentaire)


Deux types d'éléments existent : ceux de la forme (userID, movieID, rating, timestamp) et ceux de la forme (movieID, title, genre). Le domaine des userID est [1000, 2000] et celui des movieID est [0, 100].

Il est demandé de construire à partir de `melange` deux listes distinctes :

* `notes` contenant les éléments de la forme (userID, movieID, rating, timestamp) et dont le type est (Int, String, Int, Int) 
* `films` contenant les éléments de la forme (movieID, title, genre) et dont le type est (Int, String, String).

In [12]:
val notes = melange.map(x=>x.split(",")).filter(x=>x.length==4).map(x=>List(x(0).toInt, x(1), x(2).toInt, x(3).toInt))
val films = melange.map(x=>x.split(",")).filter(x=>x.length==3).map(x=>List(x(0).toInt, x(1), x(2)))

notes: List[List[Any]] = List(List(1233, 100, 3, 20171010), List(1224, 22, 4, 20171009))
films: List[List[Any]] = List(List(100, lala, comedie), List(22, loup, documentaire))


## Question 4

Soit une liste personnes contenant des tuples ayant trois attributs décrivant des personnes :

* le premier attribut est le nom de la personne;
* le second attribut est le type de la personne : etudiant (etu), enseignant (ens) ou inconnu (nan);
* la troisième attribut est l'année d'inscription (s'il s'agit d'un étudiant) ou les années d'ancienneté pour les enseignants.

In [32]:
val personnes = List(("Joe", "etu", 3), ("Lee", "etu", 4), ("Sara", "ens", 10), ("John", "ens", 5), ("Bill", "nan",20))

personnes: List[(String, String, Int)] = List((Joe,etu,3), (Lee,etu,4), (Sara,ens,10), (John,ens,5), (Bill,nan,20))


Définir une classe `Etu(nom:String, annee:Int)` et une classe `Ens(nom:String, annee:Int)`

Transformer la liste `personnes` en une liste d'objets de la classe `Etu` ou `Ens` encapsulant les information des tuples en entrée. Par exemple, le tuple `("Joe", "etu", 3)` devra être transformé en un objet `Etu("Joe", 3)`.

Attention Les personnes de type inconnu ne doivent être dans le résultat!

Utiliser le pattern matching pour répondre à cette question.



In [33]:
case class Etu(nom:String, annee:Int)
case class Ens(nom:String, annee:Int)

defined class Etu
defined class Ens


In [34]:
def transform(infos: (String, String, Int)) = infos._2 match {
    case "etu" => Etu(infos._1, infos._3)
    case "ens" => Ens(infos._1, infos._3)
    case _ => "inconnu"
}

transform: (infos: (String, String, Int))java.io.Serializable


In [36]:
val test = personnes.map(x=>transform(x)).filter(x => x!= "inconnu")

test: List[java.io.Serializable] = List(Etu(Joe,3), Etu(Lee,4), Ens(Sara,10), Ens(John,5))


# Prise en main de Spark RDD

Prendre le temps de lire la documentation Spark sur l'utilisation des RDD
https://spark.apache.org/docs/latest/rdd-programming-guide.html

et sur l'API RDD
https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.rdd.RDD

## Préparation datasets

Créer un répertoire dans votre dossier `tmp` en tapant

`mkdir -p /tmp/BDLE/dataset`

* Pour le dataset `wordcount` copier en tapant

 `cp /Infos/bd/spark/bdle/2015/data/wordcount.txt.bz2 /tmp/BDLE/dataset` 
 
 `cd /tmp/BDLE/dataset`
 
 `bunzip2 wordcount.txt.bz2`
  
  Vérifier que vous avez bien le fichier `wordcount.txt`


* Pour le dataset `books` copier en tapant

    `cp -r /Infos/bd/spark/dataset/Books /tmp/BDLE/dataset`

    `cd /tmp/BDLE/dataset/Books`
    
     Vérifier que vous avez bien les fichiers 
     
     `books.csv`  `ratings.csv`	et `users.csv`


## Exercice 1 (30 mn) 

Cet exercice utilise le dataset `wordcount`

Exécuter la commande suivante pour le charger dans la valeur `data` et tester que le chargement marche bien en lisant les 10 premieres lignes. 

In [1]:
val data = sc.textFile("wordcount.txt")

data.take(10)

Intitializing Scala interpreter ...

Spark Web UI available at http://10.64.16.105:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1570787444881)
SparkSession available as 'spark'


data: org.apache.spark.rdd.RDD[String] = wordcount.txt MapPartitionsRDD[1] at textFile at <console>:25
res0: Array[String] = Array(de.b %C3%96ffentliches_Recht/_Assex/_Hessen/_Handakte/_Widerspruchsbescheid_1 1 8533, de.b 2:WorldTraveller101 1 5744, de.b A._Einstein,_Ist_die_Tr%C3%A4gheit_eines_K%C3%B6rpers_von_seinem_Energieinhalt_abh%C3%A4ngig%3F_-_Kommentiert_und_erl%C3%A4utert. 1 9290, de.b A_Poem_a_Day/_2._September:_Verg%C3%A4nglichkeit_der_Sch%C3%B6nheit_(Christian_Hoffmann_von_Hoffmannswaldau) 1 23801, de.b Adventskalender_2009 1 13741, de.b Adventskalender_2011 1 13509, de.b Algorithmensammlung:_Graphentheorie:_Dijkstra-Algorithmus 1 8015, de.b Allgemeine_und_Anorganische_Chemie/_Atombau 1 15108, de.b Analysis 1 9078, de.b Anorganische_Chemie_f%C3%BCr_Sch%C3%BCler 1 12620)


*Remarque* pour partitionner une chaîne de caractères en utilisant le point (.) comme délimiteur à l'aide de la méthode `split()`, il faut protéger le point avec deux backslahs comme suit `split("\\.")`

#### Q1. Structurer le contenu de data de sorte à obtenir un tableau de tableaux de chaines de caractères. Ce dernier devra être stocké dans une nouvelle variable nommée list.

In [2]:
val list = data.map(x=>x.split(" "))

list.take(10)

list: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26
res1: Array[Array[String]] = Array(Array(de.b, %C3%96ffentliches_Recht/_Assex/_Hessen/_Handakte/_Widerspruchsbescheid_1, 1, 8533), Array(de.b, 2:WorldTraveller101, 1, 5744), Array(de.b, A._Einstein,_Ist_die_Tr%C3%A4gheit_eines_K%C3%B6rpers_von_seinem_Energieinhalt_abh%C3%A4ngig%3F_-_Kommentiert_und_erl%C3%A4utert., 1, 9290), Array(de.b, A_Poem_a_Day/_2._September:_Verg%C3%A4nglichkeit_der_Sch%C3%B6nheit_(Christian_Hoffmann_von_Hoffmannswaldau), 1, 23801), Array(de.b, Adventskalender_2009, 1, 13741), Array(de.b, Adventskalender_2011, 1, 13509), Array(de.b, Algorithmensammlung:_Graphentheorie:_Dijkstra-Algorithmus, 1, 8015), Array(de.b, Allgemeine_und_Anorganische_Chemie/_Atombau, 1, 15108), Array(d...

#### Q2. Afficher les 100 premiers éléments de la 3e colonne de `list`.

In [3]:
val q2 = list.map(x=>x(2))

q2.take(100)

q2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:26
res2: Array[String] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 5, 1, 3, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)


#### Q3. Transformer le contenu de `list` en une liste de paires `(mot, nb)` où `mot` correspond à la première colonne de `list` et `nb` sa troisième colonne.

In [10]:
val q3 = list.map(x=>(x(0), x(2).toInt))

q3.take(10)

q3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:28
res6: Array[(String, Int)] = Array((de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1), (de.b,1))


#### Q4. Grouper les paires par l'attribut `mot` et additionner leur nombres respectifs.

In [13]:
val q4 = q3.reduceByKey((x, y) => (x+y))
q4.take(10)

q4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28
res9: Array[(String, Int)] = Array((frr,493), (fr.n,959), (en.q,3202), (en.s,84086), (frp.mw,61), (fr.voy,1646), (de.q,721), (frr.mw,130), (frp,538), (fr.v,893), (fr,496594), (fr.d,25668), (fr.b,1472), (fr.mw,488420), (de.s,2065), (en.d,38830), (en,1812752), (de.d,7943), (fr.q,730), (de.v,345), (de,460970), (en.b,82231), (fr.s,9215), (de.voy,894), (de.b,1381), (de.n,687), (en.v,723), (en.n,1683), (en.mw,7404104), (en.voy,1175), (de.mw,509600))


#### Q5. Reprendre les questions Q3 et Q4 en calculant `mot` différemment : 

désormais, `mot` doit correspondre au préfixe du premier sous-élément de chaque élément de `list`, 

Exemple
* pour `en.d`, mot doit être `en`
* pour `fr.d`, mot doit être `fr`, etc. 

Comparer les résultats avec ceux obtenus précédemment.

In [17]:
val q5 = list.map(x=>(x(0).split("\\.")(0), x(2).toInt)).reduceByKey((x, y) => (x+y))
q5.take(10)

q5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:28
res11: Array[(String, Int)] = Array((frr,623), (frp,599), (fr,1025597), (en,9428786), (de,984606))


## Exercice 2 (60 mn) 


Pour cet exercice, on utilise le jeux de données Books qui renseigne sur des livres (books.csv), des utilsateurs (users.csv) et des notes réalisées par les utilsateurs (ratings.csv).

Les schémas des tables sont 

* `Users (userid: Number, country: Text, age: Number)`
* `Books (bookid: Number, titlewords: Number, authorwords: Number, year: Number, publisher: Number)`
* `Ratings (userid: Number, bookid: Number, rating: Number)`

In [1]:
val path = "Books/" 

val books_data = sc.textFile(path + "books.csv")
val users_data = sc.textFile(path + "users.csv")
val ratings_data = sc.textFile(path + "ratings.csv")



//users_data.take(10)
ratings_data.take(10)
//books_data.take(10)

Intitializing Scala interpreter ...

Spark Web UI available at http://172.20.10.6:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1570818344770)
SparkSession available as 'spark'


path: String = Books/
books_data: org.apache.spark.rdd.RDD[String] = Books/books.csv MapPartitionsRDD[1] at textFile at <console>:27
users_data: org.apache.spark.rdd.RDD[String] = Books/users.csv MapPartitionsRDD[3] at textFile at <console>:28
ratings_data: org.apache.spark.rdd.RDD[String] = Books/ratings.csv MapPartitionsRDD[5] at textFile at <console>:29
res0: Array[String] = Array(userid,bookid,rating, 276747,4780,4, 276747,1837,4, 276747,6277,3, 276762,7819,1, 276762,4885,3, 276772,27222,2, 276772,33829,5, 276772,83629,5, 276786,246867,3)


### Préparation

Afin de formater les données, créer une classe pour chaque table puis charger les données de chaque fichier dans une RDD content des objets de la classe.  

In [2]:
users_data.map(x=>x.split(",")).take(10)

res1: Array[Array[String]] = Array(Array(userid, country, age), Array(100004, usa, 0), Array(100009, canada, 49), Array(10001, usa, 47), Array(100029, germany, 0), Array(10003, usa, 20), Array(100035, canada, 0), Array(100043, usa, 36), Array(100046, usa, 14), Array(100053, usa, 31))


In [3]:
case class Users(userid: Int, country: String, age: Int)
case class Books (bookid: Int, titlewords: Int, authorwords: Int, year: Int, publisher: Int) 
case class Rating(userid: Int, bookid: Int, rating: Int)

defined class Users
defined class Books
defined class Rating


Pour chaque table, créer une RDD contenant des objets de la classe lui correspondant. Compléter les instructions ci-dessous.

In [4]:
val users = users_data.map(x=>x.split(",")).filter(x=>x(0) != "userid").map(x=>Users(x(0).toInt,x(1),x(2).toInt))

users: org.apache.spark.rdd.RDD[Users] = MapPartitionsRDD[9] at map at <console>:28


In [5]:
val books = books_data.map(x=>x.split(",")).filter(x=>x(0) != "bookid").map(x=>Books(x(0).toInt,x(1).toInt,x(2).toInt,x(3).toInt,x(4).toInt))
val ratings = ratings_data.map(x=>x.split(",")).filter(x=>x(0) != "userid").map(x=>Rating(x(0).toInt,x(1).toInt,x(2).toInt))

books: org.apache.spark.rdd.RDD[Books] = MapPartitionsRDD[12] at map at <console>:31
ratings: org.apache.spark.rdd.RDD[Rating] = MapPartitionsRDD[15] at map at <console>:32


Exprimer les requêtes ci-dessous en opérateurs RDD

### Requêtes sur une seule table

#### Identifiants d'utilisateurs du pays 'france'

In [6]:
val s0 = users.filter(x=>x.country=="france")
s0.foreach(println)

Users(224895,france,36)
Users(224897,france,34)
Users(100681,france,54)
Users(225340,france,24)
Users(225414,france,27)
Users(226184,france,20)
Users(22648,france,35)
Users(101742,france,26)
Users(227544,france,31)
Users(229427,france,37)
Users(22981,france,23)
Users(231143,france,19)
Users(105187,france,53)
Users(106040,france,29)
Users(107767,france,17)
Users(108153,france,53)
Users(108551,france,28)
Users(109040,france,26)
Users(110367,france,21)
Users(110572,france,18)
Users(233441,france,33)
Users(233714,france,23)
Users(113275,france,38)
Users(114043,france,58)
Users(115198,france,31)
Users(115259,france,32)
Users(235303,france,26)
Users(117941,france,33)
Users(11881,france,32)
Users(118846,france,29)
Users(119859,france,53)
Users(120332,france,41)
Users(121070,france,27)
Users(1211,france,22)
Users(121918,france,46)
Users(238421,france,33)
Users(23876,france,0)
Users(239630,france,24)
Users(239736,france,20)
Users(241991,france,25)
Users(242409,france,23)
Users(243213,france,0)


s0: org.apache.spark.rdd.RDD[Users] = MapPartitionsRDD[16] at filter at <console>:26


#### Identifiants des livres dont la date est 2000

In [62]:
val s1 = books.filter(x=>x.year==2000)
s1.foreach(println)

Books(57346,5,2,2000,35)
Books(57348,3,2,2000,74)
Books(57357,3,2,2000,132)
Books(57477,6,2,2000,247)
Books(57503,4,2,2000,882)
Books(57504,6,2,2000,10)
Books(57514,4,3,2000,175)
Books(57628,9,2,2000,5182)
Books(57671,3,2,2000,1068)
Books(57688,7,2,2000,103)
Books(57748,4,3,2000,40)
Books(57760,1,2,2000,495)
Books(57803,3,2,2000,394)
Books(57850,2,2,2000,141)
Books(58036,18,2,2000,35)
Books(29,1,2,2000,28)
Books(37,2,2,2000,32)
Books(45,1,3,2000,39)
Books(52,12,2,2000,42)
Books(81,2,2,2000,61)
Books(84,2,2,2000,64)
Books(102,19,3,2000,74)
Books(114,2,2,2000,74)
Books(120,2,2,2000,80)
Books(127,6,2,2000,40)
Books(162,3,2,2000,74)
Books(170,3,3,2000,26)
Books(202,5,2,2000,126)
Books(212,3,2,2000,75)
Books(219,2,2,2000,129)
Books(231,2,2,2000,57)
Books(248,4,2,2000,140)
Books(283,3,2,2000,83)
Books(319,5,2,2000,178)
Books(328,6,2,2000,40)
Books(329,1,3,2000,41)
Books(340,4,2,2000,187)
Books(345,6,2,2000,35)
Books(356,5,2,2000,196)
Books(369,6,2,2000,134)
Books(373,2,2,2000,40)
Books(379,1

s1: org.apache.spark.rdd.RDD[Books] = MapPartitionsRDD[72] at filter at <console>:28


#### Identifiants des livres notés plus que 3

In [7]:
val s2 = ratings.filter(x=>x.rating>=3)
//s2.foreach(println)

s2: org.apache.spark.rdd.RDD[Rating] = MapPartitionsRDD[17] at filter at <console>:26


### Requêtes d'agrégation

#### Nombres d'utilisateurs par pays, triés par ordre décroissant de ce nombre

In [8]:
val q1 = users.map(x=>(x.country, 1)).reduceByKey((x,y)=>x+y).sortByKey()
q1.foreach(println)

(albania,1)
(alderney,1)
(america,1)
(andorra,1)
(antarctica,1)
(argentina,12)
(aruba,3)
(australia,581)
(austria,97)
(bahamas,2)
(bahrain,1)
(barbados,2)
(belgium,28)
(belize,1)
(bermuda,1)
(brazil,39)
(brunei,1)
(bulgaria,5)
(burma,1)
(canada,2505)
(cape verde,1)
(catalonia,1)
(catalunya,1)
(catalunya spain,1)
(cayman islands,2)
(channel islands,1)
(chile,4)
(china,15)
(colombia,1)
(costa rica,7)
(cuba,1)
(cyprus,2)
(czech republic,6)
(denmark,15)
(dominican republic,5)
(ecuador,1)
(egypt,5)
(england,5)
(espa�a,1)
(euskal herria,2)
(everywhere and anywhere,1)
(far away...,1)
(finland,38)
(fort bend,1)
(framingham,1)
(france,309)
(germany,1254)
(ghana,1)
(greece,9)
(grenada,1)
(guatemala,1)
(guernsey,1)
(guinea,1)
(honduras,1)
(hong kong,12)
(hungary,4)
(iceland,6)
(india,9)
(indonesia,1)
(iran,9)
(ireland,36)
(israel,14)
(italia,2)
(italy,211)
(jamaica,1)
(japan,28)
(jersey,1)
(k1c7b1,1)
(kazakhstan,1)
(kenya,1)
(kuwait,4)
(l`italia,1)
(la france,1)
(laos,1)
(lithuania,1)
(luxembourg

q1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[22] at sortByKey at <console>:26


#### Pays qui a le plus grand nombre d'utilisateurs. Il n y a pas d'ex aequo

In [24]:
val q2 = users.map(x=>(x.country, 1)).reduceByKey((x,y)=>x+y).reduce((x, y) => if(x._2 > y._2) x else y)

q2: (String, Int) = (usa,18935)
res12: (String, Int) = (usa,18935)


#### Année avec le plus grand nombre de livres édités. Il n y a pas d'ex aequo

In [25]:
val q3 = books.map(x=>(x.year, 1)).reduceByKey((x,y)=>x+y).reduce((x, y) => if(x._2 > y._2) x else y)

q3: (Int, Int) = (2002,4529)


### Requêtes avec jointure

#### Les éditeurs de livres ayant été notés par des utilisateurs habitant en France

In [52]:
val j1 = books.map(x=>(x.bookid, x.publisher)) 
val j2 = users.filter(x=>x.country=="france").map(x=>(x.userid, x.country)) 
val j3 = ratings.map(x=>(x.userid, x.bookid)) 
val j4 = j2.join(j3).map(x=>(x._2._2, x._2._1))
val j5 = j1.join(j4).map(x=>x._2._1).distinct
//j5.foreach(println)

j1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[241] at map at <console>:34
j2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[243] at map at <console>:35
j3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[244] at map at <console>:36
j4: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[248] at map at <console>:37
j5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[255] at distinct at <console>:38


#### Les éditeurs de livres n'ayant pas été notés par des utilisateurs habitant en France

In [51]:
val j6 = books.map(x=>(x.bookid, x.publisher)) 
val j7 = users.filter(x=>x.country!="france").map(x=>(x.userid, x.country)) 
val j8 = ratings.map(x=>(x.userid, x.bookid)) 
val j9 = j7.join(j8).map(x=>(x._2._2, x._2._1))
val j10 = j6.join(j9).map(x=>x._2._1).distinct

j6: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[226] at map at <console>:34
j7: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[228] at map at <console>:35
j8: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[229] at map at <console>:36
j9: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[233] at map at <console>:37
j10: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[240] at distinct at <console>:38


#### Pour chaque livre, la moyenne d'age des utilisateurs qui l'ont noté

In [None]:
val j12 = users.map(x=>(x.userid, x.age)) 
val j13 = ratings.map(x=>(x.userid, x.bookid)) 
val j14 = j12.join(j13).map(x=>(x._2._2, x._2._1)).reduceByKey((x, y)=>x+y)
j14.foreach(println)