### Notebook TP API DataFrame de Spark en Python - MLIA


Le but du TP est d'utilisert l'API Dataframe de Spark en Python.
Pour la documentation à consulter, suivre ce li:
* https://spark.apache.org/docs/latest/sql-programming-guide.html

* Consulter également les fichiers contenus dans le répertoire **aide-python-spark** sur google drive

## Rappel de quelques fonctions

|Expression |Action|
|:-------------:|:-------------:|
|val ds = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/path/file.csv") |loads the content of file.csv into a dataset ds by indicting that it contains a header  and by requesting  Spark to infer the schema |
|ds.printSchema | show the schema of ds |
|ds.show(truncate=false)|shows the first 20 rows without truncating the values |
|ds.describe().show()|collects and shows descriptive statistics (mean, max, count, ..) of numeric values|
|ds.select("c1", "c2", ..., "cn")|projects ds on the columns c1, …, cn|
|ds.withColumnRenamed("c1","c2")|renames the column c1 with c2|
|ds.where(cond)|selects the rows respecting cond|
|ds.groupBy("c1").agg(collect_list($"c2") as "values")|groups the rows by column c1 and creates an new column of values associated to those of c1|
|ds.groupBy("c1").agg(avg("c2"))|computes the sum of c2 for each c1 |
|ds.withColumn("new", Exp)|creates a new column whose values are computed by Exp|
|ds1.crossJoin(ds2)|computes the cross product of ds1 and ds2|
|ds1.join(ds2, "c") |joins ds1 and ds2 on the column c|
|ds1.join(ds2, Seq("c1",...,"cn")) |generalizes the previous one to a sequence of columns c1,…, cn|

## Préparation

*   ***Vérifier que des ressources*** de calcul sont allouées à votre notebook est
connecté (cf RAM de disque indiqués en haut à droite) . Sinon cliquer sur le bouton connecter pour obtenir des ressources.

*   ***Créer le répertoire*** pour stocker les fichiers nécessaires sur votre google
drive (donnez l'autorisation au notebook d'accéder à votre drive lorsque c'est demandé). *Ajuster le nom de votre dossier* : **MyDrive/mlia/TP**

***Installer pyspark et findspark :***

In [1]:
!pip install -q pyspark
!pip install -q findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


***Démarrer la session spark:***

In [2]:
import os
!find /usr/local -name "pyspark"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

/usr/local/lib/python3.10/dist-packages/pyspark
/usr/local/lib/python3.10/dist-packages/pyspark/python/pyspark
/usr/local/lib/python3.10/dist-packages/pyspark/bin/pyspark
/usr/local/bin/pyspark


In [3]:
# Principaux import
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkConf

# pour les dataframe et udf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *

# pour le chronomètre
import time

# initialise les variables d'environnement pour spark
findspark.init()

# Démarrage session spark
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "6G").\
  set("spark.driver.memory","6G").\
  set("spark.sql.catalogImplementation","in-memory")

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  spark.conf.set("spark.sql.shuffle.partitions","4")
  print("session démarrée, son id est ", sc.applicationId)
  return spark
spark = demarrer_spark()

session démarrée, son id est  local-1687255176424


## Lire un fichier et le transformer en DataFrame
  - lire le fichier films.json
  - afficher le schéma
  - afficher les colonnes (attributs)
  - afficher le contenu (3 films)
  - afficher le nombre de films
  - décrire la colonne nF (fonction describe())
  - afficher des statistiques sur la table films (fonction summary)

In [4]:
# URL du dossier contenant des fichiers de données (data.csv et meta.csv) utiles pour le TP
# ---------------------------------------------------------------------------
# en cas de problème avec le téléchargement des datasets, aller directement sur l'URL ci-dessous
PUBLIC_DATASET_URL = "https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4"
PUBLIC_DATASET=PUBLIC_DATASET_URL + "/download?path="

print("URL pour les données: ", PUBLIC_DATASET_URL)

URL pour les données:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4


In [5]:
import os
from urllib import request

import os
from urllib import request

drive_dir = "/content/sample_data/"

def load_file(file,dir):
  if(os.path.isfile(drive_dir+file)):
    print(file, "is already stored")
  else:
    url = PUBLIC_DATASET + "/"+ dir + "/" + file
    print("downloading from URL: ", url, "save in : " + drive_dir   + file)
    request.urlretrieve(url , drive_dir + file)

load_file("films.json", "movielens")
load_file("notesAMJ.csv", "movielens")
load_file("ratings.csv", "movielens/ml-latest-small")

# Liste des fichiers téléchargés
print("Fichiers téléchargés:")
os.listdir(drive_dir)

downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/films.json save in : /content/sample_data/films.json
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/notesAMJ.csv save in : /content/sample_data/notesAMJ.csv
downloading from URL:  https://nuage.lip6.fr/s/H3bpyRGgnCq2NR4/download?path=/movielens/ml-latest-small/ratings.csv save in : /content/sample_data/ratings.csv
Fichiers téléchargés:


['anscombe.json',
 'README.md',
 'notesAMJ.csv',
 'ratings.csv',
 'films.json',
 'mnist_train_small.csv',
 'mnist_test.csv',
 'california_housing_test.csv',
 'california_housing_train.csv']

In [6]:
#Le dossier contenant les fichiers csv importés:
DATASET_DIR="/content/sample_data/"

## Lire le fichier films.json et créer le DataFrame films

In [7]:
#Lire le fichier films.json et création d'un Dataframe films (fonction spark.read.json)
#==============
# Données
#==============
print("Lecture du fichier: ", DATASET_DIR+"/films.json")
films = spark.read.json(DATASET_DIR+"/films.json")

Lecture du fichier:  /content/sample_data//films.json


In [8]:
#Afficher le schéma obtenu
films.printSchema()
#résultat:
#root
# |-- g: array (nullable = true)
# |    |-- element: string (containsNull = true)
# |-- nF: long (nullable = true)
# |-- titre: string (nullable = true)

root
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nF: long (nullable = true)
 |-- titre: string (nullable = true)



In [9]:
#Afficher les noms des colonnes
films.columns
#résultat:
#['g', 'nF', 'titre']

['g', 'nF', 'titre']

In [10]:
#Afficher 3 lignes de la structure films (fonction show)
films.show(3)

+--------------------+------+--------------------+
|                   g|    nF|               titre|
+--------------------+------+--------------------+
|             [Drama]|  8754|Prime of Miss Jea...|
|          [Thriller]|111486|Lesson of the Evi...|
|[Animation, Child...|  1033|Fox and the Hound...|
+--------------------+------+--------------------+
only showing top 3 rows



Résultat:

```
+--------------------+------+--------------------+
|                   g|    nF|               titre|
+--------------------+------+--------------------+
|             [Drama]|  8754|Prime of Miss Jea...|
|          [Thriller]|111486|Lesson of the Evi...|
|[Animation, Child...|  1033|Fox and the Hound...|
+--------------------+------+--------------------+
```

In [11]:
#Afficher le nombre de films (fonction count)
films.count()
#résultat: 9125

9125

In [12]:
#Décrire (donner les statistiques) de la colonne nF de films (fonction describe)
films.describe('nF').show()

+-------+------------------+
|summary|                nF|
+-------+------------------+
|  count|              9125|
|   mean|31123.291835616437|
| stddev|40782.633603974195|
|    min|                 1|
|    max|            164979|
+-------+------------------+



Résultat:


```
+-------+------------------+
|summary|                nf|
+-------+------------------+
|  count|              9125|
|   mean|31123.291835616437|
| stddev|40782.633603974195|
|    min|                 1|
|    max|            164979|
+-------+------------------+
```



In [13]:
# statistiques par attributs
films.describe().show()

+-------+------------------+--------------------+
|summary|                nF|               titre|
+-------+------------------+--------------------+
|  count|              9125|                9125|
|   mean|31123.291835616437|                null|
| stddev|40782.633603974195|                null|
|    min|                 1|"""Great Performa...|
|    max|            164979| İtirazım Var (2014)|
+-------+------------------+--------------------+



Résultat:

```
+-------+------------------+--------------------+
|summary|                nF|               titre|
+-------+------------------+--------------------+
|  count|              9125|                9125|
|   mean|31123.291835616437|                null|
| stddev|40782.633603974195|                null|
|    min|                 1|"""Great Performa...|
|    25%|              2849|                null|
|    50%|              6287|                null|
|    75%|             56251|                null|
|    max|            164979| İtirazım Var (2014)|
+-------+------------------+--------------------+
```

## Requêtes: Interrogation  des films
   - afficher 10 titres de films
   - afficher les titres de films, les numéros de films incrémentés de 1 et les genres
   - afficher les films dont le titre commence par 'Police', les ordonner par nF (fonction startswith)
   - créer une nouvelle DataFrame films2 avec un seul genre par film (pour un film avec n genres, il y a n lignes); fonction explode
   - afficher deux lignes de films2
   - afficher le nombre de genres distinct
   - afficher le nombre de films par genre (groupBy)

In [14]:
# Afficher 10 titres de films
films.select('titre').show(10)

+--------------------+
|               titre|
+--------------------+
|Prime of Miss Jea...|
|Lesson of the Evi...|
|Fox and the Hound...|
|Sinbad: Legend of...|
|       Gloria (1980)|
|    Lady Jane (1986)|
|4 Months, 3 Weeks...|
|Ella Enchanted (2...|
|In a World... (2013)|
|The Disappearance...|
+--------------------+
only showing top 10 rows




```
# Résultat:
+--------------------+
|               titre|
+--------------------+
|Prime of Miss Jea...|
|Lesson of the Evi...|
|Fox and the Hound...|
|Sinbad: Legend of...|
|       Gloria (1980)|
|    Lady Jane (1986)|
|4 Months, 3 Weeks...|
|Ella Enchanted (2...|
|In a World... (2013)|
|The Disappearance...|
+--------------------+
only showing top 10 rows
```



In [15]:
#Afficher 3 titres de films, les numéros de films incrémentés de 1 et les genres
from pyspark.sql.functions import col
films = films.withColumn('nF + 1', col('nF') + 1)

films.show(3)

+--------------------+------+--------------------+------+
|                   g|    nF|               titre|nF + 1|
+--------------------+------+--------------------+------+
|             [Drama]|  8754|Prime of Miss Jea...|  8755|
|          [Thriller]|111486|Lesson of the Evi...|111487|
|[Animation, Child...|  1033|Fox and the Hound...|  1034|
+--------------------+------+--------------------+------+
only showing top 3 rows



In [16]:
films.select(['titre', 'nF + 1', 'g']).show(3)

+--------------------+------+--------------------+
|               titre|nF + 1|                   g|
+--------------------+------+--------------------+
|Prime of Miss Jea...|  8755|             [Drama]|
|Lesson of the Evi...|111487|          [Thriller]|
|Fox and the Hound...|  1034|[Animation, Child...|
+--------------------+------+--------------------+
only showing top 3 rows



Résultat:


```
# +--------------------+--------+--------------------+
|               titre|(nF + 1)|                   g|
+--------------------+--------+--------------------+
|Prime of Miss Jea...|    8755|             [Drama]|
|Lesson of the Evi...|  111487|          [Thriller]|
|Fox and the Hound...|    1034|[Animation, Child...|
+--------------------+--------+--------------------+
```



In [17]:
#Afficher les films dont le titre commence par 'Police', les ordonner par nF (filter avec  startswith, orderBy, show)
# Filter the DataFrame based on the condition and order by 'nF'
result = films.filter(films['titre'].startswith("Police")).orderBy("nF")

# Display the result
result.show()

+--------------------+-----+--------------------+------+
|                   g|   nF|               titre|nF + 1|
+--------------------+-----+--------------------+------+
|     [Comedy, Crime]| 2378|Police Academy (1...|  2379|
|     [Comedy, Crime]| 2379|Police Academy 2:...|  2380|
|     [Comedy, Crime]| 2380|Police Academy 3:...|  2381|
|     [Comedy, Crime]| 2381|Police Academy 4:...|  2382|
|     [Comedy, Crime]| 2382|Police Academy 5:...|  2383|
|     [Comedy, Crime]| 2383|Police Academy 6:...|  2384|
|     [Comedy, Crime]| 8387|Police Academy: M...|  8388|
|[Action, Comedy, ...|26547|Police Story (Gin...| 26548|
+--------------------+-----+--------------------+------+



Résultat:
```
# +---------------+----+--------------------+
|              g|  nF|               titre|
+---------------+----+--------------------+
|[Comedy, Crime]|2378|Police Academy (1...|
|[Comedy, Crime]|2379|Police Academy 2:...|
|[Comedy, Crime]|2380|Police Academy 3:...|
+---------------+----+--------------------+
```



In [18]:
# Pour les films sans genre il y films.g est un array de taille 1 avec '(no genres listed)')
# Créer un Dataframe tmp qui contient que les films sans les films sans genre (fonction array_contains) et afficher le nombre de films dans ce Dataframe
#Résultat: 9107
# Filter the DataFrame to exclude movies with no genre listed
tmp = films.filter(~array_contains(col("g"), "(no genres listed)"))

# Display the number of films in the new DataFrame
film_count = tmp.count()
print("Number of films in the DataFrame without movies with no genre:", film_count)

Number of films in the DataFrame without movies with no genre: 9107


In [19]:
#Créer un nouveau DataFrame films_g à partir de tmp avec un seul genre par film (pour un film avec n genres, il y a n lignes); utiliser la fonction explode
from pyspark.sql.functions import explode

# Use the 'explode' function to create a new DataFrame with a single genre per film
films_g = tmp.select(col("nF"), col("titre"), explode(col("g")).alias("genre"))

# Display the new DataFrame
films_g.show()

+------+--------------------+---------+
|    nF|               titre|    genre|
+------+--------------------+---------+
|  8754|Prime of Miss Jea...|    Drama|
|111486|Lesson of the Evi...| Thriller|
|  1033|Fox and the Hound...|Animation|
|  1033|Fox and the Hound...| Children|
|  1033|Fox and the Hound...|    Drama|
|  6536|Sinbad: Legend of...|Adventure|
|  6536|Sinbad: Legend of...|Animation|
|  6536|Sinbad: Legend of...| Children|
|  6536|Sinbad: Legend of...|  Fantasy|
|  5179|       Gloria (1980)|    Drama|
|  5179|       Gloria (1980)| Thriller|
|  6201|    Lady Jane (1986)|    Drama|
|  6201|    Lady Jane (1986)|  Romance|
| 55069|4 Months, 3 Weeks...|    Drama|
|  7380|Ella Enchanted (2...|   Comedy|
|  7380|Ella Enchanted (2...|  Fantasy|
|  7380|Ella Enchanted (2...|  Romance|
|104339|In a World... (2013)|   Comedy|
|130580|The Disappearance...|    Drama|
|  7832|Thin Man Goes Hom...|   Comedy|
+------+--------------------+---------+
only showing top 20 rows



Résultat:
```
# +--------------------+---------+------+
|               titre|    genre|    nF|
+--------------------+---------+------+
|Prime of Miss Jea...|    Drama|  8754|
|Lesson of the Evi...| Thriller|111486|
|Fox and the Hound...|Animation|  1033|
+--------------------+---------+------+
```


In [20]:
# Calculer le nombre de genres distinct (distinct et count)
films_g.select('genre').distinct().count()
#Résultat: 19

19

In [21]:
# Afficher le nombre de films par genre (groupBy et count)

films_g.groupBy(films_g.genre).count().show()

+-----------+-----+
|      genre|count|
+-----------+-----+
|   Children|  583|
|    Fantasy|  654|
|      Crime| 1100|
|     Horror|  877|
|      Drama| 4365|
|  Adventure| 1117|
|     Sci-Fi|  792|
|    Musical|  394|
|    Western|  168|
|       IMAX|  153|
|   Thriller| 1729|
|  Animation|  447|
|    Romance| 1545|
|     Comedy| 3315|
|    Mystery|  543|
|        War|  367|
|Documentary|  495|
|     Action| 1545|
|  Film-Noir|  133|
+-----------+-----+



Résultat:
```
# +-----------+-----+
|      genre|count|
+-----------+-----+
|      Crime| 1100|
|     Horror|  877|
|  Adventure| 1117|
|     Sci-Fi|  792|
|    Musical|  394|
|    Western|  168|
|    Romance| 1545|
|     Comedy| 3315|
|    Mystery|  543|
|        War|  367|
|Documentary|  495|
|   Children|  583|
|    Fantasy|  654|
|      Drama| 4365|
|       IMAX|  153|
|   Thriller| 1729|
|  Animation|  447|
|     Action| 1545|
|  Film-Noir|  133|
+-----------+-----+
```



## Lire le fichier notesAMJ.csv et créer le DataFrame notes

In [22]:
schema = """
          nU INT,
          nF LONG,
          note FLOAT,
          annee INT,
          mois INT,
          jour INT
        """
print("Lecture du fichier: ", DATASET_DIR+"/notesAMJ.csv")
notes = spark.read.csv(DATASET_DIR+"/notesAMJ.csv", header='true', schema=schema)
notes.printSchema()
notes=notes.persist()
notes.count()
#résultat: 100004

Lecture du fichier:  /content/sample_data//notesAMJ.csv
root
 |-- nU: integer (nullable = true)
 |-- nF: long (nullable = true)
 |-- note: float (nullable = true)
 |-- annee: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- jour: integer (nullable = true)



100004

## Requêtes: Interrogation des notes
  - lire le fichier notesAMJ.csv (le fichier contient pour chaque utilisateur les films qu'il a notés
      (avec la date (année, mois, jour) de la note))
  - afficher le schéma obtenu
  - afficher le contenu (3 lignes)
  - afficher le nombre d'années distinctes
  - afficher le nombre de dates (comprenant année, mois, jour) distinctes
  - afficher la note maximale, moyenne et minimale
  - grouper les notes par numéro de film
  - afficher la note moyenne par film
  - pour chaque utilisateur
     - afficher son nombre total de notes differentes, la note maximum, minimum et moyenne
     - trier le resultat de la requête précédente par le nombre de notes décroissant et le numéro d'utilisateur

In [23]:
# Afficher le schéma de notes ainsi que 3 lignes de son contenu
notes.printSchema()

root
 |-- nU: integer (nullable = true)
 |-- nF: long (nullable = true)
 |-- note: float (nullable = true)
 |-- annee: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- jour: integer (nullable = true)



In [24]:
notes.show(3)

+---+-----+----+-----+----+----+
| nU|   nF|note|annee|mois|jour|
+---+-----+----+-----+----+----+
|175|   48| 3.0| 2003|   5|  14|
|461| 2001| 3.0| 2004|   8|   8|
|547|89881| 4.0| 2011|  10|   8|
+---+-----+----+-----+----+----+
only showing top 3 rows



Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- nF: long (nullable = true)
 |-- note: float (nullable = true)
 |-- annee: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- jour: integer (nullable = true)

+---+-----+----+-----+----+----+
|nU |nF   |note|annee|mois|jour|
+---+-----+----+-----+----+----+
|175|48   |3.0 |2003 |5   |14  |
|461|2001 |3.0 |2004 |8   |8   |
|547|89881|4.0 |2011 |10  |8   |
+---+-----+----+-----+----+----+
```



In [25]:
#Afficher le nombre d'années distinctes (countDistinct)
from pyspark.sql.functions import countDistinct

# Count the number of distinct years in the 'annee' column
distinct_years_count = notes.agg(countDistinct("annee")).show(1)

# Display the result
print("Number of distinct years:", distinct_years_count)

+------------+
|count(annee)|
+------------+
|          22|
+------------+

Number of distinct years: None


Résultat:
```
# +---------------------+
|count(DISTINCT annee)|
+---------------------+
|                   22|
+---------------------+
```



In [26]:
#Afficher le nombre de dates distinctes (annee, mois, jour)
from pyspark.sql.functions import countDistinct

# Count the number of distinct dates considering 'annee', 'mois', and 'jour' columns
distinct_dates_count = notes.agg(countDistinct("annee", "mois", "jour")).show(1)

# Display the result
print("Number of distinct dates:", distinct_dates_count)

+------------------------+
|count(annee, mois, jour)|
+------------------------+
|                    3840|
+------------------------+

Number of distinct dates: None


Résultat:
```
# +---------------------------------+
|count(DISTINCT annee, mois, jour)|
+---------------------------------+
|                             3840|
+---------------------------------+
```



In [27]:
#Afficher la note maximale, moyenne et minimale (min, max, avg)
from pyspark.sql.functions import min, max, avg, stddev

# Calculate the minimum, maximum, and average values of the 'note' column
result = notes.agg(
    min("note").alias("Min Note"),
    max("note").alias("Max Note"),
    avg("note").alias("Avg Note"),
    stddev('note').alias("std")
)

# Display the result
result.show()


+--------+--------+-----------------+------------------+
|Min Note|Max Note|         Avg Note|               std|
+--------+--------+-----------------+------------------+
|     0.5|     5.0|3.543608255669773|1.0580641091070393|
+--------+--------+-----------------+------------------+



Résultat:
```
# +---------+---------+-----------------+
|min(note)|max(note)|        avg(note)|
+---------+---------+-----------------+
|      0.5|      5.0|3.543608255669773|
+---------+---------+-----------------+
```



In [28]:
#Grouper les notes par numéro de film (groupBy) et stocker le résultat dans un Dataframe notes_groupee
notes_groupee = notes.groupBy('nF')
#pas de résultat à afficher

In [29]:
#Afficher la note moyenne par film en utilisant le Dataframe notes_groupees (avg)
notes_groupee.avg('note').show(3)

+-----+------------------+
|   nF|         avg(note)|
+-----+------------------+
|   48|2.9262295081967213|
|89881|               4.0|
|  208| 2.752212389380531|
+-----+------------------+
only showing top 3 rows



In [30]:
films_w_notes = films.join(notes, on='nF', how='inner').select('titre', 'nU', 'nF', 'note')
films_w_notes = films_w_notes.groupBy('titre').avg('note').orderBy(desc('avg(note)'))
films_w_notes.show()

+--------------------+---------+
|               titre|avg(note)|
+--------------------+---------+
|Armour of God (Lo...|      5.0|
|    Blackrock (1997)|      5.0|
|On Any Sunday (1971)|      5.0|
|    Endurance (1999)|      5.0|
|Funhouse, The (1981)|      5.0|
|Red Firecracker, ...|      5.0|
|Face in the Crowd...|      5.0|
|        Faces (1968)|      5.0|
|Good Morning, Bab...|      5.0|
|      Caveman (1981)|      5.0|
|Circle, The (Daye...|      5.0|
|Dream With the Fi...|      5.0|
|King Is Alive, Th...|      5.0|
|Taste of Cherry (...|      5.0|
|Last Dragon, The ...|      5.0|
| Bandit Queen (1994)|      5.0|
|Above the Law (1988)|      5.0|
|Paris, France (1993)|      5.0|
|   Masquerade (1988)|      5.0|
|Defying Gravity (...|      5.0|
+--------------------+---------+
only showing top 20 rows



Résultat:
```
# +-----+------------------+
|   nF|         avg(note)|
+-----+------------------+
|   48|2.9262295081967213|
|89881|               4.0|
|  208| 2.752212389380531|
+-----+------------------+
```



In [31]:
#Afficher les notes moyennes par film triées par ordre décroissant de la note (orderBy avec desc)

notes_groupee.avg('note').orderBy(desc('avg(note)'), 'nF').show(3)

+---+---------+
| nF|avg(note)|
+---+---------+
| 53|      5.0|
|183|      5.0|
|301|      5.0|
+---+---------+
only showing top 3 rows



Résultat:
```
# +---+---------+
| nF|avg(note)|
+---+---------+
| 53|      5.0|
|183|      5.0|
|301|      5.0|
+---+---------+
```



In [32]:
#Creer un dataframe notes_util qui groupe les notes par utilisateur
#pas de rsultat à afficher
notes_util = notes.groupBy('nU')

In [33]:
# Créer un Dataframe tmp qui contient pour chaque utilisateur le nombre total de notes differentes, la la note maximum, minimum et moyenne
tmp = notes_util.agg(
    countDistinct('note').alias('total'),
    max('note').alias('max'),
    min('note').alias('min'),
    avg('note').alias('moyenne')
)

# pas de résultat à afficher

In [34]:
# Trier le Dataframe tmp par le nombre de notes décroissant et le numéro d'utilisateur et afficher le résultat
tmp.orderBy(['total', 'nU']).show(3)

+---+-----+---+---+------------------+
| nU|total|max|min|           moyenne|
+---+-----+---+---+------------------+
| 24|    3|5.0|3.0|3.6666666666666665|
| 46|    3|5.0|4.0| 4.948717948717949|
| 47|    3|5.0|3.0|3.8684210526315788|
+---+-----+---+---+------------------+
only showing top 3 rows



Résultat:
```
# +---+-----+---+---+------------------+
| nU|total|max|min|           moyenne|
+---+-----+---+---+------------------+
| 15|   10|5.0|0.5|2.6217647058823528|
| 17|   10|5.0|0.5| 3.743801652892562|
| 20|   10|5.0|0.5|3.2908163265306123|
+---+-----+---+---+------------------+
```



### **Jointures films et notes**
  - créer un DataFrame films_notes qui contient les films avec leur notes (une ligne par note)
  - afficher le nombre de notes du film dont le titre contient la chaîne'Pocahontas'
  - afficher pour chaque film, son titre, le nombre de notes, sa note moyenne, sa note maximale, sa note minimale
  - les titres des films qui ne sont pas notés
  - pour chaque genre les utilisateurs qui n'ont noté aucun film de ce genre
  - exporter le DataFrame films_notes dans un fichier JSON et afficher le contenu du fichier obtenu
  - essayer d'exporter films_notes dans un fichier CSV

In [35]:
films.printSchema()
notes.printSchema()

root
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nF: long (nullable = true)
 |-- titre: string (nullable = true)
 |-- nF + 1: long (nullable = true)

root
 |-- nU: integer (nullable = true)
 |-- nF: long (nullable = true)
 |-- note: float (nullable = true)
 |-- annee: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- jour: integer (nullable = true)



In [36]:
# Créer un DataFrame films_notes qui contient les films avec leur notes (une ligne par note) (join)
films_notes = films.join(notes, on='nF', how='inner')

films_notes.show(20)

+---+--------------------+----------------+------+---+----+-----+----+----+
| nF|                   g|           titre|nF + 1| nU|note|annee|mois|jour|
+---+--------------------+----------------+------+---+----+-----+----+----+
|  1|[Adventure, Anima...|Toy Story (1995)|     2|126| 5.0| 1996|   5|  28|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|396| 5.0| 1996|   6|  17|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|165| 2.5| 2005|   3|  22|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|463| 3.0| 2003|   4|   9|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|294| 4.0| 2003|   3|   7|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|382| 3.5| 2013|   6|  20|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|283| 3.0| 2005|   5|   2|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|357| 5.0| 1996|   5|  22|
|  1|[Adventure, Anima...|Toy Story (1995)|     2| 20| 3.5| 2009|   4|   3|
|  1|[Adventure, Anima...|Toy Story (1995)|     2|179| 5.0| 2015|   7|  12|
|  1|[Advent

Résultat:
```
# +----+--------------------+--------------------+---+----+-----+----+----+
|  nF|                   g|               titre| nU|note|annee|mois|jour|
+----+--------------------+--------------------+---+----+-----+----+----+
|  48|[Animation, Child...|   Pocahontas (1995)|175| 3.0| 2003|   5|  14|
|2001|[Action, Comedy, ...|Lethal Weapon 2 (...|461| 3.0| 2004|   8|   8|
+----+--------------------+--------------------+---+----+-----+----+----+
```



In [37]:
#Afficher le nombre de notes du film dont le titre contient la chaîne'Pocahontas' (contains)
# Filter the DataFrame to keep rows where the 'titre' column contains the string 'Pocahontas'
pocahontas_films = films_notes.filter(col("titre").contains("Pocahontas"))

# Count the number of notes for the filtered film
number_of_notes = pocahontas_films.count()

# Display the result
print("Number of notes for films with 'Pocahontas' in the title:", number_of_notes)
#résultat: 61

Number of notes for films with 'Pocahontas' in the title: 61


In [38]:
# Afficher pour chaque film, son titre, le nombre de notes, sa note moyenne, sa note maximale, sa note minimale (groupBy + agg)
# Group the DataFrame by 'nF' (film) and 'titre', and aggregate the number of notes, max, min, and avg notes
film_stats = films_notes.groupBy("nF", "titre").agg(
    count("note").alias("Number of Notes"),
    min("note").alias("Min Note"),
    max("note").alias("Max Note"),
    avg("note").alias("Avg Note")
)

# Display the resulting DataFrame
film_stats.show()


+---+--------------------+---------------+--------+--------+------------------+
| nF|               titre|Number of Notes|Min Note|Max Note|          Avg Note|
+---+--------------------+---------------+--------+--------+------------------+
|  1|    Toy Story (1995)|            247|     1.0|     5.0|3.8724696356275303|
|  2|      Jumanji (1995)|            107|     1.5|     5.0|3.4018691588785046|
|  3|Grumpier Old Men ...|             59|     0.5|     5.0|3.1610169491525424|
|  4|Waiting to Exhale...|             13|     1.0|     3.5|2.3846153846153846|
|  5|Father of the Bri...|             56|     1.0|     5.0| 3.267857142857143|
|  6|         Heat (1995)|            104|     1.0|     5.0|3.8846153846153846|
|  7|      Sabrina (1995)|             53|     0.5|     5.0|3.2830188679245285|
|  8| Tom and Huck (1995)|              5|     1.0|     5.0|               3.8|
|  9| Sudden Death (1995)|             20|     1.0|     5.0|              3.15|
| 10|    GoldenEye (1995)|            12

Résultat:
```
# +--------------------+-------+---+---+------------------+
|               titre|nbNotes|max|min|           moyenne|
+--------------------+-------+---+---+------------------+
|    Toy Story (1995)|      9|5.0|1.0|3.8724696356275303|
|      Jumanji (1995)|      8|5.0|1.5|3.4018691588785046|
|Grumpier Old Men ...|     10|5.0|0.5|3.1610169491525424|
+--------------------+-------+---+---+------------------+
```



### *Jointures externe et produit Cartesien*

In [39]:
# Créer un Dataframe f_films qui renomme l'attribut nF de films en nF1 (withColumnRenamed)
# Create a new DataFrame with the 'nF' column renamed to 'nF1'
f_films = films.withColumnRenamed("nF", "nF1")

# Display the new DataFrame
f_films.show()

#pas de résultat à afficher

+--------------------+------+--------------------+------+
|                   g|   nF1|               titre|nF + 1|
+--------------------+------+--------------------+------+
|             [Drama]|  8754|Prime of Miss Jea...|  8755|
|          [Thriller]|111486|Lesson of the Evi...|111487|
|[Animation, Child...|  1033|Fox and the Hound...|  1034|
|[Adventure, Anima...|  6536|Sinbad: Legend of...|  6537|
|   [Drama, Thriller]|  5179|       Gloria (1980)|  5180|
|    [Drama, Romance]|  6201|    Lady Jane (1986)|  6202|
|             [Drama]| 55069|4 Months, 3 Weeks...| 55070|
|[Comedy, Fantasy,...|  7380|Ella Enchanted (2...|  7381|
|            [Comedy]|104339|In a World... (2013)|104340|
|             [Drama]|130580|The Disappearance...|130581|
|[Comedy, Crime, M...|  7832|Thin Man Goes Hom...|  7833|
|[Comedy, Drama, R...| 56367|         Juno (2007)| 56368|
|[Comedy, Horror, ...|149830|Pride and Prejudi...|149831|
|[Adventure, Drama...|138208|     The Walk (2015)|138209|
|     [Comedy,

In [40]:
# Joignez la table f_film avec la table notes par une jointure externe gauche (left outer join) qui garde aussi les films sans notes; stocker le résultat dans un Dataframe ff_notes
# Perform a left outer join between the f_films DataFrame and the notes DataFrame
ff_notes = f_films.join(notes, on=(f_films.nF1 == notes.nF), how="left")

# Display the new DataFrame
ff_notes.show()

#pas de résultat à afficher

+--------------------+----+--------------------+------+---+----+----+-----+----+----+
|                   g| nF1|               titre|nF + 1| nU|  nF|note|annee|mois|jour|
+--------------------+----+--------------------+------+---+----+----+-----+----+----+
|[Animation, Child...|1033|Fox and the Hound...|  1034|605|1033| 3.0| 2001|   1|  22|
|[Animation, Child...|1033|Fox and the Hound...|  1034|185|1033| 3.0| 2001|  10|  19|
|[Animation, Child...|1033|Fox and the Hound...|  1034|564|1033| 4.0| 2000|  11|  21|
|[Animation, Child...|1033|Fox and the Hound...|  1034|602|1033| 4.0| 1996|   9|  10|
|[Animation, Child...|1033|Fox and the Hound...|  1034|514|1033| 4.0| 1997|   1|  22|
|[Animation, Child...|1033|Fox and the Hound...|  1034|592|1033| 2.0| 2001|   7|  18|
|[Animation, Child...|1033|Fox and the Hound...|  1034|530|1033| 4.0| 1997|  12|  17|
|[Animation, Child...|1033|Fox and the Hound...|  1034|289|1033| 4.5| 2012|   2|  11|
|[Animation, Child...|1033|Fox and the Hound...|  1034

In [41]:
# Afficher les films sans notes (isNull)
ff_notes.filter(~col("note").isNull()).show()

+--------------------+---+----------------+------+---+---+----+-----+----+----+
|                   g|nF1|           titre|nF + 1| nU| nF|note|annee|mois|jour|
+--------------------+---+----------------+------+---+---+----+-----+----+----+
|[Adventure, Anima...|  1|Toy Story (1995)|     2|126|  1| 5.0| 1996|   5|  28|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|396|  1| 5.0| 1996|   6|  17|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|165|  1| 2.5| 2005|   3|  22|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|463|  1| 3.0| 2003|   4|   9|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|294|  1| 4.0| 2003|   3|   7|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|382|  1| 3.5| 2013|   6|  20|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|283|  1| 3.0| 2005|   5|   2|
|[Adventure, Anima...|  1|Toy Story (1995)|     2|357|  1| 5.0| 1996|   5|  22|
|[Adventure, Anima...|  1|Toy Story (1995)|     2| 20|  1| 3.5| 2009|   4|   3|
|[Adventure, Anima...|  1|Toy Story (199

Résultat:


```
# +--------------------+
|               titre|
+--------------------+
|Wild Child, The (...|
|Iron Ladies, The ...|
|Scarlet Street (1...|
+--------------------+
```



In [42]:
# Pour chaque genre les utilisateurs qui n'ont noté aucun film de ce genre

# - créer un premier Dataframe g_u qui contient des couples (genre, nU) où nU a vu des films du genre "genre"
g_u = films_g.join(notes, on='nF').select('nU', 'genre')

# - créer un deuxième Dataframe gu-tous qui contient tous les couples (genre, nU) possibles (crossjoin)
distinct_genres = films_g.select("genre").distinct()
distinct_users = notes.select("nU").distinct()

# Cross join to create all possible pairs
gu_tous = distinct_genres.crossJoin(distinct_users)

# - utiliser les deux Dataframes pour calculer les couples (genre, nU) où nU n'a pas vu de films du genre "genre" (subtract)
gu_no_ratings = gu_tous.subtract(g_u)
gu_no_ratings.show()

+------+---+
| genre| nU|
+------+---+
|Action|104|
|Action|109|
|Action|110|
|Action|119|
|Action|125|
|Action|133|
|Action| 14|
|Action|141|
|Action|143|
|Action|144|
|Action|147|
|Action| 15|
|Action|157|
|Action|160|
|Action|179|
|Action|186|
|Action|189|
|Action|196|
|Action|210|
|Action|223|
+------+---+
only showing top 20 rows



Résultat:


```
# +---------+---+
|    genre| nU|
+---------+---+
|Animation| 11|
|Animation| 71|
|Animation|230|
+---------+---+
```



# **Recommander des films aux utilisateurs**

Appliquer le filtrage collaboratif (l'aproche centrée utilisateur) pour recommander à chaque utilisateur des films pas encore visionnés (on suppose qu'un film non noté par un utilisateur n'a pas été visionné par celui-ci). Voir une description de l'approche ici (https://en.wikipedia.org/wiki/Collaborative_filtering, section Memory-based).

Nous allons tout d'abord préparer les données à partir des fichiers ratings.csv et movies.csv contenant les films et les notes des utilisateurs pour ces films et construire les strutures DataFrame correpondantes.

## Préparation des données

In [43]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import col

# charger le fichier ratings.csv dans un Datframe notes_i
# Define the schema
schema = StructType([
    StructField("nU", IntegerType(), True),
    StructField("nF", IntegerType(), True),
    StructField("note", FloatType(), True),
    StructField("date", IntegerType(), True)
])

print("Lecture du fichier: ", DATASET_DIR+"/ratings.csv")

# Load the data and skip the header
notes_i = spark.read.csv(DATASET_DIR+"/ratings.csv", header=True, schema=schema)

# Cast the columns to the appropriate data types
#notes_i = notes_i.select(
#    col("userId").cast(IntegerType()).alias("nU"),
#    col("movieId").cast(IntegerType()).alias("nF"),
#    col("rating").cast(FloatType()).alias("note"),
#    col("timestamp").cast(IntegerType()).alias("date")
#)

# Persist the DataFrame in memory
notes_i = notes_i.persist()

# Display the schema and the top 3 rows
notes_i.printSchema()
notes_i.show(3)
notes_i.count()

Lecture du fichier:  /content/sample_data//ratings.csv
root
 |-- nU: integer (nullable = true)
 |-- nF: integer (nullable = true)
 |-- note: float (nullable = true)
 |-- date: integer (nullable = true)

+---+----+----+----------+
| nU|  nF|note|      date|
+---+----+----+----------+
|  1|  31| 2.5|1260759144|
|  1|1029| 3.0|1260759179|
|  1|1061| 3.0|1260759182|
+---+----+----+----------+
only showing top 3 rows



100004

Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- nF: integer (nullable = true)
 |-- note: float (nullable = true)
 |-- date: integer (nullable = true)

+---+----+----+----------+
| nU|  nF|note|      date|
+---+----+----+----------+
|  1|  31| 2.5|1260759144|
|  1|1029| 3.0|1260759179|
|  1|1061| 3.0|1260759182|
+---+----+----+----------+
only showing top 3 rows

100004
```



### *Extraction du jour, du mois et de l'année à partir de la date :*
Dans le fichier ratings.csv la date à laquelle un utilisateur a noté un film est au format epoch Unix (timestamp). Nous allons extraire de cette date les informations concernant l'année, le mois et le jour. Cette conversion sera réalisée en deux étapes:
- créer 3 fonctions utilisateur, chacune prenant comme paramètre un entier représentant la date à convertir (annotées @udf('integer')) et renvoyant respectivement le jour, le mois (compris entre 1 et   12) et l'année
- invoquer ces fonctions à l'aide de la méthode withColumn(). Vérifier que les valeurs obtenues correspondent à celles  continues dans le fichier notesAMJ.csv

In [44]:
from datetime import *
from pyspark.sql.functions import udf

In [45]:
#définir la fonction qui extrait le jour (compris entre 1 et 31) de la date
@udf('integer')
def getJour(v):
    return datetime.utcfromtimestamp(v).day
#définir la fonction qui extrait le mois (compris entre 1 et 12) de la date
@udf('integer')
def getMois(v):
    return datetime.utcfromtimestamp(v).month
#définir la fonction qui extrait l'année
@udf('integer')
def getAnnee(v):
    return datetime.utcfromtimestamp(v).year

In [46]:
# appliquer les 3 fonctions précédentes à la colonne date de notes_i pour construire une nouvelle DataFrame notes
# ayant comme colonnes nU, nF, note, jour, mois, annee
# Replace 'notes_i' with the name of your DataFrame containing the Unix epoch timestamp
notes = notes_i.withColumn("jour", getJour("date")) \
               .withColumn("mois", getMois("date")) \
               .withColumn("annee", getAnnee("date"))
notes = notes.drop('date')
notes.persist() #garder les notes en mémoire
notes.count()
notes.show(3)
notes.printSchema()

+---+----+----+----+----+-----+
| nU|  nF|note|jour|mois|annee|
+---+----+----+----+----+-----+
|  1|  31| 2.5|  14|  12| 2009|
|  1|1029| 3.0|  14|  12| 2009|
|  1|1061| 3.0|  14|  12| 2009|
+---+----+----+----+----+-----+
only showing top 3 rows

root
 |-- nU: integer (nullable = true)
 |-- nF: integer (nullable = true)
 |-- note: float (nullable = true)
 |-- jour: integer (nullable = true)
 |-- mois: integer (nullable = true)
 |-- annee: integer (nullable = true)



Résultat:
```
# +---+----+----+----+----+-----+
| nU|  nF|note|jour|mois|annee|
+---+----+----+----+----+-----+
|  1|  31| 2.5|  14|  12| 2009|
|  1|1029| 3.0|  14|  12| 2009|
|  1|1061| 3.0|  14|  12| 2009|
+---+----+----+----+----+-----+
```



In [47]:
# Creer un DataFrame films_i pour stocker les films qui se trouvent dans le fichier movies.csv
#son schéma est le suivant: nF INT, titre STRING, g STRING
films_i = spark.read.json('/content/sample_data/films.json')
new_column_order = ['nF', 'titre', 'g']
films_i = films_i.select(new_column_order).orderBy(asc('nF'))
films_i.persist()
films_i.printSchema()
films_i.show(3)

root
 |-- nF: long (nullable = true)
 |-- titre: string (nullable = true)
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------+--------------------+
| nF|               titre|                   g|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|[Adventure, Anima...|
|  2|      Jumanji (1995)|[Adventure, Child...|
|  3|Grumpier Old Men ...|   [Comedy, Romance]|
+---+--------------------+--------------------+
only showing top 3 rows



Résultat:


```
# root
 |-- nF: integer (nullable = true)
 |-- titre: string (nullable = true)
 |-- g: string (nullable = true)

+---+--------------------+--------------------+
| nF|               titre|                   g|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+
```



#### **Transformation de la colonne des genres**

Les genres de chaque film étant actuellement stockés dans une seule chaîne de caractères, nous allons remplacer
cette chaîne par un tableau de chaînes (par exemple, pour un film avec une colonne g contenant 'Comedy, Romance'
nous allons obtenir une colonne genres ['Comedy', 'Romance']).
- utiliser la fonction split

In [48]:
#Construire le DataFrame films
#résultat: 9125

Le nouveau DataFrame films aura les colonnes nF, titre et genres et sera gardé en mémoire.
- afficher le schéma
- afficher 3 lignes

In [49]:
films = films_i.persist()
films.printSchema()
films.show(3)
films.count()

root
 |-- nF: long (nullable = true)
 |-- titre: string (nullable = true)
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------+--------------------+
| nF|               titre|                   g|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|[Adventure, Anima...|
|  2|      Jumanji (1995)|[Adventure, Child...|
|  3|Grumpier Old Men ...|   [Comedy, Romance]|
+---+--------------------+--------------------+
only showing top 3 rows



9125

Résultat:
```
# root
 |-- nF: integer (nullable = true)
 |-- titre: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+--------------------+--------------------+
| nF|               titre|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|[Adventure, Anima...|
|  2|      Jumanji (1995)|[Adventure, Child...|
|  3|Grumpier Old Men ...|   [Comedy, Romance]|
+---+--------------------+--------------------+
```



## **Calcul de recommandation**

### 1.  Calcul de la similarité entre les utilisateurs (similarité Jaccard)

Nous allons d'abord calculer pour chaque couple d'utilisateurs une valeur de similarité basée sur les films
qu'ils ont notés en commun. Pour un utilisateur u nous avons besoin de connaître l'ensemble v des numéros de
films qu'il a notés. La similarité entre les utilisateurs u1 et u2 sera calculée à partir des ensembles de films v1 et v2 correspondants.

Similarité Jaccard (voir la description ici: https://en.wikipedia.org/wiki/Jaccard_index):
 - la similarité entre u1 et u2 est égale au nombre de films notés en commun par u1 et u2 divisé par le nombre total
   de films notés par u1 ou u2. Par exemple, si u1 a noté les films f1, f3 et f4 (v1=[f1, f3, f4]) et u2 a noté les      films f3, f4, f5 et f6 (v2=[f3, f4, f5, f6]) leur similarité sera 2/5=0,4 ce qui correspond à la cardinalité de      l'intersection entre v1 et v2 divisée par la cardinalité de leur union).

Le calcul de similarité sera effectué en plusieurs étapes:

- Étape 1: construire pour chaque utilisateur la liste des films qu'il a notés et les stocker dans le DataFrame liste_films qui aura 2 colonnes: nU et l_films qui contiendra un tableau de numéro de films

In [50]:
from pyspark.sql import functions as F

liste_films = notes.drop('jour', 'mois', 'annee', 'note')

liste_films = liste_films.groupby("nU").agg(F.collect_list("nF").alias("l_films"))

liste_films.printSchema()
liste_films.persist()
liste_films.show(2)

root
 |-- nU: integer (nullable = true)
 |-- l_films: array (nullable = false)
 |    |-- element: integer (containsNull = false)

+---+--------------------+
| nU|             l_films|
+---+--------------------+
| 12|[253, 529, 538, 6...|
| 13|[1, 47, 110, 277,...|
+---+--------------------+
only showing top 2 rows



Résultat:
```
# root
 |-- nU: integer (nullable = true)
 |-- l_films: array (nullable = false)
 |    |-- element: integer (containsNull = false)

+---+--------------------+
| nU|             l_films|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
+---+--------------------+
```



- Étape 2: Construire tous les couples possibles d'utilisateurs avec leur listes de films respectives et les stocker dans le DataFrame couples_u qui aura comme colonnes nU1, nU2, l_films1, l_films2

In [51]:
# Construire un DataFrame intermédiaire t1(nU1, l_films1) à partir de liste films en renommant nU->nU1 et
# Create t1 and t2 DataFrames
t1 = liste_films.select(liste_films.nU.alias('nU1'), liste_films.l_films.alias('l_films1'))
t2 = liste_films.select(liste_films.nU.alias('nU2'), liste_films.l_films.alias('l_films2'))

# Show the first few rows
t1.show(3)
t2.show(3)

#Construire couples_u(nU1, nU2, l_films1, l_films2) à partir de t1 et t2 (éliminer les couples où nU1=nU2)
couples_u = t1.crossJoin(t2).where(t1.nU1 != t2.nU2)
couples_u.persist()

couples_u.show(3)

+---+--------------------+
|nU1|            l_films1|
+---+--------------------+
| 12|[253, 529, 538, 6...|
| 13|[1, 47, 110, 277,...|
| 14|[594, 1196, 1721,...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+
|nU2|            l_films2|
+---+--------------------+
| 12|[253, 529, 538, 6...|
| 13|[1, 47, 110, 277,...|
| 14|[594, 1196, 1721,...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+---+--------------------+
|nU1|            l_films1|nU2|            l_films2|
+---+--------------------+---+--------------------+
| 12|[253, 529, 538, 6...| 13|[1, 47, 110, 277,...|
| 12|[253, 529, 538, 6...| 14|[594, 1196, 1721,...|
| 12|[253, 529, 538, 6...| 18|[5, 6, 7, 9, 14, ...|
+---+--------------------+---+--------------------+
only showing top 3 rows



Résultat:
```
# +---+--------------------+
|nU1|            l_films1|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
|  3|[60, 110, 247, 26...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+
|nU2|            l_films2|
+---+--------------------+
|  1|[31, 1029, 1061, ...|
|  2|[10, 17, 39, 47, ...|
|  3|[60, 110, 247, 26...|
+---+--------------------+
only showing top 3 rows

+---+--------------------+---+--------------------+
|nU1|            l_films1|nU2|            l_films2|
+---+--------------------+---+--------------------+
|  1|[31, 1029, 1061, ...|  2|[10, 17, 39, 47, ...|
|  1|[31, 1029, 1061, ...|  3|[60, 110, 247, 26...|
|  1|[31, 1029, 1061, ...|  4|[10, 34, 112, 141...|
+---+--------------------+---+--------------------+
```



- Étape 3: Définition d'une fonction utilisateur sim_jaccard qui calcule une valeur de similarité Jaccard à partir de deux listes spécifiées comme paramètres

In [52]:
@udf('float')
def sim_jaccard(l1, l2):
    set1=set(l1)
    set2=set(l2)
    l = len(set1.union(set2))
    if (l == 0): return 0
    return float(len(set1.intersection(set2)))/len(set1.union(set2))

- Étape 4: Calcul de la similarité entre chaque couple d'utilisateurs construit à l'étape 2 en appliquant la fonction de similarité définie à l'étape 3 à leur listes de films respectives. La similarité sera stockée dans le DataFrame sim_j(nU1, nU2, sim)

In [53]:
# Construire un DataFrame sim_j(nU1, nU2, sim) en appliquant la méthode withColumn au DataFrame couples_u
# Garder uniquement les entrées où sim != 0
sim_j = couples_u.withColumn('sim', sim_jaccard(couples_u.l_films1, couples_u.l_films2)).drop('l_films1', 'l_films2')
sim_j.persist()
sim_j.show(3)

+---+---+------------+
|nU1|nU2|         sim|
+---+---+------------+
| 12| 13|0.0088495575|
| 12| 14|      0.0125|
| 12| 18| 0.018181818|
+---+---+------------+
only showing top 3 rows



Résultat:
```
# +---+---+------------+
|nU1|nU2|         sim|
+---+---+------------+
| 12| 13|0.0088495575|
| 12| 14|      0.0125|
| 12| 18| 0.018181818|
```



### 2. **Calcul de scores de recommandation pour les films non notés**

- Préparation du calcul: éliminer les infomations concernant la date

In [54]:
u_vu_notes = notes.drop('jour', 'mois', 'annee')
#u_vu_notes.persist()
u_vu_notes.show(3)

+---+----+----+
| nU|  nF|note|
+---+----+----+
|  1|  31| 2.5|
|  1|1029| 3.0|
|  1|1061| 3.0|
+---+----+----+
only showing top 3 rows



Résultat:
```
# +---+----+----+
| nU|  nF|note|
+---+----+----+
|  1|  31| 2.5|
|  1|1029| 3.0|
|  1|1061| 3.0|
+---+----+----+
```



- Étape 1: Construire tous les couples possibles (nU, nF) et enlever les couples qui se trouvent dans u_vu_notes. Stocker le résultat dans le DataFrame uf_pas_vu(nU, nF) qui sera gardé en mémoire

In [55]:
# Get distinct values of nU and nF
distinct_nU = notes.select("nU").distinct()
distinct_nF = films.select("nF").distinct()

# Generate all possible (nU, nF) pairs
all_pairs = distinct_nU.crossJoin(distinct_nF)

# Remove pairs that exist in u_vu_notes
uf_pas_vu = all_pairs.subtract(u_vu_notes.drop('note'))

uf_pas_vu.persist()
uf_pas_vu.show(3)

+---+---+
| nU| nF|
+---+---+
|  1| 24|
|  1| 41|
|  1| 52|
+---+---+
only showing top 3 rows



Résultat:


```
# +---+---+
| nU| nF|
+---+---+
|  1|240|
|  1|314|
|  1|420|
+---+---+
```



- Etape 2: calculer un DF u_sim_notes qui contient des quintuples (nU1,nU2,nF,note,sim)

In [56]:
# Join on 'nU' and 'nU1'
u_sim_notes = sim_j.join(notes, sim_j.nU1 == notes.nU)

# Select and rename columns as needed
u_sim_notes = u_sim_notes.selectExpr("nU1", "nU2", "nF", "note", "sim")

u_sim_notes.show(2)
u_sim_notes.count()

+---+---+----+----+------------+
|nU1|nU2|  nF|note|         sim|
+---+---+----+----+------------+
| 12| 13| 253| 3.0|0.0088495575|
| 12| 13| 529| 1.0|0.0088495575|
| 12| 13| 538| 3.0|0.0088495575|
| 12| 13| 608| 2.0|0.0088495575|
| 12| 13| 673| 1.0|0.0088495575|
| 12| 13| 736| 4.0|0.0088495575|
| 12| 13| 737| 3.0|0.0088495575|
| 12| 13|1028| 1.0|0.0088495575|
| 12| 13|1032| 2.0|0.0088495575|
| 12| 13|1077| 3.0|0.0088495575|
| 12| 13|1197| 1.0|0.0088495575|
| 12| 13|1215| 5.0|0.0088495575|
| 12| 13|1220| 5.0|0.0088495575|
| 12| 13|1230| 2.0|0.0088495575|
| 12| 13|1235| 5.0|0.0088495575|
| 12| 13|1295| 1.0|0.0088495575|
| 12| 13|1374| 1.0|0.0088495575|
| 12| 13|1387| 4.0|0.0088495575|
| 12| 13|1639| 2.0|0.0088495575|
| 12| 13|1732| 3.0|0.0088495575|
+---+---+----+----+------------+
only showing top 20 rows



Résultat:


```
# +---+---+---+----+------------+
|nU1|nU2| nF|note|         sim|
+---+---+---+----+------------+
| 13| 12|253| 3.0|0.0088495575|
| 14| 12|253| 3.0|      0.0125|
+---+---+---+----+------------+
64389904
```



- Etape 3 : créer un DF u_recom qui étend u_sim_notes avec une colonne recom qui contient le produit sim*note

In [57]:
u_recom = u_sim_notes.withColumn('recom', u_sim_notes['sim']*u_sim_notes['note'])

u_recom.show(2)

+---+---+---+----+------------+------------+
|nU1|nU2| nF|note|         sim|       recom|
+---+---+---+----+------------+------------+
| 12| 13|253| 3.0|0.0088495575| 0.026548672|
| 12| 13|529| 1.0|0.0088495575|0.0088495575|
+---+---+---+----+------------+------------+
only showing top 2 rows



Résultat:
```
# +---+---+---+----+------------+-----------+
|nU1|nU2| nF|note|         sim|      recom|
+---+---+---+----+------------+-----------+
| 13| 12|253| 3.0|0.0088495575|0.026548672|
| 14| 12|253| 3.0|      0.0125|     0.0375|
+---+---+---+----+------------+-----------+
```



- Etape 4: créer un DF u_recom2 qui contient les colonnes nU1, nF et avg_rec de avg_rec est la moyennes des scores de recommandation pour nU1 et nF. Afficher le résultat.

In [None]:
# Assuming df is your DataFrame
avg_recom = u_recom.groupBy('nU1', 'nF').agg(F.avg('recom').alias('avg_rec'))
u_recom2 = u_recom.select('nU1', 'nF').join(avg_recom)

u_recom2.persist()
u_recom2.show(2)
u_recom2.count()

Résultat:

```
# +---+---+-----------+
|nU1| nF|      recom|
+---+---+-----------+
| 13|253|0.026548672|
| 14|253|     0.0375|
+---+---+-----------+
6019322
```



- Etape 5: créer un DF u_pas_vu_rec qui contient que les recommandations pour des films pas vus

In [None]:
u_pas_vu = u_recom2.orderBy(desc('avg_rec'))

u_pas_vu.persist()
u_pas_vu.show(5)

Résultat:
```
# +---+----+------------------+
| nU|  nF|           avg_rec|
+---+----+------------------+
|514| 966| 1.696969747543335|
|562|5828|1.6906474828720093|
|461|8504|1.6904267072677612|
| 86| 764|1.6736401319503784|
|355|5828|  1.66304349899292|
+---+----+------------------+
only showing top 5 rows
```



In [None]:
u_pas_vu.orderBy(asc('nU')).show()