# Running Connection notebook

In [0]:
%run /Workspace/Users/bhimsendabby2023@gmail.com/Drafts/connection_to_adlgen2_using_ServicePrinciple

[SecretScope(name='adlgen2')]

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

#Reading Customer File Data From ADLGen2 Bronze

In [0]:

schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("join_date", DateType(), True),
    StructField("country", StringType(), True)
])


In [0]:
customer_df = spark.read.format('csv').option("header","true").schema(schema).load(base_path+'customer/customer.csv')


In [0]:
display(customer_df)

customer_id,customer_name,email,join_date,country
C001,Alice Johnson,alice@example.com,2023-01-15,USA
C002,Bob Smith,bob@test.com,2023-02-20,UK
C003,Charlie Brown,,2023-03-05,Canada
C004,David Miller,david.m@gmail.com,,Germany
C005,Eve White,eve@example.com,2023-05-12,France
,Mosasa,bellaxsxa@example.com,2025-08-05,UK
C006,Alice Johnson,alice@example.com,2023-01-15,USA
C006,Alice Johnson,alice@example.com,2023-01-15,USA
C007,Frank Ross,frank@test.com,2023-06-01,USA
C008,Grace Lee,,2023-06-15,Japan


# Replacing Null values like date with old date, email with unknown

In [0]:
customer_df = customer_df.fillna({'email':'unknown','join_date':'2026-02-13'})
customer_df.show()

+-----------+--------------+--------------------+----------+-------+
|customer_id| customer_name|               email| join_date|country|
+-----------+--------------+--------------------+----------+-------+
|       C001| Alice Johnson|   alice@example.com|2023-01-15|    USA|
|       C002|     Bob Smith|        bob@test.com|2023-02-20|     UK|
|       C003| Charlie Brown|             unknown|2023-03-05| Canada|
|       C004|  David Miller|   david.m@gmail.com|2026-02-13|Germany|
|       C005|     Eve White|     eve@example.com|2023-05-12| France|
|       NULL|        Mosasa|bellaxsxa@example...|2025-08-05|     UK|
|       C006| Alice Johnson|   alice@example.com|2023-01-15|    USA|
|       C006| Alice Johnson|   alice@example.com|2023-01-15|    USA|
|       C007|    Frank Ross|      frank@test.com|2023-06-01|    USA|
|       C008|     Grace Lee|             unknown|2023-06-15|  Japan|
|       C009|    Henry Ford|      henry@ford.com|2023-07-20|     UK|
|       C009|    Henry Ford|      

# Removing the null value primary keys records

In [0]:
customer_df = customer_df.dropna(subset=["customer_id"])
customer_df.show()

+-----------+--------------+-----------------+----------+-------+
|customer_id| customer_name|            email| join_date|country|
+-----------+--------------+-----------------+----------+-------+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|
|       C007|    Frank Ross|   frank@test.com|2023-06-01|    USA|
|       C008|     Grace Lee|          unknown|2023-06-15|  Japan|
|       C009|    Henry Ford|   henry@ford.com|2023-07-20|     UK|
|       C009|    Henry Ford|   henry@ford.com|2023-07-20|     UK|
|       C010|Isabella Moore|bella@example.com|2023-08-05|    USA|
+---------

In [0]:
customer_df = customer_df.withColumn("Validated_Date", coalesce(to_date(col("join_date"),'MM/dd/yyyy'),to_date(col("join_date"),'yyyy-MM-dd'),to_date(col("join_date"),'MM-dd-yyyy')))
customer_df.show()



+-----------+--------------+-----------------+----------+-------+--------------+
|customer_id| customer_name|            email| join_date|country|Validated_Date|
+-----------+--------------+-----------------+----------+-------+--------------+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|    2023-02-20|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|    2023-03-05|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|    2026-02-13|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|    2023-05-12|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|
|       C007|    Frank Ross|   frank@test.com|2023-06-01|    USA|    2023-06-01|
|       C008|     Grace Lee|          unknown|2023-06-15|  Japan|    2023-06-15|
|       C009|    Henry Ford|

In [0]:
customer_df = customer_df.withColumn("TimeStamp_Col", current_timestamp())
customer_df.show()

+-----------+--------------+-----------------+----------+-------+--------------+--------------------+
|customer_id| customer_name|            email| join_date|country|Validated_Date|       TimeStamp_Col|
+-----------+--------------+-----------------+----------+-------+--------------+--------------------+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|    2023-02-20|2026-02-13 20:35:...|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|    2023-03-05|2026-02-13 20:35:...|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|    2026-02-13|2026-02-13 20:35:...|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|    2023-05-12|2026-02-13 20:35:...|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|20

In [0]:
customer_df_1 = customer_df

#Removing the duplicates using drop_duplicate based on customer_id

In [0]:
customer_df_1 = customer_df_1.drop_duplicates(['customer_id'])
customer_df_1.show()

+-----------+--------------+-----------------+----------+-------+--------------+--------------------+
|customer_id| customer_name|            email| join_date|country|Validated_Date|       TimeStamp_Col|
+-----------+--------------+-----------------+----------+-------+--------------+--------------------+
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|
|       C010|Isabella Moore|bella@example.com|2023-08-05|    USA|    2023-08-05|2026-02-13 20:35:...|
|       C007|    Frank Ross|   frank@test.com|2023-06-01|    USA|    2023-06-01|2026-02-13 20:35:...|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|    2023-03-05|2026-02-13 20:35:...|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|    2026-02-13|2026-02-13 20:35:...|
|       C009|    Henry Ford|   henry@ford.com|2023-07-20|     UK|    2023-07-20|2026-02-13 20:35:...|
|       C008|     Grace Lee|          unknown|2023-06-15|  Japan|    2023-06-15|20

#Removing duplicates using Window function

In [0]:

windowPar = Window.partitionBy(col('customer_id')).orderBy(col('TimeStamp_Col').desc())

In [0]:
customer_df = customer_df.withColumn('count',row_number().over(windowPar))
customer_df.show()

+-----------+--------------+-----------------+----------+-------+--------------+--------------------+-----+
|customer_id| customer_name|            email| join_date|country|Validated_Date|       TimeStamp_Col|count|
+-----------+--------------+-----------------+----------+-------+--------------+--------------------+-----+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|    1|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|    2023-02-20|2026-02-13 20:35:...|    1|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|    2023-03-05|2026-02-13 20:35:...|    1|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|    2026-02-13|2026-02-13 20:35:...|    1|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|    2023-05-12|2026-02-13 20:35:...|    1|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|    1|
|       C006| Alice Johnson|

In [0]:
customer_df = customer_df.filter(col('count')==1)
customer_df.show()

+-----------+--------------+-----------------+----------+-------+--------------+--------------------+-----+
|customer_id| customer_name|            email| join_date|country|Validated_Date|       TimeStamp_Col|count|
+-----------+--------------+-----------------+----------+-------+--------------+--------------------+-----+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|    1|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|    2023-02-20|2026-02-13 20:35:...|    1|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|    2023-03-05|2026-02-13 20:35:...|    1|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|    2026-02-13|2026-02-13 20:35:...|    1|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|    2023-05-12|2026-02-13 20:35:...|    1|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|    2023-01-15|2026-02-13 20:35:...|    1|
|       C007|    Frank Ross|

In [0]:
customer_df = customer_df.withColumnRenamed('TimeStamp_Col','date_timestamp')
customer_df = customer_df.withColumnRenamed('Validated_Date','validated_date')

In [0]:
customer_df = customer_df.drop('count','validated_date')
customer_df.show()

+-----------+--------------+-----------------+----------+-------+--------------------+
|customer_id| customer_name|            email| join_date|country|      date_timestamp|
+-----------+--------------+-----------------+----------+-------+--------------------+
|       C001| Alice Johnson|alice@example.com|2023-01-15|    USA|2026-02-13 20:35:...|
|       C002|     Bob Smith|     bob@test.com|2023-02-20|     UK|2026-02-13 20:35:...|
|       C003| Charlie Brown|          unknown|2023-03-05| Canada|2026-02-13 20:35:...|
|       C004|  David Miller|david.m@gmail.com|2026-02-13|Germany|2026-02-13 20:35:...|
|       C005|     Eve White|  eve@example.com|2023-05-12| France|2026-02-13 20:35:...|
|       C006| Alice Johnson|alice@example.com|2023-01-15|    USA|2026-02-13 20:35:...|
|       C007|    Frank Ross|   frank@test.com|2023-06-01|    USA|2026-02-13 20:35:...|
|       C008|     Grace Lee|          unknown|2023-06-15|  Japan|2026-02-13 20:35:...|
|       C009|    Henry Ford|   henry@ford.c

In [0]:
customer_df.write.format('delta').mode('overwrite').option('mergeSchema', 'true').saveAsTable('bhim_bricks.dbo.customer')

In [0]:
%sql

desc formatted bhim_bricks.dbo.customer

col_name,data_type,comment
customer_id,string,
customer_name,string,
email,string,
country,string,
validated_Date,date,
date_timestamp,timestamp,
join_date,date,
,,
# Delta Statistics Columns,,
Column Names,"date_timestamp, customer_id, customer_name, email, validated_Date, country",


# Creating Delta Table In The Unity Catalog Volumne For Storing SCDType1 Data

In [0]:
%sql

CREATE DATABASE IF NOT EXISTS bhim_bricks_scdtype1

In [0]:
%sql

CREATE TABLE IF NOT EXISTS goldlayer.customer_scdtype1 (
  customer_id STRING,
  customer_name STRING,
  email STRING,
  join_date DATE,
  country STRING,
  date_timestamp TIMESTAMP,
  updated_by STRING,
  updated_date TIMESTAMP,
  created_by STRING,
  created_date TIMESTAMP,
  hashvalue BIGINT
)

In [0]:
%sql

desc formatted goldlayer.customer_scdtype1

col_name,data_type,comment
customer_id,string,
customer_name,string,
email,string,
join_date,date,
country,string,
date_timestamp,timestamp,
updated_by,string,
updated_date,timestamp,
created_by,string,
created_date,timestamp,


In [0]:
customer_df = customer_df.withColumn("hashvalue",crc32(concat(col("customer_id"),col("customer_name"),col("email"),col("join_date"),col("country"),col("date_timestamp"))))

In [0]:
from delta.tables import DeltaTable

table_name = "goldlayer.customer_scdtype1"

delta_tgt = DeltaTable.forName(spark, table_name)

In [0]:
delta_tgt.alias("tgt").merge(
    customer_df.alias("src"),
    "tgt.customer_id = src.customer_id"
).whenMatchedUpdate(
    condition="tgt.hashvalue != src.hashvalue",
    set={
        "tgt.customer_id": "src.customer_id",
        "tgt.customer_name": "src.customer_name",
        "tgt.email": "src.email",
        "tgt.join_date": "src.join_date",
        "tgt.country": "src.country",
        "tgt.hashvalue":"src.hashvalue",
        "tgt.date_timestamp": "src.date_timestamp",
        "tgt.Updated_date" : current_timestamp(),
        "tgt.Updated_by" : lit("Databricks-Updated")
    }
).whenNotMatchedInsert(
    values = {
        "tgt.customer_id": "src.customer_id",
        "tgt.customer_name": "src.customer_name",
        "tgt.email": "src.email",
        "tgt.join_date": "src.join_date",
        "tgt.country": "src.country",
        "tgt.date_timestamp": "src.date_timestamp",
        "tgt.updated_by":  lit("Databricks-Updated") ,
        "tgt.updated_date": current_timestamp(),
        "tgt.created_by": lit("Databricks-Created"),
        "tgt.created_date": current_timestamp(),
        "tgt.hashvalue": "src.hashvalue"
    }
).execute()

In [0]:
%sql

select * from goldlayer.customer_scdtype1

customer_id,customer_name,email,join_date,country,date_timestamp,updated_by,updated_date,created_by,created_date,hashvalue
C001,Alice Johnson,alice@example.com,2023-01-15,USA,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,3445775555
C002,Bob Smith,bob@test.com,2023-02-20,UK,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,3917735542
C003,Charlie Brown,unknown,2023-03-05,Canada,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,1597858953
C004,David Miller,david.m@gmail.com,2026-02-13,Germany,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,454524778
C005,Eve White,eve@example.com,2023-05-12,France,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,496869729
C006,Alice Johnson,alice@example.com,2023-01-15,USA,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,3645238729
C007,Frank Ross,frank@test.com,2023-06-01,USA,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,1814422076
C008,Grace Lee,unknown,2023-06-15,Japan,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,1868704290
C009,Henry Ford,henry@ford.com,2023-07-20,UK,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,501299583
C010,Isabella Moore,bella@example.com,2023-08-05,USA,2026-02-13T20:38:29.503Z,Databricks-Updated,2026-02-13T20:38:29.503Z,Databricks-Created,2026-02-13T20:38:29.503Z,953116809
