In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload; run %autoreload 0

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from delta.tables import DeltaTable


In [0]:
import os
import sys

In [0]:
df_cust=spark.read.csv("dbfs:/mnt/globalmart/customers.csv")
display(df_cust)

In [0]:
df_cust = df_cust.withColumn("domain",split(col('_c3'), '@')[1])
display(df_cust)

In [0]:
df_cust = df_cust.withColumn("phone_number",regexp_replace(col('_c4'), r'[^0-9]', ''))
display(df_cust)

In [0]:
df_cust = df_cust.withColumn("full_name",concat_ws(" ", col('_c1'), col('_c2')))
df_cust = df_cust.drop('_c1', '_c2')
display(df_cust)

In [0]:
class transformations:
    """Reusable silver-layer helpers: de-duplication, processing timestamp and Delta MERGE upserts."""

    # Schema holding this notebook's silver tables; used by `upsert` when it is given a bare
    # (unqualified) table name.
    DEFAULT_SILVER_SCHEMA = "thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform"

    def deedup(self, df: DataFrame, dedup_cols: List, cdc: str):
        """Keep only the latest row per key.

        Rows are grouped by ``dedup_cols``; within each group only the row with the greatest
        ``cdc`` value survives (ties are broken arbitrarily by Spark).

        Args:
            df: Input DataFrame.
            dedup_cols: Column names that together identify a record.
            cdc: Change-tracking column; the highest value in each group wins.

        Returns:
            The de-duplicated DataFrame, with the same columns as ``df``.

        Raises:
            ValueError: if ``dedup_cols`` is empty.
        """
        if not dedup_cols:
            raise ValueError("dedup_cols must name at least one column")
        # Partition on the key columns themselves rather than on a concatenated string key:
        # concat() collides across columns ("1"+"23" == "12"+"3") and yields NULL when any part
        # is NULL, which would lump unrelated rows into a single group.
        latest_first = Window.partitionBy(*dedup_cols).orderBy(desc(cdc))
        return (
            df.withColumn("dedupCounts", row_number().over(latest_first))
            .filter(col("dedupCounts") == 1)
            .drop("dedupCounts")
        )

    # Correctly spelled alias; `deedup` is kept for existing callers.
    dedup = deedup

    def process_timestamp(self, df):
        """Return ``df`` with a ``process_timestamp`` column set to ``current_timestamp()``."""
        return df.withColumn("process_timestamp", current_timestamp())

    def upsert(self, df, key_cols, table, cdc, schema=None):
        """MERGE ``df`` into an existing Delta table.

        Matched rows are overwritten only when the source ``cdc`` value is >= the target's;
        unmatched rows are inserted.

        Args:
            df: Source DataFrame (column names must match the target table).
            key_cols: Column names used to match source and target rows.
            table: Fully qualified table name, or a bare table name resolved against
                ``schema``.
            cdc: Change-tracking column used to reject stale updates.
            schema: ``catalog.schema`` used for bare table names; defaults to
                ``DEFAULT_SILVER_SCHEMA``.

        Returns:
            1 (kept for backward compatibility).

        Raises:
            ValueError: if ``key_cols`` is empty.
        """
        if not key_cols:
            raise ValueError("key_cols must name at least one column")
        # Accept either a qualified name or a bare name resolved against the silver schema.
        target = table if "." in table else f"{schema or self.DEFAULT_SILVER_SCHEMA}.{table}"
        merge_condition = " AND ".join(f"src.{k} = trg.{k}" for k in key_cols)
        (
            DeltaTable.forName(spark, target)
            .alias("trg")
            .merge(df.alias("src"), merge_condition)
            .whenMatchedUpdateAll(condition=f"src.{cdc} >= trg.{cdc}")
            .whenNotMatchedInsertAll()
            .execute()
        )
        return 1



cust_obj = transformations()
cust_of_trans = cust_obj.deedup(df_cust,['_c0'],'_c7')
display(cust_of_trans)

In [0]:
df_cust = cust_obj.process_timestamp(cust_of_trans)
display(df_cust)

In [0]:
spark.sql("SHOW CATALOGS").show()


In [0]:
spark.sql("SHOW SCHEMAS IN thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com").show()


In [0]:
spark.sql("""
CREATE SCHEMA IF NOT EXISTS
thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform
""")


In [0]:
display(df_cust)

In [0]:
spark.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.customers").printSchema()


In [0]:
merge_condition = "src.phone_number = trg.phone_number"


In [0]:
from delta.tables import DeltaTable

table_name = "thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.customers"

if not spark.catalog.tableExists(table_name):
    df_cust.write.format("delta")\
        .mode("overwrite")\
        .saveAsTable(table_name)
else:
    dlt_obj = DeltaTable.forName(spark, table_name)
    merge_condition = "src.phone_number = trg.phone_number"
    (
        dlt_obj.alias("trg")
        .merge(
            df_cust.alias("src"),
            merge_condition
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
select count(*) from thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.customers

In [0]:
df_driver =    spark.read.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.bronzelayer1.drivers")
display(df_driver)

In [0]:
df_driver = df_driver.withColumn("phone_number",regexp_replace(col('phone_number'), r'[^0-9]', ''))
display(df_driver)

In [0]:
df_driver = df_driver.withColumn("full_name",concat_ws(" ", col('first_name'), col('last_name')))
df_driver = df_driver.drop('first_name', 'last_name')
display(df_driver)

In [0]:
driver_obj = transformations()


In [0]:
df_driver = driver_obj.deedup(df_driver,['driver_id'],'last_updated_timestamp')

In [0]:
df_driver = driver_obj.process_timestamp(df_driver)
if not spark.catalog.tableExists("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.drivers"):
    df_driver.write.format("delta")\
        .mode("overwrite")\
        .saveAsTable("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.drivers")
else:
    driver_obj.deedup(df_driver,['driver_id'],'last_updated_timestamp')
    

In [0]:
%sql
select count(*) from thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.drivers

In [0]:
df_loc =    spark.read.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.bronzelayer1.locations")
display(df_loc)

In [0]:
loc_obj = transformations()

In [0]:
df_loc = loc_obj.deedup(df_loc,['location_id'],'last_updated_timestamp')
df_loc = loc_obj.process_timestamp(df_loc)

In [0]:
if not spark.catalog.tableExists("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.locations"):
    df_loc.write.format("delta")\
        .mode("overwrite")  \
        .saveAsTable("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.locations")
else:
    loc_obj.deedup(df_loc,['location_id'],'last_updated_timestamp')

In [0]:
%sql select count(*) from thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.locations


In [0]:
df_pay =    spark.read.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.bronzelayer1.payments")
display(df_pay)

In [0]:
df_pay = df_pay.withColumn("online_payment_status",
                           when(((col("payment_method")=='card') & (col('payment_status')=='success')),'online-success')
                          .when(((col("payment_method")=='card') & (col('payment_status')=='failed')),'online-failed')
                          .when(((col("payment_method")=='cash') & (col('payment_status')=='pending')),'online-pending')
                          .otherwise('offline'))
display(df_pay)

In [0]:
pay_obj = transformations()
df_pay = pay_obj.deedup(df_pay,['payment_id'],'last_updated_timestamp')
df_pay = pay_obj.process_timestamp(df_pay)

In [0]:
if not spark.catalog.tableExists("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.payments"):
    df_pay.write.format("delta")\
        .mode("overwrite")  \
        .saveAsTable("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.payments")
else:
    pay_obj.deedup(df_pay,['payment_id'],'last_updated_timestamp')

In [0]:
%sql
select count(*) from thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.payments

In [0]:
df_veh = spark.read.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.bronzelayer1.vehicles")
display(df_veh)


In [0]:
df_veh = df_veh.withColumn("make",upper(col("make")))
display(df_veh)


In [0]:
veh_obj = transformations()
df_veh = veh_obj.deedup(df_veh,['vehicle_id'],'last_updated_timestamp')
df_veh = veh_obj.process_timestamp(df_veh)
if not spark.catalog.tableExists("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.vehicles"):
    df_veh.write.format("delta")\
        .mode("overwrite")  \
        .saveAsTable("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.vehicles")
else:
    veh_obj.deedup(df_veh,['vehicle_id'],'last_updated_timestamp')

In [0]:
%sql
select count(*) from thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.vehicles

In [0]:
df_trips = spark.read.table("thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.bronzelayer1.trips")
display(df_trips)

In [0]:
df_trips.write.format("delta").mode("overwrite").saveAsTable(
    "thiruvengadam_k_databricks_npmentorskool_onmicrosoft_com.silver_transform.trips")
