In [12]:
from pyspark.sql import SparkSession

# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("Movie Rating Analysis") \
    .getOrCreate()

# 加载 rate.txt 文件
rate_path = "hdfs://127.0.0.1:9000/jd/dataset/moviedata/rate.txt"
ratings_df = spark.read.option("delimiter", "::").csv(rate_path, header=False, inferSchema=True)
ratings_df = ratings_df.withColumnRenamed("_c0", "user_id") \
                       .withColumnRenamed("_c1", "movie_id") \
                       .withColumnRenamed("_c2", "rating") \
                       .withColumnRenamed("_c3", "timestamp")

ratings_df.show(5)
ratings_df.printSchema()


+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
|      1|       1|     5|2019-09-20|
|      1|       2|     5|2020-04-05|
|      2|       3|     4|2020-06-07|
|      3|       4|     5|2020-07-07|
|      2|       1|     3|2020-06-03|
+-------+--------+------+----------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
# 1.输出每部电影的总评分并查询总评分前3名(输出格式：电影编号  总评分)

In [13]:
# 计算每部电影的总评分
movie_total_ratings = ratings_df.groupBy("movie_id").sum("rating").withColumnRenamed("sum(rating)", "total_rating")

# 查询总评分前3名
top_3_movies = movie_total_ratings.orderBy("total_rating", ascending=False).limit(3)
top_3_movies.show()


+--------+------------+
|movie_id|total_rating|
+--------+------------+
|       1|          13|
|       5|           9|
|       6|           9|
+--------+------------+



In [None]:
# 2根据评分数据得到每位用户的评分前三名

In [21]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 为每个用户创建一个窗口函数，根据评分降序排列，并根据 movie_id 排序确保处理一致性
window_spec = Window.partitionBy("user_id").orderBy(F.desc("rating"), "movie_id")

# 使用 rank() 来给每部电影评分排序
user_top_ratings = ratings_df.withColumn("rank", F.rank().over(window_spec))

# 过滤出每个用户的前三名评分
user_top_3_ratings = user_top_ratings.filter(user_top_ratings["rank"] <= 3).select("user_id", "movie_id", "rating")

# 显示结果
user_top_3_ratings.show()


+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|       1|     5|
|      1|       2|     5|
|      1|       5|     5|
|      3|       1|     5|
|      3|       4|     5|
|      2|       6|     5|
|      2|       2|     4|
|      2|       3|     4|
+-------+--------+------+



In [None]:
# 3. 根据评分数据得到最火的三部电影,最火的电影是评论次数最多的电影。我们可以通过对 movie_id 进行计数来找出评论最多的电影。

In [22]:
# 统计每部电影的评分次数
movie_rating_count = ratings_df.groupBy("movie_id").count().withColumnRenamed("count", "rating_count")

# 查询评论次数最多的前三部电影
top_3_hot_movies = movie_rating_count.orderBy("rating_count", ascending=False).limit(3)
top_3_hot_movies.show()


+--------+------------+
|movie_id|rating_count|
+--------+------------+
|       1|           3|
|       6|           2|
|       5|           2|
+--------+------------+



In [None]:
# 4. 求被评分次数最多的3部电影，并给出评分次数
# 这一部分实际上和第3点是相似的，只是输出格式要求不同。

In [23]:
    # 直接复用第三步的查询
    top_3_hot_movies_count = movie_rating_count.orderBy("rating_count", ascending=False).limit(3)
    top_3_hot_movies_count.show()


+--------+------------+
|movie_id|rating_count|
+--------+------------+
|       1|           3|
|       6|           2|
|       5|           2|
+--------+------------+



In [None]:
# 5. 每个电影前三名的评分,对于每部电影，我们可以为其评分按降序排列，并提取前三名评分。

In [25]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 为每部电影创建一个窗口函数，按评分降序排列
window_spec_movie = Window.partitionBy("movie_id").orderBy(F.desc("rating"))

# 添加排名列
movie_top_ratings = ratings_df.withColumn("rank", F.rank().over(window_spec_movie))

# 过滤出每部电影的前三名评分
movie_top_3_ratings = movie_top_ratings.filter(movie_top_ratings["rank"] <= 3).select("movie_id", "rating")

# 显示结果
movie_top_3_ratings.show()


+--------+------+
|movie_id|rating|
+--------+------+
|       1|     5|
|       1|     5|
|       1|     3|
|       6|     5|
|       6|     4|
|       3|     4|
|       5|     5|
|       5|     4|
|       4|     5|
|       7|     5|
|       2|     5|
|       2|     4|
+--------+------+



In [None]:
# 6. 求2017年上映的电影中，评分最高的3部剧情类电影
# 假设我们有一个 movies.csv 文件包含了电影的相关信息（如电影类型、上映年份等），我们需要加载这个文件，并过滤出2017年上映的剧情类电影，按评分求前3名。

In [None]:
# 假设我们有一个 movie_info.csv 文件，包含电影的ID、类型和上映年份
movie_info_path = "hdfs://jd/dataset/moviedata/movies.csv"
movies_df = spark.read.option("delimiter", "\t").csv(movie_info_path, header=True, inferSchema=True)

# 假设movies.csv文件有以下列: movie_id, title, genre, year
movies_df = movies_df.filter((movies_df["year"] == 2017) & (movies_df["genre"] == "Drama"))

# 连接电影信息和评分数据
movie_ratings_2017_drama = ratings_df.join(movies_df, "movie_id")

# 计算评分最高的3部电影
top_3_drama_movies = movie_ratings_2017_drama.groupBy("movie_id").sum("rating").withColumnRenamed("sum(rating)", "total_rating") \
                                             .orderBy("total_rating", ascending=False).limit(3)

top_3_drama_movies.show()


In [None]:
# 7. 分别求男性，女性当中评分(平均得分)最高的10部电影
# 假设用户信息存储在一个 users.csv 文件中，包含用户的 user_id 和 gender，我们需要连接这个数据来计算男性和女性评分最高的电影。

In [None]:
# 假设 users.csv 文件包含用户性别信息
user_info_path = "hdfs://jd/dataset/moviedata/users.csv"
users_df = spark.read.option("delimiter", "\t").csv(user_info_path, header=True, inferSchema=True)

# 假设 users.csv 文件有： user_id, gender
# 连接用户信息和评分数据
user_ratings = ratings_df.join(users_df, "user_id")

# 按性别计算评分最高的10部电影
user_ratings_avg = user_ratings.groupBy("gender", "movie_id").avg("rating").withColumnRenamed("avg(rating)", "avg_rating")

# 求每个性别评分最高的10部电影
top_10_movies_by_gender = user_ratings_avg.orderBy("avg_rating", ascending=False).limit(10)
top_10_movies_by_gender.show()
