In [23]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
from delta import *
from with_reviews_columns import *

def drop_null_columns(df: DataFrame) -> DataFrame:
    """Drop every column of ``df`` whose values are all null.

    A single aggregation counts the non-null values of each column. Columns whose
    count is zero are then dropped.

    Args:
        df: Input Spark DataFrame.

    Returns:
        ``df`` without its all-null columns. ``df`` is returned unchanged when it
        has no columns at all.
    """
    if not df.columns:
        # An empty select list is a plain projection, not an aggregate: it would
        # collect every (empty) row to the driver and fail on an empty frame.
        return df
    # F.count(col) ignores nulls, so it is exactly the non-null count.
    # Names are back-quoted so columns containing dots (or other special
    # characters) resolve as top-level columns, not nested-field paths.
    non_null_counts = df.select(
        [F.count(F.col("`" + c.replace("`", "``") + "`")).alias(c) for c in df.columns]
    ).first().asDict()
    to_drop = [name for name, count in non_null_counts.items() if count == 0]
    # drop() with string names matches column names literally.
    return df.drop(*to_drop)

# Spark session with the Delta Lake SQL extension and the Delta catalog set as
# spark_catalog; the builder is passed through configure_spark_with_delta_pip
# (from the star-imported `delta` package) before the session is created.
builder = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Input JSON file (warehouse path daily/tgdd/20250225).
SOURCE_JSON_PATH = "/mnt/d/hust/code/thesis/warehouse/daily/tgdd/20250225/0.json"
# Category to split out; set to None to process every category in the file.
TARGET_CATEGORY = "dtdd"

df = spark.read.format("json").load(SOURCE_JSON_PATH)
# Null categories are excluded: they match no row in the per-category filter
# and would otherwise be written to a ".../None" table.
categories_df = df.select("category").dropna().distinct()
if TARGET_CATEGORY is not None:
    categories_df = categories_df.filter(F.col("category") == TARGET_CATEGORY)
category_list = [row["category"] for row in categories_df.collect()]
# Root folder for the per-category Delta tables (one sub-folder per category).
OUTPUT_ROOT = "/mnt/d/hust/code/thesis/warehouse/by_categories/tgdd"
# Spark save mode for those tables. "errorifexists" (Spark's default) makes a
# re-run fail once a category table exists; switch to "overwrite" or "append"
# deliberately, depending on whether tables should be rebuilt or accumulated.
DELTA_WRITE_MODE = "errorifexists"


def _quoted(name):
    """Return a Column for the top-level column `name`.

    The name is back-quoted so dots or other special characters are not parsed
    as a nested-field path.
    """
    return F.col("`" + name.replace("`", "``") + "`")


def _unique_name(candidate, parent, taken):
    """Choose an output name for struct child `candidate` of `parent` not in `taken`.

    Tries the child's own name, then `<parent>_<child>`, then `<parent>_<child>_<n>`.
    """
    if candidate not in taken:
        return candidate
    base = f"{parent}_{candidate}"
    name, suffix = base, 2
    while name in taken:
        name = f"{base}_{suffix}"
        suffix += 1
    return name


def _flatten_struct_columns(frame):
    """Replace each top-level struct column of `frame` with its child fields.

    Non-struct columns keep their names and relative order. The children of each
    struct are appended after them, in schema order, and are named after the
    child field. A child may take its parent's name, because the parent struct
    itself is removed.

    If that name is already used, the child is renamed via `_unique_name` instead
    of overwriting the existing column. "Already used" means taken by a non-struct
    column or by a child of an earlier struct.

    The result is built with one `select`, not chained `withColumn` calls, to keep
    the query plan small.
    """
    fields = frame.schema.fields
    struct_fields = [f for f in fields if isinstance(f.dataType, StructType)]
    if not struct_fields:
        return frame
    plain_names = [f.name for f in fields if not isinstance(f.dataType, StructType)]
    taken = set(plain_names)
    projections = [_quoted(name) for name in plain_names]
    for struct_field in struct_fields:
        # Log message kept as-is (Vietnamese: "have to drop this column").
        print("phai drop cot nay thoi:", struct_field.name)
        for child in struct_field.dataType.fields:
            out_name = _unique_name(child.name, struct_field.name, taken)
            taken.add(out_name)
            projections.append(_quoted(struct_field.name).getField(child.name).alias(out_name))
    return frame.select(projections)


for category in category_list:
    print(category)
    new_df = df.filter(F.col("category") == category)
    new_df = drop_null_columns(new_df)
    new_df = with_reviews_columns(new_df)
    # select() fails fast if any expected review column is missing.
    col_to_convert = new_df.select(review_column_names).columns
    new_df = new_df.withColumn(
        "reviews", create_list_reviews_udf(*[new_df[c] for c in col_to_convert])
    )
    new_df.printSchema()
    new_df = _flatten_struct_columns(new_df)
    # Flattened struct children can themselves be null for every row.
    new_df = drop_null_columns(new_df)
    new_df.printSchema()
    new_df.write.format("delta").mode(DELTA_WRITE_MODE).save(f"{OUTPUT_ROOT}/{category}")

dtdd
root
 |-- brand: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- camera_man_hinh: struct (nullable = true)
 |    |-- cong_nghe_man_hinh: string (nullable = true)
 |    |-- den_flash_camera_sau: string (nullable = true)
 |    |-- do_phan_giai_camera_sau: string (nullable = true)
 |    |-- do_phan_giai_camera_truoc: string (nullable = true)
 |    |-- do_phan_giai_man_hinh: string (nullable = true)
 |    |-- do_sang_toi_da: string (nullable = true)
 |    |-- man_hinh_rong: string (nullable = true)
 |    |-- mat_kinh_cam_ung: string (nullable = true)
 |    |-- quay_phim_camera_sau: string (nullable = true)
 |    |-- tinh_nang_camera_sau: string (nullable = true)
 |    |-- tinh_nang_camera_truoc: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cau_hinh_bo_nho: struct (nullable = true)
 |    |-- chip_do_hoa: string (nullable = true)
 |    |-- chip_xu_ly: string (nullable = true)
 |    |-- danh_ba: string (nullable = true)
 |    |-- dung

                                                                                

root
 |-- brand: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- category: string (nullable = true)
 |-- crawled_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- price: double (nullable = true)
 |-- price_origin: string (nullable = true)
 |-- price_present: string (nullable = true)
 |-- price_valid_ultil: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- url: string (nullable = true)
 |-- review_author_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- review_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- review_description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- review_body: array (nullable = true)
 |    |-- element: string (containsNul

                                                                                

In [25]:
# Reload the Delta table written above for the "dtdd" category and preview it.
DTDD_TABLE_PATH = "/mnt/d/hust/code/thesis/warehouse/by_categories/tgdd/dtdd"
# Rows to print. None prints every row, which costs an extra count() job and can
# produce a very large notebook output.
PREVIEW_ROWS = 20

df = spark.read.format("delta").load(DTDD_TABLE_PATH)
df.show(df.count() if PREVIEW_ROWS is None else PREVIEW_ROWS, truncate=False)

+----------------+--------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------+---------+------------+-------------+-----------------+----------------------------+-----------------------------------------------+----------+---------------------------------------------------------------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------