# Feature Engineering
Проводится на основе выводов, полученных из EDA. Включает в себя и Data Cleaning, поскольку избавиться от неинформативных признаков так же важно, как и сконструировать новые.


## Загрузка библиотек

In [1]:
import pandas as pd
import numpy as np
from scipy.stats import uniform

import matplotlib.pyplot as plt
import seaborn as sns
from prettytable import PrettyTable

import os
import gc
import pickle
import warnings
warnings.filterwarnings("ignore")
from datetime import datetime


from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))


from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import (train_test_split, 
                                      StratifiedKFold, 
                                      RandomizedSearchCV)
from sklearn.metrics import (roc_auc_score, precision_score, 
                              recall_score, roc_curve, confusion_matrix)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import (RandomForestClassifier, ExtraTreesClassifier)
from sklearn.calibration import CalibratedClassifierCV

import xgboost as xgb
from xgboost import XGBClassifier
from xgboost import XGBRegressor

import lightgbm as lgb
from lightgbm import LGBMClassifier
from lightgbm import LGBMRegressor
print("All modules were imported successfully")


All modules were imported successfully


### Полезные функции

In [5]:
def reduce_mem_usage(data, verbose = True):
    """
    Reduce the memory footprint of a DataFrame by downcasting its numeric columns.

    For every NumPy integer or floating-point column the observed [min, max]
    range is compared against the limits of progressively wider types, and the
    column is cast to the narrowest type whose range contains it. A column is
    only ever narrowed, never widened.

    Parameters
    ----------
    data : pandas.DataFrame
        Input DataFrame to optimize. Modified in-place.
    verbose : bool, default=True
        If True, prints:
        - Initial memory usage
        - List of optimized columns with old and new types
        - Final memory usage
        - Percentage of memory reduction

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in (modified in-place).

    Notes
    -----
    - Only NumPy integer and floating-point columns are touched. Object, bool,
      datetime, categorical and pandas extension dtypes are left as they are.
    - Integer downcasting is lossless. Float downcasting is chosen by value
      *range* only, so precision may be lost (float16 keeps roughly 3
      significant decimal digits, float32 roughly 7).
    - Columns whose min/max cannot be bounded (all-NaN, empty, containing
      +/-inf) are left unchanged.
    """
    bytes_per_mb = 1024 ** 2

    initial_mem = data.memory_usage().sum() / bytes_per_mb
    if verbose:
        print(f"Initial mem usage {initial_mem:.2f} MB")

    # Candidate target types, ordered from narrowest to widest.
    int_candidates = (np.int8, np.int16, np.int32, np.int64)
    float_candidates = (np.float16, np.float32, np.float64)

    optimized_cols = []

    for col in data.columns:
        col_type = data[col].dtype

        # Extension dtypes (category, nullable Int64, string, ...) are not
        # NumPy dtypes and cannot be passed to np.issubdtype safely.
        if not isinstance(col_type, np.dtype):
            continue

        if np.issubdtype(col_type, np.integer):
            candidates, limits_of = int_candidates, np.iinfo
        elif np.issubdtype(col_type, np.floating):
            candidates, limits_of = float_candidates, np.finfo
        else:
            # bool, datetime64, timedelta64, complex, object, ...
            continue

        c_min = data[col].min()
        c_max = data[col].max()

        # Pick the narrowest candidate whose range covers the column.
        # NaN bounds make every comparison False, so such columns are skipped.
        new_type = None
        for candidate in candidates:
            limits = limits_of(candidate)
            if limits.min <= c_min and c_max <= limits.max:
                new_type = np.dtype(candidate)
                break

        # Never widen a column (e.g. a float32 column containing inf).
        if new_type is None or new_type.itemsize >= col_type.itemsize:
            continue

        data[col] = data[col].astype(new_type)
        optimized_cols.append((col, str(col_type), str(new_type)))

    end_mem = data.memory_usage().sum() / bytes_per_mb

    if verbose:
        if optimized_cols:
            print("Optimized columns:")
            for col, old_type, new_type in optimized_cols:
                print(f"  {col}: {old_type} -> {new_type}")

        print(f'Memory after optimization: {end_mem:.2f} MB')
        # Guard against a zero-sized frame to avoid division by zero.
        reduction = 100 * (initial_mem - end_mem) / initial_mem if initial_mem else 0.0
        print(f'Reduced by {reduction:.1f}%')
        print('-' * 80)

    return data
                    

In [6]:
def relational_tables_prepare(file_directory = '', verbose = True, tables = None):
    """
    Load the preprocessed relational tables and the application train/test tables.

    Each file is read from ``file_directory + <name> + '_preprocessed.pkl'`` and
    passed through ``reduce_mem_usage`` (silently).

    NOTE(review): despite the "Start merging tables" message, the visible code
    only loads the tables; nothing is merged and nothing is returned. The
    function looks unfinished -- confirm the intended merge keys and return
    value before relying on it.

    Parameters
    ----------
    file_directory : str, default=''
        Prefix prepended to every file name. It is concatenated as-is, so it
        must end with a path separator when it denotes a directory.
    verbose : bool, default=True
        If True, prints a start message and the elapsed time.
    tables : iterable of str or None, default=None
        Names of the additional relational tables to load. ``None`` means
        "no additional tables".

    Returns
    -------
    None
    """

    if verbose:
        print("Start merging tables")
        start = datetime.now()

    # The default `tables = None` used to be iterated directly (TypeError).
    if tables is None:
        tables = []

    # SECURITY: pickle.load can execute arbitrary code -- only load files that
    # were produced by this project's own preprocessing step.
    for table in tables:
        with open(file_directory + str(table) + '_preprocessed.pkl' , "rb") as file:
            # NOTE(review): overwritten on every iteration and never used afterwards.
            table_to_merge = reduce_mem_usage(pickle.load(file), verbose = False)

    with open(file_directory + 'application_train_preprocessed.pkl', 'rb') as file:
        application_train = reduce_mem_usage(pickle.load(file), verbose = False)

    with open(file_directory + 'application_test_preprocessed.pkl', 'rb') as file:
        application_test = reduce_mem_usage(pickle.load(file), verbose = False)

    if verbose:
        print(f"Time elapsed = {datetime.now() - start}")

In [8]:
class Modelling:
    '''
    Class for doing hyperparameter tuning to find the best set of hyperparameters, building models on the best
    hyperparams and displaying results on the best hyperparameters.

    It has 7 public methods:
        1. init method
        2. random_search method
        3. train_on_best_params method
        4. proba_to_class method
        5. tune_threshold method
        6. results_on_best_params method
        7. feat_importances method (placeholder, not implemented yet)

    Expected call order: random_search -> train_on_best_params -> results_on_best_params.
    '''

    def __init__(self, base_model, x_train, y_train, x_test, calibration = False, calibration_method = 'isotonic',
                 calibration_cv = 4, k_folds = 4, random_state = 982):
        '''
        Function to initialize the class members.

        Inputs:
            self
            base_model: estimator/classifier
                The base model to be used for the modelling purpose
            x_train: numpy array
                Training standardized data
            y_train: numpy array
                Training class labels
            x_test: numpy array
                Test standardized data
            calibration: bool, default = False
                Whether to calibrate the model for generating class probabilities
            calibration_method: str, default = 'isotonic'
                The type of calibration to use, i.e. sigmoid or isotonic
            calibration_cv: int, default = 4
                Number of cross-validation folds for calibrating the probabilities
            k_folds: int, default = 4
                Number of cross-validation folds for training and tuning the model
            random_state: int, default = 982
                Random state for StratifiedKFold for reproducibility

        Returns:
            None
        '''
        self.base_model = base_model
        self.num_folds = k_folds
        self.kfolds = StratifiedKFold(n_splits = k_folds, shuffle = True, random_state = random_state)
        self.x_train = x_train
        self.y_train = y_train
        self.x_test = x_test
        self.calibration = calibration
        # Stored unconditionally; they are only read when calibration is enabled.
        self.calibration_method = calibration_method
        self.calibration_cv = calibration_cv

    @staticmethod
    def _take_rows(data, indices):
        '''
        Select rows by position from a numpy array or a pandas object.

        Plain `obj[indices]` is label-based for pandas objects, which is wrong after the index was shuffled.
        '''
        if hasattr(data, 'iloc'):
            return data.iloc[indices]
        return data[indices]

    def _require(self, attribute, producer):
        '''
        Raise a descriptive AttributeError if `attribute` has not been set yet by the `producer` method.
        '''
        if not hasattr(self, attribute):
            raise AttributeError(f"'{attribute}' is not available: call {producer}() first")

    def random_search(self, hyperparams_dict, n_iter = 30, verbose = True, n_jobs = 1, random_state = 843,
                      random_State = None):

        '''
        Function to do RandomizedSearchCV on training data.
        Stores the search results in self.tuning_results and the refitted best estimator in self.best_model.

        Inputs:
            self
            hyperparams_dict: dict
                Dictionary of hyperparameters to tune
            n_iter: int, default = 30
                Number of iterations to perform for random search
            verbose: bool, default = True
                Whether to keep verbosity or not
            n_jobs: int, default = 1
                Number of cores to use for Random Search
            random_state: int, default = 843
                Random state for reproducibility of RandomizedSearchCV
            random_State: int or None, default = None
                Deprecated misspelled alias of random_state, kept so that old keyword calls keep working.
                Takes precedence over random_state when given.

        Returns:
            None
        '''
        if random_State is not None:
            random_state = random_State

        if verbose:
            start = datetime.now()
            print(f"Start doing Randomized Search CV with {n_iter} random initializations")

        # 'roc_auc' is the scorer name registered in scikit-learn ('roc-auc' is rejected).
        rscv = RandomizedSearchCV(self.base_model, hyperparams_dict, n_iter = n_iter, scoring = 'roc_auc',
                                  cv = self.kfolds, return_train_score = True, verbose = 2 if verbose else 0,
                                  n_jobs = n_jobs, random_state = random_state)
        rscv.fit(self.x_train, self.y_train)

        if verbose:
            print("Done")
            print(f'Time elapsed = {datetime.now() - start}')

        self.tuning_results = pd.DataFrame(rscv.cv_results_)
        self.best_model = rscv.best_estimator_

        gc.collect()

    def train_on_best_params(self, verbose = True):
        '''
        Function to train the model on best hyperparameters obtained from previous method.
        Generates Cross-Validation predictions as Out-of-fold predictions.

        Sets self.cv_preds_probas (out-of-fold probabilities), self.best_threshold_train (mean over folds of the
        threshold tuned on each fold's training part) and self.cv_preds_class.

        Inputs:
            self
            verbose: bool, default = True
                Whether to keep verbosity or not

        Returns:
            None

        '''
        self._require('best_model', 'random_search')

        if verbose:
            start = datetime.now()
            print(f"{self.num_folds} - Fold Cross Validation")
            print("Fitting the model on best hyperparams...")

        self.cv_preds_probas = np.zeros(self.x_train.shape[0])
        self.best_threshold_train = 0

        for fold_number, (train_indices, val_indices) in enumerate(self.kfolds.split(self.x_train, self.y_train), 1):
            if verbose:
                print(f"Fitting Fold {fold_number}...")

            # Training must happen regardless of verbosity.
            x_fold_train = self._take_rows(self.x_train, train_indices)
            y_fold_train = self._take_rows(self.y_train, train_indices)
            x_fold_val = self._take_rows(self.x_train, val_indices)

            self.best_model.fit(x_fold_train, y_fold_train)
            if not self.calibration:
                fold_model = self.best_model
            else:
                self.calibrated_classifier = CalibratedClassifierCV(self.best_model, method = self.calibration_method,
                                                                    cv = self.calibration_cv)
                self.calibrated_classifier.fit(x_fold_train, y_fold_train)
                # Out-of-fold predictions must come from the calibrated model as well.
                fold_model = self.calibrated_classifier

            self.train_preds_probas = fold_model.predict_proba(x_fold_train)[:, 1]
            self.cv_preds_probas[val_indices] = fold_model.predict_proba(x_fold_val)[:, 1]

            # Average the per-fold thresholds.
            self.best_threshold_train += self.tune_threshold(y_fold_train, self.train_preds_probas) / self.num_folds

        self.cv_preds_class = self.proba_to_class(self.cv_preds_probas, self.best_threshold_train)

        if verbose:
            print("Done")
            print(f"Time elapsed = {datetime.now() - start}")
        gc.collect()

    def proba_to_class(self, proba, threshold):
        '''
        Function to convert a given probability to class label based on a threshold value.

        Inputs:
            self
            proba: numpy array
                Probabilities of class label = 1
            threshold: float
                Threshold probability; values >= threshold become class 1, the rest class 0

        Returns:
            Converted Class Label
        '''
        return np.where(proba >= threshold, 1, 0)

    def tune_threshold(self, true_labels, predicted_probas):
        '''
        Function to find the optimal threshold for maximizing the TPR and minimizing the FPR from ROC-AUC Curve.
        This is found out by using the J Statistic, which is J = TPR - FPR.
        Reference: https://machinelearningmastery.com/threshold-moving-for-imbalanced-classification/

        Inputs:
            self
            true_labels: numpy array or pandas series
                True Class Labels
            predicted_probas: numpy array
                Predicted Probability of Positive Class label

        Returns:
            Threshold probability.
        '''
        fpr, tpr, threshold = roc_curve(true_labels, predicted_probas)
        j_stat = tpr - fpr

        best_index = np.argmax(j_stat)

        return threshold[best_index]

    def results_on_best_params(self, model_name):
        '''
        Function to train the whole data on best parameters and display the results.
        Requires train_on_best_params to have been called (it provides the threshold and the CV predictions).

        Sets self.train_preds_probas / self.test_preds_probas, self.train_preds_class / self.test_preds_class
        and self.feat_imp.

        Inputs:
            self
            model_name: str
                model name to get feature importances. 'linear' reads coef_[0], anything else reads
                feature_importances_ (None when the model does not expose it).

        Returns:
            None
        '''
        self._require('best_model', 'random_search')
        self._require('best_threshold_train', 'train_on_best_params')

        # The plain model is always refitted on the full data: it is the source of the feature importances.
        self.best_model.fit(self.x_train, self.y_train)
        if not self.calibration:
            final_model = self.best_model
        else:
            self.calibrated_classifier = CalibratedClassifierCV(self.best_model, method = self.calibration_method,
                                                                cv = self.calibration_cv)
            self.calibrated_classifier.fit(self.x_train, self.y_train)
            final_model = self.calibrated_classifier

        self.train_preds_probas = final_model.predict_proba(self.x_train)[:, 1]
        self.test_preds_probas = final_model.predict_proba(self.x_test)[:, 1]

        self.train_preds_class = self.proba_to_class(self.train_preds_probas, self.best_threshold_train)
        self.test_preds_class = self.proba_to_class(self.test_preds_probas, self.best_threshold_train)

        if model_name == 'linear':
            self.feat_imp = self.best_model.coef_[0]
        else:
            # Some estimators have no feature importances at all.
            self.feat_imp = getattr(self.best_model, 'feature_importances_', None)

        print("-"*100)
        print(f"\nBest threshold (using j-stat) : {self.best_threshold_train}")
        print("Training results")
        print(f"\tROC-AUC score : {roc_auc_score(self.y_train, self.train_preds_probas)}")
        print(f"\tPrecision score : {precision_score(self.y_train, self.train_preds_class)}")
        print(f"\tRecall score : {recall_score(self.y_train, self.train_preds_class)}")
        print("CV results")
        print(f"\tROC-AUC score : {roc_auc_score(self.y_train, self.cv_preds_probas)}")
        print(f"\tPrecision score : {precision_score(self.y_train, self.cv_preds_class)}")
        print(f"\tRecall score : {recall_score(self.y_train, self.cv_preds_class)}")

    def feat_importances(self):
        '''
        Placeholder for displaying feature importances; not implemented yet.

        Returns:
            None
        '''
        pass
        