In [1]:
%run NB - Data Producer with Null

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 6, Finished, Available, Finished)

Collecting faker
  Downloading faker-37.5.3-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.5.3-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.5.3
Note: you may need to restart the kernel to use updated packages.


SynapseWidget(Synapse.DataFrame, 240bd080-8588-4aa4-a122-4653acb58bb2)

In [2]:
from pyspark.sql.functions import col, lit, lag, when, sha2, concat_ws, lead
from pyspark.sql.window import Window

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 7, Finished, Available, Finished)

In [3]:
# Separate valid and invalid records
df_valid = df_null.filter(col("LoadDate").isNotNull())
df_invalid = df_null.filter(col("LoadDate").isNull())  # keep aside for audit
valid = df_valid.count()
invalid = df_invalid.count()

print(f"Valid record {valid}")
display(df_valid)

print(f"Invalid record {valid}")
display(df_invalid)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 8, Finished, Available, Finished)

Valid record 9


SynapseWidget(Synapse.DataFrame, 928e9bdc-573f-4044-9518-6b086975ebd3)

Invalid record 9


SynapseWidget(Synapse.DataFrame, 3e3264cf-76b1-4c2d-b5f2-799894616caf)

In [4]:
# Remove duplicate records (same EmpID, LoadDate, Name, JobTitle, Department)
df_valid = df_valid.dropDuplicates(["EmpID", "LoadDate", "Name", "JobTitle", "Department"])
display(df_valid)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e19a6b24-9af6-47e6-b8dc-c45dbcb51374)

In [5]:
# Create a hash of the tracking columns
df_hashed = df_valid.withColumn(
    "row_sha2", 
    sha2(concat_ws("||", col("Name"), col("JobTitle"), col("Department")), 256)
)
display(df_hashed)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, afdbceef-07b3-48b4-bd03-2555d26900b3)

In [6]:
# Define window partitioned by EmpID and ordered by LoadDate
window_spec = Window.partitionBy("EmpID").orderBy("LoadDate")

# Detect changes
df_with_lag = df_hashed.withColumn(
    "prev_hash", lag("row_sha2").over(window_spec)
).withColumn(
    "change_flag", when(col("row_sha2") != col("prev_hash"), lit(1)).otherwise(lit(0))
).withColumn(
    "change_flag", when(col("prev_hash").isNull(), lit(1)).otherwise(col("change_flag"))
)
display(df_with_lag)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3aa96e6b-a409-490e-afa2-da63fe720b10)

In [7]:
# Filter only change rows
df_changes = df_with_lag.filter(col("change_flag") == 1).drop("prev_hash", "change_flag")
display(df_changes)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 06189880-f8b4-4ecd-b2d7-b64a4d97015a)

In [8]:
# Add StartDate
df_changes = df_changes.withColumn("StartDate", col("LoadDate"))
display(df_changes)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 041c4acd-5e70-4514-8d78-c004ff92d074)

In [9]:
# Define EndDate
window_spec_ver = Window.partitionBy("EmpID").orderBy("StartDate")
df_changes = df_changes.withColumn(
    "EndDate", 
    lead("StartDate").over(window_spec_ver)
)
display(df_changes)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 48343740-264c-464c-a32c-7994b5b2de8b)

In [10]:
# Set IsActive
df_changes = df_changes.withColumn(
    "IsActive", when(col("EndDate").isNull(), lit(True)).otherwise(lit(False))
)
display(df_changes)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a1aa1e2c-a3f7-4645-bcb3-ce7eb94ed9ca)

In [11]:

# Final SCD2 table
scd2_final = df_changes.select(
    "EmpID", "Name", "Gender", "JobTitle", "Department", 
    "StartDate", "EndDate", "IsActive"
)

display(scd2_final)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 70dbb254-e84c-4985-a3b9-c3eb991d1dbd)

In [12]:
df = scd2_final.filter(scd2_final["IsActive"] == "true")
display(df)

StatementMeta(, 437176f0-479a-4e9f-bfdd-9be7232adf7e, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 537b3315-6792-4120-8bc1-360d4337c6c5)