In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, LongType, StringType

# Connection / location settings for the product-detail dataset written by Airflow.
HDFS_URI = "hdfs://namenode:9000"
DETAILS_PATH = f"{HDFS_URI}/user/airflow/details/"

# One Spark session for the whole notebook, with HDFS as the default filesystem.
spark = (
    SparkSession.builder
    .appName("Check Parquet Files")
    .config("spark.hadoop.fs.defaultFS", HDFS_URI)
    .getOrCreate()
)

# Explicit read schema: (column name, Spark type); every column is nullable.
DETAILS_FIELDS = [
    ("id", LongType()),
    ("name", StringType()),
    ("short_description", StringType()),
    ("price", LongType()),
    ("list_price", LongType()),
    ("discount", LongType()),
    ("discount_rate", LongType()),
    ("all_time_quantity_sold", LongType()),
    ("inventory_status", StringType()),
    ("stock_item_qty", LongType()),
    ("stock_item_max_sale_qty", LongType()),
]
parquet_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in DETAILS_FIELDS]
)

# Load every parquet file under the details directory using the schema above,
# then show the resulting schema and a preview of the rows.
df = spark.read.schema(parquet_schema).parquet(DETAILS_PATH)
df.printSchema()
df.show(truncate=True)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- price: long (nullable = true)
 |-- list_price: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- discount_rate: long (nullable = true)
 |-- all_time_quantity_sold: long (nullable = true)
 |-- inventory_status: string (nullable = true)
 |-- stock_item_qty: long (nullable = true)
 |-- stock_item_max_sale_qty: long (nullable = true)

+---------+--------------------+--------------------+------+----------+--------+-------------+----------------------+----------------+--------------+-----------------------+
|       id|                name|   short_description| price|list_price|discount|discount_rate|all_time_quantity_sold|inventory_status|stock_item_qty|stock_item_max_sale_qty|
+---------+--------------------+--------------------+------+----------+--------+-------------+----------------------+----------------+--------------+-----------------------+
|2

In [3]:
from pyspark.sql.functions import col, trim

# Clean the raw product-detail rows loaded into `df`:
#   1. trim leading/trailing whitespace from the text columns,
#   2. drop rows missing any required column,
#   3. drop rows with a negative value in a numeric column,
#   4. drop exact duplicate rows.
# Trimming runs first so rows that differ only by surrounding whitespace are
# recognised as duplicates in step 4.

required_columns = ["id", "name", "price", "list_price", "inventory_status"]
text_columns = ["name", "short_description", "inventory_status"]
non_negative_columns = [
    "price",
    "list_price",
    "discount",
    "discount_rate",
    "all_time_quantity_sold",
    "stock_item_qty",
    "stock_item_max_sale_qty",
]

# 1. Trim text columns. trim(NULL) is NULL, so this does not change which rows step 2 drops.
#    NOTE(review): a whitespace-only value becomes "" here and is NOT treated as missing
#    by dropna — confirm whether empty names should also be rejected.
df_clean = df
for column_name in text_columns:
    df_clean = df_clean.withColumn(column_name, trim(col(column_name)))

# 2. Drop rows that lack any required field.
df_clean = df_clean.dropna(subset=required_columns)

# 3. Reject negative prices/quantities. A bare `col(c) >= 0` evaluates to NULL when the
#    value is NULL, and `filter` would then silently drop the row. Keep NULLs explicitly
#    so the optional numeric columns stay optional; the required ones are already
#    non-null after step 2.
for column_name in non_negative_columns:
    df_clean = df_clean.filter(col(column_name).isNull() | (col(column_name) >= 0))

# 4. Remove exact duplicate rows (compares every column, after trimming).
df_clean = df_clean.dropDuplicates()

df_clean.printSchema()
df_clean.show(truncate=True)


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- price: long (nullable = true)
 |-- list_price: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- discount_rate: long (nullable = true)
 |-- all_time_quantity_sold: long (nullable = true)
 |-- inventory_status: string (nullable = true)
 |-- stock_item_qty: long (nullable = true)
 |-- stock_item_max_sale_qty: long (nullable = true)

+---------+--------------------+--------------------+------+----------+--------+-------------+----------------------+----------------+--------------+-----------------------+
|       id|                name|   short_description| price|list_price|discount|discount_rate|all_time_quantity_sold|inventory_status|stock_item_qty|stock_item_max_sale_qty|
+---------+--------------------+--------------------+------+----------+--------+-------------+----------------------+----------------+--------------+-----------------------+
|2