In [2]:
import os
from pyspark.mllib.recommendation import ALS

In [27]:
import logging
# Module-level logger used by the engine; basicConfig routes INFO-and-above
# records from the root logger to stderr.
logger = logging.getLogger(name=__name__)
logging.basicConfig(level=logging.INFO)

def get_counts_and_averages(movie_tuple):
    """Summarise one grouped movie as ``(movie_id, (ratings_count, average_rating))``.

    The count is nested inside a tuple (rather than returned bare) because the
    caller extracts it with ``x[1][0]``.

    Args:
        movie_tuple: ``(movie_id, ratings)`` pair, e.g. one element of a
            ``groupByKey()`` result; ``ratings`` is any iterable of numbers.

    Returns:
        ``(movie_id, (count, average))``. ``average`` is 0.0 when ``ratings``
        is empty.
    """
    movie_id = movie_tuple[0]
    # Materialise once so single-pass iterables can be both counted and summed.
    ratings = list(movie_tuple[1])
    nratings = len(ratings)
    # Guard the division: an empty group would otherwise raise ZeroDivisionError.
    average = float(sum(ratings)) / nratings if nratings else 0.0
    return (movie_id, (nratings, average))

In [42]:
class RecommendationEngine(object):
    """Movie recommendation engine backed by a Spark MLlib ALS model.

    On construction it loads ``ratings.csv`` and ``movies.csv`` from
    ``dataset_path``, pre-computes per-movie rating counts and trains the model.
    """

    def __init__(self, sc, dataset_path, rank=8, seed=5, regularization=0.1, iterations=5):
        """Load the dataset with ``sc`` and train the ALS model.

        Args:
            sc: SparkContext used to read the files and build RDDs.
            dataset_path: directory containing ``ratings.csv`` and ``movies.csv``.
            rank: ALS rank passed to ``ALS.train``. Defaults to 8.
            seed: random seed passed to ``ALS.train``. Defaults to 5.
            regularization: ``lambda_`` passed to ``ALS.train``. Defaults to 0.1.
            iterations: number of ALS iterations. Defaults to 5.
        """
        logger.info("Starting recommender engine...")
        self.sc = sc

        # Hyper-parameters are read by __train_model(), so set them up front.
        self.rank = rank
        self.seed = seed
        self.regularization = regularization
        self.iterations = iterations

        # Ratings rows -> (user_id, movie_id, rating); the header line is dropped.
        logger.info("loading ratings data...")
        ratings_raw_RDD = sc.textFile(os.path.join(dataset_path, 'ratings.csv'))
        ratings_header = ratings_raw_RDD.take(1)[0]
        self.ratings_RDD = (ratings_raw_RDD
                            .filter(lambda line: line != ratings_header)
                            .map(lambda line: line.split(','))
                            .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))
                            .cache())

        # Movies rows -> (movie_id, title, genres); titles may contain quoted commas.
        logger.info("Loading movies data...")
        movies_raw_RDD = sc.textFile(os.path.join(dataset_path, 'movies.csv'))
        movies_header = movies_raw_RDD.take(1)[0]
        self.movies_RDD = (movies_raw_RDD
                           .filter(lambda line: line != movies_header)
                           .map(self._parse_movie_line)
                           .cache())
        self.movies_titles_RDD = self.movies_RDD.map(lambda movie: (movie[0], movie[1])).cache()

        # Pre-calculate movie rating counts, then train the model.
        self.__count_and_average_ratings()
        self.__train_model()

    @staticmethod
    def _parse_movie_line(line):
        """Parse one ``movies.csv`` row into ``(movie_id, title, genres)``.

        Titles that contain commas are double-quoted in the CSV, so the row is
        split on the first and last comma only and surrounding quotes (with
        ``""`` escapes) are removed from the title.
        NOTE(review): assumes the genres field never contains a comma
        (MovieLens separates genres with '|') — confirm for other datasets.
        """
        movie_id, rest = line.split(',', 1)
        title, genres = rest.rsplit(',', 1)
        if len(title) >= 2 and title.startswith('"') and title.endswith('"'):
            title = title[1:-1].replace('""', '"')
        return (int(movie_id), title, genres)

    def __count_and_average_ratings(self):
        """Recompute the (movie_id, ratings_count) table from ``self.ratings_RDD``."""
        logger.info("counting movie ratings...")
        movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (int(x[1]), float(x[2]))).groupByKey()
        # Expects get_counts_and_averages to return (movie_id, (count, ...)).
        movie_ID_with_counts_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
        counts_RDD = movie_ID_with_counts_RDD.map(lambda x: (x[0], x[1][0]))
        # This table is read under both spellings elsewhere in the notebook;
        # publish it under both so either reader finds it.
        self.movies_rating_counts_RDD = counts_RDD
        self.movies_ratings_counts_RDD = counts_RDD

    def __train_model(self):
        """Train the ALS model on the current ``self.ratings_RDD``."""
        logger.info("training the ALS model...")
        self.model = ALS.train(self.ratings_RDD, self.rank, seed=self.seed,
                               iterations=self.iterations, lambda_=self.regularization)
        logger.info("Model built!")

In [43]:
def add_ratings(self, ratings):
    """Add new ratings to the engine, then recount and retrain.

    Args:
        ratings: iterable of ``(user_id, movie_id, rating)`` tuples.

    Returns:
        The added ratings as a list (empty input is returned unchanged as ``[]``).
    """
    # Materialise once: a generator or Python 3 ``map`` object would otherwise
    # be consumed by parallelize() and leave nothing to return to the caller.
    ratings = list(ratings)
    if not ratings:
        # Nothing new to add; skip the costly recount and retrain.
        return ratings

    new_ratings_RDD = self.sc.parallelize(ratings)
    self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD)

    # This function is defined outside the class body, so Python does NOT apply
    # private-name mangling to ``self.__name`` here. Call the mangled names that
    # the class body actually created.
    self._RecommendationEngine__count_and_average_ratings()
    self._RecommendationEngine__train_model()
    return ratings

# Expose add_ratings as a method of RecommendationEngine (monkey-patched in this cell).
setattr(RecommendationEngine, 'add_ratings', add_ratings)

In [44]:
def __predict_ratings(self, user_and_movie_RDD):
    """Predict ratings for ``(user_id, movie_id)`` pairs, with title and count.

    Args:
        user_and_movie_RDD: RDD of ``(user_id, movie_id)`` pairs to score.

    Returns:
        RDD of ``(title, predicted_rating, ratings_count)`` tuples. Movies
        missing from the titles or counts tables are dropped (inner joins).
    """
    predicted_RDD = self.model.predictAll(user_and_movie_RDD)
    # Key each prediction by movie id so it can be joined with per-movie tables.
    predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))
    # After both joins: (movie_id, ((rating, title), count)).
    # Read the counts under the attribute name the engine class assigns.
    joined_RDD = (predicted_rating_RDD
                  .join(self.movies_titles_RDD)
                  .join(self.movies_ratings_counts_RDD))
    return joined_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

def get_top_ratings(self, user_id, movies_count, min_ratings_count=25):
    """Recommend the ``movies_count`` best-predicted movies ``user_id`` has not rated.

    Args:
        user_id: user to recommend for.
        movies_count: maximum number of recommendations to return.
        min_ratings_count: only movies with at least this many ratings are
            considered (presumably to keep rarely-rated movies out of the
            list). Defaults to 25, the previously hard-coded threshold.

    Returns:
        List of ``(title, predicted_rating, ratings_count)`` tuples, highest
        predicted rating first.
    """
    # Movie ids this user has already rated.
    rated_movie_ids_RDD = self.ratings_RDD.filter(lambda r: r[0] == user_id).map(lambda r: r[1])
    # Candidates = every rated movie minus the user's own. Dropping only the
    # user's rows is not enough: a movie the user rated that someone else also
    # rated would still appear.
    user_unrated_movies_RDD = (self.ratings_RDD
                               .map(lambda r: r[1])
                               .distinct()
                               .subtract(rated_movie_ids_RDD)
                               .map(lambda movie_id: (user_id, movie_id)))
    predictions_RDD = self.__predict_ratings(user_unrated_movies_RDD)
    return (predictions_RDD
            .filter(lambda r: r[2] >= min_ratings_count)
            .takeOrdered(movies_count, key=lambda r: -r[1]))

# Attach the prediction helpers as engine methods. These assignments run at
# module level, so the attribute names are stored exactly as written (no mangling).
setattr(RecommendationEngine, '__predict_ratings', __predict_ratings)
setattr(RecommendationEngine, 'get_top_ratings', get_top_ratings)
    
    
    
    # get unpredicted ratings
    

In [45]:
def get_ratings_for_movie_ids(self, user_id, movie_ids):
    """Predict how ``user_id`` would rate each movie in ``movie_ids``.

    Returns:
        List of the prediction tuples produced by ``__predict_ratings``,
        collected to the driver.
    """
    # Pair every requested movie with the user, then score and collect.
    pairs_RDD = self.sc.parallelize(movie_ids).map(lambda movie_id: (user_id, movie_id))
    predictions_RDD = self.__predict_ratings(pairs_RDD)
    return predictions_RDD.collect()

# Expose get_ratings_for_movie_ids as a RecommendationEngine method.
setattr(RecommendationEngine, 'get_ratings_for_movie_ids', get_ratings_for_movie_ids)
    

In [46]:
# Flask web app: the HTTP routes below are registered on the ``main`` blueprint.
import json

from flask import Blueprint

main = Blueprint('main', __name__)

In [47]:
import logging
# Logging setup for the web-app cells. basicConfig is a no-op when the root
# logger already has handlers, so repeating it is harmless.
logger = logging.getLogger(name=__name__)
logging.basicConfig(level=logging.INFO)




In [48]:
from flask import Flask, Request

In [49]:
@main.route("/<int:user_id>/ratings/top/<int:count>", methods=["GET"])
def top_ratings(user_id, count):
    """Return the top ``count`` recommendations for ``user_id`` as a JSON list."""
    # Two %s placeholders need two separate arguments; passing a single tuple
    # makes the record fail to format when DEBUG logging is enabled.
    logger.debug("User %s requested %s ratings", user_id, count)
    ratings = recommendation_engine.get_top_ratings(user_id, count)
    return json.dumps(ratings)

@main.route("/<int:user_id>/ratings/<int:movie_id>", methods=["GET"])
def movie_ratings(user_id, movie_id):
    """Return the predicted rating of ``movie_id`` for ``user_id`` as a JSON list."""
    # Pass each placeholder's value separately (a single tuple breaks %-formatting).
    logger.debug("%s requested rating for %s", user_id, movie_id)
    ratings = recommendation_engine.get_ratings_for_movie_ids(user_id, [movie_id])
    return json.dumps(ratings)

@main.route("/<int:user_id>/ratings", methods=["POST"])
def add_ratings(user_id):
    """Add ratings POSTed for ``user_id`` and echo them back as JSON.

    The body is read as lines of ``movie_id,rating`` (extra fields ignored),
    taken from the first key of the parsed form — presumably the raw body was
    posted without ``key=value`` encoding (e.g. ``curl --data-binary``).
    Responds 400 when no form data is present.
    """
    # Only ``Request`` (the class) was imported in this notebook; the
    # per-request proxy is ``request``.
    from flask import abort, request

    if not request.form:
        abort(400)
    # list(...) keeps this working on Python 3, where keys() is not indexable.
    raw_body = list(request.form.keys())[0]
    lines = [line for line in raw_body.strip().split("\n") if line.strip()]
    # Build a real list of (user_id, movie_id, rating): a Python 3 map object
    # would be consumed by the engine and could not be JSON-encoded afterwards.
    ratings = [(user_id, int(fields[0]), float(fields[1]))
               for fields in (line.split(",") for line in lines)]
    recommendation_engine.add_ratings(ratings)
    return json.dumps(ratings)

def create_app(spark_context, dataset_path):
    """Create the Flask application that serves recommendations.

    Builds a RecommendationEngine from ``spark_context`` and ``dataset_path``,
    publishes it as the module-global ``recommendation_engine`` read by the
    route handlers, and registers the ``main`` blueprint on a new Flask app.
    """
    global recommendation_engine
    engine = RecommendationEngine(spark_context, dataset_path)
    recommendation_engine = engine

    flask_app = Flask(__name__)
    flask_app.register_blueprint(main)
    return flask_app
    

In [50]:
#The final cherrypy server for running the app
import sys, time, cherrypy, os
                        

In [37]:
from paste.translogger import TransLogger

ImportError: No module named paste.translogger

In [38]:
from app import create_app

ImportError: No module named app

In [51]:
def init_spark_context():
    """Return the SparkContext the engine should use.

    Prefers the ``sc`` global (as provided by a PySpark notebook kernel); when
    it is not defined, returns the active SparkContext or creates one.
    """
    try:
        return sc
    except NameError:
        # No kernel-provided context: fall back to Spark's singleton accessor.
        from pyspark import SparkContext
        return SparkContext.getOrCreate()

In [52]:
def run_server(app, host='0.0.0.0', port=5432, autoreload=True):
    """Serve the WSGI ``app`` with CherryPy and block until the engine stops.

    Args:
        app: WSGI application (e.g. from ``create_app``).
        host: interface to bind. Defaults to all interfaces.
        port: TCP port to listen on. Defaults to 5432.
        autoreload: value for CherryPy's ``engine.autoreload.on``. Defaults to True.
    """
    # paste is an optional dependency: wrap the app with request logging when
    # it is installed, otherwise serve the bare app instead of raising NameError.
    try:
        from paste.translogger import TransLogger
    except ImportError:
        TransLogger = None
        logger.warning("paste.translogger not available; serving without request logging")
    app_logged = TransLogger(app) if TransLogger is not None else app

    cherrypy.tree.graft(app_logged, "/")
    cherrypy.config.update({
        'engine.autoreload.on': autoreload,
        'log.screen': True,
        'server.socket_port': port,
        'server.socket_host': host,
    })
    cherrypy.engine.start()
    cherrypy.engine.block()

In [53]:
if __name__ == "__main__":
    # Dataset location can be overridden via the DATASET_PATH environment
    # variable; the default is the previously hard-coded path.
    dataset_path = os.environ.get('DATASET_PATH', '/user/hduser/ml-latest')
    app = create_app(init_spark_context(), dataset_path)
    # Start the web server (blocks until CherryPy stops).
    run_server(app)

INFO:__main__:Starting recommender engine...
INFO:__main__:loading ratings data...
INFO:__main__:Loading movies data...
INFO:__main__:counting movie ratings...
INFO:__main__:training the ALS model...
INFO:__main__:Model built!


ValueError: malformed url rule: '/<int: user_id>/ratings/<int: movie_id>'