In [0]:
from pyspark.sql.functions import col,datediff,current_date,to_date
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


In [0]:
# Load the bronze_sales table as the starting DataFrame for this notebook.
raw_df = spark.table('bronze_sales')


In [0]:
# Columns removed before any further processing; drop() ignores names that
# are not present, so re-running this cell is harmless.
columns_to_drop = ["YEAR", "MONTH", "_metadata"]
raw_df = raw_df.drop(*columns_to_drop)

# Last expression of the cell: the summary-statistics DataFrame for raw_df.
raw_df.describe()

In [0]:
# Render the trimmed bronze DataFrame in the notebook output.
display(raw_df)

In [0]:
# Target Spark SQL type for each column that gets an explicit cast.
# NOTE(review): RETAIL_SALES is cast to double while RETAIL_TRANSFERS and
# WAREHOUSE_SALES are cast to int — confirm int is intended for those two.
cast_targets = {
    "Order_id": "int",
    "RETAIL_SALES": "double",
    "RETAIL_TRANSFERS": "int",
    "WAREHOUSE_SALES": "int",
}

# Apply the casts one column at a time, in the order listed above.
df_casted = raw_df
for column_name, spark_type in cast_targets.items():
    df_casted = df_casted.withColumn(column_name, col(column_name).cast(spark_type))

# Replace event_date with a date parsed using the MM-dd-yyyy pattern.
event_date_parsed = to_date(col("event_date"), "MM-dd-yyyy")
df_final = df_casted.withColumn("event_date", event_date_parsed)

display(df_final)


Add an `is_late` column that is true when `event_date` is more than one day before the current date.

In [0]:
# True when event_date lies more than one day before the current date.
# NOTE(review): the flag is relative to the day the notebook runs, so the same
# row can flip to late on a later run — confirm that is the intended meaning.
late_condition = datediff(current_date(), col("event_date")) > 1

silver_staging_df = df_final.withColumn("is_late", late_condition)

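Deduplicate on `order_id`, keeping the row with the most recent `ingestts` for each order, then write the result to a staging table.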

In [0]:
# Number the rows of each order_id, newest ingestts first.
# NOTE(review): rows that tie on ingestts are ranked in an unspecified order.
newest_first = col("ingestts").desc()
window_spec = Window.partitionBy("order_id").orderBy(newest_first)
ranked_df = silver_staging_df.withColumn("rn", row_number().over(window_spec))

# Keep the top-ranked row per order_id; rn is only a helper and is removed.
silver_dedup_df = ranked_df.filter(col("rn") == 1).drop("rn")

# Persist the deduplicated rows as a Delta table partitioned by event_date;
# the MERGE statement later in the notebook reads from this table.
staging_writer = (
    silver_dedup_df.write
    .option("mergeSchema", "true")
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
)
staging_writer.saveAsTable("silver_dedupt_staging")

In [0]:
# Bootstrap step: create silver_sales from the deduplicated DataFrame only when
# the table is not in the catalog yet.
# NOTE(review): the "exists" branch only prints; the MERGE itself is issued by
# a later cell and therefore runs on both paths — confirm that is intended.
if spark.catalog.tableExists('silver_sales'):
    print("Silver table exists. Running MERGE.")
else:
    print("Silver table does not exist. Creating it for the first time.")

    silver_writer = (
        silver_dedup_df.write
        .option("mergeSchema", "true")
        .format("delta")
        .mode("overwrite")
        .partitionBy("event_date")
    )
    silver_writer.saveAsTable("silver_sales")


In [0]:
# Upsert the staging rows into silver_sales keyed on order_id: matching rows
# are overwritten with the staging values, unmatched rows are inserted.
merge_statement = """
    MERGE INTO silver_sales s
    USING silver_dedupt_staging i
    ON s.order_id = i.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """

# Left as the cell's last expression so the returned DataFrame is shown.
spark.sql(merge_statement)



In [0]:
%sql
-- Clean up the staging table written earlier in this notebook.
-- IF EXISTS keeps this cell re-runnable: a plain DROP TABLE raises an error
-- when the table has already been removed (e.g. the cell is executed twice).
DROP TABLE IF EXISTS silver_dedupt_staging