In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("SCD_Type_II_SQL").getOrCreate()

25/07/23 09:42:51 WARN Utils: Your hostname, DESKTOP-HG7VAEJ resolves to a loopback address: 127.0.1.1; using 172.28.202.26 instead (on interface eth0)
25/07/23 09:42:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/23 09:42:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/23 09:42:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
customer_df = spark.read.option("header", True).csv("dataset/customer.csv")
new_customer_df = spark.read.option("header", True).csv("dataset/newcustomer.csv")

In [4]:
customer_df.createOrReplaceTempView("customer")
new_customer_df.createOrReplaceTempView("new_customer")

In [5]:
effective_date = "2025-07-20"
previous_date = "2025-07-19"

In [6]:
# Step 1: Expire old records where address changed
expired_records = spark.sql(f"""
SELECT
    c.Customer_ID,
    c.Name,
    c.Address,
    c.Start_Date,
    '{previous_date}' AS End_Date,
    'N' AS Is_Current
FROM customer c
JOIN new_customer n
  ON c.Customer_ID = n.Customer_ID
WHERE c.Address <> n.Address
  AND c.Is_Current = 'Y'
""")

In [7]:
expired_records.show()

+-----------+----------+--------+----------+----------+----------+
|Customer_ID|      Name| Address|Start_Date|  End_Date|Is_Current|
+-----------+----------+--------+----------+----------+----------+
|        101|  John Doe|New York|01-01-2022|2025-07-19|         N|
|        102|Jane Smith| Chicago|15-03-2023|2025-07-19|         N|
+-----------+----------+--------+----------+----------+----------+



In [8]:
# Step 2: Insert new updated records
new_records = spark.sql(f"""
SELECT
    n.Customer_ID,
    n.Name,
    n.Address,
    '{effective_date}' AS Start_Date,
    '' AS End_Date,
    'Y' AS Is_Current
FROM customer c
JOIN new_customer n
  ON c.Customer_ID = n.Customer_ID
WHERE c.Address <> n.Address
  AND c.Is_Current = 'Y'
""")

In [9]:
new_records.show()

+-----------+----------+-----------+----------+--------+----------+
|Customer_ID|      Name|    Address|Start_Date|End_Date|Is_Current|
+-----------+----------+-----------+----------+--------+----------+
|        101|  John Doe|Los Angeles|2025-07-20|        |         Y|
|        102|Jane Smith|     Dallas|2025-07-20|        |         Y|
+-----------+----------+-----------+----------+--------+----------+



In [10]:
# Step 3: Keep unchanged records
unchanged_records = spark.sql("""
SELECT *
FROM customer
WHERE Customer_ID NOT IN (
    SELECT c.Customer_ID
    FROM customer c
    JOIN new_customer n
      ON c.Customer_ID = n.Customer_ID
    WHERE c.Address <> n.Address
      AND c.Is_Current = 'Y'
)
""")


In [11]:
unchanged_records.show()

+-----------+------------+-------------+----------+--------+----------+
|Customer_ID|        Name|      Address|Start_Date|End_Date|Is_Current|
+-----------+------------+-------------+----------+--------+----------+
|        103| Michael Lee|San Francisco|20-07-2021|    NULL|         Y|
|        104| Emily Davis|       Boston|10-01-2024|    NULL|         Y|
|        105|David Wilson|      Seattle|05-11-2023|    NULL|         Y|
+-----------+------------+-------------+----------+--------+----------+



In [12]:
# Step 4: Combine final result
final_df = expired_records.union(new_records).union(unchanged_records)

In [13]:
final_df.orderBy("Customer_ID", "Start_Date").show(truncate=False)

+-----------+------------+-------------+----------+----------+----------+
|Customer_ID|Name        |Address      |Start_Date|End_Date  |Is_Current|
+-----------+------------+-------------+----------+----------+----------+
|101        |John Doe    |New York     |01-01-2022|2025-07-19|N         |
|101        |John Doe    |Los Angeles  |2025-07-20|          |Y         |
|102        |Jane Smith  |Chicago      |15-03-2023|2025-07-19|N         |
|102        |Jane Smith  |Dallas       |2025-07-20|          |Y         |
|103        |Michael Lee |San Francisco|20-07-2021|NULL      |Y         |
|104        |Emily Davis |Boston       |10-01-2024|NULL      |Y         |
|105        |David Wilson|Seattle      |05-11-2023|NULL      |Y         |
+-----------+------------+-------------+----------+----------+----------+



In [14]:
final_df.orderBy("Customer_ID", "Start_Date") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .option("header", True) \
    .csv("scd_output")