+---+-------+----------+----------+-----+------+
| ID|Product|Start_date|  End_date|Price|Status|
+---+-------+----------+----------+-----+------+
|  1|      A|2024-02-28|2024-04-13| 2000|     0|
|  1|      A|2024-02-28|2024-04-13| 2000|     0|
|  1|      A|2024-02-18|2024-04-13|   12|     0|
|  1|      A|2024-02-18|2024-04-13|   12|     0|
|  2|      B|2024-04-17|2024-12-04|   34|     1|
|  3|      C|2024-04-09|2024-12-04|   35|     1|
|  4|      D|2024-04-15|2024-12-04|   67|     1|
| 10|      Z|2023-12-25|2024-12-04|  109|     1|
+---+-------+----------+----------+-----+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_date, date_sub, to_date, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SCD Type 2 Implementation") \
    .getOrCreate()

# Define schemas
source_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Transaction_date", StringType(), True),  # Initially as StringType
    StructField("Price", IntegerType(), True)
])

target_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Transaction_date", StringType(), True),  # Initially as StringType
    StructField("Price", IntegerType(), True)
])

# Sample data
source_data = [
    (1, "A", "2024-04-14", 60),
    (2, "B", "2024-04-17", 34),
    (3, "C", "2024-04-09", 35),
    (4, "D", "2024-04-15", 67),
    (10, "Z", "2023-12-25", 109)
]

target_data = [
    (11, "U", "2024-02-28", 101),
    (1, "A", "2024-02-28", 2000),
    (1, "A", "2024-02-18", 12)
]

# Create DataFrames
source_df = spark.createDataFrame(source_data, schema=source_schema)
target_df = spark.createDataFrame(target_data, schema=target_schema)

# Convert date columns to DateType
source_df = source_df.withColumn("Transaction_date", to_date(col("Transaction_date"), "yyyy-MM-dd"))
target_df = target_df.withColumn("Transaction_date", to_date(col("Transaction_date"), "yyyy-MM-dd"))

# Add Start_date and End_date columns to target_df
target_df = target_df.withColumnRenamed("Transaction_date", "Start_date") \
                     .withColumn("End_date", lit(None).cast(DateType())) \
                     .withColumn("Status", lit(1))

# Alias the DataFrames to avoid column name conflicts
source_df = source_df.alias("source")
target_df = target_df.alias("target")

# Join the source and target DataFrames to identify updates
joined_df = source_df.join(target_df, (source_df.ID == target_df.ID) & (source_df.Product == target_df.Product), "left")

# Separate the DataFrames into new records and existing records that need updating
new_records = joined_df.filter(col("target.ID").isNull()).select(source_df["*"])
existing_records = joined_df.filter(col("target.ID").isNotNull()).select(
    source_df["*"],
    col("target.Start_date").alias("target_Start_date"),
    col("target.End_date").alias("target_End_date"),
    col("target.Status").alias("target_Status")
)

# For existing records, update the End_date in the target DataFrame
updated_target_df = target_df.join(existing_records, (target_df.ID == existing_records.ID) & (target_df.Product == existing_records.Product), "inner") \
    .withColumn("End_date", when(col("source.Transaction_date") > col("target_Start_date"), date_sub(col("source.Transaction_date"), 1)).otherwise(col("target_End_date"))) \
    .withColumn("Status", when(col("source.Transaction_date") > col("target_Start_date"), lit(0)).otherwise(col("target_Status"))) \
    .select("target.ID", "target.Product", "target.Start_date", "End_date", "target.Price", "Status")

# Add new records to the target DataFrame with Start_date and Status 1
new_records = new_records.withColumn("Start_date", col("Transaction_date")) \
    .withColumn("End_date", lit(None).cast(DateType())) \
    .withColumn("Status", lit(1))

# Combine the updated target DataFrame with the new records
final_target_df = updated_target_df.union(new_records.select("ID", "Product", "Start_date", "End_date", "Price", "Status"))

# Update all active records to have today's date as End_date
final_target_df = final_target_df.withColumn("End_date", when(col("Status") == 1, current_date()).otherwise(col("End_date")))

# Show the final target DataFrame
final_target_df.show(truncate=False)


+---+-------+----------+----------+-----+------+
|ID |Product|Start_date|End_date  |Price|Status|
+---+-------+----------+----------+-----+------+
|1  |A      |2024-02-28|2024-04-13|2000 |0     |
|1  |A      |2024-02-28|2024-04-13|2000 |0     |
|1  |A      |2024-02-18|2024-04-13|12   |0     |
|1  |A      |2024-02-18|2024-04-13|12   |0     |
|2  |B      |2024-04-17|2024-12-04|34   |1     |
|3  |C      |2024-04-09|2024-12-04|35   |1     |
|4  |D      |2024-04-15|2024-12-04|67   |1     |
|10 |Z      |2023-12-25|2024-12-04|109  |1     |
+---+-------+----------+----------+-----+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_date, date_sub, to_date, when, lead
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SCD Type 2 Implementation") \
    .getOrCreate()

# Define schemas
source_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Transaction_date", StringType(), True),  # Initially as StringType
    StructField("Price", IntegerType(), True)
])

target_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Transaction_date", StringType(), True),  # Initially as StringType
    StructField("Price", IntegerType(), True)
])

# Sample data
source_data = [
    (1, "A", "2024-04-14", 60),
    (2, "B", "2024-04-17", 34),
    (3, "C", "2024-04-09", 35),
    (4, "D", "2024-04-15", 67),
    (10, "Z", "2023-12-25", 109)
]

target_data = [
    (11, "U", "2024-02-28", 101),
    (1, "A", "2024-02-28", 2000),
    (1, "A", "2024-02-18", 12)
]

# Create DataFrames
source_df = spark.createDataFrame(source_data, schema=source_schema)
target_df = spark.createDataFrame(target_data, schema=target_schema)

# Convert date columns to DateType
source_df = source_df.withColumn("Transaction_date", to_date(col("Transaction_date"), "yyyy-MM-dd"))
target_df = target_df.withColumn("Transaction_date", to_date(col("Transaction_date"), "yyyy-MM-dd"))

# Add Start_date and End_date columns to target_df
target_df = target_df.withColumnRenamed("Transaction_date", "Start_date") \
                     .withColumn("End_date", lit(None).cast(DateType())) \
                     .withColumn("Status", lit(1))

# Alias the DataFrames to avoid column name conflicts
source_df = source_df.alias("source")
target_df = target_df.alias("target")

# Join the source and target DataFrames to identify updates
joined_df = source_df.join(target_df, (source_df.ID == target_df.ID) & (source_df.Product == target_df.Product), "left")

# Separate the DataFrames into new records and existing records that need updating
new_records = joined_df.filter(col("target.ID").isNull()).select(source_df["*"])
existing_records = joined_df.filter(col("target.ID").isNotNull()).select(
    source_df["*"],
    col("target.Start_date").alias("target_Start_date"),
    col("target.End_date").alias("target_End_date"),
    col("target.Status").alias("target_Status")
)

# For existing records, update the End_date in the target DataFrame
updated_target_df = target_df.join(existing_records, (target_df.ID == existing_records.ID) & (target_df.Product == existing_records.Product), "inner") \
    .withColumn("End_date", when(col("source.Transaction_date") > col("target_Start_date"), date_sub(col("source.Transaction_date"), 1)).otherwise(col("target_End_date"))) \
    .withColumn("Status", when(col("source.Transaction_date") > col("target_Start_date"), lit(0)).otherwise(col("target_Status"))) \
    .select("target.ID", "target.Product", "target.Start_date", "End_date", "target.Price", "Status")

# Add new records to the target DataFrame with Start_date and Status 1
new_records = new_records.withColumn("Start_date", col("Transaction_date")) \
    .withColumn("End_date", lit(None).cast(DateType())) \
    .withColumn("Status", lit(1))

# Combine the updated target DataFrame with the new records
final_target_df = updated_target_df.union(new_records.select("ID", "Product", "Start_date", "End_date", "Price", "Status"))

# Create a window specification for determining the last entry
window_spec = Window.partitionBy("ID", "Product").orderBy(col("Start_date").desc())

# Update all active records to have today's date as End_date and ensure only the last entry is active
final_target_df = final_target_df.withColumn("next_Start_date", lead("Start_date").over(window_spec)) \
    .withColumn("End_date", when(col("Status") == 1, when(col("next_Start_date").isNull(), current_date()).otherwise(date_sub(col("next_Start_date"), 1))).otherwise(col("End_date"))) \
    .withColumn("Status", when(col("next_Start_date").isNull(), lit(1)).otherwise(lit(0))) \
    .drop("next_Start_date")

# Show the final target DataFrame
final_target_df.show(truncate=False)


+---+-------+----------+----------+-----+------+
|ID |Product|Start_date|End_date  |Price|Status|
+---+-------+----------+----------+-----+------+
|1  |A      |2024-02-28|2024-04-13|2000 |0     |
|1  |A      |2024-02-28|2024-04-13|2000 |0     |
|1  |A      |2024-02-18|2024-04-13|12   |0     |
|1  |A      |2024-02-18|2024-04-13|12   |1     |
|2  |B      |2024-04-17|2024-12-04|34   |1     |
|3  |C      |2024-04-09|2024-12-04|35   |1     |
|4  |D      |2024-04-15|2024-12-04|67   |1     |
|10 |Z      |2023-12-25|2024-12-04|109  |1     |
+---+-------+----------+----------+-----+------+

