In [None]:
from pyspark.sql import SparkSession
try:
    spark.stop()
    print("Ancienne session Spark arrêtée avec succès.")
except:
    print("Aucune session Spark active à arrêter. C'est parfait.")

# --- Initialisation de la SparkSession ---
print("--- Démarrage de la SparkSession pour les recommandations KNN ---")
spark = SparkSession.builder \
    .appName("KNNRecommendations") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxResults", 10)

Aucune session Spark active à arrêter. C'est parfait.
--- Démarrage de la SparkSession pour les recommandations KNN ---


In [3]:
# Assurez-vous que votre SparkSession est créée comme dans le test précédent qui fonctionnait

# Chemin vers votre DOSSIER de modèle ALS sauvegardé
# Utilisez la syntaxe file:/// et les slashes
path_to_als_model = rf"file:///C:\Users\arthu\Desktop\spank movies\ml-32m\models\best_als_global_model"

try:
    print("--- Tentative de chargement MANUEL des facteurs ---")

    # On lit directement les sous-dossiers userFactors et itemFactors
    user_factors = spark.read.parquet(f"{path_to_als_model}/userFactors")
    item_factors = spark.read.parquet(f"{path_to_als_model}/itemFactors")

    print("✅ SUCCÈS ! Les userFactors et itemFactors ont été chargés manuellement.")

    print("Aperçu des userFactors :")
    user_factors.show(5)

    print("Aperçu des itemFactors :")
    item_factors.show(5)

except Exception as e:
    print("❌ ERREUR lors du chargement manuel des facteurs.")
    print(e)

--- Tentative de chargement MANUEL des facteurs ---
✅ SUCCÈS ! Les userFactors et itemFactors ont été chargés manuellement.
Aperçu des userFactors :
+---+--------------------+
| id|            features|
+---+--------------------+
|  3|[-0.36637914, -0....|
| 13|[-0.47874132, -0....|
| 33|[-0.554954, 0.378...|
| 43|[-0.39298496, 0.4...|
| 53|[-0.62480736, 0.5...|
+---+--------------------+
only showing top 5 rows

Aperçu des itemFactors :
+---+--------------------+
| id|            features|
+---+--------------------+
|  5|[-0.3952559, 0.13...|
| 15|[-0.4015493, 0.11...|
| 25|[-0.3550805, 0.26...|
| 35|[-0.30379027, 0.0...|
| 45|[-0.36853588, 0.3...|
+---+--------------------+
only showing top 5 rows



In [None]:
# --- Chargement des DataFrames préparés et du modèle ALS ---
print("Chargement des DataFrames préparés (test_data, df_movies_cleaned) et du modèle ALS global...")
output_dir = rf"file:///C:\Users\arthu\Desktop\spank movies\ml-32m\data"   # Dossier des DataFrames Parquet
models_dir = rf"file:///C:\Users\arthu\Desktop\spank movies\ml-32m\models" # Dossier du modèle ALS sauvegardé

try:
    test_data = spark.read.parquet(os.path.join(output_dir, "test_data.parquet"))
    df_movies_cleaned = spark.read.parquet(os.path.join(output_dir, "df_movies_cleaned.parquet"))
    best_model_global = ALSModel.load(os.path.join(models_dir, "best_als_global_model"))
    print("DataFrames et modèle ALS chargés avec succès.")

    # Vérification rapide des schémas
    print("\nSchéma de test_data:")
    test_data.printSchema()
    print("\nSchéma de df_movies_cleaned:")
    df_movies_cleaned.printSchema()

except Exception as e:
    print(f"Erreur lors du chargement des fichiers ou du modèle : {e}")
    print("Vérifiez que les fichiers Parquet et le modèle ALS ont bien été sauvegardés.")
    spark.stop()
    exit()

Chargement des DataFrames préparés (test_data, df_movies_cleaned) et du modèle ALS global...
Erreur lors du chargement des fichiers ou du modèle : name 'os' is not defined
Vérifiez que les fichiers Parquet et le modèle ALS ont bien été sauvegardés.


: 

In [None]:
# --- Préparation des facteurs latents des utilisateurs pour KNN ---
# userFactors contient userId et un vecteur de facteurs latents (features)
user_factors_df = best_model_global.userFactors

# CORRECTION CLÉ : Renommer la colonne 'id' en 'userId' dans user_factors_df
# Ceci est crucial car userFactors utilise 'id' par défaut, mais le reste de notre code utilise 'userId'.
if "id" in user_factors_df.columns and "userId" not in user_factors_df.columns:
    user_factors_df = user_factors_df.withColumnRenamed("id", "userId")

print("\nSchéma de user_factors_df (après renommage si nécessaire):")
user_factors_df.printSchema()
user_factors_df.show(5, truncate=False)

NameError: name 'best_model_global' is not defined

: 

In [None]:
# --- Définition de la fonction de similarité Cosinus (UDF) ---
@udf(returnType=DoubleType())
def cosine_similarity(features1, features2):
    if features1 is None or features2 is None:
        return None
    vec1 = np.array(features1)
    vec2 = np.array(features2)
    dot_product = float(np.dot(vec1, vec2))
    norm_a = float(np.linalg.norm(vec1))
    norm_b = float(np.linalg.norm(vec2))
    if norm_a == 0 or norm_b == 0:
        return 0.0 # Éviter la division par zéro
    return dot_product / (norm_a * norm_b)

In [None]:
# --- Fonction pour trouver les K utilisateurs les plus similaires ---
def find_k_nearest_neighbors(target_user_id, k, user_factors_df_param):
    """
    Trouve les K utilisateurs les plus similaires à un utilisateur cible
    basé sur la similarité cosinus de leurs facteurs latents.
    """
    target_user_features_row = user_factors_df_param.filter(col("userId") == target_user_id).select("features").first()
    
    if not target_user_features_row:
        print(f"Facteurs latents non trouvés pour l'utilisateur {target_user_id}. Vérifiez si l'utilisateur existe dans le modèle.")
        return spark.createDataFrame([], "userId INT, similarity DOUBLE")

    target_features = target_user_features_row['features']

    # Calculer la similarité avec tous les autres utilisateurs
    similarities_df = user_factors_df_param \
        .filter(col("userId") != target_user_id) \
        .withColumn("similarity", cosine_similarity(lit(target_features), col("features"))) \
        .filter(col("similarity").isNotNull()) \
        .orderBy(col("similarity").desc()) \
        .limit(k) \
        .select("userId", "similarity")

    return similarities_df

In [None]:
# --- Fonction pour recommander des films basés sur les K voisins ---
def recommend_movies_knn(target_user_id, k_neighbors, df_ratings_param, df_movies_cleaned_param, user_factors_df_param, k_recs=10):
    """
    Recommande des films à un utilisateur en se basant sur les notes de ses K voisins les plus proches.
    """
    print(f"Recherche des {k_neighbors} voisins les plus proches pour l'utilisateur {target_user_id}...")
    neighbors_df = find_k_nearest_neighbors(target_user_id, k_neighbors, user_factors_df_param)
    
    if neighbors_df.count() == 0:
        print(f"Aucun voisin trouvé ou pas assez de voisins pour l'utilisateur {target_user_id}.")
        return spark.createDataFrame([], "title STRING, genres STRING, predicted_score_knn DOUBLE")

    # Récupérer les notes des films par les voisins
    neighbor_ratings = df_ratings_param.join(neighbors_df, on="userId", how="inner")

    # Pour éviter de recommander des films déjà vus par l'utilisateur cible (dans training_data, qui contient df_ratings complet)
    # df_ratings_param est votre df_ratings original complet, donc on filtre les films que l'utilisateur 
    # a pu noter historiquement.
    movies_seen_by_target = df_ratings_param.filter(col("userId") == target_user_id).select("movieId").distinct()

    # Films notés par les voisins et non vus par l'utilisateur cible
    candidate_movies = neighbor_ratings.join(movies_seen_by_target, on="movieId", how="left_anti")

    if candidate_movies.count() == 0:
        print(f"Pas de films candidats uniques à recommander pour l'utilisateur {target_user_id} après filtrage des films déjà vus.")
        return spark.createDataFrame([], "title STRING, genres STRING, predicted_score_knn DOUBLE")

    # Calcul du score pondéré pour chaque film candidat
    weighted_scores = candidate_movies.groupBy("movieId") \
        .agg((sum(col("rating") * col("similarity")) / sum(col("similarity"))).alias("predicted_score_knn"))

    # Joindre avec les titres de films et trier
    recommendations = weighted_scores.join(df_movies_cleaned_param, on="movieId", how="inner") \
        .orderBy(col("predicted_score_knn").desc()) \
        .limit(k_recs) \
        .select("title", "genres", "predicted_score_knn")

    return recommendations

In [None]:
# --- Génération et affichage des recommandations KNN pour utilisateurs fictifs ---
print("\n--- Génération des recommandations KNN pour des utilisateurs fictifs ---")
users_to_recommend = [50, 150, 250, 350, 450] # Les mêmes utilisateurs que précédemment
K_NEIGHBORS = 50  # Nombre de voisins à considérer pour KNN
K_RECOMMENDATIONS = 10 # Nombre de films à recommander

# Pour la fonction recommend_movies_knn, nous devons passer df_ratings original (complet)
# pour que le filtrage des films déjà vus fonctionne sur toutes les notes de l'utilisateur.
# Attention, df_ratings n'est pas passé comme paramètre dans le main de ce notebook,
# il faut le charger ou s'assurer qu'il est dans le scope. Pour éviter des problèmes,
# on va le recharger ici explicitement si le scope n'est pas clair.
# Ou mieux, passer training_data + test_data combiné comme df_ratings_full.
# La solution la plus simple est de recharger df_ratings si nécessaire ici.

# Charger df_ratings à nouveau pour l'usage dans recommend_movies_knn pour filtrer les films vus
# (car recommend_movies_knn attend df_ratings_param, qui représente l'historique complet des notes)
try:
    df_ratings_full_for_knn = spark.read.csv("ratings.csv", header=True, inferSchema=True)
    # Appliquer le même nettoyage que dans 01_data_analysis_and_preparation.ipynb
    df_ratings_full_for_knn = df_ratings_full_for_knn.na.drop().dropDuplicates(['userId', 'movieId', 'timestamp'])
    print("df_ratings complet rechargé et nettoyé pour la fonction KNN.")
except Exception as e:
    print(f"Erreur lors du rechargement de df_ratings complet pour KNN : {e}")
    df_ratings_full_for_knn = None # Gérer si le chargement échoue

if df_ratings_full_for_knn is not None:
    for user_id in users_to_recommend:
        print(f"\n================== RECOMMANDATIONS KNN POUR L'UTILISATEUR {user_id} ==================")
        # Passez df_ratings_full_for_knn comme paramètre 'df_ratings_param'
        knn_recs = recommend_movies_knn(
            user_id, K_NEIGHBORS, df_ratings_full_for_knn, df_movies_cleaned, user_factors_df, K_RECOMMENDATIONS
        )
        if knn_recs.count() > 0:
            knn_recs.show(truncate=False)
        else:
            print(f"Pas de recommandations trouvées pour l'utilisateur {user_id} après filtrage.")
else:
    print("Impossible de générer les recommandations KNN car df_ratings complet n'a pas pu être chargé.")


In [None]:
# --- Calcul des métriques de Précision@K et Rappel@K pour KNN ---
print("\n--- Calcul des métriques de Précision@K et Rappel@K pour KNN ---")

# Déjà défini dans le notebook 02, mais la fonction est répétée ici pour l'évaluation
# du KNN spécifiquement.
def calculate_precision_recall(recs_df, actual_df, k_val):
    joined_df = recs_df.join(actual_df, on="userId", how="inner")

    hits_df = joined_df.withColumn(
        "hits",
        size(array_intersect(col("recommended_movies"), col("actual_movies")))
    )

    precision_at_k_df = hits_df.withColumn(
        "precision_at_k",
        col("hits") / lit(k_val)
    )

    recall_at_k_df = precision_at_k_df.withColumn(
        "recall_at_k",
        when(size(col("actual_movies")) > 0, col("hits") / size(col("actual_movies"))).otherwise(0.0)
    )
    
    # Filter out users with no actual movies in test set before averaging
    recall_at_k_df = recall_at_k_df.filter(size(col("actual_movies")) > 0)
    
    if recall_at_k_df.count() == 0:
        return 0.0, 0.0 # Retourne 0 si aucun utilisateur valide pour l'évaluation

    avg_precision = recall_at_k_df.agg(sum("precision_at_k")).first()[0] / recall_at_k_df.count()
    avg_recall = recall_at_k_df.agg(sum("recall_at_k")).first()[0] / recall_at_k_df.count()

    return avg_precision, avg_recall

In [None]:
# Préparer les films réels notés par chaque utilisateur dans le jeu de test (vérité terrain)
# C'est 'test_data' qui contient les films réels à comparer.
actual_ratings_per_user_test = test_data.groupBy("userId").agg(
    collect_list("movieId").alias("actual_movies")
)

# Nous devons générer les recommandations KNN pour TOUS les utilisateurs du test_data
# qui sont également présents dans user_factors_df.
# C'est l'étape la plus coûteuse pour KNN en mode "évaluation globale".
# Pour des millions d'utilisateurs, une UDTF ou une approche batch plus complexe serait nécessaire.
# Pour l'instant, nous allons nous limiter à un échantillon significatif du test_data si c'est trop lent.

# Filtrer les utilisateurs du test_data qui ont des facteurs latents (connus du modèle ALS)
users_for_knn_eval = actual_ratings_per_user_test.join(user_factors_df.select("userId"), on="userId", how="inner")

# Pour des raisons de performance, nous allons prendre un échantillon d'utilisateurs pour l'évaluation KNN
# Si vous avez une machine puissante, vous pouvez commenter la ligne .limit(1000)
users_for_knn_eval_sample = users_for_knn_eval.limit(1000) # Évalue KNN sur 1000 utilisateurs du test set

knn_recs_for_eval = []
print(f"\nPréparation des recommandations KNN pour {users_for_knn_eval_sample.count()} utilisateurs pour l'évaluation...")
for row in users_for_knn_eval_sample.collect():
    user_id = row.userId
    # Notez que j'appelle recommend_movies_knn et que je lui passe les DataFrames nécessaires
    recs_df = recommend_movies_knn(user_id, K_NEIGHBORS, df_ratings_full_for_knn, df_movies_cleaned, user_factors_df, K_EVAL)
    if recs_df.count() > 0:
        # Récupérer seulement les movieId pour la comparaison
        recommended_movie_ids = recs_df.join(df_movies_cleaned.select("movieId", "title"), on="title", how="inner").select("movieId").rdd.flatMap(lambda x: x).collect()
        knn_recs_for_eval.append((user_id, recommended_movie_ids))
    else:
        knn_recs_for_eval.append((user_id, [])) # Pas de recommandations

knn_recs_eval_df = spark.createDataFrame(knn_recs_for_eval, ["userId", "recommended_movies"])

# Calcul des métriques KNN
knn_precision, knn_recall = calculate_precision_recall(
    knn_recs_eval_df, actual_ratings_per_user_test, K_EVAL
)
print(f"KNN : Precision@{K_EVAL} = {knn_precision:.4f}, Recall@{K_EVAL} = {knn_recall:.4f}")




In [1]:
# --- Arrêt de la SparkSession ---
print("\n--- Arrêt de la SparkSession ---")
spark.stop()

print("\n--- Script 03_knn_recommendations.ipynb Terminé ---")


--- Arrêt de la SparkSession ---


NameError: name 'spark' is not defined