##Collaborative filtering of movies using Spark

This example shows how to implement collaborative filtering with the Spark engine. In this notebook we use the open MovieLens dataset (https://grouplens.org/datasets/movielens/), specifically its `ml-1m` version.
First, we download the dataset, unzip it, and move it into the cluster storage.

In [2]:
%sh

export DATASET_NAME=ml-1m

rm -rf /tmp/movie-lens
wget -O /tmp/movie-lens.zip http://files.grouplens.org/datasets/movielens/${DATASET_NAME}.zip
unzip /tmp/movie-lens.zip -d /tmp/
rm -f /tmp/movie-lens.zip
mv /tmp/${DATASET_NAME} /tmp/movie-lens

In [3]:
%sh
# Fix the delimiter from :: to ,
sed -i 's/::/,/g' /tmp/movie-lens/ratings.dat
sed -i 's/::/,/g' /tmp/movie-lens/movies.dat
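One caveat with the `sed` substitution: movie titles in `movies.dat` can themselves contain commas (MovieLens moves a leading article to the end, as in "..., The (1995)"), so a plain comma split yields more than three fields. The pure-Python sketch below uses a made-up line in the `movies.dat` layout to show the problem and one possible workaround, a regular expression that treats everything between the first and the last comma as the title. It assumes that genres never contain commas; `parse_movie_line` is a hypothetical helper, not part of the notebook.

```python
import re

# Hypothetical line in the ml-1m movies.dat layout: MovieID::Title::Genres
raw_line = "42::Example Title, The (1999)::Comedy|Drama"

# What the sed step produces: every "::" becomes ","
csv_line = raw_line.replace("::", ",")
naive_fields = csv_line.split(",")
print(naive_fields)  # four fields instead of the expected three

# Workaround sketch: id before the first comma, genres after the last comma,
# title is everything in between (assumes genres never contain commas).
MOVIE_LINE = re.compile(r"^(\d+),(.*),([^,]*)$")


def parse_movie_line(line):
    match = MOVIE_LINE.match(line)
    if match is None:
        raise ValueError("unexpected movie line: %r" % line)
    movie_id, title, genres = match.groups()
    return int(movie_id), title, genres.split("|")


print(parse_movie_line(csv_line))
```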

The data now lives on the driver node, so we need to move it into the cluster storage. We run this notebook on the Databricks platform, so our files go into DBFS (`dbfs:/`).

In [5]:
%fs rm -r dbfs:/FileStore/movie-lens

In [6]:
%fs mv file:/tmp/movie-lens/ratings.dat dbfs:/FileStore/movie-lens/ratings.csv

In [7]:
%fs mv file:/tmp/movie-lens/movies.dat dbfs:/FileStore/movie-lens/movies.csv

In [8]:
%sh
rm -rf /tmp/movie-lens*

In [9]:
%fs ls dbfs:/FileStore/movie-lens

path,name,size
dbfs:/FileStore/movie-lens/movies.csv,movies.csv,163542
dbfs:/FileStore/movie-lens/ratings.csv,ratings.csv,21593504


###Data preparation
Let's now read the source CSV files and create dataframes from them.

In [11]:
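from pyspark.sql.functions import regexp_extract

csv_files = {"ratings": "dbfs:/FileStore/movie-lens/ratings.csv", "movies": "dbfs:/FileStore/movie-lens/movies.csv"}

spark.catalog.clearCache()
ratings = spark.read.csv(csv_files["ratings"], header=False, inferSchema=True).toDF("userId", "movieId", "rating", "timestamp")

# Movie titles may contain commas (e.g. "..., The (1995)"), so a plain CSV split is unreliable.
# Take the id before the first comma, the genres after the last comma, and the title in between.
movie_line = r"^(\d+),(.*),([^,]*)$"
movies = spark.read.text(csv_files["movies"]).select(
  regexp_extract("value", movie_line, 1).cast("int").alias("movieId"),
  regexp_extract("value", movie_line, 2).alias("title"),
  regexp_extract("value", movie_line, 3).alias("genres"))

ratings.cache()
movies.cache()

print("Ratings:")
ratings.show(n=3)
print("Movies:")
movies.show(n=3)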

Let's identify all user IDs. The dataset contains only a few thousand users and each ID is a number, so the full list is small enough to collect on the driver node.
We then build a `userId`-to-index dictionary that simplifies vector creation later: each user gets a unique index from 0 to (number of users - 1). This lets us build a feature vector for each movie in which each user's rating is stored at that user's index position.

In [13]:
user_ids = ratings.select(ratings.userId).distinct().collect()

num_users = len(user_ids)
# Map every user id to a unique position in 0 .. num_users - 1
user_id_index_mapping = {row.userId: index for index, row in enumerate(user_ids)}

print("Number of unique users: %d" % num_users)

mapping_iter = iter(user_id_index_mapping.items())
print("First 3 mappings: {} {} {}".format(next(mapping_iter), next(mapping_iter), next(mapping_iter)))
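To make the index mapping concrete, here is a small pure-Python sketch on toy data (the user IDs and ratings are made up): each distinct user ID gets a position from 0 to `num_users - 1`, and a movie's ratings are then placed at those positions of a fixed-length vector, with 0 meaning "not rated". The helper `dense_movie_vector` is hypothetical and only illustrates the idea.

```python
# Toy ratings as (userId, movieId, rating); values are illustrative only.
toy_ratings = [(10, 1, 5), (10, 2, 3), (27, 1, 4), (35, 2, 1), (35, 3, 2)]

# Distinct user ids, sorted so the example is deterministic
# (Spark's distinct().collect() gives no ordering guarantee).
user_ids = sorted({user_id for user_id, _, _ in toy_ratings})
user_id_index_mapping = {user_id: index for index, user_id in enumerate(user_ids)}
num_users = len(user_id_index_mapping)


def dense_movie_vector(movie_id):
    """Rating of every user for one movie, stored at that user's index; 0 means unrated."""
    vector = [0.0] * num_users
    for user_id, rated_movie, rating in toy_ratings:
        if rated_movie == movie_id:
            vector[user_id_index_mapping[user_id]] = float(rating)
    return vector


print(user_id_index_mapping)  # {10: 0, 27: 1, 35: 2}
print(dense_movie_vector(1))  # [5.0, 4.0, 0.0]
```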

Next, we group all user ratings by movie. For each movie we store them in a list of pairs `[(vector_index, user_rating), (vector_index, user_rating), ...]`, where the vector indexes come from the dictionary built in the previous step. To share the dictionary efficiently with all worker nodes, we wrap it in a broadcast variable.

In [15]:
def make_movie_vector(movie_ratings):
  movie_id = movie_ratings[0]
  # Use the broadcast copy of the mapping that is shipped once to every worker
  mapping = user_id_mapping_broadcast.value
  user_rates = []

  for user_id, rating in movie_ratings[1]:
    user_rates.append((mapping[user_id], rating))

  return (movie_id, user_rates)

user_id_mapping_broadcast = sc.broadcast(user_id_index_mapping)
movie_rating_vectors = ratings.rdd \
  .map(lambda row: (row.movieId, (row.userId, row.rating))) \
  .groupByKey() \
  .map(make_movie_vector) \
  .toDF(["movieId", "userRates"])
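The RDD pipeline above (key each rating by `movieId`, group, convert user IDs to vector indexes) can be imitated locally in plain Python to see the shape of the `userRates` column. This is a sketch on made-up data: `defaultdict` stands in for `groupByKey`, the mapping dict plays the role of the broadcast value, and, unlike Spark, the grouping order here is deterministic.

```python
from collections import defaultdict

# Made-up rows as (userId, movieId, rating) and a userId -> index mapping
toy_ratings = [(10, 1, 5), (10, 2, 3), (27, 1, 4), (35, 2, 1), (35, 3, 2)]
user_id_index_mapping = {10: 0, 27: 1, 35: 2}

# Local stand-in for .map(lambda row: (row.movieId, (row.userId, row.rating))).groupByKey()
grouped = defaultdict(list)
for user_id, movie_id, rating in toy_ratings:
    grouped[movie_id].append((user_id, rating))


def make_movie_vector(movie_ratings, mapping):
    """Turn (movieId, [(userId, rating), ...]) into (movieId, [(index, rating), ...])."""
    movie_id, user_ratings = movie_ratings
    return movie_id, [(mapping[user_id], rating) for user_id, rating in user_ratings]


movie_rating_vectors = [make_movie_vector(item, user_id_index_mapping) for item in grouped.items()]
print(movie_rating_vectors)
# [(1, [(0, 5), (1, 4)]), (2, [(0, 3), (2, 1)]), (3, [(2, 2)])]
```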

Let's also join the movie titles and genres into the result table.

In [17]:
movie_ratings = movie_rating_vectors.join(movies, movies.movieId == movie_rating_vectors.movieId).select(movie_rating_vectors.movieId, movie_rating_vectors.userRates, movies.title, movies.genres)
movie_ratings.cache()
movie_ratings.show(5)

###Inference
Now we are done with the data preparation. Let's create a `find_similarities` function that takes a source movie ID and returns a dataframe sorted by similarity score. The function computes similarities from user ratings between the source movie and all movies in the dataset, in the following steps:

- Get a `source_vector` dataframe that contains a single row with the user ratings of the source movie.
- Cross join the `movie_ratings` dataframe with `source_vector`. Every row of the result contains both the source movie's ratings and the ratings of one candidate movie.
- Run `calculate_similarity` on each row:
  - Build a dense NumPy vector of size "number of users" from the source movie's (index, rating) pairs; unrated positions stay 0.
  - Build a dense NumPy vector of the same size from the candidate movie's (index, rating) pairs.
  - Calculate the cosine similarity between the two vectors.
- Store the similarity scores in a separate column.
- Sort the dataframe by score in descending order.
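For reference, the cosine similarity computed for each pair, with $s$ and $m$ the rating vectors of the source movie and a candidate movie (entry $u$ holds user $u$'s rating, or 0 if that user did not rate the movie), is:

$$
\cos(s, m) = \frac{s \cdot m}{\lVert s \rVert \, \lVert m \rVert} = \frac{\sum_{u} s_u m_u}{\sqrt{\sum_{u} s_u^2}\,\sqrt{\sum_{u} m_u^2}}
$$

Only users who rated both movies contribute to the numerator, and because all ratings are positive the score lies between 0 and 1.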

In [19]:
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
import numpy as np
from numpy import dot
from numpy.linalg import norm


vector_size = num_users


def build_numpy_vector(rate_mappings):
  # Dense vector with one slot per user; users who did not rate the movie stay 0
  feature_vector = np.zeros(vector_size, dtype=float)
  
  for vector_index, rating in rate_mappings:
    feature_vector[vector_index] = rating
    
  return feature_vector


@udf("float")
def calculate_similarity(source_rates, movie_rates):
  source_vector = build_numpy_vector(source_rates)
  target_vector = build_numpy_vector(movie_rates)

  # Cosine similarity: dot product divided by the product of the vector norms
  cos_sim = dot(source_vector, target_vector) / (norm(source_vector) * norm(target_vector))
  return cos_sim.item()


spark.udf.register("calculate_similarity", calculate_similarity)


def find_similarities(source_movie_id):
  # Filter on movieId first, then keep only the source movie's rates
  source_vector = movie_ratings.filter(movie_ratings.movieId == source_movie_id).select(col("userRates").alias("sourceRates"))
  # Pair every movie with the source rates and score each pair
  scores = movie_ratings.crossJoin(source_vector).select(movie_ratings.title, movie_ratings.genres, calculate_similarity("sourceRates", "userRates").alias("score"))
  return scores.orderBy(col("score"), ascending=False)
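The UDF above materializes two dense vectors of length `num_users` for every row. A possible alternative, sketched here in plain Python and not part of the original notebook, computes the same cosine similarity directly from the sparse `(index, rating)` pairs, touching only the users who actually rated each movie. The function name `sparse_cosine` is hypothetical.

```python
import math


def sparse_cosine(source_rates, movie_rates):
    """Cosine similarity of two rating vectors given as (index, rating) pairs.

    Missing indexes count as 0, just like the zero-filled dense vectors
    built by build_numpy_vector.
    """
    source = dict(source_rates)
    target = dict(movie_rates)
    # Only indexes present in both vectors contribute to the dot product
    dot_product = sum(rating * target[index] for index, rating in source.items() if index in target)
    norms = math.sqrt(sum(r * r for r in source.values())) * math.sqrt(sum(r * r for r in target.values()))
    return dot_product / norms if norms else 0.0


print(sparse_cosine([(0, 5), (1, 4)], [(0, 3), (2, 1)]))
```

Inside Spark, a function like this could be wrapped with `@udf("double")` in place of `calculate_similarity`; we have not measured whether it is faster than the dense version on this dataset.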

The implementation is complete. Now let's pick a movie and display the movies most similar to it.

In [21]:
source_movie_id = movie_ratings.filter(movie_ratings.title.contains("Star Wars: Episode I - The Phantom Menace")).select(movie_ratings.movieId).collect()[0].movieId

similarities = find_similarities(source_movie_id)
similarities.show(20, truncate=False)

Above is the list of the most similar movies. As expected, the first entry is the source movie itself with a score of `1.0`, a basic sanity check that the similarity calculation works.
To sum up, the example above shows how to build a simple item-based collaborative filtering algorithm with Spark. The system finds movies similar to a given source movie based solely on the ratings users gave. A movie scores highest when many of the same users rated both it and the source movie, and rated the two in a similar pattern.