<a href="https://colab.research.google.com/github/amybinny/Movie-Recommendation-PySpark/blob/main/Movie_Recommendation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Movie recommendation

In the following model, movie recommndation is performed using collaborative filtering. Collaborative filtering is based on the assumption that people who agreed in the past will agree in the future, and that they will like similar kinds of items as they liked in the past. The system generates recommendations using only information about rating profiles for different users or items. By locating peer users/items with a rating history similar to the current user or item, they generate recommendations using this neighborhood. (https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering)

## Data importing

In [None]:
import numpy as np

In [None]:
!pip install pyspark 
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=301a59aa7d2f70ed3f29c90c8b10dffb0a1af655c625e6692ebed796341b9915
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
#ds = spark.read.csv('/content/gdrive/MyDrive/projects/Movie Rating/movie_ratings_df.csv', header = True, inferSchema=True)
#ds.printSchema()

In [None]:
from google.colab import files
files.upload()

Saving movie_ratings_df.csv to movie_ratings_df.csv


{'movie_ratings_df.csv': b'userId,title,rating\n196,Kolya (1996),3\n63,Kolya (1996),3\n226,Kolya (1996),5\n154,Kolya (1996),3\n306,Kolya (1996),5\n296,Kolya (1996),4\n34,Kolya (1996),5\n271,Kolya (1996),4\n201,Kolya (1996),4\n209,Kolya (1996),4\n35,Kolya (1996),2\n354,Kolya (1996),5\n199,Kolya (1996),5\n113,Kolya (1996),2\n1,Kolya (1996),5\n173,Kolya (1996),5\n360,Kolya (1996),4\n234,Kolya (1996),4\n14,Kolya (1996),4\n309,Kolya (1996),4\n331,Kolya (1996),4\n21,Kolya (1996),3\n111,Kolya (1996),4\n439,Kolya (1996),5\n355,Kolya (1996),4\n204,Kolya (1996),5\n145,Kolya (1996),5\n30,Kolya (1996),5\n463,Kolya (1996),2\n144,Kolya (1996),4\n417,Kolya (1996),3\n2,Kolya (1996),5\n497,Kolya (1996),1\n523,Kolya (1996),5\n12,Kolya (1996),5\n202,Kolya (1996),3\n131,Kolya (1996),5\n451,Kolya (1996),1\n532,Kolya (1996),4\n539,Kolya (1996),5\n537,Kolya (1996),3\n416,Kolya (1996),4\n566,Kolya (1996),5\n597,Kolya (1996),4\n181,Kolya (1996),1\n639,Kolya (1996),4\n123,Kolya (1996),5\n520,Kolya (1996),5\n617

In [None]:
ds = spark.read.csv('movie_ratings_df.csv', header = True, inferSchema=True)
ds.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
ds.count()

100000

In [None]:
'''from pyspark.sql.functions import countDistinct, col
ds_n.select(countDistinct('userId')).show()

ds_n.agg(*(countDistinct(col(c)) for c in ds_n.columns)).show()
ds_n.select([countDistinct(col(c)) for c in ds_n.columns]).show() # either works

ds_n.groupBy(ds_n.userId).count().agg({'count': 'min'}).show()'''

"from pyspark.sql.functions import countDistinct, col\nds_n.select(countDistinct('userId')).show()\n\nds_n.agg(*(countDistinct(col(c)) for c in ds_n.columns)).show()\nds_n.select([countDistinct(col(c)) for c in ds_n.columns]).show() # either works\n\nds_n.groupBy(ds_n.userId).count().agg({'count': 'min'}).show()"

## Modeling the data using ALS classifier for recommending movies

In [None]:
from pyspark.ml.feature import StringIndexer 
indexer = StringIndexer(inputCol='title',outputCol='title_indexer')
ds_n = indexer.fit(ds).transform(ds)

train,test = ds_n.randomSplit([0.8,0.2])

In [None]:
from pyspark.ml.recommendation import ALS
ALSclas = ALS(maxIter=10
        ,regParam=0.01
        ,userCol='userId'
        ,itemCol='title_indexer'
        ,ratingCol='rating'
        ,nonnegative=True
        ,coldStartStrategy="drop")

ALSfit = ALSclas.fit(train)
predTrain = ALSfit.transform(train)
predTest = ALSfit.transform(test)
predTest.show()

+------+--------------------+------+-------------+----------+
|userId|               title|rating|title_indexer|prediction|
+------+--------------------+------+-------------+----------+
|   148|Beauty and the Be...|     4|        114.0|    4.4594|
|   148| Blade Runner (1982)|     5|         52.0|  3.562661|
|   148|Dr. Strangelove o...|     5|        123.0|  3.776698|
|   148|     Fantasia (1940)|     5|        153.0| 3.6457617|
|   148| Forrest Gump (1994)|     5|         27.0| 3.6612306|
|   148|   Free Willy (1993)|     1|        761.0|  3.058413|
|   148|Godfather, The (1...|     1|         11.0|  3.871947|
|   148|James and the Gia...|     5|        260.0| 3.3894315|
|   148|Pink Floyd - The ...|     5|        298.0| 2.7169454|
|   148|    Star Wars (1977)|     5|          0.0|  4.590154|
|   148|   Sting, The (1973)|     5|         75.0|  4.098567|
|   148|This Is Spinal Ta...|     5|        127.0|  3.315443|
|   463|Angels and Insect...|     5|        451.0| 3.0004053|
|   463|

In [None]:
# Importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator
# create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
# apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predTrain)
# print RMSE error
print(rmse)

# apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predTest)
# print RMSE error
print(rmse)

0.7014491827999746
1.0167532754264041


In [None]:
# trained for movies not yet watched
'''from pyspark.sql.functions import col
#we need a userId to rate at least 20 movies
def top_n_rec(user_ID, n=5):
  watched = ds.filter(ds.userId == user_ID).select('title')
  all = ds.select('title').distinct()
  new = all.subtract(watched)

  new_detailed = ds_n.filter(ds_n.title == new.title)
  return ALSclas.fit(new_detailed).transform(new_detailed
                    ).orderBy(col('prediction').desc()
                    #).select('title'
                    #).distinct(
                    ).dropDuplicates(['title']
                    ).limit(n)#.show(truncate=False)'''

"from pyspark.sql.functions import col\n#we need a userId to rate at least 20 movies\ndef top_n_rec(user_ID, n=5):\n  watched = ds.filter(ds.userId == user_ID).select('title')\n  all = ds.select('title').distinct()\n  new = all.subtract(watched)\n\n  new_detailed = ds_n.filter(ds_n.title == new.title)\n  return ALSclas.fit(new_detailed).transform(new_detailed\n                    ).orderBy(col('prediction').desc()\n                    #).select('title'\n                    #).distinct(\n                    ).dropDuplicates(['title']\n                    ).limit(n)#.show(truncate=False)"

## Create a function to execute top n movie recommendations for a certain userId

In [None]:
# trained for all movies in dataset

from pyspark.sql.functions import col
#we need a userId to rate at least 20 movies

def top_n_rec(user_ID, n=5):
  #we need to fit the model for the whole datatset
  ALSfitWhole = ALSclas.fit(ds_n)

  watched = ds.filter(ds.userId == user_ID).select('title')
  all = ds.select('title').distinct()
  new = all.subtract(watched)

  new_detailed = ds_n.filter(ds_n.title == new.title)
  
  return ALSfitWhole.transform(new_detailed
                    ).orderBy(col('prediction').desc()
                    #).select('title'
                    #).distinct(
                    ).dropDuplicates(['title']
                    ).limit(n)#.show(truncate=False)

In [None]:
a=top_n_rec(60)
a.show()

+------+--------------------+------+-------------+----------+
|userId|               title|rating|title_indexer|prediction|
+------+--------------------+------+-------------+----------+
|   310|When We Were King...|     5|        663.0|  5.515714|
|   747|       Psycho (1960)|     5|         79.0|  5.510033|
|    58|Heavenly Creature...|     5|        463.0| 5.2266345|
|   882|Snow White and th...|     5|        158.0| 5.1732755|
|   848|   Annie Hall (1977)|     5|        138.0|  5.103743|
+------+--------------------+------+-------------+----------+



In [None]:
#creating an empty RDD to create an empty dataframe
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)
# create structure type for the empty datatframe
from pyspark.sql.types import StructType,StructField, StringType,IntegerType
schema = StructType([
  StructField('userId', StringType(), True),
  StructField('title', StringType(), True),
  StructField('rating', IntegerType(), True),
  StructField('title_indexer', StringType(), True),
  StructField('prediction', IntegerType(), True),
  StructField('movieFor', StringType(), True)
  ])
#Create empty DataFrame from empty RDD
d = spark.createDataFrame(emptyRDD,schema)
d.printSchema()
# Add new constanct column
from pyspark.sql.functions import lit
d.withColumn("bonus_percent", lit(0.3)).show() # just example

EmptyRDD[554] at emptyRDD at NativeMethodAccessorImpl.java:0
root
 |-- userId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_indexer: string (nullable = true)
 |-- prediction: integer (nullable = true)
 |-- movieFor: string (nullable = true)

+------+-----+------+-------------+----------+--------+-------------+
|userId|title|rating|title_indexer|prediction|movieFor|bonus_percent|
+------+-----+------+-------------+----------+--------+-------------+
+------+-----+------+-------------+----------+--------+-------------+



In [None]:
# I learnt this trick while doing my assignment using pandas
df = spark.createDataFrame([(np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan)], # values
    ['userId','title','rating','title_indexer','prediction','movieFor','bonus_percent'])  # add your column names here

df = df.na.drop()
df.show()

+------+-----+------+-------------+----------+--------+-------------+
|userId|title|rating|title_indexer|prediction|movieFor|bonus_percent|
+------+-----+------+-------------+----------+--------+-------------+
+------+-----+------+-------------+----------+--------+-------------+



### Given a set of movie ratings for new users, the following function executes top n movie recommandations for the new users.

In [None]:
#we need a userId with at least 20 movie ratings in the form of a dataframe with column names: userId, title, ratings

def top_n_rec(df, n=5):
  
  #combine the new and the old dataset
  ds_concat = ds.union(df) # both dataframes should have same column names: userId, title, ratings
  
  indexer = StringIndexer(inputCol='title',outputCol='title_indexer')
  ds_n_concat = indexer.fit(ds_concat).transform(ds_concat)
  
  #fit the model ofr the combined dataset
  ALSfit = ALSclas.fit(ds_n_concat)
  
  #select the unique userId's from the new dataset
  dataCollect = df.dropDuplicates(['userId']).select('userId').collect()
  
  df2 = spark.createDataFrame(emptyRDD,schema)
  for row in dataCollect:
    watched = ds_n_concat.filter(ds_n_concat.userId == row['userId']).select('title')
    all = ds_n_concat.select('title').distinct()
    new = all.subtract(watched)
    new_detailed = ds_n_concat.join(new, on='title', how="right")

    a=ALSfit.transform(new_detailed
                    ).orderBy(col('prediction').desc()
                    #).select('title'
                    #).distinct(
                    ).dropDuplicates(['title']
                    ).limit(n)#.show(truncate=False)
    
    a = a.withColumn("movieFor", lit(row['userId']))
    df2 = df2.union(a)

  return df2

In [None]:
#new_detailed.show()
#ds_n_concat.filter(ds_n_concat.title == new.title).dropDuplicates(['title']).sort(col('title')).show(100)
#watched = ds_n_concat.filter(ds_n_concat.userId == 60).select('title').sort(col('title')).show(40)
#new.sort(col('title')).show(100)

In [None]:
df = ds.filter((ds.userId == 196) | (ds.userId == 60))

In [None]:
result=top_n_rec(df, n=10)
result.show()

+--------------------+-----+------+-------------+----------+--------+
|              userId|title|rating|title_indexer|prediction|movieFor|
+--------------------+-----+------+-------------+----------+--------+
|       Psycho (1960)|  747|     5|         79.0|  5.492635|     196|
|When We Were King...|  310|     5|        664.0| 5.2964754|     196|
|Snow White and th...|  882|     5|        161.0|  5.157075|     196|
|Heavenly Creature...|   58|     5|        458.0|  5.111349|     196|
|   Annie Hall (1977)|  794|     5|        138.0| 5.0567827|     196|
| Three Wishes (1995)|  760|     5|       1171.0| 5.0300636|     196|
|         Cosi (1996)|  819|     5|       1360.0| 4.8906355|     196|
| If Lucy Fell (1996)|  907|     4|        807.0| 4.7607937|     196|
|Night of the Livi...|  219|     5|        509.0|  4.584039|     196|
|    Fair Game (1995)|  279|     4|       1096.0|  3.983437|     196|
|When We Were King...|  310|     5|        664.0| 5.2964754|      60|
|Snow White and th..

In [None]:
movies = df.select('title').distinct()

In [None]:
'''from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
sparkContext=spark.sparkContext

row = Row("movies")
row_with_index = Row("movies", "index")

df_movies = sc.parallelize(row(movies(x)) for x in range(1, 5)).toDF()
df_movies.show(5)'''

'from pyspark.sql import Row\nfrom pyspark.sql.types import StructType, StructField, LongType\nsparkContext=spark.sparkContext\n\nrow = Row("movies")\nrow_with_index = Row("movies", "index")\n\ndf_movies = sc.parallelize(row(movies(x)) for x in range(1, 5)).toDF()\ndf_movies.show(5)'

### We need a userId with at least 20 movie ratings in the form of a dataframe with column names: userId, title, ratings

In [None]:
def top_n_rec(df, n=5):
  
  #combine the new and the old dataset
  ds_concat = ds.union(df) # both dataframes should have same column names: userId, title, ratings
  
  indexer = StringIndexer(inputCol='title',outputCol='title_indexer')
  ds_n_concat = indexer.fit(ds_concat).transform(ds_concat)
  
  #fit the model ofr the combined dataset
  ALSfit = ALSclas.fit(ds_n_concat)
  
  #select the unique userId's from the new dataset
  dataCollect = df.dropDuplicates(['userId']).select('userId').collect()
  
  df2 = spark.createDataFrame(emptyRDD,schema)
  for row in dataCollect:
    watched = ds_n_concat.filter(ds_n_concat.userId == row['userId']).select('title')
    all = ds_n_concat.select('title').distinct()
    new = all.subtract(watched)
    new_detailed = ds_n_concat.join(new, on='title', how="right")

    a=ALSfit.transform(new_detailed
                    ).orderBy(col('prediction').desc()
                    #).select('title'
                    #).distinct(
                    ).dropDuplicates(['title']
                    ).limit(n)#.show(truncate=False)
    
    a = a.withColumn("movieFor", lit(row['userId']))
    df2 = df2.union(a)

  return df2

In [None]:
#df.sort(df.title).show(50)
#df.groupBy(df.title).count().orderBy(col('count').desc()).show(truncate=False)


In [None]:
#watched = ds.filter('userId = 465').select('title')
#all = ds.select('title').distinct()
#new = all.subtract(watched)

In [None]:
#all_pred = predTrain.union(predTest)
#all_pred.count()

In [None]:
#all.subtract(watched).show()

AttributeError: ignored