In [1]:
import numpy as np
import pandas as pd 
from CreditScoringToolkit import frequency_table
from CreditScoringToolkit import DiscreteNormalizer
from CreditScoringToolkit import WoeEncoder
from CreditScoringToolkit import WoeContinuousFeatureSelector
from CreditScoringToolkit import WoeDiscreteFeatureSelector
from CreditScoringToolkit import CreditScoring
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

from scipy.stats.mstats import winsorize
from varclushi import VarClusHi
import matplotlib.pyplot as plt
import seaborn as sns

import logging
from typing import List, Tuple,Optional,Dict
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="sklearn.preprocessing._discretization")

In [2]:
# Load both example partitions and pool them into a single frame; the
# AutoCreditScoring class defined below performs its own train/validation split.
SEED = 42  # fixed so the shuffle is reproducible under Restart Kernel -> Run All
train = pd.read_csv('example_data/train.csv')
valid = pd.read_csv('example_data/valid.csv')
data = pd.concat([train,valid],ignore_index=True).sample(frac=1, random_state=SEED)

# Feature roles are encoded in the column-name prefix: 'C_' continuous, 'D_' discrete.
varc = [v for v in data.columns if v.startswith('C_')]
vard = [v for v in data.columns if v.startswith('D_')]
target = 'TARGET'

# Continuous columns: anything that cannot be parsed as a number becomes NaN.
for v in varc:
    data[v] = pd.to_numeric(data[v],errors='coerce')
# Discrete columns: missing values get an explicit 'MISSING' category, then everything is cast to str.
for v in vard:
    data[v] = data[v].fillna('MISSING').astype(str)
data.shape

(2000, 23)

In [3]:

# Configure the "CreditScoringToolkit" logger with a single stream handler.
# Building the handler first keeps the logger mutations grouped at the end.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("CreditScoringToolkit")
# Drop handlers attached by a previous run of this cell so that re-running it
# does not stack handlers and duplicate every log line.
if logger.hasHandlers():
    logger.handlers.clear()
logger.setLevel(logging.INFO)
logger.addHandler(handler)

In [4]:
class AutoCreditScoring:
    """Automated WoE + logistic-regression credit scoring pipeline.

    ``fit`` runs, in order: train/validation partitioning, optional
    winsorization of the continuous features, normalization of the discrete
    features, IV-based feature selection, WoE encoding and the training of a
    ``LogisticRegression``. Progress is reported through the module-level
    ``logger``.

    The class-level values below are defaults. ``fit`` overrides any of them
    for which an explicit (non-``None``) argument is passed.
    """
    continuous_features: List[str]
    discrete_features: List[str]
    target: str
    data: pd.DataFrame
    train: pd.DataFrame
    valid: pd.DataFrame
    # The next four settings are kept for interface compatibility; no method
    # of this class reads them.
    apply_multicolinearity: bool = False
    min_score = 400
    max_score = 900
    transformation: Optional[str] = None
    # Minimum information value passed to both WoE feature selectors.
    iv_feature_threshold: float = 0.05
    # Whether continuous features are winsorized, and the fraction cut on each tail.
    treat_outliers: bool = False
    outlier_threshold: float = 0.01
    # Settings forwarded to DiscreteNormalizer.
    discrete_normalization_threshold = 0.05
    discrete_normalization_default_category = 'OTHER'
    model: Optional[LogisticRegression] = None
    # Maximum number of train/validation splits tried before giving up.
    max_iter: int = 5
    train_size: float = 0.7
    # Largest accepted |mean(target_train) - mean(target_valid)|.
    target_proportion_tolerance: float = 0.01
    # Declared twice in the original (5, then 6); 6 was the effective value.
    max_discretization_bins:int=6
    strictly_monotonic:bool=True
    discretization_method:str = 'quantile'
    n_threads:int = 1
    # Largest accepted |auc_train - auc_valid| before an overfitting warning.
    overfitting_tolerance:float = 0.01
    # Optional seed for the partitioning; None keeps the original unseeded behaviour.
    random_state: Optional[int] = None
    # Coefficients with absolute value below this are reported as suspicious.
    beta_zero_tolerance: float = 0.0001

    def __init__(self, data: pd.DataFrame, target: str, continuous_features: List[str]=None, discrete_features: List[str]=None):
        """Store the input frame, the target column name and the feature lists.

        Args:
            data: Frame holding the features and the target column.
            target: Name of the target column in ``data``.
            continuous_features: Names of the continuous feature columns.
            discrete_features: Names of the discrete feature columns.
        """
        self.data = data
        self.continuous_features = continuous_features
        self.discrete_features = discrete_features
        self.target = target

    def fit(self,
            target_proportion_tolerance:float = None, 
            treat_outliers:bool = None, 
            discrete_normalization_threshold:float = None,
            discrete_normalization_default_category:str = None,
            max_discretization_bins:int = None,
            strictly_monotonic:bool = None,
            iv_feature_threshold:float = None,
            discretization_method:str = None,
            n_threads:int = None,
            overfitting_tolerance:float = None,
            verbose:bool=False):
        """Run the full pipeline and train the logistic model.

        Every argument except ``verbose`` overrides the attribute of the same
        name when it is not ``None``; ``None`` keeps the current value.
        Explicit falsy values (``False``, ``0``) are honoured.

        Args:
            target_proportion_tolerance: Accepted train/valid target-rate gap.
            treat_outliers: Winsorize continuous features when True.
            discrete_normalization_threshold: Forwarded to DiscreteNormalizer.
            discrete_normalization_default_category: Forwarded to DiscreteNormalizer.
            max_discretization_bins: Forwarded to the continuous selector.
            strictly_monotonic: Forwarded to the continuous selector.
            iv_feature_threshold: IV threshold for both selectors.
            discretization_method: Forwarded to the continuous selector.
            n_threads: Forwarded to the continuous selector.
            overfitting_tolerance: Accepted train/valid AUC gap.
            verbose: INFO-level logging when True, WARNING-level otherwise.

        Raises:
            RuntimeError: If no features are provided or selected, if no
                compatible partition is found, or if the WoE-encoded data
                contains missing values.
        """
        # Verbosity control (applies to the shared module-level logger).
        logger.setLevel(logging.INFO if verbose else logging.WARNING)

        # Treat a missing and an empty feature list the same way, warning once.
        if self.continuous_features is None or len(self.continuous_features)==0:
            self.continuous_features = []
            logger.warning("No continuous features provided")
        if self.discrete_features is None or len(self.discrete_features)==0:
            self.discrete_features = []
            logger.warning("No discrete features provided")
        if len(self.continuous_features)==0 and len(self.discrete_features)==0:
            logger.error("No features provided")
            raise RuntimeError("No features provided")

        # Apply overrides. `is not None` (rather than truthiness) lets callers
        # pass False or 0 explicitly, e.g. strictly_monotonic=False.
        overrides = {
            'target_proportion_tolerance': target_proportion_tolerance,
            'treat_outliers': treat_outliers,
            'discrete_normalization_threshold': discrete_normalization_threshold,
            'discrete_normalization_default_category': discrete_normalization_default_category,
            'max_discretization_bins': max_discretization_bins,
            'strictly_monotonic': strictly_monotonic,
            'iv_feature_threshold': iv_feature_threshold,
            'discretization_method': discretization_method,
            'n_threads': n_threads,
            'overfitting_tolerance': overfitting_tolerance,
        }
        for name, value in overrides.items():
            if value is not None:
                setattr(self, name, value)

        # Partition data
        self.__partition_data()

        # Outlier treatment is driven by the stored flag so that the training
        # data and the data passed through __apply_pipeline are treated alike.
        if len(self.continuous_features)>0 and self.treat_outliers:
            self.__outlier_treatment()

        # Normalize discrete features
        if len(self.discrete_features)>0:
            self.__normalize_discrete()

        # Feature selection
        self.__feature_selection()

        # Woe transformation
        self.__woe_transformation()

        # Train model
        self.__train_model()

    def __split_once(self, attempt:int):
        """Draw one train/validation split and store it on the instance.

        When ``random_state`` is set, each attempt gets its own derived seed so
        that retries produce different, yet reproducible, partitions.
        """
        seed = None if self.random_state is None else self.random_state + attempt - 1
        train_part, valid_part = train_test_split(self.data, train_size=self.train_size, random_state=seed)
        # Non-inplace reset_index returns fresh frames, so later column
        # assignments do not raise SettingWithCopyWarning.
        self.train = train_part.reset_index(drop=True)
        self.valid = valid_part.reset_index(drop=True)

    def __partition_data(self):
        """Split ``data`` into train/valid with comparable target rates.

        Up to ``max_iter`` splits are drawn and each one is evaluated.

        Raises:
            RuntimeError: If none of the splits has a target-rate gap within
                ``target_proportion_tolerance``.
        """
        logger.info("Partitioning data...")
        attempt = 1
        self.__split_once(attempt)
        # Check if target proportions are compatible between train and valid
        logger.info("Checking partition proportions...")
        while np.abs(self.train[self.target].mean()-self.valid[self.target].mean())>self.target_proportion_tolerance:
            logger.info(f"Partitioning data...Iteration {attempt}")
            logger.info(f"Train target proportion: {self.train[self.target].mean()}")
            logger.info(f"Valid target proportion: {self.valid[self.target].mean()}")
            # Give up only after the current split has actually been evaluated.
            if attempt>=self.max_iter:
                logger.error("Could not find a compatible partition")
                raise RuntimeError("Could not find a compatible partition")
            attempt += 1
            self.__split_once(attempt)

        if attempt>1:
            logger.info(f"Partitioning data...Done after {attempt} iterations")
        logger.info(f"Train shape: {self.train.shape}")
        logger.info(f"Test shape: {self.valid.shape}")
        logger.info(f"Train target proportion: {self.train[self.target].mean()}")
        logger.info(f"Valid target proportion: {self.valid[self.target].mean()}")

    def __outlier_treatment(self):
        """Winsorize every continuous training feature and log the mean shift."""
        logger.info("Outlier treatment...")
        before = self.train[self.continuous_features].mean()
        limits = [self.outlier_threshold, self.outlier_threshold]
        for f in self.continuous_features:
            self.train[f] = winsorize(self.train[f], limits=limits)
        after = self.train[self.continuous_features].mean()
        report = pd.DataFrame({'Before':before,'After':after})
        logger.info("Mean statistics before and after outlier treatment")
        logger.info(f'\n\n{report}\n')
        logger.info("Outlier treatment...Done")

    def __normalize_discrete(self):
        """Fit the DiscreteNormalizer on train and drop features it makes unary.

        Sets ``train_discrete_normalized``, ``unary_columns`` and
        ``discrete_normalizer``; may shrink ``discrete_features``.
        """
        logger.info("Discrete normalization...")
        logger.info(f"Discrete features: {self.discrete_features}")
        dn = DiscreteNormalizer(normalization_threshold=self.discrete_normalization_threshold, 
                                default_category=self.discrete_normalization_default_category)
        dn.fit(self.train[self.discrete_features])
        self.train_discrete_normalized = dn.transform(self.train[self.discrete_features])
        logger.info("Checking if normalization produced unary columns")
        self.unary_columns = [c for c in self.train_discrete_normalized.columns if self.train_discrete_normalized[c].nunique()==1]
        if len(self.unary_columns)>0:
            logger.warning(f"Normalization produced unary columns: {self.unary_columns}")
            logger.warning(f"Removing unary columns from discrete features")
            self.discrete_features = [f for f in self.discrete_features if f not in self.unary_columns]
            logger.warning(f"Discrete features after unary columns removal: {self.discrete_features}")
        else:
            logger.info("No unary columns produced by normalization")
        if len(self.discrete_features)==0:
            logger.warning("No discrete features left after normalization")
        else:
            # Re-fit on the surviving columns so the stored normalizer matches
            # the feature list used later in __apply_pipeline.
            dn.fit(self.train[self.discrete_features])
            self.train_discrete_normalized = dn.transform(self.train[self.discrete_features])
        self.discrete_normalizer = dn 
        logger.info("Discrete normalization...Done")

    def __feature_selection(self):
        """Select features by information value and build ``train_candidate``.

        Raises:
            RuntimeError: If no feature group is available or no feature is selected.
        """
        try:
            logger.info("Feature selection...")
            candidate_parts = []
            if len(self.continuous_features)>0:
                logger.info("Continuous features selection...")
                woe_continuous_selector = WoeContinuousFeatureSelector()
                woe_continuous_selector.fit(self.train[self.continuous_features], self.train[self.target],
                    max_bins=self.max_discretization_bins,
                    strictly_monotonic=self.strictly_monotonic,
                    iv_threshold=self.iv_feature_threshold,
                    method=self.discretization_method,
                    n_threads=self.n_threads)
                self.iv_report_continuous = pd.DataFrame(woe_continuous_selector.selected_features)
                self.full_iv_report_continuous = woe_continuous_selector.iv_report.copy()
                self.continuous_candidate = woe_continuous_selector.transform(self.train[self.continuous_features])
                logger.info(f'\n\n{self.iv_report_continuous}\n\n')
                self.woe_continuous_selector = woe_continuous_selector
                candidate_parts.append(self.continuous_candidate)
                logger.info(f"Continuous features selection...Done")
            if len(self.discrete_features)>0:
                logger.info("Discrete features selection...")
                woe_discrete_selector = WoeDiscreteFeatureSelector()
                woe_discrete_selector.fit(self.train_discrete_normalized, self.train[self.target],self.iv_feature_threshold)
                self.iv_report_discrete = pd.Series(woe_discrete_selector.selected_features).to_frame('iv').reset_index().rename(columns={'index':'feature'}).sort_values('iv',ascending=False)
                self.full_iv_report_discrete = woe_discrete_selector.iv_report.copy()
                self.discrete_candidate = woe_discrete_selector.transform(self.train_discrete_normalized)
                logger.info(f'\n\n{self.iv_report_discrete}\n\n')
                self.woe_discrete_selector = woe_discrete_selector
                candidate_parts.append(self.discrete_candidate)
                logger.info("Discrete features selection...Done")    

            # Both lists can be empty here, e.g. when every discrete feature
            # turned unary and no continuous feature was given.
            if len(candidate_parts)==0:
                logger.error("No features selected")
                raise RuntimeError("No features selected")
            if len(candidate_parts)==2:
                logger.info("Merging continuous and discrete features...")
                self.train_candidate = pd.concat(candidate_parts, axis=1)
                logger.info("Merging continuous and discrete features...Done")
            else:
                self.train_candidate = candidate_parts[0]
            self.candidate_features = list(self.train_candidate.columns)
            if len(self.candidate_features)==0:
                logger.error("No features selected")
                raise RuntimeError("No features selected")
            logger.info(f"Selected features ({len(self.candidate_features)}): {self.candidate_features}")
            logger.info("Feature selection...Done")
        except Exception as err:
            logger.error(f"Error in feature selection: {err}")
            raise

    def __woe_transformation(self):
        """Fit the WoeEncoder on the candidate features and encode the train set.

        Raises:
            RuntimeError: If the encoded training data contains missing values.
        """
        self.woe_encoder = WoeEncoder()
        self.woe_encoder.fit(self.train_candidate, self.train[self.target])
        self.train_woe = self.woe_encoder.transform(self.train_candidate)
        if self.train_woe.isna().any().any():
            logger.error("NAs found in transformed data")
            raise RuntimeError("NAs found in transformed data, Maybe tiny missing in continuous?")
    
    def __apply_pipeline(self,data:pd.DataFrame)->pd.DataFrame:
        """Apply the fitted preprocessing steps to ``data`` and return its WoE encoding.

        The input frame is not modified.

        Raises:
            RuntimeError: If no features are available or the encoded data
                contains missing values.
        """
        try:
            parts = []
            if len(self.continuous_features)>0:
                if self.treat_outliers:
                    # Work on a copy so the caller's frame (e.g. self.valid) is left intact.
                    # NOTE(review): the limits are taken from `data`'s own distribution,
                    # not from the training set - confirm this is intended.
                    data = data.copy()
                    for f in self.continuous_features:
                        data[f] = winsorize(data[f], limits=[self.outlier_threshold, self.outlier_threshold])
                parts.append(self.woe_continuous_selector.transform(data[self.continuous_features]))
            if len(self.discrete_features)>0:
                data_discrete_normalized = self.discrete_normalizer.transform(data[self.discrete_features])
                parts.append(self.woe_discrete_selector.transform(data_discrete_normalized))
            if len(parts)==0:
                raise RuntimeError("No features provided")
            if len(parts)==1:
                data_candidate = parts[0].copy()
            else:
                data_candidate = pd.concat(parts, axis=1)
            data_woe = self.woe_encoder.transform(data_candidate)
            if data_woe.isna().any().any():
                logger.error("NAs found in transformed data")
                raise RuntimeError("NAs found in transformed data, Maybe tiny missing in continuous?")
            return data_woe
        except Exception as err:
            logger.error(f"Error applying pipeline: {err}")
            raise
    
    def __train_model(self):
        """Train the logistic regression on the WoE data and report diagnostics.

        Sets ``model``/``logistic_model``, ``valid_woe``, ``auc_train``,
        ``auc_valid``, ``betas`` and ``alpha``. Warns about near-zero
        coefficients and about a train/valid AUC gap above
        ``overfitting_tolerance``.
        """
        logger.info("Training model...")
        lr = LogisticRegression()
        lr.fit(self.train_woe,self.train[self.target])
        self.model = lr
        self.valid_woe = self.__apply_pipeline(self.valid)
        self.auc_train = roc_auc_score(y_score=lr.predict_proba(self.train_woe)[:,1],y_true=self.train[self.target])
        self.auc_valid = roc_auc_score(y_score=lr.predict_proba(self.valid_woe)[:,1],y_true=self.valid[self.target])
        logger.info(f"AUC for training: {self.auc_train}")
        logger.info(f"AUC for validation:{self.auc_valid}")
        self.betas = lr.coef_[0]
        self.alpha = lr.intercept_[0]
        suspicious = [f for f,b in zip(self.candidate_features,self.betas) if np.abs(b)<self.beta_zero_tolerance]
        if len(suspicious)>0:
            logger.warning("Some betas are close to zero, consider removing features")
            logger.warning(f"Betas: {dict(zip(self.candidate_features,self.betas))}")
            logger.warning(f"Suspicious features: {suspicious}")
        if abs(self.auc_train-self.auc_valid)>self.overfitting_tolerance:
            logger.warning(f"Overfitting detected, review your hyperparameters. train_auc{self.auc_train}, valid_auc{self.auc_valid}")
        # Same estimator as `model`; both names are kept for existing callers.
        self.logistic_model = lr
        logger.info("Training model...Done")


In [5]:
# Build the scorer on the pooled data, then fit it with explicit hyperparameters.
acs = AutoCreditScoring(
    data,
    target,
    continuous_features=varc,
    discrete_features=vard,
)
acs.fit(
    # partitioning
    target_proportion_tolerance=0.01,
    # preprocessing
    treat_outliers=True,
    discrete_normalization_threshold=0.1,
    discrete_normalization_default_category='XYZW',
    # feature selection
    max_discretization_bins=5,
    strictly_monotonic=True,
    iv_feature_threshold=0.05,
    discretization_method='quantile',
    n_threads=4,
    # logging
    verbose=False,
)

  aux[feature] = aux[feature].replace(woe_map)
  aux[feature] = aux[feature].replace(woe_map)
