# Load Data từ Bronze Layer sang Silver Layer

Notebook này sẽ đọc dữ liệu từ Bronze layer (MinIO) và xử lý để load vào các bảng Iceberg trong Silver layer với Nessie catalog.

## 1. Import Libraries và Khởi tạo Spark Session

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import csv, io, os, re
from datetime import datetime
from typing import Dict

# Cấu hình AWS/MinIO credentials
os.environ.update({
    'AWS_REGION': 'us-east-1',
    'AWS_ACCESS_KEY_ID': 'admin',
    'AWS_SECRET_ACCESS_KEY': 'admin123'
})

# Khởi tạo Spark Session với Nessie Catalog
spark = (
    SparkSession.builder
    .appName("Load_Bronze_To_Silver")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "1536m")
    .config("spark.executor.cores", "2")
    # Nessie Catalog
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", "http://nessie:19120/api/v2")
    .config("spark.sql.catalog.nessie.ref", "main")
    .config("spark.sql.catalog.nessie.warehouse", "s3a://silver/")
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # S3/MinIO Config
    .config("spark.sql.catalog.nessie.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.nessie.s3.access-key-id", "admin")
    .config("spark.sql.catalog.nessie.s3.secret-access-key", "admin123")
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true")
    .config("spark.sql.catalog.nessie.s3.region", "us-east-1")
    # Hadoop S3A Config
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "admin")
    .config("spark.hadoop.fs.s3a.secret.key", "admin123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.region", "us-east-1")
    # Executor Environment
    .config("spark.executorEnv.AWS_REGION", "us-east-1")
    .config("spark.executorEnv.AWS_ACCESS_KEY_ID", "admin")
    .config("spark.executorEnv.AWS_SECRET_ACCESS_KEY", "admin123")
    # Local JAR files
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
spark.sql("CREATE DATABASE IF NOT EXISTS nessie.silver_tables")
spark.sql("USE nessie.silver_tables")
print(f" Spark Session initialized | Master: {spark.sparkContext.master} | App ID: {spark.sparkContext.applicationId}")


 Spark Session initialized | Master: spark://spark-master:7077 | App ID: app-20251206150307-0002


## 2. Load Bảng SCHOOL

In [2]:
print("=" * 80)
print("LOAD BẢNG SCHOOL")
print("=" * 80)

# Đọc và merge tất cả các năm
years = [2021, 2022, 2023, 2024, 2025]
base_path = "s3a://bronze/structured_data/danh sách các trường Đại Học (2021-2025)/Danh_sách_các_trường_Đại_Học_"
df_school = spark.read.option("header", "true").option("inferSchema", "true").csv([f"{base_path}{year}.csv" for year in years]).select("TenTruong", "MaTruong", "TinhThanh").dropDuplicates()

# Transform
df_school_silver = df_school.select(
    col("MaTruong").cast("string").alias("schoolId"),
    col("TenTruong").cast("string").alias("schoolName"),
    col("TinhThanh").cast("string").alias("province"),
    current_timestamp().alias("created_at"),
    current_timestamp().alias("updated_at")
).filter(col("schoolId").isNotNull() & col("schoolName").isNotNull())

# Ghi vào Silver
df_school_silver.writeTo("nessie.silver_tables.school").using("iceberg").createOrReplace()
print(f"Đã ghi {df_school_silver.count()} dòng vào school")

# Verify
spark.table("nessie.silver_tables.school").show(5, truncate=False)

LOAD BẢNG SCHOOL


                                                                                

Đã ghi 265 dòng vào school


[Stage 11:>                                                         (0 + 1) / 1]

+--------+--------------------------------------+---------+--------------------------+--------------------------+
|schoolId|schoolName                            |province |created_at                |updated_at                |
+--------+--------------------------------------+---------+--------------------------+--------------------------+
|DHF     |Đại học Ngoại Ngữ - Đại học Huế       |Huế      |2025-12-06 14:55:32.023036|2025-12-06 14:55:32.023036|
|DCQ     |Đại học Công Nghệ và Quản Lý Hữu Nghị |Hà Nội   |2025-12-06 14:55:32.023036|2025-12-06 14:55:32.023036|
|NTT     |Đại học Nguyễn Tất Thành              |TP HCM   |2025-12-06 14:55:32.023036|2025-12-06 14:55:32.023036|
|KGH     |Trường Sĩ Quan Không Quân - Hệ Đại học|Khánh Hòa|2025-12-06 14:55:32.023036|2025-12-06 14:55:32.023036|
|DHL     |Đại học Nông Lâm - Đại học Huế        |Huế      |2025-12-06 14:55:32.023036|2025-12-06 14:55:32.023036|
+--------+--------------------------------------+---------+--------------------------+--

                                                                                

## 3. Load Bảng MAJOR

In [3]:
from pyspark.sql.functions import col, lower, trim, regexp_replace, current_timestamp

df_major = spark.read.option("header", "true") \
    .option("inferSchema", "false") \
    .option("encoding", "UTF-8") \
    .csv("s3a://bronze/structured_data/danh sách các ngành đại học/Danh_sách_các_ngành.csv")

df_major_clean = df_major.select(
    regexp_replace(trim(col(df_major.columns[0])).cast("string"), r"\.0$", "").alias("majorId"),
    trim(col(df_major.columns[1])).cast("string").alias("majorName")
).filter(
    (col("majorId").isNotNull()) &
    (col("majorName").isNotNull()) &
    (col("majorId") != "") &
    (col("majorName") != "") &
    (lower(col("majorId")) != "nan")
)

# Chuẩn hoá để dedupe theo lowercase
df_major_silver = df_major_clean \
    .withColumn("majorId_lower", lower(col("majorId"))) \
    .dropDuplicates(["majorId_lower"]) \
    .select(
        col("majorId"),
        col("majorName"),
        current_timestamp().alias("created_at"),
        current_timestamp().alias("updated_at")
    )

df_major_silver.writeTo("nessie.silver_tables.major").using("iceberg").createOrReplace()

print(f"Đã ghi {df_major_silver.count()} dòng vào major")
spark.table("nessie.silver_tables.major").show(5, truncate=False)


Đã ghi 3085 dòng vào major
+-------+------------------------------------------------------------+--------------------------+--------------------------+
|majorId|majorName                                                   |created_at                |updated_at                |
+-------+------------------------------------------------------------+--------------------------+--------------------------+
|106    |Khoa học Máy tính                                           |2025-12-06 14:55:40.803072|2025-12-06 14:55:40.803072|
|107    |Kỹ thuật Máy tính                                           |2025-12-06 14:55:40.803072|2025-12-06 14:55:40.803072|
|108    |Điện - Điện tử - Viễn Thông - Tự động hoá - Thiết kế vi mạch|2025-12-06 14:55:40.803072|2025-12-06 14:55:40.803072|
|109    |Kỹ Thuật Cơ khí                                             |2025-12-06 14:55:40.803072|2025-12-06 14:55:40.803072|
|110    |Kỹ Thuật Cơ Điện tử                                         |2025-12-06 14:55:40.803072|2

## 4. Load Bảng SUBJECT_GROUP và SUBJECT

In [4]:
print("=" * 80)
print("LOAD BẢNG SUBJECT_GROUP và SUBJECT")
print("=" * 80)

# Đọc file tohop_mon_fixed.csv
df_tohop = spark.read.option("header", "true").option("inferSchema", "true").option("encoding", "UTF-8").csv("s3a://bronze/structured_data/tohop_mon_fixed.csv")

# --- SUBJECT_GROUP ---
df_subject_group_silver = df_tohop.select(
    col(df_tohop.columns[0]).cast("int").alias("subjectGroupId"),
    col(df_tohop.columns[1]).cast("string").alias("subjectGroupName"),
    col(df_tohop.columns[2]).cast("string").alias("subjectCombination"),
    current_timestamp().alias("created_at"),
    current_timestamp().alias("updated_at")
).filter(col("subjectGroupId").isNotNull() & col("subjectGroupName").isNotNull() & col("subjectCombination").isNotNull()).dropDuplicates(["subjectGroupName", "subjectCombination"])
df_subject_group_silver.writeTo("nessie.silver_tables.subject_group").using("iceberg").createOrReplace()
print(f"Đã ghi {df_subject_group_silver.count()} dòng vào subject_group")

# --- SUBJECT ---
df_subject = (
    df_tohop.select(explode(split(col(df_tohop.columns[2]), "-")).alias("subjectName"))
            .withColumn("subjectName", trim(col("subjectName")))
            .filter(col("subjectName").isNotNull() & (col("subjectName") != ""))
            .withColumn("subjectName_lower", lower(col("subjectName")))
            # loại bỏ trùng theo chữ thường
            .dropDuplicates(["subjectName_lower"])
)

window_spec = Window.orderBy("subjectName_lower")
df_subject_silver = df_subject.withColumn("subjectId", row_number().over(window_spec)).select(
    col("subjectId").cast("int"),
    col("subjectName").cast("string"),
    current_timestamp().alias("created_at"),
    current_timestamp().alias("updated_at")
)
df_subject_silver.writeTo("nessie.silver_tables.subject").using("iceberg").createOrReplace()
print(f"Đã ghi {df_subject_silver.count()} dòng vào subject")

# Verify
spark.table("nessie.silver_tables.subject_group").orderBy("subjectGroupId").show(5, truncate=False)
spark.table("nessie.silver_tables.subject").show(5, truncate=False)

LOAD BẢNG SUBJECT_GROUP và SUBJECT


                                                                                

Đã ghi 232 dòng vào subject_group
Đã ghi 51 dòng vào subject
+--------------+----------------+------------------+--------------------------+--------------------------+
|subjectGroupId|subjectGroupName|subjectCombination|created_at                |updated_at                |
+--------------+----------------+------------------+--------------------------+--------------------------+
|1             |A00             |Toán-Lí-Hóa       |2025-12-06 14:55:44.071023|2025-12-06 14:55:44.071023|
|2             |A01             |Toán-Lí-Ngoại ngữ |2025-12-06 14:55:44.071023|2025-12-06 14:55:44.071023|
|3             |A02             |Toán-Lí-Sinh      |2025-12-06 14:55:44.071023|2025-12-06 14:55:44.071023|
|4             |A03             |Toán-Lí-Sử        |2025-12-06 14:55:44.071023|2025-12-06 14:55:44.071023|
|5             |A04             |Toán-Lí-Địa       |2025-12-06 14:55:44.071023|2025-12-06 14:55:44.071023|
+--------------+----------------+------------------+--------------------------+----

## 5. Load Bảng SELECTION_METHOD

In [5]:
print("=" * 80)
print("LOAD BẢNG SELECTION_METHOD")
print("=" * 80)

# Đọc từ file benchmark để lấy các phương thức xét tuyển
df_benchmark = spark.read.option("header", "true").option("inferSchema", "true").option("encoding", "UTF-8").csv("s3a://bronze/structured_data/điểm chuẩn các trường (2021-2025)/Điểm_chuẩn_các_ngành_đại_học_năm(2021-2025)*.csv")

# Lấy PhuongThuc và loại bỏ "năm ..."
df_selection = df_benchmark.select(trim(regexp_replace(col("PhuongThuc"), r"\s*năm\s+\d{4}.*$", "")).alias("selectionMethodName")).filter(col("selectionMethodName").isNotNull() & (col("selectionMethodName") != "")).distinct()

window_spec = Window.orderBy("selectionMethodName")
df_selection_method_silver = df_selection.withColumn("selectionMethodId", row_number().over(window_spec)).select(
    col("selectionMethodId").cast("int"),
    col("selectionMethodName").cast("string"),
    current_timestamp().alias("created_at"),
    current_timestamp().alias("updated_at")
)
df_selection_method_silver.writeTo("nessie.silver_tables.selection_method").using("iceberg").createOrReplace()
print(f"Đã ghi {df_selection_method_silver.count()} dòng vào selection_method")

# Verify
spark.table("nessie.silver_tables.selection_method").show(10, truncate=False)

LOAD BẢNG SELECTION_METHOD


                                                                                

Đã ghi 10 dòng vào selection_method
+-----------------+------------------------------------------------------+--------------------------+--------------------------+
|selectionMethodId|selectionMethodName                                   |created_at                |updated_at                |
+-----------------+------------------------------------------------------+--------------------------+--------------------------+
|1                |Điểm chuẩn theo phương thức Điểm học bạ               |2025-12-06 14:55:51.823519|2025-12-06 14:55:51.823519|
|2                |Điểm chuẩn theo phương thức Điểm thi THPT             |2025-12-06 14:55:51.823519|2025-12-06 14:55:51.823519|
|3                |Điểm chuẩn theo phương thức Điểm xét tuyển kết hợp    |2025-12-06 14:55:51.823519|2025-12-06 14:55:51.823519|
|4                |Điểm chuẩn theo phương thức Điểm xét tốt nghiệp THPT  |2025-12-06 14:55:51.823519|2025-12-06 14:55:51.823519|
|5                |Điểm chuẩn theo phương thức Điểm ĐGNL HCM 

## 6. Load Bảng GradingScale

In [6]:
print("=" * 80)
print("LOAD BẢNG GRADING_SCALE TỪ PHANLOAITHANGDIEM")
print("=" * 80)

# 1. Đọc dữ liệu gốc từ file CSV (giống benchmark)
df_raw = (
    spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("encoding", "UTF-8")
        .csv("s3a://bronze/structured_data/điểm chuẩn các trường (2021-2025)/Điểm_chuẩn_các_ngành_đại_học_năm(2021-2025)*.csv")
)

# 2. Lấy unique PhanLoaiThangDiem
df_grading_raw = (
    df_raw
        .select(trim(col("PhanLoaiThangDiem")).alias("description"))
        .filter(col("description").isNotNull() & (col("description") != ""))
        .dropDuplicates(["description"])
)

# 3. Tách giá trị số trong description làm "value" (nếu có, vd: "thang 40" -> 40)
df_grading = (
    df_grading_raw
        .withColumn(
            "value",
            regexp_extract(col("description"), r"(\d+(?:\.\d+)?)", 1).cast("float")
        )
        .withColumn("gradingScaleId", monotonically_increasing_id().cast("int"))
        .withColumn("created_at", current_timestamp())
        .withColumn("updated_at", current_timestamp())
        .select(
            "gradingScaleId",
            "value",
            "description",
            "created_at",
            "updated_at"
        )
)

# 4. Ghi vào bảng Iceberg grading_scale đã tạo trước đó
df_grading.writeTo("nessie.silver_tables.grading_scale") \
          .using("iceberg") \
          .createOrReplace()

print(f"Đã ghi {df_grading.count()} dòng vào grading_scale")

# 5. Verify
spark.table("nessie.silver_tables.grading_scale").show(truncate=False)


LOAD BẢNG GRADING_SCALE TỪ PHANLOAITHANGDIEM


                                                                                

Đã ghi 10 dòng vào grading_scale
+--------------+------+---------------+--------------------------+--------------------------+
|gradingScaleId|value |description    |created_at                |updated_at                |
+--------------+------+---------------+--------------------------+--------------------------+
|0             |30.0  |Thang điểm 30  |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|1             |50.0  |Thang điểm 50  |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|2             |1200.0|Thang điểm 1200|2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|3             |150.0 |Thang điểm 150 |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|4             |40.0  |Thang điểm 40  |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|5             |10.0  |Thang điểm 10  |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|6             |100.0 |Thang điểm 100 |2025-12-06 14:55:57.731411|2025-12-06 14:55:57.731411|
|7             |35.0  |Than

## 6. Load Bảng BENCHMARK

In [7]:
from pyspark.sql.functions import (
    col, trim, regexp_replace, current_timestamp,
    avg, round, expr
)

print("=" * 80)
print("LOAD BẢNG BENCHMARK")
print("=" * 80)

# =========================
# 1. ĐỌC & CHUẨN HÓA DỮ LIỆU BRONZE
# =========================

df_benchmark = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("encoding", "UTF-8")
    .csv("s3a://bronze/structured_data/điểm chuẩn các trường (2021-2025)/Điểm_chuẩn_các_ngành_đại_học_năm(2021-2025)*.csv")
)

# Chuẩn hóa cột PhuongThuc: bỏ phần "năm XXXX ..."
df_benchmark = df_benchmark.withColumn(
    "PhuongThuc_cleaned",
    trim(regexp_replace(col("PhuongThuc"), r"\s*năm\s+\d{4}.*$", ""))
)

# Lookup tables từ Silver
df_selection_lookup     = spark.table("nessie.silver_tables.selection_method")
df_subject_group_lookup = spark.table("nessie.silver_tables.subject_group")
df_grading_scale_lookup = spark.table("nessie.silver_tables.grading_scale")

# Join lookup + chuẩn hóa
df_benchmark_base = (
    df_benchmark
    .join(
        df_selection_lookup,
        df_benchmark["PhuongThuc_cleaned"] == df_selection_lookup["selectionMethodName"],
        "left"
    )
    .join(
        df_subject_group_lookup,
        df_benchmark["KhoiThi"] == df_subject_group_lookup["subjectGroupName"],
        "left"
    )
    .join(
        df_grading_scale_lookup,
        trim(df_benchmark["PhanLoaiThangDiem"]) == df_grading_scale_lookup["description"],
        "left"
    )
    .select(
        col("MaTruong").cast("string").alias("schoolId"),
        col("MaNganh").cast("string").alias("majorId"),
        col("subjectGroupId").cast("int"),
        col("selectionMethodId").cast("int"),
        col("gradingScaleId").cast("int"),
        col("Nam").cast("int").alias("year"),
        col("DiemChuan").cast("double").alias("score"),
    )
    .filter(
        col("schoolId").isNotNull() &
        col("majorId").isNotNull() &
        col("gradingScaleId").isNotNull() &
        col("year").isNotNull() &
        col("score").isNotNull() &
        col("selectionMethodId").isNotNull()
        # col("subjectGroupId").isNotNull()  # nếu muốn bắt buộc khối thi thì mở dòng này
    )
    .dropDuplicates([
        "schoolId",
        "majorId",
        "subjectGroupId",
        "selectionMethodId",
        "year",
        "gradingScaleId",
        "score"
    ])
)

# =========================
# 2. GROUP BY & LẤY ĐIỂM TRUNG BÌNH
# =========================

df_benchmark_grouped = (
    df_benchmark_base
    .groupBy(
        "schoolId",
        "majorId",
        "subjectGroupId",
        "selectionMethodId",
        "gradingScaleId",
        "year"
    )
    .agg(
        round(avg("score"), 2).alias("score")
    )
)

table_name = "nessie.silver_tables.benchmark"

# =========================
# 3. CHECK BẢNG SILVER ĐÃ TỒN TẠI CHƯA
# =========================

try:
    spark.table(table_name)
    table_exists = True
    print(f"Bảng {table_name} đã tồn tại → dùng MERGE (upsert).")
except Exception:
    table_exists = False
    print(f"Bảng {table_name} chưa tồn tại → tạo mới full-load.")

# =========================
# 4. LẦN ĐẦU: TẠO BẢNG FULL (DÙNG xxhash64 LÀM benchmarkId)
# =========================

if not table_exists:
    df_benchmark_silver = (
        df_benchmark_grouped
        .withColumn(
            "benchmarkId",
            expr(
                """
                CAST(
                    xxhash64(
                        schoolId,
                        majorId,
                        COALESCE(subjectGroupId, -1),
                        selectionMethodId,
                        gradingScaleId,
                        year
                    ) AS BIGINT
                )
                """
            )
        )
        .withColumn("created_at", current_timestamp())
        .withColumn("updated_at", current_timestamp())
        .select(
            "benchmarkId",
            "schoolId",
            "majorId",
            "subjectGroupId",
            "selectionMethodId",
            "gradingScaleId",
            "year",
            "score",
            "created_at",
            "updated_at"
        )
    )

    df_benchmark_silver.writeTo(table_name).using("iceberg").createOrReplace()
    print(f"Đã tạo mới benchmark với {df_benchmark_silver.count()} dòng")

# =========================
# 5. CÁC LẦN SAU: MERGE / UPSERT
# =========================

else:
    # Staging từ bronze sau khi chuẩn hóa + group
    df_staging = (
        df_benchmark_grouped
        .withColumn("created_at", current_timestamp())
        .withColumn("updated_at", current_timestamp())
    )

    df_staging.createOrReplaceTempView("benchmark_staging")

    # MERGE:
    # - MATCHED: update score + updated_at
    # - NOT MATCHED: insert bản ghi mới với benchmarkId = hash(business key)
    spark.sql(f"""
        MERGE INTO {table_name} AS t
        USING benchmark_staging AS s
        ON  t.schoolId          = s.schoolId
        AND t.majorId           = s.majorId
        AND COALESCE(t.subjectGroupId,  -1) = COALESCE(s.subjectGroupId,  -1)
        AND t.selectionMethodId = s.selectionMethodId
        AND t.gradingScaleId    = s.gradingScaleId
        AND t.year              = s.year

        WHEN MATCHED THEN UPDATE SET
            t.score      = s.score,
            t.updated_at = current_timestamp()

        WHEN NOT MATCHED THEN INSERT (
            benchmarkId,
            schoolId,
            majorId,
            subjectGroupId,
            selectionMethodId,
            gradingScaleId,
            year,
            score,
            created_at,
            updated_at
        ) VALUES (
            CAST(
                xxhash64(
                    s.schoolId,
                    s.majorId,
                    COALESCE(s.subjectGroupId, -1),
                    s.selectionMethodId,
                    s.gradingScaleId,
                    s.year
                ) AS BIGINT
            ),
            s.schoolId,
            s.majorId,
            s.subjectGroupId,
            s.selectionMethodId,
            s.gradingScaleId,
            s.year,
            s.score,
            s.created_at,
            s.updated_at
        )
    """)

    print("Đã MERGE dữ liệu mới vào bảng benchmark")

# =========================
# 6. VERIFY
# =========================

spark.table(table_name).show(5, truncate=False)
spark.table(table_name).groupBy("year").count().orderBy("year").show()


LOAD BẢNG BENCHMARK
Bảng nessie.silver_tables.benchmark đã tồn tại → dùng MERGE (upsert).


                                                                                

Đã MERGE dữ liệu mới vào bảng benchmark
+--------------------+--------+-------+--------------+-----------------+--------------+----+-----+--------------------------+--------------------------+
|benchmarkId         |schoolId|majorId|subjectGroupId|selectionMethodId|gradingScaleId|year|score|created_at                |updated_at                |
+--------------------+--------+-------+--------------+-----------------+--------------+----+-----+--------------------------+--------------------------+
|4789796714871569147 |DBG     |7620105|70            |2                |0             |2025|15.0 |2025-12-06 14:56:01.091071|2025-12-06 14:56:01.091071|
|6236320012694458209 |DBG     |7620112|187           |1                |0             |2025|18.0 |2025-12-06 14:56:01.091071|2025-12-06 14:56:01.091071|
|2144508859211824436 |DCD     |7510401|35            |2                |4             |2025|17.33|2025-12-06 14:56:01.091071|2025-12-06 14:56:01.091071|
|380800951782701353  |BKA     |EV1    |22 

## 7. Load Bảng REGION

In [8]:
print("=" * 80)
print("LOAD BẢNG REGION")
print("=" * 80)

df_region = spark.read.option("header", "true").option("inferSchema", "true").option("encoding", "UTF-8").csv("s3a://bronze/structured_data/region.csv")
df_region_silver = df_region.select(
    lpad(col(df_region.columns[0]).cast("string"), 2, "0").alias("regionId"),  # Format thành 2 chữ số: "1" -> "01"
    col(df_region.columns[1]).cast("string").alias("regionName"),
    current_timestamp().alias("created_at"),
    current_timestamp().alias("updated_at")
).filter(col("regionId").isNotNull() & col("regionName").isNotNull()).dropDuplicates(["regionId"])

df_region_silver.writeTo("nessie.silver_tables.region").using("iceberg").createOrReplace()
print(f"Đã ghi {df_region_silver.count()} dòng vào region")

# Verify
spark.table("nessie.silver_tables.region").show(10, truncate=False)

LOAD BẢNG REGION
Đã ghi 64 dòng vào region
+--------+-----------------------+--------------------------+--------------------------+
|regionId|regionName             |created_at                |updated_at                |
+--------+-----------------------+--------------------------+--------------------------+
|01      |Sở GDĐT Hà Nội         |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|02      |Sở GDĐT TP. Hồ Chí Minh|2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|03      |Sở GDĐT Hải Phòng      |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|04      |Sở GDĐT Đà Nẵng        |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|05      |Sở GDĐT Hà Giang       |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|06      |Sở GDĐT Cao Bằng       |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|07      |Sở GDĐT Lai Châu       |2025-12-06 14:56:07.622558|2025-12-06 14:56:07.622558|
|08      |Sở GDĐT Lào Cai        |2025-12-06 14:56:07.622558|2025-1

## 8. Load Bảng STUDENT_SCORES

In [9]:
from pyspark.sql.functions import (
    col, trim, regexp_replace, current_timestamp, lit,
    concat, substring, udf, input_file_name, regexp_extract
)
from pyspark.sql.types import MapType, IntegerType, DoubleType
from typing import Dict

print("=" * 80)
print("LOAD BẢNG STUDENT_SCORES - INCREMENTAL BY FILE (DELETE + APPEND)")
print("=" * 80)

# =====================================================
# 0. TẠO BẢNG LOG INGEST (LƯU FILE ĐÃ XỬ LÝ) NẾU CHƯA CÓ
# =====================================================
spark.sql("""
CREATE TABLE IF NOT EXISTS nessie.silver_tables.student_scores_ingest_log (
    path STRING,
    year INT,
    processed_at TIMESTAMP
) USING iceberg
""")

# =====================================================
# 1. LẤY DANH SÁCH TẤT CẢ FILE CSV HIỆN CÓ TRONG BRONZE
#    + TRỪ ĐI NHỮNG FILE ĐÃ INGEST (log)
# =====================================================

df_files = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.csv")
    .load("s3a://bronze/structured_data/điểm từng thí sinh/*/*.csv")
    .select("path")
)

df_log = spark.table("nessie.silver_tables.student_scores_ingest_log")

df_new_files = df_files.join(df_log, on="path", how="left_anti")
new_files = [r.path for r in df_new_files.collect()]

if not new_files:
    print(" Không có file mới nào, dừng job.")
else:
    print(f" Phát hiện {len(new_files)} file mới cần xử lý.")

    # =====================================================
    # 2. ĐỌC CHỈ CÁC FILE MỚI + THÊM CỘT YEAR
    # =====================================================

    df_scores_raw = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "false")
        .option("encoding", "UTF-8")
        .csv(new_files)
        .withColumn("path", input_file_name())
    )

    df_scores_raw = df_scores_raw.withColumn(
        "Year",
        regexp_extract(col("path"), r"/(\d{4})/", 1).cast("int")
    )

    # =====================================================
    # 3. LOAD LOOKUP MÔN HỌC
    # =====================================================

    df_subject_lookup = spark.table("nessie.silver_tables.subject").select("subjectId", "subjectName")
    subject_map = {row.subjectName: row.subjectId for row in df_subject_lookup.collect()}
    print(f"\nĐã load {len(subject_map)} môn học để mapping")

    # =====================================================
    # 4. UDF PARSE ĐIỂM → Map<subjectId, score>
    # =====================================================

    def parse_scores_with_subject_id(score_string: str) -> Dict[int, float]:
        if not score_string or score_string.strip() == "":
            return {}
        scores_dict = {}
        try:
            pairs = score_string.split(",")
            for pair in pairs:
                if ":" in pair:
                    subject_name, score = pair.split(":")
                    subject_name = subject_name.strip()
                    # Map tên môn -> subjectId
                    if subject_name in subject_map:
                        subject_id = subject_map[subject_name]
                        try:
                            scores_dict[subject_id] = float(score.strip())
                        except:
                            pass
        except:
            pass
        return scores_dict

    parse_scores_udf = udf(parse_scores_with_subject_id, MapType(IntegerType(), DoubleType()))

    # =====================================================
    # 5. TRANSFORM → DATAFRAME STAGING (KHÔNG MERGE)
    # =====================================================

    # 1️ Biến đầy đủ để append vào silver
    df_student_scores_stage = (
        df_scores_raw
        .withColumn("studentId", concat(col("SBD"), col("Year").cast("string")))
        .withColumn("scores", parse_scores_udf(col("DiemThi")))   # UDF ở đây
        .withColumn("regionId", substring(col("SBD"), 1, 2).cast("string"))
        .select(
            col("studentId").cast("string"),
            col("regionId").cast("string"),
            col("Year").cast("int").alias("year"),
            col("scores")
        )
        .filter(
            col("studentId").isNotNull() &
            col("year").isNotNull() &
            col("scores").isNotNull()
        )
    )
    
    # 2️ Biến thứ hai chỉ có studentId — KHÔNG UDF → dùng để DELETE
    df_student_ids = (
        df_scores_raw
        .withColumn("studentId", concat(col("SBD"), col("Year").cast("string")))
        .select("studentId")
        .filter(col("studentId").isNotNull())
        # .dropDuplicates(["studentId"])
    )
    
    df_student_ids.createOrReplaceTempView("student_scores_new_ids")


    staging_count = df_student_scores_stage.count()
    print(f"Staging có {staging_count:,} dòng.")

    table_name = "nessie.silver_tables.student_scores"

        # =====================================================
    # 6. XOÁ studentId CŨ BẰNG CÁCH COLLECT RA PYTHON + DELETE IN (...)
    # =====================================================

    # Lấy list studentId distinct trong batch mới
    new_ids = [
        row.studentId
        for row in df_student_scores_stage.select("studentId").distinct().collect()
    ]

    print(f"Số studentId distinct trong batch mới: {len(new_ids):,}")

    # Kiểm tra bảng silver đã tồn tại chưa
    try:
        spark.table(table_name)
        table_exists = True
        print(f"Bảng {table_name} đã tồn tại → DELETE theo list studentId + APPEND.")
    except Exception:
        table_exists = False
        print(f"Bảng {table_name} chưa tồn tại → tạo mới từ batch, không cần xoá.")

    silver_count = spark.table(table_name).count() if table_exists else 0
    print(f"Số dòng trong bảng silver hiện tại: {silver_count:,}")

    if not table_exists:
        # 1️ BẢNG CHƯA TỒN TẠI → TẠO MỚI
        (
            df_student_scores_stage
            .withColumn("created_at", current_timestamp())
            .withColumn("updated_at", current_timestamp())
            .writeTo(table_name)
            .using("iceberg")
            .createOrReplace()
        )
        print(f" Đã tạo mới bảng {table_name} với {staging_count:,} dòng.")
    
    elif silver_count == 0:
        # 2️ BẢNG TỒN TẠI NHƯNG RỖNG → KHÔNG XOÁ, CHỈ APPEND
        print(" Bảng silver đã tồn tại nhưng rỗng → chỉ append, không xoá.")
    
        (
            df_student_scores_stage
            .withColumn("created_at", current_timestamp())
            .withColumn("updated_at", current_timestamp())
            .writeTo(table_name)
            .using("iceberg")
            .append()
        )
        print(f" Đã append {staging_count:,} dòng mới vào {table_name}.")
    
    elif new_ids:
        # 3️ BẢNG TỒN TẠI VÀ new_ids KHÔNG RỖNG → DELETE + APPEND
        print("Bảng silver có dữ liệu → DELETE + APPEND.")
    
        chunk_size = 500
        from math import ceil
    
        num_chunks = ceil(len(new_ids) / chunk_size)
        print(f"Chia studentId thành {num_chunks} chunk để xoá...")
    
        for i in range(num_chunks):
            chunk = new_ids[i * chunk_size:(i + 1) * chunk_size]
            escaped_ids = [sid.replace("'", "''") for sid in chunk]
            in_list = ",".join([f"'{sid}'" for sid in escaped_ids])
    
            sql_delete = f"""
                DELETE FROM {table_name}
                WHERE studentId IN ({in_list})
            """
            spark.sql(sql_delete)
    
        print(" Đã xoá xong các studentId cũ trong silver.")
    
        (
            df_student_scores_stage
            .withColumn("created_at", current_timestamp())
            .withColumn("updated_at", current_timestamp())
            .writeTo(table_name)
            .using("iceberg")
            .append()
        )
        print(f" Đã append {staging_count:,} dòng mới.")
    
    else:
        # 4️ new_ids rỗng → không xoá, không append
        print(" Batch mới không có studentId nào hợp lệ → không làm gì cả.")


    # =====================================================
    # 7. GHI LOG FILE ĐÃ XỬ LÝ
    # =====================================================

    from pyspark.sql.functions import array, explode

    df_new_files_log = (
        df_new_files
        .withColumn("year", regexp_extract(col("path"), r"/(\d{4})/", 1).cast("int"))
        .withColumn("processed_at", current_timestamp())
    )

    (
        df_new_files_log
        .writeTo("nessie.silver_tables.student_scores_ingest_log")
        .using("iceberg")
        .append()
    )

    print(f"Đã ghi log {df_new_files_log.count():,} file đã xử lý.")

    # =====================================================
    # 8. VERIFY
    # =====================================================

    print("\nMẫu dữ liệu student_scores:")
    spark.table(table_name).show(5, truncate=False)

LOAD BẢNG STUDENT_SCORES - INCREMENTAL BY FILE (DELETE + APPEND)


                                                                                

 Phát hiện 35 file mới cần xử lý.

Đã load 51 môn học để mapping


                                                                                

Staging có 5,202,085 dòng.


ERROR:root:KeyboardInterrupt while sending command.                 (0 + 6) / 7]
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
25/12/06 14:57:56 ERROR TaskSchedulerImpl: Lost executor 0 on 172.18.0.8: Command exited with code 137
[Stage 115:>                                                        (0 + 4) / 7]

KeyboardInterrupt: 

## 9. Load Bảng ARTICLE và COMMENT từ TikTok Data

In [10]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import (
    col, lit, when, coalesce, trim,
    to_timestamp, current_timestamp,
    input_file_name, regexp_replace
)

# ====================================================
# CẤU HÌNH
# ====================================================
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.files.maxPartitionBytes", "33554432")

POSTS_PATH = "s3a://bronze/MangXaHoi/tiktok-data/posts/*.csv"
TABLE_ARTICLE = "nessie.silver_tables.article"
TABLE_LOG = "nessie.silver_tables.tiktok_posts_files_log"

print("=" * 80)
print("JOB 1: LOAD TIKTOK POSTS")
print("=" * 80)

# 1. Check file moi
spark.sql(f"CREATE TABLE IF NOT EXISTS {TABLE_LOG} (file_path STRING, load_time TIMESTAMP) USING iceberg")

df_all = spark.read.format("binaryFile").option("pathGlobFilter", "*.csv").load(POSTS_PATH).select("path")
try:
    df_processed = spark.table(TABLE_LOG).select("file_path")
    df_new_files = df_all.join(df_processed, df_all.path == col("file_path"), "left_anti")
except:
    df_new_files = df_all

new_files = [r.path for r in df_new_files.collect()]

if not new_files:
    print("Khong co file Post moi.")
else:
    print(f"Xu ly {len(new_files)} file Post moi.")

    # 2. Doc & Transform
    df_raw = spark.read.option("header","true").option("inferSchema","false").csv(new_files)

    df_trans = (
        df_raw
        .withColumn("timePublish", 
            coalesce(
                to_timestamp(col("TimePublish"), "dd-MM-yyyy"),
                to_timestamp(col("TimePublish"), "d-M-yyyy"), 
                to_timestamp(regexp_replace(col("TimePublish"), r".*(\d{1,2})\s+Tháng\s+(\d{1,2}),\s+(\d{4}).*", "$1-$2-$3"), "d-M-yyyy"),
                current_timestamp()
            ))
        .withColumn("likeCount", 
            when(col("Like").contains("K"), (regexp_replace(col("Like"), "K", "").cast("float")*1000).cast("int"))
            .when(col("Like").contains("M"), (regexp_replace(col("Like"), "M", "").cast("float")*1000000).cast("int"))
            .otherwise(coalesce(col("Like").cast("int"), lit(0))))
        .withColumn("commentCount", 
            when(col("Comment").contains("K"), (regexp_replace(col("Comment"), "K", "").cast("float")*1000).cast("int"))
            .when(col("Comment").contains("M"), (regexp_replace(col("Comment"), "M", "").cast("float")*1000000).cast("int"))
            .otherwise(coalesce(col("Comment").cast("int"), lit(0))))
        .withColumn("shareCount", 
            when(col("Share").contains("K"), (regexp_replace(col("Share"), "K", "").cast("float")*1000).cast("int"))
            .when(col("Share").contains("M"), (regexp_replace(col("Share"), "M", "").cast("float")*1000000).cast("int"))
            .otherwise(coalesce(col("Share").cast("int"), lit(0))))
        .select(
            trim(col("ID")).alias("articleID"),  # ID TikTok -> articleID
            col("Description").alias("description"), 
            col("Author").alias("author"),
            col("Url").alias("url"),
            col("timePublish"),
            col("likeCount"), col("commentCount"), col("shareCount"),
            lit("TikTok").alias("type"),
            current_timestamp().alias("created_at"),
            current_timestamp().alias("updated_at")
        )
    )

    # 3. Ghi du lieu (Append)
    print("Dang ghi vao Iceberg...")
    df_trans.writeTo(TABLE_ARTICLE).using("iceberg").append()
    
    # 4. Ghi Log
    spark.createDataFrame([(f,) for f in new_files], ["file_path"]) \
          .withColumn("load_time", current_timestamp()) \
          .writeTo(TABLE_LOG).using("iceberg").append()
    
    print("Hoan tat.")

JOB 1: LOAD TIKTOK POSTS


                                                                                

Xu ly 1 file Post moi.




Dang ghi vao Iceberg...


25/12/06 14:59:25 ERROR TaskSchedulerImpl: Lost executor 1 on 172.18.0.9: Command exited with code 137
Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:713)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:757)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

Hoan tat.


In [None]:
import re
import gc
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import (
    col, lit, when, coalesce, trim,
    to_timestamp, current_timestamp,
    input_file_name, regexp_replace
)

# ====================================================
# CẤU HÌNH CỰC NHẸ (ULTRA LOW RESOURCE)
# ====================================================
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.files.maxPartitionBytes", "16777216") # Giam xuong 16MB/task de doc it RAM hon
spark.conf.set("spark.sql.shuffle.partitions", "20")            # Tang partition len 20 de chia nho viec

# Path
POSTS_PATH = "s3a://bronze/MangXaHoi/tiktok-data/comments/*.csv"
TABLE_COMMENT = "nessie.silver_tables.comment"
TABLE_LOG = "nessie.silver_tables.tiktok_comments_files_log"

# GIAM BATCH SIZE XUONG 10 (Quan trong nhat de tranh OOM)
BATCH_SIZE = 10

print("=" * 80)
print(f"JOB 2: LOAD TIKTOK COMMENTS (TINY BATCH {BATCH_SIZE} - ULTRA SAFE MODE)")
print("=" * 80)

# 1. Tao bang Log (Fix Py4J)
try:
    spark.table(TABLE_LOG)
except:
    print(f"-> Bang log {TABLE_LOG} chua ton tai. Dang tao moi...")
    log_schema = StructType([StructField("file_path", StringType(), False), StructField("load_time", TimestampType(), False)])
    spark.createDataFrame([], log_schema).writeTo(TABLE_LOG).using("iceberg").create()

# 2. Loc file moi (DEBUG MODE)
print("-> Dang quet file nguon...")
df_all = spark.read.format("binaryFile").option("pathGlobFilter", "*.csv").load(POSTS_PATH).select("path")
total_files_count = df_all.count()
print(f"   Tong so file trong folder: {total_files_count}")

try:
    # Doc bang log
    df_processed = spark.table(TABLE_LOG).select("file_path").distinct()
    processed_count = df_processed.count()
    print(f"   So file da xu ly truoc do (Log): {processed_count}")
    
    # --- FIX LOGIC LOC FILE ---
    df_new_files = df_all.alias("src").join(
        df_processed.alias("log"), 
        col("src.path") == col("log.file_path"), 
        "left_anti"
    )
    
    if processed_count > 0 and df_new_files.count() == total_files_count:
        print("   [DEBUG] CANH BAO: Log co du lieu nhung khong loc duoc file nao.")

except Exception as e:
    print(f"   [WARNING] Loi doc Log: {e}")
    df_new_files = df_all

# Lay danh sach file can xu ly
all_new_files = [r.path for r in df_new_files.collect()]
# Quan trong: Cat batch nho
files_to_process = all_new_files[:BATCH_SIZE]

if not files_to_process:
    print("-> KHONG CO FILE COMMENT MOI.")
else:
    print(f"-> Tim thay {len(all_new_files)} file moi.")
    print(f"-> Dot nay se xu ly {len(files_to_process)} file (Batch nho de tranh sap).")

    # 3. Xu ly tung file
    for i, file_path in enumerate(files_to_process):
        filename = file_path.split('/')[-1]
        print(f"\n--- [{i+1}/{len(files_to_process)}] Dang xu ly: {filename} ---")
        
        try:
            df_raw = spark.read.option("header","true").option("inferSchema","false").csv(file_path)

            if "ID_Post" not in df_raw.columns:
                print(f"   [SKIP] File loi format (Thieu ID_Post).")
                spark.createDataFrame([(file_path,)], ["file_path"]).withColumn("load_time", current_timestamp()).writeTo(TABLE_LOG).using("iceberg").append()
                continue

            # Transform
            df_trans = df_raw.select(
                trim(col("ID_Post")).alias("articleID"), 
                col("Name").alias("name"),
                col("TagName").alias("tagName"),
                col("URL").alias("urlUser"),
                col("Comment").alias("comment"),
                coalesce(
                    to_timestamp(regexp_replace(col("Time"), r"(\d{1,2})[-/](\d{1,2})[-/](\d{4}).*", "$1-$2-$3"), "d-M-yyyy"),
                    to_timestamp(regexp_replace(col("Time"), r".*trước.*", "1970-01-01"), "yyyy-MM-dd"),
                    current_timestamp()
                ).alias("commentTime"),
                coalesce(col("Likes").cast("int"), lit(0)).alias("commentLike"),
                when(col("LevelComment") == "Yes", 2).otherwise(1).alias("levelComment"),
                col("RepliedTo").alias("replyTo"),
                coalesce(col("NumberOfReplies").cast("int"), lit(0)).alias("numberOfReply"),
                current_timestamp().alias("created_at"),
                current_timestamp().alias("updated_at")
            ).filter(col("articleID").isNotNull() & (col("articleID") != ""))
            
            # GHI TRUC TIEP
            if not df_trans.rdd.isEmpty():
                # --- CHIEN THUAT GHI AN TOAN NHAT ---
                # 1. Repartition(20): Chia nho data ra 20 task de xu ly nhe nhang hon
                # 2. sortWithinPartitions: Giup Iceberg Writer khong phai buffer qua nhieu
                print("   -> Dang chuan bi du lieu (Shuffle nhe)...")
                df_optimized = df_trans.repartition(20, "articleID").sortWithinPartitions("articleID")
                
                print("   -> Dang ghi vao Iceberg...")
                df_optimized.writeTo(TABLE_COMMENT).using("iceberg").append()
                print("   -> Da ghi xong.")
            
            # Ghi Log
            spark.createDataFrame([(file_path,)], ["file_path"]).withColumn("load_time", current_timestamp()).writeTo(TABLE_LOG).using("iceberg").append()
            
            # Don dep RAM ngay lap tuc
            df_trans.unpersist()
            spark.catalog.clearCache()
            gc.collect()

        except Exception as e:
            print(f"   [ERROR] {e}")

    print(f"Hoan tat batch {len(files_to_process)} file. Hay chay lai de lam tiep batch sau.")

## 10. Load Bảng ARTICLE và COMMENT từ Facebook Data

In [11]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import (
    col, trim, to_timestamp, lit, current_timestamp, 
    input_file_name, coalesce, udf
)

# ====================================================
# PARSE TIME UDF
# ====================================================
MONTH_MAP = {"Tháng 1": "01", "Tháng 2": "02", "Tháng 3": "03", "Tháng 4": "04", "Tháng 5": "05", "Tháng 6": "06", "Tháng 7": "07", "Tháng 8": "08", "Tháng 9": "09", "Tháng 10": "10", "Tháng 11": "11", "Tháng 12": "12"}

def parse_vietnam_datetime(dt_str):
    if not dt_str: return None
    try:
        if "," in dt_str: dt_str = dt_str.split(",", 1)[1].strip()
        match = re.search(r"(\d+)\s+(Tháng\s+\d+)", dt_str)
        if not match: return None
        day, month_text = match.group(1), match.group(2)
        month = MONTH_MAP.get(month_text, "01")
        year = re.search(r",\s*(\d{4})", dt_str).group(1) if re.search(r",\s*(\d{4})", dt_str) else "2025"
        time_str = re.search(r"lúc\s+(\d{1,2}:\d{2})", dt_str).group(1) if re.search(r"lúc\s+(\d{1,2}:\d{2})", dt_str) else "00:00"
        return f"{year}-{month}-{int(day):02d} {time_str}:00"
    except: return None

parse_vn_time_udf = udf(parse_vietnam_datetime, StringType())

# ====================================================
# CẤU HÌNH
# ====================================================
POSTS_GLOB = "s3a://bronze/MangXaHoi/Face-data/posts/*.csv"
TABLE_ARTICLE = "nessie.silver_tables.article"
TABLE_LOG = "nessie.silver_tables.fb_posts_files_log"

print("=" * 80)
print("JOB 3: LOAD FACEBOOK POSTS (MERGE/UPSERT)")
print("=" * 80)

spark.sql(f"CREATE TABLE IF NOT EXISTS {TABLE_LOG} (file_path STRING, load_time TIMESTAMP) USING iceberg")

# 1. Loc file moi
df_all = spark.read.option("header", "true").csv(POSTS_GLOB).withColumn("file_path", input_file_name())
try:
    df_proc = spark.table(TABLE_LOG).select("file_path").distinct()
    new_files = [r.file_path for r in df_all.join(df_proc, "file_path", "left_anti").select("file_path").distinct().collect()]
except:
    new_files = [r.file_path for r in df_all.select("file_path").distinct().collect()]

if new_files:
    print(f"Xu ly {len(new_files)} file moi.")
    
    # 2. Transform
    df_raw = spark.read.option("header", "true").option("inferSchema", "false").csv(new_files)
    df_trans = df_raw.select(
        trim(col("ID")).alias("articleID"),         # ID FB -> articleID
        trim(col("Description")).alias("description"),
        trim(col("Author")).alias("author"),
        trim(col("Url")).alias("url"),
        coalesce(to_timestamp(parse_vn_time_udf(col("TimePublish"))), to_timestamp(col("TimePublish")), current_timestamp()).alias("timePublish"),
        coalesce(col("Like").cast("int"), lit(0)).alias("likeCount"),
        coalesce(col("Share").cast("int"), lit(0)).alias("shareCount"),
        coalesce(col("Comment").cast("int"), lit(0)).alias("commentCount")
    )

    # 3. MERGE (Upsert)
    df_trans.createOrReplaceTempView("fb_source")
    
    # Update metrics
    spark.sql(f"""
    MERGE INTO {TABLE_ARTICLE} t USING fb_source s
    ON t.url = s.url AND t.type = 'facebook'
    WHEN MATCHED THEN UPDATE SET
        t.description = s.description, t.timePublish = s.timePublish,
        t.likeCount = s.likeCount, t.shareCount = s.shareCount, t.commentCount = s.commentCount,
        t.updated_at = current_timestamp()
    """)

    # Insert new
    spark.sql(f"""
    INSERT INTO {TABLE_ARTICLE}
    SELECT s.articleID, s.description, s.author, s.url, s.timePublish,
           s.likeCount, s.commentCount, s.shareCount, 'facebook', current_timestamp(), current_timestamp()
    FROM fb_source s
    WHERE NOT EXISTS (SELECT 1 FROM {TABLE_ARTICLE} t WHERE t.url = s.url AND t.type = 'facebook')
    """)
    
    # 4. Log
    spark.createDataFrame([(f,) for f in new_files], ["file_path"]).withColumn("load_time", current_timestamp()).writeTo(TABLE_LOG).using("iceberg").append()
    print("Hoan tat.")
else:
    print("Khong co file moi.")

JOB 3: LOAD FACEBOOK POSTS (MERGE/UPSERT)


                                                                                

Xu ly 1 file moi.


25/12/06 15:01:38 ERROR TaskSchedulerImpl: Lost executor 3 on 172.18.0.8: worker lost: Not receiving heartbeat for 60 seconds
25/12/06 15:01:38 ERROR TaskSchedulerImpl: Lost executor 2 on 172.18.0.7: Command exited with code 137
                                                                                

Hoan tat.


In [None]:
import re
import gc
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import (
    col, lit, when, coalesce, trim, broadcast,
    to_timestamp, current_timestamp,
    input_file_name, regexp_replace
)

# ====================================================
# CẤU HÌNH TỐI ƯU
# ====================================================
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.files.maxPartitionBytes", "33554432")
# Giam shuffle partition de tiet kiem RAM
spark.conf.set("spark.sql.shuffle.partitions", "50")

COMMENTS_GLOB = "s3a://bronze/MangXaHoi/Face-data/comments/*.csv"
TABLE_COMMENT = "nessie.silver_tables.comment"
TABLE_LOG = "nessie.silver_tables.fb_comments_files_log"

# Batch size: Xu ly 50 file/lan chay
BATCH_SIZE = 50

print("=" * 80)
print(f"JOB 4: LOAD FACEBOOK COMMENTS (FAST APPEND - NO VALIDATION - BATCH {BATCH_SIZE})")
print("=" * 80)

# 1. Loc file moi
spark.sql(f"CREATE TABLE IF NOT EXISTS {TABLE_LOG} (file_path STRING, load_time TIMESTAMP) USING iceberg")

df_all = spark.read.format("binaryFile").option("pathGlobFilter", "*.csv").load(COMMENTS_GLOB).select("path")
try:
    df_proc = spark.table(TABLE_LOG).select("file_path").distinct()
    df_new = df_all.join(df_proc, df_all.path == col("file_path"), "left_anti")
except:
    df_new = df_all

# Lay danh sach file can xu ly (Batching)
all_new_files = [r.path for r in df_new.collect()]
files_to_process = all_new_files[:BATCH_SIZE]

if not files_to_process:
    print("Khong co file moi.")
else:
    print(f"Tim thay {len(all_new_files)} file moi.")
    print(f"-> Dot nay xu ly {len(files_to_process)} file.")

    # 2. Xu ly tung file
    for i, file_path in enumerate(files_to_process):
        filename = file_path.split('/')[-1]
        print(f"\n--- [{i+1}/{len(files_to_process)}] Dang xu ly: {filename} ---")
        
        try:
            df_raw = spark.read.option("header", "true").option("inferSchema", "false").csv(file_path)
            
            if "Id_post" not in df_raw.columns:
                print(f"   [SKIP] Thieu cot Id_post.")
                spark.createDataFrame([(file_path,)], ["file_path"]).withColumn("load_time", current_timestamp()).writeTo(TABLE_LOG).using("iceberg").append()
                continue

            # Transform (Set NULL cho cot thieu)
            df_trans = df_raw.select(
                trim(col("Id_post")).alias("articleID"), 
                lit(None).cast("string").alias("name"),
                lit(None).cast("string").alias("tagName"),
                lit(None).cast("string").alias("urlUser"),
                (col("Comment") if "Comment" in df_raw.columns else lit("")).alias("comment"),
                lit(None).cast("timestamp").alias("commentTime"),
                lit(None).cast("int").alias("commentLike"),
                lit(None).cast("int").alias("levelComment"),
                lit(None).cast("string").alias("replyTo"),
                lit(None).cast("int").alias("numberOfReply"),
                current_timestamp().alias("created_at"),
                current_timestamp().alias("updated_at")
            ).filter(col("articleID").isNotNull() & (col("articleID") != ""))

            # --- GHI TRUC TIEP (APPEND ONLY) ---
            # Khong kiem tra Article ton tai
            # Khong kiem tra Duplicate
            
            if not df_trans.rdd.isEmpty():
                print(f"   -> Dang ghi comment vao Iceberg...")
                # Repartition(1) de gom thanh 1 file parquet gon gang tren o cung
                df_trans.repartition(1).writeTo(TABLE_COMMENT).using("iceberg").append()
                print("   -> Xong.")
            else:
                print("   -> File rong hoac khong co ID bai viet.")

            # Log
            spark.createDataFrame([(file_path,)], ["file_path"]).withColumn("load_time", current_timestamp()).writeTo(TABLE_LOG).using("iceberg").append()
            
            # Giai phong bo nho
            spark.catalog.clearCache()
            gc.collect()

        except Exception as e:
            print(f"   [ERROR] {e}")

    print(f"Hoan tat batch {len(files_to_process)} file.")

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit

# ====================================================
# CẤU HÌNH
# ====================================================
TABLE_ARTICLE = "nessie.silver_tables.article"

print("=" * 80)
print("KIỂM TRA SỐ LƯỢNG BÀI VIẾT (FACEBOOK & TIKTOK)")
print("=" * 80)

# 1. Đọc dữ liệu từ bảng Article
try:
    df_article = spark.table(TABLE_ARTICLE)
    
    # 2. Thống kê theo loại (Type)
    print("\n--- Thống kê chi tiết theo nguồn ---")
    df_stats = df_article.groupBy("type").count().orderBy("type")
    df_stats.show()
    
    # 3. Tính tổng số lượng
    total_count = df_article.count()
    print(f"-> TỔNG CỘNG: {total_count} bài viết.")
    
    # 4. Kiểm tra mẫu dữ liệu (Optional)
    print("\n--- 5 bài viết mới nhất ---")
    df_article.select("articleID", "type", "description", "created_at") \
              .orderBy(col("created_at").desc()) \
              .show(5, truncate=50)

except Exception as e:
    print(f"Lỗi khi đọc bảng {TABLE_ARTICLE}: {e}")
    print("Có thể bảng chưa được tạo hoặc chưa có dữ liệu.")

print("\nHoàn tất kiểm tra.")

KIỂM TRA SỐ LƯỢNG BÀI VIẾT (FACEBOOK & TIKTOK)

--- Thống kê chi tiết theo nguồn ---


                                                                                

+--------+-----+
|    type|count|
+--------+-----+
|  TikTok| 1346|
|facebook| 1949|
+--------+-----+

-> TỔNG CỘNG: 3295 bài viết.

--- 5 bài viết mới nhất ---




+----------------+--------+--------------------------------------------------+--------------------------+
|       articleID|    type|                                       description|                created_at|
+----------------+--------+--------------------------------------------------+--------------------------+
|1925786774945096|facebook|                    Topic: Đại học Y Dược Cần Thơ.|2025-12-06 15:00:21.496511|
|1918505135673260|facebook|                    Topic: Ngành Kỹ thuật Hóa học.|2025-12-06 15:00:21.496511|
|1917479792442461|facebook|Chuyển Trường Đại học Phenikaa thành Đại học Ph...|2025-12-06 15:00:21.496511|
|1918095542380886|facebook|Mọi người ơi, tình hình là giờ em mới tìm được ...|2025-12-06 15:00:21.496511|
|1915006809356426|facebook|Tốt nghiệp xong, tân cử nhân Trường Đại học Thă...|2025-12-06 15:00:21.496511|
+----------------+--------+--------------------------------------------------+--------------------------+
only showing top 5 rows


Hoàn tất kiểm tra.


                                                                                

In [16]:
# Dừng Spark Session để giải phóng resources
spark.stop()
print(" Spark Session đã được dừng!")

 Spark Session đã được dừng!
