# Movie recommender system with Spark machine learning

In this Jupyter notebook, you will use Apache Spark and the Spark machine learning library to build a recommender system for movies with a data set from MovieLens. 

## MovieLens

MovieLens is a project developed by GroupLens, a research laboratory at the University of Minnesota. MovieLens provides an online movie recommender application that uses anonymously collected data to improve recommender algorithms. Anyone can try the app for free and get movie recommendations. To help people develop the best recommendation algorithms, MovieLens has also released several data sets. In this notebook, you'll use the latest data set, which comes in two sizes.

The full data set consists of more than 24 million ratings across more than 40,000 movies by more than 250,000 users. The file size is kept under 1GB by using indexes instead of full string names.

The small data set that you'll use in this notebook is a subset of the full data set. It's generally a good idea to start building a working program with a small data set, so that you get fast feedback while you explore the data and debug errors. When you have a fully working program, you can apply the same code to the larger data set, possibly on a larger cluster of processors. You can also minimize memory consumption by limiting the data volume as much as possible, for example, by using indexes.

## Spark machine learning library
Apache Spark’s machine learning library makes practical machine learning scalable and easy. The library consists of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this notebook!), dimensionality reduction, lower-level optimization primitives, and higher-level pipeline APIs.

The library has two packages:

* `spark.mllib` contains the original API, which handles data in RDDs. It's in maintenance mode, but still supported.
* `spark.ml` contains a newer API for constructing ML pipelines. It handles data in DataFrames and is being actively enhanced.

You'll use the `spark.ml` package in this notebook.

## Table of contents
* 1 Load the data   
* 2 Explore the data with Spark APIs
* 3 Build the recommender system
* 4 Save results
* 5 Distribute results

![](https://github.com/hatv/dsx_SparkLessons/blob/master/machine-learning.png?raw=true)

## 1. Load the data

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Movie Recommender').getOrCreate()
```

```python
movies = spark.read.option("header", "true").option("inferSchema", "true").csv("Data/movies.csv").cache()
```

```python
ratings = spark.read.option("header", "true").option("inferSchema", "true").csv("Data/ratings.csv").cache()
```

## 2. Explore the data with Spark APIs

You'll use the Spark DataFrame API and Spark SQL to look at the data. The Spark DataFrame API and Spark SQL are high-level APIs to query and transform Spark DataFrames. See the Spark DataFrame documentation for a detailed description of the API.


```python
movies.printSchema()
ratings.printSchema()
```

```python
movies.show(truncate=False)
```

```python
ratings.show(truncate=False)
```

Run the describe() method to see the count, mean, standard deviation, minimum, and maximum values for the data in each column:

```python
ratings.describe().show()
```

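Not all of these statistics are meaningful! For example, the mean and standard deviation of `userId` and `movieId` tell you nothing, because these columns are identifiers, not quantities.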

You can use specific methods from the DataFrame API to compute any statistic:



```python
print("Number of different users: " + str(ratings.select('userId').distinct().count()))
print("Number of different movies: " + str(ratings.select('movieId').distinct().count()))
print("Number of movies with at least one rating strictly higher than 4: " + str(ratings.filter('rating > 4').select('movieId').distinct().count()))
```

You can also leverage your SQL knowledge to query the data. Since version 2.0, Spark SQL supports standard ANSI SQL syntax and can run all 99 TPC-DS benchmark queries.

Find the number of movies with ratings higher than 4 again, this time with SQL:

```python
ratings.createOrReplaceTempView('ratings')
spark.sql("SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4").show()
```

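You can easily switch between Spark distributed DataFrames and pandas local DataFrames. Keep in mind that `toPandas()` collects every row to the driver, so use it only on data that fits in the driver's memory.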

```python
import pandas as pd  # toPandas() requires pandas to be installed

ratings.toPandas().head()
```

## 3. Build the recommender system


There are different methods for building a recommender system, such as user-based, content-based, or collaborative filtering. Collaborative filtering calculates recommendations based on similarities between users and products. For example, collaborative filtering assumes that users who give similar ratings to the same movies will also have similar opinions on movies that they haven't seen.
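
The "similar ratings, similar opinions" assumption can be made concrete with a user-to-user similarity measure. The plain-Python sketch below, on hypothetical toy ratings, computes the Pearson correlation between two users over the movies both have rated. This is the classic neighborhood (user-based) flavor of collaborative filtering, shown only for intuition; it is not what ALS does internally, and `pearson_similarity` is a hypothetical helper.

```python
from math import sqrt

def pearson_similarity(ratings_a, ratings_b):
    """Pearson correlation between two users over their co-rated movies.

    ratings_a, ratings_b: dicts mapping movieId -> rating.
    Returns 0.0 when there are fewer than two shared movies or no variance.
    """
    common = sorted(ratings_a.keys() & ratings_b.keys())
    if len(common) < 2:
        return 0.0
    mean_a = sum(ratings_a[m] for m in common) / len(common)
    mean_b = sum(ratings_b[m] for m in common) / len(common)
    # Center each user's ratings on their own mean over the shared movies
    da = [ratings_a[m] - mean_a for m in common]
    db = [ratings_b[m] - mean_b for m in common]
    num = sum(x * y for x, y in zip(da, db))
    den = sqrt(sum(x * x for x in da)) * sqrt(sum(y * y for y in db))
    return num / den if den else 0.0

# Hypothetical toy ratings: movieId -> rating
alice = {1: 5.0, 2: 4.0, 3: 1.0}
bob = {1: 5.0, 2: 4.0, 3: 1.0, 4: 5.0}
carol = {1: 1.0, 2: 2.0, 3: 5.0}

print(pearson_similarity(alice, bob))    # close to 1.0: same taste on shared movies
print(pearson_similarity(alice, carol))  # close to -1.0: opposite taste
```

Centering on each user's mean is the design choice that matters here: a plain cosine similarity on 1 to 5 star ratings is always positive, so it could not tell Alice and Carol apart as clearly. Under this assumption, Bob's high rating of movie 4 would make it a candidate recommendation for Alice.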

The alternating least squares (ALS) algorithm provides collaborative filtering between users and products to find products that users might like, based on their previous ratings.

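In this case, the ratings form a matrix of all users versus all movies. Most cells in the matrix are empty; an empty cell means that the user hasn't rated the movie yet. ALS approximates this matrix as the product of two smaller matrices, holding one vector of latent factors per user and one per movie. The predicted rating for a user-movie pair is the dot product of the user's and the movie's factor vectors, which is how the empty cells get filled in. Because solving for both factor matrices at once is hard, the algorithm alternates: it fixes the movie factors and solves a least-squares problem for the user factors, then fixes the user factors and solves for the movie factors, repeating for a fixed number of iterations to minimize the estimation error on the known ratings.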
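
The alternating idea can be sketched in a few lines of NumPy. This is a simplified, single-machine illustration on a hypothetical 5 x 4 toy matrix, not Spark's distributed implementation: the `als` helper is hypothetical, it uses plain ridge regularization with a hypothetical `reg` weight, and Spark's exact regularization and solver details may differ. Each half-step fixes one factor matrix and solves an independent regularized least-squares problem per user (or per movie), using only the observed ratings.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, iterations=20, seed=0):
    """Factorize R ~ U @ V.T using only the observed cells (mask == True)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(iterations):
        # Fix V: each user's factors solve a small ridge-regression problem
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg * eye, Vu.T @ R[u, mask[u]])
        # Fix U: each movie's factors solve a small ridge-regression problem
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg * eye, Ui.T @ R[mask[:, i], i])
    return U, V

# Hypothetical toy user x movie matrix; 0 marks an empty (unrated) cell
R = np.array([
    [5, 3, 0, 1],
    [4, 0, 0, 1],
    [1, 1, 0, 5],
    [1, 0, 0, 4],
    [0, 1, 5, 4],
], dtype=float)
mask = R > 0

U, V = als(R, mask)
predicted = U @ V.T  # every cell now has a value, including the empty ones
train_rmse = np.sqrt(np.mean((predicted[mask] - R[mask]) ** 2))
print(np.round(predicted, 2))
print(train_rmse)
```

Here `rank` plays the same role as the `rank` parameter of Spark's ALS (the number of latent factors), and `iterations` corresponds roughly to `maxIter`.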

Check the size of the ratings matrix:


```python
spark.sql("""
    SELECT *, (100 - (100 * nb_ratings/matrix_size)) AS sparsity
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
""").show()
```

**Less than 2% of the matrix is filled!**
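
The sparsity formula from the SQL query is simple arithmetic: the number of possible cells is `nb_users * nb_movies`, and sparsity is the percentage of those cells that hold no rating. The plain-Python sketch below mirrors that computation on hypothetical toy data; like the SQL query, it assumes each (user, movie) pair appears at most once, and `sparsity` is a hypothetical helper.

```python
def sparsity(ratings):
    """Percentage of empty cells in the user x movie matrix.

    ratings: iterable of (userId, movieId, rating) tuples.
    """
    ratings = list(ratings)
    nb_users = len({user for user, _, _ in ratings})
    nb_movies = len({movie for _, movie, _ in ratings})
    matrix_size = nb_users * nb_movies
    return 100 - (100 * len(ratings) / matrix_size)

# Hypothetical toy data: 3 users x 3 movies = 9 cells, 4 of them rated
toy = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0)]
print(sparsity(toy))  # 100 - 400/9, about 55.6% empty
```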

### 3.1 Split the data set

Split your ratings data set into an 80% training data set and a 20% test data set. Then train the model on the training set, generate predictions for the test set, and evaluate the performance.
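
Spark's `randomSplit` normalizes the weights, so `[80.0, 20.0]` behaves like `[0.8, 0.2]`, and it assigns each row to exactly one split at random. The split sizes are therefore only approximately 80% and 20%, and they change from run to run unless you pass a `seed`. The plain-Python sketch below (a hypothetical `random_split` helper, conceptually similar to but not Spark's actual sampling code) shows that behavior.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds after normalization, e.g. [0.8, 1.0] for [80, 20]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point leftover
            if x < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [80.0, 20.0], seed=42)
print(len(train), len(test))  # roughly 8000 and 2000; exact sizes depend on the seed
```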

```python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
```

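```python
# Weights are normalized, so [0.8, 0.2] gives an 80/20 split; the seed makes it reproducible
trainingRatings, testRatings = ratings.randomSplit([0.8, 0.2], seed=42)
```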

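```python
# ALS with default hyperparameters, trained on the training set only
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(trainingRatings)

# Users or movies that appear only in testRatings get NaN predictions (cold start)
predictions = model.transform(testRatings)
```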

```python
predictions.show()
```

```python
# Average rating of the training set, used to fill NaN (cold-start) predictions
avgRatings = trainingRatings.select('rating').groupBy().avg().first()[0]
print("The average rating in the training set is: " + str(avgRatings))

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("The root mean squared error (RMSE) for our model is: " + str(evaluator.evaluate(predictions.na.fill(avgRatings))))
```

This error is typically higher than what you would measure on the same ratings the model was trained on, but it is a more honest estimate: evaluating on held-out ratings does not reward overfitting, so it reflects the performance you can expect on new incoming data.
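
Filling missing predictions matters because a single NaN makes the RMSE itself NaN. The plain-Python sketch below (a hypothetical `rmse` helper with hypothetical toy values) shows the metric and the effect of the mean-fill used with `na.fill` above. Spark's ALS also offers `coldStartStrategy="drop"`, which removes NaN predictions instead of filling them; that changes which rows are scored, so the two RMSE values are not directly comparable.

```python
import math

def rmse(labels, predictions, fill_value=None):
    """Root mean squared error; NaN predictions are replaced by fill_value if given."""
    squared_errors = []
    for y, p in zip(labels, predictions):
        if math.isnan(p) and fill_value is not None:
            p = fill_value  # e.g. the average training rating
        squared_errors.append((y - p) ** 2)
    return math.sqrt(sum(squared_errors) / len(squared_errors))

labels = [4.0, 3.0, 5.0, 2.0]
preds = [3.5, 3.0, float("nan"), 2.5]  # NaN: movie unseen during training
print(rmse(labels, preds))                  # nan: one missing prediction poisons the metric
print(rmse(labels, preds, fill_value=3.5))  # sqrt(2.75 / 4), about 0.83
```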

### 3.2 Recommend movies

To recommend movies for a specific user, create a function that applies the trained `ALSModel` to the movies that the user hasn't rated yet.

Create a recommendMovies function:

```python
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select("movieId").distinct().withColumn("userId", lit(user))

    # Movies already rated by this user (same column order as dataSet, as subtract requires)
    moviesAlreadyRated = ratings.filter(ratings.userId == user).select("movieId", "userId")

    # Predict ratings for the unrated movies only; drop NaN (cold-start) predictions
    predictions = (model.transform(dataSet.subtract(moviesAlreadyRated))
                   .dropna()
                   .orderBy("prediction", ascending=False)
                   .limit(nbRecommendations)
                   .select("movieId", "prediction"))

    # Join with the movies DataFrame to get titles and genres;
    # sort again because a join does not preserve row order
    recommendations = (predictions.join(movies, on="movieId")
                       .select("movieId", "title", "genres", "prediction")
                       .orderBy("prediction", ascending=False))

    recommendations.show(truncate=False)
    return recommendations
```

Now run this function to recommend 10 movies for three different users:

```python
print("Recommendations for user 133:")
recommendMovies(model, 133, 10)
print("Recommendations for user 471:")
recommendMovies(model, 471, 10)
print("Recommendations for user 496:")
recommendMovies(model, 496, 10)
```
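
Stripped of Spark, the logic of `recommendMovies` is: score every movie the user hasn't rated with the dot product of the user's and the movie's latent factors, then keep the top N. The plain-Python sketch below shows that logic on hypothetical factor vectors; the `recommend` helper and its inputs are hypothetical. A trained `ALSModel` exposes its learned factors as the `userFactors` and `itemFactors` DataFrames, and newer Spark versions also provide `recommendForAllUsers` and `recommendForUserSubset`, which, as far as we know, do not exclude movies the user has already rated.

```python
def recommend(user_factors, item_factors, rated, user, n):
    """Top-n unrated movies for `user`, scored by the dot product of latent factors.

    user_factors: dict userId -> list of floats
    item_factors: dict movieId -> list of floats
    rated: dict userId -> set of movieIds the user has already rated
    """
    u = user_factors[user]
    already = rated.get(user, set())
    scores = [
        (movie, sum(a * b for a, b in zip(u, v)))
        for movie, v in item_factors.items()
        if movie not in already
    ]
    # Highest predicted rating first; ties broken by movieId for a stable order
    scores.sort(key=lambda pair: (-pair[1], pair[0]))
    return scores[:n]

# Hypothetical factors with rank 2
user_factors = {133: [1.0, 0.5]}
item_factors = {1: [4.0, 1.0], 2: [1.0, 4.0], 3: [3.0, 3.0], 4: [5.0, 0.0]}
rated = {133: {4}}  # movie 4 is excluded even though it would score highest
print(recommend(user_factors, item_factors, rated, 133, 2))  # [(1, 4.5), (3, 4.5)]
```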

## 4. Save results

To serve recommendations in real time later, you need to save the results. There are two ways to score in real time:

1. Save the model and score it on demand.
2. Save a table with precomputed predictions, so you can look up the user (or item) in real time.
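
Option 2 turns serving into a simple key lookup. The plain-Python sketch below shows the idea, assuming hypothetical scored rows of `(userId, movieId, prediction)`, for example read back from saved predictions; `build_lookup` is a hypothetical helper. In production, such a table would more likely live in a database or key-value store than in an in-memory dict.

```python
from collections import defaultdict

def build_lookup(rows, n):
    """Precompute the top-n (movieId, prediction) pairs for each user."""
    by_user = defaultdict(list)
    for user, movie, prediction in rows:
        by_user[user].append((movie, prediction))
    return {
        user: sorted(items, key=lambda pair: -pair[1])[:n]
        for user, items in by_user.items()
    }

# Hypothetical scored rows: (userId, movieId, prediction)
rows = [(133, 1, 4.2), (133, 2, 3.1), (133, 3, 4.8), (471, 1, 2.0)]
lookup = build_lookup(rows, n=2)
print(lookup[133])          # [(3, 4.8), (1, 4.2)]
print(lookup.get(999, []))  # unknown user: fall back to an empty list
```

The trade-off: precomputed tables answer instantly but go stale until they are rebuilt, while option 1 can score any user-movie pair on demand at the cost of running the model per request.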

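```python
# Save the trained ALS model; reload it later with
# from pyspark.ml.recommendation import ALSModel; ALSModel.load("model")
model.write().overwrite().save("model")
```

```python
# Save the test-set predictions computed above (Parquet is Spark's default format)
predictions.write.mode("overwrite").save("Data/predictions")
```

```python
# Also export them as a CSV file through pandas (collects all rows to the driver)
predictions.toPandas().to_csv("Data/predictions.csv", index=False)
```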

## 5. Distribute results

Next, you build a Flask API with two endpoints:
- Top recommendations for a user: /ratings/top. Call this endpoint with GET and pass a userId and, optionally, a count (default 5) as query parameters, for example /ratings/top?userId=133&count=10. It returns the top recommended movies together with their predicted ratings.
- Score for a user-movie pair: /ratings/calculateScore. Call this endpoint with GET and pass a userId and a movieId as query parameters. It returns the predicted rating for this user-movie pair.

```python
#%%bash
#pip install Flask
```

```python
from flask import Flask, request, jsonify
```

```python
# Initialize app
app = Flask(__name__)

# Endpoint 1: top recommendations for a user
@app.route("/ratings/top", methods=["GET"])
def top_ratings():
    # Read the query parameters of the API call
    userId_str = request.args.get("userId")
    try:
        userId = int(userId_str)
    except (TypeError, ValueError):
        return "'userId' is required and should be an Integer.", 400

    # 'count' is optional and defaults to 5
    count_str = request.args.get("count")
    try:
        count = int(count_str)
    except (TypeError, ValueError):
        count = 5

    # Recommend
    recommendations = recommendMovies(model, userId, count)

    # Convert the Spark rows to Python dictionaries
    result = [row.asDict() for row in recommendations.collect()]

    # Return the result to the API
    return jsonify(result)

# Endpoint 2: calculate the score for a user-movie combination
@app.route("/ratings/calculateScore", methods=["GET"])
def newScore():
    # Read the query parameters of the API call
    userId_str = request.args.get("userId")
    try:
        userId = int(userId_str)
    except (TypeError, ValueError):
        return "'userId' is required and should be an Integer.", 400

    movieId_str = request.args.get("movieId")
    try:
        movieId = int(movieId_str)
    except (TypeError, ValueError):
        return "'movieId' is required and should be an Integer.", 400

    # Create a one-row Spark DataFrame with the user-movie pair to score
    pairs = spark.createDataFrame([(userId, movieId)], ["userId", "movieId"])

    # Predict the estimated rating (NaN if the user or movie was not in the training data)
    res = model.transform(pairs)

    # Collect results as a list of Python dictionaries
    newPredictions = [row.asDict() for row in res.collect()]

    # Return the result to the API
    return jsonify(newPredictions)

if __name__ == '__main__':
    # Blocks this cell until the server is stopped
    app.run(host='localhost', port=6000)
```
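
Because `app.run` blocks the process that runs it, the API is easiest to call from a separate process, such as another notebook or a terminal. The standard-library sketch below builds the GET URLs with query parameters and decodes the JSON response; `BASE_URL`, `build_url`, and `call_api` are hypothetical names, and the host and port are assumed to match the `app.run` call above.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:6000"  # assumed to match app.run(host='localhost', port=6000)

def build_url(endpoint, **params):
    """Build a GET URL such as /ratings/top?userId=133&count=10."""
    return f"{BASE_URL}{endpoint}?{urlencode(params)}"

def call_api(endpoint, **params):
    """Send the GET request and decode the JSON response (the server must be running)."""
    with urlopen(build_url(endpoint, **params)) as response:
        return json.loads(response.read().decode("utf-8"))

print(build_url("/ratings/top", userId=133, count=10))

# Uncomment once the Flask app is running in another process:
# print(call_api("/ratings/top", userId=133, count=10))
# print(call_api("/ratings/calculateScore", userId=133, movieId=1))
```

`json.loads` accepts the non-standard `NaN` token by default, which matters here because a cold-start prediction may come back as NaN.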