# In [0]:
from pyspark.sql.functions import col, to_date

# Bronze -> silver cleaning step for product sales:
# read the raw table, normalize column names, cast columns to their target
# types, drop invalid rows, deduplicate on order_id, and persist as Delta.
df = spark.table("sales_lakehouse.bronze.product_sales_raw")

# Normalize column names (trim surrounding whitespace, lowercase).
# One positional toDF() rename replaces the per-column withColumnRenamed loop:
# it adds a single projection instead of one per column, and it stays
# unambiguous when two raw names differ only by case or padding.
df = df.toDF(*[c.strip().lower() for c in df.columns])

# Cast to the target types first so the validity filters below compare
# typed values rather than raw strings.
typed_df = (
    df.withColumn("order_date", to_date(col("order_date")))
      .withColumn("quantity", col("quantity").cast("int"))
      .withColumn("unit_price", col("unit_price").cast("double"))
      .withColumn("revenue", col("revenue").cast("double"))
      .withColumn("profit", col("profit").cast("double"))
)

# Filter BEFORE deduplicating. dropDuplicates keeps an arbitrary row per
# order_id, so deduplicating first could retain an invalid copy of an order
# and then lose that order entirely once the copy is filtered out, even
# though a valid copy existed in the raw data.
# NOTE(review): rows whose quantity/revenue are NULL after the cast are also
# removed here, because a NULL comparison does not pass a filter.
silver_df = (
    typed_df
      .filter(col("quantity") > 0)
      .filter(col("revenue") >= 0)
      .dropDuplicates(["order_id"])
)

silver_df.write.mode("overwrite").format("delta") \
  .saveAsTable("sales_lakehouse.silver.product_sales_clean")

# Validate: row count of the table that was just written (displayed as the
# cell's result).
spark.table("sales_lakehouse.silver.product_sales_clean").count()
