# Scikit-learn

Scikit-learn es una biblioteca esencial para el aprendizaje automático en Python, construida sobre NumPy. Ofrece una interfaz simple y eficiente para realizar tareas como preprocesamiento de datos, clasificación, regresión, agrupamiento y evaluación de modelos. Es ampliamente utilizada tanto en investigación como en producción, y se integra bien con otras herramientas de Python.

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.utils.validation import check_is_fitted
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import pathlib
import kaggle
import numpy as np
import pandas as pd

## Transformación personalizada

In [None]:
class mb_woe_encoder(BaseEstimator, TransformerMixin):
    """
    Weight-of-evidence style encoder for categorical columns.

    During ``fit`` every category of every column is assigned the value
    ``log((events + 1) / (non_events + 1))``, where ``events`` is the sum of
    the target over the rows of that category and ``non_events`` is the row
    count minus that sum (the target is therefore expected to be 0/1). This
    is the inverse of the textbook WoE sign convention.

    Categories with fewer than ``pc_min_other * len(y)`` rows are treated as
    minority categories: they are pooled together and share one common value
    (stored under ``"OTHERS"``). The same value is used in ``transform`` for
    categories that were not seen during ``fit``.

    The transformer can be used as a step of a scikit-learn pipeline.

    Parameters
    ----------
    pc_min_other : float, default=0.05
        Minimum fraction of the training rows a category needs in order to
        get its own value instead of the pooled ``"OTHERS"`` value.
    keep_na : bool, default=True
        If True, missing values stay ``np.nan`` after ``transform``;
        otherwise they are replaced by the ``"OTHERS"`` value.

    Attributes
    ----------
    dict_woe : dict
        ``{column: {"WOES": {category: value}, "OTHERS": value}}``. Empty
        until ``fit`` is called and rebuilt from scratch on every ``fit``.
    feature_names_in_ : ndarray of object
        Column names seen during ``fit``. Only defined after ``fit`` so that
        ``check_is_fitted`` can tell fitted and unfitted instances apart.
    """

    def __init__(self, pc_min_other=0.05, keep_na=True):
        """Store the hyper-parameters; nothing is learned here."""
        self.pc_min_other = pc_min_other
        self.keep_na = keep_na
        # Kept for backward compatibility (readable before fit as an empty
        # dict). Fitted state that drives check_is_fitted lives in
        # feature_names_in_, which is deliberately NOT created here.
        self.dict_woe = {}

    def fit(self, X, y=None):
        """
        Learn the per-category values of every column of ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Categorical columns to encode.
        y : array-like, pandas.Series or single-column pandas.DataFrame
            Target with one value per row of ``X``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``y`` is missing or its length differs from ``X``.
        """
        if y is None:
            raise ValueError("mb_woe_encoder requires the target `y` to be fitted.")

        # ravel (instead of squeeze) keeps a 1-d array even for one single row
        y = np.asarray(y).ravel()
        if len(y) != len(X):
            raise ValueError(
                f"X and y have inconsistent lengths: {len(X)} != {len(y)}."
            )

        feature_names = X.columns.to_numpy()
        # Minimum number of rows a category needs to avoid being pooled.
        # It is relative to ALL rows, including those with missing values.
        min_rows = self.pc_min_other * len(y)

        # Build into a fresh dict so a refit never keeps stale columns
        dict_woe = {}
        for c in feature_names:
            dict_woe[c] = self._fit_column(X[c], y, min_rows)

        self.dict_woe = dict_woe
        self.feature_names_in_ = feature_names

        return self

    def _fit_column(self, column, y, min_rows):
        """Return ``{"WOES": {...}, "OTHERS": value}`` for a single column."""
        # Rows with a missing category are ignored
        known = column.notna().to_numpy()
        categories = column[known].reset_index(drop=True)
        target = pd.Series(y[known])

        grouped = target.groupby(categories)
        # Number of events, rows and non-events per category
        events = grouped.sum()
        rows = grouped.count()
        non_events = rows - events

        # +1 smoothing avoids log(0) and division by zero
        woe = np.log((events + 1) / (non_events + 1))

        # Minority categories do not get their own entry
        is_minor = (rows < min_rows).to_numpy()
        minor_categories = rows.index[is_minor].tolist()
        kept = woe[~is_minor]
        woes = dict(zip(kept.index, kept.to_numpy()))

        # Common value computed over the rows that belong to the minority
        # categories (the original code selected the complementary rows).
        # With no minority rows this evaluates to log(1 / 1) = 0.
        in_minor = categories.isin(minor_categories).to_numpy()
        events_other = target.to_numpy()[in_minor].sum()
        non_events_other = in_minor.sum() - events_other
        others = np.log((events_other + 1) / (non_events_other + 1))

        return {"WOES": woes, "OTHERS": others}

    def transform(self, X):
        """
        Replace every category by its learned value.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame whose columns were all present during ``fit``.

        Returns
        -------
        pandas.DataFrame
            A copy of ``X`` with float64 columns; the input is not modified.
        """
        check_is_fitted(self, "feature_names_in_")
        # Work on a copy so the caller's frame is never mutated
        X = X.copy()
        for c in X.columns:
            entry = self.dict_woe[c]
            X[c] = (
                X[c]
                .map(
                    lambda k, _entry=entry: self._get_value_from_dict(
                        k, _entry["WOES"], _entry["OTHERS"]
                    )
                )
                .astype(np.float64)
            )

        return X

    def _get_value_from_dict(self, key_dict, dict_values, value_other):
        """
        Look up the encoded value of a single category.

        Parameters
        ----------
        key_dict : object
            Category to encode.
        dict_values : dict
            Mapping ``category -> value``.
        value_other : float
            Value returned when ``key_dict`` is not a key of ``dict_values``.

        Returns
        -------
        float
            ``np.nan`` when the key is missing (None / NaN) and ``keep_na``
            is set; otherwise the mapped value, or ``value_other`` when the
            key is not in the mapping.
        """
        if self.keep_na and pd.isna(key_dict):
            return np.nan
        return dict_values.get(key_dict, value_other)

    def get_feature_names_out(self, input_features=None):
        """
        Return the output column names, which equal the input names.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Names to echo back; defaults to the names seen during ``fit``.

        Returns
        -------
        ndarray of object
        """
        check_is_fitted(self, "feature_names_in_")
        if input_features is None:
            input_features = self.feature_names_in_
        return np.array(input_features, dtype=object)

class mb_clean_text(BaseEstimator, TransformerMixin):
    """
    Clean string columns: drop underscores and surrounding whitespace.

    Values that end up as the empty string are replaced by
    ``replace_value``; when ``replace_value`` is not None, missing values
    are replaced by it as well.

    Parameters
    ----------
    replace_value : object, default=None
        Replacement for empty strings and, if not None, for missing values.

    Attributes
    ----------
    feature_names_in_ : ndarray of object
        Column names seen during ``fit``.
    """

    def __init__(self, replace_value=None):
        """Store the replacement value."""
        self.replace_value = replace_value

    def fit(self, X, y=None):
        """Record the column names of the DataFrame ``X``; nothing else is learned."""
        self.feature_names_in_ = X.columns.to_numpy()

        return self

    def transform(self, X):
        """
        Return a cleaned copy of the DataFrame ``X``.

        The input frame is not modified. Columns must support the pandas
        ``.str`` accessor.
        """
        # Work on a copy so the caller's frame is never mutated
        X = X.copy()
        for c in X.columns:
            # Literal replacement: independent of the pandas-version-specific
            # default of the ``regex`` argument
            X[c] = X[c].str.replace("_", "", regex=False).str.strip()
            X.loc[(X[c] == ""), c] = self.replace_value

            if self.replace_value is not None:
                X.loc[(X[c].isna()), c] = self.replace_value

        return X

    def get_feature_names_out(self, input_features=None):
        """Return the output column names, which equal the input names."""
        check_is_fitted(self)
        if input_features is None:
            input_features = self.feature_names_in_
        return np.array(input_features, dtype=object)

In [None]:
class mb_clean_text_number(BaseEstimator, TransformerMixin):
    """
    Convert numbers stored as dirty text into numeric columns.

    Every value is cast to ``str``, the characters ``_``, space, ``,`` and
    ``"`` are stripped from both ends, and the result is parsed with
    ``pandas.to_numeric``; values that cannot be parsed become NaN.

    Attributes
    ----------
    feature_names_in_ : ndarray of object
        Column names seen during ``fit``.
    """

    def __init__(self):
        """The transformer has no hyper-parameters."""

    def fit(self, X, y=None):
        """Record the column names of the DataFrame ``X``; nothing else is learned."""
        self.feature_names_in_ = X.columns.to_numpy()

        return self

    def transform(self, X):
        """Return a numeric copy of the DataFrame ``X``; the input is not modified."""
        # Work on a copy so the caller's frame is never mutated
        X = X.copy()
        for c in X.columns:
            stripped = X[c].astype(str).str.strip('_ ,"')
            # errors="coerce" turns unparsable text into NaN
            X[c] = pd.to_numeric(stripped, errors="coerce")
        return X

    def get_feature_names_out(self, input_features=None):
        """Return the output column names, which equal the input names."""
        check_is_fitted(self)
        if input_features is None:
            input_features = self.feature_names_in_
        return np.array(input_features, dtype=object)

In [None]:
class mb_standard_scaler(StandardScaler):
    """
    ``StandardScaler`` that reports the DataFrame column names it was fitted on.

    The constructor is inherited unchanged from ``StandardScaler``. A
    ``__init__(self, **kwargs)`` override would hide every parameter from
    ``get_params``, so ``sklearn.base.clone`` (used by ``ColumnTransformer``
    when fitting) would silently rebuild the scaler with default settings.
    Not creating ``feature_names_in_`` at construction time also lets
    ``check_is_fitted`` distinguish fitted from unfitted instances.
    """

    def fit(self, X, y=None):
        """Fit the scaler, remembering the column names when ``X`` is a DataFrame."""
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.to_numpy()
        else:
            self.feature_names_in_ = None
        return super().fit(X, y)

    def get_feature_names_out(self, input_features=None):
        """
        Return the output column names, which equal the input names.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Names to echo back. Defaults to the names seen during ``fit``;
            when none were recorded, ``x0 .. x{n-1}`` are generated.

        Returns
        -------
        ndarray of object
        """
        check_is_fitted(self)
        if input_features is None:
            # The attribute may be None or absent after fitting on an array
            input_features = getattr(self, "feature_names_in_", None)
        if input_features is None:
            input_features = [f"x{i}" for i in range(self.n_features_in_)]
        return np.array(input_features, dtype=object)


class mb_simple_imputer(SimpleImputer):
    """
    ``SimpleImputer`` that reports the DataFrame column names it was fitted on.

    The constructor is inherited unchanged from ``SimpleImputer``. A
    ``__init__(self, **kwargs)`` override would hide every parameter from
    ``get_params``, so ``sklearn.base.clone`` (used by ``ColumnTransformer``
    when fitting) would silently drop settings such as ``strategy="median"``
    and fall back to the defaults. Not creating ``feature_names_in_`` at
    construction time also lets ``check_is_fitted`` distinguish fitted from
    unfitted instances.
    """

    def fit(self, X, y=None):
        """Fit the imputer, remembering the column names when ``X`` is a DataFrame."""
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.to_numpy()
        else:
            self.feature_names_in_ = None
        return super().fit(X, y)

    def get_feature_names_out(self, input_features=None):
        """
        Return the input column names unchanged.

        NOTE(review): the names are echoed one-to-one, so this presumably
        does not reflect imputer configurations that add or drop columns
        (e.g. ``add_indicator=True``) — confirm before using those options.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Names to echo back. Defaults to the names seen during ``fit``;
            when none were recorded, ``x0 .. x{n-1}`` are generated.

        Returns
        -------
        ndarray of object
        """
        check_is_fitted(self)
        if input_features is None:
            # The attribute may be None or absent after fitting on an array
            input_features = getattr(self, "feature_names_in_", None)
        if input_features is None:
            input_features = [f"x{i}" for i in range(self.n_features_in_)]
        return np.array(input_features, dtype=object)

## Descarga del dataset

In [None]:
# Identifier of the Kaggle dataset that is downloaded with the Kaggle API.
URL_DATASET = r"parisrohan/credit-score-classification"
# Directory (relative to the working directory) where the dataset files are stored.
PATH_DATA = pathlib.Path("../data/")

In [None]:
# Download the dataset into PATH_DATA and unzip it.
# NOTE(review): presumably requires Kaggle API credentials to be configured — verify.
kaggle.api.dataset_download_files(URL_DATASET, path=PATH_DATA, unzip=True)
# Names of the files of the dataset, as listed by the Kaggle API.
filenames = [f.name for f in kaggle.api.dataset_list_files(URL_DATASET).files]
print(filenames)

## Carga de datos

In [None]:
# Load the second file listed by the Kaggle API.
# NOTE(review): selecting by position relies on the order of `filenames`;
# presumably index 1 is the labelled file — confirm against the printed list.
data = pd.read_csv(PATH_DATA.joinpath(filenames[1]))
# Normalise the column names: trim surrounding whitespace and upper-case them.
data.columns = data.columns.str.strip().str.upper()
# Preview the first two rows.
data.head(2)

In [None]:
# Print the distinct raw values of the columns that go through the text
# cleaning pipelines (OCCUPATION as category, the rest as numbers stored as text).
for c in [
    "OCCUPATION",
    "AGE",
    "ANNUAL_INCOME",
    "NUM_OF_LOAN",
    "NUM_OF_DELAYED_PAYMENT",
    "CHANGED_CREDIT_LIMIT",
    "OUTSTANDING_DEBT",
    "AMOUNT_INVESTED_MONTHLY",
    "MONTHLY_BALANCE",
]:
    print(data.loc[:, c].unique())

In [None]:
# Summary statistics of the columns that are fed to the numeric pipeline
# (imputation + scaling, without text cleaning).
data.loc[
    :,
    [
        "MONTHLY_INHAND_SALARY",
        "NUM_BANK_ACCOUNTS",
        "NUM_CREDIT_CARD",
        "INTEREST_RATE",
        "DELAY_FROM_DUE_DATE",
        "NUM_CREDIT_INQUIRIES",
        "CREDIT_UTILIZATION_RATIO",
        "TOTAL_EMI_PER_MONTH",
    ],
].describe()

In [None]:
# Relative frequency of every CREDIT_SCORE value, missing values included.
data.loc[:, ["CREDIT_SCORE"]].value_counts(dropna=False, normalize=True)

In [None]:
# Keep only the columns used as model features plus the target (CREDIT_SCORE).
data = data.loc[
    :,
    [
        "OCCUPATION",
        "AGE",
        "ANNUAL_INCOME",
        "NUM_OF_LOAN",
        "NUM_OF_DELAYED_PAYMENT",
        "CHANGED_CREDIT_LIMIT",
        "OUTSTANDING_DEBT",
        "AMOUNT_INVESTED_MONTHLY",
        "MONTHLY_BALANCE",
        "MONTHLY_INHAND_SALARY",
        "NUM_BANK_ACCOUNTS",
        "NUM_CREDIT_CARD",
        "INTEREST_RATE",
        "DELAY_FROM_DUE_DATE",
        "NUM_CREDIT_INQUIRIES",
        "CREDIT_UTILIZATION_RATIO",
        "TOTAL_EMI_PER_MONTH",
        "CREDIT_SCORE",
    ],
]

## Preparación train y test

In [None]:
# Fixed seed: seeds NumPy's global random generator here and is reused as
# random_state of the train/test split.
SEED = 2025
np.random.seed(SEED)

In [None]:
# Encode the CREDIT_SCORE labels as integers and print the decoded labels
# and the list of classes for inspection.
encoder_target = LabelEncoder()
y = encoder_target.fit_transform(data["CREDIT_SCORE"])
print(encoder_target.inverse_transform(y))
print(encoder_target.classes_)
# Binary target: 1 for the class encoded as 1, 0 for every other class.
# NOTE(review): which label is encoded as 1 depends on the order of
# encoder_target.classes_ — confirm against the printed classes.
y = pd.DataFrame(np.where((y == 1), 1, 0), columns=["CREDIT_SCORE"])


# Shuffled 80/20 split, stratified on the binary target and reproducible via SEED.
x_train, x_test, y_train, y_test = train_test_split(
    data.drop(columns=["CREDIT_SCORE"]),
    y.values.ravel(),
    test_size=0.2,
    random_state=SEED,
    stratify=y.values.ravel(),
    shuffle=True,
)

## Generación del Pipeline

In [None]:
# Categorical pipeline: clean the text (empty / missing values become
# "MISSING") and encode the categories with mb_woe_encoder.
pl_cat = Pipeline(
    [
        ("clean", mb_clean_text("MISSING")),
        ("woe", mb_woe_encoder()),
    ]
)

# Pipeline for numbers stored as text: parse to numeric, impute with the
# median strategy and standardise.
pl_cat_num = Pipeline(
    [
        ("clean", mb_clean_text_number()),
        ("mb_simple_imputer", mb_simple_imputer(strategy="median")),
        ("scaler", mb_standard_scaler()),
    ]
)

# Pipeline for columns that are already numeric: impute with the median
# strategy and standardise.
pl_num = Pipeline(
    [
        ("mb_simple_imputer", mb_simple_imputer(strategy="median")),
        ("scaler", mb_standard_scaler()),
    ]
)

# Route every group of columns to its pipeline.
# NOTE(review): `force_int_remainder_cols` is presumably only accepted by
# some scikit-learn versions — confirm against the installed version.
pl_preprocess = ColumnTransformer(
    [
        ("prep_cat", pl_cat, ["OCCUPATION"]),
        (
            "prep_cat_num",
            pl_cat_num,
            [
                "AGE",
                "ANNUAL_INCOME",
                "NUM_OF_LOAN",
                "NUM_OF_DELAYED_PAYMENT",
                "CHANGED_CREDIT_LIMIT",
                "OUTSTANDING_DEBT",
                "AMOUNT_INVESTED_MONTHLY",
                "MONTHLY_BALANCE",
            ],
        ),
        (
            "prep_num",
            pl_num,
            [
                "MONTHLY_INHAND_SALARY",
                "NUM_BANK_ACCOUNTS",
                "NUM_CREDIT_CARD",
                "INTEREST_RATE",
                "DELAY_FROM_DUE_DATE",
                "NUM_CREDIT_INQUIRIES",
                "CREDIT_UTILIZATION_RATIO",
                "TOTAL_EMI_PER_MONTH",
            ],
        ),
    ],
    force_int_remainder_cols=False,
)

In [None]:
# Display the preprocessing transformer.
pl_preprocess

In [None]:
# Fit the preprocessing on the training split (the target is passed through
# to the steps that use it) and show the transformed output.
data_clean = pl_preprocess.fit_transform(x_train, y_train)
data_clean

In [None]:
# Names of the columns produced by the fitted preprocessing.
pl_preprocess.get_feature_names_out()

In [None]:
# Inspect the mapping learned by the "woe" step of the first fitted
# transformer (the categorical pipeline).
pl_preprocess.transformers_[0][1].named_steps["woe"].dict_woe

## Pipeline con preprocesamiento y modelo

In [None]:
# Full pipeline: preprocessing followed by a logistic regression with
# default settings.
pipeline_model = Pipeline(
    [("pl_prep", pl_preprocess), ("lineal_model", LogisticRegression())]
)

In [None]:
# Fit preprocessing and model together on the training split.
pipeline_model.fit(x_train, y_train)

## Resultados

In [None]:
# Gini coefficient (2 * AUC - 1) on both splits, computed from the predicted
# probability of class 1. The fitted model is a LogisticRegression, so the
# printed label says "Regresion Logistica" (it used to say "Regresion Lineal").
print(
    f"Gini con Regresion Logistica en Train: {2 * roc_auc_score(y_train, pipeline_model.predict_proba(x_train)[:, 1]) - 1: .2%}"
)
print(
    f"Gini con Regresion Logistica en Test: {2 * roc_auc_score(y_test, pipeline_model.predict_proba(x_test)[:, 1]) - 1: .2%}"
)