# IFRS 9 IMPAIRMENT MODEL ~~ BY DANIEL TIWONGE MHANGO (Development in Progress)


## PROBABILITY OF DEFAULT MODEL

The cells below contain the code used to generate the probabilities of default from the historical loan data (recommended 5 years of data).
These are derived from transition matrices that track the transition of balances between the various IFRS 9 stages (Stage 1, Stage 2, Stage 3).
We have assumed that loans cannot transition out of default (i.e. cannot cure), which creates an absorbing state in the matrices.
FLI adjustments are to be applied to the PDs in the MacroEconomic Module to obtain the final PDs. Macroeconomic data is obtained from the IMF using the web API.

In [None]:
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Record the wall-clock start time; the last cells subtract it from end_time to report the run duration.
start_time = time.time()


### IMPORT DATA FILES
Import Data; Declare Valuation/Reporting Date; Declare Periodicity; Declare Staging Criterion

In [None]:
# Folders the notebook reads its inputs from and writes its outputs to
path_read = 'Input Files/'
path_write = 'Output Files/'

# Load the PD data; the frame as read is kept as a backup and all work is done on a copy
pd_data_backup = pd.read_csv(f'{path_read}Example PD Data.csv')
pd_data = pd_data_backup.copy()

# Load the recoveries data the same way
rr_data_backup = pd.read_csv(f'{path_read}Example Recoveries Data.csv')
recoveries_data = rr_data_backup.copy()

# PD header mapping: logical field name -> column header in the PD file.
# The order matters because the headers are later unpacked positionally into pd_data_prep.
pd_keys = [
    'data_date', 'client_id', 'account_number', 'loan_segment',
    'eir', 'risk_status', 'out_bal', 'days_past_due',
]
pd_values = [
    'DATA_DATE', 'CLIENT_ID', 'ACCOUNT_NUMBER', 'LOAN_SEGMENT',
    'EIR', 'RISK_STATUS', 'OUT_BAL', 'DAYS_PAST_DUE',
]
pd_column_headers_dict = dict(zip(pd_keys, pd_values))

# Recoveries header mapping, again in the positional order expected by recoveries_data_prep
rr_keys = [
    'data_date', 'account_number', 'cash_collections',
    'eir', 'default_date', 'recovery_date',
]
rr_values = [
    'DATA_DATE', 'ACCOUNT_NUMBER', 'CASH_COLLECTIONS',
    'EIR', 'DEFAULT_DATE', 'RECOVERY_DATE',
]
rr_column_headers_dict = dict(zip(rr_keys, rr_values))

# Valuation / reporting date
valuation_date = datetime(2022, 12, 31).date()

# Transition periodicity: 'Annual', 'Quarterly' or 'Monthly' (stored case-folded)
periodicity = 'Quarterly'.casefold()

# Staging criterion: 'risk status' or 'days past due'
staging_criterion = 'risk status'

### DATA PRE-PROCESSING
Perform data pre-processing to ensure data is in the desired format

In [None]:
def pd_data_prep(pd_data, periodicity, staging_criterion, data_date, client_id, account_number, loan_segment, eir, risk_status, out_bal, days_past_due):
    """Clean the raw PD data and derive the staging and transition columns.

    The caller's frame is left untouched; all work is done on a copy.

    Parameters
    ----------
    pd_data : pandas.DataFrame
        Raw PD data. The date column is converted as Excel serial day numbers.
    periodicity : str
        'Annual', 'Quarterly' or 'Monthly' (case-insensitive). Sets how many
        months ahead the "next" observation of an account is looked up.
    staging_criterion : str
        'risk status' or 'days past due' (case-insensitive).
    data_date, client_id, account_number, loan_segment, eir, risk_status, out_bal, days_past_due : str
        Column headers of the corresponding fields in ``pd_data``.

    Returns
    -------
    pandas.DataFrame
        Rows sorted by date with negative balances removed, plus the columns
        ID_NOW, SHIFTED_DATE, ID_NEXT, STAGE_NOW, STAGE_NEXT and CURES.

    Raises
    ------
    ValueError
        If ``periodicity`` or ``staging_criterion`` is not one of the
        supported values.
    """
    # validate the options up front so a typo fails with a clear message
    months_ahead = {'annual': 12, 'quarterly': 3, 'monthly': 1}
    period_key = str(periodicity).casefold()
    if period_key not in months_ahead:
        raise ValueError(f"Unsupported periodicity {periodicity!r}; expected 'Annual', 'Quarterly' or 'Monthly'")

    criterion_key = str(staging_criterion).casefold()
    if criterion_key not in ('risk status', 'days past due'):
        raise ValueError(f"Unsupported staging criterion {staging_criterion!r}; expected 'risk status' or 'days past due'")

    # work on a copy so the caller's frame is not modified
    pd_data = pd_data.copy()

    # convert Excel serial day numbers to dates: serial N is (N - 2) days after 1900-01-01
    origin = np.datetime64('1900-01-01', 'D')
    pd_data[data_date] = origin + np.array(pd_data[data_date] - 2, dtype='timedelta64[D]')

    # sort values based on date
    pd_data = pd_data.sort_values(by=[data_date])

    # identifiers and segment as text; account numbers lose any apostrophe, segments are lower-cased
    pd_data[client_id] = pd_data[client_id].astype(str)
    pd_data[account_number] = pd_data[account_number].astype(str).str.replace("'", '', regex=False)
    pd_data[loan_segment] = pd_data[loan_segment].astype(str).str.lower()

    # eir as float (spaces removed, missing -> 0.0)
    pd_data[eir] = pd_data[eir].astype(str).str.replace(' ', '', regex=False).astype(float).fillna(0.0)

    # risk status as text
    pd_data[risk_status] = pd_data[risk_status].astype(str)

    # outstanding balance as float: an opening bracket marks a negative amount,
    # every other non-numeric character is dropped, unparseable values become 0.0
    balance_text = pd_data[out_bal].astype(str).str.replace('(', '-', regex=False).str.replace(r'[^0-9.-]', '', regex=True)
    pd_data[out_bal] = pd.to_numeric(balance_text, errors='coerce').fillna(0.0)

    # days past due as float, unparseable values become 0.0
    dpd_text = pd_data[days_past_due].astype(str).str.replace(r'[^0-9.-]', '', regex=True)
    pd_data[days_past_due] = pd.to_numeric(dpd_text, errors='coerce').fillna(0.0)

    # drop negative outstanding balances; copy so the column assignments below act on an independent frame
    pd_data = pd_data[pd_data[out_bal] >= 0].copy()

    # unique identifier of the current observation: <dd-mm-YYYY>_<account number>
    pd_data['ID_NOW'] = pd_data[data_date].dt.strftime('%d-%m-%Y') + '_' + pd_data[account_number]

    # last day of the month that lies `months_ahead` months after the observation:
    # truncate to the month, step one month further than needed, then go back one day
    observation_months = pd_data[data_date].values.astype('M8[M]')
    first_of_following_month = (observation_months + np.timedelta64(months_ahead[period_key] + 1, 'M')).astype('M8[D]')
    pd_data['SHIFTED_DATE'] = first_of_following_month - np.timedelta64(1, 'D')

    # identifier under which the same account would appear one period later
    pd_data['ID_NEXT'] = pd_data['SHIFTED_DATE'].dt.strftime('%d-%m-%Y') + '_' + pd_data[account_number]

    # current stage from the selected staging criterion
    if criterion_key == 'risk status':
        # 'normal' -> Stage 1, 'watch' -> Stage 2, anything else -> Stage 3
        status = pd_data[risk_status].str.casefold()
        pd_data['STAGE_NOW'] = np.where(status == 'normal', 'Stage 1',
                                        np.where(status == 'watch', 'Stage 2', 'Stage 3'))
    else:
        # up to 30 days -> Stage 1, up to 90 days -> Stage 2, beyond -> Stage 3
        days = pd_data[days_past_due]
        pd_data['STAGE_NOW'] = np.where(days <= 30, 'Stage 1',
                                        np.where(days <= 90, 'Stage 2', 'Stage 3'))

    # next stage: the stage recorded under ID_NEXT, or 'Exit' when the account is not observed one period later
    stage_by_id = pd_data.set_index('ID_NOW')['STAGE_NOW'].to_dict()
    pd_data['STAGE_NEXT'] = pd_data['ID_NEXT'].map(stage_by_id).fillna('Exit')

    # a Stage 3 balance that is in Stage 1 or Stage 2 one period later has cured
    cured = (pd_data['STAGE_NOW'] == 'Stage 3') & pd_data['STAGE_NEXT'].isin(['Stage 1', 'Stage 2'])
    pd_data['CURES'] = np.where(cured, 'Cured', pd_data['STAGE_NEXT'])

    return pd_data

# Run the PD pre-processing. The mapped column headers are unpacked positionally,
# so their order must match the header parameters of pd_data_prep.
pd_prep_output = pd_data_prep(pd_data, periodicity, staging_criterion, *pd_column_headers_dict.values())

In [None]:
# define a function that takes in recoveries data and pre-processes it
def recoveries_data_prep(cleaned_pd_data, rr_data, periodicity, data_date, account_number, cash_collections, eir, default_date, recovery_date):
    """Clean the recoveries data, discount the cash collections and attach them to the PD data.

    The caller's ``rr_data`` frame is left untouched; all work is done on a copy.

    Parameters
    ----------
    cleaned_pd_data : pandas.DataFrame
        Pre-processed PD data. This function reads its ID_NOW, STAGE_NOW,
        STAGE_NEXT, CURES, ACCOUNT_NUMBER, DATA_DATE and LOAN_SEGMENT columns.
    rr_data : pandas.DataFrame
        Raw recoveries data. The three date columns are converted as Excel
        serial day numbers.
    periodicity : str
        'Annual', 'Quarterly' or 'Monthly' (case-insensitive).
    data_date, account_number, cash_collections, eir, default_date, recovery_date : str
        Column headers of the corresponding fields in ``rr_data``.

    Returns
    -------
    tuple
        ``(pd_rr_data, cleaned_rr_data, final_data_dict)``: the PD data with
        DISCOUNTED_RECOVERIES and RECOVERIES columns added, the cleaned
        recoveries data, and ``pd_rr_data`` split into one frame per loan segment.

    Raises
    ------
    ValueError
        If ``periodicity`` is not one of the supported values.
    """
    months_ahead = {'annual': 12, 'quarterly': 3, 'monthly': 1}
    period_key = str(periodicity).casefold()
    if period_key not in months_ahead:
        raise ValueError(f"Unsupported periodicity {periodicity!r}; expected 'Annual', 'Quarterly' or 'Monthly'")

    def excel_serial_to_datetime(serials):
        """Convert Excel serial day numbers to datetimes; missing serials become NaT."""
        # serial N is N days after 1899-12-30 (equivalently N - 2 days after 1900-01-01)
        whole_days = np.floor(serials.astype(float))
        return pd.Timestamp('1899-12-30') + pd.to_timedelta(whole_days, unit='D')

    # work on a copy so the caller's frame is not modified
    rr_data = rr_data.copy()

    # missing data dates default to serial 42370
    rr_data[data_date] = excel_serial_to_datetime(rr_data[data_date].fillna(42370))

    # missing default dates stay missing here and are filled from the PD data further down
    rr_data[default_date] = excel_serial_to_datetime(rr_data[default_date])

    # convert first, then fall back to the data date: both sides are datetimes at
    # this point, so serial numbers and dates are never mixed in one column
    rr_data[recovery_date] = excel_serial_to_datetime(rr_data[recovery_date]).fillna(rr_data[data_date])

    # clean account number
    rr_data[account_number] = rr_data[account_number].astype(str).str.replace("'", '', regex=False)

    # cash collections as float: an opening bracket marks a negative amount,
    # every other non-numeric character is dropped, unparseable values become 0.0
    cash_text = rr_data[cash_collections].astype(str).str.replace('(', '-', regex=False).str.replace(r'[^0-9.-]', '', regex=True)
    rr_data[cash_collections] = pd.to_numeric(cash_text, errors='coerce').fillna(0.0)

    # eir as float, missing -> 0.0
    rr_data[eir] = rr_data[eir].astype(float).fillna(0.0)

    # drop negative recoveries; copy so the column assignments below act on an independent frame
    rr_data = rr_data[rr_data[cash_collections] >= 0].copy()

    # unique identifier of the current observation: <dd-mm-YYYY>_<account number>
    rr_data['ID_NOW'] = rr_data[data_date].dt.strftime('%d-%m-%Y') + '_' + rr_data[account_number]

    # last day of the month that lies `months_ahead` months after the observation:
    # truncate to the month, step one month further than needed, then go back one day
    observation_months = rr_data[data_date].values.astype('M8[M]')
    first_of_following_month = (observation_months + np.timedelta64(months_ahead[period_key] + 1, 'M')).astype('M8[D]')
    rr_data['SHIFTED_DATE'] = first_of_following_month - np.timedelta64(1, 'D')

    # identifier under which the same account would appear one period later
    rr_data['ID_NEXT'] = rr_data['SHIFTED_DATE'].dt.strftime('%d-%m-%Y') + '_' + rr_data[account_number]

    # for accounts without a default date: first PD observation whose next stage is Stage 3
    entering_default = cleaned_pd_data['STAGE_NOW'].isin(['Stage 1', 'Stage 2']) & (cleaned_pd_data['STAGE_NEXT'] == 'Stage 3')
    mapped_default_date = cleaned_pd_data.loc[entering_default].groupby('ACCOUNT_NUMBER')['DATA_DATE'].first().reset_index()

    # name the key after the recoveries account column so the merge follows the header mapping
    mapped_default_date.columns = [account_number, 'MAPPED_DEFAULT_DATE']
    rr_data = pd.merge(rr_data, mapped_default_date, on=account_number, how='left')

    # accounts never seen entering default in the PD data: assume default three months before the recovery
    rr_data['MAPPED_DEFAULT_DATE'] = rr_data['MAPPED_DEFAULT_DATE'].fillna(rr_data[recovery_date] - pd.offsets.DateOffset(months=3))

    # a supplied default date takes precedence over the mapped one
    rr_data['FINAL_DEFAULT_DATE'] = rr_data[default_date].fillna(rr_data['MAPPED_DEFAULT_DATE'])

    # time in default in years (365-day basis)
    rr_data['TIME IN DEFAULT'] = (rr_data[recovery_date] - rr_data['FINAL_DEFAULT_DATE']).dt.days / 365

    # discount the cash collections back to the default date at the eir
    rr_data['DISCOUNTED_RECOVERIES'] = rr_data[cash_collections] * ((1.0 + rr_data[eir]) ** (-rr_data['TIME IN DEFAULT']))

    # one discounted total per observation id
    discounted_recoveries = rr_data.groupby('ID_NOW')['DISCOUNTED_RECOVERIES'].sum().reset_index()

    # attach the discounted recoveries to the cleaned pd data
    pd_rr_data = pd.merge(cleaned_pd_data, discounted_recoveries, on='ID_NOW', how='left')

    # a Stage 3 balance that did not cure but has positive discounted recoveries is 'Recovered';
    # every other row keeps its CURES value
    recovered = (pd_rr_data['STAGE_NOW'] == 'Stage 3') & (pd_rr_data['CURES'] != 'Cured') & (pd_rr_data['DISCOUNTED_RECOVERIES'] > 0)
    pd_rr_data['RECOVERIES'] = np.where(recovered, 'Recovered', pd_rr_data['CURES'])

    cleaned_rr_data = rr_data

    # one frame per loan segment
    final_data_dict = {
        segment: pd_rr_data[pd_rr_data['LOAN_SEGMENT'] == segment]
        for segment in pd_rr_data['LOAN_SEGMENT'].unique()
    }

    return pd_rr_data, cleaned_rr_data, final_data_dict

# Run the recoveries pre-processing on top of the cleaned PD data. The result is the tuple
# (pd_rr_data, cleaned_rr_data, final_data_dict); the headers are unpacked positionally.
recoveries_prep_output = recoveries_data_prep(pd_prep_output, recoveries_data, periodicity, *rr_column_headers_dict.values())

### TRANSITION MATRICES
Construct transition matrices;
Perform matrix multiplication based on periodicity;

In [None]:
# Define function that constructs transition matrices
def _shift_to_month_end(start, months):
    """Return the last day of the month that lies `months` months after `start` (same type as `start`)."""
    import calendar  # local import: not part of the notebook's import cell

    month_index = start.year * 12 + (start.month - 1) + months
    year, month_offset = divmod(month_index, 12)
    month = month_offset + 1
    return start.replace(year=year, month=month, day=calendar.monthrange(year, month)[1])


def _normalised_transition_matrix(segment_data, target_column, states, absorbing_states):
    """Build a row-normalised, balance-weighted transition matrix from STAGE_NOW to `target_column`."""
    # exits are excluded - implicit assumption: exits occur for reasons other than clearing the balance
    observed = segment_data[segment_data[target_column] != 'Exit']

    # sum of outstanding balances per (current state, next state) pair
    matrix = pd.pivot_table(data=observed, values='OUT_BAL', index='STAGE_NOW', columns=target_column, aggfunc='sum', fill_value=0)

    # make sure every state is present as both a row and a column
    matrix = matrix.reindex(index=states, columns=states).fillna(0)

    # absorbing states keep their whole balance
    for state in absorbing_states:
        matrix.loc[state] = 0
        matrix.at[state, state] = 1

    # turn balances into row shares; a row with no balance at all yields NaN
    return matrix.div(matrix.sum(axis=1), axis=0)


def _matrix_power_series(base_matrix, dates):
    """Pair the k-th date (0-based) with the (k + 1)-th power of `base_matrix`."""
    matrices = [base_matrix]
    for _ in range(len(dates) - 1):
        matrices.append(matrices[-1] @ base_matrix)
    return dict(zip(dates, matrices))


def construct_matrices(pd_rr_data, periodicity, valuation_date):
    """Create the projected PD and cure/recovery transition matrices for each loan segment.

    For every segment a base matrix is estimated from the outstanding balances
    and then repeatedly multiplied by itself. The projection dates start at
    ``valuation_date`` and then step to successive period-ends (month-end
    dates); the first date is paired with the base matrix itself, the second
    with its square, and so on.

    Parameters
    ----------
    pd_rr_data : pandas.DataFrame
        Merged PD / recoveries data. This function reads its LOAN_SEGMENT,
        STAGE_NOW, STAGE_NEXT, RECOVERIES and OUT_BAL columns.
    periodicity : str
        'Annual' (30 steps of 12 months), 'Quarterly' (45 steps of 3 months)
        or 'Monthly' (120 steps of 1 month); case-insensitive.
    valuation_date : datetime.date
        First projection date.

    Returns
    -------
    tuple
        ``(pd_matrix_dict, rr_matrix_dict)``, each mapping
        ``segment -> {projection date -> matrix}``. PD matrices use the states
        Stage 1 / Stage 2 / Stage 3 with Stage 3 absorbing; cure/recovery
        matrices use Cured / Recovered / Stage 3 with Cured and Recovered
        absorbing.

    Raises
    ------
    ValueError
        If ``periodicity`` is not one of the supported values.
    """
    # periodicity -> (number of projected transitions, months per transition)
    horizons = {'annual': (30, 12), 'quarterly': (45, 3), 'monthly': (120, 1)}
    period_key = str(periodicity).casefold()
    if period_key not in horizons:
        raise ValueError(f"Unsupported periodicity {periodicity!r}; expected 'Annual', 'Quarterly' or 'Monthly'")
    n, shift_value = horizons[period_key]

    # projection dates: the valuation date followed by n successive period-ends
    date_projections = [valuation_date]
    for _ in range(n):
        date_projections.append(_shift_to_month_end(date_projections[-1], shift_value))

    # define all possible states
    all_stages_pds = ['Stage 1', 'Stage 2', 'Stage 3']
    all_stages_rr = ['Cured', 'Recovered', 'Stage 3']

    pd_matrix_dict = {}
    rr_matrix_dict = {}

    for segment in pd_rr_data['LOAN_SEGMENT'].unique():
        segment_data = pd_rr_data[pd_rr_data['LOAN_SEGMENT'] == segment]

        # PD matrix - assumption: no transitions out of default, so Stage 3 is absorbing
        base_pd_matrix = _normalised_transition_matrix(segment_data, 'STAGE_NEXT', all_stages_pds, ['Stage 3'])
        pd_matrix_dict[segment] = _matrix_power_series(base_pd_matrix, date_projections)

        # cure / recovery matrix - Cured and Recovered are absorbing
        base_rr_matrix = _normalised_transition_matrix(segment_data, 'RECOVERIES', all_stages_rr, ['Cured', 'Recovered'])
        rr_matrix_dict[segment] = _matrix_power_series(base_rr_matrix, date_projections)

    return pd_matrix_dict, rr_matrix_dict

# Build the projected matrices from the merged PD / recoveries frame (first element of recoveries_prep_output).
# The result is the tuple (pd_matrix_dict, rr_matrix_dict).
construct_matrices_output = construct_matrices(recoveries_prep_output[0], periodicity, valuation_date) 

### EXTRACT DEFAULT PROBABILITIES
Extract the PDs from the transition matrices for each loan segment

In [None]:
# Define function to extract PDs, Cures rates and Recovery Rates from transition matrices
def extract_rates(pd_matrix_dict, rr_matrix_dict):
    """Read cumulative PDs, cure rates and recovery rates off the projected matrices.

    Both inputs map ``segment -> {date -> matrix}``. From every PD matrix the
    third-column entries of the first and second rows are taken as the
    cumulative Stage 1 and Stage 2 PDs; from every cure/recovery matrix the
    first and second entries of the third row are taken as the cumulative cure
    and recovery rates. Each cumulative series is also converted into a
    conditional series.

    Returns the tuple ``(all_pd_rr, all_pds, all_rrs, all_pd_rr_merge)``:
    the merged table with an empty row after every segment, the PD table, the
    cure/recovery table, and the merged table without the spacer rows.
    """

    def to_conditional(cumulative):
        # the first value is kept as is; every later value is the increase over
        # the previous cumulative value divided by (1 - previous cumulative value)
        conditional = list(cumulative[:1])
        for previous, current in zip(cumulative, cumulative[1:]):
            conditional.append((current - previous) / (1 - previous))
        return conditional

    def rates_frame(segment, matrices, cells, cumulative_labels, conditional_labels):
        # one row per projection date: segment, date, cumulative rates, conditional rates
        dates = list(matrices)
        cumulative = [[matrix.iloc[row, col] for matrix in matrices.values()] for row, col in cells]
        columns = {'Segment': [segment] * len(dates), 'Date': dates}
        columns.update(zip(cumulative_labels, cumulative))
        columns.update(zip(conditional_labels, (to_conditional(series) for series in cumulative)))
        return pd.DataFrame(columns)

    # PDs: (row, column) positions of the Stage 1 and Stage 2 entries in the Stage 3 column
    pd_frames = [
        rates_frame(segment, matrices, [(0, 2), (1, 2)],
                    ['Stage 1 cml. PDs', 'Stage 2 cml. PDs'],
                    ['Stage 1 cond. PDs', 'Stage 2 cond. PDs'])
        for segment, matrices in pd_matrix_dict.items()
    ]
    all_pds = pd.concat([pd.DataFrame(), *pd_frames], ignore_index=True)

    # cures and recoveries: first and second entries of the third row
    rr_frames = [
        rates_frame(segment, matrices, [(2, 0), (2, 1)],
                    ['Cml. CRs', 'Cml. RRs'],
                    ['Cond. CRs', 'Cond. RRs'])
        for segment, matrices in rr_matrix_dict.items()
    ]
    all_rrs = pd.concat([pd.DataFrame(), *rr_frames], ignore_index=True)

    # attach the cure / recovery rates to the PDs of the same segment and date
    all_pd_rr_merge = pd.merge(all_pds, all_rrs, on=['Segment', 'Date'], how='left')

    # presentation copy: the rows of each segment followed by one empty row
    pieces = [pd.DataFrame()]
    for segment in all_pd_rr_merge['Segment'].unique():
        pieces.append(all_pd_rr_merge[all_pd_rr_merge['Segment'] == segment])
        pieces.append(pd.DataFrame([[]]))
    all_pd_rr = pd.concat(pieces, ignore_index=True)

    return all_pd_rr, all_pds, all_rrs, all_pd_rr_merge

# Extract the rates from the PD matrices (element 0) and the cure/recovery matrices (element 1).
# The result is the tuple (all_pd_rr, all_pds, all_rrs, all_pd_rr_merge).
rates_output = extract_rates(construct_matrices_output[0], construct_matrices_output[1])

### EXPORT RESULTS TO .CSV
Export Transition Matrices; Export Cleaned & Transformed PD Data; Export PD Output

In [None]:
# Export transition matrices for each segment
import os

# make sure the output folders exist so a first run on a fresh checkout does not fail in to_csv
matrices_dir = os.path.join(path_write, 'Transition Matrices')
os.makedirs(matrices_dir, exist_ok=True)

# construct_matrices_output holds the PD matrices first and the cure / recovery matrices second
for file_prefix, matrices_by_segment in zip(('np_pd_matrices', 'np_cr_rr_matrices'), construct_matrices_output):
    for segment, matrices_by_date in matrices_by_segment.items():
        # one row per file: a column per projection date, each cell holding that date's matrix.
        # NOTE(review): the cells are written as the text form of each matrix; consider a long
        # format (date, from-state, to-state, value) if these files need to be read back by code.
        output = pd.DataFrame([matrices_by_date])
        # '/' cannot be part of a file name
        segment_name = segment.replace('/', ' ')
        output.to_csv(os.path.join(matrices_dir, file_prefix + '_' + segment_name + '.csv'), index = False)

print('Successfully Exported All Transition Matrices')

# Export pds and rrs to csv
rates_output[0].to_csv(path_write + 'example PDs_RRs.csv', index = False)
print("Successfully Exported Non-FLI-Adjusted PDs!")

# Export cleaned pd_rr data to csv
recoveries_prep_output[0].to_csv(path_write + 'example_cleaned Data.csv', index = False)
print("Successfully Exported Cleaned PD Data!")

# Export cleaned recoveries data
recoveries_prep_output[1].to_csv(path_write + 'example cleaned recoveries Data.csv', index = False)
print("Successfully Exported Pre-Processed Recoveries Data")

In [None]:
# Define a function to compute the NPLs for each loan segment in the data
def historical_segment_npl(cleaned_pd_data, periodicity):
    """Compute the historical Stage 1 and Stage 2 NPL ratios per loan segment and period.

    For each segment the rows are grouped by DATA_DATE into annual, quarterly
    or monthly periods. Within a period the ratio for a stage is the balance
    of that stage whose next stage is Stage 3, divided by the balance of that
    stage excluding exits. A period in which that denominator is zero gets a
    NaN ratio.

    Parameters
    ----------
    cleaned_pd_data : pandas.DataFrame
        Pre-processed PD data. This function reads its LOAN_SEGMENT,
        DATA_DATE, STAGE_NOW, STAGE_NEXT and OUT_BAL columns.
    periodicity : str
        'Annual', 'Quarterly' or 'Monthly' (case-insensitive).

    Returns
    -------
    pandas.DataFrame
        One row per segment and period, with an empty row after each segment;
        an empty frame when the input has no segments.

    Raises
    ------
    ValueError
        If ``periodicity`` is not one of the supported values.
    """
    # period-end offsets are passed as objects rather than the 'A' / 'Q' / 'M'
    # string aliases, which pandas has deprecated
    period_offsets = {
        'annual': pd.offsets.YearEnd(),
        'quarterly': pd.offsets.QuarterEnd(startingMonth=12),
        'monthly': pd.offsets.MonthEnd(),
    }
    period_key = str(periodicity).casefold()
    if period_key not in period_offsets:
        raise ValueError(f"Unsupported periodicity {periodicity!r}; expected 'Annual', 'Quarterly' or 'Monthly'")
    frequency = period_offsets[period_key]
    period_label = f'{periodicity.capitalize()} Period'

    def npl_ratio(group, stage):
        """Share of the non-exiting `stage` balance that is in Stage 3 one period later."""
        in_stage = group[group['STAGE_NOW'] == stage]
        defaulted = in_stage.loc[in_stage['STAGE_NEXT'] == 'Stage 3', 'OUT_BAL'].sum()
        exited = in_stage.loc[in_stage['STAGE_NEXT'] == 'Exit', 'OUT_BAL'].sum()
        remaining = in_stage['OUT_BAL'].sum() - exited
        # a zero denominator gives NaN (or inf) without emitting a runtime warning
        with np.errstate(divide='ignore', invalid='ignore'):
            return np.float64(defaulted) / np.float64(remaining)

    npls_spaced = []
    for segment in cleaned_pd_data['LOAN_SEGMENT'].unique():
        segment_data = cleaned_pd_data[cleaned_pd_data['LOAN_SEGMENT'] == segment]
        # group the segment's rows by period at the required frequency
        periods = segment_data.groupby(pd.Grouper(key='DATA_DATE', freq=frequency))
        results = [
            {
                'Segment': segment,
                period_label: group_key,
                'Stage 1 NPL Ratio': npl_ratio(group, 'Stage 1'),
                'Stage 2 NPL Ratio': npl_ratio(group, 'Stage 2'),
            }
            for group_key, group in periods
        ]
        npls_spaced.append(pd.DataFrame(results))
        # empty spacer row between segments
        npls_spaced.append(pd.DataFrame([[]]))

    if not npls_spaced:
        return pd.DataFrame()

    return pd.concat(npls_spaced, ignore_index=True).reset_index(drop=True)

# Per-segment NPL ratios from the merged PD / recoveries frame; the bare name on the last line displays the table in the notebook.
segment_npls = historical_segment_npl(recoveries_prep_output[0], periodicity)
segment_npls

In [None]:
# Define a function to compute the overall NPL ratios across all loan segments combined
def historical_total_npl(cleaned_pd_data, periodicity):
    """Compute the historical Stage 1 and Stage 2 NPL ratios per period across all loan segments.

    The rows are grouped by DATA_DATE into annual, quarterly or monthly
    periods without any split by segment. Within a period the ratio for a
    stage is the balance of that stage whose next stage is Stage 3, divided by
    the balance of that stage excluding exits. A period in which that
    denominator is zero gets a NaN ratio.

    Parameters
    ----------
    cleaned_pd_data : pandas.DataFrame
        Pre-processed PD data. This function reads its DATA_DATE, STAGE_NOW,
        STAGE_NEXT and OUT_BAL columns.
    periodicity : str
        'Annual', 'Quarterly' or 'Monthly' (case-insensitive).

    Returns
    -------
    pandas.DataFrame
        One row per period, labelled 'Overall npl ratio' in the Segment column.

    Raises
    ------
    ValueError
        If ``periodicity`` is not one of the supported values.
    """
    # period-end offsets are passed as objects rather than the 'A' / 'Q' / 'M'
    # string aliases, which pandas has deprecated
    period_offsets = {
        'annual': pd.offsets.YearEnd(),
        'quarterly': pd.offsets.QuarterEnd(startingMonth=12),
        'monthly': pd.offsets.MonthEnd(),
    }
    period_key = str(periodicity).casefold()
    if period_key not in period_offsets:
        raise ValueError(f"Unsupported periodicity {periodicity!r}; expected 'Annual', 'Quarterly' or 'Monthly'")
    frequency = period_offsets[period_key]
    period_label = f'{periodicity.capitalize()} Period'

    def npl_ratio(group, stage):
        """Share of the non-exiting `stage` balance that is in Stage 3 one period later."""
        in_stage = group[group['STAGE_NOW'] == stage]
        defaulted = in_stage.loc[in_stage['STAGE_NEXT'] == 'Stage 3', 'OUT_BAL'].sum()
        exited = in_stage.loc[in_stage['STAGE_NEXT'] == 'Exit', 'OUT_BAL'].sum()
        remaining = in_stage['OUT_BAL'].sum() - exited
        # a zero denominator gives NaN (or inf) without emitting a runtime warning
        with np.errstate(divide='ignore', invalid='ignore'):
            return np.float64(defaulted) / np.float64(remaining)

    # group the whole book by period at the required frequency
    periods = cleaned_pd_data.groupby(pd.Grouper(key='DATA_DATE', freq=frequency))
    all_npls = [
        {
            'Segment': 'Overall npl ratio',
            period_label: group_key,
            'Stage 1 NPL Ratio': npl_ratio(group, 'Stage 1'),
            'Stage 2 NPL Ratio': npl_ratio(group, 'Stage 2'),
        }
        for group_key, group in periods
    ]

    return pd.DataFrame(all_npls)

# Overall NPL ratios from the merged PD / recoveries frame; the bare name on the last line displays the table in the notebook.
all_npls = historical_total_npl(recoveries_prep_output[0], periodicity)
all_npls

In [None]:
# Export the per-segment and overall NPL tables.
# NOTE(review): unlike the other exports in this notebook, no index=False is passed here - confirm the index column is wanted.
segment_npls.to_csv(path_write + 'example_segment_npls.csv')
all_npls.to_csv(path_write + 'example_all_npls.csv')

# Record the wall-clock end time; the next cell subtracts start_time from it.
end_time = time.time()

In [None]:
# Elapsed wall-clock time between the start_time and end_time stamps, in seconds
duration = end_time-start_time
print(f"This took: {duration} seconds")