#### **CUSTOMERS DATA**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

In [0]:
import os
import sys

In [0]:
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, concat, row_number
from typing import List
from delta.tables import DeltaTable

class transformations:

    def dedup(self, df: DataFrame, dedup_cols: List[str], cdc: str) -> DataFrame:
        """
        Deduplicate a DataFrame by keeping the latest record per key.
        :param df: Input DataFrame
        :param dedup_cols: Columns to deduplicate on
        :param cdc: Column to use for ordering (e.g. last_updated_timestamp)
        :return: Deduplicated DataFrame
        """
        df = df.withColumn("dedupKey", concat(*[col(c) for c in dedup_cols]))
        df = df.withColumn(
            "dedupCounts",
            row_number().over(Window.partitionBy("dedupKey").orderBy(col(cdc).desc()))
        )
        df = df.filter(col("dedupCounts") == 1).drop("dedupKey", "dedupCounts")
        return df
    
    def process_timestamp(self,df):
        df = df.withColumn("process_timestamp",current_timestamp())
        return df
    
    def upsert(self,spark,df, key_cols,table,cdc):
        merge_condition = " AND ".join([f"src.{i} = trg.{i}" for i in key_cols])
        dlt_obj = DeltaTable.forName(spark,f"pysparkdbt.silver.{table}")
        dlt_obj.alias("trg").merge(df.alias("src"),merge_condition)\
                            .whenMatchedUpdateAll(condition = f"src.{cdc} >= trg.{cdc}")\
                            .whenNotMatchedInsertAll()\
                            .execute()

        return "done"
                    

In [0]:
current_dir = os.getcwd()

sys.path.append(current_dir)


In [0]:
df_cust = spark.read.table("pysparkdbt.bronze.customers")

In [0]:
display(df_cust.count())

200

In [0]:
# fetch email domain of customers
df_cust = df_cust.withColumn("domain",split(col('email'),'@')[1])

In [0]:
# clean up phone numbers
df_cust = df_cust.withColumn("phone_number",regexp_replace("phone_number",r"[^0-9]",""))

In [0]:
# concat first and last name to full name and drop the frist anad last name columns
df_cust = df_cust.withColumn("full_name",concat_ws(" ",col('first_name'),col('last_name')))
df_cust = df_cust.drop('first_name','last_name')

In [0]:
cust_obj = transformations()

cust_df_trns = cust_obj.dedup(df_cust,['customer_id'], 'last_updated_timestamp')

In [0]:
df_cust = cust_obj.process_timestamp(cust_df_trns)
display(df_cust)

customer_id,email,phone_number,city,signup_date,last_updated_timestamp,domain,full_name,process_timestamp
1,azimmerman@ramirez-nelson.com,8117241080,Tiffanyview,2023-12-25,2025-09-15T15:21:05.000Z,ramirez-nelson.com,Daniel Reed,2025-10-16T10:15:15.514Z
10,vritter@yahoo.com,19670618100482,Port Jesse,2023-03-12,2025-09-07T23:05:32.000Z,yahoo.com,Megan Dean,2025-10-16T10:15:15.514Z
100,newmanmelanie@flynn-ross.org,10575302069,North Charlestown,2022-06-22,2025-09-14T22:47:28.000Z,flynn-ross.org,Gary Barnes,2025-10-16T10:15:15.514Z
101,hannahwhite@hotmail.com,8882340212190,Lisachester,2020-09-24,2025-09-10T22:45:30.000Z,hotmail.com,Stacy Thomas,2025-10-16T10:15:15.514Z
102,cunninghamjessica@yahoo.com,10591315078,Harrisbury,2023-07-29,2025-09-19T21:57:29.000Z,yahoo.com,Alan Rogers,2025-10-16T10:15:15.514Z
103,sherry55@hernandez.com,12586465512757,Port Michael,2022-06-18,2025-09-09T06:06:57.000Z,hernandez.com,Stephen Sanders,2025-10-16T10:15:15.514Z
104,moralessandy@arnold-robinson.com,18634267566120,Lake Kelly,2020-11-07,2025-09-17T02:22:22.000Z,arnold-robinson.com,Duane Bennett,2025-10-16T10:15:15.514Z
105,matthewsbenjamin@lloyd.com,87710366965279,South Wyattfort,2021-04-11,2025-08-25T11:18:42.000Z,lloyd.com,Jasmin Patterson,2025-10-16T10:15:15.514Z
106,tyleryoung@gross.org,41127166452578,Seanview,2024-01-05,2025-08-25T04:33:25.000Z,gross.org,Kara Williams,2025-10-16T10:15:15.514Z
107,brandyphillips@hotmail.com,17891625036,Melissabury,2021-06-15,2025-09-14T13:56:22.000Z,hotmail.com,Martin Williams,2025-10-16T10:15:15.514Z


In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.customers"):

    df_cust.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.customers")

else:
    cust_obj.upsert(spark,df_cust,['customer_id'],'customers','last_updated_timestamp')

In [0]:
%sql
-- querry the table
SELECT COUNT(*) FROM pysparkdbt.silver.customers

COUNT(*)
200


#### **DRIVERS DATA**

In [0]:
df_driver = spark.read.table("pysparkdbt.bronze.drivers")
display(df_driver)

driver_id,first_name,last_name,phone_number,vehicle_id,driver_rating,city,last_updated_timestamp
1,Latasha,Lopez,262-924-2955x590,1,4.7,East Dorothy,2025-08-25T06:36:26.000Z
2,Alan,Wiley,0967969634,2,3.98,West Susan,2025-09-14T00:44:57.000Z
3,James,Taylor,424-614-1847,3,3.66,Mcintoshton,2025-08-26T22:28:17.000Z
4,Theresa,Benson,617-017-0101x91777,4,3.86,North Courtneychester,2025-09-01T11:40:55.000Z
5,Karen,Jensen,611-060-5683,5,4.87,Brownburgh,2025-09-04T16:35:04.000Z
6,Debra,Smith,556.480.9096x439,6,4.26,Port Williamland,2025-08-31T14:31:37.000Z
7,Justin,Peters,+1-798-568-6952x9778,7,3.67,West Erinborough,2025-09-17T05:57:45.000Z
8,Todd,Young,706.321.8390x08097,8,4.9,Lake Stephen,2025-08-26T06:45:51.000Z
9,Mary,Young,(172)791-0504x5499,9,4.5,West Lindsey,2025-08-27T13:04:32.000Z
10,Jacob,Mack,(509)613-4480,10,4.04,Lauraland,2025-09-09T05:50:23.000Z


In [0]:
# clean up phone numbers
df_driver = df_driver.withColumn("phone_number",regexp_replace("phone_number",r"[^0-9]",""))

In [0]:
# concat first and last name to full name and drop the frist anad last name columns
df_driver = df_driver.withColumn("full_name",concat_ws(" ",col('first_name'),col('last_name')))
df_driver = df_driver.drop('first_name','last_name')

In [0]:
driver_obj = transformations()

In [0]:
# deduplication
df_driver = driver_obj.dedup(df_driver,['driver_id'],'last_updated_timestamp')

In [0]:
#timestamp
df_driver = driver_obj.process_timestamp(df_driver)

In [0]:
#apply upsert
if not spark.catalog.tableExists("pysparkdbt.silver.drivers"):

    df_driver.write.format("delta")\
            .mode("append")\
            .saveAsTable("pysparkdbt.silver.drivers")

else:
    driver_obj.upsert(spark,df_driver,['driver_id'],'drivers','last_updated_timestamp')

In [0]:
%sql
SELECT COUNT(*) FROM pysparkdbt.silver.drivers

COUNT(*)
50


#### **LOCATIONS DATA**

In [0]:
df_loc = spark.read.table("pysparkdbt.bronze.locations")
display(df_loc)

location_id,city,state,country,latitude,longitude,last_updated_timestamp
1,Lake Davidport,Hawaii,Dominica,34.5682,151.8097,2025-09-03T13:37:24.000Z
2,Mccarthybury,Minnesota,Dominican Republic,71.6928,-162.2836,2025-09-10T10:54:18.000Z
3,Bellhaven,Arkansas,Japan,-53.3075,69.1475,2025-09-17T06:52:51.000Z
4,Moorechester,New Hampshire,Togo,77.4281,57.0052,2025-08-27T19:04:27.000Z
5,Glennview,North Carolina,Gambia,41.1122,-162.4836,2025-08-20T17:31:06.000Z
6,Davilaville,Arizona,Antarctica (the territory South of 60 deg S),64.3413,-3.912,2025-09-05T13:22:58.000Z
7,Shawnfurt,New Jersey,Lithuania,71.1198,-143.6791,2025-09-03T20:35:10.000Z
8,Bradleytown,Florida,Monaco,30.0895,88.091,2025-08-29T19:45:05.000Z
9,East Miguel,Maryland,French Guiana,-52.9274,-56.9343,2025-08-27T23:43:41.000Z
10,Masonside,Wyoming,Saint Lucia,84.9905,81.5852,2025-09-06T20:25:27.000Z


In [0]:
# Apply transformations
loc_obj = transformations()
df_loc = loc_obj.dedup(df_loc, ['location_id'], 'last_updated_timestamp')
df_loc = loc_obj.process_timestamp(df_loc)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.locations"):
    df_loc.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.locations")
else:
    loc_obj.upsert(spark, df_loc, ['location_id'], 'locations', 'last_updated_timestamp')

In [0]:
%sql
SELECT COUNT(*) FROM pysparkdbt.silver.locations

COUNT(*)
50


#### **PAYMENTS DATA**

In [0]:
df_pay = spark.read.table("pysparkdbt.bronze.payments")
display(df_pay)

payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z
6,834,78,Wallet,Success,27.43,2025-08-25T13:00:12.000Z,2025-09-20T09:03:05.000Z
7,348,144,Wallet,Failed,74.94,2025-08-26T13:00:12.000Z,2025-08-30T19:11:26.000Z
8,558,82,Card,Failed,19.58,2025-09-09T13:00:12.000Z,2025-09-06T05:53:26.000Z
9,260,151,Card,Pending,24.4,2025-07-23T13:00:12.000Z,2025-09-02T17:43:14.000Z
10,133,70,Cash,Failed,69.99,2025-07-31T13:00:12.000Z,2025-09-11T10:30:16.000Z


In [0]:
df_pay = df_pay.withColumn("online_payment_status",
            when(  ((col("payment_method") == "Card") & (col("payment_status") == 'Success')),"online-success" )
            .when(  ((col("payment_method") == "Card") & (col("payment_status") == 'Failed')),"online-failure" )
            .when(  ((col("payment_method") == "Cash") & (col("payment_status") == 'pending')),"online-pending" )
            .otherwise("offline")
            )
display(df_pay)

payment_id,trip_id,customer_id,payment_method,payment_status,amount,transaction_time,last_updated_timestamp,online_payment_status
1,274,126,Cash,Success,38.15,2025-09-17T13:00:12.000Z,2025-08-30T13:40:53.000Z,offline
2,676,131,Cash,Success,52.07,2025-08-14T13:00:12.000Z,2025-09-08T18:21:05.000Z,offline
3,919,132,Card,Failed,55.5,2025-07-27T13:00:12.000Z,2025-08-21T20:24:08.000Z,online-failure
4,247,34,Wallet,Pending,28.78,2025-07-27T13:00:12.000Z,2025-08-27T15:03:09.000Z,offline
5,386,62,Card,Failed,55.02,2025-08-01T13:00:12.000Z,2025-09-08T23:15:06.000Z,online-failure
6,834,78,Wallet,Success,27.43,2025-08-25T13:00:12.000Z,2025-09-20T09:03:05.000Z,offline
7,348,144,Wallet,Failed,74.94,2025-08-26T13:00:12.000Z,2025-08-30T19:11:26.000Z,offline
8,558,82,Card,Failed,19.58,2025-09-09T13:00:12.000Z,2025-09-06T05:53:26.000Z,online-failure
9,260,151,Card,Pending,24.4,2025-07-23T13:00:12.000Z,2025-09-02T17:43:14.000Z,offline
10,133,70,Cash,Failed,69.99,2025-07-31T13:00:12.000Z,2025-09-11T10:30:16.000Z,offline


In [0]:
payment_obj = transformations()
df_pay = payment_obj.dedup(df_pay,['payment_id'],'last_updated_timestamp')
df_pay = payment_obj.process_timestamp(df_pay)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.payments"):
    df_pay.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.payments")
else:
    payment_obj.upsert(spark, df_pay, ['payment_id'], 'payments', 'last_updated_timestamp')

In [0]:
%sql
SELECT COUNT(*) FROM pysparkdbt.silver.payments

COUNT(*)
1000


#### **VEHICLES DATA**

In [0]:
df_veh = spark.read.table("pysparkdbt.bronze.vehicles")
display(df_veh)

vehicle_id,license_plate,model,make,year,vehicle_type,last_updated_timestamp
1,NXT-8646,Message,"Francis, Smith and Lee",2023,Hatchback,2025-09-02T06:05:20.000Z
2,03S R43,Region,Lawson Group,2017,Sedan,2025-08-31T05:55:09.000Z
3,SKO H06,Prepare,"Moreno, Ruiz and Barker",2023,Luxury,2025-08-30T05:07:29.000Z
4,5R235,Pattern,"Welch, Martinez and Hendricks",2019,Van,2025-09-09T05:52:10.000Z
5,925A,Process,Rivera-Anderson,2014,Hatchback,2025-08-27T11:42:16.000Z
6,6-1130V,On,Brown Ltd,2014,SUV,2025-09-06T06:37:52.000Z
7,L25-TWN,Plan,"Gonzalez, Rios and Rios",2020,Sedan,2025-09-15T13:14:48.000Z
8,GWY 958,Month,"Smith, Mckenzie and Bullock",2017,Sedan,2025-09-11T16:20:12.000Z
9,4FG 919,Speak,"Rice, Barnes and Hernandez",2019,SUV,2025-09-11T02:43:13.000Z
10,6FD 648,Religious,Schwartz and Sons,2017,SUV,2025-09-11T19:10:01.000Z


In [0]:
#clean - upper case make col
df_veh = df_veh.withColumn("make",upper(col("make")))
display(df_veh)

vehicle_id,license_plate,model,make,year,vehicle_type,last_updated_timestamp
1,NXT-8646,Message,"FRANCIS, SMITH AND LEE",2023,Hatchback,2025-09-02T06:05:20.000Z
2,03S R43,Region,LAWSON GROUP,2017,Sedan,2025-08-31T05:55:09.000Z
3,SKO H06,Prepare,"MORENO, RUIZ AND BARKER",2023,Luxury,2025-08-30T05:07:29.000Z
4,5R235,Pattern,"WELCH, MARTINEZ AND HENDRICKS",2019,Van,2025-09-09T05:52:10.000Z
5,925A,Process,RIVERA-ANDERSON,2014,Hatchback,2025-08-27T11:42:16.000Z
6,6-1130V,On,BROWN LTD,2014,SUV,2025-09-06T06:37:52.000Z
7,L25-TWN,Plan,"GONZALEZ, RIOS AND RIOS",2020,Sedan,2025-09-15T13:14:48.000Z
8,GWY 958,Month,"SMITH, MCKENZIE AND BULLOCK",2017,Sedan,2025-09-11T16:20:12.000Z
9,4FG 919,Speak,"RICE, BARNES AND HERNANDEZ",2019,SUV,2025-09-11T02:43:13.000Z
10,6FD 648,Religious,SCHWARTZ AND SONS,2017,SUV,2025-09-11T19:10:01.000Z


In [0]:
#apply transfromations
vehicle_obj = transformations()
df_veh = vehicle_obj.dedup(df_veh,['vehicle_id'],'last_updated_timestamp')
df_veh = vehicle_obj.process_timestamp(df_veh)

In [0]:
if not spark.catalog.tableExists("pysparkdbt.silver.vehicles"):
    df_veh.write.format("delta")\
        .mode("append")\
        .saveAsTable("pysparkdbt.silver.vehicles")
else:
    vehicle_obj.upsert(spark, df_veh, ['vehicle_id'], 'vehicles', 'last_updated_timestamp')

In [0]:
%sql
SELECT COUNT(*) FROM pysparkdbt.silver.vehicles

COUNT(*)
50
