# Slowly Changing Dimensions - Type 4

In [1]:
import os
import shutil
from datetime import datetime

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [3]:
sc = SparkContext()
spark = SparkSession.builder.appName('scd1').getOrCreate()

## 1. Preperation steps
In the following cells we will perform the following steps:
1. Read-in our target dataframe
2. Add our technical columns to this dataframe
3. Save the target dataframe as our 'source' dataframe (initial load)

### Read-in dataframe

In [126]:
# Define schema
schema = T.StructType([
    T.StructField('INDEX', T.IntegerType(), True), 
    T.StructField('CUSTOMER_ID', T.StringType(), True), 
    T.StructField('FIRST_NAME', T.StringType(), True), 
    T.StructField('LAST_NAME', T.StringType(), True), 
    T.StructField('COMPANY', T.StringType(), True), 
    T.StructField('CITY', T.StringType(), True), 
    T.StructField('COUNTRY', T.StringType(), True), 
    T.StructField('PHONE_1', T.StringType(), True), 
    T.StructField('PHONE_2', T.StringType(), True), 
    T.StructField('EMAIL', T.StringType(), True), 
    T.StructField('SUBSCRIPTION_DATE', T.DateType(), True), 
    T.StructField('WEBSITE', T.StringType(), True)
])

# Read-in dataframe
df = (
    spark
    .read
    .option('header', True)
    .option('schema', schema)
    .csv('scd1_data/source.csv')
)

# Show dataframe
df.show(5)

+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|INDEX|    CUSTOMER_ID|FIRST_NAME|LAST_NAME|             COMPANY|             CITY|           COUNTRY|             PHONE_1|         PHONE_2|               EMAIL|SUBSCRIPTION_DATE|             WEBSITE|
+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|   Sheryll|   Baxter|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    1|xxxxxxxxxxxxxxx|Skaarrrrrt|    Baked|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe

### Saving dataframe as 'output'
PySpark has a particular way of saving parquet, delta, and csv files.
Because of this, we need to create a helper function, so that our output is saved as a single csv file.
Do not worry to much about understanding this code.

In [127]:
def save_as_csv(df: DataFrame, file_path: str):
    tmp_folder = file_path + 'tmp'
    
    # Save DataFrame to a temporary folder
    (
        df
        .coalesce(1)  # Ensure a single partition
        .write
        .mode('overwrite')
        .format('csv')
        .option('header', True)
        .save(tmp_folder)
    )
    
    # Find the single partition file
    for file_name in os.listdir(tmp_folder):
        if file_name.endswith('.csv'):
            tmp_file_path = os.path.join(tmp_folder, file_name)
            break
    
    # Move and rename the file to the final destination
    shutil.move(tmp_file_path, file_path)
    
    # Remove the temporary folder
    shutil.rmtree(tmp_folder)

In [128]:
save_as_csv(df=df, file_path='scd1_data/target.csv')

## 2. Starting the SCD1 Proces
Now, we will begin with the implementation of the Slowly Changing Dimensions type 1. We will be implementing the following steps:
1. Change the target dataframe by adding or editing some rows.
2. Read-in the target and source dataframe.
3. Select the rows in source dataframe that are new.
4. Select the rows in source dataframe that have been deleted.
5. Select the rows in source dataframe that are updated.
6. Insert, update, and/or delete the selected rows in the source dataframe.

### Step 1: Change the target dataframe by adding or editing some rows

* Make some alterations to the source data.
* You can find this file under notebooks/scd1_data/source.csv

### Step 2: Read-in the target and source dataframe.

In [129]:
# Read-in the source dataframe
source_schema = T.StructType([
    T.StructField('INDEX', T.IntegerType(), True), 
    T.StructField('CUSTOMER_ID', T.StringType(), True), 
    T.StructField('FIRST_NAME', T.StringType(), True), 
    T.StructField('LAST_NAME', T.StringType(), True), 
    T.StructField('COMPANY', T.StringType(), True), 
    T.StructField('CITY', T.StringType(), True), 
    T.StructField('COUNTRY', T.StringType(), True), 
    T.StructField('PHONE_1', T.StringType(), True), 
    T.StructField('PHONE_2', T.StringType(), True), 
    T.StructField('EMAIL', T.StringType(), True), 
    T.StructField('SUBSCRIPTION_DATE', T.DateType(), True), 
    T.StructField('WEBSITE', T.StringType(), True)
])
source_df = (
    spark
    .read
    .option('header', True)
    .option('schema', schema)
    .csv('scd1_data/source.csv')
)

# Read-in the target dataframe
target_schema = T.StructType([
    T.StructField('INDEX', T.IntegerType(), True), 
    T.StructField('CUSTOMER_ID', T.StringType(), True), 
    T.StructField('FIRST_NAME', T.StringType(), True), 
    T.StructField('LAST_NAME', T.StringType(), True), 
    T.StructField('COMPANY', T.StringType(), True), 
    T.StructField('CITY', T.StringType(), True), 
    T.StructField('COUNTRY', T.StringType(), True), 
    T.StructField('PHONE_1', T.StringType(), True), 
    T.StructField('PHONE_2', T.StringType(), True), 
    T.StructField('EMAIL', T.StringType(), True), 
    T.StructField('SUBSCRIPTION_DATE', T.DateType(), True), 
    T.StructField('WEBSITE', T.StringType(), True),
])
target_df = (
    spark
    .read
    .option('header', True)
    .option('schema', schema)
    .csv('scd1_data/target.csv')
)

In [130]:
source_df.show(5)

+-----+---------------+----------+---------+---------------+-----------------+--------+------------+----------------+--------------------+-----------------+--------------------+
|INDEX|    CUSTOMER_ID|FIRST_NAME|LAST_NAME|        COMPANY|             CITY| COUNTRY|     PHONE_1|         PHONE_2|               EMAIL|SUBSCRIPTION_DATE|             WEBSITE|
+-----+---------------+----------+---------+---------------+-----------------+--------+------------+----------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|   Sheryll|   Baxter|Rasmussen Group|     East Leonard|   Chile|229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    1|xxxxxxxxxxxxxxx|Skaarrrrrt|    Baked|Rasmussen Group|     East Leonard|   Chile|229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    2|1Ef7b82A4CAAD10|   Preston|   Lozano|    Vega-Gentry|East Jimmychester|Djibouti|  5153435776|686-620-18

In [131]:
target_df.show(5)

+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|INDEX|    CUSTOMER_ID|FIRST_NAME|LAST_NAME|             COMPANY|             CITY|           COUNTRY|             PHONE_1|         PHONE_2|               EMAIL|SUBSCRIPTION_DATE|             WEBSITE|
+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|   Sheryll|   Baxter|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    1|xxxxxxxxxxxxxxx|Skaarrrrrt|    Baked|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe

### Step 3: Select the rows in source dataframe that are new.

In [132]:
def get_inserts(source_df: DataFrame, target_df: DataFrame):
    # Find rows in source_df that are not present in target_df
    insert_df = source_df.join(target_df, on='CUSTOMER_ID', how='leftanti')
    
    return insert_df

In [133]:
insert_df = get_inserts(source_df, target_df)

In [134]:
insert_df.show(5)

+-----------+-----+----------+---------+-----------+-----------------+--------+----------+----------------+---------------+-----------------+--------------------+
|CUSTOMER_ID|INDEX|FIRST_NAME|LAST_NAME|    COMPANY|             CITY| COUNTRY|   PHONE_1|         PHONE_2|          EMAIL|SUBSCRIPTION_DATE|             WEBSITE|
+-----------+-----+----------+---------+-----------+-----------------+--------+----------+----------------+---------------+-----------------+--------------------+
|         aa|    2|  Preston2|  Lozano2|Vega-Gentry|East Jimmychester|Djibouti|5153435776|686-620-1820x944|vmata@colon.com|       2021-04-23|http://www.hobbs....|
|         bb|    2|  Preston2|  Lozano2|Vega-Gentry|East Jimmychester|Djibouti|5153435776|686-620-1820x944|vmata@colon.com|       2021-04-23|http://www.hobbs....|
|         cc|    2|  Preston2|  Lozano2|Vega-Gentry|East Jimmychester|Djibouti|5153435776|686-620-1820x944|vmata@colon.com|       2021-04-23|http://www.hobbs....|
|         dd|    2|  P

### Step 4: Select the rows in source dataframe that have been deleted.

In [135]:
def get_deletes(source_df: DataFrame, target_df: DataFrame):
    # Find rows in target_df that are not present in source_df
    delete_df = target_df.join(source_df, on='CUSTOMER_ID', how='leftanti')
    
    return delete_df

In [136]:
delete_df = get_deletes(source_df, target_df).select(*source_df.columns) # extra select to keep column order

In [137]:
delete_df.show(5)

+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+
|INDEX|CUSTOMER_ID|FIRST_NAME|LAST_NAME|COMPANY|CITY|COUNTRY|PHONE_1|PHONE_2|EMAIL|SUBSCRIPTION_DATE|WEBSITE|
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+



### Step 5: Select the rows in source dataframe that are updated.

In [138]:
def add_hash_column(df: DataFrame, columns: list, hash_column_name: str = 'CTC_HASH') -> DataFrame:
    # Add a hash column to the DataFrame based on the specified columns.
    return df.withColumn(hash_column_name, F.sha2(F.concat_ws('||', *columns), 256))

def get_updates(source_df: DataFrame, target_df: DataFrame, ctc_cols: list):
    # Add hash columns based on the specified columns
    source_df_hash = add_hash_column(source_df, ctc_cols)
    target_df_hash = add_hash_column(target_df, ctc_cols)
    
    # Find corresponding rows between source_df and target_df
    overlap_df = source_df_hash.alias('src').join(target_df_hash.alias('tgt'), on='CUSTOMER_ID', how='inner')
    
    # Apply filter to get rows where hash values are different
    update_df = (
        overlap_df
        .filter(F.col('src.CTC_HASH') != F.col('tgt.CTC_HASH'))
        .select('src.*')
        .drop('CTC_HASH')
    )
    
    return update_df

In [139]:
key_cols = ['COSTUMER_ID']
tech_cols = []    # empty now, but this will be filled in other SCD processes
ctc_cols = [col for col in source_df.columns if col not in key_cols + tech_cols]

In [140]:
update_df = get_updates(source_df, target_df, ctc_cols).select(*source_df.columns) # extra select to keep column order

In [141]:
update_df.show(5)

+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+
|INDEX|CUSTOMER_ID|FIRST_NAME|LAST_NAME|COMPANY|CITY|COUNTRY|PHONE_1|PHONE_2|EMAIL|SUBSCRIPTION_DATE|WEBSITE|
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+
+-----+-----------+----------+---------+-------+----+-------+-------+-------+-----+-----------------+-------+



### Step 6: Insert, update, and/or delete the selected rows in the source dataframe.

In [142]:
# Update the target DataFrame
def update_target_df(target_df: DataFrame, insert_df: DataFrame, delete_df: DataFrame, update_df: DataFrame) -> DataFrame:
    # Step 1: Delete rows in target_df that are present in delete_df
    target_df = target_df.join(delete_df, on='CUSTOMER_ID', how='leftanti')
    
    # Step 2: Insert new rows from insert_df to target_df
    target_df = target_df.union(insert_df)
    
    # Step 3: Update existing rows in target_df with rows from update_df
    if not update_df.rdd.isEmpty():
        # Remove rows from target_df that need to be updated
        target_df = target_df.join(update_df.select('CUSTOMER_ID'), on='CUSTOMER_ID', how='leftanti')
        # Add the updated rows
        target_df = target_df.union(update_df.select(target_df.columns))
    
    return target_df

In [143]:
target_df = update_target_df(
    target_df=target_df,
    insert_df=insert_df,
    delete_df=delete_df,
    update_df=update_df
).select(*[field.name for field in target_schema.fields])

In [144]:
target_df.show(5)

+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|INDEX|    CUSTOMER_ID|FIRST_NAME|LAST_NAME|             COMPANY|             CITY|           COUNTRY|             PHONE_1|         PHONE_2|               EMAIL|SUBSCRIPTION_DATE|             WEBSITE|
+-----+---------------+----------+---------+--------------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|   Sheryll|   Baxter|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    1|xxxxxxxxxxxxxxx|Skaarrrrrt|    Baked|     Rasmussen Group|     East Leonard|             Chile|        229.077.5154|397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe

In [145]:
save_as_csv(df=target_df, file_path='scd1_data/target.csv')

In [None]:
def scd2_create_staging_table(df_input: DataFrame, target_table: str, key_cols: List[str], ctc_cols: List[str], dts_from: str, dts_to: str) -> DataFrame:
    """
    Prepares a staging DataFrame for a Slowly Changing Dimension Type 2 (SCD2) operation in a data warehouse
    by identifying rows in the input DataFrame that need to be inserted or staated in the target table.

    Parameters:
    - df_input (DataFrame): The source DataFrame containing new data to be merged into the target table.
    - target_table (str): The name of the target table in the data warehouse where updates and inserts will be applied.
    - key_cols (List[str]): A list of column names that uniquely identify a row in the target table, used for matching rows between the input DataFrame and the target table.
    - ctc_cols (List[str]): A list of column names used for change tracking. Rows with changes in any of these columns will be marked for update.
    - dts_from (str): Start date/time column name in target table (e.g. 'LOAD_FROM_DTS').
    - dts_to (str): Start date/time column name in target table (e.g. 'LOAD_UNTIL_DTS').

    Returns:
    - DataFrame: A Spark DataFrame representing the staging area for SCD2 operations. 
    """
    # Create hash for input dataframe
    if not "BK_HASH" in df_input.columns:
        df_input = create_hash_key(df=df_input, cols=key_cols, hash_col_name="BK_HASH")
    if not "CTC_HASH" in df_input.columns:
        df_input = create_hash_key(df=df_input, cols=ctc_cols, hash_col_name="CTC_HASH")
    df_input.cache()

    # Create hash in target table
    target_columns = spark.table(target_table).columns  # Accesses table's metadata
    if not "BK_HASH" in target_columns:
        create_hash_key_sql(table_name=target_table, cols=key_cols, hash_col_name="BK_HASH")
    if not "CTC_HASH" in target_columns:
        create_hash_key_sql(table_name=target_table, cols=ctc_cols, hash_col_name="CTC_HASH")

    # Create view for df_input
    input_view = create_temp_view(df_input)

    # Do not select certain columns if they are already in df
    cols_not_select = [dts_to, "isCurrent", "INSERTUPD"]

    # Step 1: get all the insert rows.
    query = f"""
        SELECT
            {get_joined_cols(df=df_input, cols_not_select=cols_not_select, col_prefix="a")}
            ,CAST('{HI_DTS}' AS TIMESTAMP) AS {dts_to}  -- HI_DTS is defined as general constant
            ,'Y' as isCurrent
            ,'ins' as INSERTUPD
        FROM {input_view} a
        LEFT JOIN {target_table} b
            ON a.BK_HASH = b.BK_HASH        -- Join on key columns
            AND b.isCurrent = 'Y'
        WHERE a.CTC_HASH <> b.CTC_HASH      -- 1. Row exists in target with different values
            OR b.CTC_HASH IS NULL           -- 2. No row exists in target
    """
    ins_df = spark.sql(query)

    # Step 2: get all the update rows
    query = f"""
        SELECT
            a.*
            ,'upd' AS INSERTUPD
        FROM {input_view} a
        LEFT JOIN {target_table} b
            ON a.BK_HASH = b.BK_HASH        -- Join on key columns
            AND b.isCurrent = 'Y'
         WHERE a.CTC_HASH <> b.CTC_HASH     -- Row exists in target with different values
    """
    upd_df = spark.sql(query)

    # Step 3: union the insert and update rows into staging table
    df_staging = union_all([ins_df, upd_df])

    # Step 4: Add current datetime as LOAD_FROM_DTS
    curr_datetime = get_current_datetime()
    df_staging = df_staging.withColumn(dts_from, F.lit(curr_datetime))

    # Unpersists cache
    df_input.unpersist()
    return df_staging
    
    
    
    
    
    def scd2_update_rows(df_updates: DataFrame, target_table: str, key_cols:List[str], dts_from: str, dts_to: str) -> None:
    """
    Updates target table rows for SCD2, marking existing records as historical and updating their end dates 
    to the start dates of new staging DataFrame records.

    Parameters:
    - df_updates (DataFrame): DataFrame with update records.
    - target_table (str): Name of the target table for updates.
    - key_cols (List[str]): Unique key columns for record matching.
    - dts_from (str): Start date/time column name in target table (e.g. 'LOAD_FROM_DTS').
    - dts_to (str): End date/time column name for updated records (e.g. 'LOAD_UNTIL_DTS').

    Returns: None. Directly updates the target table.
    """
    # Load in the DeltaTable
    deltaTable = DeltaTable.forName(spark, f"default.{target_table}")

    # Define the matching condition
    condition = "t.BK_HASH = s.BK_HASH"

    # Perform merge (upsert) operation
    deltaTable.alias("t") \
        .merge(df_updates.alias("s"), condition) \
        .whenMatchedUpdate(set={
            "isCurrent": "'N'",
            f"{dts_to}": f"s.{dts_from}"
        }) \
        .execute()
        
        
        
  
  
  def scd2_insert_rows(df_inserts: DataFrame, target_table: str) -> None:
    """
    Inserts new SCD2 records from a staging DataFrame into the target table, identified by an 'ins' flag.

    Parameters:
    - df_inserts (DataFrame): Staging DataFrame with new/updated records.
    - target_table (str): Target table name for record insertion.

    Returns: None. Directly inserts new records into the target table.
    """
    # Get columns of target table
    target_columns = spark.table(target_table).columns  # Accesses table's metadata

    # Insert rows in the target table
    (
        df_inserts
        .select(*target_columns)
        .write
        .format('delta')
        .mode('append')
        .saveAsTable(target_table)
    )

In [None]:
def add_technical_columns(df: DataFrame) -> DataFrame:
    return (
        df
        .withColumn('VALID_FROM', F.current_timestamp().cast(T.TimestampType()))
        .withColumn('VALID_TO', F.lit('9999-12-31 23:59:59').cast(T.TimestampType()))
    )