In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit 

# 1. Tạo SparkSession
# appName: Tên ứng dụng Spark của bạn
# master: 'local[*]' nghĩa là Spark sẽ chạy cục bộ trên tất cả các nhân có sẵn
spark = SparkSession.builder \
    .appName("CSVtoParquet") \
    .master("local[*]") \
    .getOrCreate()

print("SparkSession đã được tạo thành công.")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/26 08:39:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession đã được tạo thành công.


In [None]:
csv_file_path = "/home/phamminh/work/BIDV/kedro_research/kedro-spark/data/spark_data/source/data.csv"  # Đảm bảo file data.csv nằm trong cùng thư mục với script này
df = spark.read.csv(csv_file_path, inferSchema=True, header=True)

In [47]:
df.show()

+---+-------+------+---------------+
| id|   name|   age|processing_date|
+---+-------+------+---------------+
|  1|  Alice|    30|     2025-01-02|
|  2|    Bob|    24|     2025-01-02|
|  3|Charlie|    35|     2025-01-02|
|  4|   Minh|    25|     2025-01-03|
|  5|mmmmmmm| 22222|     2025-01-05|
|  6| djashd| 22222|     2025-01-05|
|  7|asjkdsa|238927|     2025-01-05|
|  8|   dshd| 23532|     2025-01-07|
|  9|   dshd| 23532|     2025-01-07|
| 10|   dshd| 23532|     2025-01-07|
| 11|   dshd| 23532|     2025-01-08|
| 12|   dshd| 23532|     2025-01-08|
| 13|   dshd| 23532|     2025-01-09|
+---+-------+------+---------------+



In [None]:
# custom_date_string = "2025-01-02"
# df1 = df.withColumn("processing_date", lit(custom_date_string))

In [None]:
# df1.show()

+---+-------+---+---------------+
| id|   name|age|processing_date|
+---+-------+---+---------------+
|  1|  Alice| 30|     2025-01-02|
|  2|    Bob| 24|     2025-01-02|
|  3|Charlie| 35|     2025-01-02|
+---+-------+---+---------------+



In [None]:
parquet_output_path = "/home/phamminh/work/BIDV/kedro_research/kedro-spark/data/dynamic_data/reviews"
# partitionBy("processing_date"): Spark sẽ tạo các thư mục con dựa trên giá trị của cột 'processing_date'
df.write.mode("overwrite").partitionBy("processing_date").parquet(parquet_output_path)

In [3]:
df_parquet = spark.read.parquet("/home/phamminh/work/BIDV/kedro_research/kedro-spark/data/dynamic_data/reviews/2025-01-01")

In [4]:
df_parquet.show()

+----------+--------------------+---------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------+
|shuttle_id|review_scores_rating|review_scores_comfort|review_scores_amenities|review_scores_trip|review_scores_crew|review_scores_location|review_scores_price|number_of_reviews|reviews_per_month|
+----------+--------------------+---------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------+
|     45163|                91.0|                 10.0|                    9.0|               9.0|               9.0|                   9.0|                9.0|               26|             0.77|
|     49438|                96.0|                 10.0|                   10.0|              10.0|              10.0|                  10.0|                9.0|               61|             0.62|
|     10750|   

In [52]:
df_parquet = spark.read.parquet(parquet_output_path).filter("processing_date >= '2025-01-05' and processing_date <= '2025-01-08'")

In [53]:
df_parquet.show()

+---+-------+------+---------------+
| id|   name|   age|processing_date|
+---+-------+------+---------------+
|  8|   dshd| 23532|     2025-01-07|
|  9|   dshd| 23532|     2025-01-07|
| 10|   dshd| 23532|     2025-01-07|
| 11|   dshd| 23532|     2025-01-08|
| 12|   dshd| 23532|     2025-01-08|
|  5|mmmmmmm| 22222|     2025-01-05|
|  6| djashd| 22222|     2025-01-05|
|  7|asjkdsa|238927|     2025-01-05|
+---+-------+------+---------------+



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, to_date, lit
from datetime import date, timedelta
import calendar

def check_monthly_data_completeness(spark: SparkSession, parquet_base_path: str, year: int, month: int):
    """
    Kiểm tra sự đầy đủ của dữ liệu theo ngày trong một tháng cụ thể
    dựa trên partition 'processing_date'.

    Args:
        spark (SparkSession): Đối tượng SparkSession.
        parquet_base_path (str): Đường dẫn cơ sở đến thư mục Parquet chứa dữ liệu đã partition.
                                 Ví dụ: "/path/to/your/data"
        year (int): Năm cần kiểm tra (ví dụ: 2025).
        month (int): Tháng cần kiểm tra (ví dụ: 4 cho tháng 4).
    """

    print(f"--- Bắt đầu kiểm tra dữ liệu tháng {month}/{year} ---")

    # 1. Xác định tất cả các ngày trong tháng
    num_days = calendar.monthrange(year, month)[1] # Lấy số ngày trong tháng
    expected_dates = []
    for day in range(1, num_days + 1):
        expected_date = date(year, month, day)
        expected_dates.append(expected_date.strftime("%Y-%m-%d"))

    print(f"Tổng số ngày dự kiến trong tháng {month}/{year}: {len(expected_dates)} ngày.")
    # print(f"Các ngày dự kiến: {expected_dates}") # Có thể bỏ comment để kiểm tra

    # 2. Đọc các processing_date hiện có từ dữ liệu
    # Tạo đường dẫn cụ thể đến dữ liệu của tháng đó (nếu có thể để giảm tải)
    # Tuy nhiên, nếu bạn chỉ muốn quét các partition, Spark sẽ tự động tối ưu.
    # Đọc dữ liệu từ đường dẫn cơ sở và chọn cột processing_date
    try:
        # Sử dụng pattern để chỉ đọc các partition của tháng đó nếu có thể.
        # Lưu ý: Spark sẽ tự động lọc partition khi đọc, nhưng việc chỉ định rõ ràng
        #       có thể giúp đọc ít metadata hơn nếu có nhiều partition khác.
        #       Tuy nhiên, cách phổ biến nhất là đọc toàn bộ bảng và lọc sau.
        #       Với mục đích kiểm tra partition, chúng ta sẽ đọc từ base path
        #       và sau đó trích xuất partition key.
        df_existing = spark.read.parquet(parquet_base_path) \
                           .select(col("processing_date")) \
                           .distinct() # Lấy các ngày duy nhất

        # Chuyển đổi về định dạng "YYYY-MM-DD" để so sánh dễ dàng
        existing_dates = [row.processing_date for row in df_existing.collect()]
        existing_dates_str = [d.strftime("%Y-%m-%d") for d in existing_dates] # Chuyển Date object sang string

        print(f"Tổng số ngày thực tế tìm thấy trong dữ liệu: {len(existing_dates_str)} ngày.")
        # print(f"Các ngày thực tế: {existing_dates_str}") # Có thể bỏ comment để kiểm tra

    except Exception as e:
        print(f"LỖI: Không thể đọc dữ liệu từ đường dẫn '{parquet_base_path}'. Vui lòng kiểm tra đường dẫn hoặc quyền truy cập. Chi tiết: {e}")
        return # Thoát hàm nếu không đọc được dữ liệu

    # 3. So sánh và tìm các ngày thiếu
    missing_dates = sorted(list(set(expected_dates) - set(existing_dates_str)))

    # 4. Ghi log và báo cáo
    if missing_dates:
        print(f"\n!!! LỖI: Dữ liệu tháng {month}/{year} THIẾU các ngày sau:")
        for missing_date in missing_dates:
            print(f"- {missing_date}")
        print(f"Tổng số ngày thiếu: {len(missing_dates)} ngày.")
        # Bạn có thể thêm logic để gửi email, kích hoạt cảnh báo, v.v. ở đây
        # Ví dụ: raise Exception(f"Dữ liệu tháng {month}/{year} thiếu {len(missing_dates)} ngày.")
    else:
        print(f"\n--- TUYỆT VỜI! Dữ liệu tháng {month}/{year} ĐẦY ĐỦ tất cả các ngày. ---")

    print(f"\n--- Kết thúc kiểm tra dữ liệu tháng {month}/{year} ---")


# --- Cách sử dụng ---
if __name__ == "__main__":
    # Khởi tạo SparkSession
    spark = SparkSession.builder \
        .appName("DataCompletenessChecker") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

    # Đường dẫn đến thư mục Parquet gốc của bạn
    # Ví dụ: Nếu dữ liệu của bạn ở /user/hive/warehouse/your_table/
    # Và partition là /user/hive/warehouse/your_table/processing_date=2025-04-01/
    # thì base_path là /user/hive/warehouse/your_table
    parquet_data_path = "data/spark_data/target/data_demo" # THAY THẾ BẰNG ĐƯỜNG DẪN THỰC TẾ CỦA BẠN

    # Năm và tháng bạn muốn kiểm tra
    target_year = 2025
    target_month = 1 # Tháng 4

    check_monthly_data_completeness(spark, parquet_data_path, target_year, target_month)



--- Bắt đầu kiểm tra dữ liệu tháng 1/2025 ---
Tổng số ngày dự kiến trong tháng 1/2025: 31 ngày.
Tổng số ngày thực tế tìm thấy trong dữ liệu: 6 ngày.

!!! LỖI: Dữ liệu tháng 1/2025 THIẾU các ngày sau:
- 2025-01-01
- 2025-01-04
- 2025-01-06
- 2025-01-10
- 2025-01-11
- 2025-01-12
- 2025-01-13
- 2025-01-14
- 2025-01-15
- 2025-01-16
- 2025-01-17
- 2025-01-18
- 2025-01-19
- 2025-01-20
- 2025-01-21
- 2025-01-22
- 2025-01-23
- 2025-01-24
- 2025-01-25
- 2025-01-26
- 2025-01-27
- 2025-01-28
- 2025-01-29
- 2025-01-30
- 2025-01-31
Tổng số ngày thiếu: 25 ngày.

--- Kết thúc kiểm tra dữ liệu tháng 1/2025 ---
