# Machine Learning with Spark

### This is based on a public lesson from Databricks [available here](https://github.com/databricks/spark-training/tree/master/machine-learning)

Have you ever wondered how Netflix knows which movies you will like, based only on your ratings of other movies? This is a machine learning problem called collaborative filtering. Spark comes with a pretty good algorithm for solving it, called Alternating Least Squares (ALS). We'll build a model from a real movie ratings data set, add some ratings of our own, and then see which movies the model predicts we'll like!

Here are some more details:
 - The MovieLens data set comes from the GroupLens research lab in the University of Minnesota's Computer Science department. MovieLens, launched in 1997, was one of the first consumer recommender systems, and it is still online [here](http://movielens.org).
 - We won't do any serious cross-validation of the model, although Spark makes that relatively easy.
 - The Spark ML guide is [here](http://spark.apache.org/docs/latest/ml-guide.html). The Spark collaborative filtering guide (very relevant, since this notebook uses the RDD-based `pyspark.mllib` API it describes) is [here](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html).
 - An article on the ALS learning algorithm is [here](http://bugra.github.io/work/notes/2014-04-19/alternating-least-squares-method-for-collaborative-filtering/). The original research paper is [here](http://dl.acm.org/citation.cfm?id=1608614).
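
To make the idea concrete, here is a minimal, single-machine sketch of ALS in plain Python (no Spark or NumPy needed). It assumes ratings arrive as `(user, movie, rating)` triples and alternates between two steps: hold every movie's factor vector fixed and solve a small regularized least-squares problem for each user, then hold the users fixed and solve for each movie. The function names (`solve`, `half_step`, `als`, `predict`, `objective`) and the toy ratings are made up for illustration. Spark's implementation distributes these solves across a cluster and scales the regularization weight by each user's or movie's number of ratings; this sketch uses a single unscaled `lam`, so treat it as an illustration of the technique rather than a reproduction of MLlib.

```python
import random

def solve(a, b):
    """Solve the small dense system a x = b (Gaussian elimination, partial pivoting)."""
    n = len(b)
    m = [list(row) + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back-substitution
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x

def half_step(fixed, observed, rank, lam):
    """Re-solve each row's factor vector while the other side's vectors stay fixed.

    observed maps a row id to its (other_id, rating) pairs; each row solves
    (sum of v v^T + lam * I) x = sum of rating * v  (hypothetical helper).
    """
    solved = {}
    for row_id, pairs in observed.items():
        a = [[lam if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other_id, r in pairs:
            v = fixed[other_id]
            for i in range(rank):
                b[i] += r * v[i]
                for j in range(rank):
                    a[i][j] += v[i] * v[j]
        solved[row_id] = solve(a, b)
    return solved

def als(ratings, rank=2, iterations=10, lam=0.1, seed=0):
    """Factor (user, item, rating) triples into user and item vectors by alternating solves."""
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    rng = random.Random(seed)
    items = {i: [rng.random() for _ in range(rank)] for i in by_item}  # random start
    users = {}
    for _ in range(iterations):
        users = half_step(items, by_user, rank, lam)  # items fixed: solve for users
        items = half_step(users, by_item, rank, lam)  # users fixed: solve for items
    return users, items

def predict(users, items, u, i):
    """Predicted rating = dot product of the user and item factor vectors."""
    return sum(x * y for x, y in zip(users[u], items[i]))

def objective(ratings, users, items, lam):
    """Squared error on observed ratings plus the L2 penalty on every factor vector."""
    err = sum((r - predict(users, items, u, i)) ** 2 for u, i, r in ratings)
    reg = sum(x * x for vec in list(users.values()) + list(items.values()) for x in vec)
    return err + lam * reg

# Toy data (made up): 3 users, 3 movies, user 2 has not rated movie 30
ratings = [(1, 10, 5.0), (1, 20, 1.0), (1, 30, 2.0),
           (2, 10, 4.0), (2, 20, 1.0),
           (3, 10, 1.0), (3, 20, 5.0), (3, 30, 4.0)]
users, items = als(ratings, rank=2, iterations=20)
print(round(predict(users, items, 2, 30), 2))  # filled-in rating for the missing pair
```

Because each half-step solves its least-squares problems exactly, the regularized objective can only stay the same or go down from one sweep to the next. The prediction for a pair the user never rated is simply the dot product of the two learned vectors.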

In [None]:
import html
import pyspark
import pyspark.sql
import pyspark.sql.functions as F  # namespaced, so built-ins like sum() and max() aren't shadowed
import pandas
import matplotlib.pyplot as plt

from IPython.display import display, HTML

# Start a local Spark session (or reuse the one already running) and grab its SparkContext
spark = pyspark.sql.SparkSession.builder.master('local[*]').getOrCreate()
sc = spark.sparkContext

# Useful function for displaying the first n rows of a DataFrame as a nice-looking HTML table
def show(df, n=50):
    header = '</th><th>'.join(html.escape(str(c)) for c in df.columns)
    body = '</tr><tr>'.join(
        '<td>{}</td>'.format('</td><td>'.join(html.escape(str(v)) for v in row))
        for row in df.take(n))
    display(HTML('<table><tr><th>{}</th></tr><tr>{}</tr></table>'.format(header, body)))

In [None]:
# Functions for loading and parsing the MovieLens data set

# To save training time, limit the number of ratings used
LIMIT = 100000

# ../data/movie-ratings.dat is in this format:
# userId::movieId::rating (1-5 scale)::timestamp (ignored)
# 1::1193::5::978300760
# 1::661::3::978302109
# 1::914::3::978301968
# 1::3408::4::978300275
# 1::2355::5::978824291

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def parseRating(line):
    # Parse a (user, movie, rating) triple from one line of ../data/movie-ratings.dat
    fields = line.strip().split("::")
    return (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    # Parse a (movie ID, title) pair from one line of ../data/movies.dat
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def loadRatings():
    # Load and parse the movie-ratings file, closing it when done
    with open('../data/movie-ratings.dat', 'r') as f:
        ratings = [parseRating(l) for l in f if l.strip()]
    # Skip any ratings of 0 (these are bad data), then keep the first LIMIT ratings
    return [Rating(u, m, r) for (u, m, r) in ratings if r > 0][:LIMIT]

# Preload all the movie names into a global dict for easy lookup
# (movies.dat is read as ISO-8859-1, matching the file's encoding)
movieNamesDict = {}
with open('../data/movies.dat', encoding="ISO-8859-1") as f:
    for line in f:
        if line.strip():
            movieId, title = parseMovie(line)
            movieNamesDict[movieId] = title

def getNameOfMovie(movie):
    # Return the title for a movie ID, or 'Unknown' if it isn't in movies.dat
    return movieNamesDict.get(movie, 'Unknown')
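
The `loadRatings` function above reads the whole file into memory before truncating to `LIMIT`. A hedged alternative, sketched below, accepts any iterable of lines (an open file or an `io.StringIO`), so it can be tried without the data files, and it stops reading once it has `limit` ratings. The snake_case names and the local `Rating` namedtuple (a stand-in with the same `user`, `product`, `rating` fields as `pyspark.mllib.recommendation.Rating`) are illustrative; the second sample line is copied from the format comment above with its rating changed to 0 to show the filter.

```python
import io
from collections import namedtuple

# Stand-in with the same fields as pyspark.mllib.recommendation.Rating, so this runs without Spark
Rating = namedtuple("Rating", ["user", "product", "rating"])

def parse_rating(line):
    """Parse 'userId::movieId::rating::timestamp' into (user, movie, rating); the timestamp is ignored."""
    fields = line.strip().split("::")
    return int(fields[0]), int(fields[1]), float(fields[2])

def load_ratings(lines, limit=100000):
    """Collect up to `limit` Ratings from any iterable of lines, skipping blank lines and 0 ratings."""
    ratings = []
    for line in lines:
        if not line.strip():
            continue
        user, movie, rating = parse_rating(line)
        if rating > 0:
            ratings.append(Rating(user, movie, rating))
            if len(ratings) >= limit:
                break  # stop reading once we have enough
    return ratings

sample = io.StringIO("1::1193::5::978300760\n"
                     "1::661::0::978302109\n"
                     "\n"
                     "1::914::3::978301968\n")
print(load_ratings(sample))
```

With the real file you would call it as `with open('../data/movie-ratings.dat') as f: ratings = load_ratings(f, LIMIT)`. Like the original, this keeps the *first* `LIMIT` ratings in file order; if the file is sorted by user, as the sample lines suggest, that is a subset of users rather than a random sample.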
    

In [None]:
rank = 20            # number of latent factors per user and per movie
numIterations = 15   # number of ALS iterations (alternating passes)
ratingsRdd = sc.parallelize(loadRatings())
# lambda_ is the regularization strength
model = ALS.train(ratingsRdd, rank, numIterations, lambda_=0.01)

# Compute the Mean Squared Error on the training ratings. This measures how well the model
# fits ratings it has already seen, so it is usually an optimistic estimate for new ratings.
testdata = ratingsRdd.map(lambda p: (p[0], p[1]))
# Predict a rating for every (user, movie) pair in the training data
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
# Join the predictions with the real ratings on (user, movie)
ratesAndPreds = ratingsRdd.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# Average the squared difference between the real and predicted ratings
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
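
The MSE above compares the model against the same ratings it was trained on. A common refinement is to hold some ratings out, for example with `train, test = ratingsRdd.randomSplit([0.8, 0.2], seed=42)`, train on `train`, and compute the same join-and-average on `test`. The plain-Python sketch below mirrors that logic with dictionaries keyed by `(user, movie)`; `mse` and `split_ratings` are hypothetical helper names and the numbers are toy values. The inner join means pairs without a prediction are simply left out. Taking the square root gives the RMSE, which is on the same 1-5 star scale as the ratings.

```python
import math
import random

def mse(actual, predicted):
    """Mean squared error over the (user, movie) keys present in both dicts (an inner join)."""
    keys = actual.keys() & predicted.keys()
    if not keys:
        raise ValueError("no (user, movie) pairs in common")
    return sum((actual[k] - predicted[k]) ** 2 for k in keys) / len(keys)

def split_ratings(ratings, test_fraction=0.2, seed=42):
    """Randomly assign each rating to a training list or a held-out test list."""
    rng = random.Random(seed)
    train, test = [], []
    for r in ratings:
        (test if rng.random() < test_fraction else train).append(r)
    return train, test

# Toy values: (user, movie) -> rating; (3, 30) has no real rating, so the join drops it
actual = {(1, 10): 4.0, (1, 20): 2.0, (2, 10): 5.0}
predicted = {(1, 10): 3.5, (1, 20): 2.5, (2, 10): 5.0, (3, 30): 1.0}
err = mse(actual, predicted)
print(err, math.sqrt(err))  # MSE ≈ 0.167, RMSE ≈ 0.408
```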

Now let's rate some movies ourselves, as if we were choosing star ratings (1 to 5) in the Netflix interface.

In [None]:
# (movie ID, star rating on the same 1-5 scale as the data set)
myRatings = [
    (1, 1),     # Toy Story
    (780, 5),   # Independence Day
    (590, 1),   # Dances with Wolves
    (648, 5),   # Mission: Impossible
    (344, 1),   # Ace Ventura: Pet Detective
    (165, 5),   # Die Hard With a Vengeance
    (153, 1),   # Batman Forever
    (597, 1),   # Pretty Woman
    (1580, 1),  # Men in Black
    (231, 1),   # Dumb and Dumber
]

We add our ratings under user ID 0 (MovieLens user IDs start at 1, so 0 does not collide with a real user) and retrain the model. Then we ask the model for its top-scoring movies for user 0.

In [None]:
# Each of our ratings will have user ID 0
myRatingsRdd = sc.parallelize([Rating(0, movieId, rating) for (movieId, rating) in myRatings])

# Train the model with both the original ratings and the new ratings for "user 0"
model = ALS.train(ratingsRdd.union(myRatingsRdd), rank, numIterations)

# The candidate rows pair user ID 0 with every distinct movie ID in the data (no rating yet)
candidates = ratingsRdd.map(lambda rating: rating[1]).distinct().map(lambda movieId: (0, movieId))

# Ask the model to fill in a predicted rating for each candidate
predictions = model.predictAll(candidates)

# Sort so the highest predictions come first
topPredictions = predictions.map(lambda r: (r[2], r[1])).sortByKey(ascending=False)

# Attach the title of each movie in topPredictions
topPredictionsWithNames = topPredictions.map(lambda x: (x[0], x[1], getNameOfMovie(x[1])))

topPredictionsDF = topPredictionsWithNames.toDF(["Predicted Rating", "Movie ID", "Movie Title"])
show(topPredictionsDF)
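
Under the hood, a matrix-factorization model predicts a rating as the dot product of the user's factor vector and the movie's factor vector, and the cell above scores the candidate movies for user 0 and then sorts all of the predictions. The sketch below shows the same ranking step in plain Python, using `heapq.nlargest` to keep only the top `n` without a full sort. The vectors and titles are made-up toy values and `top_n` is a hypothetical helper. In Spark, `model.recommendProducts(0, 50)` returns the top 50 `Rating` objects for user 0 directly, and `predictions.takeOrdered(50, key=lambda r: -r.rating)` avoids sorting the whole RDD.

```python
import heapq

def predict(user_vec, item_vec):
    """Matrix-factorization prediction: dot product of user and item factor vectors."""
    return sum(u * v for u, v in zip(user_vec, item_vec))

def top_n(user_vec, item_factors, names, n=10):
    """Return the n highest-scoring (score, item ID, title) triples for one user."""
    scored = ((predict(user_vec, vec), item_id) for item_id, vec in item_factors.items())
    return [(score, item_id, names.get(item_id, "Unknown"))
            for score, item_id in heapq.nlargest(n, scored)]

# Hypothetical 2-factor vectors and titles, made up for illustration
user0 = [1.0, 0.0]
item_factors = {1: [0.5, 2.0], 2: [3.0, 0.0], 3: [1.0, 1.0]}
names = {1: "Movie 1", 2: "Movie 2"}
print(top_n(user0, item_factors, names, n=2))
```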

# Questions
1. One problem with this program is that the movies you rated yourself also show up in the output. Can you remove them?
2. The `rank` and `numIterations` parameters have a big effect on the quality of the model. Can you add a loop that tries several values and keeps the best ones (ideally judged on ratings held out from training)?