**NB : Ce code doit etre importé et exécuté sur Kaggle.com, il génére des erreurs sur les autres plateformes (comme Jupyter, Spider ...)**

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

In [None]:
!pip install pyspark

In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello")

df.show()

In [None]:
import pandas as pd

In [None]:
#genome_scores = pd.read_csv("../input/movielens-20m-dataset/genome_scores.csv")
#genome_tags = pd.read_csv("../input/movielens-20m-dataset/genome_tags.csv")
#link = pd.read_csv("../input/movielens-20m-dataset/link.csv")
#movie = pd.read_csv("../input/movielens-20m-dataset/movie.csv")
#rating = pd.read_csv("../input/movielens-20m-dataset/rating.csv")
#tag = pd.read_csv("../input/movielens-20m-dataset/tag.csv")

In [None]:
# La table des notes des films
movie_ratings = spark.read.csv("../input/movielens-20m-dataset/rating.csv", inferSchema=True, header=True)
movie_ratings = movie_ratings.select('userId', 'movieId', 'rating')

In [None]:
type(movie_ratings)

In [None]:
# Cette contient environs 20 milions de lignes, on prend juste les 1 milions premiers
movie_ratings = movie_ratings.head(1000000)

In [None]:
type(movie_ratings)

In [None]:
# l'étape precedante nous retourne une liste, on crée une liste des noms des colonnes avec lesquels on vas structurer la liste movie_ratings
columns = list(['userId', 'movieId', 'rating'])

In [None]:
# Création d'une dataframe à partir de la liste movie_ratings avec les noms de colonnes columns
movie_ratings = spark.createDataFrame(movie_ratings, columns)

In [None]:
movie_ratings.show()

In [None]:
movie_ratings.count()

In [None]:
# Alternating Least Squares (Alternance des moindres carrés)
from pyspark.ml.recommendation import ALS
# RegressionEvaluator pour évaluer la performance du modèle ALS
from pyspark.ml.evaluation import RegressionEvaluator
# CrossValidator pour diviser la dataset en training and testing
# ParamGridBuilder pour affiner les paramètres de notre modèle
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Création de training set et test set
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

In [None]:
# Création du modèle ALS (Alternating Least Saqures)
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop', nonnegative=True)

# nonnegative=True : car on veut pas qu'il nous retourne des valeurs négatives

In [None]:
# Régler le modèle en utilisant ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.rank, [12, 13, 14]).addGrid(als.maxIter, [18, 19, 20]).addGrid(als.regParam, [.17, .18, .19]).build()

# On le donne :
# les paramètres des matrices U et P
# max iterations qui disent à Spark combien de fois alterner entre U et P pour minimiser l'erreur
# le paramètre de régularisation pour empêcher ALS de sur-adapter aux données (overfitting)

In [None]:
# Définir l'Évaluateur de régression, qui attend la prédiction des colonnes d'entrée
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# predictionCol='prediction' : le nom de la colonne des prédictions

In [None]:
# Construction de cross validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# estimator=als : pour utiliser le modèle de ALS

In [None]:
# Entrainer le modèle avec les données d'entraînement
model = cv.fit(training)

In [None]:
# Extraire le meilleur modèle de l'exercice de tournage à l'aide de ParamGridBuilder
best_model = model.bestModel

In [None]:
# Générer des prédictions et évaluer à l'aide de RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

# rmse : Écart quadratique moyen (Root-mean-square deviation)

In [None]:
# Afficher les métriques d'évaluation du modèle
print("RMSE = " + str(rmse))

In [None]:
# Comparer les prédictions des évaluations des utilisateurs (ratings) avec les évaluations réels
predictions.sort('userId', 'rating').show()

In [None]:
# Génerer les prédictions des évaluations de tous les utilisateurs (Recommander 10 films pour chaque utilisateur)
users_recommendations = best_model.recommendForAllUsers(10)

In [None]:
users_recommendations.show()

In [None]:
# SQLContext : Le point d'entrée pour travailler avec des données structurées (lignes et colonnes) dans Spark
from pyspark.sql import SQLContext

sqlContext = SQLContext(spark)

In [None]:
# Ajout de la table des films pour joindre les noms des films avec la table de recommendations en résultat

In [None]:
movieDF = spark.read.csv("../input/movielens-20m-dataset/movie.csv", inferSchema=True, header=True)

In [None]:
movieDF.show()

In [None]:
# Pour faciliter l'affichage de users_recommendations

def get_recs_for_user(recs):
    recs = recs.select("userId", "recommendations.movieId", "recommendations.rating")
    movies = recs.select("movieId").toPandas().iloc[:, 0].values
    ratings = recs.select("rating").toPandas().iloc[:, 0].values
    userIds = recs.select("userId").toPandas()
    ratings_matrix = pd.DataFrame(movies, columns=['movieId'])
    #ratings_matrix['userId'] = userIds
    ratings_matrix.insert(0, 'userId', userIds)
    ratings_matrix['ratings'] = ratings
    ratings_matrix_ps = sqlContext.createDataFrame(ratings_matrix)
    return ratings_matrix_ps

In [None]:
users_recs = get_recs_for_user(users_recommendations)

In [None]:
type(users_recs)

In [None]:
users_recs.show()

In [None]:
# Extraire de users_recommendations les recommandations pour un utilisateur spécifique
user_id = input("Donner l'id de l'utilisateur : ")
user_recs = users_recs.filter("userId="+user_id)

In [None]:
user_recs.show()

In [None]:
# Pour changer l'affichage de la liste des ids des films

z = []

for k,row in user_recs.toPandas().iterrows():
    for j in list(np.array(row.movieId).flat):
        z.append({'userId':row.userId, 'movieId':j})

user_recs = spark.createDataFrame(pd.DataFrame(z))

In [None]:
# Joindre la dataframe des films recommandés pour l'utilisateur avec leurs titres et genres

user_recs = user_recs.join(movieDF, on='movieId')

In [None]:
# Pour échanger les indexs de 'userId' et 'movieId'

user_recs = user_recs['userId', 'movieId', 'title', 'genres']

In [None]:
# Affichage de la table de recommendation pour l'utilisateur demandé
user_recs.show()