In [1]:
# ch07 Recommender system
# https://blog.csdn.net/liulingyuan6/article/details/53489390
# Step 1: create the SparkSession object
from pyspark.sql import SparkSession
# The application name is the label attached to this job; it now describes the
# recommender workload instead of the leftover 'lin_reg' (linear regression) name.
spark = SparkSession.builder.appName('rec_sys').getOrCreate()

In [2]:
# Step 2: load the dataset
# The first row supplies the column names; column types are inferred from the data.
df = spark.read.csv(
    'movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Step 3: exploratory data analysis
# Report the shape of the dataset: number of rows, then number of columns.
row_count = df.count()
column_count = len(df.columns)
print(row_count, column_count)

100000 3


In [6]:
# Show the column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [9]:
from pyspark.sql.functions import rand
# Peek at 10 rows in random order, with long titles printed in full.
# rand() is unseeded, so a different sample is shown on every run.
(
    df.sort(rand())
    .show(n=10, truncate=False)
)

+------+----------------------------------------------+------+
|userId|title                                         |rating|
+------+----------------------------------------------+------+
|389   |Ghost and Mrs. Muir, The (1947)               |4     |
|293   |Manon of the Spring (Manon des sources) (1986)|3     |
|848   |39 Steps, The (1935)                          |5     |
|788   |Schindler's List (1993)                       |5     |
|624   |Mission: Impossible (1996)                    |4     |
|233   |Graduate, The (1967)                          |5     |
|110   |Blink (1994)                                  |3     |
|479   |Ghost and the Darkness, The (1996)            |2     |
|216   |Sound of Music, The (1965)                    |2     |
|109   |Powder (1995)                                 |4     |
+------+----------------------------------------------+------+
only showing top 10 rows



In [11]:
# Users with the most ratings (one input row per rating), top 10.
(
    df.groupBy('userId')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [12]:
# Users with the fewest ratings, 10 rows shown.
(
    df.groupBy('userId')
    .count()
    .sort('count', ascending=True)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|631   |20   |
|572   |20   |
|685   |20   |
|93    |20   |
|300   |20   |
|636   |20   |
|34    |20   |
|926   |20   |
|596   |20   |
+------+-----+
only showing top 10 rows



In [13]:
# Movies with the most ratings, top 10.
(
    df.groupBy('title')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [14]:
# Step 4: feature engineering
# Import only the Spark SQL helpers the notebook uses. A wildcard import of
# pyspark.sql.functions would shadow Python builtins such as sum/max/min/round.
from pyspark.sql.functions import col, lit
from pyspark.ml.feature import StringIndexer, IndexToString

# Encode each movie title as a numeric index in a new column `title_new`,
# so the item column handed to the recommender is numeric.
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
# `model` keeps the fitted title <-> index mapping; its labels are reused
# later to translate indices back to titles.
model = stringIndexer.fit(df)
indexed = model.transform(df)

In [15]:
# Inspect the first 10 rows with the new numeric `title_new` column.
indexed.show(n=10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [17]:
# Rating counts per encoded title, top 10 by count.
(
    indexed.groupBy('title_new')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



In [18]:
# Step 5: split the dataset
# 75% of rows for training, 25% held out for evaluation. The fixed seed makes
# the split, and every metric computed from it, repeatable across kernel restarts.
train, test = indexed.randomSplit([0.75, 0.25], seed=42)
# Number of rows in the training split.
train.count()

75059

In [19]:
# Number of rows in the held-out test split.
test.count()

24941

In [20]:
# Step 6: build and train the recommender model
# ALS (alternating least squares) is just the name of the optimisation algorithm
# used to solve for the best solution of the recommender model Spark provides.
# Hyperparameters:
#   nonnegative=True          -> does not create negative ratings in the recommender
#   coldStartStrategy='drop'  -> prevents any NaN rating predictions from being produced
#   seed=42                   -> fixes the random initialisation so repeated runs match
# Drawbacks of ALS:
# 1. It is an offline algorithm.
# 2. It cannot accurately score newly added users or items. This is also known
#    as the cold-start problem.
from pyspark.ml.recommendation import ALS
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy='drop',
    seed=42,
)

In [21]:
# Fit the ALS estimator on the training split only; the test split stays unseen.
rec_model=rec.fit(train)

In [22]:
# Step 7: predict on the test data and evaluate
# Apply the fitted model to the held-out rows.
predicted_ratings = rec_model.transform(test)
# Print the resulting columns to confirm what the transform produced.
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
# Root-mean-square error between the actual `rating` column and the model's
# `prediction` column on the scored test rows.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predicted_ratings)

In [25]:
# Report the test-set RMSE computed by the evaluator.
print(rmse)

1.0138568587831984


In [26]:
# Step 8: recommend top-ranked movies the active user is likely to enjoy
# Distinct encoded movie ids across the whole dataset.
unique_movies = indexed.select('title_new').distinct()
unique_movies.count()

1664

In [27]:
# Use an alias so this side of the upcoming join can be referenced as "a".
a = unique_movies.alias('a')

In [28]:
# The user to generate recommendations for.
user_id = 85
# Distinct encoded movie ids this user has already rated.
watched_movies = (
    indexed.where(indexed.userId == user_id)
    .select('title_new')
    .distinct()
)
watched_movies.count()

287

In [29]:
# Alias the user's rated movies as "b" for the join below.
b=watched_movies.alias('b')

In [30]:
# Left join keeps every distinct movie from `a`; the `b` column is null for
# movies the user has not rated.
total_movies=a.join(b, a.title_new == b.title_new, how='left')

In [31]:
# Inspect the join result: left column is every movie, right column is the
# user's match (null when unrated).
total_movies.show(n=10, truncate=False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |305.0    |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
|934.0    |null     |
|496.0    |496.0    |
|1051.0   |null     |
|692.0    |null     |
|810.0    |null     |
+---------+---------+
only showing top 10 rows



In [32]:
# Keep only the movies with no match on the `b` side, i.e. movies the user has
# not rated yet, and retain a single `title_new` column.
# NOTE(review): the kept column is referenced through the `a` DataFrame handle;
# presumably this resolves to the left-hand side of the join — confirm.
remaining_movies = total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [33]:
# Number of candidate movies the user has not rated yet.
remaining_movies.count()

1377

In [34]:
# Attach the user's id as a constant column so every candidate row carries both
# the `userId` and `title_new` columns the ALS model was configured with.
remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

In [35]:
# Check the candidate (movie, user) pairs about to be scored.
remaining_movies.show(n=10, truncate=False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |85    |
|299.0    |85    |
|596.0    |85    |
|769.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|692.0    |85    |
|810.0    |85    |
|720.0    |85    |
|782.0    |85    |
+---------+------+
only showing top 10 rows



In [36]:
# Score every unrated movie for the user and rank by predicted rating,
# highest first.
recommendations = (
    rec_model.transform(remaining_movies)
    .sort('prediction', ascending=False)
)

In [37]:
# Top 5 predictions, still identified by encoded movie id.
recommendations.show(n=5, truncate=False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1358.0   |85    |5.725641  |
|981.0    |85    |5.162581  |
|1030.0   |85    |5.1539965 |
|1271.0   |85    |4.997465  |
|1433.0   |85    |4.8748465 |
+---------+------+----------+
only showing top 5 rows



In [38]:
movie_title= IndexToString(inputCol='title_new',
                          outputCol='title',
                          labels=model.labels)

In [39]:
# Add the readable `title` column to the ranked recommendations and show the
# top 10 with untruncated titles.
final_recommendations = movie_title.transform(recommendations)
final_recommendations.show(n=10, truncate=False)

+---------+------+----------+-----------------------------------------+
|title_new|userId|prediction|title                                    |
+---------+------+----------+-----------------------------------------+
|1358.0   |85    |5.725641  |Angel Baby (1995)                        |
|981.0    |85    |5.162581  |Big Blue, The (Grand bleu, Le) (1988)    |
|1030.0   |85    |5.1539965 |Braindead (1992)                         |
|1271.0   |85    |4.997465  |Whole Wide World, The (1996)             |
|1433.0   |85    |4.8748465 |Boys, Les (1997)                         |
|1007.0   |85    |4.729849  |Hard Eight (1996)                        |
|1054.0   |85    |4.670152  |Red Firecracker, Green Firecracker (1994)|
|1090.0   |85    |4.6287975 |Four Days in September (1997)            |
|837.0    |85    |4.594181  |Love! Valour! Compassion! (1997)         |
|1084.0   |85    |4.451149  |Audrey Rose (1977)                       |
+---------+------+----------+-----------------------------------