### **Customer**


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable


In [0]:
import os
import sys

In [0]:
from custom_utils import transformationss

In [0]:
# Bare expression: evaluates the name imported from custom_utils so that the
# notebook cell echoes it. It has no other effect.
transformationss

In [0]:
class transformations:
    """Reusable helpers for the bronze -> silver load.

    Provides row de-duplication, a load-time audit column, and a Delta
    ``MERGE`` based upsert into ``pysparkdbt.silver.<table>``.
    """

    def dedup(self, df: "DataFrame", dedup_cols: list, cdc: str) -> "DataFrame":
        """Keep only the newest row for each combination of ``dedup_cols``.

        Args:
            df: Input DataFrame.
            dedup_cols: Names of the columns that identify a logical record.
            cdc: Name of the change-tracking column; the row with the
                greatest value of this column wins within each key.

        Returns:
            ``df`` reduced to one row per key, with no helper columns left
            behind.
        """
        # Partition on the key columns themselves instead of on a
        # concatenated string: concat() yields NULL as soon as one key column
        # is NULL and maps distinct keys such as ("1", "23") and ("12", "3")
        # to the same value.
        newest_first = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))
        ranked = df.withColumn("dedupCounts", row_number().over(newest_first))
        return ranked.filter(col("dedupCounts") == 1).drop("dedupCounts")

    def process_timestamp(self, df: "DataFrame") -> "DataFrame":
        """Return ``df`` with a ``process_timestamp`` column set to the current timestamp."""
        return df.withColumn("process_timestamp", current_timestamp())

    @staticmethod
    def _merge_condition(key_cols) -> str:
        """Build the ``src``/``trg`` equality predicate used by the MERGE.

        Raises:
            ValueError: If ``key_cols`` is empty, since that would produce an
                empty merge condition.
        """
        if not key_cols:
            raise ValueError("key_cols must contain at least one column name")
        # The separator needs surrounding spaces; otherwise a second key is
        # glued to the keyword ("... ANDsrc.b=trg.b") and the SQL is invalid.
        return " AND ".join(f"src.{key} = trg.{key}" for key in key_cols)

    def upsert(self, df, key_cols, table, cdc):
        """Merge ``df`` into the Delta table ``pysparkdbt.silver.<table>``.

        Rows matching on every column in ``key_cols`` are fully updated when
        the incoming ``cdc`` value is greater than or equal to the stored one;
        rows without a match are inserted.

        Relies on a ``spark`` session being available as a global, as it is
        in the notebook this class is defined in.

        Args:
            df: Source DataFrame holding the new/changed rows.
            key_cols: Column names used to match source and target rows.
            table: Target table name inside ``pysparkdbt.silver``.
            cdc: Change-tracking column compared between source and target.

        Returns:
            Always ``1`` once the merge has executed.

        Raises:
            ValueError: If ``key_cols`` is empty.
        """
        merge_condition = self._merge_condition(key_cols)
        target = DeltaTable.forName(spark, f"pysparkdbt.silver.{table}")
        (
            target.alias("trg")
            .merge(df.alias("src"), merge_condition)
            .whenMatchedUpdateAll(condition=f"src.{cdc}>= trg.{cdc}")
            .whenNotMatchedInsertAll()
            .execute()
        )
        return 1

In [0]:
# Load the bronze customers table; the cells below clean and de-duplicate it.
df_cust = spark.read.table("pysparkdbt.bronze.customers")

In [0]:
# Remove every character that is not a digit from the customer phone numbers.
df_cust = df_cust.withColumn(
    "phone_number",
    regexp_replace("phone_number", "[^0-9]", ""),
)
display(df_cust)

In [0]:

# Combine first and last name into one space-separated `full_name` column and
# drop the two source columns.
df_cust = (
    df_cust
    .withColumn("full_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
)
display(df_cust)

In [0]:
# Keep a single row per customer_id: the one with the latest
# last_updated_timestamp.
cust_obj = transformations()
df_cust_trns = cust_obj.dedup(
    df=df_cust,
    dedup_cols=["customer_id"],
    cdc="last_updated_timestamp",
)
display(df_cust_trns)


In [0]:
# Add the `process_timestamp` audit column to the de-duplicated customers.
df_cust = cust_obj.process_timestamp(df_cust_trns)
display(df_cust)

In [0]:


# Upsert into the silver customers table when it already exists; otherwise
# create it from this batch.
if spark.catalog.tableExists("pysparkdbt.silver.customers"):
    cust_obj.upsert(df_cust, ['customer_id'], 'customers', 'last_updated_timestamp')
else:
    df_cust.write.saveAsTable(
        "pysparkdbt.silver.customers",
        format="delta",
        mode="append",
    )




In [0]:
%sql
-- Row count of the silver customers table.
select count(*) from pysparkdbt.silver.customers

#### Drivers


In [0]:
# Load the bronze drivers table and preview it.
df_driver = spark.read.table("pysparkdbt.bronze.drivers")
display(df_driver)

## Strip non-digit characters from phone numbers


In [0]:
# Remove every character that is not a digit from the driver phone numbers.
df_driver = df_driver.withColumn(
    "phone_number",
    regexp_replace("phone_number", "[^0-9]", ""),
)
display(df_driver)

In [0]:

# Combine first and last name into one space-separated `full_name` column and
# drop the two source columns.
df_driver = (
    df_driver
    .withColumn("full_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
)
display(df_driver)

In [0]:
# Build the composite `dr_veh_ID` key ("<driver_id> <vehicle_id>") and drop the
# two columns it was derived from.
df_driver = (
    df_driver
    .withColumn("dr_veh_ID", concat_ws(" ", "driver_id", "vehicle_id"))
    .drop("driver_id", "vehicle_id")
)
display(df_driver)

In [0]:
# Helper instance used for the drivers dedup / timestamp / upsert steps below.
driver_obj = transformations()

In [0]:
# Keep a single row per dr_veh_ID: the one with the latest
# last_updated_timestamp.
df_driver = driver_obj.dedup(
    df=df_driver,
    dedup_cols=["dr_veh_ID"],
    cdc="last_updated_timestamp",
)


In [0]:
# Add the `process_timestamp` audit column to the de-duplicated drivers.
df_driver = driver_obj.process_timestamp(df_driver)


In [0]:
# Upsert into the silver drivers table when it already exists; otherwise
# create it from this batch.
if spark.catalog.tableExists("pysparkdbt.silver.drivers"):
    driver_obj.upsert(df_driver, ['dr_veh_ID'], 'drivers', 'last_updated_timestamp')
else:
    df_driver.write.saveAsTable(
        "pysparkdbt.silver.drivers",
        format="delta",
        mode="append",
    )

In [0]:
%sql
-- Row count of the silver drivers table.
select count(*) from pysparkdbt.silver.drivers

#### **Locations**

In [0]:
# Load the bronze locations table and preview it.
df_location = spark.read.table("pysparkdbt.bronze.locations")
display(df_location)

In [0]:
# Helper instance used for the locations dedup / timestamp / upsert steps below.
location_obj = transformations()


In [0]:
# De-duplicate on location_id (latest last_updated_timestamp wins), then add
# the process_timestamp audit column.
df_location = location_obj.process_timestamp(
    location_obj.dedup(
        df=df_location,
        dedup_cols=["location_id"],
        cdc="last_updated_timestamp",
    )
)

# Upsert into the silver locations table when it already exists; otherwise
# create it from this batch.
if spark.catalog.tableExists("pysparkdbt.silver.locations"):
    location_obj.upsert(df_location, ['location_id'], 'locations', 'last_updated_timestamp')
else:
    df_location.write.saveAsTable(
        "pysparkdbt.silver.locations",
        format="delta",
        mode="append",
    )

In [0]:
%sql
-- Row count of the silver locations table populated by the cell above.
-- This previously counted the bronze source table, so it did not reflect the load.
select count(*) from pysparkdbt.silver.locations

### Payments

In [0]:
# Load the bronze payments table and preview it.
df_payment = spark.read.table("pysparkdbt.bronze.payments")
display(df_payment)

In [0]:
# Helper instance used for the payments dedup / timestamp / upsert steps below.
payment_obj = transformations()

In [0]:
# Derive `online_payment_status`: card payments are labelled by their status
# (success / failed / pending); every other row is labelled 'offline'.
paid_by_card = col('payment_method') == 'card'
payment_state = col('payment_status')

df_payment = df_payment.withColumn(
    "online_payment_status",
    when(paid_by_card & (payment_state == 'success'), 'online-success')
    .when(paid_by_card & (payment_state == "failed"), "online-failed")
    .when(paid_by_card & (payment_state == "pending"), "online-pending")
    .otherwise('offline'),
)
display(df_payment)

In [0]:
# De-duplicate on payment_id (latest last_updated_timestamp wins), then add
# the process_timestamp audit column.
df_payment = payment_obj.process_timestamp(
    payment_obj.dedup(
        df=df_payment,
        dedup_cols=["payment_id"],
        cdc="last_updated_timestamp",
    )
)

# Upsert into the silver payments table when it already exists; otherwise
# create it from this batch.
if spark.catalog.tableExists("pysparkdbt.silver.payments"):
    payment_obj.upsert(df_payment, ['payment_id'], 'payments', 'last_updated_timestamp')
else:
    df_payment.write.saveAsTable(
        "pysparkdbt.silver.payments",
        format="delta",
        mode="append",
    )

In [0]:
%sql
-- Row count of the silver payments table.
select count(*) from pysparkdbt.silver.payments

In [0]:
%sql
-- Show all rows and columns of the silver payments table.
select * from pysparkdbt.silver.payments