In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import * 
from typing import List 
from pyspark.sql import DataFrame 
from pyspark.sql.window import Window 
from delta.tables import DeltaTable

In [0]:
import os 
import sys 

In [0]:
current_dir = os.getcwd()

# Make modules that sit next to this notebook importable. The membership guard
# keeps re-runs of this cell from appending the same directory again.
if current_dir not in sys.path:
    sys.path.append(current_dir)

In [0]:
class transformations:
    """Reusable silver-layer helpers: de-duplication, audit timestamp and Delta upsert."""

    def dedup(self, df: DataFrame, dedup_cols: list, cdc: str):
        """Keep one row per business key: the row with the greatest `cdc` value.

        Args:
            df: input DataFrame.
            dedup_cols: column names that together form the business key.
            cdc: change-tracking column; rows are ranked by it in descending order.

        Returns:
            DataFrame with at most one row per distinct combination of `dedup_cols`.

        Raises:
            ValueError: if `dedup_cols` is empty.
        """
        if not dedup_cols:
            raise ValueError("dedup_cols must contain at least one column")

        # Partition directly on the key columns rather than on concat(*cols):
        # concat() yields NULL if any part is NULL (merging unrelated rows into
        # one partition) and, with no separator, keys like ("a", "bc") and
        # ("ab", "c") would collide.
        latest_first = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))
        # NOTE(review): rows tied on `cdc` are picked arbitrarily by row_number().
        return (
            df.withColumn("dedupCounts", row_number().over(latest_first))
            .filter(col("dedupCounts") == 1)
            .drop("dedupCounts")
        )

    def process_timestamp(self, df):
        """Return `df` with a `process_timestamp` column set to current_timestamp()."""
        return df.withColumn("process_timestamp", current_timestamp())

    def upsert(self, df, key_cols, table, cdc, catalog_schema="pysparkdbt.silver"):
        """MERGE `df` into the existing Delta table `{catalog_schema}.{table}`.

        Matched rows are overwritten only when the incoming `cdc` value is
        greater than or equal to the stored one; unmatched rows are inserted.

        Args:
            df: source DataFrame. It should already be unique on `key_cols`:
                Delta MERGE errors when several source rows match one target row.
            key_cols: column names used to match source ("src") and target ("trg") rows.
            table: target table name inside `catalog_schema`.
            cdc: change-tracking column compared on match.
            catalog_schema: "catalog.schema" prefix of the target table.

        Returns:
            1 (kept for compatibility with existing callers).

        Raises:
            ValueError: if `key_cols` is empty.
        """
        if not key_cols:
            raise ValueError("key_cols must contain at least one column")

        # Backtick-quote identifiers so unusual column names still parse in the
        # SQL condition strings.
        merge_condition = " AND ".join(f"src.`{k}` = trg.`{k}`" for k in key_cols)
        # Use the DataFrame's own session instead of relying on a global `spark`.
        target = DeltaTable.forName(df.sparkSession, f"{catalog_schema}.{table}")
        (
            target.alias("trg")
            .merge(df.alias("src"), merge_condition)
            .whenMatchedUpdateAll(condition=f"src.`{cdc}` >= trg.`{cdc}`")
            .whenNotMatchedInsertAll()
            .execute()
        )
        return 1
    

#### **CUSTOMERS**

In [0]:
df_cust = spark.read.table("pysparkdbt.bronze.customers")

In [0]:
df_cust = df_cust.withColumn("domain",split(col('email'),"@")[1])


In [0]:
df_cust = df_cust.withColumn("phone_number",regexp_replace("phone_number",r"[^0-9]",""))


In [0]:
df_cust = df_cust.withColumn("full_name",concat_ws(" ",col("first_name"),col("last_name")))
df_cust = df_cust.drop("first_name","last_name")


In [0]:
cust_obj = transformations()

cust_df_trns = cust_obj.dedup(df_cust,["customer_id"],"last_updated_timestamp")


In [0]:
df_cust = cust_obj.process_timestamp(cust_df_trns)



In [0]:


if not spark.catalog.tableExists("pysparkdbt.silver.customers"):

    df_cust.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.customers") 

else:  

    cust_obj.upsert(df_cust,['customer_id'],'customers','last_updated_timestamp')

#### **DRIVERS**


In [0]:
df_driver = spark.read.table("pysparkdbt.bronze.drivers")


In [0]:
df_driver = df_driver.withColumn("phone_number",regexp_replace("phone_number",r"[^0-9]",""))


In [0]:
df_driver = df_driver.withColumn("full_name",concat_ws(" ",col("first_name"),col("last_name")))
df_driver = df_driver.drop("first_name","last_name")

In [0]:
driver_obj = transformations()

In [0]:
df_driver = driver_obj.dedup(df_driver,["driver_id"],"last_updated_timestamp")

In [0]:
df_driver = driver_obj.process_timestamp(df_driver)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.drivers"):

    df_driver.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.drivers") 

else:  

    driver_obj.upsert(df_driver,['driver_id'],'drivers','last_updated_timestamp')

#### **LOCATIONS**

In [0]:
df_loc = spark.read.table("pysparkdbt.bronze.locations")

In [0]:
loc_obj = transformations()

In [0]:
df_loc = loc_obj.dedup(df_loc,["location_id"],"last_updated_timestamp")
df_loc = loc_obj.process_timestamp(df_loc)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.loactions"):

    df_loc.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.locations") 

else:  

    loc_obj.upsert(df_loc,['location_id'],'locations','last_updated_timestamp')

#### **PAYMENTS**

In [0]:
df_pay = spark.read.table("pysparkdbt.bronze.payments")


In [0]:
df_pay = df_pay.withColumn(
    "online_payment_status",
    when( (col('payment_method') == 'Card') & (col('payment_status') == 'Success'),"online-success" )
    .when( (col('payment_method') == 'Card') & (col('payment_status') == 'failed'),"online-failed" )
    .when( (col('payment_method') == 'Card') & (col('payment_status') == 'pending'),"online-pending" )
    .otherwise("offline")
)



In [0]:
payment_obj = transformations()
df_pay = payment_obj.dedup(df_pay,["payment_id"],"last_updated_timestamp")
df_pay = payment_obj.process_timestamp(df_pay)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.payments"):

    df_pay.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.payments") 

else:  

    payment_obj.upsert(df_pay,['payment_id'],'payments','last_updated_timestamp')

#### **VEHICLES**

In [0]:
df_veh = spark.read.table("pysparkdbt.bronze.vehicles")

In [0]:
df_veh = df_veh.withColumn("make",upper(col("make")))


In [0]:
vehicle_obj = transformations()
df_veh = vehicle_obj.dedup(df_veh,["vehicle_id"],"last_updated_timestamp")
df_veh = vehicle_obj.process_timestamp(df_veh)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.vehicles"):

    df_veh.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.vehicles") 

else:  

    vehicle_obj.upsert(df_veh,['vehicle_id'],'vehicles','last_updated_timestamp')