In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as pyplot
import seaborn as sns 

In [2]:
def read_csv_to_dataframe(file_path, header=None, index_col=None):

    # Reads a CSV file into a pandas DataFrame using a specified delimiter.

    try:
        df = pd.read_csv(file_path, sep='|', header=header, index_col=index_col)
        return df
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
    except pd.errors.EmptyDataError:
        print("Error: The file is empty.")
    except pd.errors.ParserError:
        print("Error: There was a problem parsing the file.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


In [3]:
def add_header_to_dataframe(df):
    
    # Adds header to a pandas DataFrame.

    
    if not isinstance(df, pd.DataFrame):
        raise ValueError("The first argument must be a pandas DataFrame.")
    
    # Hardcoded list of column names
    column_names = ['Reference_Pool_ID', 'Loan_Identifier', 'Monthly_Reporting_Period', 'Channel',
    'Seller_Name', 'Servicer_Name', 'Master_Servicer', 'Original_Interest_Rate',
    'Current_Interest_Rate', 'Original_UPB', 'UPB_at_Issuance', 'Current_Actual_UPB',
    'Original_Loan_Term', 'Origination_Date', 'First_Payment_Date', 'Loan_Age',
    'Remaining_Months_to_Legal_Maturity', 'Remaining_Months_To_Maturity', 'Maturity_Date',
    'Original_Loan_to_Value_Ratio', 'Original_Combined_Loan_to_Value_Ratio',
    'Number_of_Borrowers', 'Debt_To_Income', 'Borrower_Credit_Score_at_Origination',
    'Co_Borrower_Credit_Score_at_Origination', 'First_Time_Home_Buyer_Indicator',
    'Loan_Purpose', 'Property_Type', 'Number_of_Units', 'Occupancy_Status',
    'Property_State', 'Metropolitan_Statistical_Area', 'Zip_Code_Short',
    'Mortgage_Insurance_Percentage', 'Amortization_Type', 'Prepayment_Penalty_Indicator',
    'Interest_Only_Loan_Indicator', 'Interest_Only_First_Principal_And_Interest_Payment_Date',
    'Months_to_Amortization', 'Current_Loan_Delinquency_Status', 'Loan_Payment_History',
    'Modification_Flag', 'Mortgage_Insurance_Cancellation_Indicator', 'Zero_Balance_Code',
    'Zero_Balance_Effective_Date', 'UPB_at_the_Time_of_Removal', 'Repurchase_Date',
    'Scheduled_Principal_Current', 'Total_Principal_Current', 'Unscheduled_Principal_Current',
    'Last_Paid_Installment_Date', 'Foreclosure_Date', 'Disposition_Date',
    'Foreclosure_Costs', 'Property_Preservation_and_Repair_Costs', 'Asset_Recovery_Costs',
    'Miscellaneous_Holding_Expenses_and_Credits', 'Associated_Taxes_for_Holding_Property',
    'Net_Sales_Proceeds', 'Credit_Enhancement_Proceeds', 'Repurchase_Make_Whole_Proceeds',
    'Other_Foreclosure_Proceeds', 'Modification_Related_Non_Interest_Bearing_UPB',
    'Principal_Forgiveness_Amount', 'Original_List_Start_Date', 'Original_List_Price',
    'Current_List_Start_Date', 'Current_List_Price', 'Borrower_Credit_Score_At_Issuance',
    'Co_Borrower_Credit_Score_At_Issuance', 'Borrower_Credit_Score_Current',
    'Co_Borrower_Credit_Score_Current', 'Mortgage_Insurance_Type',
    'Servicing_Activity_Indicator', 'Current_Period_Modification_Loss_Amount',
    'Cumulative_Modification_Loss_Amount', 'Current_Period_Credit_Event_Net_Gain_or_Loss',
    'Cumulative_Credit_Event_Net_Gain_or_Loss','Special Eligibility Program','Foreclosure Principal Write-off Amount',
    'Relocation Mortgage Indicator','Zero Balance Code Change Date','Loan Holdback Indicator',
    'Loan Holdback Effective Date','Delinquent Accrued Interest','Property Valuation Method',
    'High Balance Loan Indicator','ARM Initial Fixed-Rate Period  ≤ 5 YR Indicator','ARM Product Type',
    'Initial Fixed-Rate Period','Interest Rate Adjustment Frequency','Next Interest Rate Adjustment Date',
    'Next Payment Change Date','Index','ARM Cap Structure','Initial Interest Rate Cap Up Percent',
    'Periodic Interest Rate Cap Up Percent','Lifetime Interest Rate Cap Up Percent','Mortgage Margin',
    'ARM Balloon Indicator','ARM Plan Number','Borrower Assistance Plan','High Loan to Value (HLTV) Refinance Option Indicator',
    'Deal Name','Repurchase Make Whole Proceeds Flag','Alternative Delinquency Resolution',
    'Alternative Delinquency  Resolution Count','Total Deferral Amount']  

    if len(column_names) != df.shape[1]:
        raise ValueError("The number of column names must match the number of columns in the DataFrame.")
    
    df.columns = column_names
    
    return df

In [4]:
def convert_mmYYYY_to_datetime(df):
    
    # Convert specified columns in a DataFrame from MMYYYY integer format to datetime.

    columns = ['Monthly_Reporting_Period','Origination_Date', 'First_Payment_Date','Maturity_Date','Interest_Only_First_Principal_And_Interest_Payment_Date', 
    'Zero_Balance_Effective_Date', 'Repurchase_Date', 'Last_Paid_Installment_Date', 'Foreclosure_Date', 'Disposition_Date', 'Original_List_Start_Date', 
    'Current_List_Start_Date', 'Zero Balance Code Change Date', 'Loan Holdback Effective Date', 'Next Interest Rate Adjustment Date', 'Next Payment Change Date']

    for column in columns:
        if column in df.columns:
            # Convert the integer to string, then to datetime
            df[column] = pd.to_datetime(df[column].astype(str), format='%m%Y', errors='coerce')
        else:
            print(f"Warning: Column '{column}' does not exist in the DataFrame.")
    
    return df

In [5]:
def handle_duplicates(df, subset=None, keep='first', drop=True):
    
    # Handle duplicates in a DataFrame.
    
    if drop:
        # Drop duplicates
        df_deduped = df.drop_duplicates(subset=subset, keep=keep)
    
    return df_deduped

In [6]:
'''
These are the major columns that would be required in our code to calculate CECL score using Transition Matrix Multiplication:

1) Current_Loan_Delinquency_Status
2) Loan_Age 
3) Current_Actual_UPB
4) Loan_Identifier
5) Current_Interest_Rate
6) Original_Loan_Term 
7) Remaining_Months_To_Maturity

'''

'\nThese are the major columns that would be required in our code to calculate CECL score using Transition Matrix Multiplication:\n\n1) Current_Loan_Delinquency_Status\n2) Loan_Age \n3) Current_Actual_UPB\n4) Loan_Identifier\n5) Current_Interest_Rate\n6) Original_Loan_Term \n7) Remaining_Months_To_Maturity\n\n'

In [7]:
def handling_missing_data(df):

    # Missing value imputation for Loan_Age 
    
    df["Loan_Age"] = df.apply(
        lambda row: row["Original_Loan_Term"] - row['Remaining_Months_To_Maturity'] if pd.notna(row["Original_Loan_Term"]) and pd.notna(row['Remaining_Months_To_Maturity']) else row["Loan_Age"],
        axis=1
    )

    # Dropping the rows with left out missing values for Loan_Age

    df = df.dropna(subset=['Loan_Age'])

    # Dropping the rows with null values in loan identifer column 

    df = df.dropna(subset=['Loan_Identifier'])

    # Dropping the rows with no delinquency status

    df = df.dropna(subset=['Current_Loan_Delinquency_Status'])

    # Dropping the rows with no current actual unpaid balance 

    df = df.dropna(subset=['Current_Actual_UPB'])

    # Dropping the rows with no monthly reporting period 

    df = df.dropna(subset=['Monthly_Reporting_Period'])
    
    
    return df    
    

In [8]:
# These columns contain business-critical metrics where extreme values may represent significant insights or events. 
# Therefore, outlier detection and removal should be avoided to preserve the integrity of the analysis.

def outlier_detection(df):

    return df

In [10]:
def update_status_column(df):
    """
    Update the Current_Loan_Delinquency_Status in the DataFrame based on the given mapping.
    
    """

    column_name_1 = "Current_Loan_Delinquency_Status"
    column_name_2 = "Next_Loan_Delinquency_Status"


    # Define the mapping
    mapping = {
        "00": "current",
        "01": "ingrace",
        "02": "30dpd",
        "03": "60dpd",
        "04": "90dpd",
        "05": "chargedoff"
    }

    # Use .loc to avoid SettingWithCopyWarning
    df.loc[:, column_name_1] = df[column_name_1].map(mapping).fillna(np.nan)
    df.loc[:, column_name_2] = df[column_name_2].map(mapping).fillna(np.nan)

In [11]:
def add_next_delinquency_status(df):
    """
    Add a new column showing the next month's delinquency status for each loan.
    
    """
    # Sort the DataFrame by Loan_Identifier and Monthly_Reporting_Period
    df_sorted = df.sort_values(['Loan_Identifier', 'Monthly_Reporting_Period'])
    
    # Create the next status column by shifting within each loan group
    df_sorted['Next_Loan_Delinquency_Status'] = (
        df_sorted.groupby('Loan_Identifier')['Current_Loan_Delinquency_Status']
        .shift(-1)  # Negative shift to get next month's status
    )
    
    # Sort back to original order if needed
    df_result = df_sorted.sort_index()
    
    return df_result

In [12]:
def update_status_column(df):
    """
    Update the specified column in the DataFrame based on the given mapping.
    
    """

    column_name_1 = "Current_Loan_Delinquency_Status"
    column_name_2 = "Next_Loan_Delinquency_Status"
    # Define the mapping
    mapping = {
        "00": "current",
        "01": "ingrace",
        "02": "30dpd",
        "03": "60dpd",
        "04": "90dpd"
    }

    # Use .loc to avoid SettingWithCopyWarning
    df.loc[:, column_name_1] = df[column_name_1].map(mapping).fillna("chargedoff")
    df.loc[:, column_name_2] = df[column_name_2].map(mapping).fillna("chargedoff")


    df = df.dropna(subset=['Next_Loan_Delinquency_Status'])

    return df

In [13]:
def analyze_loan_transitions(df):
    """
    Analyze transitions between loan delinquency status and create a transition matrix.
    
    """

    current_status_col = "Current_Loan_Delinquency_Status"
    next_status_col = "Next_Loan_Delinquency_Status"

    # Calculate transition counts and probabilities
    transition_counts = pd.crosstab(
        df[current_status_col],
        df[next_status_col]
    )
    
    transition_matrix = pd.crosstab(
        df[current_status_col],
        df[next_status_col],
        normalize='index'
    )

    rows = [ 'current', 'ingrace', '30dpd', '60dpd', '90dpd', 'chargedoff'] 
    cols = [ 'current', 'ingrace', '30dpd', '60dpd', '90dpd', 'chargedoff']
    transition_matrix = transition_matrix[cols]
    transition_matrix = transition_matrix.reindex(rows)


    return transition_matrix

In [14]:
def weighted_average_age(df):

    # Calculating weighted average 
    
    wavg = ((df["Loan_Age"]*df["Current_Actual_UPB"]).sum()/df["Current_Actual_UPB"].sum())
    wavg = int(wavg.round(0))
    return wavg

In [15]:
def compute_cgl_curve(transition_matrix, wavg):
    """
    Computes the Cgl Curve based on the initial state and transition matrix.

    """
    initial_state = np.array([1.0, 0.0, 0.0, 0.0, 0.0, 0.0])

    Cgl_Curve = []
    
    for i in range(wavg + 24):
        state_probability = np.dot(initial_state, np.linalg.matrix_power(transition_matrix, i))
        Cgl_Curve.append(state_probability)
    
    #Cgl_Curve = pd.DataFrame(Cgl_Curve)
    return Cgl_Curve

In [16]:
def compute_ALLL(Cgl_Curve,wavg):

    ALLL =Cgl_Curve[wavg+11][5]-Cgl_Curve[wavg-1][5]

    return ALLL


In [32]:
def compute_cecl(file_path):
    # Step 1: Read CSV to DataFrame
    df = read_csv_to_dataframe(file_path)

    # Step 2: Add header to DataFrame
    df = add_header_to_dataframe(df)

    # Step 3: Convert mmYYYY to datetime
    df = convert_mmYYYY_to_datetime(df)

    # Step 4: Handle duplicates
    df = handle_duplicates(df)

    # Step 5: Handle missing data
    df = handling_missing_data(df)

    # Step 6: Outlier detection
    df = outlier_detection(df)

    # Step 7: Add next delinquency status
    df = add_next_delinquency_status(df)

    # Step 8: Update status column
    df = update_status_column(df)

    # Step 9: Analyze loan transitions to get transition matrix
    TM = analyze_loan_transitions(df)

    # Step 10: Calculate weighted average age
    WAVG = weighted_average_age(df)

    # Step 11: Compute Cgl Curve
    Cgl_Curve = compute_cgl_curve(TM, WAVG)

    # Step 12: Compute CECL
    ALLL = compute_ALLL(Cgl_Curve, WAVG)

    # Calculate CECL
    CECL = 1.5 * ALLL

    return CECL,TM, Cgl_Curve, df


In [33]:
cecl_value, TMM ,cgl, df= compute_cecl("data.csv")
print("Computed CECL:", cecl_value)

Computed CECL: 0.023455441871400834
