In [2]:

from pyspark import SparkContext
# Obtain the SparkContext via getOrCreate() rather than constructing one directly.
sc = SparkContext.getOrCreate()

# Open every input file with sc.textFile; the files are read in the order listed.
movies, ratings1, ratings2, users, occupations = (
    sc.textFile(path)
    for path in (
        "movies.txt",
        "ratings_1.txt",
        "ratings_2.txt",
        "users.txt",
        "occupation.txt",
    )
)

In [3]:
# Bare expression: the notebook renders the SparkContext as this cell's output.
sc

# Bài 1: Tính Điểm Đánh Giá Trung Bình và Tổng Số Lượt Đánh Giá Cho Mỗi Phim

In [4]:
def parse_movie(line):
    """Extract the movie id and title from one comma-separated line.

    Returns a ``(MovieID, Title)`` tuple built from the first two fields;
    both values stay strings. Raises IndexError when the line has no comma.
    """
    # Fields after the second comma are never read, so stop splitting there.
    fields = line.split(",", 2)
    movie_id, title = fields[0], fields[1]
    return movie_id, title


def parse_rating(line):
    """Extract ``(MovieID, Rating)`` from one comma-separated rating line.

    The second field is returned unchanged as the movie id and the third
    field is converted to ``float``. The first field is ignored here.
    """
    fields = line.split(",", 3)
    movie_id = fields[1]
    score = float(fields[2])
    return movie_id, score

# Minimum number of ratings a movie needs to compete for "highest rated".
# Used both by the filter and by the summary message so the two cannot drift apart.
MIN_RATINGS = 5

# Pair RDDs keyed by MovieID.
movies_rdd = movies.map(parse_movie)                   # (MovieID, Title)
ratings_rdd = ratings1.map(parse_rating).union(
                ratings2.map(parse_rating)
             )                                         # (MovieID, Rating)

movie_stats = (
    ratings_rdd
    .mapValues(lambda r: (r, 1))                        # (MovieID, (rating, 1))
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))   # (MovieID, (rating sum, rating count))
    .mapValues(lambda x: (x[0] / x[1], x[1]))           # (MovieID, (AverageRating, TotalRatings))
)

# Join with the titles -> (MovieID, ((AverageRating, TotalRatings), Title)).
# Two actions below (max and collect) read this RDD, so cache it to avoid
# recomputing the aggregation and the join for the second action.
joined = movie_stats.join(movies_rdd).cache()

# Keep only movies whose rating COUNT reaches the threshold
# (x[1][0][1] is TotalRatings, not the rating value).
filtered = joined.filter(lambda x: x[1][0][1] >= MIN_RATINGS)

# Movie with the highest average rating among the filtered ones.
best_movie = filtered.max(key=lambda x: x[1][0][0])

all_movie_results = joined.collect()

for mid, data in all_movie_results:
    avg, total = data[0]
    title = data[1]
    print(f"{title} AverageRating: {avg:.2f} (TotalRatings: {total})")

print()
print(f"{best_movie[1][1]} is the highest rated movie with an average rating of "
      f"{best_movie[1][0][0]:.2f} among movies with at least {MIN_RATINGS} ratings.")


Sunset Boulevard (1950) AverageRating: 4.36 (TotalRatings: 7)
Mad Max: Fury Road (2015) AverageRating: 3.47 (TotalRatings: 18)
The Lord of the Rings: The Fellowship of the Ring (2001) AverageRating: 3.89 (TotalRatings: 18)
No Country for Old Men (2007) AverageRating: 3.89 (TotalRatings: 18)
Fight Club (1999) AverageRating: 3.50 (TotalRatings: 7)
The Social Network (2010) AverageRating: 3.86 (TotalRatings: 7)
The Godfather: Part II (1974) AverageRating: 4.00 (TotalRatings: 17)
Lawrence of Arabia (1962) AverageRating: 3.44 (TotalRatings: 18)
Gladiator (2000) AverageRating: 3.61 (TotalRatings: 18)
The Terminator (1984) AverageRating: 4.06 (TotalRatings: 18)
The Silence of the Lambs (1991) AverageRating: 3.14 (TotalRatings: 7)
E.T. the Extra-Terrestrial (1982) AverageRating: 3.67 (TotalRatings: 18)
Psycho (1960) AverageRating: 4.00 (TotalRatings: 2)
The Lord of the Rings: The Return of the King (2003) AverageRating: 3.82 (TotalRatings: 11)

Sunset Boulevard (1950) is the highest rated movi

# Bài 2: Phân Tích Đánh Giá Theo Thể Loại

In [5]:
def parse_movie(line):
    """Extract the movie id and its genre list from one comma-separated line.

    Returns ``(MovieID, genres)`` where ``genres`` is the third field split
    on ``|`` into a list of strings.
    """
    fields = line.split(",", 3)
    return fields[0], fields[2].split("|")

def parse_rating(line):
    """Turn one comma-separated rating line into ``(MovieID, Rating)``.

    The movie id is the second field (left as a string); the rating is the
    third field converted to ``float``.
    """
    columns = line.split(",", 3)
    movie_id, raw_score = columns[1], columns[2]
    return movie_id, float(raw_score)


# Pair RDDs keyed by MovieID.
movies_rdd = movies.map(parse_movie)   # (MovieID, [genre, ...])
ratings_rdd = ratings1.map(parse_rating).union(ratings2.map(parse_rating))   # (MovieID, Rating)

# Join on MovieID. The ratings RDD is the left side, so each record is
# (MovieID, (rating, genres)).
movie_rating = ratings_rdd.join(movies_rdd)

# Repeat every rating once per genre of its movie -> (genre, rating).
genre_ratings = movie_rating.flatMap(
    lambda rec: [(g, rec[1][0]) for g in rec[1][1]]
)

# Per genre: sum the ratings and count them, then convert to (avg, count).
genre_stats = (
    genre_ratings
    .mapValues(lambda score: (score, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda acc: (acc[0] / acc[1], acc[1]))
)

# Collect the per-genre statistics and print one line per genre.
results = genre_stats.collect()

for genre, data in results:
    avg, total = data
    print(f"{genre} - AverageRating: {avg:.2f} (TotalRatings: {total})")

Drama - AverageRating: 3.76 (TotalRatings: 128)
Action - AverageRating: 3.71 (TotalRatings: 54)
Sci-Fi - AverageRating: 3.73 (TotalRatings: 54)
Biography - AverageRating: 3.56 (TotalRatings: 25)
Family - AverageRating: 3.67 (TotalRatings: 18)
Horror - AverageRating: 4.00 (TotalRatings: 2)
Fantasy - AverageRating: 3.86 (TotalRatings: 29)
Thriller - AverageRating: 3.70 (TotalRatings: 27)
Mystery - AverageRating: 4.00 (TotalRatings: 2)
Adventure - AverageRating: 3.63 (TotalRatings: 83)
Film-Noir - AverageRating: 4.36 (TotalRatings: 7)
Crime - AverageRating: 3.81 (TotalRatings: 42)


# Bài 3: Phân Tích Đánh Giá Theo Giới Tính

In [6]:
def parse_movie(line):
    """Return ``(MovieID, Title)`` taken from the first two comma-separated fields."""
    columns = line.split(",", 2)  # later fields are not used
    return columns[0], columns[1]

def parse_rating(line):
    """Turn one comma-separated rating line into ``(UserID, MovieID, Rating)``.

    The two ids are the first and second fields, kept as strings; the rating
    is the third field converted to ``float``.
    """
    fields = line.split(",", 3)
    user_id, movie_id = fields[0], fields[1]
    return user_id, movie_id, float(fields[2])

def parse_user(line):
    """Return ``(UserID, Gender)`` taken from the first two comma-separated fields."""
    fields = line.split(",", 2)  # remaining user fields are not needed here
    user_id, gender = fields[0], fields[1]
    return user_id, gender

# Build the input RDDs.
movies_rdd = movies.map(parse_movie)     # (MovieID, Title)
users_rdd = users.map(parse_user)        # (UserID, Gender)
ratings_rdd = ratings1.map(parse_rating).union(ratings2.map(parse_rating))  # (UserID, MovieID, Rating)

# Re-key the ratings by UserID so they can be joined with the users.
ratings_by_user = ratings_rdd.map(lambda rec: (rec[0], (rec[1], rec[2])))
ratings_users = ratings_by_user.join(users_rdd)   # (UserID, ((MovieID, Rating), Gender))
movie_gender_rating = ratings_users.map(
    lambda rec: ((rec[1][0][0], rec[1][1]), rec[1][0][1])   # ((MovieID, Gender), Rating)
)

# Rating sum and rating count per (MovieID, Gender).
movie_gender_stats = (
    movie_gender_rating
    .mapValues(lambda score: (score, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)

# ((MovieID, Gender), Avg)
movie_gender_avg = movie_gender_stats.mapValues(lambda acc: acc[0] / acc[1])

# Move Gender out of the key -> (MovieID, (Gender, Avg)), then gather per movie.
movie_gender = movie_gender_avg.map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))
movie_group = movie_gender.groupByKey()

# Attach the titles -> (MovieID, (iterable of (Gender, Avg), Title)).
movie_with_title = movie_group.join(movies_rdd)

results = movie_with_title.collect()

for mid, (gender_list, title) in results:
    # "NA" stays in place when a movie has no rating from that gender.
    male_avg = female_avg = "NA"

    for gender, avg in gender_list:
        if gender == "M":
            male_avg = f"{avg:.2f}"
        elif gender == "F":
            female_avg = f"{avg:.2f}"

    print(f"{title} - Male_Avg: {male_avg}, Female_Avg: {female_avg}")


Gladiator (2000) - Male_Avg: 3.59, Female_Avg: 3.64
Sunset Boulevard (1950) - Male_Avg: 4.33, Female_Avg: 4.50
Mad Max: Fury Road (2015) - Male_Avg: 4.00, Female_Avg: 3.32
The Social Network (2010) - Male_Avg: 4.00, Female_Avg: 3.67
E.T. the Extra-Terrestrial (1982) - Male_Avg: 3.81, Female_Avg: 3.55
Fight Club (1999) - Male_Avg: 3.50, Female_Avg: 3.50
The Terminator (1984) - Male_Avg: 3.93, Female_Avg: 4.14
The Lord of the Rings: The Fellowship of the Ring (2001) - Male_Avg: 4.00, Female_Avg: 3.80
The Godfather: Part II (1974) - Male_Avg: 4.06, Female_Avg: 3.94
The Lord of the Rings: The Return of the King (2003) - Male_Avg: 3.75, Female_Avg: 3.90
Psycho (1960) - Male_Avg: NA, Female_Avg: 4.00
The Silence of the Lambs (1991) - Male_Avg: 3.33, Female_Avg: 3.00
No Country for Old Men (2007) - Male_Avg: 3.92, Female_Avg: 3.83
Lawrence of Arabia (1962) - Male_Avg: 3.55, Female_Avg: 3.31


# Bài 4: Phân Tích Đánh Giá Theo Nhóm Tuổi

In [7]:
def age_group(age):
    """Map an age to its bucket label.

    Upper bounds are inclusive: ages up to 18 give "0-18", up to 35 give
    "18-35", up to 50 give "35-50", and anything else gives "50+".
    """
    # Checked in ascending order; the first bound that covers the age wins.
    buckets = ((18, "0-18"), (35, "18-35"), (50, "35-50"))
    for upper_bound, label in buckets:
        if age <= upper_bound:
            return label
    return "50+"

def parse_movie(line):
    """Return ``(MovieID, Title)`` from one comma-separated movie line."""
    pieces = line.split(",", 2)  # only the first two fields are read
    movie_id, title = pieces[0], pieces[1]
    return movie_id, title

def parse_rating(line):
    """Return ``(UserID, MovieID, Rating)`` from one comma-separated rating line.

    Both ids are kept as strings; the third field is converted to ``float``.
    """
    columns = line.split(",", 3)
    user_id = columns[0]
    movie_id = columns[1]
    score = float(columns[2])
    return user_id, movie_id, score

def parse_user(line):
    """Return ``(UserID, Age)`` from one comma-separated user line.

    The user id is the first field (string); the age is the third field
    converted to ``int``.
    """
    fields = line.split(",", 3)
    user_id = fields[0]
    age = int(fields[2])
    return user_id, age


# Build the input RDDs.
movies_rdd = movies.map(parse_movie)     # (MovieID, Title)
users_rdd = users.map(parse_user)        # (UserID, Age)
ratings_rdd = ratings1.map(parse_rating).union(ratings2.map(parse_rating))  # (UserID, MovieID, Rating)

# Re-key the ratings by UserID and attach each user's age.
ratings_by_user = ratings_rdd.map(lambda rec: (rec[0], (rec[1], rec[2])))
rating_user_join = ratings_by_user.join(users_rdd)   # (UserID, ((MovieID, Rating), Age))

# Replace the age with its bucket label -> ((MovieID, AgeGroup), Rating).
movie_age_rating = rating_user_join.map(
    lambda rec: ((rec[1][0][0], age_group(rec[1][1])), rec[1][0][1])
)

# Average rating per (MovieID, AgeGroup).
age_stats = (
    movie_age_rating
    .mapValues(lambda score: (score, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda acc: acc[0] / acc[1])
)

# Move AgeGroup out of the key -> (MovieID, (AgeGroup, Avg)), then gather per movie.
movie_age_pairs = age_stats.map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))
movie_grouped = movie_age_pairs.groupByKey()

# Attach the titles -> (MovieID, (iterable of (AgeGroup, Avg), Title)).
movie_with_title = movie_grouped.join(movies_rdd)

results = movie_with_title.collect()

# Fixed column order for the printed buckets.
AGE_ORDER = ["0-18", "18-35", "35-50", "50+"]

for mid, (age_list, title) in results:
    # Bucket label -> average already formatted to two decimals.
    age_map = dict((group, f"{avg:.2f}") for group, avg in age_list)

    # Buckets without any rating for this movie are shown as "NA".
    output = []
    for ag in AGE_ORDER:
        output.append(age_map.get(ag, "NA"))

    print(f"{title} - [0-18: {output[0]}, 18-35: {output[1]}, 35-50: {output[2]}, 50+: {output[3]}]")


Gladiator (2000) - [0-18: NA, 18-35: 3.44, 35-50: 3.81, 50+: 3.50]
Mad Max: Fury Road (2015) - [0-18: NA, 18-35: 3.36, 35-50: 3.64, 50+: NA]
Sunset Boulevard (1950) - [0-18: NA, 18-35: 4.17, 35-50: 4.50, 50+: NA]
The Social Network (2010) - [0-18: NA, 18-35: 4.00, 35-50: 3.67, 50+: NA]
E.T. the Extra-Terrestrial (1982) - [0-18: NA, 18-35: 3.56, 35-50: 3.83, 50+: 3.00]
Fight Club (1999) - [0-18: NA, 18-35: 3.50, 35-50: 3.50, 50+: 3.50]
The Terminator (1984) - [0-18: NA, 18-35: 4.17, 35-50: 4.05, 50+: 3.75]
The Lord of the Rings: The Fellowship of the Ring (2001) - [0-18: NA, 18-35: 4.00, 35-50: 3.83, 50+: NA]
The Godfather: Part II (1974) - [0-18: NA, 18-35: 3.78, 35-50: 4.25, 50+: NA]
The Lord of the Rings: The Return of the King (2003) - [0-18: NA, 18-35: 3.83, 35-50: 3.81, 50+: NA]
Psycho (1960) - [0-18: NA, 18-35: 4.50, 35-50: 3.50, 50+: NA]
The Silence of the Lambs (1991) - [0-18: NA, 18-35: 3.00, 35-50: 3.25, 50+: NA]
No Country for Old Men (2007) - [0-18: NA, 18-35: 3.81, 35-50: 

# Bài 5: Phân Tích Đánh Giá Theo Occupation (Nghề nghiệp) Của Người Dùng

In [8]:
def parse_rating(line):
    """Return ``(UserID, Rating)`` from one comma-separated rating line.

    The user id is the first field (string); the rating is the third field
    converted to ``float``. The second field is not used here.
    """
    fields = line.split(",", 3)
    user_id = fields[0]
    return user_id, float(fields[2])

def parse_user(line):
    """Return the first and fourth comma-separated fields of a user line.

    The pipeline in this cell joins the second value against the occupation
    ids, so the result is used as ``(UserID, OccupationID)``.
    """
    fields = line.split(",", 4)
    user_id, occupation_id = fields[0], fields[3]
    return user_id, occupation_id
def parse_occupation(line):
    """Return ``(OccupationID, OccupationName)`` from one comma-separated line."""
    fields = line.split(",", 2)  # only the first two fields are read
    occupation_id, occupation_name = fields[0], fields[1]
    return occupation_id, occupation_name


# Build the input RDDs.
ratings_rdd = ratings1.map(parse_rating).union(ratings2.map(parse_rating))  # (UserID, Rating)
users_rdd = users.map(parse_user)                    # (UserID, OccupationID)
occupation_rdd = occupations.map(parse_occupation)   # (OccupationID, OccupationName)

# Attach each rating to its user's occupation, then re-key by occupation.
rating_user_join = ratings_rdd.join(users_rdd)       # (UserID, (Rating, OccupationID))
occ_rating = rating_user_join.map(lambda rec: (rec[1][1], rec[1][0]))   # (OccupationID, Rating)

# Rating sum and rating count per occupation.
occ_stats = (
    occ_rating
    .mapValues(lambda score: (score, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)

# (OccupationID, (TotalRatings, Avg)) -- note the count comes first here.
occ_avg = occ_stats.mapValues(lambda acc: (acc[1], acc[0] / acc[1]))

# Attach the names -> (OccupationID, ((TotalRatings, Avg), OccupationName)).
final_rdd = occ_avg.join(occupation_rdd)

# Flatten to (OccupationName, AvgRating, TotalRatings) for printing.
final_result = final_rdd.map(
    lambda rec: (rec[1][1], rec[1][0][1], rec[1][0][0])
)

results = final_result.collect()

for name, avg, total in results:
    print(f"{name} - AverageRating: {avg:.2f} (TotalRatings: {total})")

Lawyer - AverageRating: 3.65 (TotalRatings: 17)
Accountant - AverageRating: 3.58 (TotalRatings: 6)
Teacher - AverageRating: 3.70 (TotalRatings: 5)
Salesperson - AverageRating: 3.65 (TotalRatings: 17)
Manager - AverageRating: 3.47 (TotalRatings: 16)
Consultant - AverageRating: 3.86 (TotalRatings: 14)
Student - AverageRating: 4.00 (TotalRatings: 8)
Artist - AverageRating: 3.73 (TotalRatings: 11)
Journalist - AverageRating: 3.85 (TotalRatings: 17)
Doctor - AverageRating: 3.69 (TotalRatings: 21)
Designer - AverageRating: 4.00 (TotalRatings: 13)
Programmer - AverageRating: 4.25 (TotalRatings: 10)
Engineer - AverageRating: 3.56 (TotalRatings: 18)
Nurse - AverageRating: 3.86 (TotalRatings: 11)


# Bài 6: Phân Tích Đánh Giá Theo Thời Gian

In [9]:
import datetime

def parse_rating(line):
    """Return ``(Year, Rating)`` from one comma-separated rating line.

    The third field is converted to ``float`` (the rating) and the fourth
    field to ``int`` (a timestamp in seconds since the epoch). The year is
    taken in UTC, so the result does not depend on the local time zone of
    the machine that happens to run this function.

    Raises IndexError when the line has fewer than four fields and
    ValueError when the rating or timestamp is not numeric.
    """
    parts = line.split(",")
    rating = float(parts[2])
    timestamp = int(parts[3])
    # Without tz=, fromtimestamp() converts to local time, which can shift
    # timestamps near New Year into a different year on different machines.
    year = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).year
    return (year, rating)   # (Year, Rating)

# (Year, Rating) pairs from both rating files.
ratings_rdd = ratings1.map(parse_rating).union(ratings2.map(parse_rating))

# Rating sum and rating count per year.
year_stats = (
    ratings_rdd
    .mapValues(lambda score: (score, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)

# (Year, (TotalRatings, AverageRating)) -- the count comes first.
year_avg = year_stats.mapValues(lambda acc: (acc[1], acc[0] / acc[1]))

results = year_avg.collect()

# Print in ascending year order; `results` itself is left unsorted.
for year, (total, avg) in sorted(results):
    print(f"{year} - TotalRatings: {total}, AverageRating: {avg:.2f}")

2020 - TotalRatings: 184, AverageRating: 3.75
