In [7]:
!pip install category_encoders
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from scipy import stats as st
import category_encoders as ce
#----------------------CategoricalTransformer----------------------

class CategoricalTransformer():
    def __init__(self, max_unique_values = 20):
        self.max_unique_values = max_unique_values
        self.__selected_features = None

    def fit(self, X):
        # Identify features with 20 or fewer unique values
        self.__selected_features = X.columns[X.nunique() <= self.max_unique_values]
        return self

    def transform(self, X):
        X_transformed = X.copy()
        # Convert selected features to categorical
        X_transformed[self.__selected_features] = X_transformed[self.__selected_features].astype('object')
        return X_transformed
    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)



#----------------------MissingValuesFiller----------------------

class MissingValuesFiller():
    def __init__(self, numeric_strategy='mean', categorical_strategy='most_frequent'):
        self.numeric_strategy = numeric_strategy
        self.categorical_strategy = categorical_strategy
        self.__numeric_cols = None
        self.__categorical_cols = None
        self.__mean_numeric_series = None
        self.__mode_categorical_series = None

    def fit(self, X):
        # Separate numeric and categorical columns
        self.__numeric_cols = X.select_dtypes(include='number').columns
        self.__categorical_cols = X.select_dtypes(include='object').columns
        self.__mean_numeric_series= X[self.__numeric_cols].agg(self.numeric_strategy)
        self.__mode_categorical_series= X[self.__categorical_cols].mode().iloc[0] if self.categorical_strategy  == 'most_frequent' else self.categorical_strategy

    def transform(self, X):
        X_transformed = X.copy()
         # Fill missing values in numeric columns
        X_transformed[self.__numeric_cols] = X[self.__numeric_cols].fillna(self.__mean_numeric_series)

        # Fill missing values in categorical columns
        X_transformed[self.__categorical_cols] = X[self.__categorical_cols].fillna(self.__mode_categorical_series)

        return X_transformed

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)



#----------------------DropHighCorrelationFeatures----------------------


class DropHighCorrelationFeatures():
    def __init__(self, threshold = 0.85):
        self.threshold = threshold
        self.to_drop_cols = None

    def fit(self, X):
        # Calculate the correlation matrix
        correlation_matrix = X.corr().abs()

        # Create a mask to identify highly correlated features
        upper_triangle_mask = correlation_matrix.where(np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool))

        # Identify features with correlation above the threshold
        to_drop = [column for column in upper_triangle_mask.columns if any(upper_triangle_mask[column] > self.threshold)]

        self.to_drop_cols = to_drop



    def transform(self, X):
        X_transformed = X.copy()

        # Drop highly correlated features
        X_transformed = X_transformed.drop(columns=self.to_drop_cols)

        return X_transformed
    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)

#----------------------SplitterByType----------------------

class SplitterByType():
    def __init__(self):
        self.__numeric_cols = None
        self.__categorical_cols = None

    def fit(self, X):
        # Identify numeric and categorical columns
        self.__numeric_cols = X.select_dtypes(include='number').columns
        self.__categorical_cols = X.select_dtypes(include='object').columns



    def transform(self, X):
        X_transformed = X.copy()

        # Create DataFrames based on column types
        numeric_df = X[self.__numeric_cols].copy()
        categorical_df = X[self.__categorical_cols].copy()

        return numeric_df, categorical_df

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)


#----------------------ConstantsAndHighCardinalityDropper----------------------
class ConstantsAndHighCardinalityDropper():
    def __init__(self,max_unique_values=2000):
        self.max_unique_values = max_unique_values
        self.__non_constant_cols = None
        self.__high_cardinality_cols = None

    def fit(self, X):
        # Identify numeric and categorical columns
        self.__non_constant_cols = X.columns[X.nunique() > 1]
        self.__high_cardinality_cols = X.columns[X.nunique() > self.max_unique_values]


    def transform(self, X):
        X_transformed = X.copy()

        # Get non constants columns
        X_transformed = X_transformed[self.__non_constant_cols]

        #drop high cardinality columns
        X_transformed = X_transformed.drop(columns=self.__high_cardinality_cols)

        return X_transformed

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)


#----------------------ConstantsAndHighCardinalityDropper----------------------
class FrequencyEncoder():
    def __init__(self):
        self.categorical_features = None
        #dictionary with values of the categorical features as keys and dictionary of frequencies
        self.mapper = {}

    def fit(self, X):
        # Identify numeric and categorical columns
        self.categorical_features = X.select_dtypes('object').columns

        for cat_col in self.categorical_features:
            class_values_frequency = {}
            for class_value in X[cat_col].unique():
                class_values_frequency[class_value] = sum(X[cat_col]==class_value)/X.shape[0]
            self.mapper[cat_col] = class_values_frequency


    def transform(self, X):
        X_transformed = X.copy()

        # apply frequencies mapping
        for cat_col in self.categorical_features:
            X_transformed[cat_col] = X_transformed[cat_col].map(self.mapper[cat_col])

        return X_transformed

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)



In [2]:
df = pd.read_csv('../data/raw/training.csv')
X = df.iloc[:,:-1].copy()
y = df.Target
X_val1 = pd.read_csv('../data/raw/cap_data_validation1.csv')

# Example

In [21]:
def preprocess_data(X: pd.DataFrame, X_val: pd.DataFrame, numeric_strategy='mean',categorical_strategy='most_frequent',
                   threshold=0.85,cat_transformer_max_unique_values = 20,cardinality_max_unique_values=2000,
                   n_components = 40, cat_encoder_strategy = 'woee'):
    # Step 0: Copy Values to transform
    X_transformed = X.copy()
    X_val_transformed = X_val.copy()

    # Step 1: Fill missing values
    filler = MissingValuesFiller(numeric_strategy=numeric_strategy, categorical_strategy=categorical_strategy)
    X_transformed = filler.fit_transform(X_transformed)
    X_val_transformed = filler.transform(X_val_transformed)

    # Step 2: Reduce dimensionality due to high correlation
    correlation_reducer = DropHighCorrelationFeatures(threshold=threshold)
    X_transformed = correlation_reducer.fit_transform(X_transformed)
    X_val_transformed = correlation_reducer.transform(X_val_transformed)

    # Step 3: Convert features to categorical
    categorical_transformer = CategoricalTransformer(max_unique_values = cat_transformer_max_unique_values).fit(X_transformed)
    X_transformed = categorical_transformer.transform(X_transformed)
    X_val_transformed = categorical_transformer.transform(X_val_transformed)


    # Step 4: Drop constants and high-cardinality features
    dropper = ConstantsAndHighCardinalityDropper(max_unique_values = cardinality_max_unique_values)
    X_transformed = dropper.fit_transform(X_transformed)
    X_val_transformed = dropper.transform(X_val_transformed)


    # Step 5: Split the DataFrame into numeric and categorical parts
    splitter = SplitterByType()
    X_transformed_numeric, X_transformed_categorical = splitter.fit_transform(X_transformed)
    X_val_transformed_numeric, X_val_transformed_categorical = splitter.transform(X_val_transformed)
    incosistencias_columns = ['Feature_11', 'Feature_71', 'Feature_88', 'Feature_90',
       'Feature_143', 'Feature_156', 'Feature_176', 'Feature_194',
       'Feature_200', 'Feature_216', 'Feature_237', 'Feature_268',
       'Feature_273', 'Feature_319', 'Feature_339', 'Feature_386',
       'Feature_391', 'Feature_402', 'Feature_407', 'Feature_410',
       'Feature_417', 'Feature_421', 'Feature_422', 'Feature_427',
       'Feature_431', 'Feature_441', 'Feature_471', 'Feature_477',
       'Feature_492', 'Feature_532']
    col_new = set(X_transformed_numeric)-set(incosistencias_columns)
    X_transformed_numeric = X_transformed_numeric[col_new]
    X_val_transformed_numeric = X_val_transformed_numeric[col_new]

    # Step 6: Reduce dimensionality of numeric features
    pca = PCA(n_components)
    X_transformed_numeric = pd.DataFrame(pca.fit_transform(X_transformed_numeric))
    X_val_transformed_numeric = pd.DataFrame(pca.transform(X_val_transformed_numeric))
    print(f'Explained Variance with {n_components} components: {pca.explained_variance_ratio_.sum()}')


    # Step 7: Categorical Encoding
    if cat_encoder_strategy == 'woee':
        # Step 7.1 (Optional): Apply WOE Encoding
        X_columns = X_transformed_categorical.columns
        woe_encoder = ce.WOEEncoder(cols=X_columns)

        X_transformed_categorical = woe_encoder.fit_transform(X_transformed_categorical, y)
        X_val_transformed_categorical = woe_encoder.transform(X_val_transformed_categorical)
    elif cat_encoder_strategy == 'freq':
        # Step 7.2 (Optional): Apply self defined Frequency Encoding
        X_columns = X_transformed_categorical.columns
        freq_encoder = FrequencyEncoder()

        X_transformed_categorical = freq_encoder.fit_transform(X_transformed_categorical)
        X_val_transformed_categorical = freq_encoder.transform(X_val_transformed_categorical)
    else:
        # Step 7.3 (Optional): Apply One Hot Encoding
        X_columns = X_transformed_categorical.columns
        ohe_encoder = ce.OneHotEncoder()

        X_transformed_categorical = ohe_encoder.fit_transform(X_transformed_categorical)
        X_val_transformed_categorical = ohe_encoder.transform(X_val_transformed_categorical)

    # Step 8: Mixing steps 6 & 7 to prepare data for modelling
    X_transformed = pd.concat([X_transformed_numeric,X_transformed_categorical], axis=1)
    X_val_transformed = pd.concat([X_val_transformed_numeric,X_val_transformed_categorical], axis=1)

    return X_transformed, X_val_transformed

**Next steps... concatenate dataframes applying encoders to categorical data**

In [22]:
X_transformed, X_val_transformed = preprocess_data(X, X_val1, cat_encoder_strategy='ohe')

  X_transformed_numeric = X_transformed_numeric[col_new]
  X_val_transformed_numeric = X_val_transformed_numeric[col_new]


Explained Variance with 40 components: 0.8276145411905154


# Modelling

In [24]:
from sklearn.metrics import classification_report, brier_score_loss, confusion_matrix, accuracy_score


# Create a function that return the metrics we need to compare if the model is a good one.
# The function returns 'brier_score', 'confusion matrix', 'accuracy negative and positive', 'accuracy'
def metricas_modelo(y_real, prob_pred):
    y_pred = list(map(lambda x: 1 if x==True else 0, prob_pred>0.5))
    brier = round(brier_score_loss(y_real,prob_pred)*100,2)
    cm = confusion_matrix(y_real,y_pred)
    acc_neg = round(100*cm[0,0]/(cm[0,0]+cm[0,1]),2)
    acc_pos = round(100*cm[1,1]/(cm[1,0]+cm[1,1]),2)
    acc = round(100*accuracy_score(y_real, y_pred),2)
    print(f' brier: {brier},\n confusion_m: \n{cm},\n acc_neg: {acc_neg},\n acc_pos: {acc_pos},\n accuracy: {acc}')

Usando metodología WOEE

In [25]:
# Dividir los datos en conjuntos de entrenamiento y prueba
X_train, X_test, y_train, y_test = train_test_split(X_transformed, y, test_size=0.90, random_state=42)

# Crear un objeto DMatrix para los datos de entrenamiento y prueba
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# Definir los parámetros del modelo
params = {
    'objective': 'binary:logistic',
    'max_depth': 3,
    'learning_rate': 0.05,
    'eval_metric': 'logloss'
}

# Entrenar el modelo
num_rounds = 100
model = xgb.train(params, dtrain, num_rounds)

# Realizar predicciones en el conjunto de prueba
y_pred = model.predict(dtest)


metricas_modelo(y_test,y_pred)

 brier: 10.16,
 confusion_m: 
[[2050   12]
 [ 240    5]],
 acc_neg: 99.42,
 acc_pos: 2.04,
 accuracy: 89.08


Usando metodología  basada en frecuencias