# ***Recommendation Sysytem with Pyspark***

### Author: Salma OUARDI
 Dataset : [MovieLens 100k](https://www.kaggle.com/datasets/prajitdatta/movielens-100k-dataset)



In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 58.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=5a91e26c70f4ddb906f168b74248c459e854804ce5d36efdff0557603d8bd357
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
from pyspark import SparkContext
sc=SparkContext()

## Train model with explict data

In [None]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# extract rating data
data = sc.textFile(HDFS+'u.data')
data_fileds = data.map(lambda line: line.split())
ratings = data_fileds.map(lambda fields: Rating(fields[0], fields[1], fields[2]))

# parameters
rank = 200
iterations = 50
lambda_ = 0.01

# train model and validate with MSE
model = ALS.train(ratings, rank, iterations, lambda_)
test_data = ratings.map(lambda r: (r[0], r[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2])) # predictAll return a list of Rating
true_ratings = ratings.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
# join is called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs 
ratings_and_predictions = predictions.join(true_ratings) 
mes = ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print 'MSE:{0}'.format(mes)

MSE:0.00806581210266


## Use the model

## top k recommendation for a user

In [None]:
K = 10
user_id = 789
top_k = model.recommendProducts(user_id, K)
print '=====top {1} recommendataion for user {0}======'.format(user_id, K)
for item in top_k:
    print item

Rating(user=789, product=127, rating=4.975318665475989)
Rating(user=789, product=100, rating=4.973055095155739)
Rating(user=789, product=475, rating=4.9693345410273295)
Rating(user=789, product=276, rating=4.9691825418727475)
Rating(user=789, product=129, rating=4.961758935939646)
Rating(user=789, product=150, rating=4.960734391882507)
Rating(user=789, product=9, rating=4.955455781631823)
Rating(user=789, product=50, rating=4.955384189847999)
Rating(user=789, product=741, rating=4.929019832825062)
Rating(user=789, product=663, rating=4.88606841224948)


In [None]:
# vertify the top k recommendation from their content
movies = sc.textFile(HDFS + 'u.item')
print movies.first()
# mapping of movie id and its title
titles = movies.map(lambda line: line.split('|')[:2]).map(lambda t: (int(t[0]), t[1])).collectAsMap()
print titles[134]

# find the most favorite movies that are already rated by user_id

# rated_movies = ratings.keyBy(lambda r: int(r.user)).lookup(user_id) # return a list rather than a rdd
rated_movies = ratings.filter(lambda r: int(r.user) == user_id).sortBy(lambda r: r.rating, ascending = False).take(10)
print '========={0} most favorite rated movies for user {1}==========='.format(len(rated_movies), user_id)
for r in rated_movies:
    print titles[r.product], r.rating

print '=====top {0} recommendataion for user {1}======'.format(K, user_id)
for r in top_k:
    print titles[r.product], r.rating

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Citizen Kane (1941)
Godfather, The (1972) 5.0
Trainspotting (1996) 5.0
Dead Man Walking (1995) 5.0
Star Wars (1977) 5.0
Swingers (1996) 5.0
Leaving Las Vegas (1995) 5.0
Bound (1996) 5.0
Fargo (1996) 5.0
Last Supper, The (1995) 5.0
Private Parts (1997) 4.0
Godfather, The (1972) 4.97531866548
Fargo (1996) 4.97305509516
Trainspotting (1996) 4.96933454103
Leaving Las Vegas (1995) 4.96918254187
Bound (1996) 4.96175893594
Swingers (1996) 4.96073439188
Dead Man Walking (1995) 4.95545578163
Star Wars (1977) 4.95538418985
Last Supper, The (1995) 4.92901983283
Being There (1979) 4.88606841225


## similar item recommendataion

In [None]:
import numpy as np
def cosineSimilarity(vec1, vec2):
    """vec1 and vec2 are numpy array"""
    return (vec1.dot(vec2.T)/(np.linalg.norm(vec1) * np.linalg.norm(vec2)))[0]
item_id = 567
# has the item_id as key
item_factor = model.productFeatures().filter(lambda i: i[0] == item_id)
#print item_factor.collect()
# don't have item_id as key
item_factor = model.productFeatures().lookup(item_id)
item_factor =  np.array(item_factor)

sims = model.productFeatures().map(lambda p: (int(p[0]), cosineSimilarity(item_factor, np.array(p[1]))))
most_sims = sims.sortBy(lambda p: p[1], ascending = False).take(10)
print '============{0} most similar movies to {1}=========='.format(len(most_sims), titles[item_id])
for p in most_sims:
    # print p
    print titles[p[0]], p[1]

Wes Craven's New Nightmare (1994) 1.0
Body Snatchers (1993) 0.67708296453
Tales from the Crypt Presents: Bordello of Blood (1996) 0.66785730695
Stephen King's The Langoliers (1995) 0.659703025353
Braindead (1992) 0.656651008989
Blink (1994) 0.635719989761
Albino Alligator (1996) 0.633008686004
Nightmare on Elm Street, A (1984) 0.628060514577
Paradise Lost: The Child Murders at Robin Hood Hills (1996) 0.613623292345
Edge, The (1997) 0.611766252378


# Evaluate performance of the model

## Mean Sqared Error(MSE)

In [None]:
# same as above
test_data = ratings.map(lambda r: (r[0], r[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2])) # predictAll return a list of Rating
true_ratings = ratings.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
# join is called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs 
predictions_and_actual = predictions.join(true_ratings)
mes = predictions_and_actual.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print 'MSE:{0}'.format(mes)

MSE:0.00806581210266


## Mean average precision at K(MAPK)

In [None]:
def avgPrecisionK(actual_ratings, predicted_ratings, k):
    """compute APK for a user
    actual_ratings: list of Ratings
    predicted_ratings: list of Ratings
    """
    actual = [r.product for r in actual_ratings]
    predicted = [r.product for r in predicted_ratings]
    predicted_k = predicted[:k]
    score, hits = 0, 0
    for i in xrange(len(predicted_k)):
        if predicted_k[i] in actual:
            hits += 1
            score += 1.0*hits/(i+1)
    if len(actual) == 0 or len(predicted_k) == 0:
        return 0
    else:
        return score/min(len(actual), len(predicted_k))

# test for one user
print 'APK for user {0} : {1}'.format(user_id, avgPrecisionK(rated_movies, top_k, 10))



# compute APK for each user and average them to get MAPK
K = 10
# get all users
total_apk = 0
user_ids = sc.textFile(HDFS+'u.user').map(lambda line : int(line.split('|')[0])).collect()
for user_id in user_ids:
    total_apk += avgPrecisionK(ratings.filter(lambda r: int(r.user) == user_id).sortBy(lambda r: r.rating, ascending = False).take(10),\
                        model.recommendProducts(user_id, K), K)
MAPK = 1.0 * total_apk/len(user_ids)
print 'MAPK for all users : {0}'.format(MAPK)

APK for user 212 : 0.9
MAPK for all users : 0.295360551432


## MLlib's built-in evaluation functions

### RMSE and MSE

In [None]:
from  pyspark.mllib.evaluation import RegressionMetrics
predict_and_true = predictions_and_actual.map(lambda pr:pr[1])
metrics = RegressionMetrics(predict_and_true)
print 'RMSE:{0}, MSE:{1}'.format(metrics.rootMeanSquaredError, metrics.meanSquaredError)

RMSE:0.0898098663993, MSE:0.00806581210266


### MAP

In [None]:
from pyspark.mllib.evaluation import RankingMetrics
import time
K = 10

start_time = time.time()
recommend, true = [], []
user_ids = sc.textFile(HDFS+'u.user').map(lambda line : int(line.split('|')[0])).collect()
for user_id in user_ids:
    r_ratings = model.recommendProducts(user_id, K)
    t_ratings = ratings.filter(lambda r: int(r.user) == user_id).sortBy(lambda r: r.rating, ascending = False).take(K)
    recommend.append([r.product for r in r_ratings])
    true.append([r.product for r in t_ratings])
metrics = RankingMetrics(sc.parallelize(zip(recommend, true)))
print 'consuming time: {0}s, MAPK: {1}'.format(time.time() - start_time, metrics.meanAveragePrecision)

 consuming time: 1196.98456383s, MAPK: 0.294608771398
