# PySpark Implementation of a Movie Recommender System


This project shows the efficient and distributed loading of the MovieLens Dataset using pyspark, as well as some basic EDA and using alternating least squares (ALS) to train a collaborative filtering model.

In [1]:
import sys
!{sys.executable} -m pip show findspark

Name: findspark
Version: 2.0.1
Summary: Find pyspark to make it importable.
Home-page: https://github.com/minrk/findspark
Author: Min RK
Author-email: benjaminrk@gmail.com
License: BSD (3-clause)
Location: /Users/liamkristoffy/anaconda3/envs/pyspark_env/lib/python3.9/site-packages
Requires: 
Required-by: 


In [1]:
import findspark

findspark.init()
findspark.find()

'/Users/liamkristoffy/anaconda3/envs/pyspark_env/lib/python3.9/site-packages/pyspark'

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Initialize Spark Session
spark = SparkSession.builder.appName("Movie Recommendation System").config("spark.executor.memory", "8g").config("spark.driver.memory", "8g").getOrCreate()

# Load Ratings and Movies Data
ratings = spark.read.csv("Data/rating.csv", header=True, inferSchema=True)
movies = spark.read.csv("Data/movie.csv", header=True, inferSchema=True)

# Show schema
ratings.printSchema()
movies.printSchema()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/21 21:26:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [3]:
ratings.head(5)

[Row(userId=1, movieId=2, rating=3.5, timestamp=datetime.datetime(2005, 4, 2, 23, 53, 47)),
 Row(userId=1, movieId=29, rating=3.5, timestamp=datetime.datetime(2005, 4, 2, 23, 31, 16)),
 Row(userId=1, movieId=32, rating=3.5, timestamp=datetime.datetime(2005, 4, 2, 23, 33, 39)),
 Row(userId=1, movieId=47, rating=3.5, timestamp=datetime.datetime(2005, 4, 2, 23, 32, 7)),
 Row(userId=1, movieId=50, rating=3.5, timestamp=datetime.datetime(2005, 4, 2, 23, 29, 40))]

In [4]:
ratings.describe().show()



+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         20000263|          20000263|          20000263|
|   mean|69045.87258292554| 9041.567330339605|3.5255285642993797|
| stddev|40038.62665316145|19789.477445413166|1.0519889192942444|
|    min|                1|                 1|               0.5|
|    max|           138493|            131262|               5.0|
+-------+-----------------+------------------+------------------+



                                                                                

In [5]:
# Check number of unique users and movies
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()
print(f"Number of users: {num_users}, Number of movies: {num_movies}")

# Average rating per movie
avg_ratings = ratings.groupBy("movieId").avg("rating").withColumnRenamed("avg(rating)", "avg_rating")
avg_ratings.show(5)

# Join with movies to get titles
avg_ratings_with_titles = avg_ratings.join(movies, "movieId")
avg_ratings_with_titles.orderBy(col("avg_rating").desc()).show(5)


                                                                                

Number of users: 138493, Number of movies: 26744


                                                                                

+-------+------------------+
|movieId|        avg_rating|
+-------+------------------+
|   3997|2.0703468490473864|
|   1580|  3.55831928049466|
|   3918| 2.918940609951846|
|   2366|3.5492681454655197|
|   3175| 3.600717102904267|
+-------+------------------+
only showing top 5 rows





+-------+----------+--------------------+------------------+
|movieId|avg_rating|               title|            genres|
+-------+----------+--------------------+------------------+
| 130644|       5.0|The Garden of Sin...|         Animation|
| 126945|       5.0|  Small Roads (2011)|(no genres listed)|
| 129530|       5.0|Slingshot Hip Hop...|(no genres listed)|
|  32230|       5.0|Snow Queen, The (...|  Children|Fantasy|
| 129036|       5.0|People of the Win...|       Documentary|
+-------+----------+--------------------+------------------+
only showing top 5 rows



                                                                                

In [6]:
# Split data
train, test = ratings.randomSplit([0.8, 0.2])

In [7]:
train = train.repartition(4)

In [8]:
# Configure ALS model
als = ALS(
    maxIter=10, 
    regParam=0.01,
    userCol="userId", 
    itemCol="movieId", 
    ratingCol="rating", 
    nonnegative=True, 
    implicitPrefs=False, 
    coldStartStrategy="drop"
)

In [9]:
#fit the model
als_model = als.fit(train)

24/11/21 21:28:40 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [10]:
predict = als_model.transform(test)

predict.show()

[Stage 118:>                (0 + 8) / 8][Stage 120:>                (0 + 0) / 1]

+------+-------+------+-------------------+----------+
|userId|movieId|rating|          timestamp|prediction|
+------+-------+------+-------------------+----------+
|   133|   1580|   2.0|2014-04-11 02:33:18| 2.7897742|
|   236|   1088|   2.5|2004-02-17 18:26:26| 3.1447015|
|   251|   1088|   4.0|2000-08-03 06:36:08| 3.7573924|
|   251|   1959|   2.0|2000-08-03 06:29:59| 2.9279752|
|   271|   1088|   0.5|2008-10-29 21:54:55| 2.6502137|
|   271|   3175|   2.5|2008-10-29 21:47:54| 2.8301508|
|   271|  44022|   1.5|2008-10-31 19:59:12|  2.959443|
|   321|   1580|   1.0|2007-05-07 21:40:23|  3.325054|
|   332|   1580|   5.0|1998-07-30 01:00:05| 2.9053912|
|   471|   1580|   3.0|2000-04-15 20:16:58|  3.384078|
|   471|   1959|   4.0|2001-11-04 23:30:20|  3.536185|
|   516|   1591|   2.0|1998-04-07 21:58:17|  3.162324|
|   587|   1580|   4.0|1999-12-18 01:44:27| 3.6119425|
|   613|  68135|   3.0|2009-08-22 19:45:47| 3.4716125|
|   667|   1580|   0.5|2010-05-28 22:46:48| 2.3415527|
|   844|  

                                                                                

In [15]:
# Evaluate the model on test data
from pyspark.ml.evaluation import RegressionEvaluator

predictions = als_model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions.na.drop())
print(f"Root-mean-square error = {rmse}")

[Stage 333:>                                                        (0 + 8) / 8]

Root-mean-square error = 0.8066791115839644


                                                                                