In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql.functions import regexp_replace
from pyspark.sql.window import Window
import pyodbc
import psycopg2

# Cấu hình Spark
conf = SparkConf() \
    .set("spark.jars.packages", "org.postgresql:postgresql:42.7.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.microsoft.sqlserver:mssql-jdbc:10.2.0.jre8") \
    .setMaster("local[*]") \
    .setAppName("kafka_spark_sql")

sc = SparkContext(conf=conf)
sqlC = SQLContext(sc)

# Đọc dữ liệu từ Kafka với topic "Datatopic"
kafka_bootstrap_servers = "localhost:9092"
topic = "Datatopic"

# Đọc dữ liệu từ Kafka
raw_data = sqlC.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

# Chuyển đổi dữ liệu từ Kafka (nội dung trong cột "value" là byte, cần chuyển thành chuỗi)
raw_data = raw_data.selectExpr("CAST(value AS STRING)")


# Giả sử dữ liệu Kafka đã ở định dạng JSON, chúng ta sẽ chuyển dữ liệu thành DataFrame
truyen = sqlC.read.json(raw_data.rdd.map(lambda row: row['value']))



In [2]:
truyen.show(5)

+--------+--------------------+----------+-------------+---------+--------------+---------+-------------+--------------------+--------------------+--------------------+--------------+
|Xep_hang|                 _id|  cap_nhat|luot_danh_gia| luot_xem|nguoi_theo_doi|so_chuong|      tac_gia|            the_loai|             tieu_de|             tom_tat|    trang_thai|
+--------+--------------------+----------+-------------+---------+--------------+---------+-------------+--------------------+--------------------+--------------------+--------------+
|     4.5|{6706e41f98066a57...|2024-09-24|          543|   15.801|         4.506|       54|Đang cập nhật|Action, Manhwa, T...|Mạt Thế Zombie 82-08|Giới thiệu về Web...|Đang tiến hành|
|     4.5|{6706e42498066a57...|2024-10-08|           25|         |           215|       13|Đang cập nhật|Manhua, Ngôn Tình...| Ánh Trăng Của Cô Ấy|Giới thiệu về Web...|Đang tiến hành|
|     4.5|{6706e42698066a57...|2024-10-08|      2007300|4.277.557|       929.041

XỬ LÝ DỮ LIỆU

In [3]:
truyen_cleaned = truyen.filter(
    (F.col('tieu_de') != '') &
    (F.col('tac_gia') != '') &
    (F.col('luot_danh_gia') != '') &
    (F.col('xep_hang') != '') &
    (F.col('tom_tat') != '') &
    (F.col('the_loai') != '') &
    (F.col('nguoi_theo_doi') != '') &
    (F.col('cap_nhat') != '') &
    (F.col('trang_thai') != '') &
    (F.col('luot_xem') != '') &
    (F.col('so_chuong') > 0)
)
truyen_cleaned = truyen_cleaned.withColumn(
    'so_chuong', 
    F.when(F.col('so_chuong').isNotNull(), F.regexp_replace(F.col('so_chuong'), r'\.', '')).cast('int')
)
truyen_cleaned = truyen_cleaned.withColumn(
    'nguoi_theo_doi',
    F.when(F.col('nguoi_theo_doi').isNotNull(), F.regexp_replace(F.col('nguoi_theo_doi'), r'\.', '').cast('int')).otherwise(0)
)
truyen_cleaned = truyen_cleaned.withColumn(
    'luot_xem',
    F.when(F.col('luot_xem').isNotNull(), F.regexp_replace(F.col('luot_xem'), r'\.', '').cast('int')).otherwise(0)
)
truyen_cleaned = truyen_cleaned.withColumn(
    'luot_danh_gia',
    F.when(F.col('luot_danh_gia').isNotNull(), F.regexp_replace(F.col('luot_danh_gia'), r'\.', '').cast('int')).otherwise(0)
)
truyen_cleaned = truyen_cleaned.withColumn(
    'cap_nhat',
    F.to_date(F.col('cap_nhat'), 'yyyy-MM-dd')
)
truyen_cleaned = truyen_cleaned.withColumn(
    'xep_hang',
    F.round(F.col('xep_hang').cast('float'), 1)
)

phrases_to_remove = [
    "Giới thiệu về Website NetTruyenWW.Com Chào mừng bạn đến với website đọc truyện tranh online được yêu thích nhất - NetTruyenWW, nơi bạn có thể thỏa sức khám phá thế giới truyện tranh phong phú và đa dạng. Tại đây, chúng tôi luôn cập nhật liên tục những bộ truyện mới nhất và được yêu thích nhất, từ các thể loại truyện tranh hành động, truyện tranh ngôn tình, manhua, manhwa, cho đến các thể loại khác như truyện tranh chuyển sinh, truyện tranh xuyên không, Độc giả có thể dễ dàng tìm kiếm và theo dõi các tác phẩm mà mình yêu thích chỉ với vài thao tác đơn giản. Như truy cập lịch sử đọc truyện hoặc danh sách theo dõi. Bên cạnh đó có thể tham khảo các bộ hoặc truyện tranh HOT được chúng tôi tổng hợp.",
    "Hôm nay, chúng tôi hân hạnh giới thiệu đến bạn bộ truyện ",
    "trên NetTruyenWW.Com",
    "Lý do nên đọc bộ truyện",
    "Chất lượng hình ảnh sắc nét: Truyện tranh tại NetTruyenWW luôn được đảm bảo chất lượng hình ảnh tối ưu. Chúng tôi sử dụng các công nghệ hiện đại để mang đến hình ảnh rõ nét, sắc sảo, từ những khung truyện tinh tế đến các chi tiết nhỏ trong từng cảnh truyện. Bạn sẽ được thưởng thức tác phẩm mà không lo ngại về việc hình ảnh bị mờ, nhòe hay không rõ.",
    "Cập nhật nhanh chóng: Đội ngũ của chúng tôi liên tục cập nhật các chương mới nhanh chóng, thường xuyên và đều đặn. Bạn sẽ không phải chờ đợi quá lâu để biết được diễn biến tiếp theo của câu chuyện. Chúng tôi hiểu rằng cảm giác chờ đợi một chương truyện yêu thích là không dễ chịu, nên luôn nỗ lực mang đến nội dung mới cho độc giả sớm nhất có thể.",
    "Giao diện thân thiện, dễ sử dụng: NetTruyen được thiết kế với giao diện trực quan, dễ sử dụng, tối ưu cho trải nghiệm đọc truyện trên mọi thiết bị, từ máy tính đến Chức năng tìm kiếm nhanh chóng, điều hướng dễ dàng và khả năng đánh dấu trang giúp bạn có thể tiếp tục đọc từ nơi mình đã dừng mà không mất nhiều thời gian tìm kiếm.",
    "Đọc truyện miễn phí và không giới hạn: Tất cả các bộ truyện trên NetTruyenWW đều được cung cấp hoàn toàn miễn phí. Bạn có thể đọc bao nhiêu tùy thích, không lo bị giới hạn số lượng chương hay bộ truyện. Chúng tôi luôn cố gắng cung cấp nội dung chất lượng mà không có bất kỳ rào cản nào cho người đọc.",
    "Bình luận và chia sẻ dễ dàng: Mỗi bộ truyện đều có mục bình luận để bạn có thể chia sẻ cảm nhận, thảo luận với cộng đồng yêu truyện cùng sở thích. Đây là nơi lý tưởng để kết nối với những độc giả khác, giúp bạn trao đổi và bàn luận về những diễn biến thú vị trong truyện",
    "Không quảng cáo phiền phức: Hiểu được cảm giác khó chịu khi bị gián đoạn bởi quảng cáo, chúng tôi cố gắng giữ trải nghiệm đọc truyện mượt mà và liền mạch nhất có thể. NetTruyenWW được tối ưu để quảng cáo không làm phiền hay ảnh hưởng đến trải nghiệm đọc của bạn.",
    "Với những điểm mạnh này, "
]
for phrase in phrases_to_remove:
    truyen_cleaned = truyen_cleaned.withColumn("tom_tat", regexp_replace(F.col("tom_tat"), phrase, ""))

truyen_cleaned = truyen_cleaned.withColumn("tom_tat", regexp_replace(F.col("tom_tat"), r"^\s+|\s+$", ""))


ĐẨY DỮ LIỆU SẠCH LÊN MONGO

In [4]:
mongo_output_uri = "mongodb://localhost:27017/Nettruyendb_clear.Truyens_clear"
truyen_cleaned.write \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite") \
    .option("uri", mongo_output_uri) \
    .save()

TÁCH BẢNG ĐỂ MÔ HÌNH HOÁ

In [None]:
truyen_with_id = truyen_cleaned.withColumn("truyen_id", F.monotonically_increasing_id())

#Truyen
Truyen = truyen_with_id.select(
    "truyen_id",
    "tieu_de",
    "tac_gia",
    "tom_tat"
).distinct().orderBy("truyen_id")

#Chi_tiet_truyen
Chi_tiet_truyen = truyen_with_id.select(
    "truyen_id",
    "xep_hang",
    "cap_nhat",
    "luot_danh_gia",
    "luot_xem",
    "nguoi_theo_doi",
    "so_chuong",
    "trang_thai"
)

#The_loai_Truyen (nhiều-nhiều)
The_loai_Truyen = truyen_with_id \
    .select("truyen_id", F.explode(F.split(F.col("the_loai"), ",\\s*")).alias("the_loai")) \
    .distinct().orderBy("truyen_id")

#The_loai (danh sách thể loại)
windowSpec = Window.orderBy("the_loai")
The_loai = The_loai_Truyen \
    .filter(F.col("the_loai").isNotNull()) \
    .filter(F.col("the_loai") != "") \
    .select("the_loai").distinct() \
    .withColumn("the_loai_id", F.row_number().over(windowSpec))

The_loai_Truyen_with_id = The_loai_Truyen.alias("tl_truyen").join(
    The_loai.alias("tl"), 
    F.col("tl_truyen.the_loai") == F.col("tl.the_loai"),
    "inner"
).select(
    F.col("tl_truyen.truyen_id"),
    F.col("tl.the_loai_id"),
    F.col("tl.the_loai").alias("the_loai")
).orderBy("truyen_id") 

ĐẨY DỮ LIỆU LÊN CÁC TOPIC TRÊN KAFKA

In [None]:
kafka_bootstrap_servers = "localhost:9092"
toppic_truyen = "TruyenTopic"
toppic_Chi_tiet_truyen = "ChitiettruyenTopic"
toppic_The_loai = "TheloaiTopic"
toppic_The_loai_Truyen_with_id = "TheloaiTruyenwithidTopic"

def write_to_kafka(df, topic):
    df.show(5)
    columns_to_fill = ['xep_hang', 'cap_nhat', 'luot_danh_gia', 'luot_xem', 'nguoi_theo_doi', 'so_chuong', 'trang_thai']
    fill_values = {
        'xep_hang': 'N/A',
        'cap_nhat': 'Chưa cập nhật',
        'luot_danh_gia': 0,
        'luot_xem': 0,
        'nguoi_theo_doi': 0,
        'so_chuong': 0,
        'trang_thai': 'Chưa cập nhật'
    }
    df = df.fillna({col: fill_values.get(col, None) for col in columns_to_fill if col in df.columns})
    df.show(5)
    df.printSchema()
    if "truyen_id" not in df.columns:
        df = df.join(The_loai_Truyen_with_id, on="the_loai", how="left")

    df = df.withColumn("truyen_id", F.col("truyen_id").cast("string"))

    df = df.selectExpr("CAST(truyen_id AS STRING) AS key", "to_json(struct(*)) AS value")

    df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", topic) \
        .option("checkpointLocation", "/tmp/kafka_checkpoint") \
        .save()
write_to_kafka(Truyen, toppic_truyen)
write_to_kafka(Chi_tiet_truyen, toppic_Chi_tiet_truyen)
write_to_kafka(The_loai, toppic_The_loai)
write_to_kafka(The_loai_Truyen_with_id, toppic_The_loai_Truyen_with_id)


+---------+--------------------+-------------+--------------------+
|truyen_id|             tieu_de|      tac_gia|             tom_tat|
+---------+--------------------+-------------+--------------------+
|        0|Mạt Thế Zombie 82-08|Đang cập nhật|Mạt Thế Zombie 82...|
|        1|    Bố Tôi Là Đặc Vụ|Đang cập nhật|Bố Tôi Là Đặc Vụ ...|
|        2|Nông Trường Siêu ...|Đang cập nhật|Nông Trường Siêu ...|
|        3|Verndio - Sử Thi ...| Nanao Nanaki|Verndio - Sử Thi ...|
|        4|Huyền Thoại Giáo ...|Đang cập nhật|Huyền Thoại Giáo ...|
+---------+--------------------+-------------+--------------------+
only showing top 5 rows

+---------+--------------------+-------------+--------------------+
|truyen_id|             tieu_de|      tac_gia|             tom_tat|
+---------+--------------------+-------------+--------------------+
|        0|Mạt Thế Zombie 82-08|Đang cập nhật|Mạt Thế Zombie 82...|
|        1|    Bố Tôi Là Đặc Vụ|Đang cập nhật|Bố Tôi Là Đặc Vụ ...|
|        2|Nông Trường 

KẾT NỐI ĐỂ TẠO BẢNG VÀ MÔ HÌNH HOÁ SAU ĐÓ ĐẨY DỮ LIỆU LÊN SQL SERVER

In [9]:
# CHẠY 1 LẦN
spark = SparkSession.builder \
    .appName("CreateTablesWithConstraints") \
    .config("spark.driver.extraClassPath", "C:/path/to/mssql-jdbc_auth-11.2.1.x64.dll") \
    .getOrCreate()
    
jdbc_url = "jdbc:sqlserver://ADMIN\\SQLEXPRESS;databaseName=Nettruyen;user=haohaao;password=080704;trustServerCertificate=true;"

connection_properties = {
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

def create_tables_and_constraints():
    conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};'
                          'SERVER=ADMIN\\SQLEXPRESS;'
                          'DATABASE=Nettruyen;'
                          'Trusted_Connection=yes;')

    cursor = conn.cursor()

    # Tạo bảng truyen với ràng buộc
    cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='truyen' AND xtype='U')
        CREATE TABLE truyen (
            truyen_id BIGINT PRIMARY KEY, 
            tieu_de NVARCHAR (255),
            tac_gia NVARCHAR (255),
            tom_tat NVARCHAR(MAX)
        );
    """)

    # Tạo bảng chi_tiet_truyen
    cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='chi_tiet_truyen' AND xtype='U')
        CREATE TABLE chi_tiet_truyen (
            truyen_id BIGINT PRIMARY KEY,
            xep_hang FLOAT,
            cap_nhat DATE,
            luot_danh_gia FLOAT,
            luot_xem BIGINT,
            nguoi_theo_doi INT,
            so_chuong INT,
            trang_thai NVARCHAR (255),
            CONSTRAINT fk_chi_tiet_truyen_truyen_id FOREIGN KEY (truyen_id) REFERENCES truyen (truyen_id) ON DELETE CASCADE
        );
    """)

    # Tạo bảng the_loai
    cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='the_loai' AND xtype='U')
        CREATE TABLE the_loai (
            the_loai_id INT IDENTITY PRIMARY KEY,
            the_loai NVARCHAR (255)
        );
    """)

    # Tạo bảng the_loai_truyen
    cursor.execute("""
        IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='the_loai_truyen' AND xtype='U')
        CREATE TABLE the_loai_truyen (
            truyen_id BIGINT,
            the_loai_id INT,
            the_loai NVARCHAR(255),
            PRIMARY KEY (truyen_id, the_loai_id),
            CONSTRAINT fk_the_loai_truyen_truyen_id FOREIGN KEY (truyen_id) REFERENCES truyen (truyen_id) ON DELETE CASCADE,
            CONSTRAINT fk_the_loai_truyen_the_loai_id FOREIGN KEY (the_loai_id) REFERENCES the_loai (the_loai_id) ON DELETE CASCADE
        );
    """)

    conn.commit()
    cursor.close()
    conn.close()
create_tables_and_constraints()


In [10]:
# CHẠY 1 LẦN
def write_to_sql_server(df, table_name, mode):
    try:
        df.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=connection_properties)
        print(f"Successfully wrote to {table_name} with mode {mode}")
    except Exception as e:
        print(f"Error writing to {table_name}: {e}")
write_to_sql_server(Truyen, "truyen", mode="append")
write_to_sql_server(Chi_tiet_truyen, "chi_tiet_truyen", mode="append")
The_loai_distinct = The_loai.select("the_loai").distinct()
write_to_sql_server(The_loai_distinct, "the_loai", mode="append")
write_to_sql_server(The_loai_Truyen_with_id, "the_loai_truyen", mode="append")

Successfully wrote to truyen with mode append
Successfully wrote to chi_tiet_truyen with mode append
Successfully wrote to the_loai with mode append
Successfully wrote to the_loai_truyen with mode append


KẾT NỐI ĐỂ TẠO BẢNG VÀ MÔ HÌNH HOÁ SAU ĐÓ ĐẨY DỮ LIỆU LÊN POSTGRES

In [None]:
spark = SparkSession.builder \
    .appName("CreateTablesWithConstraints") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
connection_properties = {
    "user": "postgres",
    "password": "080704",
    "driver": "org.postgresql.Driver"
}

def create_tables_and_constraints():
    conn = psycopg2.connect(dbname="postgres", user="postgres", password="080704", host="localhost", port="5432")
    cursor = conn.cursor()

    # Tạo bảng truyen
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS truyen (
            truyen_id BIGINT PRIMARY KEY, 
            tieu_de VARCHAR(255),
            tac_gia VARCHAR(255),
            tom_tat TEXT
        );
    """)

    # Tạo bảng chi_tiet_truyen
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS chi_tiet_truyen (
            truyen_id BIGINT PRIMARY KEY,
            xep_hang INT,
            cap_nhat DATE,
            luot_danh_gia FLOAT,
            luot_xem BIGINT,
            nguoi_theo_doi INT,
            so_chuong INT,
            trang_thai VARCHAR(255),
            CONSTRAINT fk_chi_tiet_truyen_truyen_id FOREIGN KEY (truyen_id) REFERENCES truyen (truyen_id) ON DELETE CASCADE
        );
    """)

    # Tạo bảng the_loai
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS the_loai (
            the_loai_id SERIAL PRIMARY KEY,
            the_loai VARCHAR(255)
        );
    """)

    # Tạo bảng the_loai_truyen
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS the_loai_truyen (
            truyen_id BIGINT,
            the_loai_id INT,
            the_loai VARCHAR(255),
            PRIMARY KEY (truyen_id, the_loai_id),
            CONSTRAINT fk_the_loai_truyen_truyen_id FOREIGN KEY (truyen_id) REFERENCES truyen (truyen_id) ON DELETE CASCADE,
            CONSTRAINT fk_the_loai_truyen_the_loai_id FOREIGN KEY (the_loai_id) REFERENCES the_loai (the_loai_id) ON DELETE CASCADE
        );
    """)

    conn.commit()
    cursor.close()
    conn.close()
create_tables_and_constraints()

In [7]:
def write_to_postgres(df, table_name):
    try:
        df.write.jdbc(url=jdbc_url, table=table_name, mode="append", properties=connection_properties)
        print(f"Successfully wrote to {table_name}")
    except Exception as e:
        print(f"Error writing to {table_name}: {e}")

# Đẩy dữ liệu lên PostgreSQL
write_to_postgres(Truyen, "truyen")
write_to_postgres(Chi_tiet_truyen, "chi_tiet_truyen")
write_to_postgres(The_loai, "the_loai")
write_to_postgres(The_loai_Truyen_with_id, "the_loai_truyen")


Successfully wrote to truyen
Successfully wrote to chi_tiet_truyen
Successfully wrote to the_loai
Successfully wrote to the_loai_truyen
