In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark RDD Example").getOrCreate()
sc = spark.sparkContext

In [None]:
from google.colab import files
uploaded = files.upload()

Saving movies.txt to movies.txt
Saving occupation.txt to occupation.txt
Saving ratings_1.txt to ratings_1.txt
Saving ratings_2.txt to ratings_2.txt
Saving users.txt to users.txt


## **Loading data and formating**

movies.txt

In [None]:
movies_rdd = sc.textFile("movies.txt")
movies_rdd.take(5)

['1001,The Godfather (1972),Crime|Drama',
 '1002,The Shawshank Redemption (1994),Drama',
 "1003,Schindler's List (1993),Biography|Drama|History",
 '1004,Raging Bull (1980),Biography|Drama|Sport',
 '1005,Casablanca (1942),Drama|Romance|War']

In [None]:
movies_split = movies_rdd.map(lambda line: line.split(","))
movies_clean = movies_split.filter(lambda fields: len(fields) == 3)
movies_dict = movies_clean.map(
    lambda fields: (fields[0].strip(), (fields[1].strip(), fields[2].strip()))
).collectAsMap()

movie_titles = [value[0] for key, value in movies_dict.items()]

ratings.txt

In [None]:
ratings_rdd1 = sc.textFile("ratings_1.txt")
ratings_rdd2 = sc.textFile("ratings_2.txt")
ratings_rdd = ratings_rdd1.union(ratings_rdd2)
ratings_rdd.take(5)

['7,1020,4.5,1577836800',
 '23,1015,3.5,1577923200',
 '45,1030,4.0,1578009600',
 '12,1047,3.0,1578096000',
 '38,1012,4.5,1578182400']

In [59]:
ratings_split = ratings_rdd.map(lambda line: line.split(","))
ratings_clean = ratings_split.filter(lambda fields: len(fields) == 4)
ratings_pair = ratings_clean.map(
    lambda fields: (fields[0].strip(), fields[1].strip(), float(fields[2].strip()),int(fields[3].strip()))
)
ratings_pair.take(5)

[('7', '1020', 4.5, 1577836800),
 ('23', '1015', 3.5, 1577923200),
 ('45', '1030', 4.0, 1578009600),
 ('12', '1047', 3.0, 1578096000),
 ('38', '1012', 4.5, 1578182400)]

users.txt


In [None]:
users_rdd = sc.textFile("users.txt")
users_rdd.take(5)

['1,M,28,3,12345',
 '2,F,35,7,23456',
 '3,M,42,2,34567',
 '4,F,19,10,45678',
 '5,M,31,1,56789']

In [None]:
users_split = users_rdd.map(lambda line: line.split(","))
users_clean = users_split.filter(lambda fields: len(fields) == 5)
users_pair = users_clean.map(
    lambda fields: (fields[0].strip(), fields[1].strip(), fields[2].strip(), fields[3].strip())
)
users_pair.take(5)

[('1', 'M', '28', '3'),
 ('2', 'F', '35', '7'),
 ('3', 'M', '42', '2'),
 ('4', 'F', '19', '10'),
 ('5', 'M', '31', '1')]

occupation.txt


In [44]:
occupation_rdd = sc.textFile("occupation.txt")
occupation_rdd.take(5)

['1,Programmer', '2,Doctor', '3,Engineer', '4,Teacher', '5,Lawyer']

In [45]:
occupation_split = occupation_rdd.map(lambda line: line.split(","))
occupation_clean = occupation_split.filter(lambda fields: len(fields) == 2)
occupation_pair = occupation_clean.map(
    lambda fields: (fields[0].strip(), fields[1].strip())
)
occupation_pair.take(5)

[('1', 'Programmer'),
 ('2', 'Doctor'),
 ('3', 'Engineer'),
 ('4', 'Teacher'),
 ('5', 'Lawyer')]

# **Bài 1: Tính Điểm Đánh Giá Trung Bình và Tổng Số Lượt Đánh Giá Cho Mỗi Phim**

In [52]:
# Với mỗi rating thì ta phát hành theo key là movies_id còn values là [rating,1]
ratings_by_movie = ratings_pair.map(
    lambda x: (x[1], (x[2], 1))
)
# Tổng hợp điểm và đếm số lượt đánh giá cho mỗi movies_id
movie_rating_stats = ratings_by_movie.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Tính trung bình rating cho mỗi phim
def process_avg_rating(item):
    movies_id, score_rating = item
    score, rating = score_rating
    avg_score = score / rating
    return movies_id, (avg_score, rating)

movie_rating_avg = movie_rating_stats.map(process_avg_rating)

# movie_rating_avg = movie_rating_stats.map(
#     lambda x: (x[0], x[1][0] / x[1][1], x[1][1])
# )

# In kết quả từng phim
print("Bài 1: Điểm trung bình và tổng số lượt đánh giá cho mỗi phim:\n")

for mid, (avg, count) in movie_rating_avg.collect():
    title = movies_dict.get(mid,"Unknown")[0]
    print(f"{title} AverageRating: {avg:.2f} (TotalRatings: {count})")

# Tìm phim có điểm trung bình cao nhất (chỉ xét phim có tối thiểu 5 lượt đánh giá)
movie_rating_avg_filtered = movie_rating_avg.filter(lambda x: x[1][1] >= 5)

if movie_rating_avg_filtered.isEmpty():
    print("Không có phim nào có ít nhất 5 lượt đánh giá.")
else:
    best_movie = movie_rating_avg_filtered.max(key=lambda x: x[1][0])
    best_title = movies_dict.get(best_movie[0],"Unknown")[0]
    best_avg = best_movie[1][0]
    print(f"\n\n{best_title} là phim có điểm đánh giá trung bình cao nhất với điểm đánh giá trung bình là: {best_avg:.2f} với ít nhất 5 lượt đánh giá.")



Bài 1: Điểm trung bình và tổng số lượt đánh giá cho mỗi phim:

Gladiator (2000) AverageRating: 3.61 (TotalRatings: 18)
The Terminator (1984) AverageRating: 4.06 (TotalRatings: 18)
Fight Club (1999) AverageRating: 3.50 (TotalRatings: 7)
E.T. the Extra-Terrestrial (1982) AverageRating: 3.67 (TotalRatings: 18)
Sunset Boulevard (1950) AverageRating: 4.36 (TotalRatings: 7)
The Social Network (2010) AverageRating: 3.86 (TotalRatings: 7)
Psycho (1960) AverageRating: 4.00 (TotalRatings: 2)
Mad Max: Fury Road (2015) AverageRating: 3.47 (TotalRatings: 18)
The Lord of the Rings: The Fellowship of the Ring (2001) AverageRating: 3.89 (TotalRatings: 18)
The Godfather: Part II (1974) AverageRating: 4.00 (TotalRatings: 17)
The Lord of the Rings: The Return of the King (2003) AverageRating: 3.82 (TotalRatings: 11)
The Silence of the Lambs (1991) AverageRating: 3.14 (TotalRatings: 7)
Lawrence of Arabia (1962) AverageRating: 3.44 (TotalRatings: 18)
No Country for Old Men (2007) AverageRating: 3.89 (Total

# **Bài 2: Phân Tích Đánh Giá Theo Thể Loại**

In [53]:

# Với mỗi rating thì ta phát hành theo key là movies_id còn values là [rating,1]
ratings_by_movie = ratings_pair.map(
    lambda x: (x[1], (x[2], 1))
)

# Tổng hợp điểm và đếm số lượt đánh giá cho mỗi movies_id
movie_rating_stats = ratings_by_movie.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Tính trung bình rating cho mỗi phim
def process_avg_rating(item):
    movies_id, score_rating = item
    score, rating = score_rating
    avg_score = score / rating
    return movies_id, (avg_score, rating)

movie_rating_avg = movie_rating_stats.map(process_avg_rating)

# movie_rating_avg = movie_rating_stats.map(
#     lambda x: (x[0], x[1][0] / x[1][1], x[1][1])
# )

# Tính trung bình rating cho mỗi thể loại phim
def movie_to_genres(item):
    movie_id, avg_info = item
    avg, count = avg_info

    # Lấy chuỗi genre trong movies_dict
    genres = movies_dict[movie_id][1].split("|")

    # Phát ra nhiều bản ghi theo từng genre
    return [(genre, (avg, count)) for genre in genres]

genre_rating_avg = movie_rating_avg.flatMap(movie_to_genres)

genre_rating_avg = genre_rating_avg.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# In kết quả từng phim
print("Bài 2: Điểm trung bình và tổng số lượt đánh giá cho mỗi loại phim:\n")

for genre, (avg, count) in genre_rating_avg.collect():
    print(f"{genre} AverageRating: {avg:.2f} (TotalRatings: {count})")

Bài 2: Điểm trung bình và tổng số lượt đánh giá cho mỗi loại phim:

Sci-Fi AverageRating: 11.19 (TotalRatings: 54)
Horror AverageRating: 4.00 (TotalRatings: 2)
Family AverageRating: 3.67 (TotalRatings: 18)
Thriller AverageRating: 11.03 (TotalRatings: 27)
Action AverageRating: 11.14 (TotalRatings: 54)
Adventure AverageRating: 18.23 (TotalRatings: 83)
Drama AverageRating: 37.51 (TotalRatings: 128)
Film-Noir AverageRating: 4.36 (TotalRatings: 7)
Biography AverageRating: 7.30 (TotalRatings: 25)
Mystery AverageRating: 4.00 (TotalRatings: 2)
Fantasy AverageRating: 7.71 (TotalRatings: 29)
Crime AverageRating: 11.03 (TotalRatings: 42)


# **Bài 3: Phân Tích Đánh Giá Theo Giới Tính**

In [54]:
# Tạo dict UserID -> Gender để tìm kiếm theo key-values
users_gender_dict = users_pair.map(
    lambda x: (x[0], x[1])   # (UserID, Gender)
).collectAsMap()

# Tổng hợp giới tính và điểm đánh giá theo mỗi movie_id => (MovieID, (Gender, Rating))
ratings_with_gender = ratings_pair.map(
    lambda x: (x[1], (users_gender_dict.get(x[0], "Unknown"), x[2]))
)

# Với mỗi rating thì ta phát hành theo key là (movies_id,genre) còn values là [rating,1]
movie_gender_pair = ratings_with_gender.map(
    lambda x: ((x[0], x[1][0]), (x[1][1], 1))
)

# Tổng hợp điểm và số lượt đáh giá theo mỗi (movies_id,genre)
movie_gender_stats = movie_gender_pair.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Tính trung bình rating theo genre mỗi (movies_id) => (MovieID, (Gender, avg_rating))
movie_gender_avg = movie_gender_stats.map(
    lambda x: (x[0][0], (x[0][1], x[1][0] / x[1][1]))
)

# Gom nam/nữ lại chung mỗi movie_id
movie_gender_grouped = movie_gender_avg.groupByKey().mapValues(list)


# In kết quả từng phim
print("Bài 3: Phân tích đánh giá theo giới tính:\n")

for movie_id, gender_list in movie_gender_grouped.collect():
    title = movies_dict.get(movie_id, ["Unknown"])[0]

    gender_dict = {g: r for g, r in gender_list}

    male = gender_dict.get("M", "NA")
    female = gender_dict.get("F", "NA")

    male_str = f"{male:.2f}" if male != "NA" else "NA"
    female_str = f"{female:.2f}" if female != "NA" else "NA"

    print(f"{title} - Male_Avg: {male_str}, Female_Avg: {female_str}")


Bài 3: Phân tích đánh giá theo giới tính:

Gladiator (2000) - Male_Avg: 3.59, Female_Avg: 3.64
The Terminator (1984) - Male_Avg: 3.93, Female_Avg: 4.14
Fight Club (1999) - Male_Avg: 3.50, Female_Avg: 3.50
E.T. the Extra-Terrestrial (1982) - Male_Avg: 3.81, Female_Avg: 3.55
Sunset Boulevard (1950) - Male_Avg: 4.33, Female_Avg: 4.50
The Social Network (2010) - Male_Avg: 4.00, Female_Avg: 3.67
Psycho (1960) - Male_Avg: NA, Female_Avg: 4.00
Mad Max: Fury Road (2015) - Male_Avg: 4.00, Female_Avg: 3.32
The Lord of the Rings: The Fellowship of the Ring (2001) - Male_Avg: 4.00, Female_Avg: 3.80
The Godfather: Part II (1974) - Male_Avg: 4.06, Female_Avg: 3.94
The Lord of the Rings: The Return of the King (2003) - Male_Avg: 3.75, Female_Avg: 3.90
The Silence of the Lambs (1991) - Male_Avg: 3.33, Female_Avg: 3.00
Lawrence of Arabia (1962) - Male_Avg: 3.55, Female_Avg: 3.31
No Country for Old Men (2007) - Male_Avg: 3.92, Female_Avg: 3.83


# **Bài 4: Phân Tích Đánh Giá Theo Nhóm Tuổi**

In [55]:
# Hàm gom nhóm tuổi
def to_age_group(age):
    age = int(age)
    if age <= 18:
        return "0-18"
    elif age <= 35:
        return "18-35"
    elif age <= 50:
        return "35-50"
    else:
        return "50+"

# Tạo dictionary để truy cập nhanh UserID → AgeGroup
users_age_dict = users_pair.map(
    lambda x: (x[0], to_age_group(x[2]))
).collectAsMap()

# Tổng hợp ratings với thông tin agegroup => (MovieID, AgeGroup, Rating)
ratings_with_age = ratings_pair.map(
    lambda x: (x[1], users_age_dict.get(x[0], "Unknown"), x[2])
)

# Tổng hợp điểm và số lượt đánh giá theo key (MovieID, AgeGroup) => ((MovieID, AgeGroup), (Rating, 1))
movie_age_pair = ratings_with_age.map(
    lambda x: ((x[0], x[1]), (x[2], 1))
)


# Tính tổng điểm và số lượt đánh theo mỗi key (MovieID, AgeGroup)
movie_age_stats = movie_age_pair.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)


# Tính trung bình rating cho mỗi (MovieID, AgeGroup) => (MovieID, (AgeGroup, avg))
movie_age_avg = movie_age_stats.map(
    lambda x: (x[0][0], (x[0][1], x[1][0] / x[1][1]))
)

# Gom theo MovieID
movie_age_grouped = movie_age_avg.groupByKey().mapValues(list)

# In kết quả từng phim theo từng nhóm tuổi
age_order = ["0-18", "18-35", "35-50", "50+"]

print("Bài 4: Phân tích đánh giá theo độ tuổi:\n")

for movie_id, age_list in movie_age_grouped.collect():

    title = movies_dict.get(movie_id, ["Unknown"])[0]

    # Chuyển list → dict để truy cập cho nhanh
    age_dict = {age: avg for age, avg in age_list}

    formatted = []
    for group in age_order:
        if group in age_dict:
            formatted.append(f"{group}: {age_dict[group]:.2f}")
        else:
            formatted.append(f"{group}: NA")

    formatted_str = ", ".join(formatted)
    print(f"{title} - [{formatted_str}]")


Bài 4: Phân tích đánh giá theo độ tuổi:

Gladiator (2000) - [0-18: NA, 18-35: 3.44, 35-50: 3.81, 50+: 3.50]
Fight Club (1999) - [0-18: NA, 18-35: 3.50, 35-50: 3.50, 50+: 3.50]
The Terminator (1984) - [0-18: NA, 18-35: 4.17, 35-50: 4.05, 50+: 3.75]
E.T. the Extra-Terrestrial (1982) - [0-18: NA, 18-35: 3.56, 35-50: 3.83, 50+: 3.00]
Sunset Boulevard (1950) - [0-18: NA, 18-35: 4.17, 35-50: 4.50, 50+: NA]
The Social Network (2010) - [0-18: NA, 18-35: 4.00, 35-50: 3.67, 50+: NA]
Psycho (1960) - [0-18: NA, 18-35: 4.50, 35-50: 3.50, 50+: NA]
Mad Max: Fury Road (2015) - [0-18: NA, 18-35: 3.36, 35-50: 3.64, 50+: NA]
The Lord of the Rings: The Fellowship of the Ring (2001) - [0-18: NA, 18-35: 4.00, 35-50: 3.83, 50+: NA]
The Godfather: Part II (1974) - [0-18: NA, 18-35: 3.78, 35-50: 4.25, 50+: NA]
The Lord of the Rings: The Return of the King (2003) - [0-18: NA, 18-35: 3.83, 35-50: 3.81, 50+: NA]
The Silence of the Lambs (1991) - [0-18: NA, 18-35: 3.00, 35-50: 3.25, 50+: NA]
Lawrence of Arabia (19

# **Bài 5: Phân Tích Đánh Giá Theo Occupation (Nghề nghiệp) Của Người Dùng**

In [56]:
# Tạo dictionary để truy cập nhanh UserID → OccupationID
users_occ_dict = users_pair.map(
    lambda x: (x[0], x[3])
).collectAsMap()

# Tạo dictionary để truy cập nhanh OccupationID → OccupationName
occupation_dict = occupation_pair.collectAsMap()

# Gắn occupation vào mỗi rating: (OccupationName, Rating)
ratings_with_occ = ratings_pair.map(
    lambda x: (occupation_dict.get(users_occ_dict.get(x[0], "Unknown"), "Unknown"), x[2])
)

# Tổng hợp điểm và số lượt đánh giá theo occupation => OccupationName,(Rating,1)
occ_stats = ratings_with_occ.map(
    lambda x: (x[0], (x[1], 1))
)

# Tính tổng điểm và số lượt đánh theo mỗi occupation
occ_stats = occ_stats.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Tính trung bình rating cho mỗi occupation => (Occupation, avg, totalCount)
occ_avg = occ_stats.map(
    lambda x: (x[0], x[1][0] / x[1][1], x[1][1])
)

# In kết quả từng đánh giá theo từng nghề nghiệp
print("Bài 5: Phân tích đánh giá theo occupation:\n")

for occ, avg, total in occ_avg.collect():
    print(f"{occ} - AverageRating: {avg:.2f} (TotalRatings: {total})")


Bài 5: Phân tích đánh giá theo occupation:

Consultant - AverageRating: 3.86 (TotalRatings: 14)
Nurse - AverageRating: 3.86 (TotalRatings: 11)
Lawyer - AverageRating: 3.65 (TotalRatings: 17)
Manager - AverageRating: 3.47 (TotalRatings: 16)
Student - AverageRating: 4.00 (TotalRatings: 8)
Salesperson - AverageRating: 3.65 (TotalRatings: 17)
Engineer - AverageRating: 3.56 (TotalRatings: 18)
Designer - AverageRating: 4.00 (TotalRatings: 13)
Doctor - AverageRating: 3.69 (TotalRatings: 21)
Journalist - AverageRating: 3.85 (TotalRatings: 17)
Artist - AverageRating: 3.73 (TotalRatings: 11)
Programmer - AverageRating: 4.25 (TotalRatings: 10)
Accountant - AverageRating: 3.58 (TotalRatings: 6)
Teacher - AverageRating: 3.70 (TotalRatings: 5)


# **Bài 6: Phân Tích Đánh Giá Theo Thời Gian**

In [62]:
from datetime import datetime

# Hàm lấy năm từ timestamp
def extract_year(ts):
    try:
        return datetime.fromtimestamp(int(ts)).year
    except:
        return None

# Tổng hợp điểm và số lượt đánh giá teo từng năm => (Year, (Rating, 1))
ratings_by_year = ratings_pair.map(
    lambda x: (extract_year(x[3]), (x[2], 1))
).filter(lambda x: x[0] is not None)   # loại timestamp lỗi

# Tính tổng điểm và số lượt đánh giá theo từng năm
year_stats = ratings_by_year.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

# Tính trung bình rating cho từng năm => (Year, TotalRatings, AverageRating)
year_avg = year_stats.map(
    lambda x: (x[0], x[1][1], x[1][0] / x[1][1])
)

# In kết quả

print("Bài 6: Phân tích đánh giá theo thời gian (Year):\n")

for year, total, avg in sorted(year_avg.collect()):
    print(f"{year} - TotalRatings: {total}, AverageRating: {avg:.2f}")


Bài 6: Phân tích đánh giá theo thời gian (Year):

2020 - TotalRatings: 184, AverageRating: 3.75
