In [1]:
import os
import tempfile

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, current_date, date_sub

In [2]:
# Scratch directory handed to Spark (spark.local.dir). Honour an explicit TEMP
# first (as before); otherwise use the platform's temp dir instead of a
# Windows-only hard-coded path that does not exist on Linux/macOS.
TEMP_PATH = os.environ.get("TEMP") or tempfile.gettempdir()

In [3]:
# Initialize a local Spark session.
# NOTE(review): with master "local[*]" tasks run inside the driver JVM, so the
# spark.executor.* settings are presumably no-ops here — kept for parity if this
# is pointed at a cluster. Likewise, getOrCreate() returns an already-running
# session if one exists, in which case static settings (e.g. driver memory) may
# not take effect — restart the kernel to change them.
spark = (
    SparkSession.builder
    .appName("FlaggingData")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.default.parallelism", "8")
    .config("spark.python.worker.memory", "512m")
    # "spark.sql.execution.arrow.enabled" is deprecated since Spark 3.0;
    # this is its replacement key.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Scratch space for Spark's local files.
    .config("spark.local.dir", TEMP_PATH)
    .getOrCreate()
)

In [4]:
# Load the incoming snapshot (staging_df, the SCD2 source) and the existing
# dimension table with its history columns (final_df, the SCD2 target).
csv_reader = spark.read.option("header", True).option("inferSchema", True)
staging_df = csv_reader.csv("staging_data.csv")
final_df = csv_reader.csv("data.csv")

In [5]:
staging_df.show(n=20, truncate=True)  # preview of the incoming snapshot

+-----------+-------------+-----------+
|customer_id|customer_name|    address|
+-----------+-------------+-----------+
|          1|     John Doe| 123 Elm St|
|          2|   Jane Smith| 456 Oak St|
|          3|    Jim Brown|789 Pine St|
+-----------+-------------+-----------+



In [6]:
final_df.show(n=20, truncate=True)  # preview of the existing dimension table

+-----------+-------------+------------+--------------+---------------+------------+
|customer_id|customer_name|     address|effective_date|expiration_date|current_flag|
+-----------+-------------+------------+--------------+---------------+------------+
|          1|     John Doe|123 Maple St|    2024-01-01|     9999-12-31|           1|
|          2|   Jane Smith|  456 Oak St|    2024-02-01|     9999-12-31|           1|
+-----------+-------------+------------+--------------+---------------+------------+



In [17]:
def apply_scd2(
    source_df: DataFrame,
    target_df: DataFrame,
    join_keys: list,
    scd_columns: list,
    effective_date_col: str = "effective_date",
    end_date_col: str = "expiration_date",
    current_flag_col: str = "current_flag"
) -> DataFrame:
    """
    Merge ``source_df`` into ``target_df`` using Slowly Changing Dimension Type 2.

    For each source row, matched to the *current* target rows on ``join_keys``:
    - if any ``scd_columns`` value differs (NULL-safe), the current target row is
      expired (end date = yesterday, flag 0) and the source row is added as a new
      current version (effective today, end date 9999-12-31, flag 1);
    - if the key has no current target row, the source row is inserted as a new
      current version;
    - otherwise the current target row is kept unchanged.
    Historical (non-current) target rows are carried over untouched, so repeated
    runs keep the full history.

    Parameters:
    - source_df: The source DataFrame with new/updated records. Must contain
      ``join_keys`` and ``scd_columns``; its columns plus the three SCD2 columns
      are matched to the target's columns by name.
    - target_df: The target DataFrame that maintains SCD2 history.
    - join_keys: Non-empty list of business-key columns shared by source and target.
    - scd_columns: Non-empty list of columns whose changes open a new version.
    - effective_date_col: Name of the column representing the effective date.
    - end_date_col: Name of the column representing the end date.
    - current_flag_col: Name of the column representing the current flag
      (1 = current version).

    Returns:
    - A DataFrame with the target's columns and updated SCD2 history, ordered
      by join keys, effective date and end date.

    Raises:
    - ValueError: if ``join_keys`` or ``scd_columns`` is empty.
    """
    if not join_keys:
        raise ValueError("join_keys must contain at least one column")
    if not scd_columns:
        raise ValueError("scd_columns must contain at least one column")

    today = current_date()
    open_end_date = lit("9999-12-31").cast("date")

    # Split the target into current versions and everything else (history,
    # including rows whose flag is NULL) so history is never lost.
    current_target = target_df.filter(col(current_flag_col) == 1)
    history_target = target_df.filter(~col(current_flag_col).eqNullSafe(1))

    # Qualify through the "s"/"t" aliases so the condition stays unambiguous
    # even when the inputs share lineage.
    join_condition = [col(f"s.{key}") == col(f"t.{key}") for key in join_keys]

    # NULL-safe inequality: a change to or from NULL also counts as a change
    # (plain != evaluates to NULL there and the row would be filtered out).
    change_condition = None
    for col_name in scd_columns:
        differs = ~col(f"t.{col_name}").eqNullSafe(col(f"s.{col_name}"))
        change_condition = differs if change_condition is None else change_condition | differs

    # Source rows paired with the current target row they change.
    changed = (
        source_df.alias("s")
        .join(current_target.alias("t"), join_condition, how="inner")
        .filter(change_condition)
    )
    changed_current = changed.select("t.*")

    # Close the superseded versions as of yesterday.
    expired_records = (
        changed_current
        .withColumn(end_date_col, date_sub(today, 1))
        .withColumn(current_flag_col, lit(0))
    )

    # New current versions for the changed keys.
    new_versions = changed.select(
        "s.*",
        today.alias(effective_date_col),
        open_end_date.alias(end_date_col),
        lit(1).alias(current_flag_col),
    )

    # Keys with no current version (brand new, or only present in history).
    new_records = (
        source_df.alias("s")
        .join(current_target.alias("t"), join_condition, how="left_anti")
        .select(
            "s.*",
            today.alias(effective_date_col),
            open_end_date.alias(end_date_col),
            lit(1).alias(current_flag_col),
        )
    )

    unchanged_current = current_target.subtract(changed_current)

    # unionByName: the source-derived frames may list columns in a different
    # order than the target; positional union would silently misalign them.
    result = (
        history_target
        .unionByName(unchanged_current)
        .unionByName(expired_records)
        .unionByName(new_versions)
        .unionByName(new_records)
    )

    return result.orderBy(
        *join_keys,
        col(effective_date_col).asc(),
        col(end_date_col).asc(),
    )

In [18]:
# Merge the staging snapshot into the customer dimension; a change in either
# customer_name or address opens a new version for that customer_id.
scd_data = apply_scd2(
    staging_df,
    final_df,
    join_keys=["customer_id"],
    scd_columns=["customer_name", "address"],
)

In [19]:
scd_data.show(n=20, truncate=True)  # full merged history, including expired versions

+-----------+-------------+------------+--------------+---------------+------------+
|customer_id|customer_name|     address|effective_date|expiration_date|current_flag|
+-----------+-------------+------------+--------------+---------------+------------+
|          1|     John Doe|123 Maple St|    2024-01-01|     2024-11-20|           0|
|          1|     John Doe|  123 Elm St|    2024-11-21|     9999-12-31|           1|
|          2|   Jane Smith|  456 Oak St|    2024-02-01|     9999-12-31|           1|
|          3|    Jim Brown| 789 Pine St|    2024-11-21|     9999-12-31|           1|
+-----------+-------------+------------+--------------+---------------+------------+



In [24]:
# Point-in-time ("as of") lookup: the version of each customer valid on AS_OF_DATE.
# Validity windows don't overlap (an expired row ends the day before its successor
# starts), so the date range alone picks one version per key. Also filtering on
# current_flag == 1 would hide the correct version for any past date.
# NOTE: new versions are effective on the day apply_scd2 ran; pick a date on or
# after that run to see them.
AS_OF_DATE = "2024-11-21"
as_of = lit(AS_OF_DATE).cast("date")

scd_filter = scd_data.filter(
    (col("effective_date") <= as_of) & (col("expiration_date") >= as_of)
)

scd_filter.show()

+-----------+-------------+-----------+--------------+---------------+------------+
|customer_id|customer_name|    address|effective_date|expiration_date|current_flag|
+-----------+-------------+-----------+--------------+---------------+------------+
|          1|     John Doe| 123 Elm St|    2024-11-21|     9999-12-31|           1|
|          2|   Jane Smith| 456 Oak St|    2024-02-01|     9999-12-31|           1|
|          3|    Jim Brown|789 Pine St|    2024-11-21|     9999-12-31|           1|
+-----------+-------------+-----------+--------------+---------------+------------+



In [25]:
# Stop the SparkSession created at the top of the notebook; run this last.
spark.stop()