In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
import os
import sys

In [0]:
# Put the notebook's working directory on sys.path so local packages such as
# `utils` (imported further down) can be resolved.
# The membership check keeps the cell idempotent: re-running it does not keep
# appending the same entry to sys.path.
current_dir = os.getcwd()
if current_dir not in sys.path:
    sys.path.append(current_dir)

### Customers

In [0]:
# Load the raw customers table from the bronze layer and preview it.
df_customers = spark.table("pysparkdbt.bronze.customers")
display(df_customers)

In [0]:
# Derived customer columns, applied together via DataFrame.withColumns.
transformations = {
    # Second element of the email split on "@" (the domain for a well-formed address).
    "domain": split(col("email"), "@").getItem(1),
    # Remove every character that is not a digit 0-9.
    "phone_number": regexp_replace(col("phone_number"), r"[^0-9]", ""),
    # First and last name joined by a single space.
    "full_name": concat_ws(" ", col("first_name"), col("last_name")),
}

In [0]:
# Add/overwrite all derived customer columns in one withColumns call.
df_customers = (
    df_customers
    .withColumns(transformations)
)

In [0]:
# Preview customers with the derived domain / phone_number / full_name columns.
display(df_customers)

In [0]:
# The name parts are now combined in full_name, so drop the originals.
df_customers = df_customers.drop(*("first_name", "last_name"))

In [0]:
from utils.custom_utils import Transformations


In [0]:
# Shared cleaning helpers (dedup / process_timestamp / upsert) from utils.custom_utils.
transf = Transformations()

# Deduplicate on customer_id; last_updated_timestamp is passed as the ordering
# column (presumably the latest row per key is kept — confirm in utils.custom_utils).
df_customers_transf = transf.dedup(
    df_customers,
    ["customer_id"],
    "last_updated_timestamp",
)
display(df_customers_transf)

In [0]:
# Apply the shared timestamp post-processing step (defined in utils.custom_utils).
df_customers_transf = transf.process_timestamp(
    df_customers_transf
)

In [0]:
# Existing table: upsert into it. First run: create the silver Delta table.
if spark.catalog.tableExists("pysparkdbt.silver.customers"):
    # NOTE(review): upsert receives the bare name "customers"; presumably it
    # resolves to pysparkdbt.silver.customers — confirm in utils.custom_utils.
    transf.upsert(df_customers_transf, ["customer_id"], "customers", "last_updated_timestamp")
else:
    (
        df_customers_transf.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.customers")
    )

In [0]:
%sql
-- Inspect the contents of the silver customers table.
SELECT * FROM pysparkdbt.silver.customers

### Drivers

In [0]:
# Load the raw drivers table from the bronze layer and preview it.
df_drivers = spark.table("pysparkdbt.bronze.drivers")
display(df_drivers)

In [0]:
# Derived driver columns: the same phone/name cleanup as for customers
# (no email domain here).
transformations2 = {
    # Remove every character that is not a digit 0-9.
    "phone_number": regexp_replace(col("phone_number"), r"[^0-9]", ""),
    # First and last name joined by a single space.
    "full_name": concat_ws(" ", col("first_name"), col("last_name")),
}
df_drivers = (
    df_drivers
    .withColumns(transformations2)
    .drop("first_name", "last_name")  # now combined in full_name
)

In [0]:
# Deduplicate drivers per driver_id (ordering column: last_updated_timestamp),
# then run the shared timestamp post-processing.
df_drivers_transf = transf.process_timestamp(
    transf.dedup(df_drivers, ["driver_id"], "last_updated_timestamp")
)
display(df_drivers_transf)

In [0]:
# Single source of truth for the target table, so the existence check and the
# first-run write always refer to the same fully qualified name (catalog
# "pysparkdbt", schema "silver").
silver_drivers_table = "pysparkdbt.silver.drivers"

if not spark.catalog.tableExists(silver_drivers_table):
    # First run: create the silver Delta table.
    (
        df_drivers_transf.write
        .format("delta")
        .mode("append")
        .saveAsTable(silver_drivers_table)
    )
else:
    # Later runs: upsert keyed on driver_id, ordered by last_updated_timestamp.
    # NOTE(review): upsert receives the bare name "drivers" — confirm how
    # utils.custom_utils resolves it to pysparkdbt.silver.drivers.
    transf.upsert(df_drivers_transf, ["driver_id"], "drivers", "last_updated_timestamp")

### Locations

In [0]:
# Load the raw locations table from the bronze layer and preview it.
df_locations = spark.table("pysparkdbt.bronze.locations")
display(df_locations)

In [0]:
# Deduplicate locations per location_id (ordering column: last_updated_timestamp),
# then run the shared timestamp post-processing.
df_location_transf = transf.process_timestamp(
    transf.dedup(df_locations, ["location_id"], "last_updated_timestamp")
)
display(df_location_transf)

In [0]:
# Existing table: upsert into it. First run: create the silver Delta table.
if spark.catalog.tableExists("pysparkdbt.silver.locations"):
    # NOTE(review): upsert receives the bare name "locations" — confirm how
    # utils.custom_utils resolves it to pysparkdbt.silver.locations.
    transf.upsert(df_location_transf, ["location_id"], "locations", "last_updated_timestamp")
else:
    (
        df_location_transf.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.locations")
    )


### Payments

In [0]:
# Load the raw payments table from the bronze layer and preview it.
df_payments = spark.table("pysparkdbt.bronze.payments")
display(df_payments)

In [0]:
# Label each payment. Card payments get an Online_* label derived from
# payment_status (Success / Failed / Pending). Every other row gets "Offline",
# including non-card payments, card payments with any other status, and
# rows with null method/status.
df_payments = df_payments.withColumn(
    "online_payment_status",
    when(
        col("payment_method") == "Card",
        when(col("payment_status") == "Success", "Online_success")
        .when(col("payment_status") == "Failed", "Online_failed")
        .when(col("payment_status") == "Pending", "Online_pending")
        .otherwise("Offline"),
    ).otherwise("Offline"),
)

display(df_payments)

In [0]:
# Deduplicate payments per payment_id (ordering column: last_updated_timestamp),
# then run the shared timestamp post-processing.
df_payments_transf = transf.process_timestamp(
    transf.dedup(df_payments, ["payment_id"], "last_updated_timestamp")
)


In [0]:
# Existing table: upsert into it. First run: create the silver Delta table.
if spark.catalog.tableExists("pysparkdbt.silver.payments"):
    # NOTE(review): upsert receives the bare name "payments" — confirm how
    # utils.custom_utils resolves it to pysparkdbt.silver.payments.
    transf.upsert(df_payments_transf, ["payment_id"], "payments", "last_updated_timestamp")
else:
    (
        df_payments_transf.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.payments")
    )

### Vehicles (`vehicules` tables)

In [0]:
# Load the raw vehicles table (named "vehicules") from the bronze layer and preview it.
df_vehicules = spark.table("pysparkdbt.bronze.vehicules")
display(df_vehicules)

In [0]:
# Normalise the vehicle make to upper case.
df_vehicules = df_vehicules.withColumn("make", upper("make"))

In [0]:
# Deduplicate vehicles per vehicle_id (ordering column: last_updated_timestamp),
# then run the shared timestamp post-processing.
df_vehicules_transf = transf.process_timestamp(
    transf.dedup(df_vehicules, ["vehicle_id"], "last_updated_timestamp")
)
display(df_vehicules_transf)

In [0]:
# Existing table: upsert into it. First run: create the silver Delta table.
if spark.catalog.tableExists("pysparkdbt.silver.vehicules"):
    # NOTE(review): upsert receives the bare name "vehicules" — confirm how
    # utils.custom_utils resolves it to pysparkdbt.silver.vehicules.
    transf.upsert(df_vehicules_transf, ["vehicle_id"], "vehicules", "last_updated_timestamp")
else:
    (
        df_vehicules_transf.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.vehicules")
    )
