In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql.window import Window

In [0]:
# Bronze tables to process, in the order they are loaded.
tables = [
    'customers',
    'products',
    'categories',
    'orders',
    'payments',
]

In [0]:
# Primary-key column of each table. Defined once, outside the loop, because it
# does not change between iterations. It is used to drop rows without a key,
# to de-duplicate, and as the match condition of the Delta MERGE.
primary_keys = {
    "customers": "customer_id",
    "products": "product_id",
    "categories": "category_id",
    "orders": "order_id",
    "payments": "payment_id",
}

# Load every table listed in `tables` from main.bronze_al into main.silver_al:
# cast amount/price columns, drop rows with a null key, keep one row per key,
# then either MERGE (orders, payments) or fully overwrite (all other tables).
for table in tables:
    df = spark.sql(f'select * from main.bronze_al.{table}')

    # Cast every column whose name contains 'amount' or 'price'
    # (case-insensitive) to double.
    numeric_col = [c for c in df.columns if 'amount' in c.lower() or 'price' in c.lower()]
    for i in numeric_col:
        df = df.withColumn(i, col(i).cast('double'))

    # Raises KeyError if `tables` contains a name with no entry in primary_keys.
    pk = primary_keys[table]

    # Rows without a primary key cannot be de-duplicated or merged.
    df = df.filter(col(pk).isNotNull())

    silver_table = f'main.silver_al.{table}'

    # Keep exactly one row per key: the first one when ordered by ingest_time
    # descending, with source_file descending as the tie-breaker.
    windows = Window.partitionBy(pk).orderBy(col('ingest_time').desc(), col('source_file').desc())
    df = df.withColumn('rn', row_number().over(windows)).filter(col('rn') == 1).drop('rn')

    if table in ['orders', 'payments']:
        if spark.catalog.tableExists(silver_table):
            # Upsert into the existing silver table: update rows whose key
            # already exists, insert the rest.
            delta = DeltaTable.forName(spark, silver_table)
            delta.alias('trg').merge(df.alias('src'), f'src.{pk}=trg.{pk}')\
                .whenMatchedUpdateAll()\
                .whenNotMatchedInsertAll().execute()
        else:
            # First run: the silver table does not exist yet, so create it.
            df.write.format('delta').mode('overwrite').saveAsTable(silver_table)
    else:
        # All other tables are replaced in full on every run; the schema is
        # allowed to change with the data.
        df.write.format('delta').mode('overwrite').option('overwriteSchema', True).saveAsTable(silver_table)