In [295]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [296]:
'''
This notebook implements three approaches to recommender systems on datasets of popular movies:
1) demographic filtering - ranking movies by a popularity metric and showing the top ones to all users
2) content-based filtering - measuring similarity between movies and recommending movies similar to ones a user is interested in
3) collaborative filtering - predicting how each user would rate movies they have not rated and recommending the top estimates
'''
import numpy as np 
import pandas as pd
from google.colab import drive
from pyspark.sql import SparkSession

drive.mount('/content/drive')
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
# load the movie datasets (via pandas, because Spark's CSV reader did not parse these files properly)
df1=pd.read_csv('/content/drive/My Drive/datasets/movies/tmdb_5000_credits.csv')
df2=pd.read_csv('/content/drive/My Drive/datasets/movies/tmdb_5000_movies.csv')
df1=spark.createDataFrame(df1)
df2=spark.createDataFrame(df2)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [297]:
'''
We first load the TMDB 5000 dataset to demonstrate demographic and content-based filtering.
This dataset contains information about movies, but no individual user data.
The first dataset (credits) contains the following:
movie_id - A unique identifier for each movie.
title - The title of the movie.
cast - The names of the lead and supporting actors.
crew - The names of the director, editor, composer, writer, etc.
'''
df1.show()

+--------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|
+--------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|
|  206647|             Spectre|[{"cast_id": 1, "...|[{"credit_id": "5...|
|   49026|The Dark Knight R...|[{"cast_id": 2, "...|[{"credit_id": "5...|
|   49529|         John Carter|[{"cast_id": 5, "...|[{"credit_id": "5...|
|     559|        Spider-Man 3|[{"cast_id": 30, ...|[{"credit_id": "5...|
|   38757|             Tangled|[{"cast_id": 34, ...|[{"credit_id": "5...|
|   99861|Avengers: Age of ...|[{"cast_id": 76, ...|[{"credit_id": "5...|
|     767|Harry Potter and ...|[{"cast_id": 3, "...|[{"credit_id": "5...|
|  209112|Batman v Superman...|[{"cast_id": 18, ...|[{"credit_id": "5...|
|    1452|    Superman Returns|[{"cast

In [298]:
'''
Second dataset contains the following:
budget - The budget with which the movie was made.
genres - The genres of the movie (Action, Comedy, Thriller, etc.).
homepage - A link to the homepage of the movie.
id - This is in fact the same movie_id as in the first dataset.
keywords - The keywords or tags related to the movie.
original_language - The language in which the movie was made.
original_title - The title of the movie before translation or adaptation.
overview - A brief description of the movie.
popularity - A numeric quantity specifying the movie popularity.
production_companies - The production house of the movie.
production_countries - The country in which it was produced.
release_date - The date on which it was released.
revenue - The worldwide revenue generated by the movie.
runtime - The running time of the movie in minutes.
spoken_languages - The languages spoken in the movie.
status - "Released" or "Rumored".
tagline - The movie's tagline.
title - The title of the movie.
vote_average - The average rating the movie received.
vote_count - The number of votes received.
'''
df2.show()

+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|237000000|[{"id": 28, "name...

In [299]:
#join dataframes
df1 = df1.drop("title")
df = df1.join(df2, df1.movie_id == df2.id, "inner")
df=df.drop("movie_id")
df.show()

+--------------------+--------------------+---------+--------------------+--------------------+-----+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|                cast|                crew|   budget|              genres|            homepage|   id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+--------------------+--------------------+---------+--------------------+--------------------+-----+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+---------+-------+--------

In [300]:
'''First approach to recommender systems: demographic filtering
We rank movies by a popularity metric and show the same ranking to all users.
The metric we'll use is IMDB's weighted rating (WR), given by:
WR = (v/(v+m) * R) + (m/(v+m) * C), where
v is the number of votes for the movie;
m is the minimum number of votes required to be listed in the chart;
R is the average rating of the movie; and
C is the mean rating across all movies in the dataset.
'''
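# import only the functions used in this notebook; a wildcard import would shadow Python built-ins such as sum, min and max
from pyspark.sql.functions import col, concat, lit, mean, udf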
mean_votes_value=df.select(mean("vote_average")).collect()[0][0]
C=mean_votes_value
'''Take m as the 0.9 quantile of the vote counts.
This means that for a movie to be listed, it must have more votes than roughly 90% of all movies.'''
m=df.approxQuantile("vote_count", [0.9], 0.01)[0]


In [301]:
qualified_movies=df.filter(df.vote_count>m)
print(df.count(),qualified_movies.count())
qualified_movies=qualified_movies.withColumn("score",(col("vote_count")/(col("vote_count")+m) * col("vote_average"))+(m/(m+col("vote_count")) *C))

4803 527
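
To make the weighted rating formula above concrete, here is a minimal sketch in plain Python. The values of m and C are hypothetical, chosen only for illustration; they are not taken from the dataset. It shows that two movies with the same average rating R get different scores: the one with fewer votes is pulled more strongly toward the global mean C.

```python
def weighted_rating(v, R, m, C):
    """IMDB-style weighted rating: shrinks R toward the global mean C when v is small."""
    return (v / (v + m)) * R + (m / (v + m)) * C

# Hypothetical values, for illustration only (not computed from the TMDB data).
m = 1000.0  # vote-count threshold
C = 6.0     # mean rating across all movies

# Same average rating (8.0), different number of votes.
many_votes = weighted_rating(v=9000, R=8.0, m=m, C=C)
few_votes = weighted_rating(v=1000, R=8.0, m=m, C=C)
print(many_votes, few_votes)
```

With v = m, the score is exactly halfway between R and C; as v grows, the score approaches R.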


In [302]:
qualified_movies.orderBy(qualified_movies.score.desc()).select("title","vote_count","vote_average","score").show()

+--------------------+----------+------------+------------------+
|               title|vote_count|vote_average|             score|
+--------------------+----------+------------+------------------+
|The Shawshank Red...|      8205|         8.5| 8.089566398667397|
|          Fight Club|      9413|         8.3| 7.964618546645576|
|     The Dark Knight|     12002|         8.2|7.9403712192591485|
|        Pulp Fiction|      8428|         8.3|7.9319558284772835|
|       The Godfather|      5893|         8.4| 7.886607896717145|
|           Inception|     13752|         8.1| 7.880722972484728|
|        Interstellar|     10867|         8.1| 7.830327511289671|
|        Forrest Gump|      7927|         8.2| 7.830313247604206|
|The Lord of the R...|      8064|         8.1| 7.752800128125049|
|The Empire Strike...|      5879|         8.2| 7.730231493617875|
|           Star Wars|      6624|         8.1| 7.692635529388596|
|The Lord of the R...|      8705|         8.0| 7.690443773382661|
|    Schin

In [303]:
'''The second approach to recommender systems: content-based filtering
We recommend movies that are similar to a given movie, using a pairwise similarity metric between movies.
We first look only at the movie overviews and find the most similar ones.
Later, we take more information into account when finding similar movies (genres, keywords, cast, crew).
We map each movie to a vector representation and find the closest ones based on cosine similarity.
We use TF-IDF as our vector representation, but it is possible to use other embeddings
(word2vec, BERT, CLIP embeddings, etc.).
'''

#first approach: using only overview with TF-IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF, Tokenizer, Normalizer, BucketedRandomProjectionLSH
def tf_idf_vectorize(column_name):
  #split desired column into words
  tokenizer = Tokenizer(inputCol=column_name, outputCol="words")
  words_df = tokenizer.transform(df)
  # remove stopwords
  stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
  filtered_df = stopwords_remover.transform(words_df)

  # calculate term frequency to raw_features column
  hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features")
  featurized_df = hashing_tf.transform(filtered_df)

  # calculate inverse document frequency based on raw features to get features
  idf = IDF(inputCol="raw_features", outputCol="features")
  idf_model = idf.fit(featurized_df)
  tf_idf_df = idf_model.transform(featurized_df)

  # L2-normalize the TF-IDF vectors (Normalizer is a plain Transformer, so no Pipeline or fit is needed)
  normalizer = Normalizer(inputCol="features", outputCol="norm_features")
  norm_tf_idf_df = normalizer.transform(tf_idf_df)

  # fit a BucketedRandomProjectionLSH model, which hashes vectors that are close in Euclidean distance into the same buckets;
  # on unit-length vectors, ranking by Euclidean distance is equivalent to ranking by cosine similarity
  brp_lsh = BucketedRandomProjectionLSH(inputCol="norm_features", outputCol="hashes", bucketLength=0.1, numHashTables=3)
  brp_model = brp_lsh.fit(norm_tf_idf_df)

  def findSimilarMovies(movie_id):
    # look up the approximate nearest neighbours of the given movie with the fitted LSH model
    print("Original movie:")
    norm_tf_idf_df.filter(norm_tf_idf_df.id == movie_id).select("title").show()
    # keep only the normalized feature vector and the movie id
    embedding_df = norm_tf_idf_df.select("norm_features", "id")
    # extract the feature vector of the movie with the given id
    n_embedding = embedding_df.filter(embedding_df.id == movie_id).first()[0]

    print("Recommendations:")
    # find the 10 approximate nearest neighbours (the query movie itself is included in the result)
    similar_documents = brp_model.approxNearestNeighbors(norm_tf_idf_df, n_embedding, 10)
    similar_documents.select("title").show(truncate=False)

  return norm_tf_idf_df,brp_model,findSimilarMovies

norm_tf_idf_df,brp_model,findSimilarMovies=tf_idf_vectorize("overview")
findSimilarMovies(558)
findSimilarMovies(df.filter(df.title == "The Dark Knight Rises").select("id").first()[0])


Original movie:
+------------+
|       title|
+------------+
|Spider-Man 2|
+------------+

Recommendations:
+------------------------+
|title                   |
+------------------------+
|Spider-Man 2            |
|The Helix... Loaded     |
|The Amazing Spider-Man  |
|27 Dresses              |
|Spider-Man 3            |
|The Way of the Gun      |
|Psycho Beach Party      |
|Bronson                 |
|Supporting Characters   |
|The Amazing Spider-Man 2|
+------------------------+

Original movie:
+--------------------+
|               title|
+--------------------+
|The Dark Knight R...|
+--------------------+

Recommendations:
+---------------------+
|title                |
+---------------------+
|The Dark Knight Rises|
|The Helix... Loaded  |
|The Dark Knight      |
|Batman Returns       |
|Batman Forever       |
|Batman               |
|Meet the Deedles     |
|Slow Burn            |
|Batman & Robin       |
|JFK                  |
+---------------------+
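
The idea behind the cell above (TF-IDF vectors, normalized to unit length, then a nearest-neighbour search) can be sketched in plain Python without Spark. This is a simplified illustration under stated assumptions, not the notebook's pipeline: the three overviews are made up, there is no stop-word removal or feature hashing, and the search is exact rather than LSH-based. It uses the smoothed IDF log((N + 1) / (df + 1)), which is the form documented for Spark's IDF. It also checks that, for unit vectors, squared Euclidean distance equals 2 - 2 * cosine similarity, which is why a Euclidean LSH on normalized vectors gives a cosine-style ranking.

```python
import math
from collections import Counter

# Hypothetical toy overviews, made up for illustration.
docs = {
    "A": "a masked hero protects the city from crime",
    "B": "the masked hero returns to save the city",
    "C": "two friends open a bakery in paris",
}

tokens = {key: text.split() for key, text in docs.items()}
vocab = sorted({w for words in tokens.values() for w in words})
n_docs = len(docs)
# Document frequency: number of documents that contain each word.
doc_freq = {w: sum(w in words for words in tokens.values()) for w in vocab}

def tf_idf_unit_vector(words):
    """Term count times smoothed IDF, scaled to unit L2 norm."""
    counts = Counter(words)
    vec = [counts[w] * math.log((n_docs + 1) / (doc_freq[w] + 1)) for w in vocab]
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]

vectors = {key: tf_idf_unit_vector(words) for key, words in tokens.items()}

def cosine(a, b):
    # The vectors already have unit length, so the dot product is the cosine.
    return sum(x * y for x, y in zip(a, b))

def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

sim_ab = cosine(vectors["A"], vectors["B"])
sim_ac = cosine(vectors["A"], vectors["C"])
dist_ab = euclidean(vectors["A"], vectors["B"])
dist_ac = euclidean(vectors["A"], vectors["C"])
print(sim_ab, sim_ac, dist_ab, dist_ac)
```

Overview B shares several words with A, so it ends up both more similar in cosine terms and closer in Euclidean distance than C.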



In [304]:
#now using director, actor, genre and keyword data to do filtering with tf-idf
from pyspark.sql.types import *
import json
def get_director(x):
  x=json.loads(x)
  for i in x:
      if i['job'] == 'Director':
          return i['name']
  return None
def clean_data(x):
    # lower-case names and strip spaces so that each full name becomes a single token
    if isinstance(x, list):
        return [i.replace(" ", "").lower() for i in x]
    # a single name (the director); if it is missing (None), return an empty string
    if isinstance(x, str):
        return x.replace(" ", "").lower()
    return ''

director_udf=udf(lambda x:clean_data(get_director(x)),StringType())
df = df.withColumn('director', director_udf(col('crew')))
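
As a small illustration of what the director extraction above does to a single row, here is a sketch in plain Python. The crew string is hypothetical, shaped like the JSON stored in the crew column, and clean_name is a simplified stand-in for clean_data that only handles a single name.

```python
import json

# Hypothetical crew string, shaped like the JSON in the `crew` column.
crew_json = json.dumps([
    {"job": "Editor", "name": "Jane Doe"},
    {"job": "Director", "name": "John Q Public"},
])

def get_director(crew_str):
    """Return the name of the first crew member whose job is 'Director', else None."""
    for member in json.loads(crew_str):
        if member.get("job") == "Director":
            return member["name"]
    return None

def clean_name(name):
    """Lower-case and strip spaces; a missing name becomes an empty string."""
    return name.replace(" ", "").lower() if isinstance(name, str) else ""

director_token = clean_name(get_director(crew_json))
no_director_token = clean_name(get_director("[]"))
print(director_token)  # prints "johnqpublic"
```

Collapsing each full name into one token means that two different people who share a first name do not produce a shared term in the TF-IDF vocabulary.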

In [305]:
# Returns at most first 3 elements of list (using to filter top actors, genres, and keywords)
def get_first_names(x):
    x=json.loads(x)
    names=[i["name"] for i in x][:3]
    return names
names_udf=udf(lambda x:" ".join(clean_data(get_first_names(x))),StringType())
features = ['cast', 'keywords', 'genres']
df=df.withColumn("cast",names_udf(col("cast")))
df=df.withColumn("keywords",names_udf(col("keywords")))
df=df.withColumn("genres",names_udf(col("genres")))
df.select("title","cast","director","keywords","genres").show()

+--------------------+--------------------+------------------+--------------------+--------------------+
|               title|                cast|          director|            keywords|              genres|
+--------------------+--------------------+------------------+--------------------+--------------------+
|   A Christmas Carol|garyoldman jimcar...|    robertzemeckis|holiday basedonno...|     animation drama|
|        Flushed Away|hughjackman katew...|       davidbowers|londonengland und...|adventure animati...|
|        The Campaign|willferrell zachg...|          jayroach|politics politici...|              comedy|
|           Lucky You|ericbana drewbarr...|      curtishanson|poker sport lasvegas|       drama romance|
|                 Ray|jamiefoxx kerrywa...|    taylorhackford|blackpeople soul ...|         drama music|
|        Stormbreaker|alexpettyfer sara...|       geoffreysax|england secretint...|adventure action ...|
|       The Last Shot|matthewbroderick ...|     jeffnat

In [307]:
df=df.withColumn("soup",concat(col("keywords"),lit(' '),
    col("cast"),lit(' '),col("director"),lit(' '),col("genres")))

norm_tf_idf_df,brp_model,findSimilarMovies=tf_idf_vectorize("soup")
findSimilarMovies(df.filter(df.title == "The Godfather").select("id").first()[0])
findSimilarMovies(df.filter(df.title == "The Dark Knight Rises").select("id").first()[0])

Original movie:
+-------------+
|        title|
+-------------+
|The Godfather|
+-------------+

Recommendations:
+-------------------------------------------------+
|title                                            |
+-------------------------------------------------+
|The Godfather                                    |
|Death Calls                                      |
|Crowsnest                                        |
|The Blood of My Brother: A Story of Death in Iraq|
|Hum To Mohabbat Karega                           |
|Short Cut to Nirvana: Kumbh Mela                 |
|Light from the Darkroom                          |
|Diamond Ruff                                     |
|The Little Ponderosa Zoo                         |
|UnDivided                                        |
+-------------------------------------------------+

Original movie:
+--------------------+
|               title|
+--------------------+
|The Dark Knight R...|
+--------------------+

Recommendations:
+-------

In [308]:
'''The third approach to recommender systems: collaborative filtering
We take into account all connections between users and movies, based on the users' ratings.
We then train an alternating least squares (ALS) model to make recommendations.
The algorithm maps users and movies to vectors in a latent space of fixed dimension (the rank).
It chooses these vectors so that the product of the user and movie matrices approximates the known ratings with the lowest error.
We evaluate the model with the root mean squared error (RMSE) between actual and predicted ratings.
The optimization alternates between fixing one matrix and solving for the other using the least squares method.
After optimization, we can read out the predicted ratings for the movies a user has not rated and see which ones are predicted highest.
These are the ones we recommend to the user.
We use the MovieLens small dataset, which contains movie information as well as user ratings.
'''

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Load the ratings data

ratingsSchema = StructType([
  StructField("userId", IntegerType(), True),
  StructField("movieId", IntegerType(), True),
  StructField("rating", FloatType(), True),
  StructField("timestamp", IntegerType(), True)
])
movie_ratings = spark.read.csv("/content/drive/My Drive/datasets/movies/ratings.csv",schema=ratingsSchema,header=True)

# Load the movie data
movie_titles = spark.read.csv("/content/drive/My Drive/datasets/movies/movies.csv", header=True, inferSchema=True)

# split into training and testing sets
(training, test) = movie_ratings.randomSplit([.8, .2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=4, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
# fit the ALS model to the training set
model=als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(rmse)

0.9907057278721845
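
The alternating procedure described at the top of the cell above can be sketched in plain Python for the simplest case, rank 1, where every user and every movie is represented by a single number. This is an illustration under stated assumptions, not Spark's implementation: the ratings are made up, the regularization is a plain ridge term, and each update is the closed-form least squares solution with the other side held fixed.

```python
import math

# Hypothetical ratings as (user, item, rating), made up for illustration.
ratings = [
    (0, 0, 5.0), (0, 1, 4.0),
    (1, 0, 4.0), (1, 2, 2.0),
    (2, 1, 2.0), (2, 2, 1.0),
]
n_users, n_items, reg = 3, 3, 0.01

def rmse(user_f, item_f):
    """Root mean squared error over the observed ratings."""
    errors = [(r - user_f[u] * item_f[i]) ** 2 for u, i, r in ratings]
    return math.sqrt(sum(errors) / len(errors))

def solve(fixed, own_index, other_index, size):
    """Closed-form ridge least squares for each rank-1 factor, holding `fixed` constant."""
    solved = []
    for k in range(size):
        rows = [row for row in ratings if row[own_index] == k]
        num = sum(row[2] * fixed[row[other_index]] for row in rows)
        den = sum(fixed[row[other_index]] ** 2 for row in rows) + reg
        solved.append(num / den)
    return solved

user_f = [1.0] * n_users
item_f = [1.0] * n_items
initial_rmse = rmse(user_f, item_f)
for _ in range(10):
    # fix the item factors and solve for the user factors
    user_f = solve(item_f, own_index=0, other_index=1, size=n_users)
    # fix the user factors and solve for the item factors
    item_f = solve(user_f, own_index=1, other_index=0, size=n_items)
final_rmse = rmse(user_f, item_f)

# Predicted rating for a pair with no observed rating: user 0, item 2.
predicted = user_f[0] * item_f[2]
print(initial_rmse, final_rmse, predicted)
```

With a higher rank, each scalar becomes a vector and each update becomes a small linear system, but the alternation between the two sides stays the same.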


In [309]:
# cross validation to find the best hyperparameters
# this takes a long time to run, so it is left commented out;
# below we retrain a model with the selected parameters instead

# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# # initialize the ALS model
# als_model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
# # create the parameter grid
# params = ParamGridBuilder().addGrid(als_model.regParam, [.01, .05, .1, .15]).addGrid(als_model.rank, [10, 50, 100, 150]).build()
# #instantiating crossvalidator estimator
# cv = CrossValidator(estimator=als_model, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
# best_model = cv.fit(training)  # fit on the training split only, so the test set stays unseen
# model = best_model.bestModel

final_als = ALS(maxIter=10, rank=50, regParam=0.15, userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
final_model = final_als.fit(training)
test_predictions = final_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8754838631457397


In [310]:

# make recommendations for all users using the recommendForAllUsers method
recommendations = final_model.recommendForAllUsers(10)
    
def name_retriever(movie_id):
    return movie_titles.where(movie_titles.movieId == movie_id).take(1)[0]['title']

def get_user_recs(user_id, rating_df):
  print("Previous user ratings:")
  curr_user_ratings=rating_df.where(rating_df.userId==user_id)
  curr_user_ratings.join(movie_titles,on="movieId").select("rating","title").orderBy("rating",ascending=False).show(truncate=False)
  # get the precomputed top-10 recommendations for this user
  recs_for_user = recommendations.where(recommendations.userId == user_id).take(1)
  if not recs_for_user:
    # a user that was not in the training split has no row in `recommendations`
    print("No recommendations available for user {}".format(user_id))
    return

  for ranking, (movie_id, rating) in enumerate(recs_for_user[0]['recommendations']):
    movie_string = name_retriever(movie_id)
    print('Recommendation {}: {} | predicted score: {}'.format(ranking+1, movie_string, rating))
  

In [311]:
get_user_recs(1,movie_ratings)

Previous user ratings:
+------+-------------------------------------------+
|rating|title                                      |
+------+-------------------------------------------+
|5.0   |Mr. Smith Goes to Washington (1939)        |
|5.0   |Blues Brothers, The (1980)                 |
|5.0   |Winnie the Pooh and the Blustery Day (1968)|
|5.0   |Seven (a.k.a. Se7en) (1995)                |
|5.0   |Three Caballeros, The (1945)               |
|5.0   |Bottle Rocket (1996)                       |
|5.0   |Sword in the Stone, The (1963)             |
|5.0   |Canadian Bacon (1995)                      |
|5.0   |Dumbo (1941)                               |
|5.0   |Billy Madison (1995)                       |
|5.0   |Bedknobs and Broomsticks (1971)            |
|5.0   |Star Wars: Episode IV - A New Hope (1977)  |
|5.0   |Alice in Wonderland (1951)                 |
|5.0   |Fugitive, The (1993)                       |
|5.0   |Ghost and the Darkness, The (1996)         |
|5.0   |Tombstone (1993