In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
import os
import sys


In [0]:
# Put the notebook's working directory on the import path so that packages
# living next to the notebook (such as `utils`) can be imported by later cells.
curret_dir = os.getcwd()
print(curret_dir)
# Guard the append: re-running this cell must not stack duplicate entries.
if curret_dir not in sys.path:
    sys.path.append(curret_dir)


#### **CUSTOMERS**

In [0]:
# Load the customers table from the bronze schema as the starting DataFrame.
df_cust = spark.table("pyspark_dbt.bronze.customers")

In [0]:
# Preview the customers DataFrame before any transformation is applied.
display(df_cust)

In [0]:
# Row count of the bronze drivers table.
# NOTE(review): this sits in the customers section but reads `drivers` — confirm it is intentional.
spark.table("pyspark_dbt.bronze.drivers").count()

In [0]:
# Add "Domain": the second element obtained by splitting `email` on '@'.
df_cust = df_cust.withColumn(
    "Domain",
    split(col("email"), '@')[1],
)
display(df_cust)

In [0]:
# Normalise phone numbers by removing every character that is not a digit.
df_cust = df_cust.withColumn(
    "phone_number",
    regexp_replace(col("phone_number"), r"[^0-9]", ""),
)
display(df_cust)

In [0]:
# Build "Full_Name" from first and last name (space separated), then drop the
# two source columns since they are now redundant.
df_cust = (
    df_cust
    .withColumn("Full_Name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
)
display(df_cust)


In [0]:
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import concat,col,row_number, current_timestamp
from delta.tables import DeltaTable

In [0]:
from utils.customutils import transformationss
from pyspark.sql.functions import concat, col, row_number, current_timestamp,desc

In [0]:
from pyspark.sql.functions import (
    concat,
    col,
    row_number,
    current_timestamp,
    desc,
)

# Run the shared dedup helper on customers, keyed by customer_id and ordered
# by last_updated_timestamp.
cust_obj = transformationss()
cust_df_transform = cust_obj.dedup(
    df_cust,
    ['customer_id'],
    'last_updated_timestamp',
)
display(cust_df_transform)




from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, col, row_number, current_timestamp,desc

class transformations:
    """Reusable DataFrame helpers: de-duplication, audit timestamp and Delta upsert."""

    def dedup(self, df: DataFrame, dedup_cols: List, cdc: str):
        """Keep a single row per key, preferring the highest value of ``cdc``.

        Args:
            df: Input DataFrame.
            dedup_cols: Columns that together identify a record.
            cdc: Column used to rank duplicates; rows are ordered by it descending.

        Returns:
            A DataFrame with only the first-ranked row of each key group.
        """
        # Partition on the key columns themselves instead of a concatenated
        # string: concat() returns NULL as soon as one part is NULL and lets
        # distinct composite keys collide (e.g. ("1", "23") vs ("12", "3")).
        latest_first = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))
        df = df.withColumn("dedupCounts", row_number().over(latest_first))
        df = df.filter(col("dedupCounts") == 1)
        # The ranking column is only a working column; do not leak it to callers.
        return df.drop("dedupCounts")

    def process_timestamp(self, df):
        """Return ``df`` with a ``process_timestamp`` column set to ``current_timestamp()``."""
        return df.withColumn("process_timestamp", current_timestamp())

    def upsert(self, df, key_cols, table, cdc):
        """Merge ``df`` into the Delta table ``pyspark_dbt.silver.<table>``.

        Matched rows are overwritten only when the source ``cdc`` value is
        greater than or equal to the target's; unmatched rows are inserted.

        Relies on a ``spark`` session being available in the global scope.

        Args:
            df: Source DataFrame.
            key_cols: Column names used to match source and target rows.
            table: Table name inside ``pyspark_dbt.silver`` (without the prefix).
            cdc: Column compared between source and target to decide updates.

        Returns:
            1 once the merge has been executed.
        """
        # Imported locally so the method does not depend on another cell/module
        # having imported DeltaTable beforehand.
        from delta.tables import DeltaTable

        merge_condition = " AND ".join(f"src.{i} = trg.{i}" for i in key_cols)
        dltObj = DeltaTable.forName(spark, f"pyspark_dbt.silver.{table}")
        (
            dltObj.alias("trg")
            .merge(df.alias("src"), merge_condition)
            # The whole comparison must live inside the condition string; the
            # previous version closed the f-string before ">=" and failed at runtime.
            .whenMatchedUpdateAll(condition=f"src.{cdc} >= trg.{cdc}")
            .whenNotMatchedInsertAll()
            .execute()
        )
        return 1
    





In [0]:
cust_obj = transformationss()   # recreate object after class update
# Pass the deduplicated customers through the helper's process_timestamp step.
# NOTE(review): presumably this adds a load/audit timestamp column — confirm in utils.customutils.
df2 = cust_obj.process_timestamp(cust_df_transform)
display(df2)

#### Implementing UPSERT 

In [0]:
from delta.tables import DeltaTable

# If the silver customers table already exists, hand the batch to the helper's
# upsert; otherwise create the table from the current batch.
if spark.catalog.tableExists("pyspark_dbt.silver.customers"):
    cust_obj.upsert(
        spark,
        df2,
        ['customer_id'],
        "pyspark_dbt.silver.customers",
        "last_updated_timestamp",
    )
else:
    (
        df2.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt.silver.customers")
    )


In [0]:
%sql
-- Row count of the silver customers table.
select count(*) from pyspark_dbt.silver.customers

In [0]:
%sql
-- Show every row of the silver customers table.
select * from pyspark_dbt.silver.customers;

In [0]:
# Data type of customer_id in the DataFrame being written.
df2.select("customer_id").dtypes

In [0]:
# Data type of customer_id in the existing silver table, for comparison with the DataFrame's.
spark.table("pyspark_dbt.silver.customers").select("customer_id").dtypes


#### Drivers

In [0]:
# Load the drivers table from the bronze schema and preview it.
df_driver = spark.table("pyspark_dbt.bronze.drivers")
display(df_driver)

In [0]:
# Normalise phone numbers by removing every character that is not a digit.
df_driver = df_driver.withColumn(
    "phone_number",
    regexp_replace(col("phone_number"), r"[^0-9]", ""),
)
display(df_driver)

In [0]:
# Build "Full_Name" from first and last name (space separated), then drop the
# two source columns since they are now redundant.
df_driver = (
    df_driver
    .withColumn("Full_Name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
)
display(df_driver)

In [0]:
# Helper instance (from utils.customutils) used for the drivers steps below.
driver_obj = transformationss()

In [0]:
# Deduplicate drivers on driver_id using last_updated_timestamp as the ordering column.
# NOTE(review): presumably keeps the most recent row per key — confirm in utils.customutils.
df_driver = driver_obj.dedup(df_driver,['driver_id'],'last_updated_timestamp')

In [0]:
# Apply the helper's process_timestamp step to the deduplicated drivers.
df_driver = driver_obj.process_timestamp(df_driver)

In [0]:
# If the silver drivers table already exists, hand the batch to the helper's
# upsert; otherwise create the table from the current batch.
if spark.catalog.tableExists("pyspark_dbt.silver.drivers"):
    driver_obj.upsert(
        spark,
        df_driver,
        ['driver_id'],
        "pyspark_dbt.silver.drivers",
        "last_updated_timestamp",
    )
else:
    (
        df_driver.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt.silver.drivers")
    )

In [0]:
%sql
-- Row count of the silver drivers table.
select count(*) from pyspark_dbt.silver.drivers;

#### Locations

In [0]:
# Load the locations table from the bronze schema and preview it.
df_locations = spark.read.table("pyspark_dbt.bronze.locations")
display(df_locations)

In [0]:
# Locations need no column-level cleanup: dedup, apply process_timestamp, persist.
loc_obj = transformationss()
df_locations = loc_obj.dedup(
    df_locations,
    ['location_id'],
    'last_updated_timestamp',
)
df_locations = loc_obj.process_timestamp(df_locations)

# If the silver table already exists, hand the batch to the helper's upsert;
# otherwise create the table from the current batch.
if spark.catalog.tableExists("pyspark_dbt.silver.locations"):
    loc_obj.upsert(
        spark,
        df_locations,
        ['location_id'],
        "pyspark_dbt.silver.locations",
        "last_updated_timestamp",
    )
else:
    (
        df_locations.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt.silver.locations")
    )


#### Payments

In [0]:
# Load the payments table from the bronze schema.
df_payments = spark.read.table("pyspark_dbt.bronze.payments")


In [0]:
# Preview the payments DataFrame before deriving the status column.
display(df_payments)

In [0]:
# Derive "online_payment_status": card payments are labelled by their
# payment_status, every other payment is labelled 'offline'.
# The success branch must test payment_status; comparing payment_method to
# 'Success' could never be true together with payment_method == 'Card', so
# successful card payments were silently labelled 'offline'.
df_payments = df_payments.withColumn(
    "online_payment_status",
    when((col("payment_method") == 'Card') & (col("payment_status") == 'Success'), 'online-success')
    .when((col("payment_method") == 'Card') & (col("payment_status") == 'Failed'), 'online-failure')
    .when((col("payment_method") == 'Card') & (col("payment_status") == 'Pending'), 'online-pending')
    .otherwise('offline'),
)
                        

In [0]:
# Inspect the payments DataFrame with the new online_payment_status column.
display(df_payments)

In [0]:
# Helper instance (from utils.customutils) used for the payments steps below.
obj_pay = transformationss()



In [0]:
# Deduplicate payments on payment_id using last_updated_timestamp as the ordering column.
# NOTE(review): presumably keeps the most recent row per key — confirm in utils.customutils.
df_payments = obj_pay.dedup(df_payments,['payment_id'],'last_updated_timestamp')


In [0]:
# Apply the helper's process_timestamp step to the deduplicated payments.
df_payments = obj_pay.process_timestamp(df_payments)


In [0]:
# If the silver payments table already exists, hand the batch to the helper's
# upsert; otherwise create the table from the current batch.
if spark.catalog.tableExists("pyspark_dbt.silver.payments"):
    obj_pay.upsert(
        spark,
        df_payments,
        ['payment_id'],
        "pyspark_dbt.silver.payments",
        "last_updated_timestamp",
    )
else:
    (
        df_payments.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt.silver.payments")
    )

#### Vehicle


In [0]:
# Load the vehicles table from the bronze schema and preview it.
df_vehicle = spark.table("pyspark_dbt.bronze.vehicles")
display(df_vehicle)

In [0]:
# Standardise the vehicle make to upper case.
df_vehicle = df_vehicle.withColumn(
    "make",
    upper(col("make")),
)
display(df_vehicle)

In [0]:
# Helper instance (from utils.customutils) used for the vehicle steps below.
vehicle_obj = transformationss()

In [0]:
# Dedup on vehicle_id, apply process_timestamp, then persist to silver.
df_vehicle = vehicle_obj.dedup(
    df_vehicle,
    ['vehicle_id'],
    'last_updated_timestamp',
)
df_vehicle = vehicle_obj.process_timestamp(df_vehicle)

# If the silver table already exists, hand the batch to the helper's upsert;
# otherwise create the table from the current batch.
if spark.catalog.tableExists("pyspark_dbt.silver.vehicle"):
    vehicle_obj.upsert(
        spark,
        df_vehicle,
        ['vehicle_id'],
        "pyspark_dbt.silver.vehicle",
        "last_updated_timestamp",
    )
else:
    (
        df_vehicle.write
        .format("delta")
        .mode("append")
        .saveAsTable("pyspark_dbt.silver.vehicle")
    )

In [0]:
%sql
-- Row count of the silver vehicle table.
select count(*) from pyspark_dbt.silver.vehicle;