# Credit Scorecard Project – Data Cleaning (Test Set)

## 1. Setup & Imports
- Import libraries
- Set plotting styles

In [1]:
import json
import numpy as np
import math
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from IPython.core.interactiveshell import InteractiveShell
from dateutil.relativedelta import relativedelta

from sklearn.feature_selection import VarianceThreshold

In [2]:
# Notebook-wide display configuration
import warnings

# Echo every top-level expression of a cell, not only the last one
InteractiveShell.ast_node_interactivity = "all"

# pandas rendering: three-decimal floats and no truncation of columns or rows
for option_name, option_value in (
    ('display.float_format', '{:.3f}'.format),
    ('display.max_columns', None),
    ('display.max_rows', None),
):
    pd.set_option(option_name, option_value)

# Silence every Python warning for the rest of the session
warnings.filterwarnings("ignore")

## 2. Load Data

In [3]:
# Read in test data and feature meta file
# fmeta is the lookup that the later cells consult for the treatment to apply
# to each variable (its Transform, Flag, Cap, miss_impute_val and Drop columns).
df_test = pd.read_csv('../data/interim/test_raw.csv')
fmeta = pd.read_csv('../dictionaries/feature_meta.csv')

In [4]:
# Dimensions (rows, columns) of the test set as loaded, before any treatment
print(df_test.shape)

(205520, 134)


## 3. Clean Test Data - Apply Training Treatment

### 3.1 Apply Transformations (Recoded Variables)

In [5]:
# Variables to be transformed: feature meta rows with Transform == 1,
# shown next to the name of the variable each one is recoded into (trans_vars)
fmeta[fmeta['Transform']==1][['Variable_Name', 'trans_vars']]

Unnamed: 0,Variable_Name,trans_vars
2,addr_state,addr_state_grouped
22,earliest_cr_line,mnths_since_earliest_cr_line
23,emp_length,emp_length_num
42,home_ownership,home_ownership_grouped


In [6]:
# addr_state
# Recode into addr_state_grouped using the mapping saved as a JSON artifact

with open("../artifacts/addr_state_risk_map.json", "r") as f:
    addr_state_risk_map = json.load(f)

def map_state_risk_band(df, mapping):
    """Replace ``addr_state`` with ``addr_state_grouped`` through a lookup.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing an ``addr_state`` column. It is not modified.
    mapping : dict
        Lookup from ``addr_state`` values to their grouped value.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with ``addr_state_grouped`` appended and
        ``addr_state`` removed. Values absent from ``mapping`` become NaN.
    """
    # Work on a copy so the caller's frame is left untouched, in line with
    # create_term_flag / create_hi_risk_zip_flag.
    df = df.copy()
    df['addr_state_grouped'] = df['addr_state'].map(mapping)
    return df.drop(columns='addr_state')

# Apply the mapping to the test set and rebind df_test to the returned frame
df_test = map_state_risk_band(df_test, addr_state_risk_map)

In [7]:
# earliest_cr_line 
# Convert to mnths_since_earliest_cr_line

def compute_months_since_earliest_cr_line(df):
    """Replace ``earliest_cr_line`` with ``mnths_since_earliest_cr_line``.

    The new column holds the number of months from ``earliest_cr_line`` to
    ``issue_d`` (``years * 12 + months`` of the ``relativedelta`` between the
    two dates), or NaN when either date is missing or cannot be parsed.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``earliest_cr_line`` and ``issue_d`` columns.
        It is not modified.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` in which ``issue_d`` is converted to datetime,
        ``mnths_since_earliest_cr_line`` is appended and
        ``earliest_cr_line`` is removed.
    """
    # Work on a copy so the caller's frame (including its date columns) is
    # left untouched, in line with create_term_flag / create_hi_risk_zip_flag.
    df = df.copy()

    # Ensure datetime conversion; values that cannot be parsed become NaT
    df['earliest_cr_line'] = pd.to_datetime(df['earliest_cr_line'], errors='coerce')
    df['issue_d'] = pd.to_datetime(df['issue_d'], errors='coerce')  # in case not yet converted

    # Calculate months difference
    def month_diff(row):
        if pd.isnull(row['earliest_cr_line']) or pd.isnull(row['issue_d']):
            return np.nan
        rd = relativedelta(row['issue_d'], row['earliest_cr_line'])
        return rd.years * 12 + rd.months

    # Only the two date columns are read, so restrict the row-wise apply to
    # them instead of materialising every column of every row.
    date_cols = df[['issue_d', 'earliest_cr_line']]
    df['mnths_since_earliest_cr_line'] = date_cols.apply(month_diff, axis=1)
    return df.drop(columns='earliest_cr_line')

# Derive the month count for the test set and rebind df_test to the returned frame
df_test = compute_months_since_earliest_cr_line(df_test)

In [8]:
# emp_length
# Convert to numeric scale (emp_length_num) using the mapping saved as a JSON artifact

with open("../artifacts/emp_length_map.json", "r") as f:
    emp_length_map = json.load(f)

def map_emp_length(df, mapping):
    """Replace ``emp_length`` with ``emp_length_num`` through a lookup.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing an ``emp_length`` column. It is not modified.
    mapping : dict
        Lookup from ``emp_length`` values to their mapped value.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with ``emp_length_num`` appended and ``emp_length``
        removed. Missing values and values absent from ``mapping`` become NaN.
    """
    # Work on a copy so the caller's frame is left untouched, in line with
    # create_term_flag / create_hi_risk_zip_flag.
    df = df.copy()
    df['emp_length_num'] = df['emp_length'].map(mapping)
    return df.drop(columns='emp_length')

# Apply the mapping to the test set and rebind df_test to the returned frame
df_test = map_emp_length(df_test, emp_length_map)

In [9]:
# home_ownership
# Group ANY, OTHER, NONE into one group

def group_home_ownership(df, col='home_ownership', new_col='home_ownership_grouped'):
    """Collapse the 'ANY', 'NONE' and 'OTHER' levels of ``col`` into 'Other'.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column ``col``. It is not modified.
    col : str
        Name of the source column.
    new_col : str
        Name of the grouped column to write.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with ``new_col`` holding the grouped values and
        ``col`` removed. Every other value, missing ones included, is kept
        as it is. When ``new_col`` equals ``col`` the column is recoded in
        place instead of being dropped.
    """
    merged_levels = ('ANY', 'NONE', 'OTHER')

    # Work on a copy so the caller's frame is left untouched, in line with
    # create_term_flag / create_hi_risk_zip_flag.
    df = df.copy()
    df[new_col] = df[col].apply(
        lambda level: 'Other' if level in merged_levels else level
    )

    # Dropping the source would also discard the result if both names match
    if new_col != col:
        df = df.drop(columns=col)
    return df

# Group the test set with the default column names and rebind df_test to the returned frame
df_test = group_home_ownership(df_test)

### 3.2 Create Flags

In [10]:
# Variables to create a flag for: feature meta rows with Flag == 1,
# shown next to the name of the flag derived from each one (flag_vars)
fmeta[fmeta['Flag']==1][['Variable_Name', 'flag_vars']]

Unnamed: 0,Variable_Name,flag_vars
0,acc_now_delinq,f_acc_now_delinq_gt0
10,chargeoff_within_12_mths,f_chargeoff_within_12_mths_gt0
12,collections_12_mths_ex_med,f_collections_12_mths_ex_med_gt0
17,delinq_amnt,f_delinq_amnt_gt0
85,num_tl_120dpd_2m,f_num_tl_120dpd_2m_gt0
86,num_tl_30dpd,f_num_tl_30dpd_gt0
130,term,f_term_60
132,tot_coll_amt,f_tot_coll_amt_gt0
150,zip_code,f_hi_risk_zip


In [11]:
# term 
def create_term_flag(df):
    """Return a new frame where ``term`` is replaced by the 0/1 flag ``f_term_60``.

    The flag is 1 for rows whose ``term`` is exactly ' 60 months' (leading
    space included) and 0 for every other row. The input frame is left as is.
    """
    is_60_month = df['term'].eq(' 60 months')
    flagged = df.assign(f_term_60=is_60_month.astype(int))
    return flagged.drop(columns='term')

# Build the flag on the test set, then show the share of rows per flag value
df_test = create_term_flag(df_test)
df_test['f_term_60'].value_counts(normalize=True)

f_term_60
0   0.766
1   0.234
Name: proportion, dtype: float64

In [12]:
# High-risk zip code flags

# Read in high-risk zip code list (saved as a JSON artifact)
with open("../artifacts/hi_risk_zip.json", "r") as f:
    hi_risk_zip_list = json.load(f)

# map zip-code
def create_hi_risk_zip_flag(df, hi_risk_zip_list):
    """Return a new frame where ``zip_code`` is replaced by the 0/1 flag ``f_hi_risk_zip``.

    The flag is 1 for rows whose ``zip_code`` appears in ``hi_risk_zip_list``
    and 0 for every other row. The input frame is left as is.
    """
    in_hi_risk_list = df["zip_code"].isin(hi_risk_zip_list)
    flagged = df.assign(f_hi_risk_zip=in_hi_risk_list.astype(int))
    return flagged.drop(columns="zip_code")

# Build the flag on the test set, then show the share of rows per flag value
df_test = create_hi_risk_zip_flag(df_test, hi_risk_zip_list)
df_test['f_hi_risk_zip'].value_counts(normalize=True)

f_hi_risk_zip
0   0.950
1   0.050
Name: proportion, dtype: float64

In [13]:
# batch create gt0 flags (1 when the value is greater than 0)
# Source variables are the feature meta rows whose flag_vars entry contains 'gt0'
gt0_flag_vars = list(fmeta[fmeta['flag_vars'].str.contains('gt0', na=False)]['Variable_Name'])

def create_gt0_flag(df, var_list):
    """Add a 0/1 flag ``f_<var>_gt0`` for every variable in ``var_list``.

    Each flag is 1 where the variable is strictly greater than 0 and 0
    otherwise (missing values compare as not greater, so they get 0). The
    normalised distribution of every new flag is printed as it is created.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing every column named in ``var_list``.
        It is not modified.
    var_list : list of str
        Names of the columns to flag.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with the flag columns appended; the source columns
        are kept.
    """
    # Work on a copy so the caller's frame is left untouched, in line with
    # create_term_flag / create_hi_risk_zip_flag.
    df = df.copy()
    for var in var_list:
        df[f'f_{var}_gt0'] = (df[var]>0).astype(int)
        print(df[f'f_{var}_gt0'].value_counts(normalize=True),"\n")
    return df

# Build every gt0 flag on the test set (the distributions are printed by the function)
df_test = create_gt0_flag(df_test, gt0_flag_vars)

# Drop variables after flag creation
# tot_coll_amt is taken out of the list first (the list is modified in place),
# so it stays in df_test alongside its flag
gt0_flag_vars.remove("tot_coll_amt") # not drop tot_coll_amt
df_test.drop(columns=gt0_flag_vars, inplace=True)

f_acc_now_delinq_gt0
0   0.997
1   0.003
Name: proportion, dtype: float64 

f_chargeoff_within_12_mths_gt0
0   0.991
1   0.009
Name: proportion, dtype: float64 

f_collections_12_mths_ex_med_gt0
0   0.980
1   0.020
Name: proportion, dtype: float64 

f_delinq_amnt_gt0
0   0.997
1   0.003
Name: proportion, dtype: float64 

f_num_tl_120dpd_2m_gt0
0   0.999
1   0.001
Name: proportion, dtype: float64 

f_num_tl_30dpd_gt0
0   0.998
1   0.002
Name: proportion, dtype: float64 

f_tot_coll_amt_gt0
0   0.837
1   0.163
Name: proportion, dtype: float64 



### 3.3 Apply Capping

In [14]:
# Apply the caps recorded in the feature meta file to every variable with Cap == 1
for idx, row in fmeta.query("Cap == 1").iterrows():
    var = row['Variable_Name']
    cap_up = row['cap_val_upper']
    cap_low = row.get('cap_val_lower', None)  # None when the column is absent

    # Create flag if exists
    # The flag must be computed BEFORE clipping: it marks rows whose original
    # value exceeds the upper cap (all 0 when there is no upper cap). Values
    # below the lower cap are not flagged.
    # NOTE(review): if several variables share one cap_flags name, each
    # iteration overwrites the flag written for the previous variable, so only
    # the last variable's exceedances would remain - confirm this matches the
    # training-set treatment.
    flag_name = row.get('cap_flags')
    if pd.notna(flag_name):
        df_test[flag_name] = (df_test[var] > cap_up).astype(int) if not pd.isna(cap_up) else 0
    
    # Clip to whichever bounds are defined for this variable
    if not pd.isna(cap_low):
        df_test[var] = df_test[var].clip(lower=cap_low)
    if not pd.isna(cap_up):
        df_test[var] = df_test[var].clip(upper=cap_up)

In [15]:
# Check capped pctg
# Distinct flag names in order of first appearance, so the report is printed in
# the same order on every run. Capped variables without a flag name (NaN in
# cap_flags, which the capping loop tolerates) are skipped instead of being
# looked up as a column.
cap_flags = fmeta.loc[fmeta['Cap'] == 1, 'cap_flags'].dropna().unique().tolist()

for flag in cap_flags:
    print(df_test[flag].value_counts(normalize=True),"\n")

F_CAPPED_AMOUNT
0   0.998
1   0.002
Name: proportion, dtype: float64 

F_CAPPED_MNTHS
0   1.000
1   0.000
Name: proportion, dtype: float64 

F_OVERLIMIT
0   0.997
1   0.003
Name: proportion, dtype: float64 

F_CAPPED_DELQ
0   0.999
1   0.001
Name: proportion, dtype: float64 

F_CAPPED_NUMACCTS
0   0.998
1   0.002
Name: proportion, dtype: float64 

F_CAPPED_INC
0   0.992
1   0.008
Name: proportion, dtype: float64 



### 3.4 Impute Missing Values

In [16]:
# Batch imputation based on the missing-value treatment documented in the feature meta file
# Only variables with a miss_impute_val entry are processed.
for idx, row in fmeta[fmeta['miss_impute_val'].notna()].iterrows():
    var = row['Variable_Name']
    impute_val = row['miss_impute_val']
    flag_name = row.get('miss_flags')

    # The missing flag must be computed BEFORE filling, while the NaNs are still there
    # NOTE(review): a miss_flags name shared by several variables would be
    # overwritten on each iteration - confirm against the training-set treatment.
    if pd.notna(flag_name):
        df_test[flag_name] = df_test[var].isna().astype(int)
    
    df_test[var] = df_test[var].fillna(impute_val)

In [17]:
# Check missing counts after imputation (number of NaN values per column)
df_test.isna().sum()

id                                                 0
member_id                                     205520
loan_amnt                                          0
funded_amnt                                        0
funded_amnt_inv                                    0
int_rate                                           0
installment                                        0
grade                                              0
sub_grade                                          0
emp_title                                      14453
annual_inc                                         0
verification_status                                0
issue_d                                            0
pymnt_plan                                         0
url                                                0
desc                                          205520
purpose                                            0
title                                              0
dti                                           

`emp_length_num` was not treated by the batch imputation above: it is derived from the original variable `emp_length`, so it does not appear in the variable list of the feature meta file.

Missing values in `emp_length_num` were 
* imputed with -1, and 
* a missing flag `f_emp_length_missing` created 

in the training set and the same will be applied to the test set. 

In [18]:
# Flag the rows where employment length is missing (must happen before filling)
emp_length_is_missing = df_test['emp_length_num'].isna()
df_test['f_emp_length_missing'] = emp_length_is_missing.astype(int)

# Fill the gaps with the sentinel value -1
df_test['emp_length_num'] = df_test['emp_length_num'].fillna(-1)

# Confirm that nothing is left missing and show the share of flagged rows
remaining_missing = df_test[['emp_length_num']].isna().sum()
print("Missing counts after imputation:\n", remaining_missing, "\n")
flag_share = df_test["f_emp_length_missing"].value_counts(normalize=True)
print(flag_share)

Missing counts after imputation:
 emp_length_num    0
dtype: int64 

f_emp_length_missing
0   0.932
1   0.068
Name: proportion, dtype: float64


### 3.5 Drop Redundant Features

Drop variables that have been discarded in the training set treatment due to leakage and other reasons.

In [19]:
# Variables marked for removal in the feature meta file
drop_all = fmeta.loc[fmeta['Drop'] == 1, 'Variable_Name'].to_list()

# Part of that list was already removed before the train-test split,
# so keep only the names that are still columns of the test set
test_vars = df_test.columns.to_list()
drop_test = list(set(test_vars).intersection(drop_all))

# Remove them and report the resulting dimensions
df_test = df_test.drop(columns=drop_test)
print(df_test.shape)


(205520, 104)


## 4. Consistency Checking Against the Training Set

In [20]:
# Check if both datasets have the same set of variables
df_train = pd.read_csv('../data/interim/train_base_cleaned.csv')

# Fail loudly and name the offending columns, so that an inconsistent test set
# cannot go unnoticed and be written to disk by the save step that follows.
missing_in_test = sorted(set(df_train.columns) - set(df_test.columns))
extra_in_test = sorted(set(df_test.columns) - set(df_train.columns))
assert not missing_in_test and not extra_in_test, (
    f"Column mismatch vs. training set - missing in test: {missing_in_test}; "
    f"extra in test: {extra_in_test}"
)

# Only the set of names is compared; column order is not checked
set(df_test.columns) == set(df_train.columns)

True

In [21]:
# Save the cleaned test set to the same folder the cleaned training set was read from (row index not written)
df_test.to_csv('../data/interim/test_base_cleaned.csv', index=False)