### **Customers**


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable



In [0]:
import os
import sys

In [0]:
class transformations:
    """Shared bronze-to-silver helpers: de-duplication, audit stamping and Delta upsert."""

    def dedup(self,df:DataFrame,dedup_cols:List,cdc:str):
        """Keep a single row per key: the one with the highest ``cdc`` value.

        Args:
            df: DataFrame to de-duplicate.
            dedup_cols: Names of the columns that together identify a record.
            cdc: Name of the column used for ordering (descending) within a key.

        Returns:
            ``df`` reduced to one row per key; the helper ranking column
            ``dedupcounts`` is removed before returning.
        """
        # Partition on the key columns themselves instead of on their
        # concatenation: concat() yields NULL as soon as one part is NULL and
        # cannot tell ('1', '23') apart from ('12', '3') for composite keys.
        latest_first = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))

        ranked = df.withColumn('dedupcounts', row_number().over(latest_first))

        # Rank 1 is the row with the greatest cdc value inside its key group.
        return ranked.filter(col('dedupcounts') == 1).drop('dedupcounts')

    def process(self,df):
        """Return ``df`` with a ``process_timestamp`` column set to the current timestamp."""
        return df.withColumn('process_timestamp',current_timestamp())

    @staticmethod
    def _merge_condition(key_cols):
        """Build the ``s.<col> = t.<col>`` merge predicate, AND-ed over all key columns."""
        if not key_cols:
            raise ValueError('key_cols must contain at least one column name')
        # The spaces around AND matter: without them a composite key renders as
        # 's.a = t.aANDs.b = t.b', which is not a valid predicate.
        return ' AND '.join(f's.{i} = t.{i}' for i in key_cols)

    def upsert(self,df,key_cols,table,cdc):
        """Merge ``df`` into the Delta table ``pysparkdbt.silver.<table>``.

        Matched rows are overwritten only when the source ``cdc`` value is
        strictly greater than the target's; source rows without a match are
        inserted.

        Args:
            df: Source DataFrame (aliased ``s`` in the merge).
            key_cols: Column names used to match source rows to target rows.
            table: Table name inside ``pysparkdbt.silver``.
            cdc: Column compared between source and target to decide updates.

        Returns:
            ``1`` once the merge has executed.

        Raises:
            ValueError: If ``key_cols`` is empty.
        """
        merge_condition = self._merge_condition(key_cols)
        # `spark` is not defined in this class; it must exist in the enclosing
        # (notebook) scope.
        dltobject = DeltaTable.forName(spark,f'pysparkdbt.silver.{table}')
        # The Python DeltaMergeBuilder exposes whenMatchedUpdateAll /
        # whenNotMatchedInsertAll; the chained whenMatched().updateAll() form
        # belongs to the Scala API.
        dltobject.alias("t").merge(df.alias("s"),merge_condition) \
            .whenMatchedUpdateAll(condition = f's.{cdc} > t.{cdc}') \
            .whenNotMatchedInsertAll() \
            .execute()
        return 1


In [0]:
# Load the customer records from the bronze layer.
df_cust = spark.read.table(
    'pysparkdbt.bronze.customers'
)

In [0]:
# 'domain' is the second piece of the e-mail address once split on '@'.
df_cust = df_cust.withColumn(
    'domain',
    split(col("email"), "@").getItem(1),
)



In [0]:
# Combine first_name and last_name into full_name, then drop the two parts.
# concat_ws skips NULL inputs, whereas concat() would make the whole name NULL
# whenever either part is missing; this also matches the drivers section.
df_cust = df_cust.withColumn(
    'full_name', concat_ws(' ', col('first_name'), col('last_name'))
).drop('first_name', 'last_name')


In [0]:
# Strip every non-digit character out of phone_number.
df_cust = df_cust.withColumn(
    'phone_number',
    regexp_replace(col('phone_number'), '[^0-9]', ''),
)



In [0]:
# Helper instance reused by the customer cells that follow.
cust_obj = transformations()

# One row per customer_id: the one ranked first by last_updated_timestamp
# (descending).
cust_df_transf = cust_obj.dedup(
    df=df_cust,
    dedup_cols=['customer_id'],
    cdc='last_updated_timestamp',
)
display(cust_df_transf)


In [0]:
# Add the process_timestamp column to the de-duplicated customers.
df_cust = cust_obj.process(df=cust_df_transf)
display(df_cust)

In [0]:
# Create the silver customers table if it does not exist yet; otherwise merge
# into it. Both branches use df_cust so that the table is created with the
# process_timestamp column that the upsert branch also supplies
# (cust_df_transf is the frame from before that column was added).
if not spark.catalog.tableExists('pysparkdbt.silver.customers'):

    df_cust.write.format('delta').mode('append').saveAsTable('pysparkdbt.silver.customers')
else:

    cust_obj.upsert(df_cust,['customer_id'],'customers','last_updated_timestamp')


In [0]:
%sql
-- Row count of the silver customers table.

select count(*) from  pysparkdbt.silver.customers;

### **Drivers**

In [0]:
# Load the driver records from the bronze layer and preview them.
df_drivers = spark.read.table(
    'pysparkdbt.bronze.drivers'
)
df_drivers.display()

In [0]:
# Join first_name and last_name with a single space into full_name, then
# discard the two source columns.
df_drivers = (
    df_drivers
    .withColumn('full_name', concat_ws(" ", col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
)
df_drivers.display()

In [0]:
# Strip every non-digit character out of phone_number.
df_drivers = df_drivers.withColumn(
    'phone_number',
    regexp_replace(col('phone_number'), '[^0-9]', ''),
)
df_drivers.display()

In [0]:
# Helper instance reused by the driver cells that follow.
driver_obj = transformations()

# One row per driver_id: the one ranked first by last_updated_timestamp
# (descending).
df_drivers = driver_obj.dedup(
    df=df_drivers,
    dedup_cols=['driver_id'],
    cdc='last_updated_timestamp',
)
df_drivers.display()


In [0]:
# Add the process_timestamp column to the de-duplicated drivers.
df_drivers = driver_obj.process(df=df_drivers)


In [0]:
# Merge into the silver drivers table when it already exists; otherwise
# create it from the current batch.
if spark.catalog.tableExists('pysparkdbt.silver.drivers'):
    driver_obj.upsert(
        df=df_drivers,
        key_cols=['driver_id'],
        table='drivers',
        cdc='last_updated_timestamp',
    )
else:
    (
        df_drivers.write
        .format('delta')
        .mode('append')
        .saveAsTable('pysparkdbt.silver.drivers')
    )

### **Locations**

In [0]:
# Load the location records from the bronze layer and preview them.
df_locations = spark.read.table(
    'pysparkdbt.bronze.locations'
)
df_locations.display()


In [0]:
# Helper instance for the location cells.
loc_object = transformations()

# One row per location_id (ranked first by last_updated_timestamp,
# descending), then add the process_timestamp column.
df_locations = loc_object.dedup(
    df=df_locations,
    dedup_cols=['location_id'],
    cdc='last_updated_timestamp',
)
df_locations = loc_object.process(df=df_locations)

In [0]:
# Merge into the silver locations table when it already exists; otherwise
# create it from the current batch.
if spark.catalog.tableExists('pysparkdbt.silver.locations'):
    loc_object.upsert(
        df=df_locations,
        key_cols=['location_id'],
        table='locations',
        cdc='last_updated_timestamp',
    )
else:
    (
        df_locations.write
        .format('delta')
        .mode('append')
        .saveAsTable('pysparkdbt.silver.locations')
    )



### **Payments**

In [0]:
# Load the payment records from the bronze layer and preview them.
df_payments = spark.read.table(
    'pysparkdbt.bronze.payments'
)
df_payments.display()


In [0]:
# Derive online_payment_status: Card payments map to online_sucess /
# online_failed / online_pending by payment_status; every other row
# (including Card rows with any other status) becomes 'offline'.
# NOTE(review): 'Sucess' must match the spelling stored in payment_status —
# confirm against the bronze data before changing it here.
df_pay = df_payments.withColumn(
    'online_payment_status',
    when((col('payment_method') == 'Card') & (col('payment_status') == 'Sucess'), 'online_sucess')
    .when((col('payment_method') == 'Card') & (col('payment_status') == 'Failed'), 'online_failed')
    .when((col('payment_method') == 'Card') & (col('payment_status') == 'Pending'), 'online_pending')
    .otherwise('offline'),
)
df_pay.display()


In [0]:
# Helper instance for the payment cells.
pay_obj = transformations()

# One row per payment_id (ranked first by last_updated_timestamp,
# descending), then add the process_timestamp column.
df_pay = pay_obj.dedup(
    df=df_pay,
    dedup_cols=['payment_id'],
    cdc='last_updated_timestamp',
)
df_pay = pay_obj.process(df=df_pay)

In [0]:
# Merge into the silver payments table when it already exists; otherwise
# create it from the current batch.
if spark.catalog.tableExists('pysparkdbt.silver.payments'):
    pay_obj.upsert(
        df=df_pay,
        key_cols=['payment_id'],
        table='payments',
        cdc='last_updated_timestamp',
    )
else:
    (
        df_pay.write
        .format('delta')
        .mode('append')
        .saveAsTable('pysparkdbt.silver.payments')
    )

In [0]:
# Show the row count of the silver drivers table.
# NOTE(review): this cell sits in the Payments section but queries
# pysparkdbt.silver.drivers — confirm which table was intended.
spark.sql(
    'select count(*) from pysparkdbt.silver.drivers'
).display()

### **Vehicles**


In [0]:
# Load the vehicle records from the bronze layer and preview them.
df_veh = spark.read.table(
    'pysparkdbt.bronze.vehicles'
)
df_veh.display()

In [0]:
# Upper-case the 'make' column in place.
df_veh = df_veh.withColumn(
    'make',
    upper(col('make')),
)
df_veh.display()

In [0]:
# Helper instance for the vehicle steps.
veh_object = transformations()

# One row per vehicle_id (ranked first by last_updated_timestamp,
# descending), then add the process_timestamp column.
df_veh = veh_object.dedup(
    df=df_veh,
    dedup_cols=['vehicle_id'],
    cdc='last_updated_timestamp',
)
df_veh = veh_object.process(df=df_veh)

# Merge into the silver vehicles table when it already exists; otherwise
# create it from the current batch.
if spark.catalog.tableExists('pysparkdbt.silver.vehicles'):
    veh_object.upsert(
        df=df_veh,
        key_cols=['vehicle_id'],
        table='vehicles',
        cdc='last_updated_timestamp',
    )
else:
    (
        df_veh.write
        .format('delta')
        .mode('append')
        .saveAsTable('pysparkdbt.silver.vehicles')
    )