### **Thiết lập môi trường Spark**

**Mục tiêu:**
1. Import tất cả các thư viện cần thiết cho các bài toán.
2. Khởi tạo `SparkSession` một lần duy nhất để tất cả các bài toán bên dưới có thể tái sử dụng. Việc này giúp tối ưu hóa hiệu suất, tránh lãng phí tài nguyên do phải khởi tạo và dừng Spark nhiều lần.

In [None]:
import os
import re
import shutil
import math
from pyspark.sql import SparkSession

# Khởi tạo SparkSession duy nhất cho toàn bộ notebook
spark = SparkSession.builder.appName("MapReduceExercises").getOrCreate()
sc = spark.sparkContext

### **Bài 1: Tìm số lớn nhất**

**Các bước thực hiện:**
- **Dữ liệu đầu vào:** Tệp `phantichdulieu/songuyen.txt`.
- **Các bước tính toán:**
  1. Đọc tệp thành RDD.
  2. `flatMap` để tách dòng thành các số.
  3. `map` để chuyển chuỗi thành số nguyên.
  4. `max()` để tìm giá trị lớn nhất.
- **Kết quả trả về:** Số nguyên lớn nhất.

In [None]:
rdd_b1 = sc.textFile("phantichdulieu/songuyen.txt")
numbers_str_rdd = rdd_b1.flatMap(lambda line: line.split(' '))
numbers_int_rdd = numbers_str_rdd.filter(lambda x: x != '').map(lambda x: int(x))
max_number = numbers_int_rdd.max()
print(f"Số lớn nhất tìm được là: {max_number}")

### **Giải thích kết quả**

Chương trình đã đọc, xử lý và tìm ra giá trị số nguyên lớn nhất từ tệp `songuyen.txt`.

### **Bài 2: Đếm số lần xuất hiện của một xâu**

**Các bước thực hiện:**
- **Dữ liệu đầu vào:** Tệp `phantichdulieu/HXH.txt`.
- **Các bước tính toán:**
  1. Đọc tệp vào RDD.
  2. `map` để đếm số lần xuất hiện của xâu trong mỗi dòng.
  3. `reduce` để tính tổng số lần xuất hiện.
- **Kết quả trả về:** Tổng số lần xâu xuất hiện.

In [None]:
substring_to_count = "xuân"
rdd_b2 = sc.textFile("phantichdulieu/HXH.txt")
counts_rdd = rdd_b2.map(lambda line: line.lower().count(substring_to_count.lower()))
total_count = counts_rdd.reduce(lambda a, b: a + b)
print(f"Tổng số lần xuất hiện của xâu '{substring_to_count}' là: {total_count}")

### **Giải thích kết quả**
Kết quả cho biết tổng số lần từ "xuân" (không phân biệt hoa thường) xuất hiện trong toàn bộ văn bản.

### **Bài 3: Thống kê Bigram**

**Các bước thực hiện:**
- **Dữ liệu đầu vào:** Tệp `phantichdulieu/HXH.txt`.
- **Các bước tính toán:**
  1. Đọc tệp, tiền xử lý và tạo bigram.
  2. Dùng `map` và `reduceByKey` để đếm tần suất.
  3. Sắp xếp và lưu kết quả vào thư mục `HXH`.
- **Kết quả trả về:** Thư mục `HXH` chứa kết quả.

In [None]:
output_dir = "HXH"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

rdd_b3 = sc.textFile("phantichdulieu/HXH.txt")
def create_bigrams(line):
    clean_line = re.sub(r'[^a-zA-Záàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệíìỉĩịóòỏõọôốồổỗộơớờởỡợúùủũụưứừửữựýỳỷỹỵđ\s]', '', line.lower())
    words = clean_line.split()
    return [(words[i] + ' ' + words[i+1], 1) for i in range(len(words) - 1)] if len(words) >= 2 else []

bigrams_rdd = rdd_b3.flatMap(create_bigrams)
bigram_counts_rdd = bigrams_rdd.reduceByKey(lambda a, b: a + b)
sorted_bigrams_rdd = bigram_counts_rdd.sortBy(lambda x: x[1], ascending=False)
sorted_bigrams_rdd.saveAsTextFile(output_dir)
print(f"Đã thống kê và lưu kết quả vào thư mục '{output_dir}'.")

### **Giải thích kết quả**
Kết quả được lưu trong thư mục `HXH` dưới dạng các tệp `part-xxxxx`, chứa các cặp từ phổ biến nhất và tần suất của chúng.

### **Bài 4: Phân tích xếp hạng phim**
---
**Chuẩn bị dữ liệu cho Bài 4:**
Đọc tệp `u.data` một lần và cache RDD để tái sử dụng cho tất cả các câu hỏi phụ, giúp tăng hiệu suất.

In [None]:
raw_ratings_rdd = sc.textFile("phantichdulieu/u.data")
# Chuyển đổi mỗi dòng thành tuple (user, movie, rating)
ratings_rdd = raw_ratings_rdd.map(lambda line: line.split('\t')).map(lambda x: (x[0], x[1], int(x[2])))
# Cache RDD để tăng tốc độ truy cập trong các câu sau
ratings_rdd.cache()
print(f"Đã đọc và cache {ratings_rdd.count()} dòng dữ liệu ratings.")

#### **4.1 Đếm số bộ phim rating của mỗi người dùng**


In [None]:
user_ratings_count = ratings_rdd.map(lambda r: (r[0], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)
print("Số lượng phim mỗi người dùng đã rating (10 người đầu tiên):")
for row in user_ratings_count.take(10):
    print(f"User: {row[0]}, Count: {row[1]}")

#### **4.2 Tính trung bình rating của mỗi người dùng**


In [None]:
user_sum_count = ratings_rdd.map(lambda r: (r[0], (r[2], 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
user_avg_rating = user_sum_count.mapValues(lambda v: v[0] / v[1]).sortBy(lambda x: x[1], ascending=False)
print("Rating trung bình của mỗi người dùng (10 người đầu tiên):")
for row in user_avg_rating.take(10):
    print(f"User: {row[0]}, Average Rating: {row[1]:.2f}")

#### **4.3 Tính trung bình, cao nhất, thấp nhất rating cho mỗi bộ phim**


In [None]:
movie_ratings = ratings_rdd.map(lambda r: (r[1], r[2])).groupByKey()
movie_stats = movie_ratings.mapValues(lambda ratings: (sum(ratings)/len(ratings), max(ratings), min(ratings)))
print("Thống kê rating (avg, max, min) cho mỗi phim (10 phim đầu tiên):")
for row in movie_stats.take(10):
    print(f"Movie: {row[0]}, Stats (Avg, Max, Min): ({row[1][0]:.2f}, {row[1][1]}, {row[1][2]})")

#### **4.4 Tìm những bộ phim rating chung của từng cặp người dùng**


In [None]:
user_movies = ratings_rdd.map(lambda r: (r[0], r[1])).groupByKey().mapValues(set)
user_pairs = user_movies.cartesian(user_movies).filter(lambda x: x[0][0] < x[1][0])
common_movies = user_pairs.map(lambda x: ((x[0][0], x[1][0]), x[0][1].intersection(x[1][1])))
print("Phim xem chung của từng cặp người dùng (5 cặp đầu tiên có phim chung):")
for row in common_movies.filter(lambda x: len(x[1]) > 0).take(5):
    print(f"Users: {row[0]}, Common Movies: {row[1]}")

#### **4.5 Tính độ tương tự Cosin giữa hai người dùng a và u (Sửa lỗi logic)**


In [None]:
user_a = '196'
user_u = '186'

# Lấy ratings của từng user dưới dạng (movie, rating)
ratings_a = ratings_rdd.filter(lambda r: r[0] == user_a).map(lambda r: (r[1], r[2]))
ratings_u = ratings_rdd.filter(lambda r: r[0] == user_u).map(lambda r: (r[1], r[2]))

# Join để tìm các phim chung và cặp rating tương ứng (movie, (rating_a, rating_u))
common_ratings_rdd = ratings_a.join(ratings_u)

# Chỉ tính toán trên các phim chung
numerator = common_ratings_rdd.map(lambda x: x[1][0] * x[1][1]).sum()
denominator_a = math.sqrt(common_ratings_rdd.map(lambda x: x[1][0]**2).sum())
denominator_u = math.sqrt(common_ratings_rdd.map(lambda x: x[1][1]**2).sum())

cosine_similarity = 0
if denominator_a > 0 and denominator_u > 0:
    cosine_similarity = numerator / (denominator_a * denominator_u)

print(f"Độ tương tự Cosin (đã sửa) giữa user {user_a} và {user_u} là: {cosine_similarity:.4f}")

#### **4.6 Tính dự đoán rating của người dùng a cho bộ phim i (Sửa lỗi logic và hiệu năng)**


In [None]:
target_user = '1'
target_movie = '3'
similarity_threshold = 0.5

# (user, (movie, rating))
user_movie_ratings = ratings_rdd.map(lambda r: (r[0], (r[1], r[2])))

# (movie, (user, rating))
movie_user_ratings = ratings_rdd.map(lambda r: (r[1], (r[0], r[2])))

# Lấy tất cả ratings của target_user
target_user_ratings = user_movie_ratings.filter(lambda x: x[0] == target_user).values().collectAsMap()
target_user_norm = math.sqrt(sum(rating**2 for rating in target_user_ratings.values()))

# Broadcast biến này để các worker có thể truy cập hiệu quả
b_target_user_ratings = sc.broadcast(target_user_ratings)
b_target_user_norm = sc.broadcast(target_user_norm)

# Nhóm ratings theo user: (user, {(movie, rating), ...})
all_user_ratings_grouped = user_movie_ratings.groupByKey().mapValues(list)

# Tính Cosine Similarity cho tất cả user với target_user
def calculate_similarity(user_data):
    user_id, ratings_list = user_data
    if user_id == target_user: return (user_id, (0, 0)) # Bỏ qua chính target_user

    # Lấy dữ liệu từ broadcast variable
    t_ratings = b_target_user_ratings.value
    t_norm = b_target_user_norm.value

    numerator = 0
    common_ratings_norm_sq = 0
    for movie, rating in ratings_list:
        if movie in t_ratings:
            numerator += rating * t_ratings[movie]
            common_ratings_norm_sq += rating**2
    
    u_norm = math.sqrt(common_ratings_norm_sq)

    similarity = 0
    if t_norm > 0 and u_norm > 0:
        similarity = numerator / (t_norm * u_norm)
    
    # Trả về (user_id, (similarity, rating cho target_movie nếu có))
    rating_for_target_movie = 0
    for movie, rating in ratings_list:
        if movie == target_movie:
            rating_for_target_movie = rating
            break
            
    return (user_id, (similarity, rating_for_target_movie))

similarities_rdd = all_user_ratings_grouped.map(calculate_similarity)

# Lọc những user có độ tương tự cao và đã rate target_movie
similar_users_rdd = similarities_rdd.filter(lambda x: x[1][0] >= similarity_threshold and x[1][1] > 0)

# Tính toán tử số và mẫu số cho công thức dự đoán
prediction_numerator = similar_users_rdd.map(lambda x: x[1][0] * x[1][1]).sum()
prediction_denominator = similar_users_rdd.map(lambda x: abs(x[1][0])).sum()

predicted_rating = 0
if prediction_denominator > 0:
    predicted_rating = prediction_numerator / prediction_denominator

print(f"Rating dự đoán (đã sửa) của user {target_user} cho phim {target_movie} là: {predicted_rating:.4f}")

### **Dọn dẹp môi trường**
Dừng SparkSession để giải phóng tài nguyên.

In [None]:
spark.stop()