# Modèles de Recommandations.

## 1. Librairies

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import StorageLevel

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark



In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

## 2. Création d'une session Spark

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MyALS") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.sql.shuffle.partitions", "8") \
    .master("local[*]") \
    .getOrCreate()

## 3. Chargement des données

In [8]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
df = spark.read.parquet("/content/drive/MyDrive/ml-32m/df.parquet")

In [10]:
df.show(5)

+-------+--------------------+--------------------+------+------+----------+--------------------+----+--------------------+
|movieId|               title|              genres|userId|rating| timestamp|         genres_list|year|    title_sans_annee|
+-------+--------------------+--------------------+------+------+----------+--------------------+----+--------------------+
|   8970|Finding Neverland...|               Drama| 62769|   5.0|1426961515|             [Drama]|2004|   Finding Neverland|
|  27815|Chorus, The (Chor...|               Drama| 62769|   3.5|1426959562|             [Drama]|2004|Chorus, The (Chor...|
|  30707|Million Dollar Ba...|               Drama| 62769|   4.0|1426959541|             [Drama]|2004| Million Dollar Baby|
|  33166|        Crash (2004)|         Crime|Drama| 62769|   5.0|1426961486|      [Crime, Drama]|2004|               Crash|
|  40819|Walk the Line (2005)|Drama|Musical|Rom...| 62769|   3.0|1426961490|[Drama, Musical, ...|2005|       Walk the Line|
+-------

# 4. Modélisation avec Spark MLlib : ALS

### 4.1. Préparation des données

In [11]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import DataFrame
from pyspark.sql.functions import rand

# Sous-échantillonage

df_sample = df.sample(withReplacement=False, fraction=0.7, seed=333)

Split par proportion stratifiée

In [12]:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Proportion du train
train_ratio = 0.8

# Création d'un rang temporel par utilisateur
window = Window.partitionBy("userId").orderBy(F.col("timestamp"))
df_strat = df_sample.withColumn("rank", F.row_number().over(window))

# Nombre d'interactions par utilisateur
user_counts = df_strat.groupBy("userId").agg(F.max("rank").alias("count_per_user_join"))

# Jointure avec df_strat pour chaque ligne
df_strat = df_strat.join(user_counts, on="userId", how="left")

# Seuil train/test par utilisateur
df_strat = df_strat.withColumn(
    "train_cutoff",
    (F.col("count_per_user_join") * train_ratio).cast("int")
)

# Split stratifié : les ranks <= cutoff vont au train, le reste au test
train = df_strat.filter(F.col("rank") <= F.col("train_cutoff"))
test  = df_strat.filter(F.col("rank") >  F.col("train_cutoff"))

liste des utilisateurs dans le test

In [13]:
users_test = test.select("userId").distinct()

liste des utilisateurs dans le train

In [14]:
users_train = train.select("userId").distinct()

Recherche d'utilisateurs du test absents du train

In [15]:
users_missing = users_test.join(users_train, on="userId", how="left_anti")
users_missing.show()  # affichge des userId à problème

+------+
|userId|
+------+
+------+



### 4.2. Entraînement de l'ALS

In [16]:
# Extraction des colonnes nécessaires

als_train = train.select("userId", "movieId", "rating")
als_test  = test.select("userId", "movieId", "rating")


In [17]:
# ALS (grid search sur les hyper paramètres)

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialisation du modèle ALS
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

# Recherche sur rank, regParam, maxIter
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 20]) \
    .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
    .addGrid(als.maxIter, [5, 10]) \
    .build()

# RMSE comme critère de sélection
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

# Cross-validation
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=2
)

# Entraînement pour obtenir les meilleurs hyperparamètres
cv_model = cv.fit(als_train)

# Meilleur modèle
best_als_model = cv_model.bestModel

In [18]:
# Sauvegarde du meilleur modèle dans un fichier

best_als_model.write().overwrite().save("/content/drive/MyDrive/ml-32m/best_als_model")

In [19]:
# Récupération de la valeur de chaque hyperparamètre
best_rank     = best_als_model._java_obj.parent().getRank()
best_regparam = best_als_model._java_obj.parent().getRegParam()
best_maxiter  = best_als_model._java_obj.parent().getMaxIter()

print(f"Best rank: {best_rank}")
print(f"Best regParam: {best_regparam}")
print(f"Best maxIter: {best_maxiter}")

Best rank: 20
Best regParam: 0.1
Best maxIter: 10


### 4.3. Prédictions sur le test

In [20]:
# Prédiction sur le test set
pred = best_als_model.transform(test)

# Calcul du RMSE
rmse_best = evaluator.evaluate(pred)
print(f"RMSE du meilleur modèle sur le test: {rmse_best:.4f}")

RMSE du meilleur modèle sur le test: 0.8326


In [23]:
# RMSE moyens de chaque configuration de la grille (sur les folds CV)

print("RMSE moyens sur la grille (validation croisée):", cv_model.avgMetrics)

RMSE moyens sur la grille (validation croisée): [np.float64(0.858377372374762), np.float64(0.8372734562648013), np.float64(0.8466002653566026), np.float64(0.8293461872894616), np.float64(0.8377920228323973), np.float64(0.8287932066590538), np.float64(0.861789933089792), np.float64(0.8512572991037564), np.float64(0.8503000643540893), np.float64(0.8238328946562261), np.float64(0.841947786061614), np.float64(0.8220224913436294), np.float64(0.8741842901728114), np.float64(0.8752812114049078), np.float64(0.8441269158303443), np.float64(0.820181050237654), np.float64(0.8450839491218489), np.float64(0.8195422341803131)]


### 4.4. Recommandations

In [21]:
# Top 5 des utilisateurs susceptibles d'apprécier chaque film
recommandations_items = best_als_model.recommendForAllItems(5)
recommandations_items.show(truncate=False)

+-------+------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                       |
+-------+------------------------------------------------------------------------------------------------------+
|1      |[{48515, 5.4672713}, {36189, 5.438407}, {22571, 5.4115696}, {15920, 5.3778567}, {197988, 5.334977}]   |
|3      |[{60356, 5.005183}, {70092, 4.9980774}, {14, 4.9406357}, {15088, 4.857096}, {155152, 4.8103595}]      |
|5      |[{49191, 4.8853474}, {60356, 4.8091955}, {14, 4.791869}, {135288, 4.7744694}, {99897, 4.7668896}]     |
|6      |[{79924, 5.482949}, {66795, 5.359822}, {139881, 5.3339453}, {160024, 5.2148037}, {3698, 5.13992}]     |
|7      |[{13504, 4.940066}, {36189, 4.8890123}, {146472, 4.8796716}, {88509, 4.8264446}, {106096, 4.820671}]  |
|9      |[{128408, 4.933466}, {30923, 4.746749}, {177923, 4.740363}, {60356, 4.7155566}, {155152

In [22]:
# Top 5 des recommandations par utilisateur
recommandations = best_als_model.recommendForAllUsers(5)
recommandations.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                          |
+------+---------------------------------------------------------------------------------------------------------+
|5     |[{222368, 5.1271605}, {205277, 5.1271605}, {154280, 4.8625593}, {214720, 4.7654576}, {216753, 4.6300936}]|
|6     |[{154280, 7.283293}, {137363, 7.249842}, {222368, 7.189124}, {205277, 7.189124}, {86288, 6.993524}]      |
|9     |[{86288, 5.773513}, {222368, 5.663228}, {205277, 5.663228}, {196167, 5.6463585}, {265364, 5.618403}]     |
|10    |[{151989, 4.4932694}, {86288, 4.4077573}, {265364, 4.3203387}, {196167, 4.2794147}, {192261, 4.17376}]   |
|12    |[{216663, 4.2657824}, {138960, 4.1286583}, {217747, 4.122506}, {185291, 4.0856066}, {154280, 4.0807295}] |
|13    |[{86288, 5.451408}, {274047, 5.2210417}, {265364, 4.920437}, {222368, 4.

### 4.5. Evaluations des recommandations

Les recommandations fournies par l'ALS incluent probablement des films déjà vus par l'utilisateur, parce que l'algorithme ALS de Spark ne filtre pas par défaut les anciens items pour chaque utilisateur lors du calcul des recommandations.

On pourrait filtrer les résultats de l'ALS pour ne garder que les films non précédemment vus par chaque utilisateur.


In [23]:
# Génération de k = 5 recommandations natives de l'ALS

k = 5
user_recs = best_als_model.recommendForAllUsers(k)

In [24]:
ratings = spark.read.csv("/content/drive/MyDrive/ml-32m/ratings.csv", header=True, inferSchema=True)

In [25]:
from pyspark.sql.functions import arrays_zip, explode, col

# Accéder aux movieId et rating depuis les recommandations
user_recs_exploded = (
    user_recs
    .select('userId', explode('recommendations').alias('rec'))
    .select('userId', col('rec.movieId').alias('movieId'), col('rec.rating').alias('rating'))
)
already_watched = ratings.select('userId', 'movieId')
final_recs = user_recs_exploded.join(already_watched, ['userId', 'movieId'], 'left_anti')

In [26]:
# Filtrage des films déjà vus

vue = ratings.withColumn("vu", col("rating"))
recs_nouveaux = user_recs_exploded.join(vue.select("userId", "movieId"), ["userId", "movieId"], how="left_anti")

Reconstitution d'un top-k sans doublon

In [27]:
# Remplacement des predictions par les ratings
w = Window.partitionBy("userId").orderBy(F.desc("rating"))
final_recs = recs_nouveaux.withColumn("rank", F.row_number().over(w)).filter(F.col("rank") <= k)
final_recs.show(5)

+------+-------+---------+----+
|userId|movieId|   rating|rank|
+------+-------+---------+----+
|     1| 196167|5.5673566|   1|
|     1| 216753| 5.358209|   2|
|     1| 155641|5.0933185|   3|
|     1| 164937| 5.064542|   4|
|     1| 120821|5.0633893|   5|
+------+-------+---------+----+
only showing top 5 rows



On obtiens ainsi le top-k recommandations neuves (hors films déjà vus) pour chaque utilisateur.

Métriques d'évaluation

a) Hit Ratio (HR) et Coverage

Hit Ratio :

    Pour chaque utilisateur, HR@N = 1 si au moins un item recommandé est réellement dans le test, sinon 0.

    Moyenne sur tous les utilisateurs.

Coverage :

    Ratio d’items différents recommandés sur le total de tous les items du catalogue.

b) MRR, MAP, NDCG

Pour chaque utilisateur :

    MRR (Mean Reciprocal Rank) : l’inverse du rang du premier item pertinent recommandé.

    MAP (Mean Average Precision) : la moyenne des précisions à chaque position d’un item pertinent dans le top-N recommandé.

    NDCG : gain cumulatif discounté, normalisé par le score idéal ; valorise les bonnes recommandations en haut de liste.

c) Dice

Indice de Dice (pour la similarité entre deux ensembles de recommandations/testing) :
Dice=2∣A∩B∣∣A∣+∣B∣
Dice=∣A∣+∣B∣2∣A∩B∣

où AA est le set recommandé, BB est le set réel (vrais items pertinents dans le test set).

In [32]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# 1. Recommandations ALS top-N
N = 10
user_recs = best_als_model.recommendForAllUsers(N).selectExpr("userId", "explode(recommendations) as rec")
user_recs = user_recs.select("userId", "rec.movieId", F.row_number().over(Window.partitionBy("userId").orderBy(F.lit(1))).alias("rec_rank")) # Renamed rank to rec_rank

# 2. Truth : items réellement aimés/testés par user
# Suppose que test_df contient (userId, movieId, rating)
seuil_de_positivité = 4.0  # Define the threshold for positive ratings
test_positive = test.filter(test.rating >= seuil_de_positivité)  # seuil typique = 4

# 3. Jointure pour trouver quels recs sont correctes (hit)
eval_df = user_recs.join(test_positive, on=["userId", "movieId"], how="left").withColumn("hit", (F.col("rating").isNotNull()).cast("int"))

# HR : groupby user, puis moyenne
user_hr = eval_df.groupBy("userId").agg(F.max("hit").alias("hit_any"))
HR = user_hr.select(F.mean("hit_any")).first()[0]

# Coverage : nb d’items différents recommandés / nb total d’items
coverage = eval_df.select("movieId").distinct().count() / train.select("movieId").distinct().count()

In [33]:
from pyspark.sql.functions import min as min_agg, when

# Rang du premier hit par utilisateur
first_hit = eval_df.filter(F.col("hit") == 1) \
    .groupBy("userId").agg(min_agg("rank").alias("first_rel_rank"))

# MRR : 1/rank du premier hit, sinon 0 si aucun hit
user_mrr = eval_df.select("userId").distinct().join(first_hit, on="userId", how="left") \
    .withColumn("mrr", when(F.col("first_rel_rank").isNotNull(), 1 / F.col("first_rel_rank")).otherwise(0))
MRR = user_mrr.select(F.mean("mrr")).first()[0]

In [34]:
from pyspark.sql.window import Window

w = Window.partitionBy("userId").orderBy("rank")
eval_df = eval_df.withColumn("cum_hits", F.sum("hit").over(w))
eval_df = eval_df.withColumn("precision_at_k", when(F.col("hit") == 1, F.col("cum_hits") / F.col("rank")).otherwise(0))

# AP par utilisateur
user_ap = eval_df.groupBy("userId") \
    .agg((F.sum("precision_at_k") / F.sum("hit")).alias("AP"))
user_ap = user_ap.fillna(0, subset=["AP"])  # Gère les users sans hits
MAP = user_ap.select(F.mean("AP")).first()[0]

In [35]:
from pyspark.sql.functions import log2

eval_df = eval_df.withColumn("dcg_term", when(F.col("hit") == 1, 1 / log2(F.col("rank")+1)).otherwise(0))
dcg_user = eval_df.groupBy("userId").agg(F.sum("dcg_term").alias("dcg"))

# Nombre de hits par utilisateur et calcul de l'IDCG
n_rel = eval_df.groupBy("userId").agg(F.sum("hit").alias("n_true"))
N = 10  # top-N
n_rel = n_rel.withColumn("ideal_dcg",
    F.expr(f"aggregate(sequence(1, least(n_true, {N})), 0D, (acc, k) -> acc + 1 / log2(k + 1))")
)
# Assemblage final
ndcg_user = dcg_user.join(n_rel, on="userId")
ndcg_user = ndcg_user.withColumn("ndcg", when(F.col("ideal_dcg") > 0, F.col("dcg") / F.col("ideal_dcg")).otherwise(0))
NDCG = ndcg_user.select(F.mean("ndcg")).first()[0]

In [36]:
# Récupère les recommandations et les vrais positifs en sets
als_rec_per_user = user_recs.groupBy("userId").agg(F.collect_set("movieId").alias("als_rec"))
test_pos_per_user = test_positive.groupBy("userId").agg(F.collect_set("movieId").alias("true_movies"))


user_dice = als_rec_per_user.join(test_pos_per_user, "userId") \
    .withColumn("intersection", F.size(F.array_intersect("als_rec", "true_movies"))) \
    .withColumn("dice", 2 * F.col("intersection") / (F.size("als_rec") + F.size("true_movies")))
DICE = user_dice.agg(F.mean("dice")).first()[0]

In [37]:
metrics_list = [(float(HR), float(MRR), float(MAP), float(NDCG), float(coverage), float(DICE))]
metrics_columns = ["HR", "MRR", "MAP", "NDCG", "Coverage", "Dice"]

spark.createDataFrame(metrics_list, metrics_columns).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  HR|                 MRR|                 MAP|                NDCG|            Coverage|                Dice|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|1.990564723211975...|3.995454020778496...|3.995454020778496...|2.154974564797158E-6|0.027402118768215394|9.521595602125913E-7|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+



Metrics: top k, recall@k, precision, F1

In [39]:
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql.functions import collect_list, collect_set


# userRecs: DataFrame userId, recommendations (liste de movieIds prédits)
# userTruth: DataFrame userId, ground_truth (liste de movieIds test réel)

# Define user_truth using test_positive
user_truth = test_positive.groupBy("userId").agg(collect_set("movieId").alias("ground_truth"))
print(f"Number of users in user_truth: {user_truth.count()}")


# Reconstruct user_recs to have a list of recommended movieIds per user
user_recs_list = user_recs.groupBy("userId").agg(collect_list("movieId").alias("recommendations_list"))
print(f"Number of users in user_recs_list: {user_recs_list.count()}")


# Set top_k for evaluation
top_k = N # N is defined as 10 in a previous cell

# Joins for aligning by userId
joined = user_recs_list.join(user_truth, "userId")
print(f"Number of users in joined DataFrame: {joined.count()}")


# Transform into RDD (predicted, true) for RankingMetrics
score_and_labels = joined.rdd.map(
    lambda row: (row["recommendations_list"], row["ground_truth"])
)

# Check the number of elements in the RDD
print(f"Number of elements in score_and_labels RDD: {score_and_labels.count()}")


metrics = RankingMetrics(score_and_labels)

precision_at_k = metrics.precisionAt(top_k)
recall_at_k = metrics.recallAt(top_k)
# Calculate F1 score, handling potential division by zero
f1_at_k = 0.0
if (precision_at_k + recall_at_k) > 0:
    f1_at_k = 2 * (precision_at_k * recall_at_k) / (precision_at_k + recall_at_k)


print(f"Precision@{top_k}: {precision_at_k:.4f}")
print(f"Recall@{top_k}: {recall_at_k:.4f}")
print(f"F1@{top_k}: {f1_at_k:.4f}")

Number of users in user_truth: 194924
Number of users in user_recs_list: 200948


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

## 5. Recommandation de films basée sur le contenu (genres, TF-IDF, similarité cosinus)

In [40]:
# Extraction et vectorisation des genres (prétraitement)

from pyspark.sql.functions import split
train = train.withColumn("genres_list", split(col("genres"), "\\|"))
test  = test.withColumn("genres_list", split(col("genres"), "\\|"))

In [42]:
# Vectorisation TF-IDF
# Applique le même modèle TF-IDF sur tout le catalogue de films à recommander et sur le train.

from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml import Pipeline

cv = CountVectorizer(inputCol="genres_list", outputCol="raw_features", minDF=2)  # minDF réduit le bruit
idf = IDF(inputCol="raw_features", outputCol="features")
pipeline = Pipeline(stages=[cv, idf])

tfidf_model = pipeline.fit(train)
train_tfidf = tfidf_model.transform(train)
test_tfidf = tfidf_model.transform(test)

In [43]:
#Construction des Profils Utilisateur

    #Filtrage des films appréciés dans le train :

from pyspark.sql.functions import col

seuil_like = 4.0
liked_train = train_tfidf.filter(col("rating") >= seuil_like)

In [44]:
# Agrégation (profil moyen) :
# Spark n’offre pas de moyenne native sur un vector. On les convertit en Array pour faire une moyenne proprement.


from pyspark.sql.functions import collect_list, udf
import numpy as np
from pyspark.sql.types import ArrayType, DoubleType

def mean_vectors(arrs):
    if not arrs: return []
    arrs = [np.array(a.toArray()) if hasattr(a, 'toArray') else np.array(a) for a in arrs]
    return (np.mean(arrs, axis=0)).tolist()

mean_vectors_udf = udf(mean_vectors, ArrayType(DoubleType()))

user_profiles = liked_train.groupBy("userId").agg(
    collect_list("features").alias("arrs")
).withColumn("user_profile", mean_vectors_udf("arrs"))

In [45]:
# Catalogue de recommandation (films candidats), en utilisant tous les films du catalogue, pas seulement le train

df_catalog = df_sample.select("movieId", "genres").distinct()
df_catalog = df_catalog.withColumn("genres_list", split(col("genres"), "\\|"))
catalog_tfidf = tfidf_model.transform(df_catalog).select("movieId", "features")

In [46]:
# Calcul de la similarité cosinus

    # Implémentation UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import numpy as np

def cosine_sim(v1, v2):
    arr1, arr2 = np.array(v1), np.array(v2)
    num = np.dot(arr1, arr2)
    denom = np.linalg.norm(arr1) * np.linalg.norm(arr2)
    return float(num / denom) if denom else 0.0

cosine_udf = udf(cosine_sim, DoubleType())

In [47]:
# Croisement profils utilisateurs et catalogue :


user_films = user_profiles.crossJoin(catalog_tfidf)
user_films = user_films.withColumn(
    "cosine_sim",
    cosine_udf("user_profile", "features")
)

In [48]:
#Filtrage des films déjà vus

    # Ejection des films déjà notés par chaque utilisateur :

df_seen = train.select("userId", "movieId").distinct()
user_films = user_films.join(df_seen, on=["userId", "movieId"], how="left_anti")

In [49]:
# Top-N recommandations par similarité

    # Classement et coupe pour Top-N (exemple : N=10) :


from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

N = 10
w = Window.partitionBy("userId").orderBy(col("cosine_sim").desc())
topn_recs = user_films.withColumn("rank", row_number().over(w)) \
    .filter(col("rank") <= N) \
    .select("userId", "movieId", "cosine_sim", "rank")

In [50]:
# Vérité de test (films appréciés dans le test)

from pyspark.sql import functions as F
test_positive = test.filter(col("rating") >= seuil_like).groupBy("userId") \
    .agg(F.collect_set("movieId").alias("test_movies"))

In [51]:
# Jointure recommandations et vérité

eval_df = topn_recs.join(test_positive, "userId", "left") \
    .withColumn("hit", F.expr("array_contains(test_movies, movieId)").cast("int"))

In [55]:
# Recommandations de k films similaires à un film donné
'''Pour un film cible, il suffit de :

    Extraire le vecteur TF-IDF (basé sur les genres) correspondant à ce film dans le catalogue.

    Calculer la similarité cosinus entre ce vecteur et celui de tous les autres films du catalogue (hors lui-même).

    Trier les scores de similarité décroissants.

    Afficher les k premiers résultats.'''

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import numpy as np

movie_id_cible = 30707  # ID du film pour lequel on veut des recommandations
k = 5

# Récupération du profil TF-IDF du film cible
film_query_vec = catalog_tfidf.filter(col("movieId") == movie_id_cible).select("features").collect()[0][0]

# Definition de lan fonction cosine_sim UDF qui capture le film_query_vec
def cosine_sim_udf_factory(query_vec):
    def cosine_sim(v1):
        arr1, arr2 = np.array(v1.toArray()), np.array(query_vec.toArray())
        num = np.dot(arr1, arr2)
        denom = np.linalg.norm(arr1) * np.linalg.norm(arr2)
        return float(num / denom) if denom else 0.0
    return udf(cosine_sim, DoubleType())

# Création de l'instance UDF avec le vecteur de requête spécifique
cosine_udf_instance = cosine_sim_udf_factory(film_query_vec)

# Calcul de la similarité cosinus en utilisant l'instance UDF
recs_film = catalog_tfidf.withColumn(
    "cosine_sim",
    cosine_udf_instance(col("features")) # Ne passer que la colonne desfeatures
)


# Suppression du film d'origine et sélection des k plus proches
recs_film = recs_film.filter(col("movieId") != movie_id_cible) \
    .orderBy(col("cosine_sim").desc()) \
    .limit(k)

recs_film.select("movieId", "cosine_sim").show()

+-------+----------+
|movieId|cosine_sim|
+-------+----------+
|  64839|       1.0|
|  56607|       1.0|
|  55069|       1.0|
|   1358|       1.0|
|   6565|       1.0|
+-------+----------+



In [60]:
movies = spark.read.csv("/content/drive/MyDrive/ml-32m/movies.csv", header=True, inferSchema=True)

In [61]:
# Affichage des films proches de movieId = 30707 (Million Dollar Baby)

recs_film_with_title = recs_film.join(movies.select("movieId", "title"), on="movieId", how="left")
recs_film_with_title.select("movieId", "title", "cosine_sim").show()

+-------+--------------------+----------+
|movieId|               title|cosine_sim|
+-------+--------------------+----------+
|  64839|Wrestler, The (2008)|       1.0|
|  55069|4 Months, 3 Weeks...|       1.0|
|   6565|   Seabiscuit (2003)|       1.0|
|   1246|Dead Poets Societ...|       1.0|
|   1276|Cool Hand Luke (1...|       1.0|
+-------+--------------------+----------+



In [57]:
'''Recommander k films pour un genre donné

Pour un genre, la méthode consiste à :

    Construire un vecteur TF-IDF qui encode ce genre (ex : {"Action"}).

    Calculer la similarité cosinus entre ce vecteur et tous les films du catalogue (ou simplement filtrer les films qui contiennent ce genre, puis classer selon un critère, ex : popularité ou diversité).

Approche par similarité :'''

# Suppose que 'Action' est le genre voulu
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import numpy as np


genre_name = 'Action'
k = 5

# Construire le vecteur TF-IDF du genre 'Action'
# Ensure the vocabulary from the fitted TF-IDF model is available
vocabulary = tfidf_model.stages[0].vocabulary
genre_index = vocabulary.index(genre_name)
# Create a sparse vector for the genre
genre_vec = Vectors.sparse(len(vocabulary), [(genre_index, 1.0)])


# Use the cosine_sim_udf_factory defined in cell v55nkUmv3aVK
# Define the cosine_sim UDF that captures the query_vec
def cosine_sim_udf_factory(query_vec):
    def cosine_sim(v1):
        arr1, arr2 = np.array(v1.toArray()), np.array(query_vec.toArray())
        num = np.dot(arr1, arr2)
        denom = np.linalg.norm(arr1) * np.linalg.norm(arr2)
        return float(num / denom) if denom else 0.0
    return udf(cosine_sim, DoubleType())

# Create the UDF instance with the specific genre vector
cosine_udf_instance = cosine_sim_udf_factory(genre_vec)


# Calculate cosine similarity with all films in the catalog
catalog_genre_query = catalog_tfidf.withColumn(
    "cosine_sim",
    cosine_udf_instance(col("features")) # Pass only the features column
)


# Top-k films les plus similaires à ce genre
recs_genre = catalog_genre_query.orderBy(col("cosine_sim").desc()).limit(k)
recs_genre.select("movieId", "cosine_sim").show()

+-------+----------+
|movieId|cosine_sim|
+-------+----------+
| 156877|       1.0|
|  65400|       1.0|
| 176935|       1.0|
| 175243|       1.0|
| 170973|       1.0|
+-------+----------+



In [62]:
# Affichage des films les plus susceptibles d'être du genre Action

recs_genre_with_title = recs_genre.join(movies.select("movieId", "title"), on="movieId", how="left")
recs_genre_with_title.select("movieId", "title", "cosine_sim").show()

+-------+--------------------+----------+
|movieId|               title|cosine_sim|
+-------+--------------------+----------+
| 156877|The Young Vagabon...|       1.0|
| 176935|     Geostorm (2017)|       1.0|
| 170973|Boyka: Undisputed...|       1.0|
|   7192|Only the Strong (...|       1.0|
| 176979|King Arthur and t...|       1.0|
+-------+--------------------+----------+



In [59]:
'''Approche plus simple :
Les k films du genre 'Action', tri direct par popularité ou note moyenne :'''

from pyspark.sql.functions import col, array_contains # Import array_contains
import pyspark.sql.functions as F


action_movies = catalog_tfidf.join(df_sample, "movieId") \
    .filter(array_contains(col("genres_list"), "Action")) \
    .groupBy("movieId").agg(F.avg("rating").alias("mean_rating")) \
    .orderBy(col("mean_rating").desc()) \
    .limit(k)

action_movies.select("movieId", "mean_rating").show()

+-------+-----------+
|movieId|mean_rating|
+-------+-----------+
| 214910|        5.0|
| 202175|        5.0|
| 137743|        5.0|
| 193701|        5.0|
| 201901|        5.0|
+-------+-----------+



Evaluations

In [63]:
from pyspark.sql import functions as F

K = 10  # nombre de recommandations par utilisateur

# Jointure des recommandations et des films "aimés" en test
eval_df = topn_recs.join(test_positive, "userId", "left") \
    .withColumn("hit", F.expr("array_contains(test_movies, movieId)").cast("int"))

# Regrouper les hits par utilisateur: liste des hits
per_user_hits = eval_df.groupBy("userId") \
    .agg(
        F.collect_set(
            F.when(F.col("hit") == 1, F.col("movieId"))
        ).alias("hits"),
        F.first("test_movies").alias("test_movies")
    )

# Calcul des métriques par utilisateur
def precision_recall_f1(hits, test_movies):
    hits = set(hits or [])
    test_movies = set(test_movies or [])
    n_hits = len(hits & test_movies)
    n_pred = len(hits)
    n_true = len(test_movies)
    precision = n_hits / K if K > 0 else 0
    recall = n_hits / n_true if n_true > 0 else 0
    f1 = (2*precision*recall/(precision+recall)) if (precision + recall) > 0 else 0
    hr = 1 if n_hits > 0 else 0
    return float(precision), float(recall), float(f1), float(hr)

from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
from pyspark.sql.functions import udf

schema = StructType([
    StructField("precision", FloatType(), False),
    StructField("recall", FloatType(), False),
    StructField("f1", FloatType(), False),
    StructField("hit_rate", FloatType(), False)
])

prf_udf = udf(precision_recall_f1, schema)

per_user_metrics = per_user_hits.withColumn(
    "metrics",
    prf_udf("hits", "test_movies")
).select(
    "userId",
    "metrics.precision",
    "metrics.recall",
    "metrics.f1",
    "metrics.hit_rate"
)

# Moyenne globale des métriques :
results = per_user_metrics.agg(
    F.mean("precision").alias(f'Precision@{K}'),
    F.mean("recall").alias(f'Recall@{K}'),
    F.mean("f1").alias(f'F1@{K}'),
    F.mean("hit_rate").alias(f'HitRate@{K}')
)

results.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
from pyspark.mllib.evaluation import RankingMetrics

# Préparation pour RankingMetrics : (prédictions, vérité)
# On construit la liste des movideId recommandés (par user)
recs_per_user = topn_recs.groupBy("userId") \
    .agg(F.collect_list("movieId").alias("predicted"))

eval_ranking = recs_per_user.join(test_positive, "userId") \
    .select("predicted", "test_movies")

# Conversion en RDD [([recom], [ground_truth])]
score_and_labels = eval_ranking.rdd.map(lambda r: (r["predicted"], r["test_movies"]))

metrics = RankingMetrics(score_and_labels)

print(f"MAP@{K}   :", metrics.meanAveragePrecision)
print(f"NDCG@{K}  :", metrics.ndcgAt(K))
# Pour précision@K, RankingMetrics propose aussi:
print(f"Precision@{K} :", metrics.precisionAt(K))

## 6. Recommandation Basée sur les Proximités Utilisateurs (User-KNN)

In [None]:
# Préparation des données

from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
import pandas as pd
import numpy as np

# Extraire du DataFrame PySpark (pour manipuler numpy)
pivot_df = train.groupBy("userId").pivot("movieId").agg(F.first("rating"))
pivot_pd = pivot_df.toPandas().set_index("userId").fillna(0)
user_item_matrix = pivot_pd.values  # Matrice numpy users x items
user_ids = pivot_pd.index.values

In [None]:
# Mesure de similarité entre utilisateurs (similarité cosinus)

from sklearn.metrics.pairwise import cosine_similarity

similarity_matrix = cosine_similarity(user_item_matrix)
# Diagonale = 1 (identité utilisateur)

In [None]:
# Recherche des k voisins les plus proches

k = 10  # nombre de voisins à considérer
top_k_indices = np.argsort(-similarity_matrix, axis=1)[:, 1:k+1]  # évite l'utilisateur lui-même (colonne 0)

In [None]:
'''Génération des recommandations

Pour un utilisateur donné :

    On aboutit à :

        Ses k voisins les plus similaires.

        Les notes qu'ils ont donné à chaque film.

        On moyenne/pondère les notes pour chaque film non vu par l'utilisateur cible.

        On trie les scores et propose les meilleurs films.
'''

def recommend_for_user(user_idx, user_item_matrix, top_k_indices, user_ids, N=10):
    neighbors = top_k_indices[user_idx]
    # Moyenne (ou pondérée par similarité) des notes des voisins, sur les films non vus
    neighbor_ratings = user_item_matrix[neighbors]
    user_ratings = user_item_matrix[user_idx]
    already_seen = set(np.where(user_ratings > 0)[0])

    # Calcul du score moyen
    mean_scores = neighbor_ratings.mean(axis=0)
    scores = [(i, score) for i, score in enumerate(mean_scores) if i not in already_seen]
    # Top-N recommandations (par score décroissant)
    top_n = sorted(scores, key=lambda x: -x[1])[:N]
    return top_n  # indices des films à recommander

# Pour tous les utilisateurs
recs_by_user = {user_ids[i]: recommend_for_user(i, user_item_matrix, top_k_indices, user_ids) for i in range(len(user_ids))}

Recommandation Hybride : KNN Utilisateur + Genres

In [1]:
# Préparer les données utilisateur-film et d'intégration des genres

from pyspark.sql.functions import split, col, collect_set

# train : userId, movieId, rating, genres (séparés par |)
train = train.withColumn("genres_list", split(col("genres"), "\\|"))

NameError: name 'train' is not defined

In [None]:
# Établir le profil « genre préféré » de chaque utilisateur

user_genres = train.filter(col("rating") >= 4) \
    .select("userId", "genres_list") \
    .withColumn("genre", F.explode("genres_list")) \
    .groupBy("userId") \
    .agg(collect_set("genre").alias("preferred_genres"))

In [None]:
# Construire la matrice user-item via pivot (ratings explicites uniquement)

user_item_df = train.groupBy("userId").pivot("movieId").agg(F.first("rating"))
user_item_pd = user_item_df.toPandas().set_index("userId").fillna(0)
user_ids = user_item_pd.index.values

In [None]:
# Calculer la similarité entre utilisateurs (KNN collaboratif seulement)

from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

user_item_matrix = user_item_pd.values
sim_matrix = cosine_similarity(user_item_matrix)
# On force la diagonale à 0 (évite d'être son propre voisin)
np.fill_diagonal(sim_matrix, 0)
k = 10  # voisins

# Indices des k plus proches voisins pour chaque utilisateur
topk_neighbor_indices = np.argsort(-sim_matrix, axis=1)[:, :k]

In [None]:
# Gérer le cold start et la diversité dans la recommandation

# - Si un utilisateur n'a PAS de voisins (cold start), CF: => recommandation popularité et/ou par genre préféré
# - Pour la diversité, on filtre ou pondère la liste finale selon qu'un film apporte un genre « différent » des habitudes

def get_user_genres(user_id):
    # mapping userId -> genres préférés (set)
    row = user_genres.filter(col("userId") == user_id).collect()
    return set(row[0]["preferred_genres"]) if row else set()

movies_genres = train.select("movieId", "genres_list").distinct()
movies_genres_dict = dict(movies_genres.collect())

In [None]:
# Générer les recommandations hybrides pour un utilisateur


def recommend_knn_genres(user_idx, N=10):
    neighbors = topk_neighbor_indices[user_idx]
    user_seen = set(np.where(user_item_matrix[user_idx] > 0)[0])
    scores = np.zeros(user_item_matrix.shape[1])

    for n_idx in neighbors:
        neighbor_ratings = user_item_matrix[n_idx]
        scores += neighbor_ratings

    # On ne recommande pas les films déjà vus
    for idx in user_seen:
        scores[idx] = -np.inf

    # On priorise : genre jamais vu pour l'utilisateur (diversité)
    user_id = user_ids[user_idx]
    user_fav_genres = get_user_genres(user_id)

    movie_indices = np.argsort(-scores)
    recs = []
    for i in movie_indices:
        if len(recs) >= N:
            break
        movie_id = user_item_pd.columns[i]
        film_genres = set(movies_genres_dict.get(movie_id, []))
        # Option diversity : push si au moins un genre nouveau pour user
        if len(film_genres - user_fav_genres) > 0 or not user_fav_genres:
            recs.append((movie_id, scores[i]))
    # Cold start si pas de recs : on propose les k films populaires alignés avec genres préférés
    if len(recs) == 0:
        for i in movie_indices:
            movie_id = user_item_pd.columns[i]
            film_genres = set(movies_genres_dict.get(movie_id, []))
            if len(film_genres & user_fav_genres) > 0:
                recs.append((movie_id, scores[i]))
                if len(recs) >= N:
                    break
    return recs[:N]

In [None]:
# Appliquer à tous les utilisateurs

recommendations = {user_ids[i]: recommend_knn_genres(i, N=10) for i in range(len(user_ids))}