In [1]:
#import and create sparksession object

In [6]:
from pyspark.sql import SparkSession 
# Build the notebook's Spark session; getOrCreate() reuses an existing session
# instead of starting a second one when the cell is re-run.
spark = (
    SparkSession.builder
    .appName('Recommend-System-Pyspark')
    .getOrCreate()
)

In [7]:
spark

In [120]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

### 准备数据集

In [9]:
# load the dataset and create a Spark dataframe

In [152]:
# Load the ratings CSV: the first row is treated as the header and column
# types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/data/movie_ratings_df.csv')
)

In [11]:
# validate the shape of the data 

In [12]:
# Print the dataframe shape as a (row count, column count) tuple.
print((df.count(),len(df.columns)))

(100000, 3)


In [13]:
#check columns in dataframe

In [14]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [15]:
# validate a few rows of the dataframe in random order

### 数据统计

In [16]:
# Preview 10 rows in random order (sorted by a random key), without truncating
# long titles. `rand` is reached through the `F` alias of pyspark.sql.functions:
# a bare `rand` is not imported by the cells above and raises NameError on a
# fresh kernel.
df.orderBy(F.rand()).show(10, False)

+------+------------------------------------+------+
|userId|title                               |rating|
+------+------------------------------------+------+
|544   |Jackie Brown (1997)                 |4     |
|884   |Fargo (1996)                        |5     |
|38    |Jury Duty (1995)                    |1     |
|367   |Jaws (1975)                         |4     |
|286   |Tommy Boy (1995)                    |1     |
|882   |Terminator, The (1984)              |5     |
|200   |Pulp Fiction (1994)                 |4     |
|813   |Beautician and the Beast, The (1997)|3     |
|226   |Unforgiven (1992)                   |5     |
|638   |Full Metal Jacket (1987)            |3     |
+------+------------------------------------+------+
only showing top 10 rows



In [17]:
# check number of ratings by each user

In [18]:
# Count ratings per user and list the 10 most active users first.
(
    df.groupBy('userId')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [None]:
# number of times each movie has been rated

In [19]:
# Count ratings per movie title and list the 10 most-rated titles first.
(
    df.groupBy('title')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



### Pyspark ML Pipeline

In [21]:
# import StringIndexer to convert string values to numeric values
# (the recommender model is trained later, using the train dataset)

In [153]:
from pyspark.ml.feature import StringIndexer,IndexToString
from pyspark.ml import Pipeline

In [None]:
#creating string indexer to convert the movie title column values into numerical values

In [154]:
# Encode each movie title string as a numeric index in a new `title_new`
# column; that column is what the ALS model is later given as its item column.
stringIndexer = StringIndexer(
    inputCol="title",            
    outputCol="title_new")

In [155]:
pipeline = Pipeline(stages = [stringIndexer])

In [156]:
pipeline.getStages()

[StringIndexer_52f3d03f78c8]

In [157]:
pipelineModel = pipeline.fit(df)

In [158]:
index_df = pipelineModel.transform(df)

In [159]:
index_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)



In [160]:
index_df.show(10, False)

+------+------------+------+---------+
|userId|title       |rating|title_new|
+------+------------+------+---------+
|196   |Kolya (1996)|3     |287.0    |
|63    |Kolya (1996)|3     |287.0    |
|226   |Kolya (1996)|5     |287.0    |
|154   |Kolya (1996)|3     |287.0    |
|306   |Kolya (1996)|5     |287.0    |
|296   |Kolya (1996)|4     |287.0    |
|34    |Kolya (1996)|5     |287.0    |
|271   |Kolya (1996)|4     |287.0    |
|201   |Kolya (1996)|4     |287.0    |
|209   |Kolya (1996)|4     |287.0    |
+------+------------+------+---------+
only showing top 10 rows



In [73]:
#number of times each numerical movie title has been rated 

In [161]:
# Same per-movie rating counts as before, keyed by the numeric title index.
(
    index_df
    .groupBy('title_new')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



In [165]:
# Show which title each numeric index stands for, lowest index first.
(
    index_df
    .select("title", "title_new")
    .distinct()
    .orderBy("title_new", ascending=True)
    .show(10, truncate=False)
)

+-----------------------------+---------+
|title                        |title_new|
+-----------------------------+---------+
|Star Wars (1977)             |0.0      |
|Contact (1997)               |1.0      |
|Fargo (1996)                 |2.0      |
|Return of the Jedi (1983)    |3.0      |
|Liar Liar (1997)             |4.0      |
|English Patient, The (1996)  |5.0      |
|Scream (1996)                |6.0      |
|Toy Story (1995)             |7.0      |
|Air Force One (1997)         |8.0      |
|Independence Day (ID4) (1996)|9.0      |
+-----------------------------+---------+
only showing top 10 rows



### 切分数据集

In [81]:
# Split the indexed ratings roughly 75% / 25% into train and test sets.
# A fixed seed keeps the split, and every metric computed from it,
# reproducible across kernel restarts.
train_df, test_df = index_df.randomSplit([0.75, 0.25], seed=42)

In [82]:
print("Train Dataset Num :", train_df.count())
print("Test Dataset Num :", test_df.count())

Train Dataset Num : 75125
Test Dataset Num : 24875


### 训练模型

In [79]:
from pyspark.ml.recommendation import ALS

In [78]:
# Configure the ALS collaborative-filtering estimator.
#   userCol / itemCol / ratingCol : columns of the indexed ratings dataframe
#   nonnegative=True              : constrain the learned factors to be >= 0
#   coldStartStrategy="drop"      : drop rows whose prediction is NaN (users or
#                                   items unseen during training) so they do not
#                                   turn the evaluation metric into NaN
#   seed                          : fixes the random factor initialisation so
#                                   training is reproducible between runs
als = ALS(maxIter=10,
          regParam=0.01,
          userCol='userId',
          itemCol='title_new',
          ratingCol='rating',
          nonnegative=True,
          coldStartStrategy="drop",
          seed=42)

In [83]:
rec_model = als.fit(train_df)

In [84]:
predicted_df = rec_model.transform(test_df)

In [85]:
predicted_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [87]:
# Preview 10 predictions in random order. `rand` is reached through the `F`
# alias of pyspark.sql.functions: a bare `rand` is not imported by the cells
# above and raises NameError on a fresh kernel.
predicted_df.orderBy(F.rand()).show(10, False)

+------+---------------------------------------------+------+---------+----------+
|userId|title                                        |rating|title_new|prediction|
+------+---------------------------------------------+------+---------+----------+
|213   |Sting, The (1973)                            |4     |75.0     |4.394649  |
|499   |Alien (1979)                                 |4     |44.0     |3.3748245 |
|189   |Ben-Hur (1959)                               |4     |270.0    |4.327587  |
|294   |Romy and Michele's High School Reunion (1997)|2     |339.0    |3.3786664 |
|410   |Postman, The (1997)                          |3     |552.0    |4.132841  |
|416   |Manchurian Candidate, The (1962)             |5     |239.0    |4.8652353 |
|910   |Twister (1996)                               |3     |42.0     |2.8483937 |
|347   |Young Frankenstein (1974)                    |2     |117.0    |4.493652  |
|855   |Wings of Desire (1987)                       |4     |562.0    |3.8445659 |
|925

### 模型评估

In [88]:
# importing Regression Evaluator to measure RMSE

In [89]:
from pyspark.ml.evaluation import RegressionEvaluator

In [90]:
# RMSE between the true `rating` labels and the model's `prediction` column.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [91]:
rmse = evaluator.evaluate(predicted_df)

In [92]:
print('RMSE: ', rmse)

RMSE:  1.0146951683205292


### 创建推荐系统

In [97]:
#create dataset of all distinct movies 

In [99]:
unique_movies = index_df.select('title_new').distinct()

In [100]:
unique_movies.count()

1664

In [103]:
a = unique_movies.alias('a')

In [106]:
a.show(5)

+---------+
|title_new|
+---------+
|    558.0|
|    305.0|
|    299.0|
|    596.0|
|    769.0|
+---------+
only showing top 5 rows



In [107]:
user_id = 85

In [None]:
# creating another dataframe which contains already watched movie by active user 

In [109]:
# Distinct title indices of the movies the active user has already rated.
watched_movies = (
    index_df
    .where(index_df['userId'] == user_id)
    .select('title_new')
    .distinct()
)

In [110]:
watched_movies.count()

287

In [111]:
b = watched_movies.alias('b')

In [112]:
b.show(5)

+---------+
|title_new|
+---------+
|    305.0|
|    496.0|
|    184.0|
|    147.0|
|     70.0|
+---------+
only showing top 5 rows



In [113]:
# Left-join every movie (a) against the movies the user has rated (b).
# Rows where b.title_new is null are movies the user has not rated yet.
total_movies = a.join(
    b, 
    a.title_new == b.title_new,
    how='left')

In [114]:
total_movies.show(5, False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |305.0    |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
+---------+---------+
only showing top 5 rows



In [124]:
# 过滤掉用户已经看过或评分过的电影

In [121]:
# Keep only the movies with no match on the watched side of the join,
# i.e. the ones the active user has not rated yet.
remaining_movies = (
    total_movies
    .filter(F.col("b.title_new").isNull())
    .select(a.title_new)
    .distinct()
)

In [122]:
remaining_movies.show(5, False)

+---------+
|title_new|
+---------+
|558.0    |
|299.0    |
|596.0    |
|769.0    |
|934.0    |
+---------+
only showing top 5 rows



In [125]:
# Attach the active user's id as a constant column so each row is a
# (movie, user) pair that can be passed to the ALS model for scoring.
remaining_movies = remaining_movies.withColumn("userId", F.lit(int(user_id)))

In [126]:
remaining_movies.show(10, False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |85    |
|299.0    |85    |
|596.0    |85    |
|769.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|692.0    |85    |
|810.0    |85    |
|720.0    |85    |
|782.0    |85    |
+---------+------+
only showing top 10 rows



In [127]:
# making recommendations using ALS recommender model and selecting only topN movies

In [128]:
# Score every not-yet-rated movie for the active user, best prediction first.
recommendations = (
    rec_model
    .transform(remaining_movies)
    .orderBy('prediction', ascending=False)
)

In [129]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1328.0   |85    |4.8966446 |
|1271.0   |85    |4.7829847 |
|1132.0   |85    |4.7179346 |
|288.0    |85    |4.656434  |
|1367.0   |85    |4.6416006 |
+---------+------+----------+
only showing top 5 rows



In [130]:
#converting title_new values back to movie titles

In [145]:
# Reverse the StringIndexer step: map each numeric `title_new` back to its
# original title using the labels from the fitted indexer (stage 0 of the
# fitted pipeline).
movie_title = IndexToString(
    inputCol="title_new",
    outputCol="title",
    labels=pipelineModel.stages[0].labels)

# Add the readable `title` column to the ranked recommendations.
final_recommendations = movie_title.transform(recommendations)

In [146]:
final_recommendations.show(10, False)

+---------+------+----------+-----------------------------------------------+
|title_new|userId|prediction|title                                          |
+---------+------+----------+-----------------------------------------------+
|1328.0   |85    |4.8966446 |Legal Deceit (1997)                            |
|1271.0   |85    |4.7829847 |Whole Wide World, The (1996)                   |
|1132.0   |85    |4.7179346 |Incognito (1997)                               |
|288.0    |85    |4.656434  |Hoop Dreams (1994)                             |
|1367.0   |85    |4.6416006 |Maya Lin: A Strong Clear Vision (1994)         |
|285.0    |85    |4.621625  |Wrong Trousers, The (1993)                     |
|514.0    |85    |4.5345607 |Jean de Florette (1986)                        |
|967.0    |85    |4.5279994 |Thirty-Two Short Films About Glenn Gould (1993)|
|1468.0   |85    |4.5212555 |Anna (1996)                                    |
|638.0    |85    |4.498848  |Shall We Dance? (1996)             

In [147]:
def _top_recommendations(user_id, n):
    """Build a DataFrame of the top `n` not-yet-rated movies for `user_id`.

    Relies on the notebook-level objects `unique_movies`, `index_df`,
    `rec_model` and `pipelineModel` having been created by earlier cells.
    """
    # every distinct movie, aliased so the join sides can be told apart
    a = unique_movies.alias('a')

    # movies the active user has already rated
    watched_movies = index_df\
        .filter(index_df['userId'] == user_id)\
        .select('title_new')\
        .distinct()
    b = watched_movies.alias('b')

    # left join: rows with a null on the 'b' side are movies not rated yet
    total_movies = a.join(
        b,
        a.title_new == b.title_new,
        how='left')

    remaining_movies = total_movies\
        .where(F.col("b.title_new").isNull())\
        .select(a.title_new)\
        .distinct()

    # add the active user's id so each row is a (movie, user) pair to score
    remaining_movies = remaining_movies.withColumn("userId", F.lit(int(user_id)))

    # score the candidates with the ALS model and keep only the best `n`
    recommendations = rec_model\
        .transform(remaining_movies)\
        .orderBy('prediction', ascending=False)\
        .limit(n)

    # map the numeric title index back to the movie title
    movie_title = IndexToString(
        inputCol="title_new",
        outputCol="title",
        labels=pipelineModel.stages[0].labels)

    return movie_title.transform(recommendations)


def get_top_movies(user_id, n):
    """
    Print the top 'n' movies that the user has not rated yet but might like.

    Parameters
    ----------
    user_id : int
        Id of the active user (a value of the `userId` column).
    n : int
        Maximum number of recommendations to display.

    Returns
    -------
    None
        The recommendations are printed with `DataFrame.show`; nothing is
        returned.
    """
    final_recommendations = _top_recommendations(user_id, n)

    # Pass `n` explicitly: show() prints only 20 rows by default, which would
    # silently cut the list short whenever n > 20.
    final_recommendations.show(n, truncate=False)

In [149]:
get_top_movies(85, 10)

+---------+------+----------+-----------------------------------------------+
|title_new|userId|prediction|title                                          |
+---------+------+----------+-----------------------------------------------+
|1328.0   |85    |4.8966446 |Legal Deceit (1997)                            |
|1271.0   |85    |4.7829847 |Whole Wide World, The (1996)                   |
|1132.0   |85    |4.7179346 |Incognito (1997)                               |
|288.0    |85    |4.656434  |Hoop Dreams (1994)                             |
|1367.0   |85    |4.6416006 |Maya Lin: A Strong Clear Vision (1994)         |
|285.0    |85    |4.621625  |Wrong Trousers, The (1993)                     |
|514.0    |85    |4.5345607 |Jean de Florette (1986)                        |
|967.0    |85    |4.5279994 |Thirty-Two Short Films About Glenn Gould (1993)|
|1468.0   |85    |4.5212555 |Anna (1996)                                    |
|638.0    |85    |4.498848  |Shall We Dance? (1996)             