In [0]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [0]:
from pyspark.sql.functions import when, col, split, regexp_replace, concat_ws, lit, lower, upper
import importlib
import custome_utils
importlib.reload(custome_utils)
from custome_utils import transformations
from delta.tables import DeltaTable

import os
import sys

#### **Processing customers**

In [0]:
# Read the customers table from the bronze schema.
bronze_customers_table = "pysparkdbt.bronze.customers"
df_cust = spark.read.table(bronze_customers_table)

In [0]:
# Column expressions for the derived customer attributes:
#   domain       - element 1 of the e-mail address split on "@"
#   phone_number - the existing value with every non-digit character removed
#   full_name    - first_name and last_name joined with a single space
email_parts = split(col("email"), "@")
phone_digits = regexp_replace(col("phone_number"), "[^0-9]", "")
customer_full_name = concat_ws(" ", col("first_name"), col("last_name"))

# Apply the expressions in one chain, then drop the name parts that
# full_name replaces.
df_cust = (
    df_cust
    .withColumn("domain", email_parts.getItem(1))
    .withColumn("phone_number", phone_digits)
    .withColumn("full_name", customer_full_name)
    .drop("first_name", "last_name")
)

In [0]:
# Helper instance from custome_utils; the later sections of this notebook
# reuse `obj`, so this cell must run before them.
obj = transformations()

# Run the timestamp helper, then de-duplicate on customer_id with
# last_updated_timestamp as the key column (exact semantics are defined in
# custome_utils.transformations).
df_cust = obj.dedupDF(
    df=obj.process_timestamp(df=df_cust),
    dedup_cols=["customer_id"],
    key_col="last_updated_timestamp",
)

# Fix the column set and order written to the silver table.
customer_columns = [
    "customer_id",
    "full_name",
    "email",
    "domain",
    "phone_number",
    "city",
    "signup_date",
    "last_updated_timestamp",
    "process_timestamp",
]
df_cust = df_cust.select(*customer_columns)

In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_customers_exists = spark.catalog.tableExists("pysparkdbt.silver.customers")

if silver_customers_exists:
    obj.upsert(
        spark=spark,
        df=df_cust,
        table="customers",
        join_cols=["customer_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_cust.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.customers")
    )

#### **Processing Drivers**

In [0]:
# Read the drivers table from the bronze schema.
bronze_drivers_table = "pysparkdbt.bronze.drivers"
df_drivers = spark.read.table(bronze_drivers_table)

In [0]:
# Derived driver attributes:
#   full_name    - first_name and last_name joined with a single space
#   phone_number - the existing value with every non-digit character removed
driver_full_name = concat_ws(" ", col("first_name"), col("last_name"))
driver_phone_digits = regexp_replace(col("phone_number"), "[^0-9]", "")

# Add full_name, drop the name parts it replaces, then clean the phone number.
df_drivers = (
    df_drivers
    .withColumn("full_name", driver_full_name)
    .drop("first_name", "last_name")
    .withColumn("phone_number", driver_phone_digits)
)

In [0]:
# Run the timestamp helper, then de-duplicate on driver_id with
# last_updated_timestamp as the key column (semantics are defined in
# custome_utils.transformations).
df_drivers = obj.dedupDF(
    df=obj.process_timestamp(df=df_drivers),
    dedup_cols=["driver_id"],
    key_col="last_updated_timestamp",
)

# Fix the column set and order written to the silver table.
driver_columns = [
    "driver_id",
    "full_name",
    "phone_number",
    "city",
    "vehicle_id",
    "driver_rating",
    "last_updated_timestamp",
    "process_timestamp",
]
df_drivers = df_drivers.select(*driver_columns)

In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_drivers_exists = spark.catalog.tableExists("pysparkdbt.silver.drivers")

if silver_drivers_exists:
    obj.upsert(
        spark=spark,
        df=df_drivers,
        table="drivers",
        join_cols=["driver_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_drivers.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.drivers")
    )

#### **Processing Locations** 

In [0]:
# Read the locations table from the bronze schema.
bronze_locations_table = "pysparkdbt.bronze.locations"
df_locations = spark.read.table(bronze_locations_table)

In [0]:
# Run the timestamp helper, then de-duplicate on location_id with
# last_updated_timestamp as the key column (semantics are defined in
# custome_utils.transformations).
df_locations = obj.dedupDF(
    df=obj.process_timestamp(df=df_locations),
    dedup_cols=["location_id"],
    key_col="last_updated_timestamp",
)

In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_locations_exists = spark.catalog.tableExists("pysparkdbt.silver.locations")

if silver_locations_exists:
    obj.upsert(
        spark=spark,
        df=df_locations,
        table="locations",
        join_cols=["location_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_locations.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.locations")
    )

#### **Processing payments**

In [0]:
# Read the payments table from the bronze schema.
bronze_payments_table = "pysparkdbt.bronze.payments"
df_payments = spark.read.table(bronze_payments_table)

In [0]:
# Run the timestamp helper, then de-duplicate on payment_id with
# last_updated_timestamp as the key column (semantics are defined in
# custome_utils.transformations).
df_payments = obj.dedupDF(
    df=obj.process_timestamp(df=df_payments),
    dedup_cols=["payment_id"],
    key_col="last_updated_timestamp",
)

In [0]:
# Case-insensitive building blocks shared by every branch below.
is_card_payment = lower(col("payment_method")) == "card"
payment_status_lc = lower(col("payment_status"))

# Label card payments by status; every other row falls through to "offline".
# NOTE(review): a card payment whose status is not success/failed/pending
# (or is null) is also labelled "offline" - confirm that this is intended.
online_payment_status = (
    when(is_card_payment & (payment_status_lc == "success"), "online-Success")
    .when(is_card_payment & (payment_status_lc == "failed"), "online-Failed")
    .when(is_card_payment & (payment_status_lc == "pending"), "online-Pending")
    .otherwise(lit("offline"))
)
df_payments = df_payments.withColumn("online_payment_status", online_payment_status)

# Fix the column set and order written to the silver table.
payment_columns = [
    "payment_id",
    "trip_id",
    "customer_id",
    "payment_method",
    "payment_status",
    "amount",
    "transaction_time",
    "online_payment_status",
    "last_updated_timestamp",
    "process_timestamp",
]
df_payments = df_payments.select(*payment_columns)

In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_payments_exists = spark.catalog.tableExists("pysparkdbt.silver.payments")

if silver_payments_exists:
    obj.upsert(
        spark=spark,
        df=df_payments,
        table="payments",
        join_cols=["payment_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_payments.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.payments")
    )

#### **Processing Vehicles** 

In [0]:
# Read the vehicles table from the bronze schema.
bronze_vehicles_table = "pysparkdbt.bronze.vehicles"
df_vehicles = spark.read.table(bronze_vehicles_table)

In [0]:
# Upper-case the make and model columns in place.
for text_column in ("make", "model"):
    df_vehicles = df_vehicles.withColumn(text_column, upper(col(text_column)))

In [0]:
# Run the timestamp helper, then de-duplicate on vehicle_id with
# last_updated_timestamp as the key column (semantics are defined in
# custome_utils.transformations).
df_vehicles = obj.dedupDF(
    df=obj.process_timestamp(df=df_vehicles),
    dedup_cols=["vehicle_id"],
    key_col="last_updated_timestamp",
)


In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_vehicles_exists = spark.catalog.tableExists("pysparkdbt.silver.vehicles")

if silver_vehicles_exists:
    obj.upsert(
        spark=spark,
        df=df_vehicles,
        table="vehicles",
        join_cols=["vehicle_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_vehicles.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.vehicles")
    )

#### **Processing Trips** 

In [0]:
# Read the trips table from the bronze schema and discard the
# payment_method and trip_status columns straight away.
bronze_trips_table = "pysparkdbt.bronze.trips"
df_trips = (
    spark.read.table(bronze_trips_table)
    .drop("payment_method", "trip_status")
)

In [0]:
# Run the timestamp helper, then de-duplicate on trip_id with
# last_updated_timestamp as the key column (semantics are defined in
# custome_utils.transformations).
df_trips = obj.dedupDF(
    df=obj.process_timestamp(df=df_trips),
    dedup_cols=["trip_id"],
    key_col="last_updated_timestamp",
)

# Fix the column set and order written to the silver table.
trip_columns = [
    "trip_id",
    "driver_id",
    "customer_id",
    "vehicle_id",
    "start_location",
    "end_location",
    "distance_km",
    "fare_amount",
    "trip_start_time",
    "trip_end_time",
    "last_updated_timestamp",
    "process_timestamp",
]
df_trips = df_trips.select(*trip_columns)

In [0]:
# If the silver table already exists, hand the batch to the upsert helper;
# otherwise create the table from this batch as a Delta table.
silver_trips_exists = spark.catalog.tableExists("pysparkdbt.silver.trips")

if silver_trips_exists:
    obj.upsert(
        spark=spark,
        df=df_trips,
        table="trips",
        join_cols=["trip_id"],
        time_col="last_updated_timestamp",
    )
else:
    (
        df_trips.write
        .format("delta")
        .mode("append")
        .saveAsTable("pysparkdbt.silver.trips")
    )