In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [20]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

def bai1():
    #getOrCreate() sẽ lấy context hiện có hoặc tạo mới nếu chưa có
    sc = SparkContext.getOrCreate()

    #Load Data cả 2 file ratings và gộp chúng lại (union)
    ratings_rdd_1 = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt")
    ratings_rdd_2 = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt")
    all_ratings_rdd = ratings_rdd_1.union(ratings_rdd_2)

    #Load file movies
    movies_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/movies.txt")

    #(UserID, MovieID, Rating, Timestamp) -> (MovieID, (Rating, 1))
    def parse_rating(line):
        try:
            parts = line.split(',')
            movie_id = parts[1] # MovieID o vi tri 1
            rating = float(parts[2]) # Rating o vi tri 2
            return (movie_id, (rating, 1)) # (Key, (Gia tri, Bo dem))
        except:
            return ("ERROR", (0, 0)) # Bo qua neu dong bi loi

    ratings_parsed = all_ratings_rdd.map(parse_rating).filter(lambda x: x[0] != "ERROR")

    #Tính tổng rating (sum) và tổng số lượt đếm (count) cho mỗi MovieID
    #reduceByKey se cong 2 tuple voi nhau: (a[0] + b[0], a[1] + b[1])
    #Ket qua: (MovieID, (TotalRatingSum, TotalRatingCount))
    ratings_sum = ratings_parsed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    #Tính rating trung binh
    #(MovieID, (AverageRating, TotalRatingCount))
    ratings_avg = ratings_sum.mapValues(lambda total: (total[0] / total[1], total[1]))


    #(MovieID, Title, Genres) -> (MovieID, Title)
    def parse_movie(line):
        try:
            parts = line.split(',')
            movie_id = parts[0]
            title = parts[1]
            return (movie_id, title)
        except:
            return ("ERROR", "ERROR")

    movies_parsed = movies_rdd.map(parse_movie).filter(lambda x: x[0] != "ERROR")

    #(MovieID, Title) join (MovieID, (AverageRating, TotalRatingCount))
    #Ket qua: (MovieID, (Title, (AverageRating, TotalRatingCount)))
    joined_data = movies_parsed.join(ratings_avg)

    #Chuyen doi sang dinh dang: "MovieTitle AverageRating: xx (TotalRatings: xx)"
    def format_output(record):
        movie_id, (title, (avg_rating, total_count)) = record
        # Dinh dang diem trung binh lay 2 so thap phan
        return f"{title} AverageRating: {avg_rating:.2f} (TotalRatings: {total_count})"
    output_strings = joined_data.map(format_output)

    #Thu thap ket qua va in ra
    for line in output_strings.collect():
        print(line)
    print()

    #Loc ra cac phim co >= 5 luot danh gia
    #record[1] la (Title, (AverageRating, TotalRatingCount))
    #record[1][1] la (AverageRating, TotalRatingCount)
    #record[1][1][1] la TotalRatingCount
    filtered_movies = joined_data.filter(lambda record: record[1][1][1] >= 5)

    if filtered_movies.isEmpty():
        print("Khong tim thay phim nao co tu 5 danh gia tro len.")
    else:
        #Tim phim co diem trung binh cao nhat
        #Su dung reduce de tim ra record co avg_rating (a[1][1][0]) cao nhat
        highest_rated_record = filtered_movies.reduce(
            lambda a, b: a if a[1][1][0] > b[1][1][0] else b
        )

        #Lay thong tin tu record tim duoc
        title, (avg_rating, total_count) = highest_rated_record[1]

        print(f"{title} is the highest rated movie with an average rating of {avg_rating:.2f} among movies with at least 5 ratings.")

#Ham chinh de chay Spark
if __name__ == "__main__":
    #Khoi tao SparkSession de lay SparkContext
    spark = SparkSession.builder.appName("Bai1").getOrCreate()
    bai1()
    spark.stop()

Sunset Boulevard (1950) AverageRating: 4.36 (TotalRatings: 7)
The Lord of the Rings: The Fellowship of the Ring (2001) AverageRating: 3.89 (TotalRatings: 18)
No Country for Old Men (2007) AverageRating: 3.89 (TotalRatings: 18)
Mad Max: Fury Road (2015) AverageRating: 3.47 (TotalRatings: 18)
Fight Club (1999) AverageRating: 3.50 (TotalRatings: 7)
Lawrence of Arabia (1962) AverageRating: 3.44 (TotalRatings: 18)
The Godfather: Part II (1974) AverageRating: 4.00 (TotalRatings: 17)
The Social Network (2010) AverageRating: 3.86 (TotalRatings: 7)
The Terminator (1984) AverageRating: 4.06 (TotalRatings: 18)
The Silence of the Lambs (1991) AverageRating: 3.14 (TotalRatings: 7)
Gladiator (2000) AverageRating: 3.61 (TotalRatings: 18)
Psycho (1960) AverageRating: 4.00 (TotalRatings: 2)
E.T. the Extra-Terrestrial (1982) AverageRating: 3.67 (TotalRatings: 18)
The Lord of the Rings: The Return of the King (2003) AverageRating: 3.82 (TotalRatings: 11)

Sunset Boulevard (1950) is the highest rated movi

Bài 2:

In [21]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

def bai2():
    sc = SparkContext.getOrCreate()

    try:
        #Load ratings (gop 2 file)
        ratings_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt").union(sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt"))
        #Load movies
        movies_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/movies.txt")
    except Exception as e:
        print(f"LOI: Khong tim thay file. Hay kiem tra duong dan: {e}")
        return

    #Parse Movies: (MovieID, Genres)
    def parse_movie_genres(line):
        try:
            parts = line.split(',')
            movie_id = parts[0]
            # Lay phan tu cuoi cung lam Genre (tranh loi neu ten phim co dau phay)
            genres = parts[-1]
            return (movie_id, genres)
        except:
            return ("ERROR", "ERROR")

    # Parse Ratings: (MovieID, Rating)
    def parse_rating(line):
        try:
            parts = line.split(',')
            movie_id = parts[1]
            rating = float(parts[2])
            return (movie_id, rating)
        except:
            return ("ERROR", 0.0)

    movies_parsed = movies_rdd.map(parse_movie_genres).filter(lambda x: x[0] != "ERROR")
    ratings_parsed = ratings_rdd.map(parse_rating).filter(lambda x: x[0] != "ERROR")


    # (MovieID, Genres) JOIN (MovieID, Rating) => (MovieID, (Genres, Rating))
    joined_data = movies_parsed.join(ratings_parsed)

    # Ham nay se nhan 1 dong du lieu cua phim va tra ve NHIEU dong cho tung the loai
    def split_genres(record):
        movie_id, (genres_str, rating) = record
        genre_list = genres_str.split('|') # Tach chuoi bang dau gach dung

        results = []
        for genre in genre_list:
            # Tra ve tuple: (TenTheLoai, (DiemRating, 1_luot_danh_gia))
            results.append((genre, (rating, 1)))
        return results

    # Su dung flatMap de lay danh sach ra
    genre_ratings = joined_data.flatMap(split_genres)

    # ReduceByKey: Cong don diem va so luot danh gia cho tung the loai
    # (Genre, (SumRating, Count))
    genre_stats = genre_ratings.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    # MapValues: Tinh trung binh
    # (Genre, (Average, Count))
    final_result = genre_stats.mapValues(lambda x: (x[0] / x[1], x[1]))

    def format_output(record):
        genre, (avg_rating, count) = record
        # Format theo yeu cau: "Genre - AverageRating (TotalRatings)"
        return f"{genre} - {avg_rating:.2f} ({count})"
    output_lines = final_result.map(format_output)

    # Thu thap va in ket qua
    for line in output_lines.collect():
        print(line)

# Chay
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Bai2").getOrCreate()
    bai2()
    spark.stop()

Drama - 3.76 (128)
Action - 3.71 (54)
Sci-Fi - 3.73 (54)
Biography - 3.56 (25)
Family - 3.67 (18)
Horror - 4.00 (2)
Fantasy - 3.86 (29)
Thriller - 3.70 (27)
Mystery - 4.00 (2)
Adventure - 3.63 (83)
Film-Noir - 4.36 (7)
Crime - 3.81 (42)


Bài 3:

In [16]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

def bai3():
    # Khoi tao SparkContext
    sc = SparkContext.getOrCreate()

    try:
        # Load ratings (gop 2 file)
        ratings_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt").union(sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt"))
        # Load users
        users_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/users.txt")
        # Load movies
        movies_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/movies.txt")
    except Exception as e:
        print(f"LOI: Khong tim thay file. Vui long kiem tra duong dan: {e}")
        return

    # Parse Users: (UserID, Gender)
    def parse_user(line):
        try:
            parts = line.split(',')
            return (parts[0], parts[1]) # (UserID, Gender)
        except:
            return ("ERROR", "ERROR")

    # Parse Ratings: (UserID, (MovieID, Rating))
    # Chon UserID lam Key de ti nua join voi bang Users
    def parse_rating(line):
        try:
            parts = line.split(',')
            user_id = parts[0]
            movie_id = parts[1]
            rating = float(parts[2])
            return (user_id, (movie_id, rating)) # (UserID, (MovieID, Rating))
        except:
            return ("ERROR", (0, 0))

    users_parsed = users_rdd.map(parse_user).filter(lambda x: x[0] != "ERROR")
    ratings_parsed = ratings_rdd.map(parse_rating).filter(lambda x: x[0] != "ERROR")

    # (UserID, (MovieID, Rating)) JOIN (UserID, Gender) => (UserID, ((MovieID, Rating), Gender))
    joined_user_rating = ratings_parsed.join(users_parsed)

    # Chuyen doi sang dang: (MovieID, (Rating, Gender))
    def map_to_movie_gender(record):
        # record: (UserID, ((MovieID, Rating), Gender))
        user_id, ((movie_id, rating), gender) = record
        return (movie_id, (rating, gender))
    movie_gender_rdd = joined_user_rating.map(map_to_movie_gender)

    # Map sang dang vector de cong don:
    # (MovieID, (SumMale, CountMale, SumFemale, CountFemale))
    def map_gender_stats(record):
        movie_id, (rating, gender) = record
        if gender == 'M':
            return (movie_id, (rating, 1, 0, 0)) # Diem Nam, 1 luot Nam, 0 Nu
        elif gender == 'F':
            return (movie_id, (0, 0, rating, 1)) # 0 Nam, Diem Nu, 1 luot Nu
        else:
            return (movie_id, (0, 0, 0, 0))

    # ReduceByKey: Cong don cac chi so
    reduced_stats = movie_gender_rdd.map(map_gender_stats).reduceByKey(
        lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])
    )

    # Tinh trung binh: (MovieID, (AvgMale, AvgFemale))
    def calculate_averages(record):
        movie_id, (sum_m, cnt_m, sum_f, cnt_f) = record

        avg_m = sum_m / cnt_m if cnt_m > 0 else 0.0
        avg_f = sum_f / cnt_f if cnt_f > 0 else 0.0

        return (movie_id, (avg_m, avg_f))

    final_stats = reduced_stats.map(calculate_averages)

    # Parse Movies: (MovieID, Title)
    def parse_movie(line):
        try:
            parts = line.split(',')
            return (parts[0], parts[1]) # (MovieID, Title)
        except:
            return ("ERROR", "ERROR")

    movies_parsed = movies_rdd.map(parse_movie).filter(lambda x: x[0] != "ERROR")

    # Join: (MovieID, Title) JOIN (MovieID, (AvgMale, AvgFemale)) => (MovieID, (Title, (AvgMale, AvgFemale)))
    final_result = movies_parsed.join(final_stats)

    def format_output(record):
        # record: (MovieID, (Title, (AvgMale, AvgFemale)))
        movie_id, (title, (avg_m, avg_f)) = record
        return f"{title} - Male_Avg: {avg_m:.2f}, Female_Avg: {avg_f:.2f}"
    output_lines = final_result.map(format_output)

    for line in output_lines.collect():
        print(line)

# Chay
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Bai3").getOrCreate()
    bai3()
    spark.stop()

Gladiator (2000) - Male_Avg: 3.59, Female_Avg: 3.64
Sunset Boulevard (1950) - Male_Avg: 4.33, Female_Avg: 4.50
E.T. the Extra-Terrestrial (1982) - Male_Avg: 3.81, Female_Avg: 3.55
The Social Network (2010) - Male_Avg: 4.00, Female_Avg: 3.67
Mad Max: Fury Road (2015) - Male_Avg: 4.00, Female_Avg: 3.32
The Terminator (1984) - Male_Avg: 3.93, Female_Avg: 4.14
Fight Club (1999) - Male_Avg: 3.50, Female_Avg: 3.50
Psycho (1960) - Male_Avg: 0.00, Female_Avg: 4.00
The Godfather: Part II (1974) - Male_Avg: 4.06, Female_Avg: 3.94
The Lord of the Rings: The Fellowship of the Ring (2001) - Male_Avg: 4.00, Female_Avg: 3.80
The Lord of the Rings: The Return of the King (2003) - Male_Avg: 3.75, Female_Avg: 3.90
The Silence of the Lambs (1991) - Male_Avg: 3.33, Female_Avg: 3.00
Lawrence of Arabia (1962) - Male_Avg: 3.55, Female_Avg: 3.31
No Country for Old Men (2007) - Male_Avg: 3.92, Female_Avg: 3.83


Bài 4:

In [17]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

def bai4():
    sc = SparkContext.getOrCreate()

    try:
        ratings_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt").union(sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt"))
        users_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/users.txt")
        movies_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/movies.txt")
    except Exception as e:
        print(f"LOI: Khong tim thay file input. {e}")
        return

    def get_age_group(age):
        try:
            age = int(age)
            if age <= 18:
                return "0-18"
            elif age <= 35:
                return "18-35"
            elif age <= 50:
                return "35-50"
            else:
                return "50+"
        except:
            return "Unknown"

    def parse_user(line):
        try:
            parts = line.split(',')
            user_id = parts[0]
            age = parts[2]
            age_group = get_age_group(age)
            return (user_id, age_group)
        except:
            return ("ERROR", "ERROR")

    def parse_rating(line):
        try:
            parts = line.split(',')
            user_id = parts[0]
            movie_id = parts[1]
            rating = float(parts[2])
            return (user_id, (movie_id, rating))
        except:
            return ("ERROR", (0, 0))

    users_parsed = users_rdd.map(parse_user).filter(lambda x: x[0] != "ERROR")
    ratings_parsed = ratings_rdd.map(parse_rating).filter(lambda x: x[0] != "ERROR")

    joined_rdd = ratings_parsed.join(users_parsed)

    def map_to_movie_age(record):
        user_id, ((movie_id, rating), age_group) = record
        return (movie_id, (rating, age_group))

    def map_age_stats(record):
        movie_id, (rating, group) = record
        # (sum_0-18, cnt_0-18, sum_18-35, cnt_18-35, sum_35-50, cnt_35-50, sum_50+, cnt_50+)
        stats = [0.0, 0, 0.0, 0, 0.0, 0, 0.0, 0]

        if group == "0-18":
            stats[0] = rating
            stats[1] = 1
        elif group == "18-35":
            stats[2] = rating
            stats[3] = 1
        elif group == "35-50":
            stats[4] = rating
            stats[5] = 1
        elif group == "50+":
            stats[6] = rating
            stats[7] = 1

        return (movie_id, tuple(stats))

    reduced_stats = joined_rdd.map(map_to_movie_age)\
                              .map(map_age_stats)\
                              .reduceByKey(lambda a, b: tuple(x + y for x, y in zip(a, b)))

    def calculate_averages(record):
        movie_id, stats = record

        def safe_avg(total, count):
            return total / count if count > 0 else 0.0

        avg_0_18 = safe_avg(stats[0], stats[1])
        avg_18_35 = safe_avg(stats[2], stats[3])
        avg_35_50 = safe_avg(stats[4], stats[5])
        avg_50_plus = safe_avg(stats[6], stats[7])

        return (movie_id, (avg_0_18, avg_18_35, avg_35_50, avg_50_plus))

    final_stats = reduced_stats.map(calculate_averages)

    movies_parsed = movies_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1]))
    final_result = movies_parsed.join(final_stats)


    # Ham ho tro: Neu gia tri > 0 thi in so, nguoc lai in "NA"
    def format_val(val):
        return f"{val:.2f}" if val > 0 else "NA"

    def format_output(record):
        movie_id, (title, (a1, a2, a3, a4)) = record

        # Su dung ham format_val da viet o tren
        return f"{title} - [0-18: {format_val(a1)}, 18-35: {format_val(a2)}, 35-50: {format_val(a3)}, 50+: {format_val(a4)}]"
    output_lines = final_result.map(format_output)

    for line in output_lines.collect():
        print(line)

# Chay
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Bai4").getOrCreate()
    bai4()
    spark.stop()

Gladiator (2000) - [0-18: NA, 18-35: 3.44, 35-50: 3.81, 50+: 3.50]
Sunset Boulevard (1950) - [0-18: NA, 18-35: 4.17, 35-50: 4.50, 50+: NA]
E.T. the Extra-Terrestrial (1982) - [0-18: NA, 18-35: 3.56, 35-50: 3.83, 50+: 3.00]
The Social Network (2010) - [0-18: NA, 18-35: 4.00, 35-50: 3.67, 50+: NA]
Mad Max: Fury Road (2015) - [0-18: NA, 18-35: 3.36, 35-50: 3.64, 50+: NA]
The Terminator (1984) - [0-18: NA, 18-35: 4.17, 35-50: 4.05, 50+: 3.75]
Fight Club (1999) - [0-18: NA, 18-35: 3.50, 35-50: 3.50, 50+: 3.50]
Psycho (1960) - [0-18: NA, 18-35: 4.50, 35-50: 3.50, 50+: NA]
The Godfather: Part II (1974) - [0-18: NA, 18-35: 3.78, 35-50: 4.25, 50+: NA]
The Lord of the Rings: The Fellowship of the Ring (2001) - [0-18: NA, 18-35: 4.00, 35-50: 3.83, 50+: NA]
The Lord of the Rings: The Return of the King (2003) - [0-18: NA, 18-35: 3.83, 35-50: 3.81, 50+: NA]
The Silence of the Lambs (1991) - [0-18: NA, 18-35: 3.00, 35-50: 3.25, 50+: NA]
Lawrence of Arabia (1962) - [0-18: NA, 18-35: 3.60, 35-50: 3.29

Bài 5:

In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

def bai5():
    sc = SparkContext.getOrCreate()

    try:
        # Load ratings (gop 2 file)
        ratings_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt").union(sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt"))
        users_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/users.txt")
        occupation_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/occupation.txt")
    except Exception as e:
        print(f"LOI: Khong tim thay file. {e}")
        return

    # Parse Users: Lay (UserID, OccupationID)
    def parse_user(line):
        try:
            parts = line.split(',')
            user_id = parts[0]
            occ_id = parts[3] # Cot OccupationID
            return (user_id, occ_id)
        except:
            return ("ERROR", "ERROR")

    # Parse Ratings: Lay (UserID, Rating)
    def parse_rating(line):
        try:
            parts = line.split(',')
            user_id = parts[0]
            rating = float(parts[2])
            return (user_id, rating)
        except:
            return ("ERROR", 0.0)

    # Parse Occupation: Lay (OccupationID, OccupationName)
    def parse_occupation(line):
        try:
            parts = line.split(',')
            occ_id = parts[0]
            occ_name = parts[1]
            return (occ_id, occ_name)
        except:
            return ("ERROR", "ERROR")

    users_parsed = users_rdd.map(parse_user).filter(lambda x: x[0] != "ERROR")
    ratings_parsed = ratings_rdd.map(parse_rating).filter(lambda x: x[0] != "ERROR")
    occ_parsed = occupation_rdd.map(parse_occupation).filter(lambda x: x[0] != "ERROR")

    # (UserID, Rating) JOIN (UserID, OccID) => (UserID, (Rating, OccID))
    joined_user_rating = ratings_parsed.join(users_parsed)

    # Chuyen doi sang dang: (OccID, (Rating, 1))
    def map_to_occ(record):
        user_id, (rating, occ_id) = record
        return (occ_id, (rating, 1))

    # ReduceByKey: Cong tong diem va tong so luot danh gia cho moi OccID
    # (OccID, (SumRating, TotalCount))
    occ_stats = joined_user_rating.map(map_to_occ)\
                                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    # (OccID, (SumRating, TotalCount)) JOIN (OccID, OccName) => (OccID, ((SumRating, TotalCount), OccName))
    final_data = occ_stats.join(occ_parsed)

    def format_output(record):
        occ_id, ((total_score, count), occ_name) = record

        # Tinh trung binh
        avg_rating = total_score / count if count > 0 else 0.0

        # Format theo yeu cau
        return f"{occ_name} - TotalRatings: {count}, AverageRating: {avg_rating:.2f}"

    # Sort theo ten nghe nghiep
    output_lines = final_data.map(format_output).sortBy(lambda x: x)

    for line in output_lines.collect():
        print(line)

# Chay
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Bai5").getOrCreate()
    bai5()
    spark.stop()

Accountant - TotalRatings: 6, AverageRating: 3.58
Artist - TotalRatings: 11, AverageRating: 3.73
Consultant - TotalRatings: 14, AverageRating: 3.86
Designer - TotalRatings: 13, AverageRating: 4.00
Doctor - TotalRatings: 21, AverageRating: 3.69
Engineer - TotalRatings: 18, AverageRating: 3.56
Journalist - TotalRatings: 17, AverageRating: 3.85
Lawyer - TotalRatings: 17, AverageRating: 3.65
Manager - TotalRatings: 16, AverageRating: 3.47
Nurse - TotalRatings: 11, AverageRating: 3.86
Programmer - TotalRatings: 10, AverageRating: 4.25
Salesperson - TotalRatings: 17, AverageRating: 3.65
Student - TotalRatings: 8, AverageRating: 4.00
Teacher - TotalRatings: 5, AverageRating: 3.70


Bài 6

In [19]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from datetime import datetime

def bai6():
    sc = SparkContext.getOrCreate()

    try:
        # Load ratings (gop 2 file)
        ratings_rdd = sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_1.txt").union(sc.textFile("/content/drive/My Drive/BigData/BTTH2/ratings_2.txt"))
    except Exception as e:
        print(f"LOI: Khong tim thay file. {e}")
        return

    def parse_rating_year(line):
        try:
            parts = line.split(',')
            rating = float(parts[2])
            timestamp = int(parts[3])

            # Chuyen doi Timestamp (Unix epoch) sang Nam (YYYY)
            dt_object = datetime.fromtimestamp(timestamp)
            year = dt_object.year

            return (year, (rating, 1)) # (Nam, (Diem, 1_luot))
        except:
            return ("ERROR", (0, 0))

    # Map va Filter loi
    year_ratings = ratings_rdd.map(parse_rating_year).filter(lambda x: x[0] != "ERROR")

    # ReduceByKey: Cong tong diem va so luot danh gia theo tung nam
    # (Year, (SumRating, Count))
    reduced_stats = year_ratings.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    # Tinh trung binh: (Year, (TotalRatings, AvgRating))
    def calculate_stats(record):
        year, (total_score, count) = record
        avg_rating = total_score / count if count > 0 else 0.0
        return (year, (count, avg_rating))

    final_stats = reduced_stats.map(calculate_stats)

    # Sap xep theo Nam tang dan (sortByKey)
    sorted_stats = final_stats.sortByKey()

    def format_output(record):
        year, (count, avg) = record
        # Format theo yeu cau: "Year - TotalRatings: xx, AverageRating: xx"
        return f"{year} - TotalRatings: {count}, AverageRating: {avg:.2f}"

    output_lines = sorted_stats.map(format_output)

    # Collect va in ket qua
    for line in output_lines.collect():
        print(line)

# Chay
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Bai6").getOrCreate()
    bai6()
    spark.stop()

2020 - TotalRatings: 184, AverageRating: 3.75
