### Libraries

In [43]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark 
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip spark
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# setting spark 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark


In [44]:
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [45]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [47]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

### Spark session

In [48]:
from pyspark.sql import SparkSession
sc = SparkContext
spark = SparkSession.builder.appName('Movie Recommendations System').getOrCreate()

# 1. Collaborative Filtering

In [49]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
movies = spark.read.csv("drive/MyDrive/movie-recommender/movies.csv",header=True)
ratings = spark.read.csv("drive/MyDrive/movie-recommender/ratings.csv",header=True)

In [None]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|     3|1260759179|
|     1|   1061|     3|1260759182|
|     1|   1129|     2|1260759185|
|     1|   1172|     4|1260759205|
|     1|   1263|     2|1260759151|
|     1|   1287|     2|1260759187|
|     1|   1293|     2|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|     2|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|     1|1260759203|
|     1|   1953|     4|1260759191|
|     1|   2105|     4|1260759139|
|     1|   2150|     3|1260759194|
|     1|   2193|     2|1260759198|
|     1|   2294|     2|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|     1|1260759200|
|     1|   3671|     3|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
ratings = ratings.\
    withColumn('userId', col('userId').cast('integer')).\
    withColumn('movieId', col('movieId').cast('integer')).\
    withColumn('rating', col('rating').cast('float')).\
    drop('timestamp')
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows



In [None]:
# total number of ratings in the dataset
total_ratings = ratings.select("rating").count()
print("The total number of ratings are " + str(total_ratings))

# total number of distinct userIds and distinct movieIds
total_users = ratings.select("userId").distinct().count()
total_movies = ratings.select("movieId").distinct().count()

# set denominator equal to the number of users multiplied by the number of movies
total_user_movies = total_users * total_movies

# divide numerator by the denominator
sparsity = (1.0 - (total_ratings *1.0)/total_user_movies)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The total number of ratings are 19999
The ratings dataframe is  96.81% empty.


## Ratings

In [None]:
# Group data by userId, count ratings
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|    15| 1700|
|    73| 1610|
|    30| 1011|
|    23|  726|
|   102|  678|
|   119|  641|
|   105|  525|
|    56|  522|
|    48|  513|
|    19|  423|
|   130|  375|
|    17|  363|
|   111|  341|
|   128|  323|
|    77|  315|
|    95|  299|
|    78|  263|
|    88|  255|
|    22|  220|
|   125|  210|
+------+-----+
only showing top 20 rows



In [None]:
# Group data by userId, count ratings
movieId_ratings = ratings.groupBy("movieId").count().orderBy('count', ascending=False)
movieId_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|    356|   70|
|    296|   67|
|    318|   63|
|    593|   59|
|    260|   58|
|    480|   56|
|    527|   54|
|   2571|   51|
|    110|   50|
|      1|   49|
|    592|   48|
|    457|   47|
|    780|   46|
|   1270|   46|
|   1196|   45|
|   2959|   44|
|    588|   44|
|   4993|   44|
|   2858|   44|
|    589|   44|
+-------+-----+
only showing top 20 rows



## Build ALS model

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

## Tune ALS model

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# hyperparameters 
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
           
# RMSE evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("total models: ", len(param_grid))

total models:  16


## Cross Validation Pipeline

## Evaluate Predictions

In [None]:
# build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# confirm cv was built
print(cv)

CrossValidator_f9768c600d12


In [None]:
# Fit cv to the train dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

In [None]:

# print best_model
print(type(best_model))

print("best model")
print("  rank:", best_model._java_obj.parent().getRank())
print("  maxIter:", best_model._java_obj.parent().getMaxIter())
print("  regParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 50
  MaxIter: 10
  RegParam: 0.15


In [None]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.0375770341671366


In [None]:
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    85|    471|   3.0| 3.5945723|
|   102|    471|   5.0| 3.8349838|
|    73|    471|   4.0| 3.8450947|
|    30|    471|   4.0| 4.2084045|
|   133|   1088|   1.5| 2.1906755|
|    52|   1088|   4.0| 3.4424546|
|   105|   1238|   4.0|  3.544775|
|    73|   1342|   3.0| 2.6748915|
|    53|   1580|   3.0| 3.0435739|
|    93|   1580|   3.5| 3.4897757|
|    43|   1580|   4.0|  2.862114|
|    61|   1580|   3.5| 3.3713152|
|    90|   1580|   4.0|  3.510601|
|    79|   1580|   3.0| 2.3776147|
|   118|   1580|   4.0| 3.7495706|
|    30|   1580|   4.0| 3.6738088|
|    41|   1591|   4.0| 2.8229442|
|    77|   1591|   2.5| 2.4592566|
|   119|   1591|   2.0| 2.4836512|
|    22|   1645|   2.5| 3.0515492|
+------+-------+------+----------+
only showing top 20 rows



## Make Recommendations

In [None]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(15)
nrecommendations.limit(15).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    31|[[3083, 5.034974]...|
|    85|[[255, 4.8651767]...|
|    65|[[2068, 4.7784657...|
|    53|[[2433, 4.7393894...|
|   133|[[63853, 4.326188...|
|    78|[[412, 5.169707],...|
|   108|[[1680, 4.373681]...|
|    34|[[1204, 4.827096]...|
|   101|[[969, 4.919992],...|
|   115|[[59684, 5.204933...|
|   126|[[1204, 4.795785]...|
|    81|[[7574, 4.8423476...|
|    28|[[1252, 4.9166603...|
|    76|[[308, 4.5686936]...|
|    26|[[59784, 4.595728...|
+------+--------------------+



In [None]:
nrecommendations = nrecommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

nrecommendations.limit(15).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|    31|   3083| 5.034974|
|    31|  27773|  4.89264|
|    31|   2068| 4.888596|
|    31|   2973| 4.887059|
|    31|    969|4.8632402|
|    31|    308| 4.861147|
|    31| 122882|4.8357797|
|    31|    412|4.8143883|
|    31|    923|4.8032746|
|    31|   1204|  4.79386|
|    31|   3006|4.7812037|
|    31|    293|  4.77867|
|    31|  31435|4.7665467|
|    31|    912| 4.761282|
|    31|    898| 4.760244|
+------+-------+---------+



## Merge recommendations


In [None]:
nrecommendations.join(movies, on='movieId').filter('userId = 20').show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  64285|    20|4.8574214|Wallace and Gromi...|    Animation|Comedy|
|  31116|    20|4.8574214|Sergeant York (1941)|           Drama|War|
|   7212|    20|4.8574214|I Was a Male War ...|      Comedy|Romance|
|   1680|    20| 4.475608|Sliding Doors (1998)|       Drama|Romance|
|   3406|    20|  4.32653|Captain Horatio H...|Action|Adventure|...|
|   1148|    20|4.2794657|Wallace & Gromit:...|Animation|Childre...|
|   6385|    20|4.2284565|  Whale Rider (2002)|               Drama|
|    497|    20| 4.192836|Much Ado About No...|      Comedy|Romance|
|    745|    20| 4.192061|Wallace & Gromit:...|Animation|Childre...|
|   2405|    20| 4.172579|Jewel of the Nile...|Action|Adventure|...|
|  38038|    20| 4.080236|Wallace & Gromit ...|Adventure|Animati...|
|   3510|    20| 3.989033|    Freq

In [None]:
ratings.join(movies, on='movieId').filter('userId = 20').sort('rating', ascending=False).limit(10).show()

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1148|    20|   5.0|Wallace & Gromit:...|Animation|Childre...|
|    497|    20|   5.0|Much Ado About No...|      Comedy|Romance|
|    720|    20|   5.0|Wallace & Gromit:...|Adventure|Animati...|
|    745|    20|   5.0|Wallace & Gromit:...|Animation|Childre...|
|    780|    20|   5.0|Independence Day ...|Action|Adventure|...|
|   1580|    20|   5.0|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   1680|    20|   5.0|Sliding Doors (1998)|       Drama|Romance|
|   2690|    20|   5.0|Ideal Husband, An...|      Comedy|Romance|
|   7212|    20|   5.0|I Was a Male War ...|      Comedy|Romance|
|  31116|    20|   5.0|Sergeant York (1941)|           Drama|War|
+-------+------+------+--------------------+--------------------+



# Content filtering

In [60]:
#importing required libraries 
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
pd.options.display.max_columns = None

from scipy import stats
from ast import literal_eval
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity
from nltk.stem.snowball import SnowballStemmer
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import wordnet


import warnings; warnings.simplefilter('ignore')

In [61]:
links_df = pd.read_csv('drive/MyDrive/movie-recommender/links_small.csv', low_memory=False)
links_df = links_df[links_df['tmdbId'].notnull()]['tmdbId'].astype('int')

movies_metadata_df = pd.read_csv('drive/MyDrive/movie-recommender/movies_metadata.csv', low_memory=False)

In [62]:
def convert_to_int(x):
    try:
        return int(x)
    except:
        return np.nan

In [63]:

movies_metadata_df['id'] = movies_metadata_df['id'].apply(convert_to_int)

movies_metadata_df[movies_metadata_df['id'].isnull()]

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,popularity,poster_path,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
19730,- Written by Ørnås,0.065736,/ff9qCepilowshEtG2GYWwzt2bs4.jpg,"[{'name': 'Carousel Productions', 'id': 11176}...","[{'iso_3166_1': 'CA', 'name': 'Canada'}, {'iso...",,0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Midnight Man,False,6.0,1,,,,,,,,,
29503,Rune Balot goes to a casino connected to the ...,1.931659,/zV8bHuSL6WXoD6FWogP9j4x80bL.jpg,"[{'name': 'Aniplex', 'id': 2883}, {'name': 'Go...","[{'iso_3166_1': 'US', 'name': 'United States o...",,0,68.0,"[{'iso_639_1': 'ja', 'name': '日本語'}]",Released,,Mardock Scramble: The Third Exhaust,False,7.0,12,,,,,,,,,
35587,Avalanche Sharks tells the story of a bikini ...,2.185485,/zaSf5OG7V8X8gqFvly88zDdRm46.jpg,"[{'name': 'Odyssey Media', 'id': 17161}, {'nam...","[{'iso_3166_1': 'CA', 'name': 'Canada'}]",,0,82.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Beware Of Frost Bites,Avalanche Sharks,False,4.3,22,,,,,,,,,


In [64]:
movies_metadata_df = movies_metadata_df.drop([19730, 29503, 35587])
movies_metadata_df['id'] = movies_metadata_df['id'].astype('int')

In [65]:
movies_df = movies_metadata_df[movies_metadata_df['id'].isin(links_df)]
movies_df.shape

(9099, 24)

In [66]:

movies_df['tagline'] = movies_df['tagline'].fillna('')
movies_df['description'] = movies_df['overview'] + movies_df['tagline']
movies_df['description'] = movies_df['description'].fillna('')

In [67]:
tf = TfidfVectorizer(analyzer='word',ngram_range=(1, 2),min_df=0, stop_words='english')
tfidf_matrix = tf.fit_transform(movies_df['description'])
tfidf_matrix.shape

(9099, 268124)

In [68]:
cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrix)

In [39]:
movies_df = movies_df.reset_index()
titles = movies_df['title']
indices = pd.Series(movies_df.index, index=movies_df['title'])

In [69]:
def get_recommendations(title):
    idx = indices[title]
    sim_scores = list(enumerate(cosine_sim[idx]))
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    sim_scores = sim_scores[1:51]
    movie_indices = [i[0] for i in sim_scores]
    return titles.iloc[movie_indices]

In [76]:
get_recommendations('Star Wars').head(10)

949                          The Empire Strikes Back
962                               Return of the Jedi
8755                    Star Wars: The Force Awakens
6690                                 Shrek the Third
6125    Star Wars: Episode III - Revenge of the Sith
4815                               Where Eagles Dare
7539                             Shrek Forever After
2896                 On Her Majesty's Secret Service
5805                                 The Ice Pirates
515                                 Princess Caraboo
Name: title, dtype: object

In [80]:
get_recommendations('Shrek').head(10)

5383                 Shrek 2
7539     Shrek Forever After
6690         Shrek the Third
4612          Silk Stockings
7417          Dragon Hunters
2615    White Men Can't Jump
2890           Shanghai Noon
823               Cinderella
7062              Cinderella
5451          Into the Woods
Name: title, dtype: object