In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, monotonically_increasing_id

# Khởi tạo phiên Spark với MongoDB và PostgreSQL

spark = SparkSession.builder \
    .appName("Goodreads Spark with MongoDB and PostgreSQL") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.postgresql:postgresql:42.7.4") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/goodreads_db.books") \
    .getOrCreate()

In [2]:
# Cài đặt mức độ log

spark.sparkContext.setLogLevel("INFO")

# Đọc dữ liệu từ MongoDB vào DataFrame

df = spark.read \
    .format("mongo") \
    .option("uri", "mongodb://localhost:27017/goodreads_db.books") \
    .load()

In [3]:
# Hiển thị dữ liệu từ MongoDB ban đầu

df.show(5)

df.printSchema()

+---------------+----------+--------------------+--------------------+-----------------+-----+----+------+-------+---------+--------------------+--------------------+
|         Author|Cover Type|                Date|         Description|Number of Ratings|Pages|Rank|Rating|Reviews|    Score|               Title|                 _id|
+---------------+----------+--------------------+--------------------+-----------------+-----+----+------+-------+---------+--------------------+--------------------+
|   Paulo Coelho| Paperback|First published J...|Paulo Coelho's ma...|        3,182,340|  197|  26|  3.91|126,908|  920,276|       The Alchemist|{66f41f0679bd5bcc...|
|Suzanne Collins| Hardcover|First published S...|In the ruins of a...|        8,957,477|  374|   1|  4.34|225,244|3,905,463|The Hunger Games ...|{66f41f0679bd5bcc...|
|  Lewis Carroll| Paperback|First published D...|"I can't explain ...|          576,089|  239|  22|  4.06| 16,150|  967,102|Alice’s Adventure...|{66f41f0779bd5bcc...

In [4]:
# Kiểm tra dữ liệu trước khi chuyển đổi

df.select("Number of Ratings", "Reviews").show(5)

+-----------------+-------+
|Number of Ratings|Reviews|
+-----------------+-------+
|        3,182,340|126,908|
|        8,957,477|225,244|
|          576,089| 16,150|
|        4,088,521|122,769|
|        2,976,980| 58,642|
+-----------------+-------+
only showing top 5 rows



In [5]:
# Bước 1: Xử lý dữ liệu có dạng "8,932,568" - Loại bỏ dấu phẩy

df = df.withColumn("Number of Ratings", regexp_replace(col("Number of Ratings"), ",", "")) \
       .withColumn("Reviews", regexp_replace(col("Reviews"), ",", ""))

In [6]:
# Bước 2: Chuyển đổi kiểu dữ liệu sau khi loại bỏ dấu phẩy

df = df.withColumn("Pages", col("Pages").cast("int")) \
       .withColumn("Rating", col("Rating").cast("float")) \
       .withColumn("Number of Ratings", col("Number of Ratings").cast("int")) \
       .withColumn("Reviews", col("Reviews").cast("int"))

In [7]:
# Hiển thị dữ liệu sau khi chuyển đổi

df.select("Number of Ratings", "Reviews").show(5)

+-----------------+-------+
|Number of Ratings|Reviews|
+-----------------+-------+
|          3182340| 126908|
|          8957477| 225244|
|           576089|  16150|
|          4088521| 122769|
|          2976980|  58642|
+-----------------+-------+
only showing top 5 rows



In [8]:
# Bước 3: Xử lý dữ liệu null, nếu có giá trị null sẽ thay bằng giá trị mặc định

df = df.na.fill({
    "Pages": 0,
    "Rating": 0.0,
    "Number of Ratings": 0,
    "Reviews": 0
})

In [9]:
# Tạo các bảng từ dữ liệu

# Bảng authors chứa thông tin về tác giả, sử dụng distinct để loại bỏ các giá trị trùng lặp

authors_df = df.select("Author").distinct(
).withColumnRenamed("Author", "author_name")


In [10]:
# Bảng books chứa thông tin về sách và ngày xuất bản

books_df = df.select("Title", "Author", "Pages", "Cover Type", "Date") \
             .withColumnRenamed("Title", "book_title") \
             .withColumnRenamed("Author", "author_name") \
             .withColumnRenamed("Pages", "num_pages") \
             .withColumnRenamed("Cover Type", "cover_type") \
             .withColumnRenamed("Date", "publish_date")

In [11]:
# Bảng ratings chứa thông tin về đánh giá

ratings_df = df.select("Title", "Rating", "Number of Ratings", "Reviews") \
               .withColumnRenamed("Title", "book_title") \
               .withColumnRenamed("Rating", "rating") \
               .withColumnRenamed("Number of Ratings", "num_ratings") \
               .withColumnRenamed("Reviews", "num_reviews")

In [12]:
# Bước 4: Tạo khóa chính cho bảng sách và tác giả

books_df = books_df.withColumn("book_id", monotonically_increasing_id())

authors_df = authors_df.withColumn("author_id", monotonically_increasing_id())


In [13]:
# Thêm khóa ngoại vào bảng ratings để kết nối với bảng books

ratings_df = ratings_df.join(books_df.select(
    "book_title", "book_id"), on="book_title", how="inner")

# Thêm khóa ngoại vào bảng books để kết nối với bảng authors

books_df = books_df.join(authors_df.select(
    "author_name", "author_id"), on="author_name", how="inner")

In [14]:
# Loại bỏ các giá trị rỗng hoặc bằng 0 cho các bảng quan trọng

books_df = books_df.filter((books_df["num_pages"] > 0) & (
    books_df["publish_date"].isNotNull()))
ratings_df = ratings_df.filter(
    (ratings_df["rating"] > 0) & (ratings_df["num_ratings"] > 0))


In [15]:
# Top 10 sách có đánh giá cao nhất

top_rated_books = ratings_df.orderBy(col("rating").desc()).limit(10)

top_rated_books.show()

+--------------------+------+-----------+-----------+-------+
|          book_title|rating|num_ratings|num_reviews|book_id|
+--------------------+------+-----------+-----------+-------+
|The Complete Calv...|  4.81|      40834|       1206|    765|
|The Addiction Man...|  4.81|       1179|         52|    875|
|Words of Radiance...|  4.76|     381139|      24344|    532|
|Harry Potter Seri...|  4.74|     291990|       8452|    678|
|Kingdom of Ash (T...|   4.7|     688833|      72490|    388|
|The Way of Kings ...|  4.66|     533802|      39886|    766|
|A Court of Mist a...|  4.65|    2307530|     196537|    551|
|A Court of Mist a...|  4.65|    2307530|     196537|     45|
|A Court of Mist a...|  4.65|    2307530|     196537|    551|
|A Court of Mist a...|  4.65|    2307530|     196537|     45|
+--------------------+------+-----------+-----------+-------+



In [16]:
# Top 10 sách có nhiều đánh giá nhất

most_rated_books = ratings_df.orderBy(col("num_ratings").desc()).limit(10)

most_rated_books.show()

+--------------------+------+-----------+-----------+-------+
|          book_title|rating|num_ratings|num_reviews|book_id|
+--------------------+------+-----------+-----------+-------+
|Harry Potter and ...|  4.47|   10393270|     169354|    935|
|Harry Potter and ...|  4.47|   10393270|     169354|    918|
|The Hunger Games ...|  4.34|    8957477|     225244|      1|
|Twilight (The Twi...|  3.66|    6764739|     134200|    985|
|To Kill a Mocking...|  4.26|    6327305|     121142|    989|
|To Kill a Mocking...|  4.26|    6327305|     121142|     27|
|To Kill a Mocking...|  4.26|    6327305|     121142|    989|
|To Kill a Mocking...|  4.26|    6327305|     121142|     27|
|    The Great Gatsby|  3.93|    5394297|     110917|      8|
|The Fault in Our ...|  4.13|    5297202|     181573|    993|
+--------------------+------+-----------+-----------+-------+



In [17]:
# Tính tổng số sách mỗi tác giả đã viết

books_per_author = books_df.groupBy("author_name").count().orderBy(col("count").desc())

books_per_author.show(10)

+--------------------+-----+
|         author_name|count|
+--------------------+-----+
|Stephen         King|   18|
|       Sarah J. Maas|   14|
| William Shakespeare|   12|
|        Rick Riordan|   11|
|     Cassandra Clare|   10|
|        J.K. Rowling|   10|
|          C.S. Lewis|    9|
|       Richelle Mead|    8|
|           P.C. Cast|    7|
|      J.R.R. Tolkien|    7|
+--------------------+-----+
only showing top 10 rows



In [18]:
# Tính số trang trung bình cho mỗi tác giả

average_pages_per_author = books_df.groupBy("author_name").avg("num_pages").orderBy(col("avg(num_pages)").desc())

average_pages_per_author.show(10)

+--------------------+------------------+
|         author_name|    avg(num_pages)|
+--------------------+------------------+
|      Bill Watterson|            1456.0|
|         Leo Tolstoy|            1178.0|
|           Anonymous|            1114.0|
|         Ken Follett|            1106.5|
|David Foster Wallace|            1088.0|
|Laura Ingalls Wilder|1086.3333333333333|
|  George R.R. Martin|            1041.2|
|   Margaret Mitchell|            1037.0|
|Miguel de Cervant...|            1023.0|
|Marion Zimmer Bra...|            1009.0|
+--------------------+------------------+
only showing top 10 rows



In [19]:
# Top 10 sách có nhiều trang nhất

longest_books = books_df.orderBy(col("num_pages").desc()).limit(10)

longest_books.show()

+--------------------+--------------------+---------+--------------------+--------------------+-------+---------+
|         author_name|          book_title|num_pages|          cover_type|        publish_date|book_id|author_id|
+--------------------+--------------------+---------+--------------------+--------------------+-------+---------+
|        J.K. Rowling|Harry Potter Seri...|     4100|           Hardcover|First published O...|    678|      585|
|Laura Ingalls Wilder|The Little House ...|     2700|           Paperback|First published J...|    874|      580|
|  Arthur Conan Doyle|The Complete Sher...|     1796|           Paperback|First published J...|    760|      207|
|      J.R.R. Tolkien|J.R.R. Tolkien 4-...|     1728|Mass Market Paper...|First published J...|    984|       68|
|     Stephenie Meyer|The Twilight Coll...|     1690|           Hardcover|First published N...|    578|       94|
|           Anonymous|The Holy Bible: K...|     1590|           Hardcover|First publishe

In [20]:
# Tìm sách có cả nhiều lượt đánh giá và điểm đánh giá cao

popular_and_highly_rated_books = ratings_df.filter((ratings_df["rating"] >= 4.0) & (ratings_df["num_ratings"] >= 50000))

popular_and_highly_rated_books.orderBy(col("rating").desc(), col("num_ratings").desc()).show(10)

+--------------------+------+-----------+-----------+-------+
|          book_title|rating|num_ratings|num_reviews|book_id|
+--------------------+------+-----------+-----------+-------+
|Words of Radiance...|  4.76|     381139|      24344|    532|
|Harry Potter Seri...|  4.74|     291990|       8452|    678|
|Kingdom of Ash (T...|   4.7|     688833|      72490|    388|
|The Way of Kings ...|  4.66|     533802|      39886|    766|
|A Court of Mist a...|  4.65|    2307530|     196537|    551|
|A Court of Mist a...|  4.65|    2307530|     196537|     45|
|A Court of Mist a...|  4.65|    2307530|     196537|    551|
|A Court of Mist a...|  4.65|    2307530|     196537|     45|
|     The Nightingale|  4.63|    1518678|     123935|    585|
|Empire of Storms ...|  4.63|     813727|      67037|    103|
+--------------------+------+-----------+-----------+-------+
only showing top 10 rows



In [21]:
# Phân tích mối tương quan giữa số trang và điểm đánh giá

books_ratings_df = books_df.join(ratings_df, on="book_title", how="inner")

books_ratings_df.select("num_pages", "rating").describe().show()

+-------+------------------+-------------------+
|summary|         num_pages|             rating|
+-------+------------------+-------------------+
|  count|              1022|               1022|
|   mean|425.82876712328766|  4.101849330847977|
| stddev| 279.7754329523757|0.23676153971897146|
|    min|                26|                3.1|
|    max|              4100|               4.81|
+-------+------------------+-------------------+



In [22]:
# Thống kê sách theo loại bìa

cover_type_distribution = books_df.groupBy("cover_type").count().orderBy(col("count").desc())

cover_type_distribution.show()

+--------------------+-----+
|          cover_type|count|
+--------------------+-----+
|           Paperback|  513|
|           Hardcover|  282|
|Mass Market Paper...|  116|
|      Kindle Edition|   70|
|               ebook|   13|
|       Leather Bound|    1|
|     Library Binding|    1|
|          Board book|    1|
|                Nook|    1|
+--------------------+-----+



In [23]:
# Kết nối tới PostgreSQL

jdbc_url = "jdbc:postgresql://localhost:5432/goodreads_books"
connection_properties = {
    "user": "postgres",
    "password": "thangvt4102004",
    "driver": "org.postgresql.Driver"
}

In [24]:
# Hàm ghi dữ liệu vào PostgreSQL

def write_to_postgres(df, table_name):
    try:
        df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode="overwrite",
            properties=connection_properties
        )
        print(f"Đã ghi thành công bảng {table_name} vào PostgreSQL!")
    except Exception as e:
        print(f"Lỗi khi ghi bảng {table_name} vào PostgreSQL: {e}")

In [25]:
# Lưu bảng tác giả, sách và đánh giá vào PostgreSQL

write_to_postgres(authors_df, "authors")

write_to_postgres(books_df, "books")

write_to_postgres(ratings_df, "ratings")

Đã ghi thành công bảng authors vào PostgreSQL!
Đã ghi thành công bảng books vào PostgreSQL!
Đã ghi thành công bảng ratings vào PostgreSQL!
