## Bring in initial dependencies

In [1]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version
# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"
# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [10% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C                                                                               Get:2 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:5 http://archive.ubuntu.com/ubuntu focal InRelease
Get:6 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:7 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ Packages [73.4 kB]
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:10 http://archive.ubuntu.com/ubuntu focal-backports InR

In [2]:
#import dependencies
from pyspark import SparkFiles
import pandas as pd

In [3]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("movieRecommender").getOrCreate()

## Bring in data from .csv's and join

In [4]:
# Load in genome score data
url ="https://movie-lens-data-p4t1.s3.amazonaws.com/genome-scores.csv"
spark.sparkContext.addFile(url)
gs_df = spark.read.csv(SparkFiles.get("genome-scores.csv"), sep=",", header=True, inferSchema=True)
gs_df.show()

+-------+-----+--------------------+
|movieId|tagId|           relevance|
+-------+-----+--------------------+
|      1|    1|0.028749999999999998|
|      1|    2|0.023749999999999993|
|      1|    3|              0.0625|
|      1|    4| 0.07574999999999998|
|      1|    5|             0.14075|
|      1|    6|             0.14675|
|      1|    7|              0.0635|
|      1|    8|             0.20375|
|      1|    9|               0.202|
|      1|   10|             0.03075|
|      1|   11|             0.58025|
|      1|   12| 0.10249999999999998|
|      1|   13| 0.20174999999999998|
|      1|   14|0.007000000000000006|
|      1|   15|0.024500000000000022|
|      1|   16| 0.17275000000000001|
|      1|   17|0.016500000000000015|
|      1|   18| 0.10399999999999998|
|      1|   19|              0.6625|
|      1|   20| 0.30074999999999996|
+-------+-----+--------------------+
only showing top 20 rows



In [5]:
# drop all tags under 80% significance
relevant_tags = gs_df.filter(gs_df['relevance']>= .80)

In [6]:
# Load in tags data
url ="https://movie-lens-data-p4t1.s3.amazonaws.com/genome-tags.csv"
spark.sparkContext.addFile(url)
tags_df = spark.read.csv(SparkFiles.get("genome-tags.csv"), sep=",", header=True, inferSchema=True)
tags_df.show()

+-----+---------------+
|tagId|            tag|
+-----+---------------+
|    1|            007|
|    2|   007 (series)|
|    3|   18th century|
|    4|          1920s|
|    5|          1930s|
|    6|          1950s|
|    7|          1960s|
|    8|          1970s|
|    9|          1980s|
|   10|   19th century|
|   11|             3d|
|   12|           70mm|
|   13|            80s|
|   14|           9/11|
|   15|        aardman|
|   16|aardman studios|
|   17|       abortion|
|   18|         absurd|
|   19|         action|
|   20|  action packed|
+-----+---------------+
only showing top 20 rows



In [7]:
# Join tags into dataframe for use in model, drop tagID and relevance as they are no longer needed
join_tags = relevant_tags.join(tags_df,['tagID'],'left')
join_tags = join_tags.drop('tagId','relevance')
rows = join_tags.count()
print(f'{rows}')

138430


In [8]:
# Change to pandas dataframe for more transformation
pd_tags = join_tags.toPandas()
pd_tags.head()

Unnamed: 0,movieId,tag
0,1,adventure
1,1,animated
2,1,animation
3,1,cartoon
4,1,cgi


In [9]:
# combine all tags for each movie into a single value
grouped_tags = pd_tags.groupby('movieId')['tag'].transform(lambda x: ' '.join(map(str,x)))
pd_tags = pd.merge(pd_tags, grouped_tags, left_index=True, right_index=True )
pd_tags = pd_tags.drop('tag_x', axis = 1)
pd_tags = pd_tags.drop_duplicates()
pd_tags = pd_tags.rename(columns={'tag_y': 'tag'})
pd_tags.head()
# pd_tags.dtypes

Unnamed: 0,movieId,tag
0,1,adventure animated animation cartoon cgi child...
30,2,adventure animals based on a book childhood ch...
43,3,comedy good sequel sequel sequels
47,4,chick flick divorce women
50,5,comedy family father daughter relationship goo...


In [10]:
# bring in ratings data
url ="https://movie-lens-data-p4t1.s3.amazonaws.com/ratings.csv"
spark.sparkContext.addFile(url)
ratings_df = spark.read.csv(SparkFiles.get("ratings.csv"), sep=",", header=True, inferSchema=True)
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [11]:
# find the average ratings for each movie
ratings_avg = ratings_df.groupBy('movieID').avg('rating')

In [12]:
# find how many people have reviewed each movie
ratings_count = ratings_df.groupBy('movieID').count()
ratings_count = ratings_count.withColumnRenamed('count','ratings_count')

In [13]:
# join new ratings fields
ratings_join = ratings_count.join(ratings_avg, ['movieID'])
rows = ratings_join.count()
print(f'{rows}')

59047


In [14]:
pd_ratings = ratings_join.toPandas()
pd_ratings.dropna(inplace=True)
pd_ratings.head()

Unnamed: 0,movieID,ratings_count,avg(rating)
0,1088,11935,3.250021
1,1580,40308,3.581708
2,3175,14659,3.607784
3,44022,4833,3.259363
4,175197,610,2.754918


In [15]:
# Look at ratings counts to create classification
pd_ratings['ratings_count'].value_counts()
pd_ratings.dtypes

movieID            int32
ratings_count      int64
avg(rating)      float64
dtype: object

In [16]:
# rename avg(rating) column
pd_ratings.rename({'avg(rating)':'avg_rating'}, axis = 1, inplace = True)
pd_ratings.head()

Unnamed: 0,movieID,ratings_count,avg_rating
0,1088,11935,3.250021
1,1580,40308,3.581708
2,3175,14659,3.607784
3,44022,4833,3.259363
4,175197,610,2.754918


In [17]:
ratings_count_cat = pd.cut(pd_ratings.ratings_count, bins = [0,100,500,1000,5000,50000], labels = ['under one hundred', 'under five hundred', 'under one thousand','under five thousand', 'over five thousand'])
pd_ratings['ratings_count_category'] = ratings_count_cat
pd_ratings.head()

Unnamed: 0,movieID,ratings_count,avg_rating,ratings_count_category
0,1088,11935,3.250021,over five thousand
1,1580,40308,3.581708,over five thousand
2,3175,14659,3.607784,over five thousand
3,44022,4833,3.259363,under five thousand
4,175197,610,2.754918,under one thousand


In [18]:
# Change number to classification bins
avg_ratings_cat = pd.cut(pd_ratings.avg_rating, bins = [0,.75,1.25,1.75,2.25,2.75,3.25,3.75,4.25,4.75,5],labels =\
                         ['1/2 star','1 star','1 1/8 star',' 2 stars','2 1/2 stars','3 stars','3 1/2 stars','4 stars',' 4 1/2 stars',' 5 stars'])
pd_ratings['avg_ratings_category'] = avg_ratings_cat
pd_ratings.head()

Unnamed: 0,movieID,ratings_count,avg_rating,ratings_count_category,avg_ratings_category
0,1088,11935,3.250021,over five thousand,3 1/2 stars
1,1580,40308,3.581708,over five thousand,3 1/2 stars
2,3175,14659,3.607784,over five thousand,3 1/2 stars
3,44022,4833,3.259363,under five thousand,3 1/2 stars
4,175197,610,2.754918,under one thousand,3 stars


In [19]:
# Join tags and ratings dataframes
tags_ratings_df = pd.merge(pd_tags, pd_ratings, left_on="movieId", right_on = "movieID")
tags_ratings_df = tags_ratings_df.drop('movieID', axis = 1)
tags_ratings_df.head()
tags_ratings_df.dtypes


movieId                      int32
tag                         object
ratings_count                int64
avg_rating                 float64
ratings_count_category    category
avg_ratings_category      category
dtype: object

In [20]:
# Change categories to strings and replace NaN with empty string
tags_ratings_df = tags_ratings_df.astype({'ratings_count_category':'string','avg_ratings_category':'string'})
tags_ratings_df[['ratings_count_category','avg_ratings_category']] = tags_ratings_df[['ratings_count_category','avg_ratings_category']].fillna('')
tags_ratings_df.dtypes
tags_ratings_df.head()

Unnamed: 0,movieId,tag,ratings_count,avg_rating,ratings_count_category,avg_ratings_category
0,1,adventure animated animation cartoon cgi child...,57309,3.893708,,4 stars
1,2,adventure animals based on a book childhood ch...,24228,3.251527,over five thousand,3 1/2 stars
2,3,comedy good sequel sequel sequels,11804,3.142028,over five thousand,3 stars
3,4,chick flick divorce women,2523,2.853547,under five thousand,3 stars
4,5,comedy family father daughter relationship goo...,11714,3.058434,over five thousand,3 stars


In [21]:
# Load in movie data
url ="https://movie-lens-data-p4t1.s3.amazonaws.com/movies.csv"
spark.sparkContext.addFile(url)
movie_df = spark.read.csv(SparkFiles.get("movies.csv"), sep=",", header=True, inferSchema=True)
movie_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [22]:
# Convert movies to pandas for further processing
pd_movies = movie_df.toPandas()
pd_movies.head()
# pd_movies.dtypes

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [23]:
# remove pipe delimiter freom genres
pd_movies['genres'] = pd_movies['genres'].str.split('|')
pd_movies['genrestring'] = [','.join(map(str, l)) for l in pd_movies['genres']]
pd_movies = pd_movies.drop('genres', axis = 1)
pd_movies.head()

Unnamed: 0,movieId,title,genrestring
0,1,Toy Story (1995),"Adventure,Animation,Children,Comedy,Fantasy"
1,2,Jumanji (1995),"Adventure,Children,Fantasy"
2,3,Grumpier Old Men (1995),"Comedy,Romance"
3,4,Waiting to Exhale (1995),"Comedy,Drama,Romance"
4,5,Father of the Bride Part II (1995),Comedy


In [24]:
#  Split year from title
pd_movies['title_date'] = pd_movies['title'].str.split('(').str[1].str.replace(')','')
pd_movies.head()
# pd_movies.dtypes

  pd_movies['title_date'] = pd_movies['title'].str.split('(').str[1].str.replace(')','')


Unnamed: 0,movieId,title,genrestring,title_date
0,1,Toy Story (1995),"Adventure,Animation,Children,Comedy,Fantasy",1995
1,2,Jumanji (1995),"Adventure,Children,Fantasy",1995
2,3,Grumpier Old Men (1995),"Comedy,Romance",1995
3,4,Waiting to Exhale (1995),"Comedy,Drama,Romance",1995
4,5,Father of the Bride Part II (1995),Comedy,1995


In [25]:
#  Change year to string and fill all NaN
pd_movies['title_date'] = pd_movies['title_date'].astype(str)
pd_movies['title_date'] = pd_movies['title_date'].fillna('')
pd_movies.dtypes
# tags_ratings_df.head()

movieId         int32
title          object
genrestring    object
title_date     object
dtype: object

In [26]:
# Combine movies dataframe into tags_ratings dataframe for analysis
final = pd.merge(pd_movies, tags_ratings_df)
final.head()
# final.dtypes

Unnamed: 0,movieId,title,genrestring,title_date,tag,ratings_count,avg_rating,ratings_count_category,avg_ratings_category
0,1,Toy Story (1995),"Adventure,Animation,Children,Comedy,Fantasy",1995,adventure animated animation cartoon cgi child...,57309,3.893708,,4 stars
1,2,Jumanji (1995),"Adventure,Children,Fantasy",1995,adventure animals based on a book childhood ch...,24228,3.251527,over five thousand,3 1/2 stars
2,3,Grumpier Old Men (1995),"Comedy,Romance",1995,comedy good sequel sequel sequels,11804,3.142028,over five thousand,3 stars
3,4,Waiting to Exhale (1995),"Comedy,Drama,Romance",1995,chick flick divorce women,2523,2.853547,under five thousand,3 stars
4,5,Father of the Bride Part II (1995),Comedy,1995,comedy family father daughter relationship goo...,11714,3.058434,over five thousand,3 stars


In [27]:
# Combine tags, ratings count category average ratings category
def combined_text(row):
  return row['tag']+" "+row['ratings_count_category']+" "+ row['avg_ratings_category']+" "+ row['genrestring']+" "+ row['title_date']
final['combined_text'] = final.apply(combined_text, axis = 1)
final.head()

Unnamed: 0,movieId,title,genrestring,title_date,tag,ratings_count,avg_rating,ratings_count_category,avg_ratings_category,combined_text
0,1,Toy Story (1995),"Adventure,Animation,Children,Comedy,Fantasy",1995,adventure animated animation cartoon cgi child...,57309,3.893708,,4 stars,adventure animated animation cartoon cgi child...
1,2,Jumanji (1995),"Adventure,Children,Fantasy",1995,adventure animals based on a book childhood ch...,24228,3.251527,over five thousand,3 1/2 stars,adventure animals based on a book childhood ch...
2,3,Grumpier Old Men (1995),"Comedy,Romance",1995,comedy good sequel sequel sequels,11804,3.142028,over five thousand,3 stars,comedy good sequel sequel sequels over five th...
3,4,Waiting to Exhale (1995),"Comedy,Drama,Romance",1995,chick flick divorce women,2523,2.853547,under five thousand,3 stars,chick flick divorce women under five thousand ...
4,5,Father of the Bride Part II (1995),Comedy,1995,comedy family father daughter relationship goo...,11714,3.058434,over five thousand,3 stars,comedy family father daughter relationship goo...


## Import Sci-kit Learn Dependencies

In [28]:
#Import Sci-kit Learn and NLTK dependencies
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics.pairwise import cosine_similarity



In [29]:
# Create model
cv = CountVectorizer()

# Fit to combined text field to create vectors for use in cosine function
count_matrix = cv.fit_transform(final['combined_text'])
print("Count Matrix:", count_matrix.toarray())

Count Matrix: [[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


In [30]:
# Calculate cosine similarity 
cosine_sim = cosine_similarity(count_matrix)

In [31]:
# Look at matrix. See that each movie is perfectly similar to itself. Higher the number, the more similar the movies are
print(cosine_sim)

[[1.         0.38490018 0.14433757 ... 0.07698004 0.14002801 0.11547005]
 [0.38490018 1.         0.20833333 ... 0.11111111 0.0404226  0.05555556]
 [0.14433757 0.20833333 1.         ... 0.16666667 0.42443734 0.25      ]
 ...
 [0.07698004 0.11111111 0.16666667 ... 1.         0.40422604 0.55555556]
 [0.14002801 0.0404226  0.42443734 ... 0.40422604 1.         0.72760688]
 [0.11547005 0.05555556 0.25       ... 0.55555556 0.72760688 1.        ]]


In [32]:
#Input movie
movie_input = 'Despicable Me (2010)'


In [33]:
# Get input movie index
def get_index_from_title(title):
    return final[final.title == title].index.values[0]
movie_index = get_index_from_title(movie_input)
movie_index

10146

In [37]:
# get list of most similar movies
cos_movie_input = list(enumerate(cosine_sim[movie_index]))
sorted_cos_movies = sorted(cos_movie_input, key=lambda x:x[1], reverse=True)
most_similar_ten = sorted_cos_movies[1:11]
most_similar_ten

[(12954, 0.7750576015460305),
 (1974, 0.7745966692414832),
 (8655, 0.7659416862050704),
 (12241, 0.7631672440718631),
 (12836, 0.7615773105863907),
 (2029, 0.7548711866766251),
 (11368, 0.7487767802667671),
 (12244, 0.7462025072446365),
 (2774, 0.741041737787324),
 (10445, 0.741041737787324)]

In [38]:
# get titles of similar movies
def get_title_from_index(index):
    return final[final.index == index]["title"].values[0]
i=1
for movie in most_similar_ten:
    print(get_title_from_index(movie[0]))
    i=i+1
    if i>11:
        break

The Boss Baby (2017)
Antz (1998)
Ratatouille (2007)
Minions (2015)
Storks (2016)
Bug's Life, A (1998)
Cloudy with a Chance of Meatballs 2 (2013)
The Good Dinosaur (2015)
Stuart Little (1999)
Hop (2011)
