Preprocess and Load Data

In [1]:
import os
# Find the latest version of spark 3.3 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.0'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [10% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [W0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-s

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-09-04 21:25:07--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-09-04 21:25:07 (4.18 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Netflix-Movies").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://luiwarang-bucket.s3.amazonaws.com/Netflix_Dataset_Movie_Rating.csv"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(SparkFiles.get("Netflix_Dataset_Movie_Rating.csv"), sep=",", header=True, inferSchema=True)

In [5]:
user_data_df.show()

+--------+----+---------+-------+------+
|movie_id|year|     name|user_id|rating|
+--------+----+---------+-------+------+
|       3|1997|Character| 712664|     5|
|       3|1997|Character|1331154|     4|
|       3|1997|Character|2632461|     3|
|       3|1997|Character|  44937|     5|
|       3|1997|Character| 656399|     4|
|       3|1997|Character| 439011|     1|
|       3|1997|Character|1644750|     3|
|       3|1997|Character|2031561|     4|
|       3|1997|Character| 616720|     4|
|       3|1997|Character|2467008|     4|
|       3|1997|Character| 701730|     2|
|       3|1997|Character|1614320|     4|
|       3|1997|Character| 115498|     3|
|       3|1997|Character| 931626|     2|
|       3|1997|Character| 699878|     4|
|       3|1997|Character|1694958|     3|
|       3|1997|Character|  66414|     5|
|       3|1997|Character|2519847|     5|
|       3|1997|Character| 948069|     3|
|       3|1997|Character|  67315|     4|
+--------+----+---------+-------+------+
only showing top

In [6]:
# List dataframe data types
user_data_df.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [7]:
# Drop null columns
user_data_df =user_data_df.dropna()

In [8]:
# split into training and testing sets
train, test = user_data_df.randomSplit(weights=(0.8, 0.2))

Building Recommendation Model using ALS

In [9]:
# Build the recommendation model using ALS on the training data
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als = ALS(maxIter=10, regParam=0.1, rank=8, nonnegative=True, coldStartStrategy="drop",\
          userCol='user_id', itemCol='movie_id', ratingCol='rating')
model = als.fit(train)

Making Predictions with ALS Model

In [10]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
predictions

DataFrame[movie_id: int, year: int, name: string, user_id: int, rating: int, prediction: float]

In [11]:
predictions.sort("user_id", "rating").show()

+--------+----+--------------------+-------+------+----------+
|movie_id|year|                name|user_id|rating|prediction|
+--------+----+--------------------+-------+------+----------+
|    1307|2003|            S.W.A.T.|      6|     1| 3.1887624|
|    1476|2004|Six Feet Under: S...|      6|     1| 3.7326908|
|    1144|1991|Fried Green Tomatoes|      6|     2| 3.3537374|
|    1145|2001| The Wedding Planner|      6|     3| 2.8061523|
|    4393|1994|The Mask: Special...|      6|     3| 3.1967182|
|    2340|1990|Joe Versus the Vo...|      6|     3| 2.6943793|
|    3905|2001|          The Others|      6|     3| 3.2622676|
|    3153|1965|      Doctor Zhivago|      6|     3| 3.3914022|
|    1971|1995|Under Siege 2: Da...|      6|     3|   3.04686|
|    2200|2002|   Collateral Damage|      6|     3|  2.889112|
|    2981|2000|Saturday Night Li...|      6|     3| 3.4268308|
|    3538|1988|             Beaches|      6|     3| 3.1719625|
|    4100|2000|            Dinosaur|      6|     3| 3.2

Evaluate the Predictions

In [12]:
# using RMSE to evaluate the model
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print('RMSE: %.4f' % rmse)

RMSE: 0.8629


Making Movie Recommendations to users

In [13]:
#Recommend top 5 movies for all users
user_recs = model.recommendForAllUsers(3)

In [14]:
user_recs.printSchema()
type(user_recs)

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movie_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



pyspark.sql.dataframe.DataFrame

In [15]:
user_recs.show(10, False)
#user_recs_tab = user_recs.select("user_id", "recommendations.movie_id","recommendations.rating")
#user_recs_tab.show()

+-------+--------------------------------------------------------+
|user_id|recommendations                                         |
+-------+--------------------------------------------------------+
|6      |[{3456, 4.088912}, {2162, 3.954937}, {2568, 3.9330206}] |
|169    |[{3456, 4.681539}, {2162, 4.519024}, {4427, 4.511673}]  |
|183    |[{3456, 4.7231307}, {2162, 4.6299386}, {4427, 4.576907}]|
|268    |[{3456, 5.065378}, {2162, 4.9719124}, {2568, 4.9236784}]|
|283    |[{3456, 4.5059}, {1947, 4.350579}, {1256, 4.3294926}]   |
|291    |[{1947, 4.5985975}, {3456, 4.5451136}, {1256, 4.527107}]|
|296    |[{3023, 4.2063465}, {774, 4.129475}, {4427, 3.9741852}] |
|305    |[{2102, 4.8556976}, {68, 4.8315825}, {316, 4.8314896}]  |
|383    |[{3456, 4.2607512}, {3444, 4.1362333}, {2102, 4.101471}]|
|437    |[{2568, 3.3530953}, {1441, 3.2872744}, {2162, 3.284876}]|
+-------+--------------------------------------------------------+
only showing top 10 rows



In [16]:

from pyspark.sql.functions import explode, col
nrecommendations = user_recs\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('user_id', col("rec_exp.movie_id"), col("rec_exp.rating"))
nrecommendations.limit(10).show()

+-------+--------+---------+
|user_id|movie_id|   rating|
+-------+--------+---------+
|      6|    3456| 4.088912|
|      6|    2162| 3.954937|
|      6|    2568|3.9330206|
|    169|    3456| 4.681539|
|    169|    2162| 4.519024|
|    169|    4427| 4.511673|
|    183|    3456|4.7231307|
|    183|    2162|4.6299386|
|    183|    4427| 4.576907|
|    268|    3456| 5.065378|
+-------+--------+---------+



In [17]:
#Generate movie id and name table
user_data_df_drop = user_data_df.drop(*("user_id", "rating"))
user_data_df_drop.show()

+--------+----+---------+
|movie_id|year|     name|
+--------+----+---------+
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
|       3|1997|Character|
+--------+----+---------+
only showing top 20 rows



In [18]:
#Join movie table with user_recs table
nrecommendations.join(user_data_df_drop, on='movie_id',how="left").filter('user_id = 906').dropDuplicates(["movie_id"]).show()

+--------+-------+---------+----+--------------------+
|movie_id|user_id|   rating|year|                name|
+--------+-------+---------+----+--------------------+
|    3456|    906| 4.449913|2004|      Lost: Season 1|
|    1476|    906|4.4608526|2004|Six Feet Under: S...|
|    4427|    906| 4.454607|2001|The West Wing: Se...|
+--------+-------+---------+----+--------------------+



In [19]:
#Join movie table with user_recs table
final_recs = nrecommendations.join(user_data_df_drop, on='movie_id',how="left").dropDuplicates(["movie_id"])

In [20]:
from google.colab import files
final_recs.toPandas().to_csv("final_recs.csv")

In [21]:
files.download('final_recs.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>