In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [2]:
spark = SparkSession.builder.appName('recommender_system').getOrCreate()

In [3]:
entries = os.listdir('C:\\Users\GUEN999\Desktop\Data engineer\kaggle\dataset')
print(entries)

['gender_submission.csv', 'movie_ratings_df.csv', 'test.csv', 'train.csv']


In [4]:
# load the dataset and create sprk dataframe
df=spark.read.csv('C:\\Users\GUEN999\Desktop\Data engineer\kaggle\dataset\movie_ratings_df.csv',inferSchema=True,header=True)
# Using limit(), or select() or show() to view the data. I often use limit()
# Using toPandas() method to return Pyspark DataFrame as Pandas table
df.limit(3).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5


In [5]:
df.show()

+------+------------+------+
|userId|       title|rating|
+------+------------+------+
|   196|Kolya (1996)|     3|
|    63|Kolya (1996)|     3|
|   226|Kolya (1996)|     5|
|   154|Kolya (1996)|     3|
|   306|Kolya (1996)|     5|
|   296|Kolya (1996)|     4|
|    34|Kolya (1996)|     5|
|   271|Kolya (1996)|     4|
|   201|Kolya (1996)|     4|
|   209|Kolya (1996)|     4|
|    35|Kolya (1996)|     2|
|   354|Kolya (1996)|     5|
|   199|Kolya (1996)|     5|
|   113|Kolya (1996)|     2|
|     1|Kolya (1996)|     5|
|   173|Kolya (1996)|     5|
|   360|Kolya (1996)|     4|
|   234|Kolya (1996)|     4|
|    14|Kolya (1996)|     4|
|   309|Kolya (1996)|     4|
+------+------------+------+
only showing top 20 rows



In [6]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [7]:
from pyspark.ml.feature import StringIndexer, IndexToString
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
# Applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)
#creating new dataframe with transformed values
indexed = model.transform(df)
#validate the numerical title values
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [8]:
train, test = indexed.randomSplit([0.75, 0.25])
from pyspark.ml.recommendation import ALS

rec = ALS( maxIter=10
          ,regParam=0.01
          ,userCol='userId'
          ,itemCol='userId'
          ,ratingCol='rating'
          ,nonnegative=True
          ,coldStartStrategy="drop")

rec_model=rec.fit(train)

predicted_ratings=rec_model.transform(test)
predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,2001: A Space Odyssey (1968),5,59.0,4.062721
1,148,Back to the Future (1985),3,20.0,4.062721
2,148,Brazil (1985),4,109.0,4.062721
3,148,Cinderella (1950),3,243.0,4.062721
4,148,Cold Comfort Farm (1995),5,262.0,4.062721


In [9]:
# Importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator
# create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
# apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_ratings)
# print RMSE error
print(rmse)

1.0469546136238974


In [10]:
# First we need to create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()
unique_movies.show()

+---------+
|title_new|
+---------+
|    305.0|
|    596.0|
|    299.0|
|    769.0|
|    692.0|
|    934.0|
|   1051.0|
|    496.0|
|    558.0|
|    170.0|
|    184.0|
|    576.0|
|    147.0|
|    810.0|
|    720.0|
|    782.0|
|   1369.0|
|   1587.0|
|    160.0|
|    608.0|
+---------+
only showing top 20 rows

