# spark准备工作

In [36]:
%%spark.set_config

spark.executor.instances=1
spark.executor.memory=512m
spark.executor.cores=1

In [37]:
!echo $SPARK_HOME

/opt/apps/SPARK3/spark3-current


In [38]:
spark, sc = %spark.get_session PeiSiyu2

24/05/28 05:26:02 WARN [Thread-4] Utils: Service 'sparkDriver' could not bind on port 14040. Attempting port 14041.
24/05/28 05:26:02 WARN [Thread-4] Utils: Service 'sparkDriver' could not bind on port 14041. Attempting port 14042.
24/05/28 05:26:02 WARN [Thread-4] Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/28 05:26:02 WARN [Thread-4] Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/05/28 05:26:11 WARN [Thread-4] Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 14041. Attempting port 14042.
24/05/28 05:26:11 WARN [Thread-4] Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 14042. Attempting port 14043.


# 读入数据

In [39]:
ratings = spark.read.options(header='true', inferSchema='true').csv("/user/peisiyu/ratings.csv") 
ratings.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



                                                                                

In [40]:
ratings

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [41]:
new_ratings = ratings[['userId', 'movieId', 'rating']]

In [42]:
new_ratings

DataFrame[userId: int, movieId: int, rating: double]

In [43]:
new_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



# ALS实现协同过滤算法

## 定义CollaborativeFiltering类，主要封装模型训练、模型保存、用户推荐、商品推荐等代码

In [44]:
class CollaborativeFiltering(object):
    def __init__(self, spark_session):
        self.spark_session = spark_session
        self.model = None
 
    def train(self, train_set, user_col, item_col, rating_col, epoch=10,regparam=0.01):
        """
        Build the recommendation model using ALS on the training data
        Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
        """
        als = ALS(regParam=regparam, maxIter=epoch, userCol=user_col, itemCol=item_col, ratingCol=rating_col,
                  coldStartStrategy='drop')
        self.model = als.fit(train_set)
 
    def eval(self, test_set, label_col='ratingFloat', metric='rmse'):
        """ Evaluate the model on the test data """
        predictions = self.model.transform(test_set)
 
        # self.model.itemFactors.show(10, truncate=False)
        # self.model.userFactors.show(10, truncate=False)
        evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_col, metricName=metric)
        loss = evaluator.evaluate(predictions)
        return loss
    
    def recommend_for_all_users(self, num_items=10):
        user_recs = self.model.recommendForAllUsers(numItems=num_items)
        return user_recs
 
    def recommend_for_all_items(self, num_users=10):
        item_recs = self.model.recommendForAllItems(numUsers=num_users)
        return item_recs
 
    def recommend_for_user_subset(self, dataset, num_items=10):
        user_recs = self.model.recommendForUserSubset(dataset=dataset, numItems=num_items)
        return user_recs
 
    def recommend_for_item_subset(self, dataset, num_users=10):
        item_recs = self.model.recommendForItemSubset(dataset=dataset, numUsers=num_users)
        return item_recs
 

## 实例

In [45]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [46]:
# 拆分训练集和测试集
training, test = new_ratings.randomSplit((0.8, 0.2), seed=2022)

In [47]:
new_ratings.count()

100836

In [48]:
training.count()

80513

In [49]:
test.count()

20323

In [50]:
# collaborative filtering start
cf = CollaborativeFiltering(spark_session=spark)

In [52]:
#设定迭代次数为10,正则化参数为0.01（默认参数）
epoch = 10
regparam=0.01
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=epoch,regparam=0.01)

In [53]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

                                                                                

[Root-mean-square error] 1.1127280419724743


In [54]:
# Generate top 10 movie recommendations for each user
user_recs = cf.recommend_for_all_users(num_items=10)
user_recs.show(10, False)



+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                 |
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{7063, 7.0580053}, {4144, 6.7719827}, {909, 6.448697}, {3030, 6.4123316}, {150548, 6.3936024}, {417, 6.2242136}, {1194, 6.1942916}, {3451, 6.184679}, {1955, 6.1611433}, {93840, 6.1324496}]   |
|3     |[{3272, 7.6883883}, {79224, 5.8178453}, {3134, 5.7493644}, {115713, 5.68881}, {4941, 5.4718914}, {4006, 5.4168653}, {2459, 5.3406515}, {901, 5.3155794}, {112623, 5.306456}, {55363, 5.2

                                                                                

In [55]:
# Generate top 10 user recommendations for each movie
movie_recs = cf.recommend_for_all_items(num_users=10)
movie_recs.show(10, False)



+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                               |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |[{147, 6.102933}, {53, 6.013961}, {458, 5.8678455}, {569, 5.556576}, {413, 5.4606504}, {162, 5.3545556}, {151, 5.23018}, {243, 5.19298}, {43, 5.189424}, {278, 5.145734}]     |
|3      |[{458, 7.1260815}, {423, 7.024401}, {147, 6.5888653}, {498, 6.52118}, {335, 6.447675}, {81, 6.0910735}, {447, 5.7989807}, {225, 5.792771}, {329, 5.787903}, {569, 5.6192894}] |
|5      |[{458, 6.085923}, {498, 5.6787095}, {536, 5.636318}, {224, 5.29198

                                                                                

In [56]:
# Generate top 10 movie recommendations for a specified set of users
user_data = new_ratings.select("userId").distinct().limit(10)
user_sub_recs = cf.recommend_for_user_subset(dataset=user_data, num_items=10)
user_sub_recs.show(10, False)



+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                  |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|540   |[{161582, 6.7038}, {27611, 6.665559}, {5291, 6.5706635}, {3035, 6.412145}, {81564, 6.335823}, {1147, 6.321308}, {179819, 6.251233}, {34332, 6.2253895}, {129354, 6.1192193}, {125, 6.1170926}]   |
|471   |[{2186, 7.971822}, {6993, 7.9436293}, {6380, 7.80059}, {322, 7.614662}, {4361, 7.1671457}, {7700, 7.055908}, {1883, 6.8738575}, {7164, 6.717863}, {4649, 6.629978}, {60766, 6.608391

                                                                                

In [57]:
# Generate top 10 user recommendations for a specified set of movies
movie_data = new_ratings.select("movieId").distinct().limit(10)
movie_sub_recs = cf.recommend_for_item_subset(dataset=movie_data, num_users=10)
movie_sub_recs.show(10, False)



+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                               |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1580   |[{554, 5.2928925}, {53, 5.2623644}, {188, 5.1120358}, {375, 4.9856853}, {529, 4.9694757}, {162, 4.9580684}, {35, 4.9333563}, {576, 4.8983}, {389, 4.877339}, {192, 4.8710365}]|
|32460  |[{259, 9.278439}, {576, 7.9415765}, {468, 6.9675856}, {197, 6.9533453}, {548, 6.94057}, {557, 6.826799}, {96, 6.7282505}, {379, 6.633329}, {360, 6.5739}, {48, 6.451295}]     |
|1591   |[{360, 9.634414}, {498, 9.149178}, {277, 7.42953}, {130, 7.387612}

                                                                                

## 改变迭代次数和正则化参数

In [58]:
#设定参数
epoch = 20
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=epoch)

In [59]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

                                                                                

[Root-mean-square error] 1.1370852646756728


In [60]:
#设定迭代次数为5
epoch = 5
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=epoch)

In [61]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

[Root-mean-square error] 1.0881434638988712


In [62]:
#设定迭代次数为15
epoch = 15
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=epoch)

In [63]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

[Root-mean-square error] 1.1279213920929592


In [67]:
#改变正则化参数
regparam=1
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=10,regparam=1)


In [68]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

[Root-mean-square error] 1.320118439825058


In [69]:
#改变正则化参数
regparam=0.1
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=10,regparam=1)


In [70]:
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))

                                                                                

[Root-mean-square error] 1.320118439825058


In [71]:
#改变两个参数
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=5,regparam=0.1)
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))


                                                                                

[Root-mean-square error] 0.8847266806944029


In [72]:
#改变两个参数
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=5,regparam=1)
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))


[Root-mean-square error] 1.3201205218351366


In [73]:
#改变两个参数
cf.train(train_set=training,user_col='userId',item_col='movieId',rating_col='rating',epoch=15,regparam=0.1)
loss = cf.eval(test_set=test, label_col='rating', metric='rmse')
print("[Root-mean-square error] {}".format(loss))


[Root-mean-square error] 0.8804746213955296


### 讨论：迭代次数设定到30以上时将会难以运行

In [35]:
%spark.stop_session

'no context exists'