# Spark-Recommender-IBM-Tutorial

In this notebook, we will walk through the development of a movie recommendation engine. We will use the ratings dataset from `MovieLens` to train an `ALS model` using the `Apache Spark ML` package. We expect this notebook to be run on `IBM Cloud` using the `IBM Watson DataScience Offering`.


**In this notebook we will walk through**
- Loading the MovieLens dataset and performing some data exploration tasks
- Building/Training a Matrix Factorization (ALS) model on the user-ratings data using Apache Spark
- Serving recommendations 

## Prerequisites

In order to run this notebook successfully, you will need to:

 - create an IBM DataScience project
   - create an IBM ID from [here](https://dataplatform.cloud.ibm.com/home), if you don't have one already
   - activate IBM Watson for your account, if it's not already activated
   - create a new `DataScience` project

 - download the `MovieLens` dataset and upload it to your project as a `data asset`
   - download the small dataset from [here](http://files.grouplens.org/datasets/movielens/ml-latest-small.zip) or the full dataset from [here](http://files.grouplens.org/datasets/movielens/ml-latest.zip)
   - unzip the downloaded file
   - upload `ratings.csv`, `movies.csv` and `links.csv` as data assets to your project

 - create a new notebook `from url`
   - provide the URL for this notebook
   - use the default PySpark environment with `python-3.5` and `spark-2.3`
     - this environment provides an `Anaconda` runtime preloaded with the most popular data science libraries

 - create a valid TMDB API key (used to show movie posters while serving recommendations). More information on how to create a TMDB API key can be found [here](https://developers.themoviedb.org/3)

### verify python version 

In [None]:
# verify python version
! python --version

### verify spark is running  

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# verify spark is running 
spark

## Step 1: Loading The Data & Data Exploration 

You can connect your data assets to your notebook by clicking `find and add data` in the top right corner of your notebook. Select the data asset you want to load, click the corresponding `insert to code`, and then choose `insert SparkSession DataFrame` to create a DataFrame from it. This is how we will load the ratings, movies and links datasets into Spark DataFrames.

### Load Ratings Data

Each row of the `DataFrame` consists of a `userId`, `movieId`, `timestamp` and `rating` given by the user to the movie.

In [None]:
import ibmos2spark

# @hidden_cell
# these values are generated by `insert to code`; use your own project's credentials
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'api_key': '<YOUR_COS_API_KEY>',
    'service_id': '<YOUR_COS_SERVICE_ID>',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

configuration_name = 'os_5d2c1a44e4274f0ca732c9cf847c65de_configs'
cos = ibmos2spark.CloudObjectStorage(spark.sparkContext, credentials, configuration_name, 'bluemix_cos')

bucket_id = 'recommendationengineswalkthrough-donotdelete-pr-ufctuswrrmk82o'

In [None]:
# load ratings data
# drop the timestamp column since it's not used in building the model

ratings_df = spark.read\
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
    .option('header', 'true')\
    .option('inferSchema', 'true') \
    .load(cos.url('ratings.csv', bucket_id)) \
    .drop('timestamp') \
    .repartition(4) \
    .cache()

ratings_count=ratings_df.count()
print("ratings_count: {}".format(ratings_count))

ratings_df.show(5, truncate=False)
ratings_df.printSchema()

### Ratings Data Exploration [Users]
In the following cells we show 
 - the number of users
 - the minimum number of ratings given by a user
 - the maximum number of ratings given by a user 
 - the average number of ratings given by a user 

In [None]:
from pyspark.sql.functions import col
n_users = ratings_df.select('userId').distinct().count()
min_user_rating_count = ratings_df.groupby('userId').count().sort(col('count').asc()).take(1)[0]['count']
max_user_rating_count = ratings_df.groupby('userId').count().sort(col('count').desc()).take(1)[0]['count']
avg_user_rating_count = ratings_df.groupby('userId').count().groupby().avg('count').take(1)[0]['avg(count)']

print("n_users: {}".format(n_users))
print('min_user_rating_count: {}'.format(min_user_rating_count))
print('max_user_rating_count: {}'.format(max_user_rating_count))
print('avg_user_rating_count: {}'.format(avg_user_rating_count))

A more efficient approach computes all four statistics in a single pass, using multiple aggregations:

In [None]:
import pyspark.sql.functions as F

ratings_df.groupby('userId').count() \
    .agg(
        F.count('count').alias('n_users'), 
        F.min('count').alias('min_user_rating_count'), 
        F.max('count').alias('max_user_rating_count'), 
        F.avg('count').alias('avg_user_rating_count')
        ) \
    .show()
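To make explicit what the aggregation computes, here is a plain-Python sketch over a tiny, made-up list of `(userId, movieId, rating)` tuples (hypothetical data, not MovieLens): count the ratings per user, then take the count, minimum, maximum and mean of those per-user counts.

```python
from collections import Counter
from statistics import mean

# hypothetical sample of (userId, movieId, rating) rows, for illustration only
ratings = [
    (1, 10, 4.0), (1, 20, 3.5), (1, 30, 5.0),
    (2, 10, 2.0),
    (3, 20, 4.5), (3, 40, 1.0),
]

# equivalent of ratings_df.groupby('userId').count()
counts_per_user = Counter(user_id for user_id, _, _ in ratings)

# equivalent of the single .agg(count, min, max, avg) over the per-user counts
stats = {
    'n_users': len(counts_per_user),
    'min_user_rating_count': min(counts_per_user.values()),
    'max_user_rating_count': max(counts_per_user.values()),
    'avg_user_rating_count': mean(counts_per_user.values()),
}
print(stats)
```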

Using Spark's built-in `describe` function:

In [None]:
print("summary statistics user ratings counts")
ratings_df.groupBy("userId").count().select('count').describe().show()

### Ratings Data Exploration [Movies]
In the following cells we show 
 - the number of movies
 - the minimum number of ratings given to a movie
 - the maximum number of ratings given to a movie
 - the average number of ratings given to a movie

Using Spark's built-in `describe` function:

In [None]:
print("summary statistics movies ratings counts")
ratings_df.groupBy("movieId").count().select('count').describe().show()

### Ratings Data Exploration [Ratings]
The following cells show and plot the distribution of ratings.

In [None]:
from pyspark.sql.functions import asc
ratings_df.groupby('rating').count().sort(asc('rating')).show()

In [None]:
# create a list of rating counts sorted by rating value (to use in plots)
rating_counts = ratings_df.groupBy("rating").count().collect()

# sort in place by rating (i.e. from 0.5 to 5)
rating_counts.sort(key=lambda row: row['rating'])
sorted_ratings_counts_list = [row['count'] for row in rating_counts]
print("sorted_ratings_counts_list: {}".format(sorted_ratings_counts_list))

In [None]:
import numpy as np
import matplotlib.pyplot as plt

n_groups = 10
fig, ax = plt.subplots()
index = np.arange(n_groups)
x_labels=list(map(lambda x: str(x), list(np.arange(0.5,5.5,0.5))))
bar_width = 0.5
opacity = 0.4

rects1 = plt.bar(index, sorted_ratings_counts_list, bar_width, alpha=opacity, color='b', label='ratings')
plt.xlabel('ratings')
plt.ylabel('Counts')

plt.title('Distribution of ratings')

plt.xticks(index, x_labels)  # bars are centered on their index, so the ticks go there too
plt.legend()
plt.tight_layout()
plt.show()

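### Ratings Data Exploration [Other]

Pairwise frequency tables (cross-tabulations) of `userId` and of `movieId` against `rating`; only the first 5 rows of each are shown.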

In [None]:
ratings_df.stat.crosstab("userId", "rating").show(5)
ratings_df.stat.crosstab("movieId", "rating").show(5)

### Collect User Top Ratings at Driver
In the following cells we will collect each user's top ratings at the Spark driver. This step is only performed for the purpose of this tutorial, since the users' top ratings will be displayed later alongside the recommendations; it is not required in a typical recommendation engine.

In [None]:
from pyspark.sql.functions import struct
user_ratings_df = ratings_df.select('userId', struct('movieId', 'rating').alias('movieAndRating'))
user_ratings_df.show(5, truncate=False)
user_ratings_df.printSchema()

In [None]:
from pyspark.sql.functions import collect_list

user_ratings_df = user_ratings_df \
    .groupby('userId') \
    .agg(collect_list('movieAndRating').alias('userRatings'))

user_ratings_df.show(5)
user_ratings_df.printSchema()

In [None]:
from pyspark.sql.functions import udf

user_ratings_schema = user_ratings_df.select('userRatings').schema.fields[0].dataType
print("user_ratings_schema: {}".format(user_ratings_schema))

def sort_and_limit(lst, n=5, desc=True):
    # keep n (movieId, rating) entries ordered by rating (field index 1), highest first when desc=True
    return sorted(lst, key=lambda x: x[1], reverse=desc)[0:n]
sort_and_limit_udf = udf(sort_and_limit, user_ratings_schema)

l=[[1,2],[3,4],[3,2],[0,1], [13,2],[10,1],[1,10],[3,3]]
sort_and_limit(l)
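`sort_and_limit` sorts each user's full list just to keep five entries. As a possible alternative (plain Python, not part of the original notebook), `heapq.nlargest` performs the same top-n selection in O(m log n) per user; the Python documentation states it is equivalent to `sorted(iterable, key=key, reverse=True)[:n]`, so it could replace the body of the UDF. The name `top_n_ratings` is hypothetical.

```python
import heapq

def sort_and_limit(lst, n=5, desc=True):
    # the notebook's version: a full sort of the user's ratings
    return sorted(lst, key=lambda x: x[1], reverse=desc)[0:n]

def top_n_ratings(lst, n=5):
    # heap-based top-n selection by rating (field index 1), highest first
    return heapq.nlargest(n, lst, key=lambda x: x[1])

l = [[1, 2], [3, 4], [3, 2], [0, 1], [13, 2], [10, 1], [1, 10], [3, 3]]
print(top_n_ratings(l))
print(top_n_ratings(l) == sort_and_limit(l))
```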

In [None]:
user_ratings_df = user_ratings_df.select('userId', sort_and_limit_udf('userRatings').alias('userRatings'))

user_ratings_df.show(5, truncate=False)
user_ratings_df.printSchema()

In [None]:
user_ratings = user_ratings_df.rdd.map(lambda r: (r['userId'], r['userRatings'])).collectAsMap()
len(user_ratings)

In [None]:
uid=240
user_ratings[uid]

### Load Movies Data

The file `movies.csv` contains the `movieId`, `title` and `genres` for each movie. As you can see, the `genres` field is a bit tricky to use, as the genres are in the form of one string delimited by the `|` character: `Adventure|Animation|Children|Comedy|Fantasy`. Additionally the movie `title` string contains the `release year`, which is a separate field we can extract.
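To make the format concrete, the plain-Python sketch below parses a few illustrative rows in the `movies.csv` layout (the rows are examples, and the last one is made up). Some titles contain commas and are therefore quoted, which is why a real CSV parser (as Spark's CSV reader provides) is needed rather than a naive split on `,`. The year regex and genre split mirror the UDFs defined next; `parse_movie` is a hypothetical helper.

```python
import csv
import io
import re

# illustrative rows in the movies.csv layout: movieId,title,genres
sample = '''movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
11,"American President, The (1995)",Comedy|Drama|Romance
99999,Untitled Film,(no genres listed)
'''

YEAR_RE = re.compile(r"\((\d{4})\)")

def parse_movie(row):
    title = row['title']
    match = YEAR_RE.search(title)
    # strip the "(yyyy)" part from the title when a year is present
    clean_title = title[:match.start()].strip() if match else title
    year = match.group(1) if match else None
    genres = row['genres'].lower().split('|')
    return int(row['movieId']), clean_title, year, genres

movies = [parse_movie(r) for r in csv.DictReader(io.StringIO(sample))]
for m in movies:
    print(m)
```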

In [None]:
# load raw data from CSV
movies_df = spark.read\
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
    .option('header', 'true')\
    .option('inferSchema', 'true') \
    .load(cos.url('movies.csv', bucket_id)) \
    .repartition(2)

movies_df.show(5, truncate=False)

### Movies Data Transformation 1

Create a user-defined function (UDF) that splits this delimited string into a list of lowercase genres.

In [None]:
from pyspark.sql.types import ArrayType, StringType
# define a UDF to convert the raw genres string to an array of genres and lowercase
def extract_genres(genres):
    return genres.lower().split("|")
extract_genres_udf = udf(extract_genres , ArrayType(StringType()))
extract_genres("Adventure|Animation|Children|Comedy|Fantasy")

Create a UDF to extract the release year from the title using a Python regular expression.

In [None]:
from pyspark.sql.types import StructType, StructField
import re

# define a UDF to extract the release year from the title, and return the new title and year in a struct type
def extract_year(title):
    # look for a four-digit year in parentheses, e.g. "Toy Story (1995)"
    result = re.search(r"\((\d{4})\)", title) if title else None
    if result:
        # the title without the " (yyyy)" part, and the year as a string (matching the StringType schema)
        return (title[:result.start()].strip(), result.group(1))
    # no year found: fall back to this notebook's default year, also as a string
    return (title, "1970")

extract_year_udf = udf(extract_year,\
                   StructType([StructField("title", StringType(), True),\
                               StructField("release_date", StringType(), True)]))
    

extract_year("MovieExample (2010)")

Apply both UDFs to the movies DataFrame:

In [None]:
movies_df = movies_df \
    .select("movieId", 
            extract_year_udf("title").title.alias("title"), 
            extract_year_udf("title").release_date.alias("release_date"),
            extract_genres_udf("genres").alias("genres"))

n_movies = movies_df.count()
print("n_movies: {}".format(n_movies))
movies_df.show(5, truncate=False)

### Movies Data Transformation 2

Next, we join the `links.csv` data to `movies` so that there is an id for _The Movie Database_ corresponding to each movie. We will use this id to retrieve movie poster images when displaying recommendations later.

In [None]:
link_data = spark.read\
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
    .option('header', 'true')\
    .option('inferSchema', 'true') \
    .load(cos.url('links.csv', bucket_id)) \
    .repartition(2)

link_data.show(5)

In [None]:
# join movies with links to get TMDB id
movies_df = movies_df.join(link_data, on='movieId')\
    .select(movies_df.columns + ['tmdbId'])

n_movies = movies_df.count()
print("n_movies: {}".format(n_movies))
movies_df.show(5, truncate=False)

### Collect Transformed Movies Data at Driver
We will maintain a mapping from a key (`movieId`) to a dict (`movieInfo`). This step is only performed for the purpose of this tutorial; typically, movie information is stored in a database that is external to the recommendation application (collaborative filtering only needs the ratings data).

In [None]:
movies_map_df = movies_df.select('movieId', struct(movies_df.columns).alias('movieInfo'))
movies_map_df.show(5, truncate=False)
movies_map_df.printSchema()

In [None]:
movies_info = movies_map_df.rdd.map(lambda r: (r['movieId'], r['movieInfo'])).collectAsMap()
len(movies_info)

In [None]:
# query a movie from the in-memory movies_info map
movie_id=10
print(movies_info[movie_id])
print()
print(movies_info[movie_id]['title'])
print(movies_info[movie_id]['tmdbId'])

### Summary 

From the previous steps we now have
 - ratings_df: spark dataframe that will be used in training the ALS Matrix Factorization Model 
 - user_ratings: mapping from userId to the user's top 5 ratings (sorted by rating in descending order)
 - movies_info: mapping from movieId to movie information

In [None]:
print("Ratings DataFrame")
ratings_df.show(5, truncate=False)
print()
print("Query Movie details about movieId: {}".format(movie_id))
print(movies_info[movie_id])
print()
print("Query top User ratings for userId: {}".format(uid))
print(user_ratings[uid])

## Step 2: Train a recommender model on the ratings data

In the following blocks we will use the ratings data to build a collaborative filtering recommendation model using Apache Spark's implementation of ALS. 

[Collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering) is a recommendation approach that is effectively based on the "wisdom of the crowd". It makes the assumption that, if two people share similar preferences, then the things that one of them prefers could be good recommendations to make to the other. In other words, if user A tends to like certain movies, and user B shares some of these preferences with user A, then the movies that user A likes, that user B _has not yet seen_, may well be movies that user B will also like.

In a similar manner, we can think about _items_ as being similar if they tend to be rated highly by the same people, on average. 

Hence these models are based on the combined, collaborative preferences and behavior of all users in aggregate. They tend to be very effective in practice (provided you have enough preference data to train the model). The ratings data we have is a form of _explicit preference data_, perfect for training collaborative filtering models.
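The "wisdom of the crowd" idea can be illustrated with a small neighborhood-style sketch. This is not ALS; it only shows the intuition: compare users by cosine similarity over the movies both have rated, then propose movies that the most similar user rated but the target user has not seen. The ratings matrix is hypothetical.

```python
import numpy as np

# hypothetical user x movie ratings; 0 means "not rated"
R = np.array([
    [5, 4, 0, 1],   # user A
    [4, 5, 3, 0],   # user B
    [1, 0, 2, 5],   # user C
], dtype=float)

def cosine_similarity(u, v):
    """Cosine similarity of two users, computed only over movies both have rated."""
    both = (u > 0) & (v > 0)
    if not both.any():
        return 0.0
    a, b = u[both], v[both]
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

sim_ab = cosine_similarity(R[0], R[1])   # A and B agree on movies 0 and 1
sim_ac = cosine_similarity(R[0], R[2])   # A and C disagree on movies 0 and 3
print(round(sim_ab, 3), round(sim_ac, 3))

# movies that the similar user B rated but A has not seen are candidate recommendations for A
candidates = np.where((R[0] == 0) & (R[1] > 0))[0]
print(candidates.tolist())
```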


### Alternating Least Squares
Alternating Least Squares (ALS) is a specific algorithm for solving a type of collaborative filtering model known as [matrix factorization (MF)](https://en.wikipedia.org/wiki/Matrix_decomposition). The core idea of MF is to represent the ratings as a _user-item ratings matrix_. The entries in this matrix are the ratings given by users to movies. Typically, that _user-item ratings matrix_ has _missing entries_ because not all users have rated all movies. In this situation we refer to the data as _sparse_.


![als-diagram.png](https://github.com/ThinkBigEg/movie-recommendation-tutorial/blob/master/als-diagram.png?raw=true)

MF methods aim to find two much smaller matrices (one representing the _users_ and the other the _items_) that, when multiplied together, reconstruct the original ratings matrix as closely as possible. This is known as _factorizing_ the original matrix, hence the name of the technique.

The two smaller matrices are called _factor matrices_ (or _latent features_). The user and movie factor matrices are illustrated on the right in the diagram above. The idea is that each user factor vector is a compressed representation of the user's preferences and behavior. Likewise, each item factor vector is a compressed representation of the item. Once the model is trained, the factor vectors can be used to make recommendations, which is what you will do in the following sections.
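The alternating idea can be sketched in NumPy on a tiny dense matrix. This is a minimal single-machine illustration, assuming a boolean mask of observed entries; Spark's implementation is distributed and blocked. With the item factors fixed, each user's factor vector is the solution of a small regularized least-squares problem, and vice versa. Following the Spark documentation, the sketch scales the regularization by each user's (or item's) number of ratings. The function `als` and the ratings matrix are hypothetical.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, n_iter=20, seed=0):
    """Explicit-feedback ALS on a dense ratings matrix R; mask marks observed entries."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factor matrix
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factor matrix
    eye = np.eye(rank)
    for _ in range(n_iter):
        # fix the item factors and solve a regularized least-squares problem per user
        for u in range(n_users):
            obs = mask[u]
            Yo = Y[obs]
            lam = reg * max(obs.sum(), 1)  # regularization scaled by the user's rating count
            X[u] = np.linalg.solve(Yo.T @ Yo + lam * eye, Yo.T @ R[u, obs])
        # fix the user factors and solve the same kind of problem per item
        for i in range(n_items):
            obs = mask[:, i]
            Xo = X[obs]
            lam = reg * max(obs.sum(), 1)
            Y[i] = np.linalg.solve(Xo.T @ Xo + lam * eye, Xo.T @ R[obs, i])
    return X, Y

# hypothetical 4 users x 4 movies ratings matrix; 0 means "not rated"
R = np.array([[5, 4, 0, 1],
              [4, 5, 3, 0],
              [1, 0, 2, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0

X, Y = als(R, mask, rank=2, reg=0.05, n_iter=50)
pred = X @ Y.T  # reconstructed matrix, including predictions for the missing entries
train_rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
baseline_rmse = np.sqrt(np.mean((R[mask].mean() - R[mask]) ** 2))  # always predict the mean rating
print(np.round(pred, 2))
print("train RMSE:", round(float(train_rmse), 3), "vs. mean baseline:", round(float(baseline_rmse), 3))
```

The entries of `pred` where `mask` is `False` are the model's guesses for movies a user has not rated, which is what recommendations are built from.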

__Further reading:__
* [Spark MLlib Collaborative Filtering](http://spark.apache.org/docs/latest/ml-collaborative-filtering.html)
* [Alternating Least Squares and collaborative filtering](https://datasciencemadesimpler.wordpress.com/tag/alternating-least-squares/)
* [Quora question on Alternating Least Squares](https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems-And-why-does-this-algorithm-work-intuition-behind-this)

Fortunately, Spark's MLlib machine learning library has a scalable, efficient implementation of matrix factorization built in, which we can use to train our recommendation model.

**Spark's ALS takes the following inputs:**
 - numBlocks: the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10)
 - rank: the number of latent factors in the model (defaults to 10)
 - maxIter: the maximum number of iterations to run (defaults to 10)
 - regParam: the regularization parameter in ALS - used to prevent overfitting (defaults to 0.1)
 - implicitPrefs: specifies whether to use the explicit feedback ALS variant or the one adapted for implicit feedback data (defaults to false, which means it's using explicit feedback)
   - alpha: This is a parameter applicable to the implicit feedback variant of ALS, which governs the baseline confidence in preference observations (defaults to 1.0)
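 - nonnegative: This parameter specifies whether or not to use non-negative constraints for least squares (defaults to false)
 - coldStartStrategy: how to handle users or items at prediction time that were not seen during training; `"drop"` removes rows with `NaN` predictions so that evaluation metrics stay defined (defaults to `"nan"`)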

In [None]:
# use a fixed seed so the train/test split is reproducible
training_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=12)

In [None]:
from pyspark.ml.recommendation import ALS

rank = 20  # number of latent factors
maxIter = 10
regParam=0.01  # prevent overfitting

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", 
          regParam=regParam, 
          rank=rank, 
          maxIter=maxIter,
          coldStartStrategy="drop",
          seed=12)

model = als.fit(training_df)

In [None]:
test_df.show(5, truncate=False)
predictions_df = model.transform(test_df)
predictions_df.show(5, truncate=False)

**Model Evaluation**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
error = evaluator.evaluate(predictions_df)

error
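RMSE is the square root of the mean squared difference between the observed ratings and the predictions. The plain-Python sketch below (hypothetical `(rating, prediction)` pairs) also shows why the notebook sets `coldStartStrategy="drop"`: a user or movie that appears only in the test split gets a `NaN` prediction, and a single `NaN` makes the whole metric `NaN`; dropping those rows keeps it defined.

```python
import math

# hypothetical (rating, prediction) pairs; NaN marks a user/movie unseen during training
pairs = [(4.0, 3.5), (3.0, 3.4), (5.0, 4.1), (2.0, float('nan'))]

def rmse(pairs):
    squared_errors = [(r - p) ** 2 for r, p in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

print(rmse(pairs))  # a single NaN prediction makes the result NaN

# keep only rows with a defined prediction, as coldStartStrategy="drop" does
kept = [(r, p) for r, p in pairs if not math.isnan(p)]
print(round(rmse(kept), 4))
```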

### ML Tuning: model selection and hyperparameter tuning

An important task in ML is `model selection`, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LogisticRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.

MLlib supports model selection using tools such as `CrossValidator` and `TrainValidationSplit`; in this notebook we use `CrossValidator`. These tools require the following items:
 - Estimator: algorithm or Pipeline to tune
 - Set of ParamMaps: parameters to choose from, sometimes called a “parameter grid” to search over
 - Evaluator: metric to measure how well a fitted Model does on held-out test data

At a high level, these model selection tools work as follows:
 - They split the input data into separate training and test datasets (`CrossValidator` splits it into `numFolds` folds and uses each fold once as the test set).
 - For each (training, test) pair, they iterate through the set of ParamMaps:
   - For each ParamMap, they fit the Estimator using those parameters, get the fitted Model, and evaluate the Model’s performance using the Evaluator.
 - They select the Model produced by the best-performing set of parameters.
 
The Evaluator can be a RegressionEvaluator for regression problems, a BinaryClassificationEvaluator for binary data, or a MulticlassClassificationEvaluator for multiclass problems. The default metric used to choose the best ParamMap can be overridden by the setMetricName method in each of these evaluators.

To help construct the parameter grid, we are using the ParamGridBuilder utility.
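The procedure described above can be sketched in plain Python. This is an illustration of k-fold grid search under simplifying assumptions, not Spark's implementation: it expands a grid into parameter maps (the notebook's grid gives 3 x 2 x 4 = 24 maps, so 72 fits with 3 folds), averages a validation metric over the folds for each map, picks the lowest average, and then refits on all the data, as the Spark documentation describes for `CrossValidator`. Because training ALS is out of scope here, the estimator is a hypothetical toy (global mean plus a shrunk per-item offset) run on synthetic ratings.

```python
import itertools
import math
import random

# the notebook's grid: 3 ranks x 2 maxIter values x 4 regParam values = 24 param maps
grid = {'rank': [10, 15, 20], 'maxIter': [10, 15], 'regParam': [0.01, 0.1, 0.2, 0.3]}
param_maps = [dict(zip(grid, values)) for values in itertools.product(*grid.values())]
print(len(param_maps), "param maps x 3 folds =", len(param_maps) * 3, "fits before the final refit")

def k_fold(rows, k, seed=0):
    """Shuffle once, deal rows into k folds, and yield (train, validation) pairs."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    folds = [rows[i::k] for i in range(k)]
    for i in range(k):
        train = [row for j, fold in enumerate(folds) if j != i for row in fold]
        yield train, folds[i]

def cross_validate(rows, param_maps, fit, evaluate, k=3):
    """Return the best param map (lowest average metric), a model refit on all rows, and the averages."""
    avg_metrics = []
    for params in param_maps:
        scores = [evaluate(fit(train, params), val) for train, val in k_fold(rows, k)]
        avg_metrics.append(sum(scores) / k)
    best = min(range(len(param_maps)), key=lambda idx: avg_metrics[idx])
    return param_maps[best], fit(rows, param_maps[best]), avg_metrics

# hypothetical toy estimator standing in for ALS: global mean plus a shrunk per-item offset
def fit_item_baseline(rows, params):
    mu = sum(r for _, _, r in rows) / len(rows)
    sums, counts = {}, {}
    for _, item, r in rows:
        sums[item] = sums.get(item, 0.0) + (r - mu)
        counts[item] = counts.get(item, 0) + 1
    offsets = {item: sums[item] / (counts[item] + params['shrinkage']) for item in sums}
    return lambda item: mu + offsets.get(item, 0.0)

def rmse(model, rows):
    return math.sqrt(sum((r - model(item)) ** 2 for _, item, r in rows) / len(rows))

# synthetic (userId, movieId, rating) rows, clipped to the 0.5-5 rating range
rng = random.Random(1)
rows = [(u, i, min(5.0, max(0.5, 2.5 + i % 3 + rng.gauss(0, 0.5)))) for u in range(30) for i in range(10)]
toy_grid = [{'shrinkage': s} for s in (0, 5, 25)]
best_params, best_model, metrics = cross_validate(rows, toy_grid, fit_item_baseline, rmse)
print(best_params, [round(m, 3) for m in metrics])
```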

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")

# creating parameter grid (parameters to be tuned)
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 15, 20]) \
    .addGrid(als.maxIter, [10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.2, 0.3]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")
crossval = CrossValidator(estimator=als, 
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

cv_model = crossval.fit(training_df)
best_model=cv_model.bestModel

In [None]:
best_rank=best_model.rank
best_regParm=best_model._java_obj.parent().getRegParam()
best_iterations=best_model._java_obj.parent().getMaxIter()

print("best_rank: {}".format(best_rank))
print("best_regParm: {}".format(best_regParm))
print("best_iterations: {}".format(best_iterations))

In [None]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

predictions_df = best_model.transform(test_df)
predictions_df.show(5, truncate=False)


error = evaluator.evaluate(predictions_df)
error

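### Using the Model
The fitted `ALSModel` exposes:
- `userFactors`: a DataFrame of user ids and their latent factor vectors
- `itemFactors`: a DataFrame of item (movie) ids and their latent factor vectors
- `recommendForAllUsers(numItems)`: the top `numItems` recommendations for every user
- `recommendForAllItems(numUsers)`: the top `numUsers` users for every item
- `recommendForUserSubset(dataset, numItems)`: the top recommendations for the users in `dataset`
- `recommendForItemSubset(dataset, numUsers)`: the top users for the items in `dataset`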

In [None]:
best_model.userFactors.show(5)
best_model.itemFactors.show(5)
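Conceptually, a predicted rating is the dot product of a user's factor vector and an item's factor vector, so recommending for all users amounts to scoring every item and keeping the best ones. The NumPy sketch below uses random matrices as hypothetical stand-ins for `userFactors` and `itemFactors`; it is not Spark's distributed implementation, and it does not filter out items the user has already rated. `recommend_for_all_users` is a hypothetical name.

```python
import numpy as np

rng = np.random.default_rng(0)
# hypothetical stand-ins for userFactors / itemFactors with rank 3
user_factors = rng.normal(size=(4, 3))   # 4 users
item_factors = rng.normal(size=(6, 3))   # 6 items

def recommend_for_all_users(user_factors, item_factors, num_items):
    """Top num_items (item index, score) pairs per user, ranked by dot-product score."""
    scores = user_factors @ item_factors.T              # predicted rating for every (user, item) pair
    top = np.argsort(-scores, axis=1)[:, :num_items]    # item indices, highest score first
    return [[(int(i), float(scores[u, i])) for i in top[u]] for u in range(scores.shape[0])]

recs = recommend_for_all_users(user_factors, item_factors, num_items=2)
for user, items in enumerate(recs):
    print(user, [(i, round(s, 2)) for i, s in items])
```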

## Step 3: Persisting the model

For the purpose of this tutorial, we do not save the model itself; we simply collect each user's top 5 recommendations at the driver.
First, we generate the top 5 recommendations for every user from the ALS model:

In [None]:
n_recs=5
users_recommendations_df = best_model.recommendForAllUsers(n_recs)

users_recommendations_df.show(5, truncate=False)
users_recommendations_df.printSchema()

In [None]:
users_recommendations = users_recommendations_df.rdd.map(lambda r: (r['userId'], r['recommendations'])).collectAsMap()
len(users_recommendations)

In [None]:
uid=240
users_recommendations[uid]

## Step 4: Serving Recommendations

* Given a user, find the movies with the highest predicted rating
* Display the results as an HTML table in Jupyter

In [None]:
def get_user_ratings(user_map, uid):
    """Join a user's (movieId, rating) entries from user_map with movie details from movies_info."""
    u_ratings = user_map[uid]
    res = []
    for r in u_ratings:
        movie_info = movies_info[r['movieId']].asDict()
        movie_info['rating'] = r['rating']
        res.append(movie_info)
    return res

print(get_user_ratings(user_ratings, uid))
print()
print(get_user_ratings(users_recommendations, uid))

**Fetching movie posters from the TMDb API**

In [None]:
! pip install tmdbsimple

In [None]:
from IPython.display import Image, HTML, display
import tmdbsimple as tmdb
from tmdbsimple import APIKeyError

# TMDb API key (replace with your own key)
tmdb.API_KEY = '<YOUR_TMDB_API_KEY>'

def get_poster_url(tmdbid):
    """Fetch movie poster image URL from TMDb API given a tmdbId"""
    IMAGE_URL = 'https://image.tmdb.org/t/p/w500'  # base URL for TMDB poster images
    try:
        movie = tmdb.Movies(tmdbid).info()
    except APIKeyError:
        return "KEY_ERR"  # missing or invalid API key
    except Exception:
        return "NA"       # any other failure (network error, unknown id, ...)
    poster_path = movie.get('poster_path')
    return IMAGE_URL + poster_path if poster_path else ""

movie_id = 2
movie_info = movies_info[movie_id]

try:
    print("movie title: {}".format(movie_info['title']))
    movie_poster_url = get_poster_url(movie_info['tmdbId'])
    print("movie_poster_url: {}".format(movie_poster_url))
    display(Image(movie_poster_url, width=200))
except Exception as e:
    print(e)
    print("Could not fetch the movie poster; no movie posters will be displayed!")

In [None]:
def display_user_recs(uid):
    # utility function that constructs an HTML of ratings
    # ---------------------------------------------------
    def construct_recommendations_html(ratings):
        html = "<table border=0><tr>"
        i = 0
        for movie in ratings:
            movie_im_url = get_poster_url(movie['tmdbId'])
            movie_title = movie['title']
            movie_rating = movie['rating']
            # one cell for the title and poster, one for the rating
            html += "<td><h5>{}</h5><img src=\"{}\" width=150></td><td><h5>{:.2f}</h5></td>".format(movie_title, movie_im_url, movie_rating)

            i += 1
            if i % 5 == 0:
                # start a new table row after every 5 movies
                html += "</tr><tr>"
        html += "</tr></table>"
        return html
    
    
    # display the movies that this user has rated highly 
    # -------------------------------------------------
    top_rated = get_user_ratings(user_ratings, uid)  # List [Dict (movieId, tmdbId, rating, title)]
    display(HTML("<h2>Top Ratings By User: {}</h2>".format(uid)))
    display(HTML("<h4>The user has rated the following movies highly:</h4>"))
    top_rated_html = construct_recommendations_html(top_rated)
    display(HTML(top_rated_html))
    
    
    # display the movies with the highest predicted scores for this user
    # -------------------------------------------------    
    recommendations = get_user_ratings(users_recommendations, uid)
    display(HTML("<br/>"))
    display(HTML("<h2>Top Recommendations For User: {}</h2>".format(uid)))
    display(HTML("<h4>The user is recommended these movies:</h4>"))
    recommendations_html = construct_recommendations_html(recommendations)
    display(HTML(recommendations_html))
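The table markup above is assembled by string concatenation. As a variant (a sketch, not the notebook's own helper), the rows can be built from fixed-size chunks so that every `<tr>` is explicitly opened and closed, with titles and URLs passed through `html.escape` so that characters such as `&` render correctly. `build_grid_html` and its input layout (dicts with `title`, `poster_url` and `rating`) are hypothetical, and the sample uses placeholder `example.com` URLs instead of calling TMDb.

```python
from html import escape

def build_grid_html(movies, per_row=5):
    """Render movies (dicts with 'title', 'poster_url', 'rating') as an HTML table, per_row cells per row."""
    cells = [
        '<td><h5>{}</h5><img src="{}" width="150"><h5>{:.2f}</h5></td>'.format(
            escape(m['title']), escape(m['poster_url']), m['rating'])
        for m in movies
    ]
    # each row is opened and closed explicitly, so the markup stays well-formed
    rows = ['<tr>' + ''.join(cells[i:i + per_row]) + '</tr>' for i in range(0, len(cells), per_row)]
    return '<table border="0">' + ''.join(rows) + '</table>'

sample = [{'title': 'Movie {} & Co.'.format(n), 'poster_url': 'https://example.com/{}.jpg'.format(n), 'rating': 4.5}
          for n in range(7)]
html_table = build_grid_html(sample)
print(html_table[:120])
```

In the notebook, the result would be shown with `display(HTML(html_table))`.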

### Serving Recommendations

Now, we're ready to generate movie recommendations, personalized for specific users.
Given a user, we can recommend movies to that user based on the predicted ratings from our model. Recall that the collaborative filtering model means that, at a high level, we will recommend movies _liked by other users who liked the same movies as the given user_.

In [None]:
display_user_recs(12)

Note that if you are using the small dataset, the recommendations may not be very good.