In [0]:
%load_ext autoreload
# Reload edited modules (e.g. utils.custom_utils) automatically before each cell runs.
%autoreload 2

###### Add the notebook's working directory to `sys.path` so the local `utils` package can be imported

In [0]:
import os
import sys

In [0]:
# Make modules that live next to this notebook (e.g. the `utils` package) importable.
curr_dir = os.getcwd()

# Guard the append so re-running this cell does not keep adding duplicate entries.
if curr_dir not in sys.path:
    sys.path.append(curr_dir)

In [0]:
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

#### **CUSTOMERS**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the raw customer records from the bronze layer.
df_cust = spark.table('pyspark_dbt.bronze.customers')

In [0]:
# Quick look at a 10-row sample of the raw customer data.
display(
    df_cust.limit(10)
)

###### Getting the customer `email` domain

In [0]:
# Take the text after the first '@' as the email domain.
# The `contains('@')` guard leaves malformed or missing emails as NULL. Indexing
# element 1 of a one-element split array raises INVALID_ARRAY_INDEX under Spark ANSI mode.
df_cust = df_cust.withColumn(
    'email_domain',
    when(col('email').contains('@'), split(col('email'), '@').getItem(1)),
)

display(df_cust.limit(10))

###### Cleaning the `phone_number` column

In [0]:
# Strip every non-digit character so phone numbers share one digits-only format.
phone_digits_only = regexp_replace(col('phone_number'), '[^0-9]', '')
df_cust = df_cust.withColumn('phone_number', phone_digits_only)
display(df_cust.limit(10))

###### Concatenating `first_name` and `last_name` into `full_name`, then dropping the original columns

In [0]:
# Join the name parts with a single space into `full_name`,
# then drop the now-redundant source columns.
df_cust = (
    df_cust
    .withColumn('full_name', concat_ws(' ', col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
)
display(df_cust.limit(10))

In [0]:
from utils.custom_utils import Transformations

In [0]:
# Shared transformation helpers from utils.custom_utils.
transformer = Transformations()

# Deduplicate on customer_id, ordering by last_updated_timestamp.
# NOTE(review): which row survives is decided inside Transformations.de_duplication — confirm there.
df_cust = transformer.de_duplication(
    df=df_cust,
    columns=['customer_id'],
    order_by='last_updated_timestamp',
)


In [0]:
# Preview the deduplicated customers; limited to 10 rows like the other preview cells.
display(df_cust.limit(10))

In [0]:
# Apply the shared timestamp step (implemented in Transformations.process_timestamp),
# then preview the result.
df_cust = transformer.process_timestamp(df_cust)
display(
    df_cust.limit(10)
)

###### UPSERT logic: create the silver Delta table on the first run, otherwise upsert the batch into it

In [0]:
# First run: create the silver Delta table from this batch.
# Later runs: upsert the batch keyed on customer_id.
# `spark` is already required by the bronze read above, so no session bootstrap is needed here.
# NOTE(review): upsert receives the bare name "customers"; confirm Transformations.upsert
# resolves it to the same pyspark_dbt.silver table checked below.
silver_table = 'pyspark_dbt.silver.customers'

if not spark.catalog.tableExists(silver_table):
    (
        df_cust.write.format('delta')
        .mode('append')
        .saveAsTable(silver_table)
    )
else:
    transformer.upsert(
        spark,
        df_cust,
        ["customer_id"],
        "customers",
        "last_updated_timestamp",
    )



In [0]:
%sql
-- Row count of the silver customers table, as a quick check after the write/upsert.
SELECT COUNT(*)
FROM pyspark_dbt.silver.customers

#### **DRIVERS**

In [0]:
# Load raw driver records from the bronze layer.
# The preview is limited to 10 rows, matching the other preview cells.
df_driver = spark.read.table('pyspark_dbt.bronze.drivers')
display(df_driver.limit(10))

In [0]:
# Keep only the digits of each driver phone number.
driver_phone_digits = regexp_replace(col('phone_number'), '[^0-9]', '')
df_driver = df_driver.withColumn('phone_number', driver_phone_digits)
display(df_driver.limit(10))

In [0]:
# Build `full_name` from the two name columns, then drop them.
df_driver = (
    df_driver
    .withColumn('full_name', concat_ws(' ', col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
)
display(df_driver.limit(10))

In [0]:
# Deduplicate drivers on driver_id, ordering by last_updated_timestamp.
df_driver = transformer.de_duplication(
    df=df_driver,
    columns=['driver_id'],
    order_by='last_updated_timestamp',
)

In [0]:
# Shared timestamp step from Transformations.process_timestamp.
df_driver = transformer.process_timestamp(
    df_driver
)

In [0]:
# First run: create the silver Delta table from this batch.
# Later runs: upsert the batch keyed on driver_id.
# `spark` is already required by the bronze read above, so no session bootstrap is needed here.
# NOTE(review): upsert receives the bare name "drivers"; confirm Transformations.upsert
# resolves it to the same pyspark_dbt.silver table checked below.
silver_table = 'pyspark_dbt.silver.drivers'

if not spark.catalog.tableExists(silver_table):
    (
        df_driver.write.format('delta')
        .mode('append')
        .saveAsTable(silver_table)
    )
else:
    transformer.upsert(
        spark,
        df_driver,
        ["driver_id"],
        "drivers",
        "last_updated_timestamp",
    )


In [0]:
%sql
-- Row count of the silver drivers table, as a quick check after the write/upsert.
SELECT COUNT(*)
FROM pyspark_dbt.silver.drivers

#### **LOCATIONS**

In [0]:
# Load raw location records from the bronze layer.
# The preview is limited to 10 rows, matching the other preview cells.
df_loc = spark.read.table('pyspark_dbt.bronze.locations')
display(df_loc.limit(10))

In [0]:

# Deduplicate on location_id (ordered by last_updated_timestamp), then apply the
# shared timestamp step.
# NOTE(review): row selection and timestamp details live in Transformations — confirm there.
df_loc = transformer.de_duplication(
    df=df_loc,
    columns=["location_id"],
    order_by="last_updated_timestamp",
)

df_loc = transformer.process_timestamp(df_loc)

# First run: create the silver Delta table; later runs: upsert keyed on location_id.
# `spark` is already required by the bronze read above, so no session bootstrap is needed here.
# NOTE(review): upsert receives the bare name "locations"; confirm it resolves to the
# same pyspark_dbt.silver table checked below.
silver_table = 'pyspark_dbt.silver.locations'

if not spark.catalog.tableExists(silver_table):
    (
        df_loc.write.format('delta')
        .mode('append')
        .saveAsTable(silver_table)
    )
else:
    transformer.upsert(
        spark,
        df_loc,
        ["location_id"],
        "locations",
        "last_updated_timestamp",
    )

In [0]:
%sql
-- Row count of the silver locations table, as a quick check after the write/upsert.
SELECT COUNT(*)
FROM pyspark_dbt.silver.locations

#### **PAYMENTS**

In [0]:
# Load raw payment records from the bronze layer.
# The preview is limited to 10 rows, matching the other preview cells.
df_pay = spark.read.table('pyspark_dbt.bronze.payments')
display(df_pay.limit(10))


In [0]:
# Deduplicate payments on payment_id, ordering by last_updated_timestamp.
df_pay = transformer.de_duplication(
    df=df_pay,
    columns=['payment_id'],
    order_by='last_updated_timestamp',
)

In [0]:
# Shared timestamp step from Transformations.process_timestamp.
df_pay = transformer.process_timestamp(
    df_pay
)

In [0]:
# First run: create the silver Delta table from this batch.
# Later runs: upsert the batch keyed on payment_id.
# `spark` is already required by the bronze read above, so no session bootstrap is needed here.
# NOTE(review): upsert receives the bare name "payments"; confirm Transformations.upsert
# resolves it to the same pyspark_dbt.silver table checked below.
silver_table = 'pyspark_dbt.silver.payments'

if not spark.catalog.tableExists(silver_table):
    (
        df_pay.write.format('delta')
        .mode('append')
        .saveAsTable(silver_table)
    )
else:
    transformer.upsert(
        spark,
        df_pay,
        ["payment_id"],
        "payments",
        "last_updated_timestamp",
    )

In [0]:
%sql
-- Row count of the silver payments table, as a quick check after the write/upsert.
SELECT COUNT(*)
FROM pyspark_dbt.silver.payments

#### **VEHICLES**

In [0]:
# Load raw vehicle records from the bronze layer.
# The preview is limited to 10 rows, matching the other preview cells.
df_vech = spark.read.table('pyspark_dbt.bronze.vehicles')
display(df_vech.limit(10))

In [0]:
# Upper-case the vehicle make so the same make is always spelled the same way.
df_vech = df_vech.withColumn('make', upper(col('make')))

# Same 10-row preview style as the other sections (was a full-frame `.display()`).
display(df_vech.limit(10))

In [0]:
df_vech = transformer.process_timestamp(df_vech)

In [0]:
df_vech = transformer.de_duplication(
    df= df_vech,
    columns=["vehicle_id"]  ,
    order_by="last_updated_timestamp"
)

In [0]:
# First run: create the silver Delta table from this batch.
# Later runs: upsert the batch keyed on vehicle_id.
# `spark` is already required by the bronze read above, so no session bootstrap is needed here.
# NOTE(review): upsert receives the bare name "vehicles"; confirm Transformations.upsert
# resolves it to the same pyspark_dbt.silver table checked below.
silver_table = 'pyspark_dbt.silver.vehicles'

if not spark.catalog.tableExists(silver_table):
    (
        df_vech.write.format('delta')
        .mode('append')
        .saveAsTable(silver_table)
    )
else:
    transformer.upsert(
        spark,
        df_vech,
        ["vehicle_id"],
        "vehicles",
        "last_updated_timestamp",
    )

In [0]:
%sql
-- Row count of the silver vehicles table, as a quick check after the write/upsert.
SELECT COUNT(*)
FROM pyspark_dbt.silver.vehicles