## Load Packages and Functions


In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Directory containing the CSV inputs; the file names are appended to it
# directly, so the trailing slash is required.
INPUT_DIRECTORY = "/content/drive/MyDrive/BDSN data/" # Google Drive mount (Colab)
# INPUT_DIRECTORY = "./data/" # local Jupyter alternative: uncomment and comment out the line above

In [None]:
%%capture 
#prevent large printout with %%capture

#Download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#Install Apache Spark 3.2.1 with Hadoop 3.2, get zipped folder
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

#Unzip folder
!tar xvf spark-3.2.1-bin-hadoop3.2.tgz

#Install findspark, pyspark 3.2.1
!pip install -q findspark
!pip install pyspark==3.2.1

#Set variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType
import textwrap

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
def get_movie_title_from_id(movieId):
  """Return the title of the movie with the given ``movieId``.

  The lookup reads the notebook-level pandas DataFrame ``movie_titles``, which
  must provide ``movieId`` and ``title`` columns and must be loaded before this
  function is called.

  Args:
    movieId: Value to look up in the ``movieId`` column.

  Returns:
    The matching value from the ``title`` column.

  Raises:
    ValueError: If ``movieId`` matches no row, or more than one row.
  """
  matches = movie_titles.loc[movie_titles["movieId"] == movieId, "title"]
  if len(matches) != 1:
    # Series.item() would also raise ValueError here, but its message talks
    # about array sizes and never mentions the offending id.
    raise ValueError(
        f"Expected exactly one title for movieId {movieId!r}, found {len(matches)}."
    )
  return matches.item()

## Import Data and Preprocessing


In [None]:
# Create the SparkContext and SparkSession objects; `spark` is the entry point
# used by the CSV-loading cells.
from pyspark import SparkContext
from pyspark.sql import SparkSession

# getOrCreate() reuses an instance that is already running instead of starting a second one.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [None]:
def get_user_recommended_movies(recs_df, userId):
  """Print the recommended movies and predicted ratings for one user.

  Args:
    recs_df: pandas DataFrame with a ``userId`` column and a
      ``recommendations`` column; each ``recommendations`` cell is a sequence
      of pairs indexable as ``(movieId, predicted_rating)``.
    userId: The user whose recommendations should be printed.

  Returns:
    None. Output is written with ``print``; an unknown ``userId`` prints a
    hint instead of raising.
  """
  user_rows = recs_df.loc[recs_df["userId"] == userId, "recommendations"]
  if user_rows.empty:
    print("That userId does not exist in the dataset.  Try another.")
    return

  # Select by position: the filtered Series keeps the row labels of recs_df,
  # so a label-based [0] only works for the user stored in the first row.
  for movie in user_rows.iloc[0]:
    try:
      title = get_movie_title_from_id(movie[0])
    except (KeyError, ValueError):
      # A failed title lookup must not be reported as a missing user.
      title = f"Unknown title (movieId {movie[0]})"
    print(f"Movie: \n{title}\nPredicted Rating: {movie[1]}\n")

In [None]:
# Load the ratings table; Spark reads the header row and infers the column types.
file_path = INPUT_DIRECTORY + "ratings.csv"
ratings = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(file_path)
)
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
# Dataset size: total ratings plus the number of distinct users and movies.
num_ratings = ratings.count()
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

print(
    f"Number of ratings: {num_ratings}",
    f"Number of distinct users: {num_users}",
    f"Number of distinct movies: {num_movies}",
    sep="\n",
)

Number of ratings: 100836
Number of distinct users: 610
Number of distinct movies: 9724


In [None]:
# Load the tags table; Spark reads the header row and infers the column types.
file_path = INPUT_DIRECTORY + "tags.csv"
tags = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(file_path)
)
tags.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



## Preprocessing

### Check datatypes

In [None]:
# Confirm the column types Spark inferred for the ratings table.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
# Load the movie catalogue and keep only the id and title (genres are dropped).
file_path = INPUT_DIRECTORY + "movies.csv"
titles = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(file_path)
    .select("movieId", "title")
)
titles.show(5, truncate=False)

+-------+----------------------------------+
|movieId|title                             |
+-------+----------------------------------+
|1      |Toy Story (1995)                  |
|2      |Jumanji (1995)                    |
|3      |Grumpier Old Men (1995)           |
|4      |Waiting to Exhale (1995)          |
|5      |Father of the Bride Part II (1995)|
+-------+----------------------------------+
only showing top 5 rows



In [None]:
import pandas as pd

# pandas copy of movies.csv; get_movie_title_from_id reads this `movie_titles` frame.
file_path = INPUT_DIRECTORY + "movies.csv"
movie_titles = pd.read_csv(file_path)
movie_titles.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [None]:
# Left join on movieId: every rating row is kept and gains the matching title.
movie_ratings = ratings.join(titles, on="movieId", how="left")
movie_ratings.show(5, truncate=False)

+-------+------+------+---------+---------------------------+
|movieId|userId|rating|timestamp|title                      |
+-------+------+------+---------+---------------------------+
|1      |1     |4.0   |964982703|Toy Story (1995)           |
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)    |
|6      |1     |4.0   |964982224|Heat (1995)                |
|47     |1     |5.0   |964983815|Seven (a.k.a. Se7en) (1995)|
|50     |1     |5.0   |964982931|Usual Suspects, The (1995) |
+-------+------+------+---------+---------------------------+
only showing top 5 rows



In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType

# Turn the epoch-seconds column into a real timestamp column. from_unixtime()
# only formats the value as a string, so the result is cast to TimestampType.
# The dtype guard makes the cell safe to re-run: an already-converted column
# is left untouched instead of being passed through from_unixtime() again.
if dict(movie_ratings.dtypes)["timestamp"] != "timestamp":
  movie_ratings = movie_ratings.withColumn(
      "timestamp", F.from_unixtime("timestamp").cast(TimestampType())
  )
movie_ratings.show(5, truncate=False)

+-------+------+------+-------------------+--------------------+
|movieId|userId|rating|          timestamp|               title|
+-------+------+------+-------------------+--------------------+
|      1|     1|   4.0|2000-07-30 18:45:03|    Toy Story (1995)|
|      3|     1|   4.0|2000-07-30 18:20:47|Grumpier Old Men ...|
|      6|     1|   4.0|2000-07-30 18:37:04|         Heat (1995)|
|     47|     1|   5.0|2000-07-30 19:03:35|Seven (a.k.a. Se7...|
|     50|     1|   5.0|2000-07-30 18:48:51|Usual Suspects, T...|
|     70|     1|   3.0|2000-07-30 18:40:00|From Dusk Till Da...|
|    101|     1|   5.0|2000-07-30 18:14:28|Bottle Rocket (1996)|
|    110|     1|   4.0|2000-07-30 18:36:16|   Braveheart (1995)|
|    151|     1|   5.0|2000-07-30 19:07:21|      Rob Roy (1995)|
|    157|     1|   5.0|2000-07-30 19:08:20|Canadian Bacon (1...|
|    163|     1|   5.0|2000-07-30 19:00:50|    Desperado (1995)|
|    216|     1|   5.0|2000-07-30 18:20:08|Billy Madison (1995)|
|    223|     1|   3.0|20

In [None]:
# Reorder the columns so the title comes first.
movie_ratings = movie_ratings.select("title", "movieId", "userId", "rating", "timestamp")
movie_ratings.show(5, truncate=False)

+---------------------------+-------+------+------+-------------------+
|title                      |movieId|userId|rating|timestamp          |
+---------------------------+-------+------+------+-------------------+
|Toy Story (1995)           |1      |1     |4.0   |2000-07-30 18:45:03|
|Grumpier Old Men (1995)    |3      |1     |4.0   |2000-07-30 18:20:47|
|Heat (1995)                |6      |1     |4.0   |2000-07-30 18:37:04|
|Seven (a.k.a. Se7en) (1995)|47     |1     |5.0   |2000-07-30 19:03:35|
|Usual Suspects, The (1995) |50     |1     |5.0   |2000-07-30 18:48:51|
+---------------------------+-------+------+------+-------------------+
only showing top 5 rows



## EDA

### Movie and User Summary Statistics

In [None]:
# The 20 titles with the most ratings, in descending order of rating count.
(
    movie_ratings
    .select("title", "movieId")
    .groupBy("title")
    .count()
    .orderBy("count", ascending=False)
    .show(20, truncate=False)
)

+------------------------------------------------------------------------------+-----+
|title                                                                         |count|
+------------------------------------------------------------------------------+-----+
|Forrest Gump (1994)                                                           |329  |
|Shawshank Redemption, The (1994)                                              |317  |
|Pulp Fiction (1994)                                                           |307  |
|Silence of the Lambs, The (1991)                                              |279  |
|Matrix, The (1999)                                                            |278  |
|Star Wars: Episode IV - A New Hope (1977)                                     |251  |
|Jurassic Park (1993)                                                          |238  |
|Braveheart (1995)                                                             |237  |
|Terminator 2: Judgment Day (1991)         

## ML Model

We'll split our data 80/20 into training/testing sets and set the seed to 42 for reproducibility:

In [None]:
# Keep only the user, item and rating columns, then split 80/20 with a fixed seed.
ratings = ratings.select(["userId", "movieId", "rating"])
training_data, test_data = ratings.randomSplit([0.8, 0.2], seed=42)

Initialize our model. We'll set the following parameters before optimizing hyperparameters:

nonnegative: True. We only want non-negative numbers, as a negative rating has no meaning in this context.
coldStartStrategy: "drop". A random split can place a user or movie in the test set without any of its ratings in the training set. ALS has nothing to base a prediction on for those rows and returns NaN. "drop" removes those rows from the predictions, so the RMSE is calculated only on rows the model can actually score instead of becoming NaN.

implicitPrefs: False. We have actual ratings, so we don't need to use implicit feedback.

In [None]:
from pyspark.ml.recommendation import ALS

# Explicit-feedback ALS recommender. rank, maxIter and regParam are left at
# their defaults here because the parameter grid overrides them.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,          # constrain the factorisation to non-negative factors
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN
    implicitPrefs=False,       # the ratings are explicit feedback
    seed=42,                   # fixed seed so repeated runs are comparable
)

Now we'll build our ParamGridBuilder:

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

# Hyperparameter grid: 2 ranks x 1 iteration count x 3 regularisation strengths.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [5, 20])
    .addGrid(als.maxIter, [5])
    .addGrid(als.regParam, [0.01, 0.05, 1])
    .build()
)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the observed "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  6


Create CrossValidator

In [None]:
from pyspark.ml.tuning import CrossValidator

# 5-fold cross-validation over every combination in param_grid, scored by `evaluator`.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,  # fixed seed so the fold assignment is repeatable
)

Fit Data:

In [None]:
# Run the cross-validated grid search on the training split.
model = cv.fit(training_data)

# Keep a direct handle to the best model found by the search.
best_model = model.bestModel

Get information on the best model:

In [None]:
# Report the type and hyperparameters of the selected model.
print(type(best_model))

print("\n**Best Model**")
print("  Rank:", best_model.rank)
# NOTE(review): maxIter and regParam are read through the private `_java_obj`
# handle and its parent() object — presumably because the fitted model does not
# expose them directly; confirm this still works after a PySpark upgrade.
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>

**Best Model**
  Rank: 5
  MaxIter: 5
  RegParam: 0.05


## Performance Evaluation

Let's generate predictions on the test data:

In [None]:
# Score the held-out test split; `model` is the fitted result returned by cv.fit.
test_predictions = model.transform(test_data)
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 3.6652236|
|   580|   3175|   2.5| 3.6313622|
|   580|  44022|   3.5|  3.263882|
|   362|   1645|   5.0| 3.9244337|
|   597|   1959|   4.0|  4.402317|
|   155|   3175|   4.0| 3.5555627|
|   368|   2122|   2.0|  2.490454|
|   115|   1645|   4.0| 4.0562916|
|   115|   3175|   4.0|  3.983466|
|    28|   1645|   2.5|  3.039145|
|    28|   3175|   1.5| 2.7867851|
|   587|   1580|   4.0| 3.9089146|
|   332|   1645|   3.5| 3.1693602|
|   332|   2366|   3.5|  3.373988|
|   577|   1580|   3.0| 3.0526443|
|   577|   1959|   4.0|  3.248259|
|   271|   6658|   2.0|  2.986471|
|   606|   1088|   3.0| 3.4248211|
|    91|   1580|   3.5| 3.6721838|
|    91|   6620|   3.5|  2.965326|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Compute the RMSE of the test-set predictions with the RMSE evaluator
RMSE = evaluator.evaluate(test_predictions)

# Display the test-set RMSE
print(RMSE)

0.9131980300124588


## Generate Recommendations:

In [None]:
# Generate top 10 movie recommendations for each user
# NOTE(review): the captured output shows predicted ratings above 6, higher than
# any rating visible in the data samples — predictions are evidently not clipped
# to the rating scale; consider whether rarely-rated movies should be filtered out.
userRecs = best_model.recommendForAllUsers(10)
userRecs.show(5, truncate=False)



+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{85, 6.697616}, {3379, 6.5410295}, {96004, 6.5410295}, {8235, 6.4249797}, {6201, 6.4249797}, {26326, 6.36415}, {6732, 6.338998}, {7579, 6.20956}, {3224, 6.1658792}, {3819, 6.092615}]        |
|3     |[{3223, 5.5451603}, {7116, 5.474176}, {4082, 5.419378}, {2488, 5.2489834}, {5746, 5.2328577}, {6835, 5.2328577}, {5181, 5.170884}, {25825, 5.156314}, {25850, 5.115393}, {4518, 5.1024218}] 

In [None]:
# Convert to pandas; this frame is what get_user_recommended_movies is called with.
userRecs_pandas = userRecs.toPandas()
userRecs_pandas.head()

Unnamed: 0,userId,recommendations
0,1,"[(85, 6.697616100311279), (3379, 6.54102945327..."
1,3,"[(3223, 5.545160293579102), (7116, 5.474175930..."
2,5,"[(136341, 6.889410018920898), (126921, 6.88941..."
3,6,"[(85, 7.3541131019592285), (7116, 6.4737396240..."
4,9,"[(99764, 6.6789631843566895), (58301, 6.595906..."


In [None]:
# Spot-check the title lookup for a single movieId.
get_movie_title_from_id(10)

'GoldenEye (1995)'

In [None]:
# Print the recommended movies and predicted ratings for userId 1.
get_user_recommended_movies(userRecs_pandas, 1)

Movie: 
Angels and Insects (1995)
Predicted Rating: 6.697616100311279

Movie: 
On the Beach (1959)
Predicted Rating: 6.541029453277588

Movie: 
Dragon Ball Z: The History of Trunks (Doragon bôru Z: Zetsubô e no hankô!! Nokosareta chô senshi - Gohan to Torankusu) (1993)
Predicted Rating: 6.541029453277588

Movie: 
Safety Last! (1923)
Predicted Rating: 6.4249796867370605

Movie: 
Lady Jane (1986)
Predicted Rating: 6.4249796867370605

Movie: 
Holy Mountain, The (Montaña sagrada, La) (1973)
Predicted Rating: 6.364150047302246

Movie: 
Hello, Dolly! (1969)
Predicted Rating: 6.338997840881348

Movie: 
Pride and Prejudice (1940)
Predicted Rating: 6.209559917449951

Movie: 
Woman in the Dunes (Suna no onna) (1964)
Predicted Rating: 6.165879249572754

Movie: 
Tampopo (1985)
Predicted Rating: 6.092615127563477

