In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


postgres_driver_path = "/Users/Admin/Downloads/Downloads/Spark/postgresql-42.3.9.jar"

# Tạo SparkSession với cấu hình MongoDB và PostgreSQL
spark = SparkSession.builder \
    .appName("Read MongoDB Data") \
    .config("spark.jars", postgres_driver_path) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/dbnhanamcrawler") \
    .getOrCreate()


# Đọc dữ liệu từ MongoDB collection "tblunitop"
df = spark.read.format("mongo").option("collection", "tblnhanam").load()


df = df.select("barcode", "category", "description", "in_stock", "name", "pages", "price", "publishing_affiliate", "seller", "size", "subcategory")

# Hiển thị dữ liệu
df.show()


+--------------------+------------------+--------------------+--------+--------------------+-----+-------+--------------------+-----------------+-----------------+--------------------+
|             barcode|          category|         description|in_stock|                name|pages|  price|publishing_affiliate|           seller|             size|         subcategory|
+--------------------+------------------+--------------------+--------+--------------------+-----+-------+--------------------+-----------------+-----------------+--------------------+
|        893532502311|  Kỹ Năng/Sống Đẹp|Muốn hết khổ thì ...|       1|Chánh Tâm, Chánh ...|  216|129,000|          Bloombooks|Phương Nam Online|  14.5 x 20 x 1.2|        Kỹ Năng Sống|
|        978604401412|      Tuổi Mới Lớn|“Giờ thì tôi đã h...|       1|Chàng Sói Hay Ngượng|  184| 96,000|                AMAK|Phương Nam Online|      18 x 13 x 1|        Truyện Tranh|
|        978604442299|           Văn Học|Hinako - một tiểu...|       1|Bảo 

In [2]:
# Hiển thị cấu trúc dữ liệu
df.printSchema()

root
 |-- barcode: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- in_stock: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- price: string (nullable = true)
 |-- publishing_affiliate: string (nullable = true)
 |-- seller: string (nullable = true)
 |-- size: string (nullable = true)
 |-- subcategory: string (nullable = true)



In [3]:
from pyspark.sql.functions import col,regexp_replace

# Chuyển đổi kiểu dữ liệu cho các cột

df = df.withColumn("price", regexp_replace(col("price"), ",", ""))\
       .withColumn("price", col("price").cast("float")) \
       .withColumn("in_stock", col("in_stock").cast("int")) \
       .withColumn("pages", col("pages").cast("int"))

# Hiển thị cấu trúc dữ liệu sau khi chuyển đổi
df.printSchema()


root
 |-- barcode: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- in_stock: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- pages: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- publishing_affiliate: string (nullable = true)
 |-- seller: string (nullable = true)
 |-- size: string (nullable = true)
 |-- subcategory: string (nullable = true)



In [4]:
from pyspark.sql.functions import avg, when, col

# Bước 1: Tính giá trị trung bình của cột 'pages' và làm tròn về số nguyên gần nhất
# Chuyển 'pages' về số nguyên trước khi tính trung bình để đảm bảo tính toán chính xác
avg_pages = df.select(avg(col("pages").cast("int"))).collect()[0][0]


# Bước 2: Thay thế các giá trị thiếu (null) trong cột 'pages' bằng giá trị trung bình đã làm tròn
df = df.withColumn("pages", when(col("pages").isNull(), avg_pages).otherwise(col("pages")))

# Bước 3: Chuyển đổi cột 'pages' về kiểu số nguyên (int)
df = df.withColumn("pages", col("pages").cast("int"))

# Hiển thị kết quả để kiểm tra
df.select("pages").show(5)


# Thay thế giá trị thiếu của cột 'size' bằng "Unknown"
df = df.withColumn("size", when(df["size"].isNull(), "Unknown").otherwise(df["size"]))

# Hiển thị kết quả sau khi xử lý cột 'pages' và 'size'
df.show(5)


+-----+
|pages|
+-----+
|  216|
|  184|
|  248|
|  332|
|  152|
+-----+
only showing top 5 rows

+------------+----------------+--------------------+--------+--------------------+-----+--------+--------------------+-----------------+---------------+-----------------+
|     barcode|        category|         description|in_stock|                name|pages|   price|publishing_affiliate|           seller|           size|      subcategory|
+------------+----------------+--------------------+--------+--------------------+-----+--------+--------------------+-----------------+---------------+-----------------+
|893532502311|Kỹ Năng/Sống Đẹp|Muốn hết khổ thì ...|       1|Chánh Tâm, Chánh ...|  216|129000.0|          Bloombooks|Phương Nam Online|14.5 x 20 x 1.2|     Kỹ Năng Sống|
|978604401412|    Tuổi Mới Lớn|“Giờ thì tôi đã h...|       1|Chàng Sói Hay Ngượng|  184| 96000.0|                AMAK|Phương Nam Online|    18 x 13 x 1|     Truyện Tranh|
|978604442299|         Văn Học|Hinako - một tiểu

In [5]:
# Thay thế các giá trị null hoặc mô tả có độ dài dưới 10 ký tự bằng một chuỗi mặc định
df = df.withColumn(
    "description",
    when(col("description").isNull() | (length(col("description")) < 20), "No description")
    .otherwise(col("description"))
)


# Thay thế các giá trị rỗng hoặc chỉ chứa khoảng trắng bằng một chuỗi mặc định
df = df.withColumn(
    "description",
    when(trim(col("description")) == "", "No description")
    .otherwise(col("description"))
)

from pyspark.sql.functions import when, col, length


# Hiển thị một vài dòng để kiểm tra kết quả
df.select("description").show(5, truncate=False)




+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|description                                                                                                                                                                                                                                                                                                                                                                                                                                |
+-------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
# Thay thế giá trị thiếu trong cột 'publishing_affiliate' bằng "No affiliate publisher"
df = df.withColumn("publishing_affiliate", when(df["publishing_affiliate"].isNull(), "No affiliate publisher").otherwise(df["publishing_affiliate"]))

# Hiển thị kết quả sau khi xử lý cột 'publishing_affiliate'
df.show(5)


+------------+----------------+--------------------+--------+--------------------+-----+--------+--------------------+-----------------+---------------+-----------------+
|     barcode|        category|         description|in_stock|                name|pages|   price|publishing_affiliate|           seller|           size|      subcategory|
+------------+----------------+--------------------+--------+--------------------+-----+--------+--------------------+-----------------+---------------+-----------------+
|893532502311|Kỹ Năng/Sống Đẹp|Muốn hết khổ thì ...|       1|Chánh Tâm, Chánh ...|  216|129000.0|          Bloombooks|Phương Nam Online|14.5 x 20 x 1.2|     Kỹ Năng Sống|
|978604401412|    Tuổi Mới Lớn|“Giờ thì tôi đã h...|       1|Chàng Sói Hay Ngượng|  184| 96000.0|                AMAK|Phương Nam Online|    18 x 13 x 1|     Truyện Tranh|
|978604442299|         Văn Học|Hinako - một tiểu...|       1|Bảo Mẫu Bí Mật Củ...|  248|135000.0|                AMAK|Phương Nam Online|  18 x 13

In [8]:
from pyspark.sql.functions import regexp_extract, col, when

# Bước 1: Trích xuất mã barcode từ các giá trị có cấu trúc 'sachsapphathanh-893535261661-dht' hoặc 'sachsapphathanh-893528091726'
df = df.withColumn(
    "barcode",
    when(
        col("barcode").rlike(r"sachsapphathanh-\d+-dht"),  # Kiểm tra cấu trúc 'sachsapphathanh-<số>-dht'
        regexp_extract(col("barcode"), r"sachsapphathanh-(\d+)-dht", 1)  # Trích xuất mã số trong cấu trúc này
    ).when(
        col("barcode").rlike(r"sachsapphathanh-\d+"),  # Kiểm tra cấu trúc 'sachsapphathanh-<số>' (mới)
        regexp_extract(col("barcode"), r"sachsapphathanh-(\d+)", 1)  # Trích xuất mã số trong cấu trúc mới
    ).otherwise(col("barcode"))  # Giữ nguyên mã barcode cho các trường hợp còn lại
)

# Bước 2: Loại bỏ các dòng có giá trị barcode với cấu trúc 'sachsapphathanh-dht' (không chứa mã số)
df = df.filter(~col("barcode").rlike(r"sachsapphathanh-dht"))

# Hiển thị kết quả sau khi xử lý
df.select("barcode").show(5)

+------------+
|     barcode|
+------------+
|893532502311|
|978604401412|
|978604442299|
|978604442298|
|978604402586|
+------------+
only showing top 5 rows



In [9]:
# Đếm số dòng
num_rows = df.count()

# Đếm số cột
num_columns = len(df.columns)

# In kết quả
print(f"Số dòng còn lại: {num_rows}")
print(f"Số cột: {num_columns}")



Số dòng còn lại: 990
Số cột: 11


In [12]:
# df.write.option("header", True) \
#         .option("sep", "$") \
#         .csv("output_csv", mode="overwrite")


In [10]:
# Thiết lập JDBC URL và thuộc tính
jdbc_url = "jdbc:postgresql://localhost:5432/nhanamscrape_db"
properties = {
    "user": "postgres",
    "password": "database1203",  # Thay thế mật khẩu của bạn ở đây
    "driver": "org.postgresql.Driver"
}




In [11]:
# Chèn dữ liệu vào bảng 'categories'
df_categories = df.select("category", "subcategory")
df_categories.write.jdbc(url=jdbc_url, table="categories", mode="append", properties=properties)




In [12]:
# Chèn dữ liệu vào bảng 'publishing_affiliates'
df_affiliates = df.select("publishing_affiliate")
df_affiliates.write.jdbc(url=jdbc_url, table="publishing_affiliates", mode="append", properties=properties)




In [13]:
# Chèn dữ liệu vào bảng 'sellers'
df_sellers = df.select("seller")
df_sellers.write.jdbc(url=jdbc_url, table="sellers", mode="append", properties=properties)

In [14]:
# Chèn dữ liệu vào bảng 'books'
df_books = df.select("barcode", "name", "description", "pages", "price", "size", "in_stock")
df_books.write.jdbc(url=jdbc_url, table="books", mode="append", properties=properties)
