In [None]:
%run NB - Load Configuration

In [None]:
from pyspark.sql.functions import sha2, concat_ws,current_timestamp,lit,cast,col, when,concat,lpad,year,month, dayofmonth
from datetime import datetime

Convert a single date/datetime value to an integer `yyyymmdd` date key

In [None]:
def convertValueToKey(value):
    """Return the integer ``yyyymmdd`` key for a single date or datetime value.

    Example: ``datetime(2024, 1, 5, 13, 45)`` -> ``20240105``.
    """
    return int(value.strftime('%Y%m%d'))

Add SCD tracking fields (row hash, start date, end date) to an existing DataFrame

In [None]:
def addSCDFields(df):
    """Append the SCD bookkeeping columns to ``df``.

    Adds:
      * ``row_sha2``     - SHA-256 of all *original* columns joined with ``"||"``;
      * ``dwStart_Date`` - ``current_timestamp()``;
      * ``dwEnd_Date``   - NULL timestamp (an open / current version).

    NOTE: ``concat_ws`` skips NULL values, so rows that differ only in which
    column is NULL can hash identically.
    """
    # Build the hash over the incoming columns only, before any SCD column exists.
    row_hash = sha2(concat_ws("||", *df.columns), 256)
    return (
        df.withColumn("row_sha2", row_hash)
          .withColumn("dwStart_Date", current_timestamp())
          .withColumn("dwEnd_Date", lit(None).cast('timestamp'))
    )

Parse a JSON-like value stored in a string column into a typed column (via `from_json`)

In [None]:
from pyspark.sql.functions import col, from_json, regexp_replace, expr
from pyspark.sql.types import MapType, StringType


def convertFieldToJSON(df, fieldName,targetField,schema):
    """Parse a loosely formatted ``{key=value, ...}`` string column into a typed column.

    The text in ``fieldName`` is rewritten into JSON by a sequence of regex passes
    and then parsed with ``from_json`` using ``schema``.

    Args:
        df: Input DataFrame.
        fieldName: Name of the string column holding the map-like text. This column
            is overwritten with the JSON-normalised text.
        targetField: Name of the column that receives the parsed value.
        schema: Schema passed to ``from_json``.

    Returns:
        ``df`` with ``fieldName`` rewritten and ``targetField`` added.

    Caveats (heuristic rewrite, not a real parser):
        * Any value that does not start with a quote, brace, bracket or digit is
          quoted, so ``true``/``false``/``null`` and negative numbers become strings.
        * Every ``=`` is turned into ``:``, including ``=`` characters inside values.
        * Rows whose rewritten text is still not valid JSON parse to NULL.
    """
    rewrites = [
        # Step 1: "key=value" -> "key:value". A literal '=' pattern; using a Column
        # (instead of a SQL string built with expr) keeps column names that need
        # quoting (spaces, hyphens, ...) working.
        ('=', ':'),
        # Step 2: quote bare keys - a word preceded by '{', ',' or whitespace and followed by ':'.
        (r'([{,\s])(\w+)(:)', r'$1"$2"$3'),
        # Step 3: quote string values - text after ':' that does not start with a quote,
        # brace, bracket or digit, up to the next ',', '}' or ']'.
        (r':([^"{\[\]\d][^,}\]]*)', r':"$1"'),
        # Step 4: quote ISO-8601 datetimes. They start with a digit, so step 3 skipped them.
        # Accepts a 'Z' suffix or a +hh:mm / -hh:mm offset.
        (r'(:)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.*\d*(?:Z|[+-]\d{2}:\d{2}))', r'$1"$2"'),
    ]
    for pattern, replacement in rewrites:
        df = df.withColumn(fieldName, regexp_replace(col(fieldName), pattern, replacement))

    # Step 5: parse the normalised JSON text with the caller's schema.
    return df.withColumn(targetField, from_json(col(fieldName), schema))

Convert a date column of a DataFrame into an integer `yyyymmdd` key column

In [None]:
def convertColumnToKey(df,columnName,keyColumnName):
    """Add ``keyColumnName`` holding the integer ``yyyymmdd`` key of ``columnName``.

    Year, zero-padded month and zero-padded day are concatenated as strings and
    cast to int. A NULL date yields a NULL key.
    """
    source = col(columnName)
    year_part = year(source).cast('string')
    month_part = lpad(month(source).cast('string'), 2, '0')
    day_part = lpad(dayofmonth(source).cast('string'), 2, '0')
    date_key = concat(year_part, month_part, day_part).cast('int')
    return df.withColumn(keyColumnName, date_key)

Merge new data into an existing SCD Type 2 DataFrame, producing one DataFrame that holds both the existing history and the new data

In [None]:
def merge_scd_type2(existing_df, new_df,keyColumn):
    """Merge a batch of incoming rows into an SCD Type 2 history DataFrame.

    Both inputs are expected to carry ``row_sha2``, ``dwStart_Date`` and
    ``dwEnd_Date`` (see ``addSCDFields``). A NULL ``dwEnd_Date`` marks the current
    version of a key.

    Args:
        existing_df: Full history (current and closed versions).
        new_df: Incoming rows; presumably all with ``dwEnd_Date`` NULL - verify at call site.
        keyColumn: Name of the business-key column used to match versions.

    Returns:
        A DataFrame that is the union of:
          * current rows whose key received no new version;
          * new versions of keys whose ``row_sha2`` changed;
          * incoming rows for keys that have no current version;
          * all closed rows, including current rows closed by this merge
            (their ``dwEnd_Date`` is set to ``current_timestamp()``).
        If no incoming row survives the hash filter, ``existing_df`` is returned as is.

    NOTE(review): several differing incoming rows for the same key are all inserted
    as current versions - deduplicate per key before calling if that is not intended.
    """
    cols=new_df.columns

    new_df = new_df.alias('new')
    existing_df = existing_df.alias('existing')

    # Drop incoming rows that are already stored verbatim. A row is kept when its hash
    # is unknown, or when it matches a closed (historical) version, i.e. the key is
    # reverting to an earlier state.
    new_df = new_df.join(
        existing_df,
        on='row_sha2',
        how='left'
    ).filter(
        col('existing.row_sha2').isNull() |
        ((col('new.dwEnd_Date').isNull()) & (col('existing.dwEnd_Date').isNotNull()))
    ).select('new.*')

    # A row matching several closed versions appears once per match; remove the copies.
    new_df=new_df.dropDuplicates()

    # head(1) stops at the first row instead of counting the whole join via the RDD API.
    if len(new_df.head(1)) == 0:
        return existing_df

    current_df=existing_df.filter(existing_df.dwEnd_Date.isNull())
    # Full outer join of current versions with incoming rows on the business key.
    # For a full outer USING join Spark exposes the key as coalesce(left, right),
    # so NewKey is non-NULL whenever either side has a non-NULL key.
    merged_df = current_df.alias("existing") \
        .join(new_df.alias("new"), keyColumn, "outer") \
        .select(
            col('existing.*'),
            col(keyColumn).alias("NewKey"),
            col("new.row_sha2").alias("new_row_hash"),
            col("existing.row_sha2").alias("existing_row_hash"),
        )

    # Keys present on both sides whose hash changed. When one side is missing the
    # comparison yields NULL, which filter() treats as false.
    update_condition = (merged_df.NewKey.isNotNull()) & \
                   (merged_df.new_row_hash != merged_df.existing_row_hash)

    # New versions of changed keys.
    columnToJoin="new." + keyColumn
    new_df=new_df.alias("new")
    keysToInsert=merged_df.filter(update_condition).select("NewKey").distinct()
    recordsToInsert= new_df.join(keysToInsert,col(columnToJoin)==keysToInsert.NewKey,"inner").select(cols)

    # Incoming rows for keys with no current version (brand-new keys, or keys whose
    # versions are all closed). An inner join is used so that a NULL key, which never
    # matches, cannot produce an all-NULL row as an outer join would.
    columnToFilter="existing." + keyColumn
    keysToNew=merged_df.filter(merged_df[columnToFilter].isNull()).select("NewKey").distinct()
    inserting_df=new_df.join(keysToNew,col(columnToJoin)==keysToNew.NewKey,'inner').select(cols)

    # Close the open version of every key that receives a new row:
    # collect the affected keys, left-join them onto the full history, and stamp
    # dwEnd_Date on matching rows that are still open; then drop the helper column.
    keysToUpdate=recordsToInsert.unionByName(inserting_df).select(col(keyColumn).alias('newKey')).distinct()
    keysToUpdate=keysToUpdate.alias('keysToUpdate')
    existing_df=existing_df.alias('existing_df')
    key1name='existing_df.' + keyColumn
    key2name='keysToUpdate.newKey'
    updatedExisting_df=existing_df.join(keysToUpdate,col(key1name)==col(key2name),'left').select('existing_df.*',col(key2name))

    updatedExisting_df = updatedExisting_df.withColumn("dwEnd_Date",
        when( (col('newKey')==col(keyColumn)) & (col('dwEnd_Date').isNull()), current_timestamp()).otherwise(updatedExisting_df.dwEnd_Date))

    updatedExisting_df=updatedExisting_df.drop('newKey')

    # Historical rows: previously closed ones plus those closed just above.
    oldRecords=updatedExisting_df.filter(updatedExisting_df.dwEnd_Date.isNotNull())

    # Remove the just-closed keys from current_df; otherwise they would appear both
    # as current rows and, closed, in oldRecords.
    current_df=current_df.alias('current_df')
    key1name='current_df.' + keyColumn
    current_df=current_df.join(keysToUpdate,col(key1name)==col(key2name),'leftanti').select('current_df.*')

    # Final union: untouched current rows + changed versions + new keys + history.
    final_df=current_df.unionByName(recordsToInsert).unionByName(inserting_df).unionByName(oldRecords)

    return final_df

In [None]:
def fetch_watermark(watermark_table,table_name):
    """Return the last-ingestion watermark recorded for ``table_name``.

    Reads the Delta table at ``Tables/<watermark_table>`` with columns
    ``tableName`` and ``lastIngestion``. If that table does not exist yet, it is
    created with a single row for ``table_name`` holding ``'1900-01-01'``.

    Args:
        watermark_table: Name of the watermark Delta table under ``Tables/``.
        table_name: Source table whose watermark is requested.

    Returns:
        The stored ``lastIngestion`` value for ``table_name`` (the bootstrap row
        stores a string; other writers may store a different type), or
        ``'1900-01-01'`` when no row exists for ``table_name``.

    Relies on the notebook-provided ``spark`` session and ``mssparkutils``.
    """
    default_watermark = '1900-01-01'
    table_path = f"Tables/{watermark_table}"

    if not mssparkutils.fs.exists(table_path):
        # Bootstrap the watermark table with this table's default entry.
        seed = spark.createDataFrame([(table_name, default_watermark)], ["tableName", "lastIngestion"])
        seed.write.format('delta').mode('overwrite').save(table_path)
        return default_watermark

    watermarks = spark.read.format('delta').load(table_path)
    # 'tableName' matches the schema written above, so the lookup also works
    # when spark.sql.caseSensitive is enabled.
    # NOTE(review): if several rows exist for one table, an arbitrary one is returned.
    row = watermarks.filter(watermarks['tableName'] == table_name).select('lastIngestion').first()
    if row is None:
        return default_watermark
    return row[0]