In [1]:
from pyspark.sql import SparkSession
import time
# 初始化 Spark 会话
spark = SparkSession.\
        builder.\
        appName("MovieLens").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "4g").\
        getOrCreate()


start_time = time.time()
# 读取评分数据
ratings_df = spark.read.csv('local-workspace/ml-25m/ratings.csv', header=True, inferSchema=True)
movies_df = spark.read.csv('local-workspace/ml-25m/movies.csv', header=True, inferSchema=True)
genome_scores_df = spark.read.csv('local-workspace/ml-25m/genome-scores.csv', header=True, inferSchema=True)

23/12/04 17:35:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [2]:
from pyspark.sql.functions import avg, count
from pyspark.sql.functions import split, col
# 计算评级数
average_ratings = ratings_df.groupBy("movieId").agg(avg("rating").alias("average_rating"))
# 计算每部电影的评分人数
rating_counts = ratings_df.groupBy("movieId").agg(count("userId").alias("rating_count"))
# 合并平均评分和评分人数
movie_ratings = average_ratings.join(rating_counts, "movieId")

# 将电影标题加入结果中
movies_with_titles = movie_ratings.join(movies_df, "movieId").select("movieId", "title", "average_rating", "rating_count")
# 按平均评分降序排序
sorted_movie_ratings = movies_with_titles.orderBy("average_rating", ascending=False)
# 显示结果
sorted_movie_ratings.show(5)



+-------+--------------------+--------------+------------+
|movieId|               title|average_rating|rating_count|
+-------+--------------------+--------------+------------+
| 204012|Kick That Habit (...|           5.0|           1|
| 122193|   Kit Carson (1940)|           5.0|           1|
| 202181| Warlock Moon (1973)|           5.0|           1|
| 159471|Evening's Civil T...|           5.0|           1|
| 131628|       Loaded (2014)|           5.0|           1|
+-------+--------------------+--------------+------------+


                                                                                

In [3]:
# 筛选出评分人数大于 10 的电影
movies_with_more_than_10_ratings = movies_with_titles.filter(movie_ratings.rating_count > 10)
# 按平均评分降序排序
sorted_movie_ratings = movies_with_more_than_10_ratings.orderBy("average_rating", ascending=False)
# 显示结果
sorted_movie_ratings.show(5)



+-------+--------------------+-----------------+------------+
|movieId|               title|   average_rating|rating_count|
+-------+--------------------+-----------------+------------+
| 171011|Planet Earth II (...|4.483096085409253|        1124|
| 159817| Planet Earth (2006)|4.464796794504865|        1747|
|    318|Shawshank Redempt...|4.413576004516335|       81482|
| 170705|Band of Brothers ...|4.398598820058997|        1356|
| 158958|    Pollyanna (2003)|4.384615384615385|          13|
+-------+--------------------+-----------------+------------+


                                                                                

In [4]:
# 筛选出评分人数大于 100 的电影
movies_with_more_than_100_ratings = movies_with_titles.filter(movie_ratings.rating_count > 100)
# 按平均评分降序排序
sorted_movie_ratings = movies_with_more_than_100_ratings.orderBy("average_rating", ascending=False)
# 显示结果
sorted_movie_ratings.show(5)



+-------+--------------------+------------------+------------+
|movieId|               title|    average_rating|rating_count|
+-------+--------------------+------------------+------------+
| 171011|Planet Earth II (...| 4.483096085409253|        1124|
| 159817| Planet Earth (2006)| 4.464796794504865|        1747|
|    318|Shawshank Redempt...| 4.413576004516335|       81482|
| 170705|Band of Brothers ...| 4.398598820058997|        1356|
| 171495|              Cosmos|4.3267148014440435|         277|
+-------+--------------------+------------------+------------+


                                                                                

In [5]:
# 筛选出评分人数大于 1000 的电影
movies_with_more_than_1000_ratings = movies_with_titles.filter(movie_ratings.rating_count > 1000)
# 按平均评分降序排序
sorted_movie_ratings = movies_with_more_than_1000_ratings.orderBy("average_rating", ascending=False)
# 显示结果
sorted_movie_ratings.show(5)



+-------+--------------------+-----------------+------------+
|movieId|               title|   average_rating|rating_count|
+-------+--------------------+-----------------+------------+
| 171011|Planet Earth II (...|4.483096085409253|        1124|
| 159817| Planet Earth (2006)|4.464796794504865|        1747|
|    318|Shawshank Redempt...|4.413576004516335|       81482|
| 170705|Band of Brothers ...|4.398598820058997|        1356|
|    858|Godfather, The (1...|4.324336165187245|       52498|
+-------+--------------------+-----------------+------------+


                                                                                

# 1. Splitting the Dataset:
First, import the necessary modules and read your rating data into a Spark DataFrame. Then, split this data into training and testing sets.

In [6]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.regression import LinearRegression

In [7]:
# Split the data
(train_data, test_data) = ratings_df.randomSplit([0.7, 0.3], seed=5021)

In [8]:
train_data.show()

[Stage 22:>                                                         (0 + 1) / 1]

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    665|   5.0|1147878820|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   2011|   2.5|1147868079|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2573|   4.0|1147878923|
|     1|   2692|   5.0|1147869100|
|     1|   3448|   4.0|1147868480|
|     1|   3569|   5.0|1147879603|
|     1|   3949|   5.0|1147868678|
|     1|   4144|   5.0|1147868898|
|     1|   4325|   5.0|1147878122|
|     1|   4422|   3.0|1147869048|
|     1|   4703|   4.0|1147869223|
|     1|   5269|   0.5|1147879571|
+------+-------+------+----------+


                                                                                

In [9]:
test_data.describe().show()



+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|          7502892|           7502892|           7502892|             7502892|
|   mean|81188.96771058413| 21366.92766642516|3.5339601716244884|1.2155145892096126E9|
| stddev|46789.99952670272|39174.464718786876|1.0608207109360328| 2.268376646835225E8|
|    min|                1|                 1|               0.5|           789652009|
|    max|           162541|            209169|               5.0|          1574327549|
+-------+-----------------+------------------+------------------+--------------------+


                                                                                

# 2. Matrix Factorization with ALS:
ALS (Alternating Least Squares) is a popular matrix factorization algorithm in Spark's MLlib for collaborative filtering.

In [64]:
ALS_start_time = time.time()
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
als_model = als.fit(train_data)
# Predictions
predictions = als_model.transform(test_data)

                                                                                

In [67]:
# 从 predictions 中选择需要的列
selected_data = predictions.select("userId", "movieId", "rating", "prediction")

# 指定保存 CSV 文件的路径
output_path = "local-workspace/ml-25m/als_predictions.csv"

# 将 DataFrame 保存为 CSV 文件
selected_data.coalesce(1).write.csv(path=output_path, mode="overwrite", header=True)

                                                                                

In [11]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(predictions)
print("R2: {}".format(test_eval))
ALS_end_time = time.time()
print("Time used for ALS: {}".format(ALS_end_time - ALS_start_time))

                                                                                

Root-mean-square error = 0.8228278220248162




R2: 0.3982120117710436
Time used for ALS: 72.76677417755127


                                                                                

# 3. Extract Features and Build Another ML Model:
For this part, you need to join your user and movie data with the rating data, then transform these features into a format suitable for machine learning models in Spark.

In [12]:
# Assuming movies_df and users_df are your DataFrames for movies and users
# Join them with the rating data and perform feature transformations

# An example of a feature transformation
lr_start_time = time.time()
string_indexer = StringIndexer(inputCol="userId", outputCol="userIdIndex")
vector_assembler = VectorAssembler(inputCols=["userIdIndex", "movieId"], outputCol="features")

lr = LinearRegression(featuresCol="features", labelCol="rating")

# Pipeline
pipeline = Pipeline(stages=[string_indexer, vector_assembler, lr])
lr_model = pipeline.fit(train_data)

# predictions
lr_predictions = lr_model.transform(test_data)

23/12/04 17:39:13 WARN Instrumentation: [c9a4855a] regParam is zero, which might cause numerical instability and overfitting.
23/12/04 17:39:14 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/12/04 17:39:32 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/12/04 17:39:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/04 17:39:32 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/12/04 17:39:32 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/12/04 17:39:32 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
23/12/04 17:39:33 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/12/04 17:39:49 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

In [13]:
# evaluations
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
test_eval = evaluator.evaluate(lr_predictions)
print("RMSE: {}".format(test_eval))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(lr_predictions)
print("R2: {}".format(test_eval))
lr_end_time = time.time()
print("Time used for Linear Regression: {}".format(lr_end_time - lr_start_time))

23/12/04 17:39:59 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/12/04 17:40:13 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

RMSE: 1.051967075754656


23/12/04 17:40:13 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB

R2: 0.016622260944419875
Time used for Linear Regression: 86.7550299167633


23/12/04 17:40:26 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

### Deal with the feature of movies
Count the average score of each movie
Get the one-hot encoding of genres

In [14]:

from pyspark.sql.functions import avg, col
data_pre_process_start_time = time.time()
# 求所有电影的平均评分
average_rating_df = movies_with_titles.select("movieId", "average_rating", "rating_count") # 前面已经算过
average_rating_df = average_rating_df.withColumnRenamed("average_rating", "movie_average_rating")
average_rating_df = average_rating_df.withColumnRenamed("rating_count", "movie_rating_count")

# 用均分填补没有打分的电影
overall_average_rating = average_rating_df.select(avg("movie_average_rating")).first()[0]
average_rating_df = average_rating_df.na.fill({"movie_average_rating": overall_average_rating})

# 显示前5条记录
average_rating_df.show(5)



+-------+--------------------+------------------+
|movieId|movie_average_rating|movie_rating_count|
+-------+--------------------+------------------+
|    148|   2.908955223880597|               335|
|    463|   2.813008130081301|               369|
|    471|  3.6579813752234034|             10631|
|    496|  3.2767624020887727|               383|
|    833|  2.7182422451994093|              1354|
+-------+--------------------+------------------+


                                                                                

In [15]:
from pyspark.sql.functions import split
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# 拆分 genres 字段
movies_df_vectorized = movies_df.withColumn("split_genres", split(col("genres"), "\|"))

In [16]:
from pyspark.sql.functions import col, split, explode, udf
from pyspark.sql.types import ArrayType, IntegerType
# 获取所有可能的类别
all_genres = movies_df_vectorized.select(explode(col("split_genres")).alias("genre")).distinct().collect()
all_genres = [row['genre'] for row in all_genres]

# 为每个类别定义一个 UDF
def genre_indicator(genre):
    def indicator(genres_list):
        return 1 if genre in genres_list else 0
    return udf(indicator, IntegerType())

# 为每个类别添加一个新列
for genre in all_genres:
    genre_udf = genre_indicator(genre)
    movies_df_vectorized = movies_df_vectorized.withColumn(genre, genre_udf(col("split_genres")))
movies_df_vectorized = movies_df_vectorized.drop("title", "genres", "split_genres")

In [17]:
movies_df_vectorized.show(5)

+-------+-----+-------+--------+---------+-----+---+-----------+-------+-------+-------+---------+---------+------------------+----+------+-------+------+--------+------+------+
|movieId|Crime|Romance|Thriller|Adventure|Drama|War|Documentary|Fantasy|Mystery|Musical|Animation|Film-Noir|(no genres listed)|IMAX|Horror|Western|Comedy|Children|Action|Sci-Fi|
+-------+-----+-------+--------+---------+-----+---+-----------+-------+-------+-------+---------+---------+------------------+----+------+-------+------+--------+------+------+
|      1|    0|      0|       0|        1|    0|  0|          0|      1|      0|      0|        1|        0|                 0|   0|     0|      0|     1|       1|     0|     0|
|      2|    0|      0|       0|        1|    0|  0|          0|      1|      0|      0|        0|        0|                 0|   0|     0|      0|     0|       1|     0|     0|
|      3|    0|      1|       0|        0|    0|  0|          0|      0|      0|      0|        0|        0|  

### Another vectorized method of movie genre


In [18]:
max_array_length = 10
# 假设 movie_df 是您的原始 DataFrame，并且 genres 列包含由 '|' 分隔的字符串
# 将 genres 列拆分为数组
split_genres = split(movies_df['genres'], '\|')

new_columns = [movies_df['movieId']]  # 包括 movieId 列
# 使用列表推导式创建新列
new_columns += [split_genres.getItem(i).alias(f'genre{i+1}') for i in range(max_array_length)]
# 创建新的 DataFrame，仅包含这些 genre 列
genre_df = movies_df.select(*new_columns)
# 显示结果
genre_df.show(5)

+-------+---------+---------+--------+------+-------+------+------+------+------+-------+
|movieId|   genre1|   genre2|  genre3|genre4| genre5|genre6|genre7|genre8|genre9|genre10|
+-------+---------+---------+--------+------+-------+------+------+------+------+-------+
|      1|Adventure|Animation|Children|Comedy|Fantasy|  null|  null|  null|  null|   null|
|      2|Adventure| Children| Fantasy|  null|   null|  null|  null|  null|  null|   null|
|      3|   Comedy|  Romance|    null|  null|   null|  null|  null|  null|  null|   null|
|      4|   Comedy|    Drama| Romance|  null|   null|  null|  null|  null|  null|   null|
|      5|   Comedy|     null|    null|  null|   null|  null|  null|  null|  null|   null|
+-------+---------+---------+--------+------+-------+------+------+------+------+-------+


In [19]:
# 进行indexer操作
indexers = [StringIndexer(inputCol=f'genre{i+1}', outputCol=f'genre{i+1}Index', handleInvalid="keep") for i in range(max_array_length)]
# 创建 Pipeline
pipeline = Pipeline(stages=indexers)
# 应用 Pipeline
indexed_df = pipeline.fit(genre_df).transform(genre_df)
selected_columns = [col for col in indexed_df.columns if not col.startswith('genre') or col.endswith('Index')]
genre_indexed_df = indexed_df.select(selected_columns)
# 显示结果
genre_indexed_df.show()

+-------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|movieId|genre1Index|genre2Index|genre3Index|genre4Index|genre5Index|genre6Index|genre7Index|genre8Index|genre9Index|genre10Index|
+-------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|      1|        7.0|       12.0|        9.0|        5.0|        3.0|       11.0|        8.0|        3.0|        1.0|         1.0|
|      2|        7.0|       10.0|        5.0|       16.0|       13.0|       11.0|        8.0|        3.0|        1.0|         1.0|
|      3|        1.0|        1.0|       17.0|       16.0|       13.0|       11.0|        8.0|        3.0|        1.0|         1.0|
|      4|        1.0|        0.0|        1.0|       16.0|       13.0|       11.0|        8.0|        3.0|        1.0|         1.0|
|      5|        1.0|       18.0|       17.0|       16.0|       13.0|       11.0|  

## Deal with the feature of users
calculate the average rating of each user
calculate the average rating of each user in each kind of movie
calculate the max point and the min point of each user

In [20]:
# 假设 rating_df 已经是一个 PySpark DataFrame
# 计算每个用户的电影平均评分
user_average_ratings = ratings_df.groupBy("userId").agg(avg("rating").alias("user_average_rating"))
# 显示结果
user_average_ratings.show(5)



+------+-------------------+
|userId|user_average_rating|
+------+-------------------+
| 33375|   3.81651376146789|
| 33412|  4.336956521739131|
| 33569| 3.7745098039215685|
| 33602|  2.212121212121212|
| 33717|   3.38031914893617|
+------+-------------------+


                                                                                

In [21]:
big_df = ratings_df.join(user_average_ratings, on="userId")

In [22]:
big_df = big_df.join(movies_df_vectorized, on="movieId")

In [23]:
from pyspark.sql.functions import col, when, avg

# 假设 big_df 已经是一个 PySpark DataFrame
# 列出所有的电影类型
genres = ["Crime", "Romance", "Thriller", "Adventure", "Drama", "War", 
          "Documentary", "Fantasy", "Mystery", "Musical", "Animation", 
          "Film-Noir", "(no genres listed)", "IMAX", "Horror", "Western", 
          "Comedy", "Children", "Action", "Sci-Fi"]

# 计算每个用户对每个类型的平均评分
exprs = [avg(when(col(genre) == 1, col("rating"))).alias(genre) for genre in genres]

user_genre_ratings = big_df.groupBy("userId").agg(*exprs)
user_genre_ratings = user_genre_ratings.join(user_average_ratings, "userId")


# 填充缺失值
for genre in genres:
    user_genre_ratings = user_genre_ratings.withColumn(genre, when(col(genre).isNull(), col("user_average_rating")).otherwise(col(genre)))

In [24]:
# 改column名方便确认这部分特征是来自用户的
for genre in genres:
    user_genre_ratings = user_genre_ratings.withColumnRenamed(genre, "user_" + genre)

In [25]:
user_genre_ratings.show(5)

23/12/04 17:41:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|userId|        user_Crime|     user_Romance|     user_Thriller|   user_Adventure|        user_Drama|          user_War|  user_Documentary|     user_Fantasy|      user_Mystery|      user_Musical|    user_Animation|    user_Film-Noir|user_(no genres listed)|         user_IMAX|       user_Horror|      user_Western|       user_Comedy|     user_Children|       user_Action|       user_Sci-Fi|user_average_rating|
+------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+--

                                                                                

In [26]:
from pyspark.sql.functions import col, when, avg, max, min
# 计算每个用户的最高分和最低分
user_max_min_ratings = big_df.groupBy("userId").agg(
    max("rating").alias("user_max"),
    min("rating").alias("user_min")
)

In [27]:
user_max_min_ratings.show(5)



+------+--------+--------+
|userId|user_max|user_min|
+------+--------+--------+
|   148|     5.0|     3.0|
|   463|     5.0|     1.0|
|   471|     5.0|     3.0|
|   496|     5.0|     0.5|
|   833|     5.0|     0.5|
+------+--------+--------+


                                                                                

### Join feature matrix and genome_scores by movieId 

In [28]:
big_df = big_df.join(average_rating_df, on="movieId")

In [29]:
big_df = big_df.drop('user_average_rating')
big_df = big_df.join(user_genre_ratings, on="userId")
big_df = big_df.join(user_max_min_ratings, on="userId")
big_df = big_df.join(movies_df, on="movieId")

In [30]:
big_df.show(5)



+-------+------+------+----------+-----+-------+--------+---------+-----+---+-----------+-------+-------+-------+---------+---------+------------------+----+------+-------+------+--------+------+------+--------------------+------------------+-----------------+-----------------+-----------------+-----------------+----------+--------+----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------------+------------------+-----------+-----------------+-----------+-----------------+-----------+-----------+-------------------+--------+--------+--------------------+--------------------+
|movieId|userId|rating| timestamp|Crime|Romance|Thriller|Adventure|Drama|War|Documentary|Fantasy|Mystery|Musical|Animation|Film-Noir|(no genres listed)|IMAX|Horror|Western|Comedy|Children|Action|Sci-Fi|movie_average_rating|movie_rating_count|       user_Crime|     user_Romance|    user_Thriller|   user_Adventure|user_Drama|user_War|user_Documentar

                                                                                

In [31]:
data_pre_process_end_time = time.time()
print("Time used for data pre-process: {}".format(data_pre_process_end_time - data_pre_process_start_time))

Time used for data pre-process: 159.901447057724


In [32]:
# # Data Alignment Using another way of vectorizing
# big_df = big_df.join(genre_df, on="movieId")
# big_df = big_df.join(user_average_ratings, on="userId")
# big_df = big_df.join(average_rating_df, on="movieId")

In [None]:
# Data Alignment using only average rating of users and movies
big_df = ratings_df.join(user_average_ratings, on="userId")
big_df = big_df.join(average_rating_df, on="movieId")

# Using ML model to predict

### Use Linear Regression to predict

In [57]:
(train_data, test_data) = big_df.randomSplit([0.7, 0.3], seed=5021)

In [34]:
lr_final_start_time = time.time()
# Assuming movies_df and users_df are your DataFrames for movies and users
# Join them with the rating data and perform feature transformations
input_cols = big_df.columns
to_remove = ["rating", "userId", "movieId", "timestamp", "title", "genres"]
# to_remove = ["rating", "timestamp", "title", "genres"]
for col in to_remove:
    if col in input_cols:
        input_cols.remove(col)
input_cols

['Crime',
 'Romance',
 'Thriller',
 'Adventure',
 'Drama',
 'War',
 'Documentary',
 'Fantasy',
 'Mystery',
 'Musical',
 'Animation',
 'Film-Noir',
 '(no genres listed)',
 'IMAX',
 'Horror',
 'Western',
 'Comedy',
 'Children',
 'Action',
 'Sci-Fi',
 'movie_average_rating',
 'movie_rating_count',
 'user_Crime',
 'user_Romance',
 'user_Thriller',
 'user_Adventure',
 'user_Drama',
 'user_War',
 'user_Documentary',
 'user_Fantasy',
 'user_Mystery',
 'user_Musical',
 'user_Animation',
 'user_Film-Noir',
 'user_(no genres listed)',
 'user_IMAX',
 'user_Horror',
 'user_Western',
 'user_Comedy',
 'user_Children',
 'user_Action',
 'user_Sci-Fi',
 'user_average_rating',
 'user_max',
 'user_min']

In [35]:
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

lr = LinearRegression(featuresCol="features", labelCol="rating", regParam=0.1)

# Pipeline
pipeline = Pipeline(stages=[vector_assembler, lr])
lr_model = pipeline.fit(train_data)

# predictions
lr_predictions = lr_model.transform(test_data)

                                                                                

In [68]:
# 从 predictions 中选择需要的列
selected_data = lr_predictions.select("userId", "movieId", "rating", "prediction")

# 指定保存 CSV 文件的路径
output_path = "local-workspace/ml-25m/lr_predictions.csv"

# 将 DataFrame 保存为 CSV 文件
selected_data.coalesce(1).write.csv(path=output_path, mode="overwrite", header=True)

                                                                                

In [36]:
# evaluations
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
test_eval = evaluator.evaluate(lr_predictions)
print("RMSE: {}".format(test_eval))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(lr_predictions)
print("R2: {}".format(test_eval))
lr_final_end_time = time.time()
print("Time used for Linear Regression: {}".format(lr_final_end_time - lr_final_start_time))

                                                                                

RMSE: 0.8651459535999295




R2: 0.3350825148025829
Time used for Linear Regression: 537.2375557422638


                                                                                

### Use random forest to predict

In [37]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor


rf_start_time = time.time()
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# 创建管道
pipeline = Pipeline(stages=[assembler, rf])

# 训练模型
model = pipeline.fit(train_data)
# 预测
predictions = model.transform(test_data)

                                                                                

In [38]:
# 获取训练好的随机森林模型
rf_model = model.stages[-1]

# 获取特征重要性
importances = rf_model.featureImportances

# 特征名称与其重要性
for feature, importance in zip(input_cols, importances):
    print(f"{feature}: {importance}")

Crime: 6.886792801968188e-05
Romance: 0.0
Thriller: 3.523737173904606e-05
Adventure: 0.00022844381095603352
Drama: 0.004749294894781922
War: 0.0
Documentary: 0.0
Fantasy: 0.0
Mystery: 0.0
Musical: 0.0
Animation: 0.0
Film-Noir: 0.0
(no genres listed): 0.0
IMAX: 0.0
Horror: 0.0
Western: 0.0
Comedy: 0.00037033431206308414
Children: 0.0
Action: 0.0009589248997926818
Sci-Fi: 6.0410468050548805e-05
movie_average_rating: 0.36517223034791796
movie_rating_count: 0.01081479597023588
user_Crime: 0.023427844242746066
user_Romance: 0.003515250931201064
user_Thriller: 0.07693333620040771
user_Adventure: 0.032349879822003685
user_Drama: 0.0982825875963453
user_War: 0.001984513888941274
user_Documentary: 0.0015511478486050889
user_Fantasy: 0.0023520137220515275
user_Mystery: 0.001596039048151747
user_Musical: 0.00023382565989862
user_Animation: 0.0005719214261535914
user_Film-Noir: 0.000504691361966573
user_(no genres listed): 0.036662998952653875
user_IMAX: 5.9601541639709355e-05
user_Horror: 6.32657

In [39]:
# evaluations
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
test_eval = evaluator.evaluate(predictions)
print("RMSE: {}".format(test_eval))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(predictions)
print("R2: {}".format(test_eval))
rf_end_time = time.time()
print("Time used for Random Forest: {}".format(rf_end_time - rf_start_time))

                                                                                

RMSE: 0.8762516875542676




R2: 0.31790207768881995
Time used for Random Forest: 560.7150785923004


                                                                                

### Use ALS to predict(combining with feature matrix)

In [51]:
# # 初始化计时
# ALS_final_start_time = time.time()
# # 特征列
# feature_cols = ['movieId'] + genres  # 确保这里列出了所有相关的独热编码列
# als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="features", ratingCol="rating", coldStartStrategy="drop")
# # 创建VectorAssembler
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# train_data = assembler.transform(train_data)
# test_data = assembler.transform(test_data)
# als_model = als.fit(train_data)
# # 使用训练好的模型进行预测
# predictions = als_model.transform(test_data)

23/12/04 18:48:39 ERROR Instrumentation: java.lang.IllegalArgumentException: requirement failed: Column features must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.util.SchemaUtils$.checkNumericType(SchemaUtils.scala:78)
	at org.apache.spark.ml.recommendation.ALSParams.validateAndTransformSchema(ALS.scala:256)
	at org.apache.spark.ml.recommendation.ALSParams.validateAndTransformSchema$(ALS.scala:253)
	at org.apache.spark.ml.recommendation.ALS.validateAndTransformSchema(ALS.scala:593)
	at org.apache.spark.ml.recommendation.ALS.transformSchema(ALS.scala:725)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:692)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at

IllegalArgumentException: requirement failed: Column features must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

In [54]:
len(feature_cols)

21

In [None]:
# evaluations
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
test_eval = evaluator.evaluate(predictions)
print("RMSE: {}".format(test_eval))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(predictions)
print("R2: {}".format(test_eval))
rf_end_time = time.time()
print("Time used for Random Forest: {}".format(rf_end_time - rf_start_time))

### Use GBT to predict

In [58]:
from pyspark.ml.regression import GBTRegressor

GBT_start_time = time.time()
gbt = GBTRegressor(featuresCol="features", labelCol="rating", maxIter=10)
pipeline = Pipeline(stages=[assembler, gbt])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

                                                                                

In [59]:
# evaluations
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
test_eval = evaluator.evaluate(predictions)
print("RMSE: {}".format(test_eval))
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
test_eval = evaluator.evaluate(predictions)
print("R2: {}".format(test_eval))
GBT_end_time = time.time()
print("Time used for GBT: {}".format(GBT_end_time - GBT_start_time))

                                                                                

RMSE: 1.0263213320434903




R2: 0.0642589035793959
Time used for GBT: 557.7050244808197


                                                                                

In [60]:
print("Time used for ALS: {}".format(ALS_end_time - ALS_start_time))
print("Time used for Linear Regression: {}".format(lr_end_time - lr_start_time))
print("Time used for data pre-process: {}".format(data_pre_process_end_time - data_pre_process_start_time))
print("Time used for Linear Regression: {}".format(lr_final_end_time - lr_final_start_time))
print("Time used for Random Forest: {}".format(rf_end_time - rf_start_time))
print("Time used for GBT: {}".format(GBT_end_time - GBT_start_time))

Time used for ALS: 72.76677417755127
Time used for Linear Regression: 86.7550299167633
Time used for data pre-process: 159.901447057724
Time used for Linear Regression: 537.2375557422638
Time used for Random Forest: 560.7150785923004
Time used for GBT: 557.7050244808197


### Use Anotherway to make feature matrix

In [None]:

from pyspark.sql.functions import col, when

# 提取所需列
selected_columns = big_df.select("user_average_rating", "movie_average_rating", "movie_rating_count")

# 计算每部电影所属的 genre 数量
genre_count = sum([col(genre) for genre in genres]).alias("genre_count")
big_df = big_df.withColumn("genre_count", genre_count)

# 为新 DataFrame 创建新特征
for genre in genres:
    user_avg_rating_col = f"user_{genre}"  # 用户对该 genre 的平均评分列名
    genre_feature_col = f"{genre}_feature"  # 新的特征列名

    # 计算特征值并添加到新 DataFrame
    selected_columns = selected_columns.withColumn(
        genre_feature_col,
        when(big_df[genre] == 1, big_df[user_avg_rating_col] / big_df["genre_count"]).otherwise(0)
    )

# 显示新 DataFrame 的结果
selected_columns.show()


In [None]:
# # evaluations
# evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
# test_eval = evaluator.evaluate(predictions)
# print("RMSE: {}".format(test_eval))
# evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")
# test_eval = evaluator.evaluate(predictions)
# print("R2: {}".format(test_eval))