## Online movie recommending service using Spark and Flask

To turn our movie recommender into an online service, we can use the Python-based framework [Flask](http://flask.pocoo.org/). Flask can be used to build web services on top of any kind of Spark model.

The complete web service contains three Python files:
- engine.py defines the recommendation engine and contains all the Spark-related computation.
- app.py is a Flask web application that defines a RESTful-like API around the engine.
- server.py initializes a CherryPy web server after creating a Spark context and the Flask web application.

## A recommendation engine

The core of our movie recommendation web service is the recommendation model we built. It is represented by the class RecommendationEngine.

### Starting the engine


When the engine is initialized, we need to generate the ALS model for the first time. Optionally, we could load a previously built model, or load or precompute any RDDs that will be used later to make recommendations.

We'll do that in the \__init\__ method. In this case we do not persist the model or the RDDs, so no time is saved: the same process is repeated every time the engine is created.

In [10]:
import os
from pyspark.mllib.recommendation import ALS

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_counts_and_avarages(id_rating_tuple):
    nrating = len(id_rating_tuple[1])
    return id_rating_tuple[0], (nrating, float(sum(x for x in id_rating_tuple[1])) / nrating)

class RecommendationEngine:
    
    def __count_and_average_ratings(self):
        logger.info("Counting movie ratings...")
        movieId_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()
        movieId_avg_ratings_RDD = movieId_ratings_RDD.map(get_counts_and_avarages)
        self.movies_rating_counts_RDD = movieId_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
        
    def __train_model(self):
        logger.info("Training the ALS model...")
        self.model = ALS.train(self.ratings_RDD, self.rank, seed = self.seed,
                              iterations = self.iterations, lambda_ = self.regularization_parameter)
        logger.info("ALS model built!")
    
    def __init__(self, sc, dataset_path):
        logger.info("Starting up the Recommendation Engine: ")
        
        self.sc = sc
        
        #Load ratings data
        logger.info("Loading ratings data...")
        ratings_file_path = os.path.join(dataset_path, 'ratings.csv')
        ratings_raw_RDD = self.sc.textFile(ratings_file_path)
        ratings_raw_header = ratings_raw_RDD.take(1)[0]
        self.ratings_RDD = ratings_raw_RDD.filter(lambda line: line!=ratings_raw_header) \
        .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
        
        #Load movies data
        logger.info("Loading movies data...")
        movies_file_path = os.path.join(dataset_path, 'movies.csv')
        movies_raw_RDD = self.sc.textFile(movies_file_path)
        movies_raw_header = movies_raw_RDD.take(1)[0]
        self.movies_RDD = movies_raw_RDD.filter(lambda line: line!=movies_raw_header) \
        .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()
        self.movies_titles_RDD = self.movies_RDD.map(lambda x: (int(x[0]), x[1])).cache()
        
        #Pre-compute movies ratings counts
        self.__count_and_average_ratings()
        
        #Train the model
        self.rank = 4
        self.seed = 5
        self.iterations = 10
        self.regularization_parameter = 0.1
        self.__train_model()
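
To make the pre-computation step concrete, here is a minimal pure-Python sketch (no Spark required) of what `groupByKey` followed by `get_counts_and_avarages` produces for each movie. The sample ratings and the helper name `counts_and_averages` are made up for illustration.

```python
from collections import defaultdict

# Hypothetical (userId, movieId, rating) tuples, laid out like ratings_RDD.
sample_ratings = [(1, 10, 4.0), (2, 10, 3.0), (3, 10, 5.0), (1, 20, 2.0), (2, 20, 3.0)]

def counts_and_averages(ratings):
    """Return {movieId: (number_of_ratings, average_rating)}."""
    grouped = defaultdict(list)
    for _user_id, movie_id, rating in ratings:
        grouped[movie_id].append(rating)  # plays the role of groupByKey()
    return {movie_id: (len(values), sum(values) / len(values))
            for movie_id, values in grouped.items()}

stats = counts_and_averages(sample_ratings)
print(stats)
```

The engine keeps only the count from each pair, since the count is what is later used to filter out rarely rated movies.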

### Add new ratings

We need to re-compute the prediction model for every new batch of user ratings.

In [3]:
def add_ratings(self, ratings):
    #Convert new user ratings to RDD
    new_ratings_RDD = self.sc.parallelize(ratings)
    #Add new ratings to the existing ones
    self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD)
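    #Re-compute movie ratings count
    #(this function is attached outside the class body, so the mangled private names are spelled out)
    self._RecommendationEngine__count_and_average_ratings()
    #Re-train the ALS model
    self._RecommendationEngine__train_model()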
    
    return ratings

RecommendationEngine.add_ratings = add_ratings
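
One Python detail matters when methods are attached to a class after it has been defined, as is done here: names with two leading underscores are mangled only in code compiled inside the class body. The sketch below uses a hypothetical `Engine` class, not the real `RecommendationEngine`, to show the effect.

```python
class Engine:
    def __refresh(self):
        # Inside the class body this method is stored as _Engine__refresh.
        return "refreshed"

    def refresh_from_inside(self):
        # Compiled inside the class, so this call is mangled as well.
        return self.__refresh()

def refresh_from_outside(self):
    # Defined at module level: no mangling happens here,
    # so the full mangled name has to be written out.
    return self._Engine__refresh()

Engine.refresh_from_outside = refresh_from_outside

engine = Engine()
inside = engine.refresh_from_inside()
outside = engine.refresh_from_outside()
has_plain_name = hasattr(engine, "__refresh")
print(inside, outside, has_plain_name)
```

A function written outside the class that calls `self.__refresh()` directly would raise an `AttributeError`, because no attribute with that literal name exists on the instance.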

### Make recommendations

We make recommendations with the retrained model, excluding movies with fewer than 25 ratings.

In [8]:
def __predict_ratings(self, user_movie_RDD):
    
    predicted_RDD = self.model.predictAll(user_movie_RDD)
    predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))
    predicted_rating_title_count_RDD = \
        predicted_rating_RDD.join(self.movies_titles_RDD).join(self.movies_rating_counts_RDD)
    predicted_rating_title_count_RDD = \
        predicted_rating_title_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
    
    return predicted_rating_title_count_RDD

def get_top_ratings(self, user_id, movies_count):
    
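    #Collect the movies user_id has already rated (ratings are (userId, movieId, rating) tuples)
    rated_movie_ids = set(self.ratings_RDD.filter(lambda rating: rating[0]==user_id).map(lambda x: x[1]).collect())
    #Get pairs of (userId, movieId) for user_id unrated movies
    user_unrated_movies_RDD = self.movies_titles_RDD.map(lambda x: x[0]).filter(lambda movie_id: movie_id not in rated_movie_ids).map(lambda movie_id: (user_id, movie_id))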
    
    #Get predictions ratings
    ratings = self.__predict_ratings(user_unrated_movies_RDD).filter(lambda r: r[2] >= 25).takeOrdered(movies_count, key = lambda x: -x[1])
    
    return ratings

def get_ratings_for_movie_id(self, user_id, movie_id):
    
    requested_movies_RDD = self.sc.parallelize(movie_id).map(lambda x: (user_id, x))
    ratings = self.__predict_ratings(requested_movies_RDD).collect()
    
    return ratings

RecommendationEngine.__predict_ratings = __predict_ratings
RecommendationEngine.get_top_ratings = get_top_ratings
RecommendationEngine.get_ratings_for_movie_id = get_ratings_for_movie_id
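
The filtering and ordering performed on the predictions can be sketched in plain Python. This is only an illustration under assumed data: the tuples below are invented, and `heapq.nsmallest` with a negated key stands in for Spark's `takeOrdered`.

```python
import heapq

# Hypothetical (title, predicted_rating, ratings_count) tuples,
# in the same layout the engine returns after the two joins.
predictions = [
    ("Movie A", 4.8, 12),
    ("Movie B", 4.5, 300),
    ("Movie C", 3.9, 40),
    ("Movie D", 4.7, 25),
    ("Movie E", 4.9, 3),
]

def top_ratings(rows, movies_count, min_ratings=25):
    # Keep only movies with at least min_ratings ratings.
    popular = [row for row in rows if row[2] >= min_ratings]
    # Smallest negated rating first, i.e. highest predicted rating first.
    return heapq.nsmallest(movies_count, popular, key=lambda row: -row[1])

top = top_ratings(predictions, 2)
print(top)
```

Movies A and E have the highest predicted ratings but are dropped because too few users rated them, which is the purpose of the count threshold.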

### Build API using Flask

[Flask](http://flask.pocoo.org/) is a web microframework for Python. You can quickly start a web API by importing it in your script and using decorators to associate your service end-points with Python functions. We will wrap the RecommendationEngine we built earlier in some of these end-points and exchange JSON-formatted data with the web client.

In [2]:
from flask import Blueprint
main = Blueprint('main', __name__)

import json
#from engine import RecommendationEngine

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from flask import Flask, request

@main.route("/<int:user_id>/ratings/top/<int:count>", methods=["GET"])
def top_ratings(user_id, count):
    logger.debug("User %s Top ratings requested.", user_id)
    top_ratings = recommendation_engine.get_top_ratings(user_id, count)
    return json.dumps(top_ratings)

@main.route("/<int:user_id>/ratings/<int:movie_id>", methods=["GET"])
def movie_ratings(user_id, movie_id):
    logger.debug("User %s rating requested for movie %s", user_id, movie_id)
    ratings = recommendation_engine.get_ratings_for_movie_id(user_id, [movie_id])
    return json.dumps(ratings)

@main.route("/<int:user_id>/ratings", methods = ["POST"])
def add_ratings(user_id):
    #The raw request body arrives as the single key of the form data
    ratings_list = list(request.form.keys())[0].strip().split("\n")
    ratings_list = [x.split(",") for x in ratings_list]
    #Build (user_id, movie_id, rating) tuples
    ratings = [(user_id, int(x[0]), float(x[1])) for x in ratings_list]
    recommendation_engine.add_ratings(ratings)
    
    return json.dumps(ratings)

def create_app(spark_context, dataset_path):
    global recommendation_engine
    
    recommendation_engine = RecommendationEngine(spark_context, dataset_path)
    
    app = Flask(__name__)
    app.register_blueprint(main)
    return app

The application is initialized when calling <font face="chalkboard">create_app</font>. The <font face="chalkboard">RecommendationEngine</font> is created and the @main.route decorators defined above are registered with the app. Each decorator is defined by:
- A route, which is its URL and may contain parameters between <>. They are mapped to the function arguments.
- A list of available HTTP methods.

In the above code, we have:
- <font face="chalkboard"> GET /<user_id>/ratings/top/<count> </font> gets the top recommendations from the engine.
- <font face="chalkboard"> GET /<user_id>/ratings/<movie_id> </font> gets the predicted rating for a particular movie.
- <font face="chalkboard"> POST /<user_id>/ratings </font> adds new ratings. The format is a series of lines (each ending with the newline separator) with movie_id and rating separated by commas, e.g.

260,9  
1,8  
16,7  
25,8
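
As a rough sketch of how a request body in this format maps to the tuples the engine expects, the snippet below parses the sample lines in plain Python. The helper name `parse_ratings` is hypothetical and is not part of app.py.

```python
def parse_ratings(user_id, body):
    """Turn 'movie_id,rating' lines into (user_id, movie_id, rating) tuples."""
    ratings = []
    for line in body.strip().splitlines():
        movie_id, rating = line.strip().split(",")
        ratings.append((user_id, int(movie_id), float(rating)))
    return ratings

# The sample body shown above, posted for user 0.
body = "260,9\n1,8\n16,7\n25,8\n"
parsed = parse_ratings(0, body)
print(parsed)
```

The user ID is not part of the body; it comes from the URL, so every parsed tuple carries the same `user_id`.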

### Deploy server using CherryPy

[CherryPy](http://cherrypy.org/) is a pythonic, object-oriented web framework that lets users build web applications in the same way as any other object-oriented Python program. We can use it to build a reliable, HTTP/1.1-compliant, WSGI thread-pooled web server. It can also run multiple HTTP servers at once.

In [None]:
import time, sys, cherrypy, os
from paste.translogger import TransLogger
from app import create_app
from pyspark import SparkContext, SparkConf

def init_spark_context():
    #Load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    #pass additional python modules to each worker
    sc = SparkContext(conf = conf, pyFiles=['engine.py', 'app.py'])
    
    return sc

def run_server(app):
    #enable WSGI access logging via Paste
    app_logged = TransLogger(app)
    
    #Mount the WSGI callable object(app) on the root directory
    cherrypy.tree.graft(app_logged, '/')
    
    #Set the configuration of the web server
    cherrypy.config.update({
            'engine.autoreload.on': True,
            'log.screen': True,
            'server.socket_port': 5432,
            'server.socket_host': '0.0.0.0'
        })
    
    #Start the CherryPy WSGI web server
    cherrypy.engine.start()
    cherrypy.engine.block()
    
if __name__ == "__main__":
    #Init spark context and load libraries
    try:
        sc.stop()
    except:
        pass
    sc = init_spark_context()
    dataset_path = os.path.join('data', 'ml-latest')
    app = create_app(sc, dataset_path)
    
    #start web server
    run_server(app)

INFO:engine:Starting up the Recommendation Engine: 
INFO:engine:Loading Ratings data...
INFO:engine:Loading Movies data...
INFO:engine:Counting movie ratings...
INFO:engine:Training the ALS model...


We did three things above:
- Create a Spark context as defined in the function init_spark_context, passing additional Python modules.
- Create the Flask app by calling the create_app function we defined in app.py.
- Run the server itself.

### Run server with Spark

To give the server access to a Spark context and cluster, we need to submit the server.py file with spark-submit. I use a command like <font face='chalkboard'>~/spark-2.1.0/bin/spark-submit --master local --total-executor-cores 14 --executor-memory 6g server.py</font>, where:

- We use <font face='chalkboard'>spark-submit</font> not <font face='chalkboard'>pyspark</font> directly
- The <font face='chalkboard'>--master</font> parameter should point to your Spark cluster setup. (Here I use local)
- We pass additional configuration parameters like <font face='chalkboard'>--total-executor-cores</font> and <font face='chalkboard'>--executor-memory</font>

## Try the service
Once the service is running, we can use it to predict top ratings for a given user.

### Post new ratings
The first step is to add user ratings to the existing data and retrain the model. We can use <font face='chalkboard'>curl</font> to post new ratings from the shell. Put the ratings, in the format described above, in <font face='chalkboard'>user_ratings.file</font> and then execute the following command:

<font face='chalkboard'>curl --data-binary @user_ratings.file http://<SERVER_IP>:5432/0/ratings</font>

Spark will run the computation described in our <font face='chalkboard'>add_ratings()</font> function in <font face='chalkboard'>engine.py</font>.
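
The same request can be prepared from Python instead of curl. The following is a minimal Python 3 sketch using only the standard library; the helper name `build_ratings_request` and the host `localhost` are assumptions, and the request is only built here, not sent.

```python
from urllib.request import Request

def build_ratings_request(server, user_id, ratings):
    """Build (but do not send) a POST request carrying 'movie_id,rating' lines."""
    body = "\n".join("{},{}".format(movie_id, rating)
                     for movie_id, rating in ratings) + "\n"
    url = "http://{}:5432/{}/ratings".format(server, user_id)
    return Request(url, data=body.encode("utf-8"), method="POST")

# 'localhost' is a placeholder for the machine running server.py.
req = build_ratings_request("localhost", 0, [(260, 9), (1, 8)])
print(req.get_method(), req.full_url)
print(req.data)
# urllib.request.urlopen(req) would send it to a running server.
```

Building the request separately from sending it makes the body easy to inspect before the server retrains the model.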

### Get top recommendations

Go to <font face='chalkboard'>http://<SERVER_IP>:5432/0/ratings/top/10</font> to see the top 10 movies recommended for the user.

### Get movie prediction

Go to <font face='chalkboard'>http://<SERVER_IP>:5432/0/ratings/500</font> to get the predicted rating for movie <font face='chalkboard'>Mrs. Doubtfire (1993)</font>.
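
Both GET end-points return a JSON list in which each entry holds the title, the predicted rating and the number of ratings, in that order. A client could decode such a response roughly as follows; the payload below is invented for illustration and does not come from a real run.

```python
import json

# Hypothetical response body; a real one comes from the running service.
payload = '[["Some Movie (1999)", 4.31, 120], ["Another Movie (2005)", 4.02, 57]]'

def describe(payload):
    """Format each [title, predicted_rating, ratings_count] entry as text."""
    rows = json.loads(payload)
    return ["{} (predicted {:.1f}, {} ratings)".format(title, rating, count)
            for title, rating, count in rows]

lines = describe(payload)
for line in lines:
    print(line)
```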