##bronze layer.
####lee desde archivo csv y guarda en tabla delta en capa bronze
##silver layer.
####lee bronze, tranforma y limpia datos y guarda en tabla delta en capa bronze
##gold layer
#####lee tabla de silver, crea tabla de hechos y dimensiones

In [0]:
import dlt
from pyspark.sql.functions import col, trim, regexp_replace, monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# ----------------------------------
# BRONZE LAYER: Ingest Raw CSV File
# ----------------------------------

dlt.create_table(
    name="clothes_bronze",
    comment="Raw clothes dataset loaded from CSV"
)

@dlt.table(name="clothes_bronze")
def load_bronze():
    return (
        spark.read.format("csv")
            .option("inferSchema", False)
            .option("header", True)
            .option("sep", ",")
            .option("skipRows", 1)
            .load("/FileStore/tables/clothes_price_prediction_dat.csv")
            .withColumnRenamed("Price;;;", "Price")
    )

# ----------------------------------
# SILVER LAYER: Clean the Data
# ----------------------------------

dlt.create_table(
    name="clothes_silver",
    comment="Cleaned clothes data without nulls or duplicates"
)

@dlt.table(name="clothes_silver")
def clean_clothes():
    df = dlt.read("clothes_bronze")
    df_cleaned = (
        df.dropna()
          .dropDuplicates()
          .select([trim(col(c)).alias(c) for c in df.columns])
    )
    return df_cleaned.withColumn("Price", regexp_replace(col("Price"), ";", "").cast("double"))

# ----------------------------------
# GOLD LAYER: Create Star Schema (Dimensions + Fact Table)
# ----------------------------------

# Utility function to add ID
def add_surrogate_key(df, col_name):
    return df.select(col(col_name)).distinct().withColumn(f"{col_name}_id", row_number().over(Window.orderBy(col(col_name))))

# DIMENSION: Brand
@dlt.table(name="dim_brand")
def dim_brand():
    return add_surrogate_key(dlt.read("clothes_silver"), "Brand")

# DIMENSION: Category
@dlt.table(name="dim_category")
def dim_category():
    return add_surrogate_key(dlt.read("clothes_silver"), "Category")

# DIMENSION: Color
@dlt.table(name="dim_color")
def dim_color():
    return add_surrogate_key(dlt.read("clothes_silver"), "Color")

# DIMENSION: Size
@dlt.table(name="dim_size")
def dim_size():
    return add_surrogate_key(dlt.read("clothes_silver"), "Size")

# DIMENSION: Material
@dlt.table(name="dim_material")
def dim_material():
    return add_surrogate_key(dlt.read("clothes_silver"), "Material")

# FACT TABLE: Clothes Pricing
@dlt.table(name="fact_clothes_price")
def fact_clothes_price():
    silver_df = dlt.read("clothes_silver")

    brand = dlt.read("dim_brand")
    category = dlt.read("dim_category")
    color = dlt.read("dim_color")
    size = dlt.read("dim_size")
    material = dlt.read("dim_material")

    return (
        silver_df
            .join(brand, "Brand")
            .join(category, "Category")
            .join(color, "Color")
            .join(size, "Size")
            .join(material, "Material")
            .select(
                col("Brand_id"),
                col("Category_id"),
                col("Color_id"),
                col("Size_id"),
                col("Material_id"),
                col("Price")
            )
    )
