In [0]:
from pyspark.sql.functions import col, to_date , round , to_timestamp , to_utc_timestamp , when
from pyspark.sql.types import IntegerType, FloatType , DoubleType



def transform_customers(df):
    """Clean the raw customers table for the Silver layer.

    - ``registration_date``: parsed from a ``yyyy-MM-dd`` string into a date.
    - ``total_orders``: cast to integer.
    - ``avg_order_value``: cast to double, then rounded to 2 decimal places
      (``round`` here is ``pyspark.sql.functions.round``, not the builtin).

    Only one row per ``customer_id`` is kept.
    """
    cleaned = df.withColumn(
        "registration_date", to_date(col("registration_date"), "yyyy-MM-dd")
    )
    cleaned = cleaned.withColumn("total_orders", col("total_orders").cast(IntegerType()))
    # Cast and round in one expression; equivalent to casting first and
    # rounding the cast column in a second withColumn.
    cleaned = cleaned.withColumn(
        "avg_order_value", round(col("avg_order_value").cast(DoubleType()), 2)
    )
    return cleaned.dropDuplicates(["customer_id"])

def transform_products(df):
    """Clean the raw products table for the Silver layer.

    ``price``, ``mrp`` and ``margin_percentage`` are cast to double and
    rounded to 2 decimal places. ``shelf_life_days``, ``min_stock_level`` and
    ``max_stock_level`` are cast to integer. Only one row per ``product_id``
    is kept.
    """
    decimal_columns = ("price", "mrp", "margin_percentage")
    integer_columns = ("shelf_life_days", "min_stock_level", "max_stock_level")

    result = df
    for name in decimal_columns:
        # pyspark.sql.functions.round (imported at file top), not the builtin.
        result = result.withColumn(name, round(col(name).cast(DoubleType()), 2))
    for name in integer_columns:
        result = result.withColumn(name, col(name).cast(IntegerType()))
    return result.dropDuplicates(["product_id"])

def transform_orders(df):
    """Clean the raw orders table for the Silver layer.

    ``order_date``, ``promised_delivery_time`` and ``actual_delivery_time``
    are parsed from ``yyyy-MM-dd HH:mm:ss`` strings into timestamps.
    ``order_total`` is cast to double and rounded to 2 decimal places. Only
    one row per ``order_id`` is kept.
    """
    timestamp_format = "yyyy-MM-dd HH:mm:ss"

    result = df
    for name in ("order_date", "promised_delivery_time", "actual_delivery_time"):
        result = result.withColumn(name, to_timestamp(col(name), timestamp_format))
    result = result.withColumn(
        "order_total", round(col("order_total").cast(DoubleType()), 2)
    )
    return result.dropDuplicates(["order_id"])

def transform_orderitems(df, key_columns=None):
    """Clean the raw order-items table for the Silver layer.

    ``quantity`` is cast to integer. ``unit_price`` is cast to double and
    rounded to 2 decimal places.

    Args:
        df: Bronze order-items DataFrame.
        key_columns: Optional columns that identify a unique line item. When
            ``None`` or empty (the default), only rows that are identical in
            every column are dropped. An order-items table presumably holds
            one row per item in an order, so ``order_id`` alone is not a
            unique key. De-duplicating on it would keep a single item per
            order and discard the rest. Pass e.g. ``["order_id",
            "product_id"]`` if such a key exists (column name is an
            assumption; verify against the Bronze schema).

    Returns:
        The cleaned, de-duplicated DataFrame.
    """
    cleaned = (
        df.withColumn("quantity", col("quantity").cast(IntegerType()))
        # pyspark.sql.functions.round (imported at file top), not the builtin.
        .withColumn("unit_price", round(col("unit_price").cast(DoubleType()), 2))
    )
    # None makes dropDuplicates compare whole rows. An empty key list is
    # mapped to None too, so "no key" always means whole-row de-duplication.
    subset = list(key_columns) if key_columns else None
    return cleaned.dropDuplicates(subset)

def transform_deliveryperformance(df):
    """Clean the raw delivery-performance table for the Silver layer.

    - ``reasons_if_delayed``: null values are replaced with ``"No Delay"``.
    - ``delivery_time_minutes``: cast to double (not rounded).
    - ``distance_km``: cast to double and rounded to 2 decimal places.
    - ``promised_time`` / ``actual_time``: parsed from
      ``yyyy-MM-dd HH:mm:ss`` strings into timestamps.

    Only one row per ``order_id`` is kept.
    """
    timestamp_format = "yyyy-MM-dd HH:mm:ss"

    # Use one spelling of the column name everywhere. Spark matches names
    # case-insensitively only while spark.sql.caseSensitive is false (the
    # default). Mixing "reasons_if_delayed" and "Reasons_if_Delayed" fails as
    # soon as case sensitivity is turned on.
    # NOTE(review): only nulls are treated as "no delay"; empty strings pass
    # through unchanged. Confirm how Bronze stores a missing reason.
    reason = col("reasons_if_delayed")

    return (
        df.withColumn(
            "reasons_if_delayed", when(reason.isNull(), "No Delay").otherwise(reason)
        )
        .withColumn("delivery_time_minutes", col("delivery_time_minutes").cast(DoubleType()))
        # pyspark.sql.functions.round (imported at file top), not the builtin.
        .withColumn("distance_km", round(col("distance_km").cast(DoubleType()), 2))
        .withColumn("promised_time", to_timestamp(col("promised_time"), timestamp_format))
        .withColumn("actual_time", to_timestamp(col("actual_time"), timestamp_format))
        .dropDuplicates(["order_id"])
    )

def transform_customerfeedback(df):
    """Clean the raw customer-feedback table for the Silver layer.

    ``rating`` is cast to integer and ``feedback_date`` is parsed from a
    ``yyyy-MM-dd`` string into a date. Only one row per ``order_id`` is kept.
    """
    typed = df.withColumn("rating", col("rating").cast(IntegerType()))
    dated = typed.withColumn("feedback_date", to_date(col("feedback_date"), "yyyy-MM-dd"))
    return dated.dropDuplicates(["order_id"])



# Map each table name (used for both the Bronze and Silver locations) to the
# function that cleans it.
transformations = {
    "customers": transform_customers,
    "products": transform_products,
    "orders": transform_orders,
    "orderitems": transform_orderitems,
    "deliveryperformance": transform_deliveryperformance,
    "customerfeedback": transform_customerfeedback,
}

# Base locations: Bronze input folder, Silver output folder, and the
# catalog/schema prefix under which the Silver tables are registered.
bronze_base = "abfss://bronze@blinkitdl.dfs.core.windows.net/"
silver_base = "abfss://silver@blinkitdl.dfs.core.windows.net/"
meta_base = "blinkit.silver."

# Each run re-reads the complete Bronze folder for every table. With "append",
# every re-run would add another full copy of each table to Silver, and the
# per-table dropDuplicates() would not catch it because it only sees the
# current batch. "overwrite" makes the notebook safe to re-run.
silver_write_mode = "overwrite"

# Load, clean and persist every table in turn.
for table_name, transformation_fn in transformations.items():
    print(f"Processing: {table_name}")

    bronze_path = f"{bronze_base}{table_name}"
    silver_path = f"{silver_base}{table_name}"
    meta_silver = f"{meta_base}{table_name}"

    # Parquet files carry their own schema, so CSV reader options such as
    # "header" / "inferSchema" do not apply here.
    # `spark` is assumed to be the SparkSession provided by the notebook.
    df = spark.read.format("parquet").load(bronze_path)

    transformed_df = transformation_fn(df)

    # Write as a Delta table at an explicit storage path and register it in
    # the metastore under blinkit.silver.<table_name>.
    (
        transformed_df.write.format("delta")
        .mode(silver_write_mode)
        .option("path", silver_path)
        .saveAsTable(meta_silver)
    )

Processing: customers
Processing: products
Processing: orders
Processing: orderitems
Processing: deliveryperformance
Processing: customerfeedback
