In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

/kaggle/input/movielens-20m-dataset/rating.csv
/kaggle/input/movielens-20m-dataset/movie.csv
/kaggle/input/movielens-20m-dataset/link.csv
/kaggle/input/movielens-20m-dataset/genome_tags.csv
/kaggle/input/movielens-20m-dataset/tag.csv
/kaggle/input/movielens-20m-dataset/genome_scores.csv


In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 39kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 40.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - done
[?25h  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216131250 sha256=43f796cf3d9612b2d593c4621a4a58d44c30b1e31d7bb

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
import pandas as pd

In [7]:
movie_ratings = spark.read.csv("../input/movielens/ratings.csv", inferSchema=True, header=True)
movie_ratings = movie_ratings.select('userId', 'movieId', 'rating')

In [8]:
type(movie_ratings)

pyspark.sql.dataframe.DataFrame

In [12]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    223|   4.0|
|     1|    253|   4.0|
|     1|    260|   4.0|
|     1|    293|   4.0|
|     1|    296|   4.0|
|     1|    318|   4.0|
|     1|    337|   3.5|
|     1|    367|   3.5|
|     1|    541|   4.0|
|     1|    589|   3.5|
|     1|    593|   3.5|
|     1|    653|   3.0|
|     1|    919|   3.5|
+------+-------+------+
only showing top 20 rows



In [13]:
movie_ratings.count()

100000

In [14]:
type(movie_ratings)

pyspark.sql.dataframe.DataFrame

In [15]:
# RegressionEvaluator pour évaluer la performance du modèle ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Alternating Least Squares (Alternance des moindres carrés)
from pyspark.ml.recommendation import ALS
# CrossValidator pour diviser la dataset en training and testing
# ParamGridBuilder pour affiner les paramètres de notre modèle
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [16]:
# Création de training set et test set
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

In [17]:
# Création du modèle ALS (Alternating Least Saqures)
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop', nonnegative=True)

# nonnegative=True : car on veut pas qu'il nous retourne des valeurs négatives

In [18]:
# Régler le modèle en utilisant ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.rank, [12, 13, 14]).addGrid(als.maxIter, [18, 19, 20]).addGrid(als.regParam, [.17, .18, .19]).build()

# On le donne :
# les paramètres des matrices U et P
# max iterations qui disent à Spark combien de fois alterner entre U et P pour minimiser l'erreur
# le paramètre de régularisation pour empêcher ALS de sur-adapter aux données (overfitting)

In [19]:
# Définir l'Évaluateur de régression, qui attend la prédiction des colonnes d'entrée
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# predictionCol='prediction' : le nom de la colonne des prédictions

In [20]:
# Construction de cross validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# estimator=als : pour utiliser le modèle de ALS

In [21]:
# Entrainer le modèle avec les données d'entraînement
model = cv.fit(training)

In [22]:
# Extraire le meilleur modèle de l'exercice de tournage à l'aide de ParamGridBuilder
best_model = model.bestModel

In [23]:
# Générer des prédictions et évaluer à l'aide de RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

# rmse : Écart quadratique moyen (Root-mean-square deviation)

In [24]:
# Afficher les métriques d'évaluation et les paramètres du modèle
print("RMSE = " + str(rmse))
print("**Best model**")
print("Rank : "), best_model.rank
print("MaxIter : "), best_model._java_obj.parent().getMaxIter()
print("RegParam : "), best_model._java_obj.parent().getRegParam()

RMSE = 0.8517008116870236
**Best model**
Rank : 
MaxIter : 
RegParam : 


(None, 0.17)

In [25]:
# Comparer les prédictions des évaluations des utilisateurs (ratings) avec les évaluations réels
display(predictions.sort('userId', 'rating'))

DataFrame[userId: bigint, movieId: bigint, rating: double, prediction: float]

In [26]:
predictions.sort('userId', 'rating').show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   3932|   3.0|  3.321066|
|     1|    653|   3.0| 3.2634063|
|     1|   1525|   3.0| 2.6832097|
|     1|   2194|   3.5|   3.76578|
|     1|   3476|   3.5| 3.3637607|
|     1|   6807|   3.5| 3.6391315|
|     1|   6242|   3.5|  3.511706|
|     1|   4941|   3.5|  2.985636|
|     1|   7247|   3.5| 3.2979224|
|     1|   5679|   3.5| 3.3537078|
|     1|   2253|   3.5|   2.69537|
|     1|   3438|   3.5| 2.8893774|
|     1|   3997|   3.5| 1.9998069|
|     1|   1193|   3.5| 3.9263577|
|     1|   8482|   3.5| 3.7055633|
|     1|   7449|   3.5| 2.3705983|
|     1|   4720|   3.5| 3.6240416|
|     1|   4105|   3.5|  3.617806|
|     1|   7164|   3.5|  3.373713|
|     1|   6774|   4.0|  3.357091|
+------+-------+------+----------+
only showing top 20 rows



In [27]:
# Génerer les prédiction des évaluations de tous les utilisateurs
users_recommendations = best_model.recommendForAllUsers(10)

In [28]:
users_recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[59295, 3.488422...|
|  4900|[[6600, 6.262145]...|
|  5300|[[6600, 6.682314]...|
|  6620|[[6600, 6.130567]...|
|   471|[[6600, 5.480845]...|
|  1591|[[6600, 6.2523136...|
|  4101|[[6600, 5.723655]...|
|  1342|[[6600, 6.363829]...|
|  2122|[[63194, 4.738602...|
|  2142|[[6600, 5.79335],...|
|   463|[[6600, 6.1415944...|
|   833|[[6600, 6.0323668...|
|  5803|[[6600, 7.128424]...|
|  3794|[[6600, 6.0814853...|
|  6654|[[6600, 6.888523]...|
|  1645|[[6600, 5.8292694...|
|  3175|[[6600, 6.988644]...|
|  4935|[[6600, 5.3088], ...|
|   496|[[6600, 6.589501]...|
|  2366|[[6600, 6.3854437...|
+------+--------------------+
only showing top 20 rows



In [29]:
# SQLContext : Le point d'entrée pour travailler avec des données structurées (lignes et colonnes) dans Spark
from pyspark.sql import SQLContext

sqlContext = SQLContext(spark)

In [30]:
# Pour faciliter l'affichage de users_recommendations

def get_recs_for_user(recs):
    recs = recs.select("userId", "recommendations.movieId", "recommendations.rating")
    movies = recs.select("movieId").toPandas().iloc[:, 0].values
    ratings = recs.select("rating").toPandas().iloc[:, 0].values
    userIds = recs.select("userId").toPandas()
    ratings_matrix = pd.DataFrame(movies, columns=['movieId'])
    #ratings_matrix['userId'] = userIds
    ratings_matrix.insert(0, 'userId', userIds)
    ratings_matrix['ratings'] = ratings
    ratings_matrix_ps = sqlContext.createDataFrame(ratings_matrix)
    return ratings_matrix_ps

In [31]:
users_recs = get_recs_for_user(users_recommendations)

In [32]:
type(users_recs)

pyspark.sql.dataframe.DataFrame

In [33]:
users_recs.show()

+------+--------------------+--------------------+
|userId|             movieId|             ratings|
+------+--------------------+--------------------+
|  1580|[59295, 69685, 94...|[3.48842239379882...|
|  4900|[6600, 95776, 829...|[6.26214504241943...|
|  5300|[6600, 82931, 957...|[6.68231391906738...|
|  6620|[6600, 727, 82931...|[6.13056707382202...|
|   471|[6600, 95776, 829...|[5.48084497451782...|
|  1591|[6600, 727, 82931...|[6.25231361389160...|
|  4101|[6600, 82931, 957...|[5.72365522384643...|
|  1342|[6600, 82931, 727...|[6.36382913589477...|
|  2122|[63194, 6160, 109...|[4.73860263824462...|
|  2142|[6600, 26325, 470...|[5.79335021972656...|
|   463|[6600, 727, 82931...|[6.14159440994262...|
|   833|[6600, 95776, 514...|[6.03236675262451...|
|  5803|[6600, 95776, 829...|[7.12842416763305...|
|  3794|[6600, 95776, 829...|[6.08148527145385...|
|  6654|[6600, 95776, 223...|[6.88852310180664...|
|  1645|[6600, 727, 82931...|[5.82926940917968...|
|  3175|[6600, 95776, 829...|[6

In [34]:
movieDF = spark.read.csv("../input/movielens/movies.csv", inferSchema=True, header=True)

In [35]:
movieDF.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [36]:
#user10 = pd.DataFrame({"userCol": 10})
#user10_recommendations = best_model.recommendForUserSubset(user10, 5)

In [37]:
# Extraire de users_recommendations les recommandations pour un utilisateur spécifique

user_recs = users_recs.filter("userId=10")

In [38]:
user_recs.show()

+------+--------------------+--------------------+
|userId|             movieId|             ratings|
+------+--------------------+--------------------+
|    10|[6600, 82931, 957...|[5.69969797134399...|
+------+--------------------+--------------------+



In [39]:
# Pour changer l'affichage de la liste des ids des films

z = []

for k,row in user_recs.toPandas().iterrows():
    for j in list(np.array(row.movieId).flat):
        z.append({'userId':row.userId, 'movieId':j})

user_recs = spark.createDataFrame(pd.DataFrame(z))

In [40]:
# Joindre la dataframe des films recommandés pour l'utilisateur avec leurs titres et genres

user_recs = user_recs.join(movieDF, on='movieId')

In [41]:
# Pour échanger les indexs de 'userId' et 'movieId'

user_recs = user_recs['userId', 'movieId', 'title', 'genres']

In [42]:
user_recs.show()

+------+-------+--------------------+--------------------+
|userId|movieId|               title|              genres|
+------+-------+--------------------+--------------------+
|    10|   6600|...And God Spoke ...|              Comedy|
|    10|  82931|Last Circus, The ...|    Comedy|Drama|War|
|    10|  95776|     Bob Funk (2009)|      Comedy|Romance|
|    10|    727|  War Stories (1995)|         Documentary|
|    10|   4261|       Lilies (1996)|Drama|Fantasy|Rom...|
|    10|  88570|      Welfare (1975)|         Documentary|
|    10| 100902|911 in Plane Site...|         Documentary|
|    10| 100106|Pervert's Guide t...|         Documentary|
|    10|   7113|Cabeza de Vaca (1...|    Action|Adventure|
|    10|  47028|Sione's Wedding (...|      Comedy|Romance|
+------+-------+--------------------+--------------------+

