# "BinaryA" clasificación

In [None]:
import re
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as st
import sklearn.ensemble as en
import sklearn.feature_selection as fs
import sklearn.linear_model as lm
import sklearn.metrics as mt
import sklearn.model_selection as ms
import sklearn.neighbors as ne
import sklearn.preprocessing as pp
import sklearn.svm as svm
import sklearn.tree as tr
import xgboost as xgb
import matplotlib.pyplot as plt
import seaborn as sns
import time as tm
import xgboost as xgb
from sklearn.dummy import DummyClassifier
from sklearn.pipeline import Pipeline

In [None]:
# Global experiment settings shared by the cells below.

# Seed passed to the CV splitters and to the hyperparameter search.
RANDOM_STATE = sum((4, 8, 15, 16, 23, 42))

# Number of splits of every CV strategy (alternatives noted by the author: 3, 5, 10).
CROSS_VALIDATION_FOLDS = 10

# Repetitions of the Repeated* splitters (alternatives noted by the author: 10, 100, 1000).
CROSS_VALIDATION_REPEATS = 100

# Candidates sampled per hyperparameter search (alternatives noted by the author: 20, 50).
HYPERPARAMETER_SEARCH_ITERATIONS = 50

In [None]:
# Load data
# All input files live in the same Google Drive folder.
DATA_DIR = "/content/drive/MyDrive/Documentos/7. educación/uniovi/master/tfm/data"

# Text file describing the feature subsets; subsets are separated by a dashed rule.
subdatasets_file = f"{DATA_DIR}/BI_subconjuntos.txt"
# Explicit encoding so the read does not depend on the platform default
# (assumes the file is UTF-8 -- TODO confirm).
with open(subdatasets_file, 'r', encoding='utf-8') as f:
    content = f.read()
subsets = content.split('------------------------------------------------------------\n')

X_raw = pd.read_csv(f"{DATA_DIR}/BinaryA_features.csv")
y_raw = pd.read_csv(f"{DATA_DIR}/BinaryA_labels.csv")

# The labels are in the second column. Selecting exactly that column (instead
# of "every column from the second one on" followed by ravel()) keeps y aligned
# with the rows of X_raw even if the CSV ever gains extra columns.
y = y_raw.iloc[:, 1].to_numpy()

In [None]:
# Preprocessing
# Fit a variance filter + standard scaler on the full feature table and keep
# the result as a DataFrame with the surviving column names.
# NOTE: further down, X is only consulted for its column names (to resolve
# truncated feature names); the subset DataFrames are built from X_raw.
original_feature_names = list(X_raw.columns)

preprocessing_steps = [
    ("feature_elimination", fs.VarianceThreshold()),
    ("scaler", pp.StandardScaler()),
]
pipe = Pipeline(steps=preprocessing_steps)

X_preprocessed = pipe.fit_transform(X_raw)
preprocessed_feature_names = pipe.get_feature_names_out(original_feature_names)
X = pd.DataFrame(data=X_preprocessed, columns=preprocessed_feature_names)

In [None]:
# Parse every chunk of the subset file into a DataFrame holding its features.
#
# subset_values          : "<key>_value" -> text found between $...$ (subset name)
#                          "<key>_time"  -> text found between &...& (time)
# subset_dataframes      : subset key -> X_raw restricted to the subset's columns
# added_subsets_features : sorted tuple of feature names -> key under which that
#                          feature set is currently stored in subset_dataframes
subset_values = {}
subset_dataframes = {}
added_subsets_features = {}

for i, subset in enumerate(subsets):
    subset_key = f"subset_{i+1}"

    # Keep the text before ' : ' of every non-blank line that is not a '***'
    # line and does not carry the $name$ / &time& markers.
    feature_lines = []
    for line in subset.split('\n'):
        if not line.strip() or line.startswith('***') or '$' in line or '&' in line:
            continue
        feature_name = line.split(' : ')[0].strip().lstrip('- ')
        if feature_name:  # drop entries that end up empty
            feature_lines.append(feature_name)

    # Look for $ (name)
    match_dollar = re.search(r'\$(.*?)\$', subset)
    subset_values[f"{subset_key}_value"] = match_dollar.group(1) if match_dollar else None

    # Look for & (time)
    match_ampersand = re.search(r'&(.*?)&', subset)
    subset_values[f"{subset_key}_time"] = match_ampersand.group(1) if match_ampersand else None

    # A last entry ending in '...' is a truncated name: replace it with the
    # first preprocessed column that starts with the visible prefix.
    if feature_lines and feature_lines[-1].endswith('...'):
        truncated_name = feature_lines[-1].replace('...', '').strip()
        matching_columns = [col for col in X.columns if col.startswith(truncated_name)]
        if matching_columns:
            feature_lines[-1] = matching_columns[0]  # use the first match
        else:
            print(f"No se encontró el nombre de la característica para la entrada truncada: {truncated_name}")

    # Check that the features exist in the raw table. Parsed names can never
    # contain '$' or '&' (such lines were skipped above), so no extra filtering
    # of the missing names is needed before reporting them.
    valid_feature_names = [name for name in feature_lines if name in X_raw.columns]
    missing = set(feature_lines) - set(valid_feature_names)
    if missing:
        print(f"Las características del subconjunto {i+1} no se encontraron: {missing}")

    # A chunk without any usable feature (e.g. an empty chunk before/after a
    # separator) would yield a zero-column DataFrame that no estimator can be
    # fitted on, so it is skipped instead of being registered.
    if not valid_feature_names:
        print(f"Subconjunto {i+1} omitido: no contiene características válidas.")
        subset_values.pop(f"{subset_key}_value", None)
        subset_values.pop(f"{subset_key}_time", None)
        continue

    # Sort alphabetically so that equal feature sets compare equal
    valid_feature_names.sort()
    current_subset_features = tuple(valid_feature_names)

    # Check whether this feature subset has already been registered
    if current_subset_features not in added_subsets_features:
        subset_dataframes[subset_key] = X_raw[valid_feature_names]
        added_subsets_features[current_subset_features] = subset_key
        print(f"DataFrame creado para el subconjunto {i+1} ({subset_values.get(f'subset_{i+1}_value', 'N/A')}) con {len(valid_feature_names)} características.")
    else:
        original_subset_key = added_subsets_features[current_subset_features]
        # Fall back to the key when no $name$ was found, so the combined name
        # never contains the literal text "None".
        original_subset_name = subset_values.get(f'{original_subset_key}_value') or original_subset_key
        current_subset_name = subset_values.get(f'{subset_key}_value') or subset_key
        combined_name = f"{original_subset_name} + {current_subset_name}"  # combined name

        # Replace the original entry with a single entry under the combined name
        subset_dataframes.pop(original_subset_key)
        subset_dataframes[combined_name] = X_raw[valid_feature_names]
        subset_values[f'{combined_name}_value'] = combined_name
        added_subsets_features[current_subset_features] = combined_name

        print(f"Subconjunto {i+1} (Nombre: {current_subset_name}, Tiempo: {subset_values.get(f'subset_{i+1}_time', 'N/A')}) tiene las mismas características que el subconjunto original (Nombre: {original_subset_name}). Renombrado a: {combined_name}.")
        # Drop the per-index name and time entries of the current (duplicate) subset
        subset_values.pop(f'{subset_key}_value', None)
        subset_values.pop(f'{subset_key}_time', None)

In [None]:
# Cross-validation strategies
# The two repeated splitters and the two shuffle-based splitters share their
# keyword arguments, so each set is written once.
repeated_cv_kwargs = dict(
    n_splits=CROSS_VALIDATION_FOLDS,
    n_repeats=CROSS_VALIDATION_REPEATS,
    random_state=RANDOM_STATE,
)
shuffle_cv_kwargs = dict(
    n_splits=CROSS_VALIDATION_FOLDS,
    random_state=RANDOM_STATE,
)

cv_strategies = [
    ms.RepeatedKFold(**repeated_cv_kwargs),
    ms.ShuffleSplit(**shuffle_cv_kwargs),
    ms.RepeatedStratifiedKFold(**repeated_cv_kwargs),
    ms.StratifiedShuffleSplit(**shuffle_cv_kwargs),
]


In [None]:
# Metrics
# Display name -> scikit-learn scorer. The keys are reused verbatim to read the
# "mean_test_<name>" / "test_<name>" results in the evaluation loop, so they
# must not be renamed.
scorings = {
  "Exactitud": "accuracy",
  "Exactitud balanceada": "balanced_accuracy",
  # zero_division=np.nan: an undefined precision (no positive predictions) is
  # reported as NaN instead of being counted as 0.
  "Precisión": mt.make_scorer(mt.precision_score, zero_division=np.nan),
  "Precisión balanceada": mt.make_scorer(mt.precision_score, average="weighted", zero_division=np.nan),
  "Sensibilidad": "recall",
  # NOTE(review): support-weighted recall is arithmetically the same quantity
  # as accuracy -- confirm this is the intended "balanced" variant.
  "Sensibilidad balanceada": mt.make_scorer(mt.recall_score, average="weighted"),
  "ROC-AUC": "roc_auc",
  # ROC-AUC must be computed from continuous scores. A plain make_scorer feeds
  # roc_auc_score the hard predict() labels, which collapses the curve to a
  # single operating point; response_method makes the scorer use
  # decision_function / predict_proba, like the built-in "roc_auc" scorer.
  # Requires scikit-learn >= 1.4 (response_method).
  # NOTE(review): with a binary target this is expected to coincide with
  # "ROC-AUC" -- confirm the separate column is still wanted.
  "ROC-AUC balanceada": mt.make_scorer(
      mt.roc_auc_score,
      response_method=("decision_function", "predict_proba"),
      average="weighted",
  ),
}

In [None]:
# Classifiers
# (name, estimator) pairs. The name is also used as the pipeline step name and
# therefore as the "<name>__" prefix of the hyperparameter keys.
# Estimators with a stochastic component receive RANDOM_STATE so that runs are
# reproducible, in line with the seeded CV splitters and search.
classifiers = [
    ("AdaBoost", en.AdaBoostClassifier(random_state=RANDOM_STATE)),
    # NOTE(review): this is a single extremely randomized tree
    # (tree.ExtraTreeClassifier); if the ensemble was intended, it would be
    # ensemble.ExtraTreesClassifier -- confirm.
    ("ArbolesExtra", tr.ExtraTreeClassifier(random_state=RANDOM_STATE)),
    ("BosquesAleatorios", en.RandomForestClassifier(random_state=RANDOM_STATE)),
    ("Dummy", DummyClassifier(strategy="most_frequent")),
    ("KNeighbors", ne.KNeighborsClassifier()),
    # SVC defaults to the RBF kernel, so without an explicit kernel this entry
    # was the same model as "SVCNoLineal".
    ("SVCLineal", svm.SVC(kernel="linear", probability=True, random_state=RANDOM_STATE)),
    ("SVCNoLineal", svm.SVC(kernel="rbf", probability=True, random_state=RANDOM_STATE)),
    ("XGBoost", xgb.XGBClassifier(random_state=RANDOM_STATE)),
]

In [None]:
# Hyperparameter search
# Search spaces per classifier, written with bare parameter names; the
# "<classifier>__" pipeline-step prefix is added when the final mapping is built.
# scipy conventions: st.randint(low, high) excludes `high`;
# st.uniform(loc, scale) spans [loc, loc + scale].
_search_spaces = {
    "AdaBoost": {
        'n_estimators': st.randint(50, 500),
        'learning_rate': st.uniform(0.01, 1.0),
    },
    "ArbolesExtra": {
        'max_depth': st.randint(3, 11),
    },
    "BosquesAleatorios": {
        'n_estimators': st.randint(100, 1000),
        'max_depth': st.randint(3, 11),
    },
    # Nothing to tune for the baseline.
    "Dummy": {},
    "KNeighbors": {
        'n_neighbors': st.randint(1, 30),
    },
    "SVCLineal": {
        'C': st.uniform(0.01, 10),
        'gamma': st.uniform(0.01, 10),
    },
    "SVCNoLineal": {
        'C': st.uniform(0.01, 10),
        'gamma': st.uniform(0.01, 10),
    },
    "XGBoost": {
        'n_estimators': st.randint(100, 1000),
        'learning_rate': st.uniform(0.01, 0.29),
        'max_depth': st.randint(3, 11),
    },
}

# Classifier name -> {"<classifier>__<param>": distribution}
classifier_param_distributions = {
    step_name: {
        f"{step_name}__{param}": distribution
        for param, distribution in space.items()
    }
    for step_name, space in _search_spaces.items()
}

In [None]:
# Evaluate every (subset, classifier, CV strategy) combination and collect one
# result row per combination in all_metrics / metrics_df.
all_metrics = []

# Labels printed after each evaluation, in display order.
_PRINTED_LABELS = (
    'Exactitud', 'Exactitud balanceada',
    'Precisión', 'Precisión balanceada',
    'Sensibilidad', 'Sensibilidad balanceada',
    'Especificidad',
    'ROC-AUC', 'ROC-AUC balanceada',
    'Tiempo',
)


def _cv_repeats(cv_strategy):
    """Return the number of repetitions of ``cv_strategy`` (1 if it is not a Repeated* splitter)."""
    if isinstance(cv_strategy, (ms.RepeatedKFold, ms.RepeatedStratifiedKFold)):
        return CROSS_VALIDATION_REPEATS
    return 1


def _build_metrics_row(subset_label, clf_name, cv_strategy, mean_scores,
                       fit_time, search_iterations, best_params):
    """Assemble one result row.

    ``mean_scores`` maps every key of ``scorings`` to its mean test score.
    Scores stay numeric (NaN when undefined) so that the later pivot tables
    and idxmax calls operate on numeric columns.
    """
    return {
        'Subconjunto': subset_label,
        'Clasificador': clf_name,
        'Estrategia CV': type(cv_strategy).__name__,
        'Iteraciones HS': search_iterations,
        'CV folds': CROSS_VALIDATION_FOLDS,
        'CV repeats': _cv_repeats(cv_strategy),
        'Exactitud': mean_scores['Exactitud'],
        'Exactitud balanceada': mean_scores['Exactitud balanceada'],
        'Precisión': mean_scores['Precisión'],
        'Precisión balanceada': mean_scores['Precisión balanceada'],
        'Sensibilidad': mean_scores['Sensibilidad'],
        'Sensibilidad balanceada': mean_scores['Sensibilidad balanceada'],
        # For a binary target, balanced accuracy = (sensitivity + specificity) / 2,
        # hence specificity = 2 * balanced accuracy - sensitivity.
        # (assumes the 'recall' scorer measures the positive class -- TODO confirm)
        'Especificidad': 2 * mean_scores['Exactitud balanceada'] - mean_scores['Sensibilidad'],
        'ROC-AUC': mean_scores['ROC-AUC'],
        'ROC-AUC balanceada': mean_scores['ROC-AUC balanceada'],
        'Tiempo': fit_time,
        'Mejores hiperparámetros': best_params,
    }


def _print_metrics(metrics):
    """Print the score columns of one result row, labels padded to a common width."""
    for label in _PRINTED_LABELS:
        print(f"{label:<23}: {metrics[label]}")
    print("-" * 30)


for i, (subset_name, X_subset) in enumerate(subset_dataframes.items()):
    display_subset_name = subset_values.get(f'{subset_name}_value', 'N/A')
    print(f"--- Subconjunto {i+1}: {display_subset_name} ---")

    for clf_name, classifier in classifiers:
        print(f"--- Clasificador: {clf_name} ---")

        # Filtering and scaling are part of the pipeline, so they are re-fitted
        # on the training part of every split.
        full_pipeline = Pipeline([
            ("feature_elimination", fs.VarianceThreshold()),
            ("scaler", pp.StandardScaler()),
            (clf_name, classifier)
        ])

        # If the classifier has a search space, run the hyperparameter search
        if clf_name in classifier_param_distributions:
            param_distributions = classifier_param_distributions[clf_name]

            for cv_strategy in cv_strategies:

                # param_distributions / n_iter / random_state belong to
                # RandomizedSearchCV; GridSearchCV does not accept them.
                hyperparameter_search = ms.RandomizedSearchCV(
                    estimator=full_pipeline,
                    param_distributions=param_distributions,
                    cv=cv_strategy,
                    n_iter=HYPERPARAMETER_SEARCH_ITERATIONS,
                    random_state=RANDOM_STATE,
                    scoring=scorings,
                    refit='ROC-AUC'
                )

                clf_best = hyperparameter_search.fit(X_subset, y)

                print("*** Iteraciones:", HYPERPARAMETER_SEARCH_ITERATIONS, "***")
                print("*** CV:", type(cv_strategy).__name__,
                      ", folds:", CROSS_VALIDATION_FOLDS,
                      ", repeticiones:", _cv_repeats(cv_strategy), "***")

                print("Mejor combinación:")
                best_parameters = hyperparameter_search.best_estimator_.get_params()

                # Show only the hyperparameters that were searched
                optimized_params = {param_name: best_parameters[param_name] for param_name in param_distributions}
                for param_name, param_value in optimized_params.items():
                    print(f"{param_name}: {param_value}")
                print("-" * 60)

                print("Métricas del mejor modelo (validación cruzada):")
                results = hyperparameter_search.cv_results_
                best_index = hyperparameter_search.best_index_

                metrics = _build_metrics_row(
                    display_subset_name, clf_name, cv_strategy,
                    mean_scores={name: results[f'mean_test_{name}'][best_index] for name in scorings},
                    fit_time=results['mean_fit_time'][best_index],
                    search_iterations=HYPERPARAMETER_SEARCH_ITERATIONS,
                    best_params=optimized_params,
                )
                all_metrics.append(metrics)
                _print_metrics(metrics)

        else:  # No search space: evaluate with the default hyperparameters
            print("Hiperparámetros por defecto...")
            for cv_strategy in cv_strategies:
                clf_default = full_pipeline

                scores = ms.cross_validate(
                    estimator=clf_default,
                    X=X_subset,
                    y=y,
                    cv=cv_strategy,
                    scoring=scorings,
                    return_train_score=False
                )

                print("*** CV:", type(cv_strategy).__name__,
                      ", folds:", CROSS_VALIDATION_FOLDS,
                      ", repeticiones:", _cv_repeats(cv_strategy), "***")

                metrics = _build_metrics_row(
                    display_subset_name, clf_name, cv_strategy,
                    mean_scores={name: scores[f'test_{name}'].mean() for name in scorings},
                    fit_time=scores['fit_time'].mean(),
                    search_iterations='N/A',
                    best_params='N/A',
                )
                all_metrics.append(metrics)
                _print_metrics(metrics)

        print("=" * 80)
    print("=" * 80)

metrics_df = pd.DataFrame(all_metrics)

In [None]:
# One heatmap per (subset, CV strategy): classifiers as rows, metrics as columns.

# Columns of metrics_df that describe the run rather than a score.
columns_to_exclude = ['Subconjunto', 'Clasificador', 'Estrategia CV', 'Iteraciones HS',
                     'CV folds', 'CV repeats', 'Tiempo', 'Mejores hiperparámetros']
# Every remaining column is treated as a metric
metric_columns = [col for col in metrics_df.columns if col not in columns_to_exclude]

# Sort the unique subset names alphabetically; rows without a subset name are
# dropped first because a missing value cannot be ordered against strings.
sorted_subset_names = sorted(metrics_df['Subconjunto'].dropna().unique())

for subset_name in sorted_subset_names:
    # Use .get() with the subset name itself as the default
    display_subset_name = subset_values.get(f'{subset_name}_value', subset_name)

    for cv_strategy_name in metrics_df['Estrategia CV'].unique():
        filtered_df = metrics_df[(metrics_df['Subconjunto'] == subset_name) & (metrics_df['Estrategia CV'] == cv_strategy_name)].copy()
        if filtered_df.empty:
            continue

        melted_df = filtered_df.melt(id_vars='Clasificador',
                                     value_vars=metric_columns,
                                     var_name='Métrica',
                                     value_name='Puntuación')
        # Non-numeric placeholders (e.g. 'N/A') become NaN so they cannot break
        # the aggregation.
        melted_df['Puntuación'] = pd.to_numeric(melted_df['Puntuación'], errors='coerce')
        heatmap_data = melted_df.pivot_table(index='Clasificador', columns='Métrica', values='Puntuación')

        # Nothing to draw
        if heatmap_data.empty or heatmap_data.isnull().all().all():
            print(f"Saltando para el subconjunto '{display_subset_name}' y estrategia CV '{cv_strategy_name}' debido a que los datos están vacíos o contienen solo valores NaN.")
            continue

        plt.figure(figsize=(10, 6))
        sns.heatmap(heatmap_data, annot=True, fmt=".2f", cmap="viridis")
        # The CV strategy is part of the title; otherwise the figures of the
        # different strategies for one subset are indistinguishable.
        plt.title(f'{display_subset_name} ({cv_strategy_name})')
        plt.xlabel('Métrica')
        plt.ylabel('Clasificador')
        plt.tight_layout()
        plt.show()

In [None]:
# One heatmap per (metric, CV strategy): subsets as rows, classifiers as columns.

# Columns of metrics_df that describe the run rather than a score.
columns_to_exclude = ['Subconjunto', 'Clasificador', 'Estrategia CV', 'Iteraciones HS',
                     'CV folds', 'CV repeats', 'Tiempo', 'Mejores hiperparámetros']
metric_columns = [col for col in metrics_df.columns if col not in columns_to_exclude]

for metric_name in metric_columns:
    for cv_strategy_name in metrics_df['Estrategia CV'].unique():
        filtered_df = metrics_df[metrics_df['Estrategia CV'] == cv_strategy_name].copy()
        if filtered_df.empty:
            continue

        # Non-numeric placeholders (e.g. 'N/A') become NaN so they cannot break
        # the aggregation.
        filtered_df[metric_name] = pd.to_numeric(filtered_df[metric_name], errors='coerce')

        # Subset name as index, classifier as columns, current metric as values
        heatmap_data = filtered_df.pivot_table(index='Subconjunto', columns='Clasificador', values=metric_name)
        if heatmap_data.empty:
            continue

        subset_name_mapping = {name: subset_values.get(f'{name}_value', name) for name in heatmap_data.index}
        heatmap_data = heatmap_data.rename(index=subset_name_mapping)

        # Sort the rows alphabetically
        heatmap_data = heatmap_data.sort_index()

        plt.figure(figsize=(12, 8))
        sns.heatmap(heatmap_data, annot=True, fmt=".2f", cmap="viridis")
        # The CV strategy is part of the title; otherwise the figures of the
        # different strategies for one metric are indistinguishable.
        plt.title(f'{metric_name} ({cv_strategy_name})')
        plt.xlabel('Clasificador')
        plt.ylabel('Subconjunto')
        plt.tight_layout()
        plt.show()

In [None]:
# Heatmap of the best score per (subset, metric), annotated with the classifier
# that achieved it.

# Metric columns = every column of metrics_df that is not run metadata
columns_to_exclude = ['Estrategia CV', 'Iteraciones HS',
                     'CV folds', 'CV repeats', 'Tiempo', 'Mejores hiperparámetros', 'Subconjunto', 'Clasificador']
metric_columns = [col for col in metrics_df.columns if col not in columns_to_exclude]

# Exclude the "Dummy" classifier
filtered_metrics_df = metrics_df[metrics_df['Clasificador'] != 'Dummy'].copy()

# For every subset and every metric, pick the single row with the highest value
# of THAT metric. (Melting whole "best" rows and pivoting them with
# 'first'/mean would pair each metric with the row that is best for some other
# metric and average unrelated values.)
# Row labels are unique because metrics_df was built from a list of dicts.
best_rows = []
for subset_label, subset_rows in filtered_metrics_df.groupby('Subconjunto'):
    for metric_name in metric_columns:
        scores = pd.to_numeric(subset_rows[metric_name], errors='coerce').dropna()
        if scores.empty:
            continue  # no usable value of this metric for this subset
        best_row_label = scores.idxmax()
        best_rows.append({
            'Subconjunto': subset_label,
            'Clasificador': subset_rows.loc[best_row_label, 'Clasificador'],
            'Métrica': metric_name,
            'Mejor Puntuación': scores.loc[best_row_label],
        })

best_scores_melted = pd.DataFrame(
    best_rows,
    columns=['Subconjunto', 'Clasificador', 'Métrica', 'Mejor Puntuación'],
)

# Cell text: score on the first line, classifier name underneath
best_scores_melted['Annotation'] = [
    f"{score:.2f}\n({clf})"
    for score, clf in zip(best_scores_melted['Mejor Puntuación'], best_scores_melted['Clasificador'])
]

# There is exactly one row per (subset, metric), so 'first' simply carries it over.
heatmap_pivot_annotation = best_scores_melted.pivot_table(index='Subconjunto', columns='Métrica', values='Annotation', aggfunc='first')
heatmap_pivot_values = best_scores_melted.pivot_table(index='Subconjunto', columns='Métrica', values='Mejor Puntuación', aggfunc='first')

# Map the subset names in the index
subset_name_mapping = {name: subset_values.get(f'{name}_value', name) for name in heatmap_pivot_annotation.index}
heatmap_pivot_annotation = heatmap_pivot_annotation.rename(index=subset_name_mapping)
heatmap_pivot_values = heatmap_pivot_values.rename(index=subset_name_mapping)

# Sort the rows alphabetically
heatmap_pivot_annotation = heatmap_pivot_annotation.sort_index()
heatmap_pivot_values = heatmap_pivot_values.sort_index()

plt.figure(figsize=(22, 12))
# fmt="" because the annotations are already formatted strings
sns.heatmap(heatmap_pivot_values, annot=heatmap_pivot_annotation, fmt="", cmap="viridis", annot_kws={"size": 12})
plt.title('Mejor valor y clasificador')
plt.xlabel('Métrica')
plt.ylabel('Subconjunto')
plt.tight_layout()
plt.show()

In [None]:
# Per-classifier sum of the 'Tiempo' column over all result rows
total_time_per_classifier = metrics_df['Tiempo'].groupby(metrics_df['Clasificador']).sum()

print("Suma total de tiempos:", total_time_per_classifier, sep="\n")