# Task I.1: Exploratory data analysis

Initializing a Spark session

In [34]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [35]:
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Movie_recommendation").config("spark.some.config.option","some-value").getOrCreate()

The datasets can be downloaded from these links:
1. 'ratings_small.csv': https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings_small.csv
2. 'movies_metadata.csv': https://www.kaggle.com/rounakbanik/the-movies-dataset?select=movies_metadata.csv

Number of rows and columns:

In [36]:
rating = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"/content/ratings_small.csv")
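# Drop the unused timestamp and rename movieId to id so it matches the join key below
# (assumes the Kaggle file's movieId header; withColumnRenamed is a no-op if the column is absent)
rating = rating.drop('timestamp').withColumnRenamed('movieId', 'id')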

movies = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"/content/movies_metadata.csv")
movie_data = rating.join(movies, on='id')

Columns = len(movie_data.columns)
Rows = movie_data.count()
print('Number of Columns: {}\nNumber of Rows: {}'.format(Columns, Rows))
movie_data.columns

Number of Columns: 26
Number of Rows: 44925


['id',
 'userId',
 'rating',
 'adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

DATA CLEANING: Replace the zeros in the userId, id, rating and title columns with NaN, and cast budget to an integer type.

In [37]:
import numpy as np
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType

# Replace zeros with NaN in the key columns
# (np.nan is a float, so the numeric columns become doubles, e.g. 29 -> 29.0)
movie_data = movie_data.withColumn("userId",when(movie_data.userId==0,np.nan).otherwise(movie_data.userId))
movie_data = movie_data.withColumn("id",when(movie_data.id==0,np.nan).otherwise(movie_data.id))
movie_data = movie_data.withColumn("rating",when(movie_data.rating==0,np.nan).otherwise(movie_data.rating))
movie_data = movie_data.withColumn("title",when(movie_data.title==0,np.nan).otherwise(movie_data.title))

# Cast budget to integer
movie_data = movie_data.withColumn("budget", movie_data["budget"].cast(IntegerType()))

movie_data.show()

+------+------+------+-----+---------------------+--------+--------------------+--------------------+---------+-----------------+----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+----------------+-----+------------+----------+
|    id|userId|rating|adult|belongs_to_collection|  budget|              genres|            homepage|  imdb_id|original_language|  original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|           title|video|vote_average|vote_count|
+------+------+------+-----+---------------------+--------+--------------------+--------------------+---------+-----------------+----------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----

Correlations between independent variables using data visualization:
select several numeric and string columns and draw one plot (e.g. bar chart, histogram, boxplot)
for each to summarise it.
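As a numeric companion to the plots, a correlation coefficient can be computed for any pair of numeric columns. The following is a minimal plain-Python sketch of the Pearson coefficient. The helper name `pearson` is hypothetical, and the `budget` and `revenue` lists are made-up toy values, not figures taken from the dataset.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance and variances, all without the 1/n factor (it cancels out)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Toy values, assumed for illustration only (not from movies_metadata.csv)
budget = [10, 20, 30, 40, 50]
revenue = [25, 45, 65, 85, 105]  # exactly 2 * budget + 5, so perfectly correlated

print(pearson(budget, revenue))
```

On the Spark DataFrame itself, `movie_data.stat.corr(col1, col2)` computes the same coefficient, provided both columns are numeric.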


Recommendation engine

Build the recommendation model using ALS on the training data

In [38]:
(training,test)=movie_data.randomSplit([0.8, 0.2]) # split into training and testing sets

This subtask requires you to implement a recommender system using collaborative filtering with the Alternating Least Squares (ALS) algorithm.
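To make the "alternating" idea concrete, here is a minimal plain-Python sketch of ALS with a single latent factor (rank 1) on a toy rating list. It is an illustration under simplifying assumptions, not Spark's implementation: the names `als_rank1` and `train_rmse` and the toy triples are hypothetical, and Spark's ALS handles higher ranks and scales the regularization differently.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=10):
    """Rank-1 ALS: approximate each observed rating r by u[user] * v[item].

    ratings: list of (user, item, rating) triples for observed entries only.
    """
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    for _ in range(iters):
        # Step 1: hold item factors fixed, solve ridge regression for each user
        for user in range(n_users):
            rated = [(item, r) for (usr, item, r) in ratings if usr == user]
            num = sum(r * v[item] for item, r in rated)
            den = reg + sum(v[item] ** 2 for item, r in rated)
            u[user] = num / den
        # Step 2: hold user factors fixed, solve ridge regression for each item
        for item in range(n_items):
            raters = [(usr, r) for (usr, itm, r) in ratings if itm == item]
            num = sum(r * u[usr] for usr, r in raters)
            den = reg + sum(u[usr] ** 2 for usr, r in raters)
            v[item] = num / den
    return u, v

def train_rmse(ratings, u, v):
    """Root mean squared error of u[user] * v[item] on the observed ratings."""
    return (sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings) / len(ratings)) ** 0.5

# Toy (user, item, rating) triples, assumed for illustration only
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0), (2, 1, 2.0), (2, 2, 1.0)]
u, v = als_rank1(toy, n_users=3, n_items=3)

print("training RMSE:", train_rmse(toy, u, v))
print("predicted rating for user 0 on unseen item 2:", u[0] * v[2])
```

Each half-step is an ordinary regularized least-squares problem with a closed-form solution, which is why the algorithm alternates instead of optimizing both factor sets at once.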

In [39]:
# Build the recommendation model using ALS on the training data
#Fitting the Alternating Least Squares Model

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5,regParam=0.09,rank=25,userCol="userId",itemCol="id",ratingCol="rating",coldStartStrategy="drop",nonnegative=True)
model = als.fit(training) # fit the ALS model to the training set

Generating Predictions & Model Evaluation:
    Evaluating a model is a core part of building an effective machine learning model. Here we use
    RMSE (root mean squared error) as our evaluation metric.
    The RMSE expresses the error in the same units as the rating column.
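As a small worked example of the metric, the sketch below computes RMSE by hand on four made-up rating pairs. The helper name `rmse` and the values are assumptions for illustration and are unrelated to the notebook's results.

```python
from math import sqrt

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))

# Toy ratings, assumed for illustration only
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.0, 3.0, 4.0, 4.0]

# Squared errors are 1, 0, 1, 4, so the mean is 1.5 and the RMSE is sqrt(1.5)
print(rmse(actual, predicted))
```

An RMSE of about 1.22 here means the predictions are off by roughly 1.2 rating points on average, with larger misses weighted more heavily.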

In [40]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.919734204284945
+---+------+------+---------------+---------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+-----------+--------------------+--------------------+------------+------------+-------+----------------+------+-------+-----+-----+------------+----------+----------+
| id|userId|rating|          adult|belongs_to_collection|budget|              genres|            homepage|             imdb_id|   original_language|      original_title|  overview|popularity|poster_path|production_companies|production_countries|release_date|     revenue|runtime|spoken_languages|status|tagline|title|video|vote_average|vote_count|prediction|
+---+------+------+---------------+---------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+-----------+--------------------+--------------------+------------+-

Recommending Movies with ALS:
    The approach here is simple: we take the test rows for a single user (userId 29) and pass them to the trained ALS model,
    the same way we did with the full test set.


In [41]:
single_user = test.filter(test['userId']==29).select(['id','userId','title','genres'])
# Ratings by this user that landed in the test set (the count varies with the random split)
# Realistically this should be a dedicated hold-out set!
single_user.show(truncate = False)

+------+------+------------------+-------------------------------------------------------------+
|id    |userId|title             |genres                                                       |
+------+------+------------------+-------------------------------------------------------------+
|1717.0|29.0  |All the King's Men|[{'id': 18, 'name': 'Drama'}, {'id': 53, 'name': 'Thriller'}]|
+------+------+------------------+-------------------------------------------------------------+



In [42]:
# Now we use model.transform() to generate predicted ratings for this user's movies.

recommendations = model.transform(single_user)
recommendations.orderBy('prediction',ascending=False).show(truncate = False )

+------+------+------------------+-------------------------------------------------------------+----------+
|id    |userId|title             |genres                                                       |prediction|
+------+------+------------------+-------------------------------------------------------------+----------+
|1717.0|29.0  |All the King's Men|[{'id': 18, 'name': 'Drama'}, {'id': 53, 'name': 'Thriller'}]|2.5289507 |
+------+------+------------------+-------------------------------------------------------------+----------+
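The output above only scores a movie the user has already rated. A common way to turn predicted ratings into recommendations is to score the items the user has not rated yet and keep the highest-scoring ones. The plain-Python sketch below shows that ranking step. The helper name `top_n`, the item labels, and the scores are all hypothetical.

```python
def top_n(predicted_scores, already_rated, n=3):
    """Return the n highest-scoring (item, score) pairs the user has not rated."""
    candidates = {
        item: score
        for item, score in predicted_scores.items()
        if item not in already_rated
    }
    # Sort by predicted score, highest first, and keep the first n
    return sorted(candidates.items(), key=lambda kv: kv[1], reverse=True)[:n]

# Toy predicted ratings for one user, assumed for illustration only
scores = {"A": 4.6, "B": 3.1, "C": 4.9, "D": 2.2, "E": 4.0}
rated = {"C"}  # items this user has already rated are excluded

print(top_n(scores, rated, n=2))
```

In Spark ML, the fitted `ALSModel` provides `recommendForUserSubset(dataset, numItems)` and `recommendForAllUsers(numItems)` for producing top-N lists.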

