In [None]:
"""
@author: ben
date: 2019-11-11
"""

### define urls for the dataset
small_data_url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
full_data_url = "http://files.grouplens.org/datasets/movielens/ml-latest.zip"

In [1]:
## define download locations:
import os
data_path = os.path.join(".", "data")
small_data_path = os.path.join(data_path, "ml-latest-small.zip")
full_data_path = os.path.join(data_path, "ml-latest.zip")

In [None]:
## download the zip files:
import urllib.request
small_data = urllib.request.urlretrieve(small_data_url, small_data_path)
full_data = urllib.request.urlretrieve(full_data_url, full_data_path)

### unzip the downloaded files
import zipfile
with zipfile.ZipFile(small_data_path, "r") as t:
    t.extractall(data_path)
with zipfile.ZipFile(full_data_path, "r") as t:
    t.extractall(data_path)

In [2]:
### now we can parse and load the data files
##from pyspark.sql import SparkSession
##sc = SparkSession.builder.appName("ALS").getOrCreate()

from pyspark.context import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '5g')
sc =  SparkContext('local[*]')

In [3]:
small_ratings = os.path.join(data_path, "ml-latest-small", "ratings.csv")
small_ratings_rdd = sc.textFile(small_ratings)
small_header = small_ratings_rdd.take(1)[0]
print(small_header)

userId,movieId,rating,timestamp


In [4]:
small_ratings_rdd = (small_ratings_rdd.filter(lambda l: l != small_header)
                     .map(lambda l: l.split(","))
                     .map(lambda l: (l[0], l[1], l[2]) ).cache())

In [5]:
print(small_ratings_rdd.take(3))

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]


In [6]:
!ls ./data/ml-latest-small/

README.txt  links.csv   movies.csv  ratings.csv tags.csv


In [7]:
small_movies = os.path.join(data_path, "ml-latest-small", "movies.csv")
small_movies_rdd = sc.textFile(small_movies)
small_header = small_movies_rdd.take(1)[0]
print(small_header)

movieId,title,genres


In [8]:
small_movies_rdd = (small_movies_rdd.filter(lambda l: l!= small_header)
                   .map(lambda l: l.split(","))
                   .map(lambda l: (l[0], l[1])))

In [9]:
print(small_movies_rdd.take(3))

[('1', 'Toy Story (1995)'), ('2', 'Jumanji (1995)'), ('3', 'Grumpier Old Men (1995)')]


In [10]:
train_rdd, valid_rdd, hold_rdd = small_ratings_rdd.randomSplit([6,2,2], seed=16807)
valid_for_pred = valid_rdd.map(lambda r: (int(r[0]), int(r[1])))
hold_for_pred = hold_rdd.map(lambda r: (int(r[0]), int(r[1])))

In [11]:
### hyperparameters for ALS model
## rank: number of latent factors in the model.
## lambda: regularization parameter
from pyspark.mllib.recommendation import ALS
import math

ranks = [4, 6, 8, 10, 12]
itr = 10
lamdas = [0.02, 0.05, 0.1, 0.2]
tolerance = 0.02
min_MSE = float("inf")

best_rank = best_lambda = 0

for lamda in lamdas:
    for rank in ranks:
        model = ALS.train(train_rdd, rank, itr, seed = 123, lambda_ = lamda)
        preds = model.predictAll(valid_for_pred).map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
        rates_and_preds = valid_rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0]-r[1][1])**2).mean())
        print(f"model with rank = {rank} & lambda = {lamda} has MSE: ", error)
        if error < min_MSE:
            min_MSE = error
            best_rank, best_lambda = rank, lamda

print(f"Therefore, the best model has rank {best_rank}, lambda {best_lambda}, and MSE: {min_MSE}")



model with rank = 4 & lambda = 0.02 has MSE:  1.0229567024546342
model with rank = 6 & lambda = 0.02 has MSE:  1.0608889180004513
model with rank = 8 & lambda = 0.02 has MSE:  1.0985095995771081
model with rank = 10 & lambda = 0.02 has MSE:  1.1167148740192265
model with rank = 12 & lambda = 0.02 has MSE:  1.144699318657436
model with rank = 4 & lambda = 0.05 has MSE:  0.956813493758704
model with rank = 6 & lambda = 0.05 has MSE:  0.9761115969435162
model with rank = 8 & lambda = 0.05 has MSE:  0.9837170754784507
model with rank = 10 & lambda = 0.05 has MSE:  0.995090709315019
model with rank = 12 & lambda = 0.05 has MSE:  1.0038294682861604
model with rank = 4 & lambda = 0.1 has MSE:  0.9095306483375436
model with rank = 6 & lambda = 0.1 has MSE:  0.9144034013919901
model with rank = 8 & lambda = 0.1 has MSE:  0.9155277182121414
model with rank = 10 & lambda = 0.1 has MSE:  0.9172446979073086
model with rank = 12 & lambda = 0.1 has MSE:  0.9176534860971569
model with rank = 4 & lambd

In [12]:
best_rank, best_lambda = 12, 0.2

In [13]:
model = ALS.train(train_rdd, best_rank, itr, seed=16807, lambda_ = best_lambda)
preds = model.predictAll(hold_for_pred).map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
rates_and_preds = hold_rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print(f"MSE of the best model on the holdout set: {error}")

MSE of the best model on the holdout set: 0.9029795839647352


In [None]:
## now use the full dataset

In [14]:
full_ratings = os.path.join(data_path, "ml-latest", "ratings.csv")
full_ratings_rdd = sc.textFile(full_ratings)
full_header = full_ratings_rdd.take(1)[0]
full_ratings_rdd = (full_ratings_rdd.filter(lambda r: r!= full_header)
                   .map(lambda r: r.split(","))
                   .map(lambda r: (int(r[0]), int(r[1]), float(r[2]))).cache())

print(full_ratings_rdd.take(3))

[(1, 307, 3.5), (1, 481, 3.5), (1, 1091, 1.5)]


In [14]:
print("How many records in the full dataset? ", full_ratings_rdd.count())

How many records in the full dataset?  27753444


In [15]:
train_rdd, test_rdd = full_ratings_rdd.randomSplit([8, 2], seed = 17)
test_for_preds = test_rdd.map(lambda r: (int(r[0]), int(r[1])))


full_model = ALS.train(train_rdd, best_rank, itr, seed=123, lambda_ = best_lambda)
preds = full_model.predictAll(test_for_preds).map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
rates_and_preds = test_rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print("For testing on the full dataset, the MSE is: ", error)

For testing on the full dataset, the MSE is:  0.8644451910268783


In [16]:
del train_rdd, test_rdd, preds, rates_and_preds

In [None]:
## load full movie file

In [17]:
full_movies = os.path.join(data_path, "ml-latest", "movies.csv")
full_movies_rdd = sc.textFile(full_movies)
full_header = full_movies_rdd.take(1)[0]
full_movies_rdd = (full_movies_rdd.filter(lambda r: r != full_header)
                  .map(lambda r: r.split(","))
                  .map(lambda r: (int(r[0]), r[1])).cache())

print(full_movies_rdd.take(3))

[(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)')]


In [17]:
print("How many movies are there in the full movie.csv file? ", full_movies_rdd.count())

How many movies are there in the full movie.csv file?  58098


In [18]:
## get the number of ratings and average ratins for each movie
def count_rating_and_get_ave(ID_ratings):
    count = len(ID_ratings[1])
    return ID_ratings[0], (count, float(sum(ID_ratings[1])/count))

movieID_and_ratings = full_ratings_rdd.map(lambda r: (r[1], r[2])).groupByKey().map(count_rating_and_get_ave)
movieID_and_counts = movieID_and_ratings.map(lambda r: (r[0], r[1][0]))


In [19]:
print("Movie ID, num. of ratings, ave. ratings: ", movieID_and_ratings.take(2))
print("Movie ID, num. of ratings: ", movieID_and_counts.take(2))

Movie ID, num. of ratings, ave. ratings:  [(1449, (6867, 3.918377748652978)), (828, (1736, 3.1474654377880182))]
Movie ID, num. of ratings:  [(1449, 6867), (828, 1736)]


In [None]:
## add new user ratings to the dataset

In [20]:
new_user = 0
new_user_ratings = [(new_user, 260, 4.0), (new_user,   1, 3.0), (new_user,  16, 3.0), (new_user, 25,  4.0),
                    (new_user,  32, 4.0), (new_user, 335, 1.0), (new_user, 379, 1.0), (new_user, 296, 3.0),
                    (new_user, 858, 5.0), (new_user,  50, 4.0)]
new_user_ratings_rdd = sc.parallelize(new_user_ratings)

## union new_user_ratings with existing ratings
full_ratings_rdd = full_ratings_rdd.union(new_user_ratings_rdd)

## print to check
print(new_user_ratings_rdd.take(2))

[(0, 260, 4.0), (0, 1, 3.0)]


In [21]:
from time import time
start = time()
print(f"Best rank and lambda for the model: {best_rank}, {best_lambda}")
new_ratings_model = ALS.train(full_ratings_rdd, best_rank, itr, seed=123, lambda_ = best_lambda)
print(f"Re-training takes {float(time()-start)} seconds")

Best rank and lambda for the model: 12, 0.2
Re-training takes 156.72719621658325 seconds


In [None]:
## now we can get recommendations

In [22]:
new_user_rated = set(l[1] for l in new_user_ratings)
new_user_unrated = full_movies_rdd.filter(lambda l: l[0] not in new_user_rated).map(lambda l: (new_user, l[0]))
new_user_recommendations = new_ratings_model.predictAll(new_user_unrated)

In [23]:
print(new_user_recommendations.take(3))

[Rating(user=0, product=116688, rating=0.9103302632655832), Rating(user=0, product=57044, rating=2.6552582665331936), Rating(user=0, product=69199, rating=2.034827451262054)]


In [None]:
## We have our recommendations ready.
## Now we can print out the 25 movies with the highest predicted ratings for this new user

In [24]:
new_movieID_ratings = (new_user_recommendations.map(lambda l: (l[1], (l[0], l[2])))
                      .join(full_movies_rdd).join(movieID_and_counts))

In [25]:
print(new_movieID_ratings.take(2))

[(125970, (((0, 2.2831169342749904), 'Halloweentown (1998)'), 148)), (7410, (((0, 2.5355211999056335), '"Osterman Weekend'), 177))]


In [None]:
## So we need to flat this down a bit in order to have (userID, Title, Rating, Ratings Count).

In [26]:
user_movieID_title_rating_count = new_movieID_ratings.map(lambda l: (l[1][0][0][0], l[0], l[1][0][1],
                                                                     l[1][0][0][1], l[1][1]))
print(user_movieID_title_rating_count.take(2))

[(0, 125970, 'Halloweentown (1998)', 2.2831169342749904, 148), (0, 7410, '"Osterman Weekend', 2.5355211999056335, 177)]


In [27]:
top_movies = user_movieID_title_rating_count.filter(lambda l: l[4]>100).takeOrdered(20, key=lambda l: -l[3])
print("Top recommended movies:\n# UserID, MovieID, MovieTitle, PredictedRating, rating count")
print("\n".join(map(str, top_movies)))

Top recommended movies:
# UserID, MovieID, MovieTitle, PredictedRating, rating count
(0, 171495, 'Cosmos', 3.7945052255766054, 157)
(0, 26082, 'Harakiri (Seppuku) (1962)', 3.7348204017080247, 679)
(0, 171011, 'Planet Earth II (2016)', 3.7302233323374954, 853)
(0, 159817, 'Planet Earth (2006)', 3.723779938958276, 1384)
(0, 105250, '"Century of the Self', 3.721210227899121, 213)
(0, 101850, 'Death on the Staircase (Soupçons) (2004)', 3.6879269831915624, 130)
(0, 6669, 'Ikiru (1952)', 3.6780773173121633, 1551)
(0, 26587, '"Decalogue', 3.6570485630191625, 547)
(0, 172591, 'The Godfather Trilogy: 1972-1990 (1992)', 3.6528542110405535, 421)
(0, 170705, 'Band of Brothers (2001)', 3.650982132699011, 984)
(0, 7926, 'High and Low (Tengoku to jigoku) (1963)', 3.648158267529952, 812)
(0, 6818, 'Come and See (Idi i smotri) (1985)', 3.6403026168460118, 703)
(0, 2019, 'Seven Samurai (Shichinin no samurai) (1954)', 3.633078233250661, 14578)
(0, 1178, 'Paths of Glory (1957)', 3.625288643496362, 4508)
(

In [None]:
## another usefull usecase is to get the predicted rating for for a specific movie for a given user

In [28]:
new_data = sc.parallelize([(0, 500)])
individual_rating_rdd = new_ratings_model.predictAll(new_data).map(lambda r: (r[0], r[1], r[2]))
print(individual_rating_rdd.take(1))

[(0, 500, 2.458496060948492)]


In [None]:
### persisting the model

In [29]:
from pyspark.mllib.recommendation import MatrixFactorizationModel
model_path = os.path.join("./", "models", "./movie_lens_ALS_model")
full_model.save(sc, model_path)
loaded_model = MatrixFactorizationModel.load(sc, model_path)

In [30]:
del full_ratings_rdd

In [None]:
### Part 2: building a web service with spark and Flask

In [None]:
### building the recommendataion engine (engine.py)

In [31]:
import os
from pyspark.mllib.recommendation import ALS

In [32]:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [33]:
class RecommendationEngine(object):
    """Movie recommendation engine"""
    
    def __init__(self, sc, data_path):
        """Initialize the engine given SparkContext and path to the data"""
        logger.info("Setting up the Movie Recommendation Engine ... (I call it 'Mre')")
        self.sc = sc
        """Load rating data"""
        logger.info("Loading rating data ... (make sure it is called 'raings.csv')")
        file_path = os.path.join(data_path, "ratings.csv")
        data_raw = self.sc.textFile(file_path)
        header = data_raw.take(1)[0]
        self.ratings_rdd = (data_raw.filter(lambda l: l != header).map(lambda l: l.split(","))
                                    .map(lambda l: (int(l[0]), int(l[1]), float(l[2]))).cache())
        
        """Load movie data"""
        logger.info("Loading movie data ... (make sure it is called 'movies.csv')")
        file_path = os.path.join(data_path, "movies.csv")
        data_raw = self.sc.textFile(file_path)
        header = data_raw.take(1)[0]
        self.movie_title_genre_rdd = (data_raw.filter(lambda l: l != header).map(lambda l: l.split(","))
                                               .map(lambda l: (int(l[0]), l[1], l[2])).cache())
        self.movie_title_rdd = self.movie_title_genre_rdd.map(lambda l: (l[0], l[1])).cache()
        
        """Compute movie ave ratings and rating counts"""
        self.__count_ave_ratings()
        
        """Train the model"""
        self.rank = 8
        self.seed = 16807
        self.iters = 10
        self.reg_param = 0.1
        self.__train_model()
    
    def __count_ave_ratings(self):
        """update movie average ratins and rating counts using self.ratings_rdd"""
        logger.info("For every movieID, computing average ratings and number of ratings ...")
        movieID_ratings = self.ratings_rdd.map(lambda l: (l[1], l[2])).groupByKey()
        self.movie_ratings_count_rdd = movieID_ratings.map(count_rating_and_get_ave)
    
    def __train_model(self):
        """train the ALS model with rdd"""
        logger.info("Training the model ...")
        self.model = ALS.train(self.ratings_rdd, self.rank, self.iters, seed=self.seed, lambda_ = self.reg_param)
        logger.info("ALS model is built !")


In [34]:
def add_ratings(self, new_ratings):
    """We should be able to add new ratings to the dataset"""
    new_ratings_rdd = self.sc.parallelize(new_ratings)
    ## add the new ratings to the existing ratings_rdd
    self.ratings_rdd = self.ratings_rdd.union(new_ratings_rdd)
    ## re-compute movie rating count and average ratings
    self.__count_ave_ratings()
    ## re-train the model with the new ratings_rdd
    self.__train_model()
    return

RecommendationEngine.add_ratings = add_ratings

In [None]:
## making recommendations

In [47]:
def __predict_ratings(self, user_and_movie):
    """ Predict ratings for given rdd with format (userID, movieID)
    Returns: rdd with format (userID, movieID, movieTitle, rating, rating_count)
    """
    predicted_rdd = self.model.predictAll(user_and_movie).map(lambda l: (l[0], l[1], l[2]))
    movie_user_rating_rdd = (predicted_rdd.map(lambda l: (l[1], (l[0], l[2]))).join(self.movie_title_rdd)
                         .join(self.movie_ratings_count_rdd))
    movie_user_rating_rdd = movie_user_rating_rdd.map(lambda l: (l[1][0][0][0], l[0], l[1][0][1], l[1][1]))
    return movie_user_rating_rdd

def get_top_ratings(self, user_id, movie_num, num_rating_thresh=25):
    """get movie_num top rating movies for this user
    on the condition that among the returned result each movie has more than num_rating_thresh ratings
    """
    user_unrated_rdd = (self.ratings_rdd.filter(lambda l: l[0] != user_id)
                            .map(lambda l: (user_id, l[1])))
    ratings = (self.__predict_ratings(user_unrated_rdd).filter(lambda l: l[4] > num_rating_thresh)
                   .takeOrdered(movie_num, key=lambda l: -l[3]))
    return ratings
    
RecommendationEngine.__predict_ratings = __predict_ratings
RecommendationEngine.get_user_rated_set = get_user_rated_set
RecommendationEngine.get_top_ratings = get_top_ratings


In [None]:
## We will also want to get ratings to particular movies

In [51]:
def get_ratings_for_movies(self, user_id, movie_ids):
    requested_rdd = self.sc.parallelize(movie_ids).map(lambda l: (user_id, l))
    ratings = self.__predict_ratings(requested_rdd)
    return ratings

RecommendationEngine.get_ratings_for_movies = get_ratings_for_movies

In [41]:
#import sys
#!{sys.executable} -m pip install flask

In [None]:
## Building a Web API around our Engine using Flask (app.py)

In [49]:
from flask import Blueprint
main = Blueprint('main', __name__)

import json
from engine import RecommendationEngine

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [52]:
from flask import Flask, request

@main.route("/<int:user_id>/ratings/top/<int:count>", methods=["GET"])
def top_ratings(user_id, count):
    logger.debug(f"User {user_id}'s TOP {count} movies requested ... ")
    top_ratings = recommendation_engine.get_top_ratings(user_id, count)
    return json.dumps(top_ratings)

@main.route("/<int:user_id>/ratings/<int:movie_id>", method=["GET"])
def movie_ratings(user_id, movie_id):
    logger.debug(f"User {user_id} rating for movie {movie_id} requested ... ")
    user_movie_rating = recommendation_engine.get_ratings_for_movies(user_id, [movie_id])
    return json.dumps(user_movie_rating)

@main.route("/<int:user_id>/ratings/", method=["POST"])
def add_ratings(user_id):
    ## get ratings from the Flask POST request object
    ratings_list = request.form.keys()[0].strip().split("\n")
    ratings_list = map(lambda x: x.split(","), ratings_list)
    ## create a list with the format required by the engine: user_id, movie_id, ratings
    new_ratings = map(lambda x: (user_id, int(x[0]), float(x[1])), ratings_list)
    ## add ratings to the engine
    recommendation_engine.add_ratings(new_ratings)
    return json.dumps(new_ratings)

In [None]:
## creat app

In [53]:
def create_app(spark_context, data_path):
    global recommendation_engine
    recommendation_engine = RecommendationEngine(sc, data_path)
    app = Flask(__name__)
    app.register_blueprint(main)
    return app
    

In [54]:
new_movieID_ratings = [(260,9), (1,8), (16,7), (25,8), (32,9), (335,4), (379,3), (296,7), (858,10), (50,8)]

In [None]:
## Deploying a WSGI Server using CherryPy (server.py)

In [55]:
#import sys
!{sys.executable} -m pip install CherryPy

Collecting CherryPy
[?25l  Downloading https://files.pythonhosted.org/packages/0f/2c/73e16c77b20c01c277c42a1b4ee29ebadae2b18104570b920cadc3e51413/CherryPy-18.5.0-py2.py3-none-any.whl (418kB)
[K     |████████████████████████████████| 419kB 5.5MB/s eta 0:00:01
[?25hCollecting zc.lockfile
  Downloading https://files.pythonhosted.org/packages/6c/2a/268389776288f0f26c7272c70c36c96dcc0bdb88ab6216ea18e19df1fadd/zc.lockfile-2.0-py2.py3-none-any.whl
Collecting cheroot>=8.2.1
[?25l  Downloading https://files.pythonhosted.org/packages/bf/be/51b1517c6dbf3851d44b36ff08a6e1012464149f89f74c46b29d2f76545e/cheroot-8.2.1-py2.py3-none-any.whl (79kB)
[K     |████████████████████████████████| 81kB 10.3MB/s eta 0:00:01
[?25hCollecting jaraco.collections
  Downloading https://files.pythonhosted.org/packages/fe/06/a89652069f0a13a33701714c0c8e0cc8656bf6d21b7c6b85fde86cf06ff6/jaraco.collections-3.0.0-py2.py3-none-any.whl
Collecting portend>=2.1.1
  Downloading https://files.pythonhosted.org/packages/d7/79

In [56]:
import time, sys, cherrypy, os
from paste.translogger import TransLogger
from app import create_app
from pyspark import SparkContext, SparkConf

In [57]:
def init_spark_context():
    conf = SparkConf().setAppName("MovieLens-Recommendation-Server")
    sc = SparkContext(conf = conf, pyFiles = ["engine.py", "app.py"])
    return sc

In [59]:
def run_server(app):
    ## enable WSGI access logging via Paste
    app_logged = TransLogger(app)
    
    ## Mount the WSGI callable object (app) on the root directory
    cherrypy.tree.graft(app_logged, "/")
    
    ## set the configuration for the web server
    cherrypy.config.update( { "engine.autoreload.com": True,
                              "log.screen": True,
                              "server.socket_port": 7788,
                              "server.socket_host": "0.0.0.0" } )
    ## start the CherryPy WSGI web server
    cherrypy.engine.start()
    cherrypy.engine.block()

if __name__ == "main":
    ## initialize spark context
    sc = init_spark_context()
    data_path = path.join("data", "ml-latest")
    app = create_app(sc, data_path)
    
    ## start the web server
    run_server(app)