In [None]:
class PredictionMetrics:
    """Container for ``y_true`` / ``y_pred`` (and optionally ``y_proba``).

    Computes quality metrics and builds the figures: confusion matrix,
    ROC curve and precision-recall curve.
    """

    def __init__(self, y_true, y_pred, y_proba=None, **kwargs):
        """Store true labels, predicted labels and optional positive-class scores.

        Extra keyword arguments are accepted and ignored.
        """
        self.y_true = y_true
        self.y_pred = y_pred
        self.y_proba = y_proba

    def metrics(self):
        """Return a dict with 'rocauc', 'accuracy' and 'recall', rounded to 2 decimals."""
        dMetrics = dict()
        # NOTE(review): ROC AUC is computed here from the hard labels in
        # y_pred, while plot_roc_curve uses y_proba, so the two numbers can
        # differ - confirm this is intended.
        dMetrics['rocauc'] = round(roc_auc_score(self.y_true, self.y_pred), 2)
        dMetrics['accuracy'] = round(accuracy_score(self.y_true, self.y_pred), 2)
        dMetrics['recall'] = round(recall_score(self.y_true, self.y_pred), 2)

        return dMetrics

    def print_metrics(self):
        """Print the dict returned by ``metrics``."""
        print(self.metrics())

    def confusion_matrix(self, normalize=None):
        """Return the confusion matrix of ``y_true`` vs ``y_pred``.

        ``normalize`` is forwarded unchanged to the ``confusion_matrix`` function.
        """
        return confusion_matrix(self.y_true, self.y_pred, normalize=normalize)

    @staticmethod
    def _show_figure(fig):
        """Display ``fig`` using the renderer selected by the global ``need_svg`` flag."""
        # NOTE(review): a truthy need_svg selects the 'png' renderer; the flag
        # is defined outside this class - confirm the naming is intentional.
        fig.show('png' if need_svg else '')

    def plot_confusion_matrix(self, normalize=None, **kwargs):
        """Build an annotated heatmap of the confusion matrix and return the figure.

        Keyword options: ``width`` (600), ``height`` (400), ``classes``
        (axis labels), ``show`` (True - display the figure), ``title``
        and ``colorscale``.
        """
        width = kwargs.get('width', 600)
        height = kwargs.get('height', 400)
        classes = kwargs.get('classes', ['дефолт=0', 'дефолт=1'])
        show = kwargs.get('show', True)

        title = kwargs.get('title', 'Confusion matrix')
        colorscale = kwargs.get('colorscale', 'Viridis')

        cm = self.confusion_matrix(normalize=normalize)

        # Normalized matrices hold fractions, raw matrices hold integer counts.
        normalized = normalize is not None
        fmt = '.2f' if normalized else 'd'
        z_value = 'Доля' if normalized else 'Count'
        # change each element of z to type string for annotations
        cm_text = [[format(y, fmt) for y in x] for x in cm]

        # set up figure
        fig = ff.create_annotated_heatmap(cm, x=classes, y=classes, annotation_text=cm_text,
                                colorscale=colorscale, hovertemplate=
                                            "Real value: %{y}<br>" +
                                            "Predicted value: %{x}<br>" +
                                            z_value + ": %{z:.2f}<br>" +
                                            "<extra></extra>",)

        # add custom xaxis title
        fig.add_annotation(dict(font=dict(color="black", size=14),
                                x=0.5,
                                y=-0.15,
                                showarrow=False,
                                text="Predicted value",
                                xref="paper",
                                yref="paper"))

        # add custom yaxis title
        fig.add_annotation(dict(font=dict(color="black", size=14),
                                x=-0.35,
                                y=0.5,
                                showarrow=False,
                                text="Real value",
                                textangle=-90,
                                xref="paper",
                                yref="paper"))

        # adjust margins to make room for yaxis title
        fig.update_layout(margin=dict(t=50, l=150))
        fig.update_xaxes(side="bottom")
        fig.update_layout(title_text='<i><b>'+title+'</b></i>')
        # add colorbar
        fig['data'][0]['showscale'] = True

        fig.update_layout(autosize=False, width=width, height=height,)
        if show:
            self._show_figure(fig)
        return fig

    def plot_roc_curve(self, **kwargs):
        """Build the ROC curve from ``y_proba`` and return the figure.

        The title reports the ROC AUC and GINI (``2 * AUC - 1``).
        Keyword options: ``title`` (prefix line), ``show`` (True).
        Raises AssertionError when ``y_proba`` is not set.
        """
        assert self.y_proba is not None, 'Не задан y_proba у PredictionMetrics'

        title = kwargs.get('title', "")
        show = kwargs.get('show', True)

        fpr, tpr, _ = roc_curve(self.y_true, self.y_proba)
        roc_auc = auc(fpr, tpr)
        GINI = (2 * roc_auc) - 1

        if title:
            title = title + '<br>'

        fig = px.area(
            x=fpr, y=tpr,
            title=title+f'ROC Curve (AUC={roc_auc:.4f}, GINI={GINI:.4f})',
            labels=dict(x='False Positive Rate', y='True Positive Rate'),
            width=700, height=500
        )
        # diagonal reference line from (0, 0) to (1, 1)
        fig.add_shape(
            type='line', line=dict(dash='dash'),
            x0=0, x1=1, y0=0, y1=1
        )

        fig.update_yaxes(scaleanchor="x", scaleratio=1)
        fig.update_xaxes(constrain='domain')
        fig.update_layout(autosize=False, width=450, height=450,)

        if show:
            self._show_figure(fig)

        return fig

    def plot_precision_recall(self, **kwargs):
        """Build the precision-recall curve from ``y_proba`` and return the figure.

        The title reports the area under the precision-recall curve.
        Keyword options: ``title`` (prefix line), ``show`` (True).
        Raises AssertionError when ``y_proba`` is not set.
        """
        assert self.y_proba is not None, 'Не задан y_proba у PredictionMetrics'

        title = kwargs.get('title', "")
        show = kwargs.get('show', True)

        precision, recall, _ = precision_recall_curve(self.y_true, self.y_proba)
        # Area under the plotted curve itself (previously the ROC AUC was
        # shown here, which does not describe a precision-recall plot).
        pr_auc = auc(recall, precision)

        if title:
            title = title + '<br>'

        fig = px.area(
                x=recall, y=precision,
                title=title+f'Precision-Recall Curve (AUC={pr_auc:.4f})',
                labels=dict(x='Recall', y='Precision'),
                width=700, height=500
        )

        # anti-diagonal reference line from (0, 1) to (1, 0)
        fig.add_shape(
            type='line', line=dict(dash='dash'),
            x0=0, x1=1, y0=1, y1=0
        )

        fig.update_yaxes(scaleanchor="x", scaleratio=1)
        fig.update_xaxes(constrain='domain')
        fig.update_layout(autosize=False, width=450, height=450,)
        if show:
            self._show_figure(fig)
        return fig


In [None]:
class Preprocessing():
    """Input-data helpers: train/test splitting and feature scaling.

    ``scaling`` applies the scaler stored in ``self.Scaler``; when no scaler
    has been assigned the data is returned unscaled.
    """
    def __init__(self, **kwargs):
        # No scaler until one is assigned (e.g. the result of ``set_scaler``).
        self.Scaler = None

    def split_data(self, X_raw, y_raw, **kwargs):
        """Split into train and test parts with a fixed random seed.

        Keyword options: ``test_size`` (default 0.2).
        Returns ``(X_train, X_test, y_train, y_test)``.
        """
        test_size = kwargs.get('test_size', 0.2)
        X_train, X_test, y_train, y_test = train_test_split(X_raw, y_raw, test_size=test_size, random_state=4)

        return X_train, X_test, y_train, y_test

    def set_scaler(self, X):
        """Fit a ``StandardScaler`` on ``X`` and return it (it is not stored on ``self``)."""
        Scaler = StandardScaler()
        Scaler.fit(X)

        return Scaler

    def scaling(self, X):
        """Return a scaled copy of ``X``; the input object is not modified.

        1-D input is turned into a single column first. A DataFrame stays a
        DataFrame; any other input comes back as whatever the scaler's
        ``transform`` returns.
        """
        Xscaled = X.copy()

        if Xscaled.ndim == 1:
            if isinstance(Xscaled, pd.Series):
                # pandas Series has no ``reshape``; make a one-column frame.
                Xscaled = Xscaled.to_frame()
            else:
                Xscaled = Xscaled.reshape((-1, 1))

        # Instances built without ``__init__`` (subclasses that skip it) may
        # not have the attribute at all; treat that the same as "no scaler".
        scaler = getattr(self, 'Scaler', None)
        if scaler is not None:
            if isinstance(Xscaled, pd.DataFrame):
                Xscaled[Xscaled.columns] = scaler.transform(Xscaled[Xscaled.columns])
            else:
                Xscaled = scaler.transform(Xscaled)

        return Xscaled

In [None]:
# Check that X_test and y_test are either both given or both omitted
def validate(func):
    """Decorator: reject calls where only one of ``X_test`` / ``y_test`` is given.

    Both positional and keyword arguments are inspected. Raises
    AssertionError when exactly one of the two is not None.
    """
    import inspect

    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # No introspectable signature: fall back to keyword-only inspection.
        signature = None

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = {}
        if signature is not None:
            try:
                # Map positional arguments onto parameter names so that
                # X_test / y_test are found however they were passed.
                bound = signature.bind_partial(*args, **kwargs).arguments
            except TypeError:
                # Bad call: let ``func`` itself raise the proper TypeError.
                bound = {}

        def lookup(name):
            if name in bound:
                return bound[name]
            return kwargs.get(name, None)

        X_test = lookup('X_test')
        y_test = lookup('y_test')

        if (X_test is None) ^ (y_test is None):
            # Explicit raise instead of ``assert False`` so the check
            # survives ``python -O``; the exception type is unchanged.
            raise AssertionError("""варианты входных данных: 
                        1. X_raw, y_raw, X_test, y_test - заданы
                        2. X_raw, y_raw заданы, X_test, y_test - не заданы""")

        return func(*args, **kwargs)
    return wrapper

class RegressionBase(ABC, Preprocessing, PredictionMetrics):
    """Abstract base for the classifiers in this file.

    The constructor optionally splits the data, optionally fits a scaler,
    obtains two fitted models from ``set_model`` and, when test data is
    available, stores the test predictions in the attributes used by
    ``PredictionMetrics`` (``y_true``, ``y_pred``, ``y_proba``).
    """

    @validate
    def __init__(self, X_raw, y_raw, X_test=None, y_test=None, **kwargs):
        """Prepare data, fit the models and score the test set if there is one.

        Keyword options read here: ``needToSplit`` (default True), ``norm``
        (default True) and ``class_weight`` (default 'balanced'). All keyword
        arguments are also forwarded to ``set_model``.
        """
        needToSplit = kwargs.get('needToSplit', True)
        # if X_test is given the data is already split - do not split again
        needToSplit = X_test is None and needToSplit

        norm = kwargs.get('norm', True)
        self.class_weight = kwargs.get('class_weight', 'balanced')

        # kept for debugging
        self.param_add = {'needToSplit':needToSplit, 'norm':norm, 'class_weight':self.class_weight}

        if needToSplit:
            X_train, X_test, y_train, y_test = self.split_data(X_raw, y_raw)
        else:
            # already split into train and test (or no test at all)
            X_train = X_raw
            y_train = y_raw

        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test

        # Must be set before set_model: subclasses call self.scaling there.
        self.Scaler = self.set_scaler(self.X_train) if norm == True else None

        self.model, self.model_not_balanced = self.set_model(**kwargs)

        # Always define the PredictionMetrics attributes so that, without
        # test data, the inherited methods see None (and their own
        # "y_proba is not set" check works) instead of a missing attribute.
        self.y_true = None
        self.y_pred = None
        self.y_proba = None

        if X_test is not None and y_test is not None:
            y_pred, y_proba = self.predict(X_test)

            # initialise the PredictionMetrics state
            self.y_true = y_test
            self.y_pred = y_pred
            self.y_proba = y_proba

    @abstractmethod
    def set_model(self,  **kwargs):
        """Fit and return ``(model, model_not_balanced)``; implemented by subclasses."""
        model = None
        model_not_balanced = None
        return model, model_not_balanced

    def predict(self, X_test):
        """Return ``(y_pred, y_prob)`` for ``X_test``.

        Labels come from ``self.model``; probabilities are column 1 of
        ``predict_proba`` of ``self.model_not_balanced``. The input is passed
        through ``self.scaling`` first.
        """
        X_standard = self.scaling(X_test)

        y_prob = self.model_not_balanced.predict_proba(X_standard)
        y_pred = self.model.predict(X_standard)

        return y_pred, y_prob[:, 1]

    def get_coef(self):
        """Return the first intercept and first coefficient row of ``self.model``, rounded to 4 decimals."""
        coef = dict()
        coef['intercept'] = round(self.model.intercept_[0], 4)
        coef['coef'] = self.model.coef_[0].round(4)
        return coef

    def print_coef(self):
        """Print the dict returned by ``get_coef``."""
        print(self.get_coef())

    def print_param(self):
        """Print train/test sizes and the stored constructor options."""
        print("len X_train {},  len y_train {}".format(len(self.X_train),  len(self.y_train)))
        if self.X_test is not None and self.y_test is not None:
            print("len X_test {}, len y_test {}".format(len(self.X_test), len(self.y_test)))
        else:
            print('no test data')

        print(self.param_add)



In [None]:
class cDecisionTree(RegressionBase):
    """Decision-tree classifier built on ``RegressionBase``.

    Extra keyword options: ``max_depth``, ``max_leaf_nodes`` (both default
    None) and ``criterion`` (default 'gini'). Scaling is off by default
    (``norm=False``) unless the caller passes ``norm`` explicitly.
    """

    def __init__(self, X_raw, y_raw, X_test=None, y_test=None, **kwargs):
        self.max_depth = kwargs.get('max_depth', None)
        self.max_leaf_nodes = kwargs.get('max_leaf_nodes', None)
        self.criterion = kwargs.get('criterion', 'gini')

        # Keep a caller-supplied value, otherwise disable scaling.
        kwargs.setdefault('norm', False)

        super().__init__(X_raw=X_raw, y_raw=y_raw, X_test=X_test, y_test=y_test, **kwargs)

    def set_model(self,  **kwargs):
        """Fit and return ``(model, model_not_balanced)``.

        The first tree uses ``self.class_weight``; the second uses the
        classifier's default class weighting. Both share the remaining
        hyper-parameters.
        """
        X_train = self.scaling(self.X_train)

        # ``criterion`` was previously read from kwargs but never handed to
        # the classifier, so the option had no effect.
        tree_params = dict(random_state=0, max_depth=self.max_depth,
                           max_leaf_nodes=self.max_leaf_nodes, criterion=self.criterion)

        model = DecisionTreeClassifier(class_weight=self.class_weight, **tree_params)\
                            .fit(X_train, self.y_train)
        model_not_balanced = DecisionTreeClassifier(**tree_params)\
                            .fit(X_train, self.y_train)

        return model, model_not_balanced

    def get_coef(self):
        """Return the depth and number of leaves of ``self.model``."""
        coef = dict()
        coef['depth'] = self.model.get_depth()
        coef['n_leaves'] = self.model.get_n_leaves()
        return coef

    def plot_tree(self):
        """Draw ``self.model`` on a 15x15 figure and return the ``plt`` module."""
        plt.figure(figsize=(15,15))
        # Pass feature names as a plain list; recent scikit-learn versions
        # reject a pandas Index. Without column labels, omit the names.
        columns = getattr(self.X_train, 'columns', None)
        feature_names = None if columns is None else list(columns)
        # Inside the method body ``plot_tree`` resolves to the module-level
        # function, not to this method.
        _ = plot_tree(self.model, filled=True, impurity=True
                       , feature_names = feature_names
                    )

        return plt

In [None]:
class LogReg(RegressionBase):
    """Logistic regression (liblinear solver) built on ``RegressionBase``."""

    def __init__(self, X_raw, y_raw, X_test=None, y_test=None, **kwargs):
        # Forward everything by keyword to the base constructor.
        super().__init__(
            X_raw=X_raw,
            y_raw=y_raw,
            X_test=X_test,
            y_test=y_test,
            **kwargs,
        )

    def set_model(self,  **kwargs):
        """Fit and return ``(model, model_not_balanced)``.

        The first estimator uses ``self.class_weight``; the second uses the
        estimator's default class weighting.
        """
        features = self.scaling(self.X_train)
        target = self.y_train

        weighted = LogisticRegression(solver='liblinear', class_weight=self.class_weight)
        weighted = weighted.fit(features, target)

        unweighted = LogisticRegression(solver='liblinear')
        unweighted = unweighted.fit(features, target)

        return weighted, unweighted

In [None]:
class LogRegCV(RegressionBase):
    """Cross-validated logistic regression (liblinear, 5 folds) built on ``RegressionBase``."""

    def __init__(self, X_raw, y_raw, X_test=None, y_test=None, **kwargs):
        # Forward everything by keyword to the base constructor.
        super().__init__(
            X_raw=X_raw,
            y_raw=y_raw,
            X_test=X_test,
            y_test=y_test,
            **kwargs,
        )

    def set_model(self,  **kwargs):
        """Fit and return ``(model, model_not_balanced)``.

        Keyword options: ``scoring`` (default 'roc_auc'). The first estimator
        uses ``self.class_weight``; the second uses the default weighting.
        """
        metric = kwargs.get('scoring', 'roc_auc')
        features = self.scaling(self.X_train)

        # Settings shared by both estimators.
        common = dict(solver='liblinear', cv=5, scoring=metric)

        weighted = LogisticRegressionCV(class_weight=self.class_weight, **common)
        weighted = weighted.fit(features, self.y_train)

        unweighted = LogisticRegressionCV(**common)
        unweighted = unweighted.fit(features, self.y_train)

        return weighted, unweighted


In [None]:
class Branch(PredictionMetrics):
    """Two-branch classifier.

    Rows with ``bad_reasons_closed > 0`` are always predicted as class 1 with
    probability 1. All other rows are scored by a ``LogRegCV`` model fitted on
    the ``no_contracts_total`` column of the training rows that are not in
    the first branch.
    """

    def __init__(self, data_train, data_test, **kwargs):
        """Fit the second-branch model on ``data_train`` and score ``data_test``.

        Keyword arguments are forwarded to ``LogRegCV`` with ``needToSplit``
        forced to False. The test predictions are stored in ``y_true``,
        ``y_pred`` and ``y_proba``.
        """
        X_test = data_test[['no_contracts_total', 'bad_reasons_closed']]
        y_test = data_test['Метка дефолта']

        # Only rows outside the always-default branch are used for training.
        train_branch_2 = data_train[~(data_train['bad_reasons_closed'] > 0)]

        X_raw = train_branch_2[['no_contracts_total']]
        y_raw = train_branch_2['Метка дефолта']

        kwargs['needToSplit'] = False
        self.log = LogRegCV(X_raw, y_raw, **kwargs)

        y_pred, y_proba = self.predict(X_test)

        self.y_true = y_test
        self.y_pred = y_pred
        self.y_proba = y_proba

    def predict(self, df):
        """Return ``(y_pred, y_prob)`` as float Series indexed like ``df``.

        ``df`` must contain ``bad_reasons_closed`` and ``no_contracts_total``.
        """
        # Positional boolean mask of rows that go to the fitted model.
        # Working by position (not by index label) keeps one output row per
        # input row even if ``df`` has repeated index labels.
        to_model = ~(df['bad_reasons_closed'] > 0).to_numpy()

        # Start from the first-branch answer (label 1, probability 1) ...
        y_pred = pd.Series(1.0, index=df.index, name='y_pred')
        y_prob = pd.Series(1.0, index=df.index, name='y_prob')

        # ... and overwrite the second-branch rows with the model output.
        # Skipped when that branch is empty, so the model never sees 0 rows.
        if to_model.any():
            X_branch_2 = df.loc[to_model, ['no_contracts_total']].copy()
            pred_2, prob_2 = self.log.predict(X_branch_2)
            y_pred.loc[to_model] = pd.Series(pred_2).to_numpy()
            y_prob.loc[to_model] = pd.Series(prob_2).to_numpy()

        return y_pred, y_prob

In [None]:
def remove_outliers_iqr(data, threshold=1.5):
    """Drop the rows of ``data`` that hold an IQR outlier in any column.

    A value is an outlier when it lies below ``Q1 - threshold * IQR`` or
    above ``Q3 + threshold * IQR`` of its column.
    """
    first_quartile = data.quantile(0.25)
    third_quartile = data.quantile(0.75)
    spread = third_quartile - first_quartile

    too_low = data < (first_quartile - threshold * spread)
    too_high = data > (third_quartile + threshold * spread)

    row_has_outlier = (too_low | too_high).any(axis=1)
    return data[~row_has_outlier]

def remove_outliers_zscore(data, threshold=3):
    """Drop the rows of ``data`` that hold a z-score outlier in any column.

    A row is kept when every column's absolute z-score is below
    ``threshold``. Columns whose z-score is undefined (NaN, e.g. a
    zero-variance column) do not count against a row.
    """
    z_scores = np.abs(stats.zscore(data))
    # ``NaN < threshold`` is False, so without the isnan term a single
    # constant column would remove every row.
    within_limit = (z_scores < threshold) | np.isnan(z_scores)
    filtered_data = data[within_limit.all(axis=1)]
    return filtered_data

def replace_outliers_with_median(data, threshold=1.5):
    """Return a copy of ``data`` with IQR outliers replaced by the median.

    Works for a Series (one median) and for a DataFrame (each column uses
    its own quartiles and its own median). A value is an outlier when it
    lies below ``Q1 - threshold * IQR`` or above ``Q3 + threshold * IQR``.
    """
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1

    # Calculate lower and upper bounds for outliers
    lower_bound = Q1 - threshold * IQR
    upper_bound = Q3 + threshold * IQR

    # Replace outliers with the median value
    median_value = data.median()
    # For a DataFrame the medians form a Series indexed by column name, so
    # they must be aligned along the columns (axis=1). Aligning along the
    # rows (axis=0) replaced the outliers with NaN instead.
    align_axis = 1 if data.ndim == 2 else 0
    data_replaced = data.where(~((data < lower_bound) | (data > upper_bound)), median_value, axis=align_axis)

    return data_replaced

def replace_outliers_with_median_quantile(data, lower = 0.05, upper=0.95):
    """Return a copy of ``data`` with values outside two quantiles replaced by the median.

    Works for a Series (one median) and for a DataFrame (each column uses
    its own quantiles and its own median). Values below the ``lower``
    quantile or above the ``upper`` quantile are replaced.
    """
    # Calculate lower and upper bounds for outliers
    lower_bound = data.quantile(lower)
    upper_bound = data.quantile(upper)

    # Replace outliers with the median value
    median_value = data.median()
    # For a DataFrame the medians form a Series indexed by column name, so
    # they must be aligned along the columns (axis=1). Aligning along the
    # rows (axis=0) replaced the outliers with NaN instead.
    align_axis = 1 if data.ndim == 2 else 0
    data_replaced = data.where(~((data < lower_bound) | (data > upper_bound)), median_value, axis=align_axis)

    return data_replaced

def replace_outliers_with_none_quantile(data, lower = 0.05, upper=0.95):
    """Return a copy of ``data`` with values outside two quantiles blanked out.

    Values below the ``lower`` quantile or above the ``upper`` quantile are
    replaced with ``None``; everything else is kept.
    """
    floor = data.quantile(lower)
    ceiling = data.quantile(upper)

    is_outlier = (data < floor) | (data > ceiling)
    keep = ~is_outlier

    return data.where(keep, None, axis=0)

In [None]:
def calc_vif(df):
    """Return the variance inflation factor of every column of ``df``.

    A constant column named 'const' is added for the calculation and left
    out of the result. The caller's DataFrame is not modified.

    Returns a DataFrame with columns 'VIF' and 'variable', one row per
    original column.
    """
    # Work on a copy: previously the 'const' column was written into the
    # caller's DataFrame as a side effect.
    design = df.assign(const=1)

    # Convert to an array once instead of once per column.
    design_values = design.values

    # calculating VIF for each feature
    vif_data = pd.DataFrame()
    vif_data['VIF'] = [variance_inflation_factor(design_values, i) for i in range(design.shape[1])]
    vif_data['variable'] = design.columns

    return vif_data[vif_data['variable']!='const'].reset_index(drop=True)

Interpreting VIF

The output gives the VIF for each variable. A VIF close to 1 indicates that the variable is not correlated with the other variables, so the variance of its coefficient estimate is not inflated at all. A VIF greater than 1 indicates some degree of multicollinearity.

As a rule of thumb, a VIF above 5 indicates a high multicollinearity between this variable and the others, and above 10 is very high multicollinearity.

A value between 1 and 5 indicates moderate correlation between a given explanatory variable and other explanatory variables in the model, but this is often not severe enough to require attention.

A value greater than 5 indicates potentially severe correlation between a given explanatory variable and other explanatory variables in the model. In this case, the coefficient estimates and p-values in the regression output are likely unreliable.


| VIF value | Diagnosis                                        |
| --------- | ------------------------------------------------ |
| 1         | Complete absence of multicollinearity            |
| 1-2       | Absence of strong multicollinearity              |
| > 2       | Presence of moderate to strong multicollinearity |

Note: There is no universal agreement on the VIF thresholds for detecting multicollinearity. A VIF > 5 or VIF > 10 is commonly taken to indicate strong multicollinearity, but a VIF < 5 can still indicate some multicollinearity. It is advisable to have VIF < 2.
