In [1]:
#import and create sparksession object
from pyspark.sql import SparkSession 
# Reuse an existing session if there is one; otherwise start one with app name 'rc'.
spark = (
    SparkSession.builder
    .appName('rc')
    .getOrCreate()
)

In [2]:
# Load the ratings file: first row is the header, column types are inferred.
df = spark.read.csv(
    'movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Print (row count, column count) of the loaded ratings frame.
print((df.count(),len(df.columns)))

(100000, 3)


In [4]:
# Show column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [5]:
from pyspark.sql.functions import *

In [6]:
# Shuffle the rows and print 10 of them without truncating long titles.
shuffled_ratings = df.orderBy(rand())
shuffled_ratings.show(n=10, truncate=False)

+------+------------------------------------+------+
|userId|title                               |rating|
+------+------------------------------------+------+
|472   |Chain Reaction (1996)               |5     |
|772   |Fire Down Below (1997)              |3     |
|943   |Braveheart (1995)                   |4     |
|255   |Jaws (1975)                         |5     |
|815   |Hamlet (1996)                       |5     |
|897   |Striptease (1996)                   |3     |
|644   |Saint, The (1997)                   |4     |
|416   |Bridge on the River Kwai, The (1957)|5     |
|776   |Picnic (1955)                       |3     |
|119   |Men in Black (1997)                 |4     |
+------+------------------------------------+------+
only showing top 10 rows



### 简单EDA

In [7]:
# Five users with the most ratings.
(
    df.groupBy("userId")
    .count()
    .orderBy(col('count').desc())
    .show(n=5, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
+------+-----+
only showing top 5 rows



In [8]:
# Five titles with the most ratings.
(
    df.groupBy("title")
    .count()
    .orderBy(col('count').desc())
    .show(n=5, truncate=False)
)

+-------------------------+-----+
|title                    |count|
+-------------------------+-----+
|Star Wars (1977)         |583  |
|Contact (1997)           |509  |
|Fargo (1996)             |508  |
|Return of the Jedi (1983)|507  |
|Liar Liar (1997)         |485  |
+-------------------------+-----+
only showing top 5 rows



### 特征工程

In [9]:
from pyspark.ml.feature import StringIndexer,IndexToString

In [10]:
# Encoder that maps each movie title string to a numeric index column.
stringIndexer = StringIndexer(
    inputCol="title",
    outputCol="title_new",
)

In [11]:
# Fit the indexer on the full ratings frame; `model.labels` is reused later
# to map indices back to titles.
model = stringIndexer.fit(df)

In [12]:
# Add the numeric `title_new` column next to the original `title`.
indexed = model.transform(df)

In [13]:
# Random sample of 10 encoded rows, full-width titles.
shuffled_indexed = indexed.orderBy(rand())
shuffled_indexed.show(n=10, truncate=False)

+------+-----------------------------------+------+---------+
|userId|title                              |rating|title_new|
+------+-----------------------------------+------+---------+
|854   |Jack (1996)                        |2     |464.0    |
|907   |GoodFellas (1990)                  |5     |87.0     |
|125   |Dante's Peak (1997)                |3     |76.0     |
|246   |Twister (1996)                     |1     |43.0     |
|651   |Cold Comfort Farm (1995)           |2     |262.0    |
|645   |Schindler's List (1993)            |5     |36.0     |
|16    |Bronx Tale, A (1993)               |5     |615.0    |
|715   |Clerks (1994)                      |5     |207.0    |
|561   |Mighty Aphrodite (1995)            |3     |133.0    |
|332   |Star Trek: The Wrath of Khan (1982)|5     |72.0     |
+------+-----------------------------------+------+---------+
only showing top 10 rows



In [15]:
# Rating counts per encoded title, largest first.
(
    indexed.groupBy("title_new")
    .count()
    .orderBy(col('count').desc())
    .show(n=5, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
+---------+-----+
only showing top 5 rows



### 划分数据集

In [16]:
# 75/25 train/test split. A fixed seed keeps the split (and therefore the
# metrics and recommendations computed below) stable across re-runs.
train, test = indexed.randomSplit([0.75, 0.25], seed=42)

In [17]:
# Number of rows in the training split.
train.count()

75038

In [18]:
# Number of rows in the held-out split.
test.count()

24962

### 构建模型

In [19]:
from pyspark.ml.recommendation import ALS

In [20]:
# ALS estimator trained on (userId, title_new, rating) as explicit ratings.
alsExplicit = ALS(
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    maxIter=10,
    regParam=0.01,
    nonnegative=True,
    coldStartStrategy="drop",
)

In [23]:
# Same configuration as the explicit estimator, but with implicitPrefs=True.
alsImplicit = ALS(
    userCol="userId",
    itemCol="title_new",
    ratingCol="rating",
    implicitPrefs=True,
    maxIter=10,
    regParam=0.01,
    nonnegative=True,
    coldStartStrategy="drop",
)

In [24]:
# Fit both ALS variants on the same training split.
modelExplicit = alsExplicit.fit(train)
modelImplicit = alsImplicit.fit(train)

In [25]:
# Score the held-out split with each fitted model (adds a `prediction` column).
predictionsExplicit = modelExplicit.transform(test)
predictionsImplicit = modelImplicit.transform(test)

In [26]:
# Columns of the scored test set for the explicit model.
predictionsExplicit.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [27]:
# Columns of the scored test set for the implicit model.
predictionsImplicit.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [32]:
# Random sample of 10 explicit-model predictions next to the true rating.
shuffled_explicit = predictionsExplicit.orderBy(rand())
shuffled_explicit.show(n=10, truncate=False)

+------+--------------------------------------------------------+------+---------+----------+
|userId|title                                                   |rating|title_new|prediction|
+------+--------------------------------------------------------+------+---------+----------+
|716   |Mrs. Parker and the Vicious Circle (1994)               |4     |898.0    |3.5278287 |
|21    |Bad Moon (1996)                                         |2     |1257.0   |3.059618  |
|704   |Adventures of Priscilla, Queen of the Desert, The (1994)|4     |305.0    |3.9275265 |
|614   |Glimmer Man, The (1996)                                 |2     |617.0    |4.8782835 |
|758   |Jaws (1975)                                             |4     |49.0     |4.1439214 |
|913   |Stand by Me (1986)                                      |4     |86.0     |2.9935067 |
|194   |Raiders of the Lost Ark (1981)                          |4     |10.0     |3.7930808 |
|561   |True Romance (1993)                                 

In [33]:
# Random sample of 10 implicit-model predictions next to the true rating.
shuffled_implicit = predictionsImplicit.orderBy(rand())
shuffled_implicit.show(n=10, truncate=False)

+------+------------------------------------+------+---------+----------+
|userId|title                               |rating|title_new|prediction|
+------+------------------------------------+------+---------+----------+
|648   |Somewhere in Time (1980)            |3     |407.0    |0.33995605|
|308   |Forrest Gump (1994)                 |2     |27.0     |0.88827765|
|344   |Interview with the Vampire (1994)   |3     |225.0    |0.16361305|
|622   |Bridge on the River Kwai, The (1957)|5     |175.0    |0.6002901 |
|295   |Miracle on 34th Street (1994)       |4     |330.0    |0.7866774 |
|385   |Godfather, The (1972)               |4     |11.0     |0.73441947|
|862   |Shining, The (1980)                 |5     |113.0    |0.8084216 |
|655   |Bound (1996)                        |3     |242.0    |1.1908602 |
|608   |Game, The (1997)                    |4     |67.0     |0.4708835 |
|214   |Twelve Monkeys (1995)               |5     |13.0     |0.83955616|
+------+------------------------------

### 评估

In [34]:
from pyspark.ml.evaluation import RegressionEvaluator

In [35]:
# RMSE between the `prediction` column and the true `rating` column.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [36]:
# RMSE of each model's predictions against the held-out ratings.
rmseExplicit = evaluator.evaluate(predictionsExplicit)
# NOTE(review): alsImplicit was built with implicitPrefs=True, and its sampled
# predictions shown earlier are roughly 0.16-1.19 while ratings are 2-5, so
# this RMSE is presumably not comparable to the explicit model's. Consider a
# ranking metric for the implicit model - TODO confirm.
rmseImplicit = evaluator.evaluate(predictionsImplicit)

In [37]:
# Echo the RMSE of the explicit-feedback model.
rmseExplicit

1.0352919110659273

In [38]:
# Echo the RMSE of the implicit-feedback model.
rmseImplicit

3.167296329945288

### 推荐

In [39]:
# One row per distinct encoded movie id.
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)

In [40]:
# Number of distinct encoded movies.
unique_movies.count()

1664

In [44]:
# Preview the first 10 distinct movie ids.
unique_movies.show(n=10)

+---------+
|title_new|
+---------+
|    305.0|
|    596.0|
|    299.0|
|    769.0|
|    692.0|
|    934.0|
|   1051.0|
|    496.0|
|    558.0|
|    170.0|
+---------+
only showing top 10 rows



In [42]:
# Alias 'a' for the full movie list; it is the left side of the join further down.
a=unique_movies.alias('a')

In [43]:
# Preview the aliased movie list.
a.show()

+---------+
|title_new|
+---------+
|    305.0|
|    596.0|
|    299.0|
|    769.0|
|    692.0|
|    934.0|
|   1051.0|
|    496.0|
|    558.0|
|    170.0|
|    184.0|
|    576.0|
|    147.0|
|    810.0|
|    720.0|
|    782.0|
|   1369.0|
|   1587.0|
|    160.0|
|    608.0|
+---------+
only showing top 20 rows



### 选定用户

In [45]:
# User for whom recommendations are generated in the following cells.
user_id=85

In [46]:
# Distinct encoded movies that the selected user has already rated.
watched_movies = (
    indexed
    .where(indexed['userId'] == user_id)
    .select('title_new')
    .distinct()
)

In [47]:
# Number of distinct movies the selected user has rated.
watched_movies.count()

287

In [48]:
# Alias 'b' so the watched-movie column can be addressed as "b.title_new" after the join.
b=watched_movies.alias('b')

### 选出用户没有观看的电影

In [49]:
# Left join: every movie is kept; the right-hand title_new is null when the
# selected user has not rated that movie.
total_movies = a.join(b, a.title_new == b.title_new,how='left')

In [51]:
# Preview the join result; a null in the second column marks an unrated movie.
total_movies.show(n=10)

+---------+---------+
|title_new|title_new|
+---------+---------+
|    305.0|    305.0|
|    596.0|     null|
|    299.0|     null|
|    769.0|     null|
|    692.0|     null|
|    934.0|     null|
|   1051.0|     null|
|    496.0|     null|
|    558.0|    558.0|
|    170.0|     null|
+---------+---------+
only showing top 10 rows



In [52]:
# Keep only movies with no match on the watched side, i.e. the candidate pool.
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [54]:
# Preview 10 candidate movies.
remaining_movies.show(n=10)

+---------+
|title_new|
+---------+
|    596.0|
|    299.0|
|    769.0|
|    692.0|
|    934.0|
|   1051.0|
|    496.0|
|    170.0|
|    184.0|
|    576.0|
+---------+
only showing top 10 rows



In [55]:
# Size of the candidate pool (movies the selected user has not rated).
remaining_movies.count()

1377

### 增加 userId 列

In [56]:
# Attach the selected user's id as a constant column so the ALS model can
# score every (userId, title_new) pair. Fixed the `withColuymn` typo, which
# raised AttributeError.
remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

In [58]:
# Preview the candidates together with the constant userId column.
remaining_movies.show(n=10)

+---------+------+
|title_new|userId|
+---------+------+
|    596.0|    85|
|    299.0|    85|
|    769.0|    85|
|    692.0|    85|
|    934.0|    85|
|   1051.0|    85|
|    496.0|    85|
|    170.0|    85|
|    184.0|    85|
|    576.0|    85|
+---------+------+
only showing top 10 rows



### 为选定用户预测

In [59]:
# Score every candidate with the explicit model and rank by predicted rating.
recommendations = (
    modelExplicit
    .transform(remaining_movies)
    .orderBy(col('prediction').desc())
)

In [60]:
# Show the highest-scoring candidates (default 20 rows).
recommendations.show()

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|   1128.0|    85| 5.4916253|
|    829.0|    85| 4.7110386|
|   1219.0|    85| 4.6468077|
|    928.0|    85| 4.5846024|
|   1465.0|    85| 4.5183053|
|   1054.0|    85| 4.5009017|
|   1542.0|    85|  4.498245|
|   1517.0|    85|  4.498245|
|   1432.0|    85|  4.498245|
|   1632.0|    85|  4.498245|
|    496.0|    85| 4.4632254|
|    787.0|    85| 4.4523687|
|    494.0|    85| 4.4508457|
|   1360.0|    85|  4.438513|
|    302.0|    85|  4.437845|
|    108.0|    85|  4.412201|
|   1494.0|    85| 4.3672028|
|   1410.0|    85| 4.3464622|
|    938.0|    85| 4.3400383|
|    261.0|    85| 4.3393865|
+---------+------+----------+
only showing top 20 rows



In [61]:
# Inverse of the StringIndexer: maps title_new back to the title string
# using the labels learned by the fitted indexer.
movie_title = IndexToString(
    inputCol="title_new",
    outputCol="title",
    labels=model.labels,
)

In [62]:
# Append the title for each recommended index.
final_recommendations=movie_title.transform(recommendations)

In [63]:
# Top 10 recommendations with untruncated titles.
final_recommendations.show(n=10, truncate=False)

+---------+------+----------+-----------------------------------------------------------+
|title_new|userId|prediction|title                                                      |
+---------+------+----------+-----------------------------------------------------------+
|1128.0   |85    |5.4916253 |Incognito (1997)                                           |
|829.0    |85    |4.7110386 |My Man Godfrey (1936)                                      |
|1219.0   |85    |4.6468077 |Faster Pussycat! Kill! Kill! (1965)                        |
|928.0    |85    |4.5846024 |Paradise Lost: The Child Murders at Robin Hood Hills (1996)|
|1465.0   |85    |4.5183053 |Anna (1996)                                                |
|1054.0   |85    |4.5009017 |Primary Colors (1998)                                      |
|1432.0   |85    |4.498245  |Joy Luck Club, The (1993)                                  |
|1517.0   |85    |4.498245  |Slingshot, The (1993)                                      |
|1632.0   

### 定义函数

In [37]:
# Distinct encoded movie ids; read as a notebook-level global by top_movies.
# Same expression as the earlier `unique_movies` cell in the recommendation section.
unique_movies=indexed.select('title_new').distinct()

In [66]:
# Recommend the top 'n' movies to any particular user.
def top_movies(user_id, n):
    """Print and return the top ``n`` movies the user has not rated yet.

    Relies on notebook-level objects defined in earlier cells:
    ``unique_movies`` (distinct ``title_new`` values), ``indexed`` (ratings
    with the ``title_new`` column), ``modelExplicit`` (fitted ALS model) and
    ``model`` (fitted StringIndexer whose labels map indices back to titles).

    Args:
        user_id: Id compared against ``indexed['userId']``; must be
            convertible with ``int()``.
        n: Maximum number of recommendations to keep.

    Returns:
        A DataFrame with columns ``title_new``, ``userId``, ``prediction`` and
        ``title``, holding at most ``n`` rows ordered by descending
        prediction. The same rows are also printed without truncation.
        (Previously the function returned the ``None`` produced by ``show``,
        contradicting its own docstring.)
    """
    # Encoded ids of the movies this user has already rated.
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    # Candidate pool: every movie without a match among the watched ones.
    # A left-anti join on the column name replaces the former
    # "left join + filter on null" and avoids ambiguous self-join references.
    remaining_movies = (
        unique_movies
        .join(watched_movies, on='title_new', how='left_anti')
        .distinct()
    )

    # Constant userId column so the model can score each (user, movie) pair.
    remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

    # Score the candidates with the trained model and keep the n best.
    recommendations = (
        modelExplicit.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # Convert the numeric item index back to the movie title.
    movie_title = IndexToString(inputCol="title_new", outputCol="title", labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    # Keep the printed table, and also hand the result back to the caller.
    final_recommendations.show(n, False)
    return final_recommendations

In [67]:
# Ten recommendations for user 85.
top_movies(user_id=85, n=10)

+---------+------+----------+-----------------------------------------------------------+
|title_new|userId|prediction|title                                                      |
+---------+------+----------+-----------------------------------------------------------+
|1128.0   |85    |5.4916253 |Incognito (1997)                                           |
|829.0    |85    |4.7110386 |My Man Godfrey (1936)                                      |
|1219.0   |85    |4.6468077 |Faster Pussycat! Kill! Kill! (1965)                        |
|928.0    |85    |4.5846024 |Paradise Lost: The Child Murders at Robin Hood Hills (1996)|
|1465.0   |85    |4.5183053 |Anna (1996)                                                |
|1054.0   |85    |4.5009017 |Primary Colors (1998)                                      |
|1632.0   |85    |4.498245  |Spanish Prisoner, The (1997)                               |
|1517.0   |85    |4.498245  |Slingshot, The (1993)                                      |
|1542.0   