# MovieLens: Spark-based Big Data Recommendation Analysis

# Task 1: Data Loading and Basic Data Analysis with RDDs and DataFrames
---

# 1. Data Structure Operation Test

## 1.1. DataFrame Operation Example

In [1]:
from pyspark.sql import SparkSession

# Obtain a SparkSession for this demo (getOrCreate reuses a running session if one exists)
spark = SparkSession.builder.appName("example").getOrCreate()

# Build a small in-memory DataFrame from (Name, Age) tuples
data = [("John", 28), ("Linda", 33), ("Michael", 22)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, schema=columns)

# Display all rows
df.show()

# Keep only the rows whose Age exceeds 25, then display them
df_filtered = df.where(df["Age"] > 25)
df_filtered.show()

# Shut the session down now that the demo is finished
spark.stop()

23/12/09 11:23:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+-------+---+
|   Name|Age|
+-------+---+
|   John| 28|
|  Linda| 33|
|Michael| 22|
+-------+---+

+-----+---+
| Name|Age|
+-----+---+
| John| 28|
|Linda| 33|
+-----+---+



In [22]:
from pyspark.sql import SparkSession

# Obtain a SparkSession and take its SparkContext, the entry point for the RDD API
spark = SparkSession.builder.appName("rdd_example").getOrCreate()
sc = spark.sparkContext

# Distribute the integers 1..10 as an RDD
data = list(range(1, 11))
rdd = sc.parallelize(data)

# Double every element
mapped_rdd = rdd.map(lambda value: value * 2)

# Keep only the doubled values that are greater than 5
filtered_rdd = mapped_rdd.filter(lambda value: value > 5)

# Add the remaining values together
sum_result = filtered_rdd.reduce(lambda left, right: left + right)

# Report the total
print("Filtered and aggregated RDD result:", sum_result)

# Shut the session down now that the demo is finished
spark.stop()

23/12/01 13:13:31 INFO SparkEnv: Registering MapOutputTracker
23/12/01 13:13:31 INFO SparkEnv: Registering BlockManagerMaster
23/12/01 13:13:31 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/01 13:13:31 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

Filtered and aggregated RDD result: 104


# 2. Data Loading with Spark Data Structures

In [2]:
# Path to a local-filesystem copy of ratings.csv.
# NOTE(review): the loading cell further down reassigns ratings_file to a gs:// path
# before it is read, so this value is overwritten — confirm which source is intended.
ratings_file = "file:///mnt/gwx/datasets/ml-25m/ratings.csv"

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [3]:
# Location of ratings.csv on Google Cloud Storage
ratings_file = "gs://dataproc-staging-asia-southeast2-933547737015-zijhgarf/ratings.csv"

# Obtain the session used for the RDD-based part of the analysis
session_builder = SparkSession.builder.appName("Rating Analysis")
spark = session_builder.getOrCreate()

# Read the file as an RDD of text lines and remember its first line (the CSV header)
ratings_raw_data = spark.sparkContext.textFile(ratings_file)
header_lines = ratings_raw_data.take(1)
ratings_raw_data_header = header_lines[0]

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/01 14:19:17 INFO SparkEnv: Registering MapOutputTracker
23/12/01 14:19:17 INFO SparkEnv: Registering BlockManagerMaster
23/12/01 14:19:17 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/01 14:19:17 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

In [4]:
# Peek at the first three raw text lines (the CSV header followed by two records)
ratings_raw_data.take(3)

['userId,movieId,rating,timestamp',
 '1,296,5.0,1147880044',
 '1,306,3.5,1147868817']

In [5]:
def _is_data_line(line):
    """Return True for every line except the CSV header held in ratings_raw_data_header."""
    return line != ratings_raw_data_header


def _parse_rating(line):
    """Split a CSV line on commas and keep its first three fields, as strings."""
    fields = line.split(",")
    return (fields[0], fields[1], fields[2])


# Drop the header, parse each remaining line and cache the result for reuse
ratings_data = ratings_raw_data.filter(_is_data_line).map(_parse_rating).cache()

In [6]:
# Peek at the first three parsed records; each is a tuple of three strings
ratings_data.take(3)

                                                                                

[('1', '296', '5.0'), ('1', '306', '3.5'), ('1', '307', '5.0')]

# 3. Data Analysis

## 3.1. Statistical Analysis

In [7]:
# Count the parsed rating records and report the total
num_ratings = ratings_data.count()
print("Num of ratings:", num_ratings, sep="")



Num of ratings:25000095


                                                                                

In [14]:
# Stop the current SparkSession; the next cell builds a new one for the
# DataFrame-based analysis.
spark.stop()

In [15]:
from pyspark.sql import SparkSession

# Obtain the session used for the DataFrame-based part of the analysis
spark = SparkSession.builder.appName("Rating Analysis").getOrCreate()

# Load ratings.csv as a DataFrame: the first line supplies the column names and
# the column types are inferred from the data
ratings_csv_reader = spark.read.option("header", True).option("inferSchema", True)
ratings_df = ratings_csv_reader.csv("gs://dataproc-staging-asia-southeast2-933547737015-zijhgarf/ratings.csv")

# Average rating per movie; show() prints only the first 20 rows
ratings_df.groupBy("movieId").agg({"rating": "avg"}).show()

23/12/01 14:33:00 INFO SparkEnv: Registering MapOutputTracker
23/12/01 14:33:00 INFO SparkEnv: Registering BlockManagerMaster
23/12/01 14:33:00 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/01 14:33:00 INFO SparkEnv: Registering OutputCommitCoordinator

+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|   1580|3.5817083457378187|
|   1959| 3.634788998530338|
|   2366| 3.543409877319912|
|   5300| 3.626334519572954|
|   6620|3.7897800776196635|
|   1591| 2.637077181835171|
|   3175|3.6077836141619484|
|  96488|3.9735049205147615|
| 175197| 2.754918032786885|
|  44022|3.2593627146699773|
| 160563|3.0350500715307582|
|    471|3.6579813752234034|
|   1645| 3.547347362181387|
|   3918|2.9821298322392416|
|   7982|3.6235955056179776|
|   8638|3.9717508278145695|
|   1088|  3.25002094679514|
|   6357| 3.669491525423729|
|   3997|2.0634660421545665|
|   4519|3.3481739844070577|
+-------+------------------+
only showing top 20 rows



                                                                                

In [16]:
# Average rating per movie, with the aggregate column given a friendlier name
avg_rating_by_movie = (
    ratings_df.groupBy("movieId")
    .avg("rating")
    .toDF("movieId", "average_rating")
)

# Five movies with the highest plain average rating
top_movies = avg_rating_by_movie.sort(
    avg_rating_by_movie["average_rating"].desc()
).limit(5)

# Display the ranking
top_movies.show()



+-------+--------------+
|movieId|average_rating|
+-------+--------------+
| 207095|           5.0|
| 202181|           5.0|
| 122193|           5.0|
| 137018|           5.0|
| 131628|           5.0|
+-------+--------------+



                                                                                

In [20]:
# Load movies.csv as a DataFrame: the first line supplies the column names and
# the column types are inferred from the data
movies_csv_reader = spark.read.option("header", True).option("inferSchema", True)
movies_df = movies_csv_reader.csv("gs://dataproc-staging-asia-southeast2-933547737015-zijhgarf/movies.csv")

# Display the first rows of the movie catalogue
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [21]:
# movieIds of the five movies listed by the plain-average ranking above
specific_movieIds = [207095, 202181, 122193, 137018, 131628]

# Keep only the catalogue rows whose movieId appears in that list
id_is_selected = movies_df["movieId"].isin(specific_movieIds)
specific_movies_df = movies_df.where(id_is_selected)

# Display the matching titles and genres
specific_movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
| 122193|   Kit Carson (1940)|     Romance|Western|
| 131628|       Loaded (2014)|        Comedy|Drama|
| 137018|A Sister's Reveng...|Drama|Mystery|Thr...|
| 202181| Warlock Moon (1973)|     Horror|Thriller|
| 207095|Windy City Heat (...|  Comedy|Documentary|
+-------+--------------------+--------------------+



## 3.2. Simple Recommendation with Bayesian Average Rating

In [22]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Per-movie statistics: number of ratings and mean rating
movie_stats = ratings_df.groupBy("movieId").agg(
    F.count("rating").alias("count"),
    F.mean("rating").alias("mean")
)

# 2. Priors for the Bayesian average, fetched in a single Spark job:
#    C = mean number of ratings per movie (the weight given to the prior)
#    m = mean of the per-movie mean ratings (the prior rating itself;
#        note this is not the mean over all individual ratings)
priors = movie_stats.agg(
    F.mean("count").alias("C"),
    F.mean("mean").alias("m"),
).first()
C, m = priors["C"], priors["m"]

# 3. Define the Bayesian average rating function
def bayesian_avg(count, mean):
    """Shrink a movie's mean rating towards the prior mean ``m``.

    Computes ``(C * m + count * mean) / (C + count)``, so movies with few
    ratings stay close to ``m`` while heavily rated movies keep their own mean.

    Only ``+``, ``*`` and ``/`` are used, so the arguments may be plain numbers
    (giving a float) or Spark ``Column`` objects (giving a ``Column``).
    """
    return (C * m + count * mean) / (C + count)

# Kept for code that still wants a UDF. The return type is declared explicitly:
# without it F.udf defaults to StringType, which makes the score a text column
# that sorts lexicographically instead of numerically.
bayesian_avg_udf = F.udf(bayesian_avg, "double")

# Build the score as a native column expression: it is a double column and
# avoids the Python UDF round trip.
bayesian_avg_ratings = movie_stats.withColumn(
    "bayesian_avg", bayesian_avg(F.col("count"), F.col("mean"))
)

# 4. Attach the movie titles
movie_stats = bayesian_avg_ratings.join(
    movies_df.select("movieId", "title"), "movieId"
)

# 5. Sort the movies by their Bayesian average rating and show the top 5
top_movies = movie_stats.orderBy(F.desc("bayesian_avg")).limit(5)
top_movies.show()



+-------+-----+------------------+-----------------+--------------------+
|movieId|count|              mean|     bayesian_avg|               title|
+-------+-----+------------------+-----------------+--------------------+
|    318|81482| 4.413576004516335|4.406637765911728|Shawshank Redempt...|
|    858|52498| 4.324336165187245|4.314311946380134|Godfather, The (1...|
|     50|55366| 4.284353213163313|4.275147751556197|Usual Suspects, T...|
|   1221|34188|4.2617585117585115|4.247196813161273|Godfather: Part I...|
|    527|60411| 4.247579083279535|4.239392970515866|Schindler's List ...|
+-------+-----+------------------+-----------------+--------------------+



                                                                                