In [1]:
!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.2.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.3
  Using cached py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [54]:
import numpy as np 
import pandas as pd 
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import seaborn as sns
import matplotlib.pyplot as plt

In [55]:
spark = SparkSession.builder.appName("lab_pyspark").getOrCreate()
spark.version

'3.2.1'

In [56]:
data_user_info = (
    spark.read
    .option("header",True)
    .option('interSchema', True)
    .csv('dataset/movie_ratings_df.csv')
)
data_user_info.printSchema()

root
 |-- userId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: string (nullable = true)



In [57]:
data_user_info.show(3)

+------+------------+------+
|userId|       title|rating|
+------+------------+------+
|   196|Kolya (1996)|     3|
|    63|Kolya (1996)|     3|
|   226|Kolya (1996)|     5|
+------+------------+------+
only showing top 3 rows



In [58]:
from pyspark.sql.types import IntegerType
data_user_info = data_user_info.withColumn("userId", data_user_info["userId"].cast(IntegerType()))
data_user_info = data_user_info.withColumn("rating", data_user_info["rating"].cast(IntegerType()))

In [59]:
data_user_info.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [60]:
result_df = data_user_info.groupBy("userId").count().sort("count", ascending=False)
result_df.show(10)

+------+-----+
|userId|count|
+------+-----+
|   405|  737|
|   655|  685|
|    13|  636|
|   450|  540|
|   276|  518|
|   416|  493|
|   537|  490|
|   303|  484|
|   234|  480|
|   393|  448|
+------+-----+
only showing top 10 rows



In [61]:
from pyspark.ml.feature import StringIndexer, IndexToString
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
# Applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(data_user_info)
#creating new dataframe with transformed values
indexed = model.transform(data_user_info)
#validate the numerical title values
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [62]:
train, test = indexed.randomSplit([0.75,0.25])
from pyspark.ml.recommendation import ALS

#Training the recommender model using train datatset
rec=ALS( maxIter=10
        ,regParam=0.01
        ,userCol='userId'
        ,itemCol='title_new'
        ,ratingCol='rating'
        ,nonnegative=True
        ,coldStartStrategy="drop")

#fit the model on train set
model=rec.fit(train)

#making predictions on test set 
predicted_rating=model.transform(test)
predicted_rating.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,Around the World in 80 Days (1956),4,540.0,2.561407
1,148,Blade Runner (1982),5,52.0,4.238058
2,148,Brazil (1985),4,109.0,5.367517
3,148,"Close Shave, A (1995)",5,302.0,5.725676
4,148,"Deer Hunter, The (1978)",1,280.0,2.52692


In [63]:
from pyspark.ml.evaluation import RegressionEvaluator
# create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
# apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_rating)
# print RMSE error
print(rmse)

1.014474343980139


In [64]:
import pyspark.sql.functions as sf

unique_movies=indexed.select('title_new').distinct()
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    This function returns the top 'n' movies that user has not seen yet but might like 
    
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')
    
    #creating another dataframe which contains already watched movie by active user 
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')
    
    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')
    
    #joining both tables on left join 
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    #selecting movies which active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    #adding new column of user_Id of active useer to remaining movies df 
    remaining_movies=remaining_movies.withColumn("userId",sf.lit(int(user_id)))
    
    
    #making recommendations using ALS recommender model and selecting only top 'n' movies
    recommendations=model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    #adding columns of movie titles in recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title", )
    final_recommendations=movie_title.transform(recommendations)
    
    #return the recommendations to active user
    return final_recommendations.show(n,False)

# Test: recommend 5 movies for user of id=60
top_movies(60,5)

+---------+------+----------+----------------------+
|title_new|userId|prediction|title                 |
+---------+------+----------+----------------------+
|1347.0   |60    |6.432439  |Angel Baby (1995)     |
|1265.0   |60    |6.052894  |Duoluo tianshi (1995) |
|1152.0   |60    |5.569359  |Grass Harp, The (1995)|
|1054.0   |60    |5.4585047 |Primary Colors (1998) |
|1057.0   |60    |5.4487395 |Safe (1995)           |
+---------+------+----------+----------------------+

