# ATTEMPT TO DO KNN for recommendation.
It is actually different from cluster

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, Normalizer

In [2]:
spark = SparkSession.builder.appName("KNN_Recommender") \
    .config("spark.driver.memory", "100g") \
    .config("spark.executor.memory", "100g") \
    .getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/18 09:25:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/18 09:25:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# 1. Lire les ratings
ratings = spark.read.csv("../data/raw/ratings.csv", header=True, inferSchema=True)
ratings = ratings.select("userId", "movieId", "rating")

                                                                                

In [4]:
from pyspark.sql.functions import count
top_movies = (
    ratings.groupBy("movieId")
    .agg(count("userId").alias("num_ratings"))
    .orderBy(col("num_ratings").desc())
    .limit(50000)
    .select("movieId")
    .rdd.flatMap(lambda x: x)
    .collect()
)

                                                                                

In [6]:
spark.conf.set("spark.sql.pivotMaxValues", 85000)

In [7]:
ratings_small = ratings.filter(col("movieId").isin(top_movies))
user_item = ratings_small.groupBy("userId").pivot("movieId").avg("rating").fillna(0)

25/07/18 09:31:36 WARN DAGScheduler: Broadcasting large task binary with size 1442.5 KiB
25/07/18 09:49:37 WARN DAGScheduler: Broadcasting large task binary with size 1449.9 KiB
25/07/18 09:49:37 WARN DAGScheduler: Broadcasting large task binary with size 1446.7 KiB
                                                                                

In [8]:
feature_cols = user_item.columns[1:]  # sauf userId
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
user_vectors = assembler.transform(user_item).select("userId", "features")

In [9]:
# Normalisation (pour cosinus)
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
user_vectors = normalizer.transform(user_vectors)

## Calculer la similarité cosinus entre tous les utilisateurs

In [10]:
from pyspark.sql import functions as F

# On va faire un produit cartésien limité (sur un petit échantillon, exemple 100 users)
user_sample = user_vectors.limit(100)  # attention à la taille !
user_self = user_sample.alias("a")
user_other = user_sample.alias("b")

In [11]:
# Produit cartésien, on ne garde pas les paires (i,i)
user_pairs = user_self.join(user_other, F.col("a.userId") != F.col("b.userId"))

In [12]:
# Calcul du produit scalaire (cosinus)
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT

In [13]:
def cos_sim(u, v):
    return float(u.dot(v))  # les vecteurs sont normalisés

In [14]:
from pyspark.sql.types import FloatType

In [15]:
cos_sim_udf = F.udf(cos_sim, FloatType())

user_pairs = user_pairs.withColumn(
    "similarity",
    cos_sim_udf(F.col("a.norm_features"), F.col("b.norm_features"))
)

In [16]:
# Pour chaque user, garder les k plus proches voisins
from pyspark.sql.window import Window

In [19]:
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

In [None]:
k = 50
window = Window.partitionBy("a.userId").orderBy(F.desc("similarity"))
knn = user_pairs.withColumn("rank", F.row_number().over(window)).filter(F.col("rank") <= k)

# Affiche les voisins les plus proches de chaque user
knn.select(
    F.col("a.userId").alias("userId"),
    F.col("b.userId").alias("neighborId"),
    "similarity"
).show()

25/07/18 10:53:40 WARN DAGScheduler: Broadcasting large task binary with size 1452.0 KiB
25/07/18 10:54:10 WARN DAGScheduler: Broadcasting large task binary with size 1452.0 KiB
25/07/18 11:15:24 WARN DAGScheduler: Broadcasting large task binary with size 13.1 MiB
[Stage 17:>                                                       (0 + 16) / 17]

In [None]:
user_id = 169

# Id des k voisins les plus proches
neighbors = knn.filter(F.col("userId") == user_id).select("neighborId").rdd.flatMap(lambda x: x).collect()

# Films déjà vus
movies_seen = ratings.filter(col("userId") == user_id).select("movieId").rdd.flatMap(lambda x: x).collect()

# Films bien notés par les voisins, mais pas vus
neighbor_ratings = ratings.filter(col("userId").isin(neighbors))
recos = (
    neighbor_ratings
    .filter(~col("movieId").isin(movies_seen))
    .groupBy("movieId")
    .agg(F.avg("rating").alias("mean_rating"))
    .orderBy(F.desc("mean_rating"))
    .limit(10)
)

# Affichage des titres des films
movies = spark.read.csv("movies.csv", header=True, inferSchema=True)
recos = recos.join(movies, on="movieId", how="left")
recos.select("movieId", "title", "mean_rating").show(truncate=False)
