In [1]:
import os
os.environ["PYSPARK_PYTHON"] = r"C:\Users\DELL\AppData\Local\Programs\Python\Python310\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\DELL\AppData\Local\Programs\Python\Python310\python.exe"

In [2]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


# Cấu hình Spark
spark_conf = (
    SparkConf()
    .setAppName("MovieRatingSpark")
    .setMaster("local[*]")
)

# Tạo SparkContext
spark_context = SparkContext.getOrCreate(conf=spark_conf)

# Tạo SparkSession
spark_session = (
    SparkSession
    .builder
    .appName("MovieRatingSpark")
    .getOrCreate()
)

if spark_session:
    print("SparkSession created successfully.")
else:
    print("Error: Failed to create SparkSession.")

SparkSession created successfully.


In [4]:
import os

rs_path = "./resource/"

# Đọc file movies và rating{1,2} với RDD
movies_rdd = spark_context.textFile(os.path.join(rs_path, "movies.txt"))
ratings_1_rdd = spark_context.textFile(os.path.join(rs_path, "ratings_1.txt"))
ratings_2_rdd = spark_context.textFile(os.path.join(rs_path, "ratings_2.txt"))

# Show data
if movies_rdd and ratings_1_rdd and ratings_2_rdd:
    print("\nmovies.txt:")
    for line in movies_rdd.take(5):
        print(line)
    print("\nratings_1.txt:")
    for line in ratings_1_rdd.take(5):
        print(line)
    print("\nratings_2.txt:")
    for line in ratings_2_rdd.take(5):
        print(line)
else:
    print("Error: Failed to load RDD files.")


movies.txt:
1001,The Godfather (1972),Crime|Drama
1002,The Shawshank Redemption (1994),Drama
1003,Schindler's List (1993),Biography|Drama|History
1004,Raging Bull (1980),Biography|Drama|Sport
1005,Casablanca (1942),Drama|Romance|War

ratings_1.txt:
7,1020,4.5,1577836800
23,1015,3.5,1577923200
45,1030,4.0,1578009600
12,1047,3.0,1578096000
38,1012,4.5,1578182400

ratings_2.txt:
12,1012,3.5,1577837800
34,1039,4.0,1577924200
27,1043,4.5,1578010600
8,1020,3.0,1578097000
19,1050,4.0,1578183400


In [5]:
# Parse movies
def parse_movie(line):
    parts = line.split(',', 2)
    movie_id = int(parts[0])
    title = parts[1]
    genres = parts[2] if len(parts) > 2 else ""
    genres_list = genres.split('|') if genres else []
    return (movie_id, genres_list)
movies_parse = movies_rdd.map(parse_movie)

print("After parsing:")
for col in movies_parse.take(5):
    print(f"Movie: {col[0]} {col[1]}")


After parsing:
Movie: 1001 ['Crime', 'Drama']
Movie: 1002 ['Drama']
Movie: 1003 ['Biography', 'Drama', 'History']
Movie: 1004 ['Biography', 'Drama', 'Sport']
Movie: 1005 ['Drama', 'Romance', 'War']


In [6]:
# Parse ratings
def parse_rating(line):
    parts = line.split(',')
    movie_id = int(parts[1])
    rating = float(parts[2])
    return (movie_id, rating)

ratings_1_parse = ratings_1_rdd.map(parse_rating)
ratings_2_parse = ratings_2_rdd.map(parse_rating)

#show rating 1
print("Rating 1:")
for col in ratings_1_parse.take(5):
    print(f"Movie: {col[0]}, rating: {col[1]}")

Rating 1:
Movie: 1020, rating: 4.5
Movie: 1015, rating: 3.5
Movie: 1030, rating: 4.0
Movie: 1047, rating: 3.0
Movie: 1012, rating: 4.5


In [7]:
# Merge ratings 1 && 2
all_ratings = ratings_1_parse.union(ratings_2_parse)
print(f"After merging: {all_ratings.count()}")

# all_ratings: (movie_id, rating)
# x[0] = movie_id
# x[1] = rating
rating_pairs = all_ratings.map(
    lambda x: (x[0], (x[1], 1))  # => (movie_id, (rating, 1))
)

# a = (sum, count)
# b = (sum, count)
rating_stats = rating_pairs.reduceByKey(        
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

#Join rating_stats
movie_ratings = movies_parse.join(rating_stats) # => (movie_id, ([genre], (sum, count)))

# FlatMap => (genre, (sum, count))
genre_rating_pairs = movie_ratings.flatMap(
    lambda x: [(genre, x[1][1]) for genre in x[1][0]]
)

# a = (sum, count)
# b = (sum, count)
genre_stats = genre_rating_pairs.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1]) 
)

genre_avg = genre_stats.mapValues(
    lambda x: (x[0] / x[1], x[1])  # (avgGenre, countRatings)
)


# Kết quả
for (genre, (avg, cnt)) in genre_avg.take(10):
    print(f"{genre} - AverageRating:{avg:.2f} (TotalRatings:{cnt})")


After merging: 184
Sci-Fi - AverageRating:3.73 (TotalRatings:54)
Action - AverageRating:3.71 (TotalRatings:54)
Family - AverageRating:3.67 (TotalRatings:18)
Drama - AverageRating:3.76 (TotalRatings:128)
Biography - AverageRating:3.56 (TotalRatings:25)
Horror - AverageRating:4.00 (TotalRatings:2)
Thriller - AverageRating:3.70 (TotalRatings:27)
Adventure - AverageRating:3.63 (TotalRatings:83)
Film-Noir - AverageRating:4.36 (TotalRatings:7)
Mystery - AverageRating:4.00 (TotalRatings:2)


In [8]:
# Clear
spark_context.stop()
spark_session.stop()