# Patronage V4 - Data Lineage and Column Mapping

## Delta Table Schema Documentation

This table documents the complete data lineage for all columns in the Patronage delta table, including source systems, original column names, and any transformations applied.

| Delta Table Column Name | Source | Original Column Name | Description of Transformation | Sent to DMDC |
|------------------------|--------|---------------------|------------------------------|---------------|
| **edipi** | Identity Correlations Delta Table | edipi | Direct mapping from ICN correlation lookup | **Yes** |
| **ICN** | Multiple Sources | Caregiver_ICN__c (CG)<br/>ICN (Identity Correlations)<br/>ICN (from participant_id lookup) | **CG Source**: Truncated to first 10 characters from Caregiver_ICN__c<br/>**SCD Source**: Mapped from participant_id via identity correlations<br/>**Seed File**: Truncated to first 10 characters | No |
| **Veteran_ICN** | Caregiver Sources Only | Veteran_ICN__c | **CG Source**: Truncated to first 10 characters<br/>**SCD/PAI Sources**: Set to NULL | No |
| **participant_id** | Multiple Sources | participant_id (Identity Correlations)<br/>PTCPNT_ID (SCD)<br/>PTCPNT_VET_ID (PAI) | **SCD Source**: Direct mapping from PTCPNT_ID<br/>**PAI Source**: Direct mapping from PTCPNT_VET_ID<br/>**CG Source**: Retrieved via ICN lookup from identity correlations | No |
| **Batch_CD** | System Generated | N/A | **CG Records**: Hard-coded as "CG"<br/>**SCD Records**: Hard-coded as "SCD"<br/>**PAI Records**: Hard-coded as "SCD" (processed as SCD updates) | **Yes** |
| **Applicant_Type** | Caregiver Sources Only | Applicant_Type__c | **CG Source**: Direct mapping<br/>**SCD/PAI Sources**: Set to NULL | No |
| **Caregiver_Status** | Caregiver Sources Only | Caregiver_Status__c | **CG Source**: Direct mapping<br/>**SCD/PAI Sources**: Set to NULL | No |
| **SC_Combined_Disability_Percentage** | SCD Sources Only | CMBNED_DEGREE_DSBLTY | **SCD Source**: Zero-padded to 3 digits, empty strings converted to "000"<br/>**CG/PAI Sources**: Set to NULL | **Yes** |
| **PT_Indicator** | PAI Sources + Default | PT_35_FLAG (PAI)<br/>target_PT_Indicator (existing records) | **PAI Source**: Direct mapping from PT_35_FLAG<br/>**SCD Records**: Defaults to "N" for new records, preserves existing values<br/>**CG Records**: Set to NULL | **Yes** |
| **Individual_Unemployability** | Not Currently Populated | N/A | Set to NULL for all sources (placeholder for future implementation) | **Yes** |
| **Status_Begin_Date** | Multiple Sources | Dispositioned_Date__c (CG)<br/>DSBL_DTR_DT (SCD)<br/>target_Status_Begin_Date (existing) | **CG Source**: Date formatted from Dispositioned_Date__c to YYYYMMDD<br/>**SCD Source**: Uses existing Status_Begin_Date or DSBL_DTR_DT if new record<br/>**Date Format**: Converted from MMddyyyy to yyyyMMdd | **Yes** |
| **Status_Last_Update** | Multiple Sources | DSBL_DTR_DT (SCD)<br/>N/A (CG) | **SCD Source**: Direct mapping from DSBL_DTR_DT<br/>**CG Source**: Set to NULL | **Yes** |
| **Status_Termination_Date** | Caregiver Sources Only | Benefits_End_Date__c | **CG Source**: Date formatted from Benefits_End_Date__c to YYYYMMDD<br/>**SCD/PAI Sources**: Set to NULL | **Yes** |
| **SDP_Event_Created_Timestamp** | File Metadata | _metadata.file_modification_time<br/>CreatedDate (seed) | **All File Sources**: Extracted from file modification timestamp<br/>**Seed File**: Uses configured start datetime<br/>**PAI Delta Table**: Uses current datetime | No |
| **filename** | File Metadata + System | _metadata.file_name<br/>Path (seed)<br/>Generated (PAI) | **File Sources**: Extracted from file metadata<br/>**Seed File**: Full file path<br/>**PAI Delta Updates**: Generated description with timestamp | No |
| **RecordLastUpdated** | System Generated | N/A | **New Records**: Set to NULL<br/>**Updated Records**: Set to SDP_Event_Created_Timestamp during merge | No |
| **RecordStatus** | System Generated | N/A | **Active Records**: Set to TRUE<br/>**Expired Records**: Set to FALSE during SCD Type 2 updates | No |
| **sentToDoD** | System Generated | N/A | **New Records**: Set to FALSE<br/>**Expired Records**: Set to TRUE during updates | No |
| **change_log** | System Generated | N/A | **New Records**: "New Record"<br/>**Updated Records**: Detailed log of field changes with old→new values | No |
| **RecordChangeStatus** | System Generated | N/A | **New Records**: "New Record"<br/>**Updated Records**: "Updated Record"<br/>**Expired Records**: "Expired Record" | No |
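
The system-generated columns at the bottom of the table (`RecordStatus`, `sentToDoD`, `RecordLastUpdated`, `change_log`, `RecordChangeStatus`) follow a Slowly Changing Dimension Type 2 pattern: when a tracked value changes, the active row is expired and a new active row is inserted. The sketch below illustrates that flow in plain Python over dictionaries rather than Spark. `apply_scd2_update` and its arguments are hypothetical names used only for illustration, the sample values are made up, and the handling of NULLs is simplified compared with the notebook's merge.

```python
from copy import deepcopy


def apply_scd2_update(rows, incoming, key_cols, tracked_cols, event_ts):
    """Illustrative SCD Type 2 upsert over a list of dict rows (not the Spark merge)."""
    rows = deepcopy(rows)  # leave the caller's data untouched
    key = tuple(incoming[c] for c in key_cols)
    # Find the currently active row with the same business key, if any
    active = next(
        (r for r in rows
         if r["RecordStatus"] and tuple(r[c] for c in key_cols) == key),
        None,
    )

    if active is None:
        change_log = status = "New Record"
    else:
        changes = [
            f"{c} old value: {active[c]} changed to new value: {incoming[c]}"
            for c in tracked_cols
            if active[c] != incoming[c]
        ]
        if not changes:
            return rows  # nothing changed, so the active row stays as it is
        # Expire the old version, as described for "Expired Records" in the table
        active.update(
            RecordStatus=False,
            sentToDoD=True,
            RecordLastUpdated=event_ts,
            RecordChangeStatus="Expired Record",
        )
        change_log, status = " ".join(changes), "Updated Record"

    # Insert the new active version
    rows.append({
        **incoming,
        "RecordStatus": True,
        "sentToDoD": False,
        "RecordLastUpdated": None,
        "change_log": change_log,
        "RecordChangeStatus": status,
    })
    return rows


# Made-up sample row for illustration only
current = [{
    "ICN": "1012345678",
    "Batch_CD": "SCD",
    "SC_Combined_Disability_Percentage": "050",
    "RecordStatus": True,
    "sentToDoD": False,
    "RecordLastUpdated": None,
    "change_log": "New Record",
    "RecordChangeStatus": "New Record",
}]
updated = apply_scd2_update(
    current,
    {"ICN": "1012345678", "Batch_CD": "SCD", "SC_Combined_Disability_Percentage": "070"},
    key_cols=["ICN", "Batch_CD"],
    tracked_cols=["SC_Combined_Disability_Percentage"],
    event_ts="2025-06-02",
)
for row in updated:
    print(row["RecordStatus"], row["RecordChangeStatus"], row["change_log"])
```

After the call, the list holds two rows for the same key: the expired version and the new active version carrying the change log.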

## Data Source Details

### Primary Data Sources:
1. **Caregiver Events (CG)**: CARMA system CSV files (`caregiverevent*.csv`)
2. **Service-Connected Disability (SCD)**: VA disability files (`CPIDODIEX_*.csv`)
3. **PT Indicator Legacy (PAI)**: Text files (`WRTS*.txt`)
4. **PT Indicator Modern (PAI)**: Delta table (`DW_ADHOC_RECURR.DOD_PATRONAGE_SCD_PT`)
5. **Identity Correlations**: Delta table mapping ICNs to EDIPIs and participant IDs
6. **Seed Data**: Initial caregiver population CSV file
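
As a rough illustration of how the file-based sources above can be told apart by name, the following sketch applies the same name rules the notebook uses when listing the landing folders (caregiver files contain `caregiverevent`, SCD files start with `CPIDODIEX_` and exclude names containing `NEW`). `classify_source_file` is a hypothetical helper, not part of the notebook, and the example file names are invented.

```python
def classify_source_file(name):
    """Return 'CG', 'SCD', 'PAI', or None for a landing-zone file name."""
    if "caregiverevent" in name and name.endswith(".csv"):
        return "CG"
    if name.startswith("CPIDODIEX_") and name.endswith(".csv") and "NEW" not in name:
        return "SCD"
    if name.startswith("WRTS") and name.endswith(".txt"):
        return "PAI"  # legacy PT Indicator text files
    return None  # not a recognized source file


# Invented file names, for illustration only
for example in ["caregiverevent_20250101.csv", "CPIDODIEX_20250601.csv",
                "CPIDODIEX_NEW_20250601.csv", "WRTS0001.txt", "readme.md"]:
    print(example, classify_source_file(example))
```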

### Key Transformation Patterns:
- **ICN Standardization**: All ICNs truncated to 10 characters for consistency
- **Date Standardization**: All dates converted to YYYYMMDD string format
- **Null Handling**: Explicit NULL assignment for irrelevant fields per source type
- **Change Detection**: xxhash64 used for efficient change identification
- **Deduplication**: Window functions ensure latest record per unique key combination
- **Audit Trail**: Complete change tracking with before/after values
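
The standardization patterns above can be sketched in plain Python as follows. These helpers are hypothetical stand-ins for the Spark expressions in the notebook (`substring`, `lpad`, `to_date`/`date_format`), written only to show the intended input and output shapes. One assumed difference: an unparseable date raises `ValueError` here, whereas Spark's behavior depends on its configuration.

```python
from datetime import datetime


def truncate_icn(icn):
    """Keep only the first 10 characters of an ICN (None stays None)."""
    return None if icn is None else icn[:10]


def pad_disability_percentage(value):
    """Zero-pad to 3 characters; an empty string becomes '000'."""
    if value is None:
        return None
    value = "000" if value == "" else value
    # Like Spark's lpad, longer values are cut to the target length
    return value.rjust(3, "0")[:3]


def scd_date_to_yyyymmdd(value):
    """Convert an MMddyyyy string to yyyyMMdd; empty or None becomes None."""
    if not value:
        return None
    return datetime.strptime(value, "%m%d%Y").strftime("%Y%m%d")


# Invented sample values, for illustration only
print(truncate_icn("1012345678V123456"))
print(pad_disability_percentage("70"))
print(scd_date_to_yyyymmdd("06012025"))
```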

In [0]:
# /mnt/Patronage/identity_correlations
from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime, timedelta, date
from pyspark.sql.window import Window
import time
import os
import pandas as pd

In [0]:
dbutils.widgets.text("PatronageDeltaTableName","mypat_test", "Patronage Delta Table Name")
dbutils.widgets.text("PatronageDeltaTablePath","dbfs:/user/hive/warehouse/", "Patronage Delta Table Path")
patronage_tablename = dbutils.widgets.get("PatronageDeltaTableName")
patronage_table_location = dbutils.widgets.get("PatronageDeltaTablePath")
fullname = patronage_table_location + patronage_tablename

In [0]:
# Choose True to rebuild Patronage Delta Table
dbutils.widgets.dropdown("rebuild", "False", ["True", "False"], "1 Rebuild Patronage")
boolean_value = dbutils.widgets.get("rebuild")

if boolean_value == 'True':
    # Drop the table definition and delete its underlying files so the table is rebuilt from scratch
    dropTableQuery = f""" DROP TABLE IF EXISTS {patronage_tablename} """
    spark.sql(dropTableQuery)
    dbutils.fs.rm(fullname, True)

In [0]:
dbutils.widgets.text("CaregiverSource", "/mnt/ci-carma/landing/", "Caregiver Source")
dbutils.widgets.text("DisabilityPercentSource", "/mnt/ci-vadir-shared/", "Disability Percent Source")
dbutils.widgets.text("NewPTIndicatorSource", "/mnt/ci-vba-edw-2/DeltaTables/DW_ADHOC_RECURR.DOD_PATRONAGE_SCD_PT/", "New PT Indicator Source")

dbutils.widgets.text("StartDateToProcessCaregiversData","2024-12-18 23:59:59", "Start Date to Process Caregivers Data")
dbutils.widgets.text("StartDateToProcessSCDData","2025-06-01 00:00:00", "Start Date to Process SCD Data")
dbutils.widgets.text("PathToInitialSeedCaregiversCSVFile", "dbfs:/FileStore/All_Caregivers_InitialSeed_12182024_csv.csv", "Path to initial seed Caregivers csv file")
dbutils.widgets.text("PathToIdentityCorrelationsDeltaTable", "/mnt/Patronage/identity_correlations", "Path to Identity Correlations Delta Table")
initial_cg_file = dbutils.widgets.get("PathToInitialSeedCaregiversCSVFile")
cg_source = dbutils.widgets.get("CaregiverSource")
scd_source = dbutils.widgets.get("DisabilityPercentSource")
pt_new_source = dbutils.widgets.get("NewPTIndicatorSource")

# Create the Patronage delta table on first run (or after a rebuild)
if not DeltaTable.isDeltaTable(spark, fullname):
    print(f"Creating {patronage_tablename}, on location {fullname}")
    create_table_query = f"""CREATE TABLE IF NOT EXISTS {patronage_tablename} (edipi string, ICN string, Veteran_ICN string, participant_id string, Batch_CD string,  Applicant_Type string, Caregiver_Status string, SC_Combined_Disability_Percentage string, PT_Indicator string, Individual_Unemployability string, Status_Begin_Date string, Status_Last_Update string, Status_Termination_Date string, SDP_Event_Created_Timestamp timestamp, filename string, RecordLastUpdated date, RecordStatus boolean, sentToDoD boolean, change_log string, RecordChangeStatus string, DateSentToDoD date) PARTITIONED BY(Batch_CD, RecordStatus ) LOCATION '{fullname}' """
    spark.sql(create_table_query)

# Count the distinct source files already loaded (0 for a newly created table)
file_count_query = f"SELECT COALESCE(COUNT(DISTINCT filename), 0) AS count FROM DELTA.`{fullname}`"
no_of_files = spark.sql(file_count_query).collect()[0][0]
print(f"Total files processed till last run: {no_of_files}")

In [0]:
# Schema for the caregiver event CSV files; referenced by prepare_caregivers_data
new_cg_schema = StructType(
    [
        StructField("Discharge_Revocation_Date__c", StringType(), True),
        StructField("Caregiver_Status__c", StringType(), True),
        StructField("CreatedById", StringType(), True),
        StructField("Dispositioned_Date__c", StringType(), True),
        StructField("CARMA_Case_ID__c", StringType(), True),
        StructField("Applicant_Type__c", StringType(), True),
        StructField("CreatedDate", StringType(), True),
        StructField("Veteran_ICN__c", StringType(), True),
        StructField("Benefits_End_Date__c", StringType(), True),
        StructField("Caregiver_Id__c", StringType(), True),
        StructField("CARMA_Case_Number__c", StringType(), True),
        StructField("Caregiver_ICN__c", StringType(), True),
    ]
)

# scd_schema = StructType(
#     [
#         StructField("PTCPNT_ID", StringType()),
#         StructField("FILE_NBR", StringType()),
#         StructField("LAST_NM", StringType()),
#         StructField("FIRST_NM", StringType()),
#         StructField("MIDDLE_NM", StringType()),
#         StructField("SUFFIX_NM", StringType()),
#         StructField("STA_NBR", StringType()),
#         StructField("BRANCH_OF_SERV", StringType()),
#         StructField("DATE_OF_BIRTH", StringType()),
#         StructField("DATE_OF_DEATH", StringType()),
#         StructField("VET_SSN_NBR", StringType()),
#         StructField("SVC_NBR", StringType()),
#         StructField("AMT_GROSS_OR_NET_AWARD", IntegerType()),
#         StructField("AMT_NET_AWARD", IntegerType()),
#         StructField("NET_AWARD_DATE", StringType()),
#         StructField("SPECL_LAW_IND", IntegerType()),
#         StructField("VET_SSN_VRFCTN_IND", IntegerType()),
#         StructField("WIDOW_SSN_VRFCTN_IND", IntegerType()),
#         StructField("PAYEE_SSN", StringType()),
#         StructField("ADDRS_ONE_TEXT", StringType()),
#         StructField("ADDRS_TWO_TEXT", StringType()),
#         StructField("ADDRS_THREE_TEXT", StringType()),
#         StructField("ADDRS_CITY_NM", StringType()),
#         StructField("ADDRS_ST_CD", StringType()),
#         StructField("ADDRS_ZIP_PREFIX_NBR", IntegerType()),
#         StructField("MIL_POST_OFFICE_TYP_CD", StringType()),
#         StructField("MIL_POSTAL_TYPE_CD", StringType()),
#         StructField("COUNTRY_TYPE_CODE", IntegerType()),
#         StructField("SUSPENSE_IND", IntegerType()),
#         StructField("PAYEE_NBR", IntegerType()),
#         StructField("EOD_DT", StringType()),
#         StructField("RAD_DT", StringType()),
#         StructField("ADDTNL_SVC_IND", StringType()),
#         StructField("ENTLMT_CD", StringType()),
#         StructField("DSCHRG_PAY_GRADE_NM", StringType()),
#         StructField("AMT_OF_OTHER_RETIREMENT", IntegerType()),
#         StructField("RSRVST_IND", StringType()),
#         StructField("NBR_DAYS_ACTIVE_RESRV", IntegerType()),
#         StructField("CMBNED_DEGREE_DSBLTY", StringType()),
#         StructField("DSBL_DTR_DT", StringType()),
#         StructField("DSBL_TYP_CD", StringType()),
#         StructField("VA_SPCL_PROV_CD", IntegerType()),
#     ]
# )

scd_schema1 = StructType(
    [
        StructField("PTCPNT_ID", StringType()),
        StructField("CMBNED_DEGREE_DSBLTY", StringType()),
        StructField("DSBL_DTR_DT", StringType()),
    ]
)

# pai_schema = StructType(
#     [
#         StructField("EDI_PI", StringType()),
#         StructField("SSN_NBR", StringType()),
#         StructField("FILE_NBR", StringType()),
#         StructField("PTCPNT_VET_ID", StringType()),
#         StructField("LAST_NM", StringType()),
#         StructField("FIRST_NM", StringType()),
#         StructField("MIDDLE_NM", StringType()),
#         StructField("PT35_RATING_DT", TimestampType()),
#         StructField("PT35_PRMLGN_DT", TimestampType()),
#         StructField("PT35_EFFECTIVE_DATE", TimestampType()),
#         StructField("PT35_END_DATE", TimestampType()),
#         StructField("PT_35_FLAG", StringType()),
#         StructField("COMBND_DEGREE_PCT", StringType()),
#     ]
# )

file_list_schema = StructType(
    [
        StructField("path", StringType()),
        StructField("name", StringType()),
        StructField("size", StringType()),
        StructField("modificationTime", StringType()),
    ]
)


In [0]:
# Define join conditions based on file type
join_conditions = {
    "CG": (
        (col("ICN") == col("target_ICN"))
        & (col("Veteran_ICN") == col("target_Veteran_ICN"))
        & (col("Batch_CD") == col("target_Batch_CD"))
        & (col("Applicant_Type") == col("target_Applicant_Type"))
        & (col("target_RecordStatus") == True)
    ),
    "SCD": (
        (col("ICN") == col("target_ICN"))
        & (col("target_RecordStatus") == True)
        & (col("Batch_CD") == col("target_Batch_CD"))
    ),
}

# Define delta condition based on file type
delta_conditions = {
    "CG": xxhash64(
        col("Status_Begin_Date"),
        col("Status_Termination_Date"),
        col("Applicant_Type"),
        col("Caregiver_Status")
    ) != xxhash64(
        col("target_Status_Begin_Date"),
        col("target_Status_Termination_Date"),
        col("target_Applicant_Type"),
        col("target_Caregiver_Status")
    ),
    "SCD": xxhash64(
        col("SC_Combined_Disability_Percentage")
    ) != xxhash64(col("target_SC_Combined_Disability_Percentage")),
}

# Track changes in specified columns and create a change log
columns_to_track = {
    "CG": [
        ("Status_Begin_Date", "target_Status_Begin_Date"),
        ("Status_Termination_Date", "target_Status_Termination_Date"),
        ("Applicant_Type", "target_Applicant_Type"),
        ("Caregiver_Status", "target_Caregiver_Status")
    ],
    "SCD": [(
        "SC_Combined_Disability_Percentage", "target_SC_Combined_Disability_Percentage"),
    ],
    "PAI": [
        ("PT_Indicator", "target_PT_Indicator")
    ]
}

# Define merge conditions and batch_cd
merge_conditions = {
    "CG": "concat(target.ICN, target.Veteran_ICN, target.Applicant_Type) = source.MERGEKEY and target.RecordStatus = True",
    "SCD": "((target.ICN = source.MERGEKEY) and (target.Batch_CD = source.Batch_CD) and (target.RecordStatus = True))"
}

# Define concat_column based on file type
concat_column = {
    "CG": concat(col("ICN"), col("Veteran_ICN"), col("Applicant_Type")),
    "SCD": col("ICN")
}

In [0]:
class FileProcessor:
    def __init__(self):

        self.initial_cg_file = initial_cg_file
        self.cg_source = cg_source
        self.scd_source = scd_source
        self.pt_new_source = pt_new_source
        self.patronage_tablename = patronage_tablename
        self.patronage_table_location = patronage_table_location
        self.fullname = fullname
        self.no_of_files = no_of_files
        self.cg_start_datetime = dbutils.widgets.get("StartDateToProcessCaregiversData")
        self.others_start_datetime = dbutils.widgets.get("StartDateToProcessSCDData")
        self.identity_correlations_path = dbutils.widgets.get("PathToIdentityCorrelationsDeltaTable")
        self.icn_relationship = (spark.read.format("delta")
                    .load(self.identity_correlations_path)
                    .withColumnRenamed('MVIPersonICN', 'ICN')).persist()

    def collect_data_source(self):
        now = datetime.now()
        yesterday_end_time = datetime(now.year, now.month, now.day) - timedelta(
            hours=4
        )  # Need to adjust to blob storage time
        yesterday_end_time_ts = int(yesterday_end_time.timestamp() * 1000)
        
        # NOTE: the "StartDateToProcessSCDData" widget value (self.others_start_datetime) is
        # currently overridden; SCD and PAI sources are considered from the beginning of the current year.
        scd_beginning_datetime = datetime(now.year, 1, 1)

        print(f"Yesterday's End Time: {yesterday_end_time}, Equivalent unix time: {yesterday_end_time_ts}")
        print(f"Number of files: {self.no_of_files}")

        readFiles_start_datetime = self.cg_start_datetime
        unix_start_time = (
            int(time.mktime(datetime.strptime(readFiles_start_datetime, '%Y-%m-%d %H:%M:%S').timetuple())) * 1000
        )

        # Latest event timestamp already loaded; falls back to the caregiver start date for an empty table
        query = f""" SELECT COALESCE(MAX(SDP_Event_Created_Timestamp), to_timestamp('{readFiles_start_datetime}')) AS max_date FROM {self.patronage_tablename}
                """
        max_processed_date = spark.sql(query).collect()[0][0]
        print(f"Max Date Processed: {max_processed_date}")
        print(f"scd_beginning_datetime: {scd_beginning_datetime}")

        all_scd_files = (spark.createDataFrame(dbutils.fs.ls(self.scd_source))
                        .filter(col("name").startswith("CPIDODIEX_") 
                                & col("name").endswith(".csv") 
                                & ~(col("name").contains("NEW"))
                        ).filter(to_timestamp(col("modificationTime")/1000) > scd_beginning_datetime )
        )

        all_cg_files = (spark.createDataFrame(dbutils.fs.ls(self.cg_source))
                        .filter(col("name").contains("caregiverevent") 
                                & col("name").endswith(".csv")
                        ).filter(to_timestamp(col("modificationTime")/1000) > readFiles_start_datetime )
        )

        all_pt_files = (spark.createDataFrame(dbutils.fs.ls(self.pt_new_source))
                        .filter(to_timestamp(col("modificationTime")/1000) > scd_beginning_datetime)
                        .orderBy(desc(col("modificationTime")))
                        .limit(1)
        )

        combined_files = all_scd_files.unionAll(all_cg_files).unionAll(all_pt_files)

        filtered_files = (combined_files
                        .withColumn("dateTime", to_timestamp(col("modificationTime")/1000))
                        .filter((col("dateTime") > (max_processed_date )) 
                                & (col("modificationTime") <= yesterday_end_time_ts)
                        )
                        .orderBy(col("modificationTime").desc())
        )
        # Count once; each count() call triggers a separate Spark job
        files_to_process = filtered_files.count()
        print(f"Files to process: {files_to_process}")

        if files_to_process > 0:
            return filtered_files.orderBy(col("modificationTime"))
        else:
            dbutils.notebook.exit("Notebook exited because no files to process")

    def initialize_caregivers(self):
        new_cg_df = spark.read.csv(
            self.initial_cg_file,
            header=True,
            inferSchema=True,
        )
        transformed_cg_df = new_cg_df.select(
            substring("ICN", 1, 10).alias("ICN"),
            "Applicant_Type",
            "Caregiver_Status",
            date_format("Status_Begin_Date", "yyyyMMdd").alias("Status_Begin_Date"),
            date_format("Status_Termination_Date", "yyyyMMdd").alias(
                "Status_Termination_Date"
            ),
            substring("Veteran_ICN", 1, 10).alias("Veteran_ICN"),
        )
        edipi_df = (
            broadcast(transformed_cg_df)
            .join(self.icn_relationship, ["ICN"], "left")
            .withColumn(
                "filename",
                lit(self.initial_cg_file),
            )
            .withColumn(
                "SDP_Event_Created_Timestamp",
                lit(self.cg_start_datetime).cast(TimestampType()),
            )  # lit('2024-12-18T23:59:59.000+00:00')
            .withColumn("Individual_Unemployability", lit(None).cast(StringType()))
            .withColumn("PT_Indicator", lit(None).cast(StringType()))
            .withColumn(
                "SC_Combined_Disability_Percentage", lit(None).cast(StringType())
            )
            .withColumn("RecordStatus", lit(True).cast(BooleanType()))
            .withColumn("RecordLastUpdated", lit(None).cast(DateType()))
            .withColumn("Status_Last_Update", lit(None).cast(StringType()))
            .withColumn("sentToDoD", lit(False).cast(BooleanType()))
            .withColumn("Batch_CD", lit("CG").cast(StringType()))
        )
        return edipi_df

    def process_updates(self, edipi_df, file_type):
        """
        Upserts the input dataframe depending on the input file type ('CG', 'PAI', or 'SCD').
        Uses Slowly Changing Dimension Type 2 logic: a changed record is expired and its new version is inserted, so history is kept.
        Parameters: Pyspark dataframe and file type
        Returns: None
        """
 
        # Load the target Delta table
        targetTable = DeltaTable.forPath(
            spark, self.fullname
        )
        targetDF = targetTable.toDF().filter(
            (col("Batch_CD") == file_type) & (col("RecordStatus") == True)
        )
        # Rename columns in targetDF for clarity
        targetDF = targetDF.select(
            [col(c).alias(f"target_{c}") for c in targetDF.columns]
        )

        # Perform the join based on file type
        if file_type in ["CG", "SCD"]:
            joinDF = broadcast(edipi_df).join(
                targetDF, join_conditions[file_type], "leftouter"
            )

            # Handling logic for SCD file type
            if file_type == "SCD":
                joinDF = (
                    joinDF.withColumn("Status_Last_Update", col("DSBL_DTR_DT"))
                    .withColumn(
                        "Status_Begin_Date",
                        coalesce(col("target_Status_Begin_Date"), col("DSBL_DTR_DT")),
                    )
                    .withColumn(
                        "PT_Indicator",
                        coalesce(joinDF["target_PT_Indicator"], lit("N")),
                    )
                )

            # Filter records that have changes based on delta condition
            filterDF = joinDF.filter(delta_conditions[file_type])

            # Handle dummy records with null MERGEKEY for unmatched records
            mergeDF = filterDF.withColumn("MERGEKEY", concat_column[file_type])
            dummyDF = filterDF.filter(col("target_ICN").isNotNull()).withColumn(
                "MERGEKEY", lit(None)
            )

            # Union the filtered and dummy DataFrames
            upsert_df = mergeDF.union(dummyDF)

        if file_type == "PAI":
            upsert_df = edipi_df

        change_conditions = []
        for source_col, target_col in columns_to_track[file_type]:
            change_condition = when(
                xxhash64(coalesce(col(source_col), lit("Null")))
                != xxhash64(coalesce(col(target_col), lit("Null"))),
                concat_ws(
                    " ",
                    lit(source_col),
                    lit("old value:"),
                    coalesce(col(target_col), lit("Null")),
                    lit("changed to new value:"),
                    coalesce(col(source_col), lit("Null")),
                ),
            ).otherwise(lit(""))
            change_conditions.append(change_condition)
        new_record_condition = when(
            col("target_ICN").isNull(), lit("New Record")
        ).otherwise(lit("Updated Record"))
        upsert_df = upsert_df.withColumn("RecordChangeStatus", new_record_condition)
        if len(change_conditions) > 0:
            change_log_col = concat_ws(
                " ", *[coalesce(cond, lit("")) for cond in change_conditions]
            )
        else:
            change_log_col = lit("")
        upsert_df = upsert_df.withColumn("change_log", change_log_col)
        if file_type == "PAI":
            file_type = "SCD"
        # Perform the merge operation
        targetTable.alias("target").merge(
            upsert_df.alias("source"), merge_conditions[file_type]
        ).whenMatchedUpdate(
            set={
                "RecordStatus": "False",
                "RecordLastUpdated": "source.SDP_Event_Created_Timestamp",
                "sentToDoD": "true",
                "RecordChangeStatus": lit("Expired Record"),
            }
        ).whenNotMatchedInsert(
            values={
                "edipi": "source.edipi",
                "ICN": "source.ICN",
                "Veteran_ICN": "source.Veteran_ICN",
                "Applicant_Type": "source.Applicant_Type",
                "Caregiver_Status": "source.Caregiver_Status",
                "participant_id": "source.participant_id",
                "Batch_CD": "source.Batch_CD",
                "SC_Combined_Disability_Percentage": "source.SC_Combined_Disability_Percentage",
                "PT_Indicator": "source.PT_Indicator",
                "Individual_Unemployability": "source.Individual_Unemployability",
                "Status_Begin_Date": "source.Status_Begin_Date",
                "Status_Last_Update": "source.Status_Last_Update",
                "Status_Termination_Date": "source.Status_Termination_Date",
                "SDP_Event_Created_Timestamp": "source.SDP_Event_Created_Timestamp",
                "RecordStatus": "true",
                "RecordLastUpdated": "source.RecordLastUpdated",
                "filename": "source.filename",
                "sentToDoD": "false",
                "change_log": "source.change_log",
                "RecordChangeStatus": "source.RecordChangeStatus",
            }
        ).execute()

    def prepare_caregivers_data(self, cg_csv_files):
        """
        Filters caregivers filenames from input dataframe, aggregates data and returns dataframe
        Parameters: Pyspark dataframe with all filenames and metadata that are not processed (upsert)
        Returns: Dataframe: Dataframe with required column names and edipi of a caregiver ready for upsert
        """
        print(
            f"Upserting records from {cg_csv_files.count()} caregivers aggregated files"
        )
        Window_Spec = Window.partitionBy(
            "ICN", "Veteran_ICN", "Applicant_Type"
        ).orderBy(desc("Event_Created_Date"))

        cg_csv_files_to_process = cg_csv_files.select(collect_list("path")).first()[0]
        cg_df = (
            spark.read.schema(new_cg_schema)
            .csv(cg_csv_files_to_process, header=True, inferSchema=False)
            .selectExpr("*", "_metadata.file_name as filename", "_metadata.file_modification_time as SDP_Event_Created_Timestamp " )
        )
        combined_cg_df = (
            cg_df.select(
                substring("Caregiver_ICN__c", 1, 10).alias("ICN"),
                substring("Veteran_ICN__c", 1, 10).alias("Veteran_ICN"),
                date_format("Benefits_End_Date__c", "yyyyMMdd")
                .alias("Status_Termination_Date")
                .cast(StringType()),
                col("Applicant_Type__c").alias("Applicant_Type"),
                col("Caregiver_Status__c").alias("Caregiver_Status"),
                date_format("Dispositioned_Date__c", "yyyyMMdd")
                .alias("Status_Begin_Date")
                .cast(StringType()),
                col("CreatedDate").cast("timestamp").alias("Event_Created_Date"),
                "filename",
                "SDP_Event_Created_Timestamp",
            )
        ).filter(col("Caregiver_ICN__c").isNotNull())
        edipi_df = (
            broadcast(combined_cg_df)
            .join(self.icn_relationship, ["ICN"], "left")
            .withColumn("Individual_Unemployability", lit(None).cast(StringType()))
            .withColumn("PT_Indicator", lit(None).cast(StringType()))
            .withColumn(
                "SC_Combined_Disability_Percentage", lit(None).cast(StringType())
            )
            .withColumn("RecordStatus", lit(True).cast(BooleanType()))
            .withColumn("RecordLastUpdated", lit(None).cast(DateType()))
            .withColumn("Status_Last_Update", lit(None).cast(StringType()))
            .withColumn("sentToDoD", lit(False).cast(BooleanType()))
            .withColumn("Batch_CD", lit("CG").cast(StringType()))
            .withColumn("rank", rank().over(Window_Spec))
            .filter(col("rank") == 1)
            .dropDuplicates()
            .drop("rank", "va_profile_id", "record_updated_date")
        ).orderBy(col("Event_Created_Date"))
        return edipi_df

    def prepare_scd_data(self, row):
        """
        Prepares SCD data from the input row. This is the disability % data.
        Parameters: Row of data from pyspark dataframe with filenames and metadata that are not processed (upsert)
        Returns: Dataframe: Dataframe with required column names and edipi of a Veteran ready for upsert
        """

        file_name = row.path

        print(f"Upserting records from {file_name}")

        # if len(spark.read.csv(file_name).columns) != 3:
        #     schema = scd_schema
        # else:
        #     schema = scd_schema1
        scd_updates_df = (
            spark.read.csv(file_name, schema=scd_schema1, header=True, inferSchema=False)
            .selectExpr(
                "PTCPNT_ID as participant_id", "CMBNED_DEGREE_DSBLTY", "DSBL_DTR_DT", "_metadata.file_name as filename", "_metadata.file_modification_time as SDP_Event_Created_Timestamp "
            )
            .withColumn("sentToDoD", lit(False).cast(BooleanType()))
            .withColumn(
                "SC_Combined_Disability_Percentage",
                lpad(
                    coalesce(
                        when(col("CMBNED_DEGREE_DSBLTY") == "", lit("000")).otherwise(
                            col("CMBNED_DEGREE_DSBLTY")
                        )
                    ),
                    3,
                    "0",
                ),
            )
            .withColumn(
                "DSBL_DTR_DT",
                when(col("DSBL_DTR_DT") == "", None).otherwise(
                    date_format(to_date(col("DSBL_DTR_DT"), "MMddyyyy"), "yyyyMMdd")
                ),
            )
        ).filter(col("DSBL_DTR_DT").isNotNull())

        Window_Spec = Window.partitionBy(scd_updates_df["participant_id"]).orderBy(
            desc("DSBL_DTR_DT"), desc("SC_Combined_Disability_Percentage")
        )
        edipi_df = (
            broadcast(scd_updates_df)
            .join(self.icn_relationship, ["participant_id"], "left")
            .withColumn("rank", rank().over(Window_Spec))
            .withColumn("Veteran_ICN", lit(None).cast(StringType()))
            .withColumn("Applicant_Type", lit(None).cast(StringType()))
            .withColumn("Caregiver_Status", lit(None).cast(StringType()))
            .withColumn("Individual_Unemployability", lit(None).cast(StringType()))
            .withColumn("Status_Termination_Date", lit(None).cast(StringType()))
            .withColumn("RecordLastUpdated", lit(None).cast(DateType()))
            .withColumn("Batch_CD", lit("SCD"))
            .withColumn("RecordStatus", lit(True).cast(BooleanType()))
            .filter(col("rank") == 1)
            .filter(col("ICN").isNotNull())
            .dropDuplicates()
            .drop("rank", "va_profile_id", "record_updated_date")
        )

        return edipi_df

    def update_pai_data(self, row, source_type):
        """
        Prepares PT Indicator data from the input row, transforms it, and updates a Veteran's PT_Indicator column in the delta table.
        Parameters: Row of data from a pyspark dataframe with metadata for sources that are not yet processed (upsert);
                source_type 'text' means it is a static file (old source),
                source_type 'table' means it is a delta table (new source)
        Returns: Dataframe: Dataframe with required column names ready for upsert
        """
        # if source_type == "text":
        #     file_name = row.path
        #     file_creation_dateTime = row.dateTime
        #     raw_pai_df = (
        #         spark.read.csv(file_name, header=True, inferSchema=True)
        #         .selectExpr("*", "_metadata.file_name as filename", "_metadata.file_modification_time as SDP_Event_Created_Timestamp " )
        #     )
        # elif source_type == "table":
        file_creation_dateTime = row.dateTime
        file_name = f"Updated from PA&I delta table on {file_creation_dateTime}"
        raw_pai_df = spark.read.format("delta").load(self.pt_new_source)

        print("Updating PT Indicator")

        targetTable = DeltaTable.forPath(
            spark, self.fullname
        )
        targetDF = (
            targetTable.toDF()
            .filter("Batch_CD == 'SCD'")
            .filter("RecordStatus=='True'")
        )
        targetDF = targetDF.select(
            [col(c).alias(f"target_{c}") for c in targetDF.columns]
        )

        pai_df = raw_pai_df.selectExpr(
            "PTCPNT_VET_ID as participant_id", "PT_35_FLAG as source_PT_Indicator"
        )
        # NOTE: existing_pai_data_df is not referenced again in this method.
        existing_pai_data = f"""SELECT participant_id, PT_Indicator from {self.patronage_tablename} where RecordStatus is True and Batch_CD = 'SCD' """
        existing_pai_data_df = spark.sql(existing_pai_data)

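        # The filter on target_PT_Indicator discards PA&I rows without a matching
        # active SCD record (their target columns are NULL), so only Veterans whose
        # current PT_Indicator is "N" remain.
        joinDF = (
            pai_df.join(
                broadcast(targetDF),
                pai_df["participant_id"] == targetDF["target_participant_id"],
                "left",
            )
            .filter(targetDF["target_PT_Indicator"] == "N")
            .withColumn("filename", lit(file_name))
            .withColumn("SDP_Event_Created_Timestamp", lit(file_creation_dateTime))
        )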

        filterDF = joinDF.filter(
            xxhash64(joinDF.source_PT_Indicator) != xxhash64(joinDF.target_PT_Indicator)
        )

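        # Stage every changed row twice: once with MERGEKEY set to the ICN and once
        # (for rows with a non-null ICN) with MERGEKEY set to NULL.
        mergeDF = filterDF.withColumn("MERGEKEY", filterDF.target_ICN)

        dummyDF = filterDF.filter("target_ICN is not null").withColumn(
            "MERGEKEY", lit(None)
        )

        paiDF = mergeDF.union(dummyDF)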
        edipi_df = (
            paiDF.selectExpr(
                "target_edipi as edipi",
                "participant_id",
                "target_ICN",
                "MERGEKEY",
                "target_SC_Combined_Disability_Percentage as SC_Combined_Disability_Percentage",
                "target_Status_Begin_Date as Status_Begin_Date",
                "target_Status_Last_Update as Status_Last_Update",
                "SDP_Event_Created_Timestamp",
                "filename",
                "source_PT_Indicator",
                "target_PT_Indicator",
            )
            .withColumn("ICN", col("target_ICN"))
            .withColumn("Veteran_ICN", lit(None))
            .withColumn("Applicant_Type", lit(None))
            .withColumn("Caregiver_Status", lit(None))
            .withColumn("Batch_CD", lit("SCD"))
            .withColumn("PT_Indicator", coalesce(col("source_PT_Indicator"), lit("N")))
            .withColumn("Individual_Unemployability", lit(None))
            .withColumn("Status_Termination_Date", lit(None))
            .withColumn("RecordLastUpdated", lit(None))
        )
        return edipi_df

    def process_files(self, files_to_process_now):
        """
        Segregates files based on filename and calls required function to process these files
        Parameters: Pyspark dataframe with filenames and metadata that are not processed
        Returns: None
        """
        cg_csv_files = files_to_process_now.filter(
            files_to_process_now["path"].contains("caregiverevent")
        )
        edipi_df = self.prepare_caregivers_data(cg_csv_files)
        self.process_updates(edipi_df, "CG")

        other_files = files_to_process_now.filter(
            ~files_to_process_now["path"].contains("caregiverevent")
        )
        other_rows = other_files.collect()

        for row in other_rows:
            filename = row.path
            if "CPIDODIEX" in filename:
                edipi_df = self.prepare_scd_data(row)
                self.process_updates(edipi_df, "SCD")
            elif "WRTS" in filename:
                edipi_df = self.update_pai_data(row, "text")
                self.process_updates(edipi_df, "PAI")
            elif "parquet" in filename:
                edipi_df = self.update_pai_data(row, "table")
                self.process_updates(edipi_df, "PAI")
            else:
                # Files that match none of the known patterns are skipped.
                pass

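The SCD preparation code at the top of this section pads `CMBNED_DEGREE_DSBLTY` to three digits, reformats `DSBL_DTR_DT` from `MMddyyyy` to `yyyyMMdd`, drops rows without a date, and keeps the top-ranked row per `participant_id`. The following is a minimal pure-Python sketch of those rules, not the pipeline's implementation. The helper names (`normalize_percentage`, `normalize_date`, `latest_per_participant`) and the sample rows are hypothetical. The sketch assumes unparseable dates become null, and unlike `rank()` it keeps a single row when two rows tie.

```python
from datetime import datetime
from typing import Dict, Iterable, Optional


def normalize_percentage(raw: Optional[str]) -> Optional[str]:
    """Empty string -> "000"; other values are left-padded with "0" to 3 characters."""
    if raw is None:
        return None  # NULL stays NULL, as with lpad on a NULL column
    if raw == "":
        return "000"
    # Spark's lpad also truncates values longer than the target length.
    return raw.rjust(3, "0")[:3]


def normalize_date(raw: Optional[str]) -> Optional[str]:
    """Convert MMddyyyy to yyyyMMdd; empty or unparseable input -> None (assumption)."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, "%m%d%Y").strftime("%Y%m%d")
    except ValueError:
        return None


def latest_per_participant(rows: Iterable[Dict[str, str]]) -> Dict[str, Dict[str, str]]:
    """Per participant_id, keep the row with the latest date, then the highest percentage."""
    best = {}
    for row in rows:
        date = normalize_date(row["DSBL_DTR_DT"])
        if date is None:
            continue  # mirrors .filter(col("DSBL_DTR_DT").isNotNull())
        pct = normalize_percentage(row["CMBNED_DEGREE_DSBLTY"])
        # Zero-padded strings sort the same way as the numbers they represent.
        sort_key = (date, pct or "")
        pid = row["participant_id"]
        if pid not in best or sort_key > best[pid][0]:
            best[pid] = (
                sort_key,
                {
                    "participant_id": pid,
                    "SC_Combined_Disability_Percentage": pct,
                    "DSBL_DTR_DT": date,
                },
            )
    return {pid: record for pid, (_, record) in best.items()}


# Hypothetical sample rows, for illustration only.
sample_rows = [
    {"participant_id": "1", "CMBNED_DEGREE_DSBLTY": "70", "DSBL_DTR_DT": "01152023"},
    {"participant_id": "1", "CMBNED_DEGREE_DSBLTY": "", "DSBL_DTR_DT": "03012022"},
    {"participant_id": "2", "CMBNED_DEGREE_DSBLTY": "100", "DSBL_DTR_DT": ""},
]
result = latest_per_participant(sample_rows)
print(result)
```

Participant `2` is dropped in the sample because its date is empty, which matches the `isNotNull()` filter in the Spark code.
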
In [0]:
def main():
    file_processor = FileProcessor()

    # Load the Caregivers seed file first when no_of_files is 0.
    if file_processor.no_of_files == 0:
        print(f"Loading Caregivers Seed File into {file_processor.patronage_tablename}")
        seed_df = file_processor.initialize_caregivers()
        file_processor.process_updates(seed_df, "CG")

    # The upsert steps are the same whether or not the seed file was loaded.
    print("Loading required files for upsert")
    files_to_process_now = file_processor.collect_data_source()
    display(files_to_process_now)
    print(f"Total files to process in this run: {files_to_process_now.count()}...")
    file_processor.process_files(files_to_process_now)


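The filename-based routing that `process_files` applies can be summarized in a small standalone function. This is only a sketch of the substring checks visible in that method: `route_file` and the example paths are hypothetical, and the real method works on a Spark dataframe of paths rather than on single strings.

```python
def route_file(path: str) -> str:
    """Return the batch label process_files passes to process_updates, or "SKIP"."""
    # Caregiver event files are filtered out first and processed together as "CG".
    if "caregiverevent" in path:
        return "CG"
    # SCD extracts are recognized by the CPIDODIEX marker in the path.
    if "CPIDODIEX" in path:
        return "SCD"
    # Both the old text source (WRTS) and the new delta source (parquet) are PA&I.
    if "WRTS" in path or "parquet" in path:
        return "PAI"
    # Anything else is ignored.
    return "SKIP"


# Hypothetical paths, for illustration only.
for example in ["/mnt/x/caregiverevent_01.csv", "/mnt/x/CPIDODIEX_01.csv", "/mnt/x/readme.txt"]:
    print(example, "->", route_file(example))
```
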
In [0]:
if __name__ == "__main__":
    main()

In [0]:
file_processor = FileProcessor()
display(file_processor.collect_data_source())

In [0]:
%sql
SELECT
  version,
  timestamp,
  operation,
  cast(operationMetrics.numTargetRowsInserted as INT) as TotalInsertedRecords,
  cast(operationMetrics.numTargetRowsUpdated as INT) as UpdatedRecords,
  (cast(operationMetrics.numTargetRowsInserted as INT) - cast(operationMetrics.numTargetRowsUpdated as INT)) as NewRecords

FROM
  (describe history mypat_test)
-- where
--   operation not in ("CREATE TABLE", "OPTIMIZE")
order by
  1 desc;

In [0]:
recordsPerDate = f""" select count(*) as NoOfRecords, SDP_Event_Created_Timestamp 
                      from {file_processor.patronage_tablename}
                      group by all
                      order by 2 desc"""

spark.sql(recordsPerDate).display()


In [0]:
# %sql
# DROP TABLE mypat_test

In [0]:
# dbutils.fs.rm("dbfs:/user/hive/warehouse/mypat_test", True)

In [0]:
# %sql
# SELECT count(*), ICN, Veteran_ICN, Applicant_Type FROM mypat_test
# WHERE RecordStatus IS TRUE
# GROUP BY ALL
# HAVING count(*) > 1


In [0]:
%sql
select count(*) from pai

In [0]:
%sql
select count(*) from may_full_pai

In [0]:
%sql
select count( distinct PTCPNT_VET_ID) from pai