###(A) Create Live Bronze Table

In [0]:
import dlt
from pyspark.sql.functions import *

# Couche Bronze avec contraintes
@dlt.table(
  name="bronze_lidl_products",
  comment="Raw  lidl product data "
)
def bronze_rides():
  df_bronze= spark.read.format("parquet").load("/Volumes/workspace/default/lidl_products/lidl_product.parquet")
  df_bronze = (
                df_bronze
                .dropDuplicates()
                .withColumn("productId", monotonically_increasing_id()) # add ProductId monotonically increasing
                .withColumn("last_updated", current_timestamp())
                .withColumn("input_file", INPUT_FILE_NAME()) 
  )

  return df_bronze

###(B) Create Live Silver Table

In [0]:

@dlt.table(
  name="silver_ldl_products",
  comment="validated lidl product data"
)
@dlt.expect_all_or_drop({
  "new_price": "old_price IS NOT NULL AND old_price > 0",
  "title": "title IS NOT NULL"
  "category": "category IS NOT NULL"
  "price_consistency": "old_price >= new_price"
  "productId":"productId IS NOT NULL"


})
def silver_products():
  df = dlt.read("bronze_rides")
  
 # Transformation standard
  transformed = (
    df
      .filter(col("title")!="title") 
      .dropDuplicates([productId])
      .withColumnRenamed("old_price", "new_price")
      .withColumnRenamed("new_price", "old_price")
      .withColumn("discount", round(col("discount"), 2))
     
  )
  
  return   transformed



  
