<a href="https://colab.research.google.com/github/4k5h1t/PySpark-Movie-Rec/blob/main/movie_recommender_using_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Movie Recommender Systems using Spark with PySpark

## Installing required dependencies


In [None]:
!pip install findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 57 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 71.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845514 sha256=17522976b5798c04555ef98a53a74a66100cf7658152a052098f447ff5f40688
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f08

## Downloading the Dataset

In [None]:
!apt-get install -y aria2
!mkdir -p ./MovieLens/ 
!aria2c -s 16 -x 16 "https://files.grouplens.org/datasets/movielens/ml-25m.zip" -d ./MovieLens/

Reading package lists... Done
Building dependency tree       
Reading state information... Done
aria2 is already the newest version (1.33.1-1).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 5 not upgraded.

11/19 09:00:26 [NOTICE] Downloading 1 item(s)

11/19 09:00:27 [ERROR] CUID#7 - Download aborted. URI=https://files.grouplens.org/datasets/movielens/ml-25m.zip
Exception: [AbstractCommand.cc:351] errorCode=19 URI=https://files.grouplens.org/datasets/movielens/ml-25m.zip
  -> [AbstractCommand.cc:792] errorCode=19 CUID#7 - Name resolution for files.grouplens.org failed:Could not contact DNS servers

11/19 09:00:27 [NOTICE] Download GID#261e039777cfa366 not complete: 

Download Results:
gid   |stat|avg speed  |path/URI
261e03|ERR |       0B/s|https://files.grouplens.org/datasets/movielens/ml-25m.zip

Status Le

## Downloading SPARK and Java Dependencies

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

!tar xf spark-3.2.1-bin-hadoop3.2.tgz


## Extracting the Dataset

In [None]:
import zipfile
with zipfile.ZipFile("/content/MovieLens/ml-25m.zip", 'r') as zip_ref:
    zip_ref.extractall("/content/MovieLens")

FileNotFoundError: ignored
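
The `FileNotFoundError` above is what happens when the download step fails and the archive never reaches disk. A minimal sketch of a guarded extraction follows; the helper name `extract_if_present` is hypothetical, and the paths are the ones used in the cell above.

```python
import os
import zipfile


def extract_if_present(zip_path, dest_dir):
    """Extract zip_path into dest_dir; return False if the archive is missing or invalid."""
    if not os.path.isfile(zip_path):
        print(f"Archive not found: {zip_path} (did the download step succeed?)")
        return False
    if not zipfile.is_zipfile(zip_path):
        print(f"Not a valid zip archive: {zip_path} (the download may be incomplete)")
        return False
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(dest_dir)
    return True


# Paths taken from the extraction cell above.
extract_if_present("/content/MovieLens/ml-25m.zip", "/content/MovieLens")
```

Returning a boolean instead of raising lets later cells decide whether to continue or stop early.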

## Start Setup Time

In [None]:
import time
setupst = time.time()

## Setting up SPARK and Java Paths

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

## Importing PySpark

In [None]:
import findspark
findspark.init('/content/spark-3.2.1-bin-hadoop3.2')
import pyspark

## Starting up the Spark Session

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommendation').getOrCreate()

## Importing the ALS Model and the Evaluation Metric (Root Mean Square Error)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

## Reading loaded Dataset

In [None]:
data = spark.read.csv('MovieLens/ml-25m/ratings.csv', inferSchema=True, header=True)
setupet = time.time()

## Describing / Showcasing Dataset

In [None]:
data.head()

In [None]:
data.printSchema()

In [None]:
data.describe().show()

## Implementing ML Algorithm and Evaluation

### Train Test Split

In [None]:
(train_data, test_data) = data.randomSplit([0.7, 0.3], seed=42)
setupTime = setupet - setupst
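
To illustrate why a weighted random split yields only approximately 70/30 sizes, here is a plain-Python sketch of the general idea: each row gets one uniform random draw and lands in the split whose cumulative-weight range contains it. The function `random_split` is a hypothetical stand-in written for illustration, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw to cumulative weights."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for idx, upper in enumerate(bounds):
            if u < upper:
                splits[idx].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 / 300, but not exact
```

Fixing the seed makes the split reproducible, which is what `seed=42` is for in the cell above.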

### Setting up and Training the ALS Model

In [None]:
trainst = time.time()
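# coldStartStrategy="drop" discards NaN predictions for users or movies that are
# missing from the training split; otherwise the RMSE can evaluate to NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")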
model = als.fit(train_data)
trainet = time.time()
trainTime = trainet - trainst
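
For intuition about what alternating least squares does, the following is a toy rank-1 sketch in plain Python. It is not Spark's implementation, which uses a higher rank, distributed solves, and its own regularization scaling. The names `als_rank1`, `objective`, and `toy_ratings` are hypothetical and the ratings are made up for illustration.

```python
def als_rank1(ratings, num_iters=5, reg=0.01):
    """Fit r[u, i] ~ x[u] * y[i] by alternating closed-form least-squares updates."""
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    x = {u: 1.0 for u in users}
    y = {i: 1.0 for i in items}
    for _ in range(num_iters):
        # Hold item factors fixed and solve for each user factor.
        for u in users:
            num = sum(r * y[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(y[i] ** 2 for (uu, i) in ratings if uu == u)
            x[u] = num / den
        # Hold user factors fixed and solve for each item factor.
        for i in items:
            num = sum(r * x[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(x[u] ** 2 for (u, ii) in ratings if ii == i)
            y[i] = num / den
    return x, y


def objective(ratings, x, y, reg):
    """Squared error on observed ratings plus an L2 penalty on all factors."""
    err = sum((r - x[u] * y[i]) ** 2 for (u, i), r in ratings.items())
    penalty = reg * (sum(v ** 2 for v in x.values()) + sum(v ** 2 for v in y.values()))
    return err + penalty


# Made-up (userId, movieId) -> rating pairs, for illustration only.
toy_ratings = {
    (0, 10): 4.0, (0, 11): 2.0,
    (1, 10): 5.0, (1, 12): 3.0,
    (2, 11): 1.0, (2, 12): 2.0,
}
x, y = als_rank1(toy_ratings, num_iters=5, reg=0.01)
print("objective:", objective(toy_ratings, x, y, 0.01))
print("predicted rating for user 0, movie 12:", x[0] * y[12])
```

Each half-step solves a small least-squares problem exactly, so the objective never increases from one iteration to the next.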

### Testing trained model

In [None]:
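testst = time.time()
# transform() is lazy: it only builds the query plan here, so testTime excludes the
# actual scoring, which runs when an action such as show() or evaluate() is called.
predictions = model.transform(test_data)
testet = time.time()
testTime = testet - testst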

In [None]:
predictions.show()

### Evaluating Predictions

In [None]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
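
As a reference for what the metric measures, here is a small plain-Python computation of root-mean-square error: the square root of the mean squared difference between actual and predicted ratings. The function name and the sample pairs are hypothetical, chosen only to keep the arithmetic easy to check.

```python
import math


def root_mean_square_error(pairs):
    """pairs: iterable of (actual, predicted) values."""
    pairs = list(pairs)
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(pairs))


# Made-up (actual, predicted) ratings: squared errors are 0.25, 0, 1, 1.
sample = [(4.0, 3.5), (3.0, 3.0), (5.0, 4.0), (2.0, 3.0)]
print(root_mean_square_error(sample))  # 0.75
```

An RMSE of 0.75 on a 0.5 to 5 star scale would mean predictions are typically off by about three quarters of a star.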

## Extracting information of one user

In [None]:
userId = int(input('Enter a User ID to find recommendations for: '))
# Filter on the ID that was entered rather than a hard-coded value.
single_user = test_data.filter(test_data['userId'] == userId).select(['movieId', 'userId'])

In [None]:
single_user.show()

## Running Model again for one selected user (testing)

In [None]:
test1st = time.time()
reccomendations = model.transform(single_user)
test1et = time.time()
test1Time = test1et - test1st

In [None]:
reccomendations.orderBy('prediction',ascending=False).show()

## Loading the Movies Dataset (Makes Predictions Easier to Interpret)

In [None]:
moviesdf = spark.read.csv("MovieLens/ml-25m/movies.csv", inferSchema=True, header=True)
moviesdf.show()

## Final Predictions with Movie Titles

In [None]:
rec = reccomendations
joined = moviesdf.join(rec, ['movieId'],how="inner")
joined.select('userId', 'movieId', 'title', 'genres', 'prediction').orderBy('prediction', ascending=False).show()
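
The join above keeps only movies that have a prediction and attaches titles to them. The same inner-join-then-sort logic can be sketched in plain Python; the rows and the `inner_join` helper below are hypothetical and exist only for illustration.

```python
def inner_join(left, right, key):
    """Return merged rows for every left/right pair that shares the same key value."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for left_row in left:
        for right_row in index.get(left_row[key], []):
            joined.append({**left_row, **right_row})
    return joined


# Made-up rows standing in for movies.csv and the model's predictions.
movies = [
    {"movieId": 1, "title": "Movie A", "genres": "Comedy"},
    {"movieId": 2, "title": "Movie B", "genres": "Drama"},
    {"movieId": 3, "title": "Movie C", "genres": "Action"},
]
preds = [
    {"movieId": 1, "userId": 12, "prediction": 3.9},
    {"movieId": 3, "userId": 12, "prediction": 4.6},
    {"movieId": 99, "userId": 12, "prediction": 2.0},  # no matching title, dropped by the join
]

ranked = sorted(inner_join(movies, preds, "movieId"),
                key=lambda row: row["prediction"], reverse=True)
for row in ranked:
    print(row["userId"], row["movieId"], row["title"], row["prediction"])
```

Movie 2 has no prediction and movie 99 has no title, so neither appears in the result.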

In [None]:
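# A notebook cell only displays its last expression, so print each timing explicitly.
print("Setup time (s):", setupTime)
print("Training time (s):", trainTime)
print("Test time (s):", testTime)
print("Single-user test time (s):", test1Time)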
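
The paired start/end variables used for timing throughout the notebook could be collected with a small context manager instead. This is one possible sketch; the names `timed` and `timings` are hypothetical and do not appear in the notebook.

```python
import time
from contextlib import contextmanager

timings = {}


@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under the given label."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Stand-in workload; in the notebook this would wrap a setup, training, or test step.
with timed("example"):
    sum(range(100_000))

for label, seconds in timings.items():
    print(f"{label}: {seconds:.4f} s")
```

Keeping all durations in one dictionary avoids typos in near-identical variable names when reporting them.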