In [1]:
import sys
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Cấu hình Môi trường Python cho Spark 
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Cấu hình Đường dẫn Dữ liệu 
base_path = r"G:\.shortcut-targets-by-id\1eDKYURr-Qm222Ul6PoZLc4b99cuwHTC3\Cường\Đại học\DS200\Lab 2"

# Khởi tạo SparkContext 
sc = SparkContext.getOrCreate()

# HÀM HỖ TRỢ ĐỌC DỮ LIỆU 
def get_file_path(filename):
    return os.path.join(base_path, filename)

def parse_line(line):
    return line.split(',')

# 1. LOAD MOVIES 
def parse_movie(line):
    parts = line.split(',')
    if len(parts) < 3: return None 
    mid = int(parts[0])
    genres = parts[-1]
    # Title là tất cả các phần ở giữa nối lại (phòng trường hợp tên phim có dấu phẩy)
    title = ",".join(parts[1:-1]) 
    return (mid, (title, genres))

movies_rdd = sc.textFile(get_file_path("movies.txt")) \
    .map(parse_movie) \
    .filter(lambda x: x is not None) # Lọc dòng lỗi

# 2. LOAD & UNION RATINGS 
# Input: UserID, MovieID, Rating, Timestamp
r1 = sc.textFile(get_file_path("ratings_1.txt"))
r2 = sc.textFile(get_file_path("ratings_2.txt"))

# Gộp và parse
ratings_rdd = r1.union(r2) \
    .map(parse_line) \
    .filter(lambda x: len(x) == 4) \
    .map(lambda x: (int(x[0]), int(x[1]), float(x[2]), int(x[3])))

# 3. LOAD USERS 
# Input: UserID, Gender, Age, OccupationID, Zipcode
users_rdd = sc.textFile(get_file_path("users.txt")) \
    .map(parse_line) \
    .filter(lambda x: len(x) >= 4) \
    .map(lambda x: (int(x[0]), (x[1], int(x[2]), int(x[3]))))

# 4. LOAD OCCUPATION 
# Input: ID, OccupationName
occ_rdd = sc.textFile(get_file_path("occupation.txt")) \
    .map(parse_line) \
    .filter(lambda x: len(x) >= 2) \
    .map(lambda x: (int(x[0]), x[1]))

# KIỂM TRA
try:
    print(f"Tổng số phim: {movies_rdd.count()}")
    print(f"Tổng số rating: {ratings_rdd.count()}")
    print("Mẫu phim:", movies_rdd.take(1))
    print("Mẫu rating:", ratings_rdd.take(1))
except Exception as e:
    print("Vẫn còn lỗi khi action (count/take):")
    print(e)

Tổng số phim: 50
Tổng số rating: 184
Mẫu phim: [(1001, ('The Godfather (1972)', 'Crime|Drama'))]
Mẫu rating: [(7, 1020, 4.5, 1577836800)]


# Bài 1: Tính Điểm Đánh Giá Trung Bình và Tổng Số Lượt Đánh Giá Cho Mỗi Phim

In [3]:
# 1. Map rating về cặp Key-Value: (MovieID, (Rating, 1))
# Mục đích: Chuẩn bị để cộng dồn Rating (để tính tổng) và số 1 (để đếm số lượng)
movie_ratings = ratings_rdd.map(lambda x: (x[1], (x[2], 1)))

# 2. ReduceByKey: Cộng dồn các giá trị của cùng 1 MovieID
# Kết quả: (MovieID, (TotalRatingSum, TotalCount))
movie_stats = movie_ratings.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# 3. Tính trung bình: (MovieID, (AvgRating, TotalCount))
movie_avg = movie_stats.mapValues(lambda x: (x[0] / x[1], x[1]))

# 4. Join với bảng Movies để lấy tên phim
# movies_rdd: (MovieID, (Title, Genres))
joined_movies = movie_avg.join(movies_rdd)

# 5. Format lại cho gọn: (Title, AvgRating, TotalCount)
final_movies = joined_movies.map(lambda x: (x[1][1][0], x[1][0][0], x[1][0][1]))

# 6. Tìm phim điểm cao nhất (Điều kiện: TotalRatings >= 5)
# Lọc phim có >= 5 rating
filtered_movies = final_movies.filter(lambda x: x[2] >= 5)

# Sắp xếp giảm dần theo điểm trung bình (x[1])
sorted_movies = filtered_movies.sortBy(lambda x: x[1], ascending=False)

# Lấy phim đầu tiên (cao nhất)
try:
    highest_rated = sorted_movies.first()
   
    # In mẫu 5 phim
    print("Danh sách phim (Top 5 mẫu):")
    for m in sorted_movies.take(5):
        print(f"{m[0]} - AverageRating: {m[1]:.2f} (TotalRatings: {m[2]})")
        
    print(f"\n[WINNER]: '{highest_rated[0]}' là bộ phim có điểm trung bình cao nhất với số điểm {highest_rated[1]:.2f} trong các phim có ít nhất 5 lượt đánh giá.")

except:
    print("Không tìm thấy phim nào thỏa mãn điều kiện >= 5 ratings.")

Danh sách phim (Top 5 mẫu):
Sunset Boulevard (1950) - AverageRating: 4.36 (TotalRatings: 7)
The Terminator (1984) - AverageRating: 4.06 (TotalRatings: 18)
The Godfather: Part II (1974) - AverageRating: 4.00 (TotalRatings: 17)
The Lord of the Rings: The Fellowship of the Ring (2001) - AverageRating: 3.89 (TotalRatings: 18)
No Country for Old Men (2007) - AverageRating: 3.89 (TotalRatings: 18)

[WINNER]: 'Sunset Boulevard (1950)' là bộ phim có điểm trung bình cao nhất với số điểm 4.36 trong các phim có ít nhất 5 lượt đánh giá.


# Bài 2: Phân Tích Đánh Giá Theo Thể Loại

In [4]:
# 1. Join Ratings với Movies trước để lấy thông tin Thể loại cho từng lượt đánh giá
# ratings_kv: (MovieID, Rating) -> Chỉ lấy MovieID và Rating
ratings_kv = ratings_rdd.map(lambda x: (x[1], x[2])) 

# Join: (MovieID, (Rating, (Title, Genres)))
join_rdd = ratings_kv.join(movies_rdd)

# 2. Hàm flatMap quan trọng: Tách chuỗi thể loại "Action|Drama" thành nhiều dòng
def explode_genres(data):
    # data format: (MovieID, (Rating, (Title, "Action|Drama")))
    rating = data[1][0]
    genres_str = data[1][1][1]
    
    # Tách chuỗi bằng dấu '|'
    genre_list = genres_str.split('|')
    
    results = []
    for genre in genre_list:
        # Với mỗi thể loại, ta tạo một cặp (Genre, (Rating, 1))
        results.append((genre, (rating, 1)))
    return results

# 3. Áp dụng flatMap -> ReduceByKey -> Tính trung bình
genre_stats = join_rdd.flatMap(explode_genres) \
                      .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                      .mapValues(lambda x: (x[0] / x[1], x[1])) 

# Collect về driver để in
results_2 = genre_stats.collect()

print(f"{'Genre':<15} | {'Avg Rating':<10} | {'Total Ratings':<10}")
print("-" * 45)
for genre, stats in results_2:
    print(f"{genre:<15} | {stats[0]:<10.2f} | {stats[1]:<10}")

Genre           | Avg Rating | Total Ratings
---------------------------------------------
Sci-Fi          | 3.73       | 54        
Action          | 3.71       | 54        
Drama           | 3.76       | 128       
Family          | 3.67       | 18        
Biography       | 3.56       | 25        
Horror          | 4.00       | 2         
Fantasy         | 3.86       | 29        
Thriller        | 3.70       | 27        
Mystery         | 4.00       | 2         
Adventure       | 3.63       | 83        
Film-Noir       | 4.36       | 7         
Crime           | 3.81       | 42        


# Bài 3: Phân Tích Đánh Giá Theo Giới Tính

In [2]:
import sys
import os
from pyspark import SparkContext

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

base_path = r"G:\.shortcut-targets-by-id\1eDKYURr-Qm222Ul6PoZLc4b99cuwHTC3\Cường\Đại học\DS200\Lab 2"
def get_path(f): return os.path.join(base_path, f)

# Stop SparkContext cũ nếu đang chạy để tránh xung đột
try:
    sc.stop()
except:
    pass

# Khởi tạo lại với 'local[1]' -> Chạy 1 luồng để ổn định nhất trên Windows
sc = SparkContext("local[1]", "DS200_Lab2_Fixed")

def parse_line(line): return line.split(',')

# 1. Movies: (MovieID, Title)
movies = sc.textFile(get_path("movies.txt")).map(parse_line).filter(lambda x: len(x)>2)\
           .map(lambda x: (int(x[0]), ",".join(x[1:-1])))

# 2. Ratings: (UserID, MovieID, Rating)
r_raw = sc.textFile(get_path("ratings_1.txt")).union(sc.textFile(get_path("ratings_2.txt")))
ratings = r_raw.map(parse_line).filter(lambda x: len(x)==4)\
               .map(lambda x: (int(x[0]), int(x[1]), float(x[2])))

# 3. Users: (UserID, Gender) -> Chỉ lấy cột cần thiết
users = sc.textFile(get_path("users.txt")).map(parse_line).filter(lambda x: len(x)>=2)\
          .map(lambda x: (int(x[0]), x[1].strip())) # Strip để xóa khoảng trắng thừa

print(f"Data Loaded: {ratings.count()} ratings, {users.count()} users")

# Thực hiện bài 3
# 1. Chuẩn bị join:
# ratings_key: (UserID, (MovieID, Rating))
ratings_key = ratings.map(lambda x: (x[0], (x[1], x[2])))

# 2. Join & Map Logic
# Logic: Nếu M thì gán (Rating, 1, 0, 0), nếu F thì (0, 0, Rating, 1)
# Output của map: (MovieID, (SumM, CntM, SumF, CntF))
stats = ratings_key.join(users) \
    .map(lambda x: (x[1][0][0], (x[1][0][1], 1, 0.0, 0) if x[1][1] == 'M' else (0.0, 0, x[1][0][1], 1))) \
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3]))

# 3. Tính trung bình & Join tên phim
final_result = stats.mapValues(lambda v: (v[0]/v[1] if v[1]>0 else 0, v[2]/v[3] if v[3]>0 else 0)) \
                    .join(movies) \
                    .map(lambda x: (x[1][1], x[1][0])) # (Title, (AvgM, AvgF))


for title, (avg_m, avg_f) in final_result.take(10):
    print(f"Phim: {title[:25]:<25} | Male: {avg_m:.2f} | Female: {avg_f:.2f}")

Data Loaded: 184 ratings, 50 users
Phim: E.T. the Extra-Terrestria | Male: 3.81 | Female: 3.55
Phim: Psycho (1960)             | Male: 0.00 | Female: 4.00
Phim: Gladiator (2000)          | Male: 3.59 | Female: 3.64
Phim: Fight Club (1999)         | Male: 3.50 | Female: 3.50
Phim: The Terminator (1984)     | Male: 3.93 | Female: 4.14
Phim: The Lord of the Rings: Th | Male: 4.00 | Female: 3.80
Phim: The Godfather: Part II (1 | Male: 4.06 | Female: 3.94
Phim: Mad Max: Fury Road (2015) | Male: 4.00 | Female: 3.32
Phim: The Silence of the Lambs  | Male: 3.33 | Female: 3.00
Phim: Lawrence of Arabia (1962) | Male: 3.55 | Female: 3.31


# Bài 4: Phân Tích Đánh Giá Theo Nhóm Tuổi

In [4]:
# 1. Định nghĩa hàm phân nhóm (đơn giản để worker dễ serialize)
def get_age_group_safe(age):
    try:
        a = int(age)
        if a <= 18: return "0-18"
        elif a <= 35: return "18-35"
        elif a <= 50: return "35-50"
        else: return "50+"
    except:
        return None

# 2. Chuẩn bị dữ liệu
# ratings_key: (UserID, (MovieID, Rating))
ratings_key = ratings.map(lambda x: (x[0], (x[1], x[2])))

# users_age: (UserID, Age) - Lấy cột index 2 trong file users.txt
users_age = sc.textFile(get_path("users.txt")) \
              .map(parse_line) \
              .filter(lambda x: len(x) >= 3) \
              .map(lambda x: (int(x[0]), x[2]))

# 3. Join & Tính toán
# Join -> (UserID, ((MovieID, Rating), Age))
# Map -> ((MovieID, AgeGroup), (Rating, 1))
age_stats = ratings_key.join(users_age) \
    .map(lambda x: ((x[1][0][0], get_age_group_safe(x[1][1])), (x[1][0][1], 1))) \
    .filter(lambda x: x[0][1] is not None) \
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) # Tính Avg

# 4. Gom nhóm lại theo Phim để hiển thị
# Chuyển về: (MovieID, (AgeGroup, AvgRating))
movie_age_grouped = age_stats.map(lambda x: (x[0][0], (x[0][1], x[1]))) \
                             .groupByKey() \
                             .mapValues(dict) # Chuyển list các nhóm tuổi thành Dictionary

# 5. Join với Movies để lấy tên và in kết quả
final_age = movie_age_grouped.join(movies)

groups_order = ["0-18", "18-35", "35-50", "50+"]

for movie_id, (age_dict, title) in final_age.take(10):
    output_parts = []
    for g in groups_order:
        val = age_dict.get(g)
        val_str = f"{val:.2f}" if val else "NA"
        output_parts.append(f"{g}: {val_str}")
    
    # In ra dạng: MovieTitle - [0-18: xx, 18-35: xx...] [cite: 39]
    print(f"{title[:20]:<20} - [{', '.join(output_parts)}]")

E.T. the Extra-Terre - [0-18: NA, 18-35: 3.56, 35-50: 3.83, 50+: 3.00]
Psycho (1960)        - [0-18: NA, 18-35: 4.50, 35-50: 3.50, 50+: NA]
Fight Club (1999)    - [0-18: NA, 18-35: 3.50, 35-50: 3.50, 50+: 3.50]
Gladiator (2000)     - [0-18: NA, 18-35: 3.44, 35-50: 3.81, 50+: 3.50]
The Godfather: Part  - [0-18: NA, 18-35: 3.78, 35-50: 4.25, 50+: NA]
The Terminator (1984 - [0-18: NA, 18-35: 4.17, 35-50: 4.05, 50+: 3.75]
The Lord of the Ring - [0-18: NA, 18-35: 4.00, 35-50: 3.83, 50+: NA]
Mad Max: Fury Road ( - [0-18: NA, 18-35: 3.36, 35-50: 3.64, 50+: NA]
The Silence of the L - [0-18: NA, 18-35: 3.00, 35-50: 3.25, 50+: NA]
Lawrence of Arabia ( - [0-18: NA, 18-35: 3.60, 35-50: 3.29, 50+: 4.50]


# Bài 5: Phân Tích Đánh Giá Theo Occupation (Nghề nghiệp) Của Người Dùng

In [5]:
# 1. Load Occupation map: (OccID, OccName) [cite: 17]
occ_map = sc.textFile(get_path("occupation.txt")) \
            .map(parse_line) \
            .filter(lambda x: len(x) >= 2) \
            .map(lambda x: (int(x[0]), x[1].strip()))

# 2. Lấy UserID và OccupationID từ Users [cite: 15]
# users_occ: (UserID, OccID)
users_occ = sc.textFile(get_path("users.txt")) \
              .map(parse_line) \
              .filter(lambda x: len(x) >= 4) \
              .map(lambda x: (int(x[0]), int(x[3])))

# 3. Join Ratings với Users -> Lấy OccupationID cho từng rating
# Join result: (UserID, ((MovieID, Rating), OccID))
# Map -> (OccID, (Rating, 1))
occ_stats = ratings_key.join(users_occ) \
    .map(lambda x: (x[1][1], (x[1][0][1], 1))) \
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))

# 4. Join với bảng tên nghề nghiệp & Tính trung bình
final_occ = occ_stats.join(occ_map) \
    .map(lambda x: (x[1][1], x[1][0][0]/x[1][0][1], x[1][0][1])) \
    .sortBy(lambda x: x[1], ascending=False) # Sắp xếp theo điểm cao nhất

print(f"{'Occupation':<20} | {'Avg Rating':<10} | {'Count'}")
print("-" * 45)
for row in final_occ.collect():
    # Output: Occupation - TotalRatings: xx, AverageRating: xx [cite: 43]
    print(f"{row[0]:<20} | {row[1]:<10.2f} | {row[2]}")

Occupation           | Avg Rating | Count
---------------------------------------------
Programmer           | 4.25       | 10
Designer             | 4.00       | 13
Student              | 4.00       | 8
Nurse                | 3.86       | 11
Consultant           | 3.86       | 14
Journalist           | 3.85       | 17
Artist               | 3.73       | 11
Teacher              | 3.70       | 5
Doctor               | 3.69       | 21
Salesperson          | 3.65       | 17
Lawyer               | 3.65       | 17
Accountant           | 3.58       | 6
Engineer             | 3.56       | 18
Manager              | 3.47       | 16


# Bài 6: Phân Tích Đánh Giá Theo Thời Gian

In [7]:
from datetime import datetime
import os

# 1. Cấu hình lại đường dẫn 
base_path = r"G:\.shortcut-targets-by-id\1eDKYURr-Qm222Ul6PoZLc4b99cuwHTC3\Cường\Đại học\DS200\Lab 2"
def get_path(f): return os.path.join(base_path, f)

# 2. Load lại Ratings với đủ 4 cột: UserID, MovieID, Rating, Timestamp
# Schema gốc: UserID, MovieID, Rating, Timestamp 
def parse_rating_with_time(line):
    parts = line.split(',')
    if len(parts) == 4:
        return (int(parts[0]), int(parts[1]), float(parts[2]), int(parts[3]))
    return None

# Load từ cả 2 file
r_raw_full = sc.textFile(get_path("ratings_1.txt")).union(sc.textFile(get_path("ratings_2.txt")))
ratings_full = r_raw_full.map(parse_rating_with_time).filter(lambda x: x is not None)

# 3. Xử lý logic: Map -> Reduce -> Sort
# Lưu ý: Viết logic datetime thẳng vào lambda để tránh lỗi Worker Crash trên Windows
# x[3] là Timestamp, x[2] là Rating
year_stats = ratings_full.map(lambda x: (datetime.fromtimestamp(x[3]).year, (x[2], 1))) \
                         .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                         .mapValues(lambda v: (v[1], v[0]/v[1])) \
                         .sortByKey()

# 4. Xuất kết quả
print("\n--- Xu hướng đánh giá theo Năm ---")
# Output yêu cầu: Year - TotalRatings: xx, AverageRating: xx [cite: 44]
results = year_stats.collect()

for year, (count, avg) in results:
    print(f"Năm {year} - TotalRatings: {count}, AverageRating: {avg:.2f}")


--- Xu hướng đánh giá theo Năm ---
Năm 2020 - TotalRatings: 184, AverageRating: 3.75
