In [13]:
import os
import sys
os.environ['SPARK_HOME'] = "/Users/Florian/spark"
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, os.path.join(spark_home, '/Users/Florian/spark/python'))
sys.path.insert(0, os.path.join(spark_home, '/Users/Florian/spark/python/lib/py4j-0.9-src.zip'))
execfile(os.path.join(spark_home, '/Users/Florian/spark/python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:05:08)
SparkContext available as sc, HiveContext available as sqlContext.


# Moteur de recommandation de films

On va générer un moteur de recommendation dans Spark. Pour cela on va utiliser le dataset mis à disposition par movieLens.

In [14]:
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

#use a parsing function
def parseRating(line):
    fields = line.strip().split("::")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), int(fields[2]))


In [17]:
# load personal ratings
f = open("/Users/Florian/Documents/Formation_Python/Recommandation/personalRatings.txt")
#get only rated movies (i.e. fields[2] > 0)
ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])

f.close()
ratings


[(0, 1, 4),
 (0, 780, 2),
 (0, 590, 3),
 (0, 1210, 5),
 (0, 648, 3),
 (0, 344, 1),
 (0, 165, 2),
 (0, 153, 4),
 (0, 597, 2),
 (0, 1580, 3),
 (0, 231, 4)]

In [18]:
#create an RDD (Resilient Distributed Datasets)
                             # les données (ici ma liste de notes), numSlices    
myRatingsRDD = sc.parallelize(ratings, 1)

In [19]:
# .collect() pour recuperer les données distribuées sur les worker nodes 
myRatingsRDD.collect()

[(0, 1, 4),
 (0, 780, 2),
 (0, 590, 3),
 (0, 1210, 5),
 (0, 648, 3),
 (0, 344, 1),
 (0, 165, 2),
 (0, 153, 4),
 (0, 597, 2),
 (0, 1580, 3),
 (0, 231, 4)]

Chargement du dataset movieLens:

In [20]:
movieLensHomeDir = "/Users/Florian/Documents/Formation_Python/Recommandation/ratings.dat"
# ratings is an RDD of (last digit of timestamp, (userID, movieID, rating))
# on envoie la fonction "parseRating" sur les worker nodes 
#map = transformation (données distances, lazy mode)
ratings = sc.textFile(movieLensHomeDir).map(parseRating)
#count = action, ramène un résultat 
numRatings = ratings.count()
numRatings


1000209

In [27]:
# definition d'une fonction pour parser une ligne du fichier movies.dat
def parseMovie(line):
    fields=line.strip().split("::")
    return int(fields[0]), fields[1]

# sc.textFile =  fichier -> RDD
# .map(parseMovie) -> pou parser le RDD
# .collect()  -> retourne une liste de tuple (id, nom)
# dict -> transforme la liste de tuple en un dictionnaire

movies = dict(sc.textFile("/Users/Florian/Documents/Formation_Python/Recommandation/movies.dat").map(parseMovie).collect())

In [28]:
movies

{1: u'Toy Story (1995)',
 2: u'Jumanji (1995)',
 3: u'Grumpier Old Men (1995)',
 4: u'Waiting to Exhale (1995)',
 5: u'Father of the Bride Part II (1995)',
 6: u'Heat (1995)',
 7: u'Sabrina (1995)',
 8: u'Tom and Huck (1995)',
 9: u'Sudden Death (1995)',
 10: u'GoldenEye (1995)',
 11: u'American President, The (1995)',
 12: u'Dracula: Dead and Loving It (1995)',
 13: u'Balto (1995)',
 14: u'Nixon (1995)',
 15: u'Cutthroat Island (1995)',
 16: u'Casino (1995)',
 17: u'Sense and Sensibility (1995)',
 18: u'Four Rooms (1995)',
 19: u'Ace Ventura: When Nature Calls (1995)',
 20: u'Money Train (1995)',
 21: u'Get Shorty (1995)',
 22: u'Copycat (1995)',
 23: u'Assassins (1995)',
 24: u'Powder (1995)',
 25: u'Leaving Las Vegas (1995)',
 26: u'Othello (1995)',
 27: u'Now and Then (1995)',
 28: u'Persuasion (1995)',
 29: u'City of Lost Children, The (1995)',
 30: u'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 31: u'Dangerous Minds (1995)',
 32: u'Twelve Monkeys (1995)',
 33: u'Wings 

## 1 - Exercice

* Combien d'utilisateurs ?
* Combien de films ?

In [29]:
# D'abord les utilisateurs

ratings.take(10)


[(0L, (1, 1193, 5)),
 (9L, (1, 661, 3)),
 (8L, (1, 914, 3)),
 (5L, (1, 3408, 4)),
 (1L, (1, 2355, 5)),
 (8L, (1, 1197, 3)),
 (9L, (1, 1287, 5)),
 (9L, (1, 2804, 5)),
 (8L, (1, 594, 4)),
 (8L, (1, 919, 4))]

In [30]:
# Le champ tuple[1][0] designe le user_id
# fonction lambda
ratings.map(lambda monTuple : monTuple[1][0]).distinct().count()


6040

In [31]:
# Meme chose pour les films
ratings.map(lambda monTuple : monTuple[1][1]).distinct().count()

3706

## 2 - Spliter les données
Séparer les données en un jeu de trainning (60%), validation (20%) et testing (20%) en utilisant le dernier chiffre du timestamp.
Etape 2 : répartir chaque jeu de données sur chacun des RDD

In [37]:
# ratings : format defini par la fonction parseRating
#  timestamp %10, (user_id, film_id, note)

# 60% du jeu de données, auquel on rajoute les donnees perso MyRatingsRDD
# .repartition(2) : le nouveau RDD est divisé en 2 partitions
# .cache : persist le nouveau RDD pour eviter que cette transformation de filter soit opérée pour chaque action
training = ratings.filter(lambda monTuple : monTuple[0]<6).values().union(myRatingsRDD).repartition(2).cache()

numTraining = training.count()


In [85]:
validation = ratings.filter(lambda monTuple : monTuple[0]>=6 and monTuple[0]<8).values().repartition(2).cache()
numValidation = validation.count()


In [86]:
test = ratings.filter(lambda monTuple : monTuple[0]>=8).values().repartition(2).cache()
numTest = test.count()


# La mesure de l'erreur de prédiction

In [87]:
# fonction de calcul du root-mean-square-error
def computeRmse(model, data, n):
    #get the prediction from an user - movie pair
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    
    #compare  the predictions to the actual ratings
    #perform an inner join of the predictions with the data and their ratings (x[2])
        
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()  #keep the two ratings
    
    # Pour chaque jeu de prédiction - vraie notation, transformation en (x[0] - x[1])**2) (transformation map)
    # puis action reduce 
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1])**2).reduce(add) / float(n))

In [88]:
#Ranges of variables for the models
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]

#Storing the best parameters
bestModel = None
bestTestRmse = 1.0e30000
bestRank = 0
bestLambda = -0.1
bestNumIter = -1

In [89]:
#Collaborative filtering is commonly used for recommender systems.
#These techniques aim to fill in the missing entries of a user-item association matrix.
#spark.mllib currently supports model-based collaborative filtering,
#in which users and products are described by a small set of latent factors that can be used to predict missing entries.
#spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors.

#Training the ALS models on the training RDDs
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    #cartesian product, equivalent to neasted for loops
    
    #On entraine le modele avec le jeu d'entrainement
    model = ALS.train(training, rank, numIter, lmbda)
    
    # On estime l'erreur grace au jeu de test
    #estimate error
    testRmse = computeRmse(model, test, numTest)
    
    print "RMSE (test) = %f for the model trained with " % testRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        
    #store the best settings
    if(testRmse < bestTestRmse):
        bestModel = model
        bestTestRmse = testRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter
        
    # evaluate the best model on the validation set
    validationRmse = computeRmse(bestModel, validation, numValidation)
    
    print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
    + "and numIter = %d, and its RMSE on the validation set is %f." % (bestNumIter, bestTestRmse)
    

RMSE (test) = 0.877001 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
The best model was trained with rank = 8 and lambda = 0.1, and numIter = 10, and its RMSE on the validation set is 0.877001.
RMSE (test) = 0.870945 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
The best model was trained with rank = 8 and lambda = 0.1, and numIter = 20, and its RMSE on the validation set is 0.870945.
RMSE (test) = 3.752548 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
The best model was trained with rank = 8 and lambda = 0.1, and numIter = 20, and its RMSE on the validation set is 0.870945.
RMSE (test) = 3.752548 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 8 and lambda = 0.1, and numIter = 20, and its RMSE on the validation set is 0.870945.
RMSE (test) = 0.872612 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
The best model was trained with rank 

## Utiliser le meilleur model pour faire les prédictions

In [90]:
# On ajoute les resultats du jeu de test a ceux du jeu d'entrainement
#compare the best model with a naive baseline always returning the mean rating
meanRating = training.union(test).map(lambda x: x[2]).mean()
baselineRmse = sqrt(validation.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numValidation)
improvement = (baselineRmse - validationRmse) / baselineRmse * 100
print "The best model improves the baseline by %.2f" % (improvement) + "%."

The best model improves the baseline by 21.96%.


In [67]:
# load personal ratings
f = open("/Users/Florian/Documents/Formation_Python/Recommandation/personalRatings.txt")

#get only rated movies (i.e. fields[2] > 0)
myRatings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])

f.close()
myRatings

[(0, 1, 4),
 (0, 780, 2),
 (0, 590, 3),
 (0, 1210, 5),
 (0, 648, 3),
 (0, 344, 1),
 (0, 165, 2),
 (0, 153, 4),
 (0, 597, 2),
 (0, 1580, 3),
 (0, 231, 4)]

In [71]:


#make personalized recommendations
myRatedMoviesIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMoviesIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0,x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:10]

print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print("%2d: %s" % (i+1, movies[recommendations[i][1]])).encode('ascii','ignore')
    

Movies recommended for you:
 1: American Dream (1990)
 2: Nobody Loves Me (Keiner liebt mich) (1994)
 3: Anne Frank Remembered (1995)
 4: Hard Core Logo (1996)
 5: Some Mother's Son (1996)
 6: Star Wars: Episode V - The Empire Strikes Back (1980)
 7: Maya Lin: A Strong Clear Vision (1994)
 8: Trial, The (Le Procs) (1963)
 9: Conformist, The (Il Conformista) (1970)
10: Careful (1992)


In [38]:
sc.stop()