###**Creating Schema**
Creating Source and Target Schemas in Spark SQL

In [0]:
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.source")  # Create source schema if it doesn't exist
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.target")  # Create target schema if it doesn't exist

DataFrame[]

### **Loading and Saving sales2 Data Table**
Read from AccuWeather Source and Overwrite to Workspace Source Schema

In [0]:
sales2 = spark.sql("select * from samples.bakehouse.sales_suppliers")
sales2.write.mode("overwrite").saveAsTable("workspace.source.sales2")

### **Read and Display SCD1 Source Table**
Load Data from workspace.source.scd1 and Visualize Contents

In [0]:
source = spark.read.table('workspace.source.sales2')
source.display()

### Generate Row-Level Hash Using SHA-256
Create RowHash from Concatenated Values and Drop ConCatValue

In [0]:
# Create a hash key by concatenating all columns into a single string column 'RowHash'
from pyspark.sql import functions as F

# Concatenate all columns in 'source' DataFrame into 'RowHash'
source = source.withColumn('RowHash', F.sha2(F.concat_ws('', *source.columns), 256))

In [0]:
display(source)

supplierID,name,ingredient,continent,city,district,size,longitude,latitude,approved,RowHash
4000000,Cacao Wonders,cacao,South America,Guayaquil,Las Peñas,M,-79.8974,-2.1791,Y,19e37f9076246ae6ffb69e9be7ed2073d94656fb55aadc78ee09807ffd86ff36
4000001,Coconut Grove,coconut,Asia,Manila,Intramuros,S,121.0221,14.6042,Y,dd8379700727ada5448616395cee167291078ba63a10ae62f20ea3708b19004a
4000002,Almond Delights,almonds,Europe,Valencia,Ruzafa,L,-0.3762,39.4699,Y,2618ef1944d020b72ae94ce0343b054c4818855400650829c903c0a0bec5004c
4000003,Sugar Cane Harvest,cane sugar,South America,Sao Paulo,Vila Madalena,XL,-46.6333,-23.5489,Y,98327b4f32a42f832fa286d6fd6c96ba0734010fc2fdb4cb3ac62aa321f2b06c
4000004,Vanilla Valley,vanilla,North America,Mexico City,Roma Norte,M,-99.1332,19.4326,Y,e6d6a60d578c4b2907df80d8b136e67370b976e9a0754f68a4dd914058b61839
4000005,Pecan Pleasures,pecans,North America,Atlanta,Virginia-Highland,S,-84.3888,33.749,Y,eb24f9d60f76123cd4744b9bff18c66b5c1babd2a3950faef75a353837c96dd0
4000006,Hazelnut Haven,hazelnuts,Europe,Istanbul,Kadıköy,XXL,28.9784,41.0082,Y,432fd8759f3212a0b0fb1b90a232b03f65ff6883d1cf4247f71366737d55d732
4000007,Cinnamon Spice,cinnamon,Asia,Colombo,Galle Face Green,L,79.8612,6.9271,Y,5bb8fa1ebe1c59e25cf6022593adba52ad61c9fb79d47eccd81d402afa43597c
4000008,Cashew Corner,cashews,Asia,Goa,Anjuna Beach,XL,73.8067,15.3173,Y,edfa576dd2e0c945b119604b91e8d4680a64e069945367ff633b7ae2d72be8fe
4000009,Maple Monarch,maple syrup,North America,Montreal,Plateau Mont-Royal,M,-73.5673,45.5017,Y,2de584aa231a74956a5ac5dfd69a06aab3050410b11f8c89c095b95b4feaee24


### Add Metadata Columns to Source Data
Include IndCurrent, CreatedDate, and ModifiedDate in the Dataset

In [0]:
# Add three new columns to source :
# 1. 'IndCurrent': Set to 1 for all rows, indicating the current/active record.
# 2. 'CreatedDate': Set to the current timestamp, representing when the record was created.
# 3. 'ModifiedDate': Set to the current timestamp, representing when the record was last modified.
source = source.withColumn("IndCurrent", F.lit(1)) \
    .withColumn("CreatedDate", F.current_timestamp()) \
    .withColumn("ModifiedDate", F.current_timestamp())

### Add storage_id in Ascending Order
Assign Unique Row Numbers and Place storage_id as the First Column

In [0]:
from pyspark.sql.window import Window
# Define the window specification
window_spec = Window.orderBy(F.monotonically_increasing_id())

# Add a row number column based on the window specification
source = source.withColumn("storage_id", F.row_number().over(window_spec))

first_cols = ["storage_id"]
other_cols = [col for col in source.columns if col not in first_cols]
source = source.select(first_cols + other_cols)



### Append Data to Delta Table
Write source DataFrame to workspace.target.sales2 Using Delta Format


In [0]:
#table_name= 'workspace.target.sales2'
#source.write.format('delta').mode('append').saveAsTable(table_name)

### Update Ingredient Column Based on SupplierID
Set ingredient to 'Mumbai' Where supplierID is "4000000"

In [0]:
from pyspark.sql.functions import col, when

Source = source.withColumn(
    "ingredient",
    when(col("supplierID") == "4000000", "Mumbai").otherwise(col("ingredient"))
)



### Select Subset of Columns
Extract All Columns from Source Except the First and Last Four

> Slicing is the process of extracting a subset of elements from a sequence (like a list, string, or array) using their index positions.
eg. 1:4 is the slice — it includes elements from index 1 to 3 (4 is excluded).
  You can also use slicing like list[1:-2] to exclude from end.

In [0]:
cols= Source.columns
cols = cols[1:-4]

### Generate Secure Hash from Selected Columns
Create RowHash Using SHA-256 on Specific Columns in SourceDf

In [0]:
from pyspark.sql import functions as F

# Concatenate all columns in 'source' DataFrame into 'RowHash'
SourceDf = Source.withColumn('RowHash', F.sha2(F.concat_ws('', *cols), 256))



In [0]:
SourceDf=SourceDf.drop("storage_Id")



###Detect New or Changed Records by Comparing Hashes
Join with Target Table, Set Flag, Filter Only New Records, and Drop TargetHash


In [0]:
TargetDf=spark.read.table(table_name).select(['supplierID','RowHash','storage_id']).withColumnRenamed('RowHash','TargetHash')
SourceDf=SourceDf.join(TargetDf, on =['supplierID'], how='left').withColumn('Flag', F.when(col('TargetHash').isNull() | (col('TargetHash') != col('RowHash')), 'New').when(col('TargetHash') == col('RowHash'), 'NoChange').otherwise('Update'))
# Drop the TargetHash column
SourceDf=SourceDf.drop('TargetHash')
SourceDf=SourceDf.filter(col("Flag") == "New")
SourceDf.display()

###Implement SCD Type 2 with Delta Lake Merge
Mark Old Records as Inactive and Insert New Rows with Unique Surrogate Keys

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, lit, col, udf
from pyspark.sql.types import StringType
import uuid

# Configuration
table_name = "workspace.target.sales2"
key_column = "supplierID"
hash_column = "RowHash"
is_current_column = "IndCurrent"
surrogate_key_column = "storage_id"
created_column = "CreatedDate"

# Reference Delta table
target_table = DeltaTable.forName(spark, table_name)

# Add new columns to source DataFrame
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())
SourceDf = SourceDf \
    .withColumn(surrogate_key_column, uuid_udf()) \
    .withColumn(created_column, current_timestamp()) \
    .withColumn(is_current_column, lit(1))

# Use aliases properly
src = SourceDf.alias("src")
tgt = target_table.alias("tgt")

# Use column expressions (not strings) in merge condition
tgt.merge(
    source=src,
    condition=(
        (col(f"tgt.{key_column}") == col(f"src.{key_column}")) &
        (col(f"tgt.{is_current_column}") == lit(1))
    )
).whenMatchedUpdate(
    condition=col(f"tgt.{hash_column}") != col(f"src.{hash_column}"),
    set={
        is_current_column: lit(0)
    }
)

<delta.connect.tables.DeltaMergeBuilder at 0xff8ceec04650>



### Reassign and Append New Storage IDs
Drop Old Columns, Generate New storage_id, Set IndCurrent, and Append to Delta Table

In [0]:
SourceDf = SourceDf.drop('storage_id','Flag')
max_storage_id = spark.sql(f"select max(storage_id) as max_id from {table_name}").first()['max_id']
next_storage_id = 1 if not max_storage_id or max_storage_id == 0 else max_storage_id + 1

SourceDf = SourceDf.withColumn('storage_id', lit(next_storage_id))
SourceDf = SourceDf.withColumn('IndCurrent', lit(1))
SourceDf.write.format('delta').mode('append').saveAsTable(table_name)

