# Utils

In [89]:
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from typing import Tuple, Any, Dict, List
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt


def factorize(df: pd.DataFrame, columns: List = None) -> pd.DataFrame:
    if columns is None:
        for column in df.columns:
            if df[column].dtype not in ["int64","float64"]:
                df[column] = df[column].factorize()[0]
    else:
        for column in columns:
            df[column] = df[column].factorize()[0]
    return df

def eliminate_column_na(df: pd.DataFrame, threshold: int) -> pd.DataFrame:
    threshold = threshold / 100
    threshold = threshold * len(df)
    df = df.dropna(axis=1, thresh=int(threshold))
    return df

def eliminate_row_na(df: pd.DataFrame, threshold: int) -> pd.DataFrame:
    threshold = 100 - threshold
    threshold = threshold / 100
    threshold = threshold * len(df.columns)
    df = df.dropna(axis=0, thresh=int(threshold))
    return df

def split_dataset(df: pd.DataFrame,target_column: str) -> (pd.DataFrame,pd.DataFrame):
    target_df = df[target_column]
    features_df = df.drop(columns=[target_column])
    return features_df,target_df

def join_dataframe_columns(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return df1.join(df2)

def join_dataframe_rows(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1, df2], ignore_index=True)

def read_csv_file(file) -> pd.DataFrame:
    try:
        return pd.read_csv(file)
    except FileNotFoundError:
        raise FileNotFoundError("Archivo no encontrado")
    except:
        raise ValueError("Error en la lectura del archivo")

def write_csv_file(df: pd.DataFrame, nombre_archivo: str = None, ruta: str = None):
    if nombre_archivo is None:
        raise TypeError("Nombre de archivo no especificado")
    if ruta is None:
        df.to_csv(nombre_archivo, index = False)
        return
    df.to_csv(f"{ruta}/{nombre_archivo}")

def train_test_validation_split(df: pd.DataFrame, target_column: str, test_size: float = 0.1, validation_size: float = 0.1) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    X, y = split_dataset(df, target_column)
    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=test_size + validation_size, random_state=19)
    validation_size_proportioned = validation_size / (test_size + validation_size)
    X_test, X_validation, y_test, y_validation = train_test_split(X_temp, y_temp, test_size=validation_size_proportioned, random_state=19)

    return X_train, y_train, X_test, y_test, X_validation, y_validation
def model_best_parameters(model: Any, scoring: str = "recall", param_distributions: Dict = None ):
    print(scoring)
    random_search = RandomizedSearchCV(
        estimator=model,
        param_distributions=param_distributions,
        n_iter=1,
        cv=2,
        scoring=scoring,
        n_jobs=1,
        verbose=2,
        random_state=42
    )
    return random_search
def create_visualize_confusion_matrix(y_true: Any, y_pred: Any) -> None:
    cm = confusion_matrix(y_true,y_pred)
    plt.figure(figsize=(6, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()
def create_visualize_classification_report(y_true: Any,y_pred: Any) -> None:
    report_dict = classification_report(y_true, y_pred, output_dict=True)
    df_report = pd.DataFrame(report_dict).transpose()
    plt.figure(figsize=(8, 4))
    sns.heatmap(df_report.iloc[:-3, :-1], annot=True, cmap='Blues')
    plt.title('Classification Report Heatmap')
    plt.show()




# Clean Stroke

In [90]:
def clean_stroke(df):
    smoke_map = {
    "never smoked": 0,
    "formerly smoked": 1,
    "smokes": 2,
    "Unknown": -1
    }
    married_map = {
        "Yes": 1,
        "No": 0
    }

    df = df.drop("id", axis = 1)
    df["bmi"] = df["bmi"].fillna(df["bmi"].median())
    df["ever_married"] = df["ever_married"].map(married_map)
    df["smoking_status"] = df["smoking_status"].map(smoke_map)
    df = factorize(df, ["gender", "Residence_type","work_type"])
    return df

# Clean Heart

In [91]:

def clean_heart(df):
    map_edades = {
        "18-24": 0,
        "25-29": 0,
        "30-34": 0,
        "35-39": 1,
        "40-44": 1,
        "45-49": 1,
        "50-54": 2,
        "55-59": 2,
        "60-64": 2,
        "65-69": 3,
        "70-74": 3,
        "75-79": 3,
        "80 or older": 3
    }
    map_diabetes = {
        "No": 0,
        "No, borderline diabetes": 1,
        "Yes (during pregnancy)": 2,
        "Yes": 3
    }
    map_genhealth = {
        "Excellent": 5,
        "Very good": 4,
        "Good": 3,
        "Fair": 2,
        "Poor": 1
    }
    map_yes_no = {
        "Yes": 1,
        "No": 0
    }
    columns = ["Sex", "Race"]
    df = eliminate_column_na(df, 30)
    df = eliminate_row_na(df, 20)
    df = df.drop(["PhysicalHealth", "MentalHealth"], axis=1)
    df["AgeCategory"] = df["AgeCategory"].map(map_edades)
    df["Diabetic"] = df["Diabetic"].map(map_diabetes)
    df["GenHealth"] = df["GenHealth"].map(map_genhealth)
    df["Smoking"] = df["Smoking"].map(map_yes_no)
    df["AlcoholDrinking"] = df["AlcoholDrinking"].map(map_yes_no)
    df["HeartDisease"] = df["HeartDisease"].map(map_yes_no)
    df["Stroke"] = df["Stroke"].map(map_yes_no)
    df["SkinCancer"] = df["SkinCancer"].map(map_yes_no)
    df["KidneyDisease"] = df["KidneyDisease"].map(map_yes_no)
    df["Asthma"] = df["Asthma"].map(map_yes_no)
    df["DiffWalking"] = df["DiffWalking"].map(map_yes_no)
    df = factorize(df, columns)
    return df




# Técnicas de Balanceo

In [92]:
def SMOTE_technique(X_train, y_train):
    from imblearn.over_sampling import SMOTE
    smote = SMOTE(random_state=50)
    X_train, y_train = smote.fit_resample(X_train, y_train)
    return X_train, y_train
def under_sampling_technique(X_train, y_train):
    from imblearn.under_sampling import RandomUnderSampler
    rus = RandomUnderSampler(sampling_strategy='auto', random_state=50)
    X_resampled, y_resampled = rus.fit_resample(X_train, y_train)
    return X_resampled, y_resampled
def over_sampling_technique(X_train, y_train):
    from imblearn.over_sampling import RandomOverSampler
    ros = RandomOverSampler(random_state=50)
    X_train, y_train = ros.fit_resample(X_train, y_train)
    return X_train, y_train
def smoteen_technique(X_train, y_train):
    from imblearn.combine import SMOTEENN
    smote_enn = SMOTEENN(random_state=50)
    X_train, y_train = smote_enn.fit_resample(X_train, y_train)
    return X_train, y_train
def smotetomek_technique(X_train, y_train):
    from imblearn.combine import SMOTETomek
    smote_tomek = SMOTETomek(random_state=50)
    X_train, y_train = smote_tomek.fit_resample(X_train, y_train)
    return X_train, y_train
def tomek_links_technique(X_train, y_train):
    from imblearn.under_sampling import TomekLinks
    tomek = TomekLinks(sampling_strategy='auto')
    X_resampled, y_resampled = tomek.fit_resample(X_train, y_train)
    return X_resampled, y_resampled
def tomek_links_smote_technique(X_train, y_train):
    from imblearn.combine import SMOTETomek
    smote_tomek = SMOTETomek(sampling_strategy='auto', random_state=50)
    X_train, y_train = smote_tomek.fit_resample(X_train, y_train)
    return X_train, y_train
def nothing(X_train, y_train):
    return X_train, y_train
lista_funciones = [under_sampling_technique, over_sampling_technique, SMOTE_technique, tomek_links_technique, tomek_links_smote_technique,nothing]

#Logistic Regression

In [93]:
import pandas as pd
from sklearn.linear_model import LogisticRegression as lr
from sklearn.metrics import classification_report, make_scorer, recall_score, fbeta_score,f1_score
import joblib
from typing import Dict
from sklearn.metrics import precision_recall_curve
import numpy as np


class LogisticRegression:
    def __init__(self, pos_label):
        self.pos_label = pos_label
        self.model = lr()
        self.trained = False

    def train(self, X_train: pd.DataFrame, y_train: pd.DataFrame, param_distributions: Dict ):
        self.trained = True
        scoring = make_scorer(f1_score, pos_label=self.pos_label)
        self.model = model_best_parameters(self.model, scoring = scoring, param_distributions=param_distributions)
        self.model.fit(X_train, y_train)
        self.model = self.model.best_estimator_

    def predict(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict(X_val)

    def predict_proba(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict_proba(X_val)

    def evaluate(self, X: pd.DataFrame = None, y_true: pd.DataFrame = None, y_pred: pd.DataFrame = None):
        if not self.trained:
            raise Exception("Model not trained")
        if y_true is None:
            raise ValueError("y set must be provided")
        if y_pred is None:
            if X is None:
                raise ValueError("y_pred set or X set must be provided")
            print("Prediciendo probabilidades...")
            y_proba = self.predict_proba(X)[:, 1]
            precision, recall, thresholds = precision_recall_curve(y_test, y_proba)
            f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
            best_threshold = thresholds[np.argmax(f1_scores)]
            y_pred = (y_proba > best_threshold).astype(int)
        print(classification_report(y_true, y_pred))
        # create_visualize_confusion_matrix(y_true, y_pred)
        # create_visualize_classification_report(y_true, y_pred)

    def feature_importance(self, feature_names: pd.Index):
        if not self.trained:
            raise Exception("Model not trained")

        import numpy as np
        import pandas as pd

        coef = self.model.coef_[0]
        importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Coefficient': coef,
            'Abs_Coefficient': np.abs(coef)
        }).sort_values(by='Abs_Coefficient', ascending=False)

        return importance_df[['Feature', 'Coefficient']]
    def save_model(self, filename: str):
        joblib.dump(self.model,f"trained_models/random_forest/{filename}.pkl")

    def load_model(self, filename: str):
        self.model = joblib.load(f"trained_models/random_forest/{filename}.pkl")


#Random Forest

In [94]:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, make_scorer, recall_score, fbeta_score,f1_score
import joblib
from typing import Dict
from sklearn.metrics import precision_recall_curve
import numpy as np

class RandomForest:
    def __init__(self, pos_label):
        self.pos_label = pos_label
        self.model = RandomForestClassifier(random_state = 50)
        self.trained = False

    def train(self, X_train: pd.DataFrame, y_train: pd.DataFrame, param_distributions: Dict):
        self.trained = True
        scoring = make_scorer(f1_score,pos_label=self.pos_label)
        self.model = model_best_parameters(self.model,scoring = scoring, param_distributions = param_distributions)
        self.model.fit(X_train, y_train)
        self.model = self.model.best_estimator_
    def predict(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict(X_val)

    def predict_proba(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict_proba(X_val)

    def evaluate(self, X: pd.DataFrame = None, y_true: pd.DataFrame = None, y_pred: pd.DataFrame = None):
        if not self.trained:
            raise Exception("Model not trained")
        if y_true is None:
            raise ValueError("y set must be provided")
        if y_pred is None:
            if X is None:
                raise ValueError("y_pred set or X set must be provided")
            print("Prediciendo probabilidades...")
            y_proba = self.predict_proba(X)[:, 1]
            precision, recall, thresholds = precision_recall_curve(y_test, y_proba)
            f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
            best_threshold = thresholds[np.argmax(f1_scores)]
            y_pred = (y_proba > best_threshold).astype(int)
        # create_visualize_confusion_matrix(y_true, y_pred)
        # create_visualize_classification_report(y_true,y_pred)
        print(classification_report(y_true, y_pred))
    def feature_importance(self, feature_names: pd.Index):
        if not self.trained:
            raise Exception("Model not trained")

        importances = self.model.feature_importances_

        importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Importance': importances
        }).sort_values(by='Importance', ascending=False)

        return importance_df
    def save_model(self, filename: str) -> None:
        joblib.dump(self.model,f"trained_models/random_forest/{filename}.pkl")

    def load_model(self, filename: str) -> None:
        self.model = joblib.load(f"trained_models/random_forest/{filename}.pkl")
        self.trained = True

#XGBoost

In [95]:
import xgboost as xgb
import pandas as pd
from sklearn.metrics import classification_report, make_scorer, recall_score, f1_score
from typing import Dict
import joblib
from sklearn.metrics import precision_recall_curve
import numpy as np
class XGBoost:
    def __init__(self, pos_label):
        self.pos_label = pos_label
        self.model = xgb.XGBClassifier(random_state = 50)
        self.trained = False

    def train(self, X_train: pd.DataFrame, y_train: pd.DataFrame, param_distributions: Dict):
        self.trained = True
        scoring = make_scorer(f1_score, average = "binary")
        self.model = model_best_parameters(self.model, scoring=scoring, param_distributions=param_distributions)
        self.model.fit(X_train, y_train)
        self.model = self.model.best_estimator_
    def predict(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict(X_val)

    def predict_proba(self, X_val):
        if not self.trained:
            raise Exception("Model not trained")
        return self.model.predict_proba(X_val)

    def evaluate(self, X: pd.DataFrame = None, y_true: pd.DataFrame = None, y_pred: pd.DataFrame = None):
        if not self.trained:
            raise Exception("Model not trained")
        if y_true is None:
            raise ValueError("y set must be provided")
        if y_pred is None:
            if X is None:
                raise ValueError("y_pred set or X set must be provided")
            print("Prediciendo probabilidades...")
            y_proba = self.predict_proba(X)[:, 1]
            precision, recall, thresholds = precision_recall_curve(y_test, y_proba)
            f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
            best_threshold = thresholds[np.argmax(f1_scores)]
            y_pred = (y_proba > best_threshold).astype(int)
        print(classification_report(y_true, y_pred))
        # create_visualize_confusion_matrix(y_true, y_pred)
        # create_visualize_classification_report(y_true, y_pred)

    def feature_importance(self, feature_names: pd.Index):
        if not self.trained:
            raise Exception("Model not trained")

        importances = self.model.feature_importances_

        importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Importance': importances
        }).sort_values(by='Importance', ascending=False)

        return importance_df
    def save_model(self, filename: str) -> None:
        joblib.dump(self.model,f"trained_models/random_forest/{filename}.pkl")

    def load_model(self, filename: str) -> None:
        self.model = joblib.load(f"trained_models/random_forest/{filename}.pkl")
        self.trained = True

#Entrenamiento y evaluación

In [96]:
param_distributions_logistic = [
    {
        'penalty': ['l1'],
        'solver': ['liblinear'],
        'C': [0.001, 0.01, 0.1, 1, 10, 100],
        'max_iter': [100, 200, 300, 500]
    },
    {
        'penalty': ['l2'],
        'solver': ['liblinear', 'saga'],
        'C': [0.001, 0.01, 0.1, 1, 10, 100],
        'max_iter': [100, 200, 300, 500]
    },
    {
        'penalty': ['elasticnet'],
        'solver': ['saga'],
        'C': [0.001, 0.01, 0.1, 1, 10, 100],
        'l1_ratio': [0.0, 0.25, 0.5, 0.75, 1.0],
        'max_iter': [100, 200, 300, 500]
    }
]
param_distributions_random_forest = {
        "n_estimators": [100, 300, 500, 700, 1000],
        "max_depth": [None, 10, 20, 30, 40, 50],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 2, 4],
        "max_features": ["sqrt", "log2"],
        "bootstrap": [True]
    }
param_distributions_xgboost = {
        "n_estimators": [100, 300, 500, 700],
        "learning_rate": [0.01, 0.05, 0.1, 0.2],
        "max_depth": [3, 5, 7, 10],
        "subsample": [0.6, 0.8, 1.0],
        "colsample_bytree": [0.6, 0.8, 1.0],
        "gamma": [0, 1, 3, 5],
        "reg_alpha": [0, 0.1, 0.5, 1],
        "reg_lambda": [0.5, 1, 2]
    }

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from typing import Tuple
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

for dataset, clean, target in zip([r"C:\Users\paumo\PycharmProjects\TFG\datasets\Stroke.csv", r"C:\Users\paumo\PycharmProjects\TFG\datasets\EnfermedadCorazon.csv"],[clean_stroke, clean_heart],["stroke","HeartDisease"]):
    df = read_csv_file(dataset)
    df = clean(df)
    X = df.drop(target, axis=1)
    y = df[target]
    cat_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
    num_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    preprocessor = ColumnTransformer([
        ("num", StandardScaler(), num_features),
        ("cat", OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'), cat_features)
    ])
    X_encoded = preprocessor.fit_transform(X)

    X_train, X_test, y_train, y_test = train_test_split(X_encoded, y, stratify=y, test_size=0.2, random_state=50)
    for funcion in lista_funciones:
        X_train, X_test, y_train, y_test = train_test_split(X_encoded, y, stratify=y, test_size=0.2, random_state=50)
        X_train, y_train = funcion(X_train, y_train)
        weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
        class_weights = dict(zip(np.unique(y_train), weights))
        for model, param_distribution in zip([LogisticRegression(pos_label=1), RandomForest(pos_label=1), XGBoost(pos_label=1)],[param_distributions_logistic,param_distributions_random_forest, param_distributions_xgboost]):
            print(f"Dataset: {dataset}")
            print(f"Modelo: {model.__class__.__name__}")
            print(f"Función de muestreo: {funcion.__name__}")
            model.train(X_train, y_train, param_distribution)
            model.evaluate(X_test, y_test)



Dataset: C:\Users\paumo\PycharmProjects\TFG\datasets\Stroke.csv
Modelo: LogisticRegression
Función de muestreo: under_sampling_technique
make_scorer(f1_score, response_method='predict', pos_label=1)
Fitting 2 folds for each of 1 candidates, totalling 2 fits
[CV] END C=0.01, l1_ratio=0.5, max_iter=300, penalty=elasticnet, solver=saga; total time=   0.0s
[CV] END C=0.01, l1_ratio=0.5, max_iter=300, penalty=elasticnet, solver=saga; total time=   0.0s
Prediciendo probabilidades...
              precision    recall  f1-score   support

           0       0.97      0.91      0.94       972
           1       0.22      0.48      0.30        50

    accuracy                           0.89      1022
   macro avg       0.59      0.70      0.62      1022
weighted avg       0.93      0.89      0.91      1022

Dataset: C:\Users\paumo\PycharmProjects\TFG\datasets\Stroke.csv
Modelo: RandomForest
Función de muestreo: under_sampling_technique
make_scorer(f1_score, response_method='predict', pos_label=1