In [None]:
#### CARGA DE DATOS



import numpy as np
import pandas as pd
import h5py
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Configuración de estilo
sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 4)

# Cargar dataset HAR
train_file = "../data/train.h5"
test_file = "../data/test.h5"
labels_file = "../data/activity_labels.txt"

# Cargar etiquetas de actividades
activity_labels = pd.read_csv(labels_file, sep=" ", header=None, names=["id", "activity"])
print("Etiquetas de actividades:\n", activity_labels.to_string(index=False))

# Leer archivo H5 de entrenamiento
with h5py.File(train_file, "r") as f:
    channels = [
        np.array(f["body_acc_x"]),
        np.array(f["body_acc_y"]),
        np.array(f["body_acc_z"]),
        np.array(f["body_gyro_x"]),
        np.array(f["body_gyro_y"]),
        np.array(f["body_gyro_z"]),
        np.array(f["total_acc_x"]),
        np.array(f["total_acc_y"]),
        np.array(f["total_acc_z"]),
    ]
    y_train = np.array(f["y"]).astype(int)-1  # etiquetas

# Apilar para formar un único tensor (n_muestras, 128, 9)
X_train = np.stack(channels, axis=-1)
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)

# print(np.min(y_train), np.max(y_train))

# n, t, c = X_train.shape   # n = n_muestras, t=128, c=9
# Xn = X_train.reshape(n, t * c)
# print("Xn shape:", Xn.shape)


#############################################################
#############################################################
import h5py

with h5py.File(test_file, "r") as f:
    print(list(f.keys()))

with h5py.File(train_file, "r") as f:
    print(list(f.keys()))

with h5py.File(test_file, "r") as f:
    channels_test = [
        np.array(f["body_acc_x"]),
        np.array(f["body_acc_y"]),
        np.array(f["body_acc_z"]),
        np.array(f["body_gyro_x"]),
        np.array(f["body_gyro_y"]),
        np.array(f["body_gyro_z"]),
        np.array(f["total_acc_x"]),
        np.array(f["total_acc_y"]),
        np.array(f["total_acc_z"]),
    ]
    #y_test = np.array(f["y"]).astype(int)-1  # etiquetas en 0..5

# Apilar canales → tensor (n_muestras, 128, 9)
X_testORI = np.stack(channels_test, axis=-1)
print("X_test shape:", X_testORI.shape)
#print("y_test shape:", y_test.shape)


Etiquetas de actividades:
  id           activity
  1            WALKING
  2   WALKING_UPSTAIRS
  3 WALKING_DOWNSTAIRS
  4            SITTING
  5           STANDING
  6             LAYING
X_train shape: (7352, 128, 9)
y_train shape: (7352,)
['body_acc_x', 'body_acc_y', 'body_acc_z', 'body_gyro_x', 'body_gyro_y', 'body_gyro_z', 'total_acc_x', 'total_acc_y', 'total_acc_z']
['body_acc_x', 'body_acc_y', 'body_acc_z', 'body_gyro_x', 'body_gyro_y', 'body_gyro_z', 'total_acc_x', 'total_acc_y', 'total_acc_z', 'y']
X_test shape: (2947, 128, 9)


In [None]:




import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

def train_and_evaluate_decision_tree(X, y,
                                     test_size=0.2,
                                     random_state=9,
                                     max_depth=5,
                                     min_samples_split=2,
                                     min_samples_leaf=1,
                                     max_features='all',   # 'all' | 'sqrt' | int
                                     criterion='gini',     # only 'gini' implemented
                                     verbose=True):
    """
    Implementación didáctica de Decision Tree (CART, criterio Gini).
    X: ndarray (n_samples, n_features) o DataFrame
    y: ndarray (n_samples,)
    Si test_size is None -> no se hace split (usa X,y como dataset completo para entrenar y devuelve árbol).
    Devuelve dict con árbol, splits y métricas sobre test (si existiera).
    """
    rng = np.random.RandomState(random_state)

    # --- Helpers ---
    def gini(y_subset):
        if len(y_subset) == 0:
            return 0.0
        counts = np.array(list(Counter(y_subset).values()), dtype=float)
        p = counts / counts.sum()
        return 1.0 - np.sum(p**2)

    def weighted_impurity(left, right):
        nL, nR = len(left), len(right)
        n = nL + nR
        if n == 0: return 0.0
        return (nL / n) * gini(left) + (nR / n) * gini(right)

    def candidate_thresholds(col):
        vals = np.unique(col)
        if vals.shape[0] <= 10:
            return (vals[:-1] + vals[1:]) / 2.0
        # muchos valores: muestreamos percentiles para velocidad y evitar demasiados thresholds
        pct = np.linspace(0, 100, num=11)[1:-1]  # 9 percentiles
        qs = np.percentile(col, pct)
        return np.unique(qs)

    def best_split(X_arr, y_arr):
        best = {'feature_idx': None, 'threshold': None, 'imp': float('inf')}
        n_features = X_arr.shape[1]

        # seleccionar features segun max_features
        if max_features == 'all':
            feats = range(n_features)
        elif max_features == 'sqrt':
            k = max(1, int(np.sqrt(n_features)))
            feats = rng.choice(range(n_features), size=k, replace=False)
        elif isinstance(max_features, int):
            k = min(max_features, n_features)
            feats = rng.choice(range(n_features), size=k, replace=False)
        else:
            feats = range(n_features)

        for fi in feats:
            col = X_arr[:, fi]
            thr_candidates = candidate_thresholds(col)
            for thr in thr_candidates:
                left_mask = col <= thr
                right_mask = col > thr
                if left_mask.sum() < min_samples_leaf or right_mask.sum() < min_samples_leaf:
                    continue
                imp = weighted_impurity(y_arr[left_mask], y_arr[right_mask])
                if imp < best['imp']:
                    best = {'feature_idx': fi, 'threshold': thr, 'imp': imp}
        return best

    def majority_class(y_subset):
        return Counter(y_subset).most_common(1)[0][0]

    # Recursiva
    def build_tree(X_arr, y_arr, depth=0):
        n = len(y_arr)
        classes = np.unique(y_arr)
        # condiciones de paro
        if (n < min_samples_split) or (depth >= max_depth) or (len(classes) == 1):
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}
        split = best_split(X_arr, y_arr)
        if split['feature_idx'] is None:
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}

        fi, thr = split['feature_idx'], split['threshold']
        left_mask = X_arr[:, fi] <= thr
        right_mask = X_arr[:, fi] > thr

        # proteger recursión infinita
        if left_mask.sum() == 0 or right_mask.sum() == 0:
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}

        left = build_tree(X_arr[left_mask], y_arr[left_mask], depth + 1)
        right = build_tree(X_arr[right_mask], y_arr[right_mask], depth + 1)
        return {'type': 'node',
                'feature_idx': int(fi),
                'threshold': float(thr),
                'left': left,
                'right': right,
                'n': int(n)}

    def predict_single(node, x_row):
        while node['type'] != 'leaf':
            if x_row[node['feature_idx']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return int(node['class'])

    def predict(tree, X_arr):
        return np.array([predict_single(tree, row) for row in X_arr], dtype=int)

    # ---- Preparar datos ----
    if isinstance(X, pd.DataFrame):
        feature_names = X.columns.tolist()
        X_arr = X.values
    else:
        X_arr = np.array(X)
        feature_names = [f'x{i}' for i in range(X_arr.shape[1])]

    y_arr = np.array(y).astype(int)

    # Validar que y no tenga valores negativos o no enteros
    if np.any(y_arr < 0):
        raise ValueError("y contiene etiquetas negativas, revisar el encoding de clases.")

    # split train/test
    if test_size is None:
        X_tr, y_tr = X_arr, y_arr
        X_te, y_te = None, None
    else:
        X_tr, X_te, y_tr, y_te = train_test_split(
            X_arr, y_arr, test_size=test_size,
            stratify=y_arr if len(np.unique(y_arr))>1 else None,
            random_state=random_state)

    # construir árbol
    tree = build_tree(X_tr, y_tr)

    # predecir sobre test si existe
    if X_te is not None:
        y_pred_test = predict(tree, X_te)
        cm = confusion_matrix(y_te, y_pred_test)
        f1 = f1_score(y_te, y_pred_test, average='weighted', zero_division=0)
        prec = precision_score(y_te, y_pred_test, average='weighted', zero_division=0)
        rec = recall_score(y_te, y_pred_test, average='weighted', zero_division=0)
    else:
        y_pred_test = None
        cm = None; f1 = None; prec = None; rec = None

    if verbose:
        print("Decision Tree (impl. propia) — métricas (test):")
        print("max_depth:", max_depth, "min_samples_split:", min_samples_split,
              "min_samples_leaf:", min_samples_leaf, "max_features:", max_features)
        if cm is not None:
            print("Confusion matrix:\n", cm)
            print(f"F1 (weighted): {f1:.4f}  Precision: {prec:.4f}  Recall: {rec:.4f}")
        else:
            print("No test split provided (test_size=None).")

    return {
        'tree': tree,
        'feature_names': feature_names,
        'X_train': X_tr, 'X_test': X_te, 'y_train': y_tr, 'y_test': y_te,
        'y_pred_test': y_pred_test,
        'confusion_matrix': cm,
        'f1': f1, 'precision': prec, 'recall': rec
    }

def predict_from_tree(tree, X_arr):
    """Predict robusto sobre un árbol construido por la función anterior."""
    def predict_one(node, x_row):
        while node['type'] != 'leaf':
            if x_row[node['feature_idx']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return int(node['class'])
    return np.array([predict_one(tree, row) for row in X_arr], dtype=int)


## Decision Tree Optimizado

In [None]:
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score, accuracy_score
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

def extract_time_domain_features(X_sequence):
    """
    Extrae características del dominio temporal para cada canal de la secuencia
    X_sequence: (128, 9) - 128 timesteps, 9 canales
    Returns: array de características (1D)
    """
    features = []
    
    for channel in range(X_sequence.shape[1]):  # Para cada canal (9)
        signal = X_sequence[:, channel]
        
        # Características básicas
        mean = np.mean(signal)
        std = np.std(signal)
        var = np.var(signal)
        rms = np.sqrt(np.mean(signal**2))
        
        # Características de forma
        skewness = stats.skew(signal)
        kurt = stats.kurtosis(signal)
        
        # Valores extremos
        max_val = np.max(signal)
        min_val = np.min(signal)
        peak_to_peak = max_val - min_val
        
        # Energía y potencia
        energy = np.sum(signal**2)
        
        # Características estadísticas adicionales
        median = np.median(signal)
        mad = np.mean(np.abs(signal - mean))  # mean absolute deviation
        
        # Percentiles
        p25, p50, p75 = np.percentile(signal, [25, 50, 75])
        iqr = p75 - p25
        
        features.extend([mean, std, var, rms, skewness, kurt, 
                        max_val, min_val, peak_to_peak, energy,
                        median, mad, p25, p50, p75, iqr])
    
    return np.array(features)

def extract_frequency_domain_features(X_sequence):
    """
    Extrae características del dominio frecuencial para cada canal
    """
    features = []
    
    for channel in range(X_sequence.shape[1]):
        signal = X_sequence[:, channel]
        
        # Transformada rápida de Fourier
        fft_vals = np.fft.fft(signal)
        fft_magnitude = np.abs(fft_vals)[:len(fft_vals)//2]  # Solo frecuencias positivas
        
        if len(fft_magnitude) > 0:
            # Características espectrales
            spectral_centroid = np.sum(np.arange(len(fft_magnitude)) * fft_magnitude) / np.sum(fft_magnitude)
            spectral_energy = np.sum(fft_magnitude**2)
            spectral_entropy = -np.sum(fft_magnitude * np.log2(fft_magnitude + 1e-12))
            
            # Frecuencias dominantes
            dominant_freq1 = np.argmax(fft_magnitude)
            max_magnitude = np.max(fft_magnitude)
            
            features.extend([spectral_centroid, spectral_energy, spectral_entropy, 
                            dominant_freq1, max_magnitude])
        else:
            features.extend([0, 0, 0, 0, 0])
    
    return np.array(features)

def create_engineered_features(X_original):
    """
    Convierte tensor (n, 128, 9) a (n, n_features_engineered)
    """
    n_samples = X_original.shape[0]
    all_features = []
    
    print("Extrayendo características de ingeniería de features...")
    for i in range(n_samples):
        sequence = X_original[i]  # (128, 9)
        
        # Características temporales
        time_features = extract_time_domain_features(sequence)
        
        # Características frecuenciales
        freq_features = extract_frequency_domain_features(sequence)
        
        # Combinar todas las características
        combined_features = np.concatenate([time_features, freq_features])
        all_features.append(combined_features)
        
        if i % 1000 == 0:
            print(f"Procesadas {i}/{n_samples} muestras...")
    
    return np.array(all_features)

def improved_decision_tree(X, y,
                          test_size=0.2,
                          random_state=9,
                          max_depth=15,  # Aumentado para datos más complejos
                          min_samples_split=20,  # Aumentado para evitar overfitting
                          min_samples_leaf=10,   # Aumentado para evitar overfitting
                          max_features='sqrt',
                          criterion='gini',
                          verbose=True):
    """
    Versión mejorada del Decision Tree para datos de sensores
    """
    rng = np.random.RandomState(random_state)

    def gini(y_subset):
        if len(y_subset) == 0:
            return 0.0
        counts = np.array(list(Counter(y_subset).values()), dtype=float)
        p = counts / counts.sum()
        return 1.0 - np.sum(p**2)

    def weighted_impurity(left, right):
        nL, nR = len(left), len(right)
        n = nL + nR
        if n == 0: return 0.0
        return (nL / n) * gini(left) + (nR / n) * gini(right)

    def candidate_thresholds(col):
        vals = np.unique(col)
        if len(vals) <= 20:
            return (vals[:-1] + vals[1:]) / 2.0
        # Para muchas características, usar percentiles
        pct = np.linspace(5, 95, num=10)  # 10 percentiles entre 5% y 95%
        qs = np.percentile(col, pct)
        return np.unique(qs)

    def best_split(X_arr, y_arr):
        best = {'feature_idx': None, 'threshold': None, 'imp': float('inf')}
        n_features = X_arr.shape[1]

        # Selección de features
        if max_features == 'all':
            feats = range(n_features)
        elif max_features == 'sqrt':
            k = max(1, int(np.sqrt(n_features)))
            feats = rng.choice(range(n_features), size=k, replace=False)
        elif isinstance(max_features, int):
            k = min(max_features, n_features)
            feats = rng.choice(range(n_features), size=k, replace=False)
        else:
            feats = range(n_features)

        for fi in feats:
            col = X_arr[:, fi]
            if np.std(col) < 1e-8:  # Saltar características constantes
                continue
                
            thr_candidates = candidate_thresholds(col)
            for thr in thr_candidates:
                left_mask = col <= thr
                right_mask = col > thr
                
                if left_mask.sum() < min_samples_leaf or right_mask.sum() < min_samples_leaf:
                    continue
                    
                imp = weighted_impurity(y_arr[left_mask], y_arr[right_mask])
                if imp < best['imp']:
                    best = {'feature_idx': fi, 'threshold': thr, 'imp': imp}
        return best

    def majority_class(y_subset):
        return Counter(y_subset).most_common(1)[0][0]

    def build_tree(X_arr, y_arr, depth=0):
        n = len(y_arr)
        classes = np.unique(y_arr)
        
        # Condiciones de parada más robustas
        if (n < min_samples_split) or (depth >= max_depth) or (len(classes) == 1):
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}
            
        split = best_split(X_arr, y_arr)
        if split['feature_idx'] is None:
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}

        fi, thr = split['feature_idx'], split['threshold']
        left_mask = X_arr[:, fi] <= thr
        right_mask = X_arr[:, fi] > thr

        if left_mask.sum() == 0 or right_mask.sum() == 0:
            return {'type': 'leaf', 'class': int(majority_class(y_arr)), 'n': int(n)}

        left = build_tree(X_arr[left_mask], y_arr[left_mask], depth + 1)
        right = build_tree(X_arr[right_mask], y_arr[right_mask], depth + 1)
        
        return {'type': 'node',
                'feature_idx': int(fi),
                'threshold': float(thr),
                'left': left,
                'right': right,
                'n': int(n)}

    def predict_single(node, x_row):
        while node['type'] != 'leaf':
            if x_row[node['feature_idx']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return int(node['class'])

    def predict(tree, X_arr):
        return np.array([predict_single(tree, row) for row in X_arr], dtype=int)

    # ---- Preparar datos con feature engineering ----
    print("Aplicando feature engineering...")
    X_engineered = create_engineered_features(X) if len(X.shape) == 3 else X
    y_arr = np.array(y).astype(int)
    
    print(f"Shape después de feature engineering: {X_engineered.shape}")
    
    if isinstance(X_engineered, pd.DataFrame):
        feature_names = X_engineered.columns.tolist()
        X_arr = X_engineered.values
    else:
        X_arr = np.array(X_engineered)
        feature_names = [f'feature_{i}' for i in range(X_arr.shape[1])]

    # Split train/test
    if test_size is None:
        X_tr, y_tr = X_arr, y_arr
        X_te, y_te = None, None
    else:
        X_tr, X_te, y_tr, y_te = train_test_split(
            X_arr, y_arr, test_size=test_size,
            stratify=y_arr,
            random_state=random_state)

    # Construir árbol
    print("Entrenando árbol de decisión...")
    tree = build_tree(X_tr, y_tr)

    # Métricas
    if X_te is not None:
        y_pred_test = predict(tree, X_te)
        accuracy = accuracy_score(y_te, y_pred_test)
        cm = confusion_matrix(y_te, y_pred_test)
        f1 = f1_score(y_te, y_pred_test, average='weighted', zero_division=0)
        prec = precision_score(y_te, y_pred_test, average='weighted', zero_division=0)
        rec = recall_score(y_te, y_pred_test, average='weighted', zero_division=0)
    else:
        y_pred_test = None
        accuracy = None; cm = None; f1 = None; prec = None; rec = None

    if verbose:
        print("\n" + "="*50)
        print("DECISION TREE MEJORADO - RESULTADOS")
        print("="*50)
        print(f"Dimensiones: {X_arr.shape}")
        print(f"Hiperparámetros: max_depth={max_depth}, min_samples_split={min_samples_split}")
        print(f"                min_samples_leaf={min_samples_leaf}, max_features={max_features}")
        
        if cm is not None:
            print(f"\nExactitud (Accuracy): {accuracy:.4f}")
            print(f"F1-Score (weighted): {f1:.4f}")
            print(f"Precision (weighted): {prec:.4f}")
            print(f"Recall (weighted): {rec:.4f}")
            print(f"\nMatriz de Confusión:\n{cm}")
        else:
            print("No se realizó división train/test (test_size=None)")

    return {
        'tree': tree,
        'feature_names': feature_names,
        'X_engineered': X_engineered,
        'X_train': X_tr, 'X_test': X_te, 'y_train': y_tr, 'y_test': y_te,
        'y_pred_test': y_pred_test,
        'confusion_matrix': cm,
        'accuracy': accuracy,
        'f1': f1, 'precision': prec, 'recall': rec
    }

def predict_from_tree(tree, X_arr):
    """Predict robusto para el árbol mejorado"""
    def predict_one(node, x_row):
        while node['type'] != 'leaf':
            if x_row[node['feature_idx']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return int(node['class'])
    
    return np.array([predict_one(tree, row) for row in X_arr], dtype=int)

# CÓDIGO PRINCIPAL MEJORADO
print("=== PROCESAMIENTO DE DATOS HAR ===")

# 1. Preparar datos de entrenamiento con feature engineering
print("Preparando datos de entrenamiento...")
X_train_engineered = create_engineered_features(X_train)
y_train_processed = y_train

print(f"Train features shape: {X_train_engineered.shape}")
print(f"Train labels shape: {y_train_processed.shape}")

# 2. Entrenar modelo mejorado
print("\nEntrenando modelo mejorado...")
res = improved_decision_tree(
    X_train,  # El feature engineering se hace internamente
    y_train_processed,
    test_size=0.2,
    random_state=42,
    max_depth=20,
    min_samples_split=15,
    min_samples_leaf=8,
    max_features='sqrt',
    verbose=True
)

# 3. Preparar datos de test para predicción final
print("\nPreparando datos de test...")
X_test_engineered = create_engineered_features(X_testORI)
print(f"Test features shape: {X_test_engineered.shape}")

# 4. Predecir en test externo
print("Realizando predicciones en test externo...")
y_pred_test_final = predict_from_tree(res['tree'], X_test_engineered)

# 5. Guardar resultados para Kaggle
print("Guardando resultados...")
results = pd.DataFrame({
    'ID': np.arange(1, len(y_pred_test_final) + 1),
    'Prediction': y_pred_test_final + 1  # Convertir 0-5 back to 1-6
})

results.to_csv("KaggleUpload_improved.csv", index=False)
print(f"Predicciones guardadas en KaggleUpload_improved.csv")
print(f"Shape del archivo de resultados: {results.shape}")

# 6. Mostrar distribución de predicciones
print("\nDistribución de predicciones:")
pred_counts = pd.Series(y_pred_test_final + 1).value_counts().sort_index()
for pred, count in pred_counts.items():
    activity_name = activity_labels[activity_labels['id'] == pred]['activity'].values[0]
    print(f"  Clase {pred} ({activity_name}): {count} muestras")

print("\n¡Proceso completado!")

## Decision Tree con hiperparámetros sin optimizar

In [None]:
# tu partición propuesta:

Xn = X_train.reshape(X_train.shape[0], -1)
yn= y_train
Xn_test  = X_testORI.reshape(X_testORI.shape[0], -1)


# Entrenar árbol en X_train_final
res = train_and_evaluate_decision_tree(
    Xn, yn,
    test_size=0.2,            # aquí si quieres que la función cree su propio test interno, o usa None para no dividir
    random_state=20,
    max_depth=8,
    min_samples_split=10,
    min_samples_leaf=4,
    max_features='sqrt',      # suele ayudar a generalizar
    verbose=True
)

# Evaluar en tu test externo (X_test) con predict_from_tree
y_pred_test_ext = predict_from_tree(res['tree'], X_test)
cm_ext = confusion_matrix(y_test, y_pred_test_ext)
f1_ext = f1_score(y_test, y_pred_test_ext, average='weighted')
prec_ext = precision_score(y_test, y_pred_test_ext, average='weighted', zero_division=0)
rec_ext = recall_score(y_test, y_pred_test_ext, average='weighted', zero_division=0)

print("=== Evaluación en tu TEST externo ===")
print("Confusion matrix:\n", cm_ext)
print(f"F1: {f1_ext:.4f}  Precision: {prec_ext:.4f}  Recall: {rec_ext:.4f}")


Decision Tree (impl. propia) — métricas (test):
max_depth: 8 min_samples_split: 10 min_samples_leaf: 4 max_features: sqrt
Confusion matrix:
 [[139  55  40   0  11   0]
 [ 41 143  23   1   7   0]
 [ 45  34 110   4   4   0]
 [  3   0   1 229  24   0]
 [  6   0   2  29 238   0]
 [  0   0   0   2   0 280]]
F1 (weighted): 0.7727  Precision: 0.7721  Recall: 0.7743
=== Evaluación en tu TEST externo ===
Confusion matrix:
 [[173  44  18   0  10   0]
 [ 30 156  18   0  11   0]
 [ 35  30 125   1   6   0]
 [  6   0   0 230  21   0]
 [  7   0   2  23 243   0]
 [  0   0   0   2   0 280]]
F1: 0.8199  Precision: 0.8214  Recall: 0.8205


## Optimizador de hiperparámetros del Decision Tree

In [9]:
from itertools import product
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix
from time import time

def optimize_dt_hyperparams(X_train_final, y_train_final, X_val, y_val,
                            param_grid = None,
                            scoring = 'f1_weighted',
                            random_state=42,
                            verbose=True):
    """
    Grid search sobre hiperparámetros para el Decision Tree implementado.
    Entrena con X_train_final/y_train_final (sin split interno) y evalúa en X_val/y_val.
    Devuelve dict con best_params, best_tree y DataFrame con resultados.
    """
    rng = np.random.RandomState(random_state)

    if param_grid is None:
        # grilla por defecto razonable (no muy grande)
        param_grid = {
            'max_depth':        [6, 8, 10],
            'min_samples_split':[2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features':     ['all', 'sqrt', 10],
        }

    # helper: predecir desde la estructura del árbol
    def predict_from_tree_local(tree, X):
        def one(node, x):
            while node['type'] != 'leaf':
                node = node['left'] if x[node['feature_idx']] <= node['threshold'] else node['right']
            return int(node['class'])
        return np.array([one(tree, np.asarray(x)) for x in X], dtype=int)

    results = []
    best_score = -np.inf
    best_params = None
    best_tree = None

    combos = list(product(param_grid['max_depth'],
                          param_grid['min_samples_split'],
                          param_grid['min_samples_leaf'],
                          param_grid['max_features']))

    if verbose:
        print(f"Grid search: {len(combos)} combinaciones. Empezando...")

    t0 = time()
    for idx, (md, mss, msl, mf) in enumerate(combos, start=1):
        t_iter = time()
        # Entrenar en todo X_train_final (train_and_evaluate_decision_tree con test_size=None devuelve árbol)
        res = train_and_evaluate_decision_tree(
            X_train_final, y_train_final,
            test_size=None,
            verbose=False,
            random_state=random_state,
            max_depth=md,
            min_samples_split=mss,
            min_samples_leaf=msl,
            max_features=mf
        )
        tree = res['tree']
        val_pred = predict_from_tree_local(tree, np.array(X_val))
        val_f1 = f1_score(y_val, val_pred, average='weighted', zero_division=0)

        results.append({
            'max_depth': md,
            'min_samples_split': mss,
            'min_samples_leaf': msl,
            'max_features': mf,
            'val_f1': val_f1
        })

        # actualizar mejor
        if val_f1 > best_score + 1e-12:
            best_score = val_f1
            best_params = {'max_depth': md, 'min_samples_split': mss,
                           'min_samples_leaf': msl, 'max_features': mf}
            best_tree = tree

        if verbose:
            print(f"[{idx}/{len(combos)}] md={md} mss={mss} msl={msl} mf={mf} -> val_f1={val_f1:.4f}  (iter time {time()-t_iter:.1f}s)")

    total_time = time() - t0
    if verbose:
        print(f"Grid search terminado en {total_time:.1f}s. Mejor f1={best_score:.4f} con {best_params}")

    results_df = pd.DataFrame(results).sort_values('val_f1', ascending=False).reset_index(drop=True)
    return {'best_params': best_params, 'best_tree': best_tree, 'results_df': results_df}


# función predict_from_tree pública (si no la tienes en el scope)
def predict_from_tree(tree, X_arr):
    """Predict robusto sobre un árbol construido por train_and_evaluate_decision_tree."""
    def predict_one(node, x_row):
        while node['type'] != 'leaf':
            if x_row[node['feature_idx']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return int(node['class'])
    return np.array([predict_one(tree, row) for row in X_arr], dtype=int)


In [10]:
# ------------------------------
# EJEMPLO DE USO (reemplaza variables si hace falta)
# ------------------------------
# Asumiendo que tienes:
# X_train_final, y_train_final, X_val, y_val, X_test, y_test
# y que train_and_evaluate_decision_tree está definida en el entorno.

# Ejecutar búsqueda:
opt = optimize_dt_hyperparams(X_train_final, y_train_final, X_val, y_val,
                              param_grid=None,   # usa la grilla por defecto (pequeña)
                              verbose=True)

print("\nMejores hiperparámetros encontrados:")
print(opt['best_params'])

# Evaluar en test externo
best_tree = opt['best_tree']
y_pred_test = predict_from_tree(best_tree, np.array(X_test))

cm = confusion_matrix(y_test, y_pred_test)
f1 = f1_score(y_test, y_pred_test, average='weighted', zero_division=0)
prec = precision_score(y_test, y_pred_test, average='weighted', zero_division=0)
rec = recall_score(y_test, y_pred_test, average='weighted', zero_division=0)

print("\n=== Evaluación en TEST externo ===")
print("Confusion matrix:\n", cm)
print(f"F1 (weighted): {f1:.4f}  Precision: {prec:.4f}  Recall: {rec:.4f}")

Grid search: 81 combinaciones. Empezando...
[1/81] md=6 mss=2 msl=1 mf=all -> val_f1=0.7094  (iter time 69.1s)
[2/81] md=6 mss=2 msl=1 mf=sqrt -> val_f1=0.7036  (iter time 2.4s)
[3/81] md=6 mss=2 msl=1 mf=10 -> val_f1=0.7160  (iter time 1.1s)
[4/81] md=6 mss=2 msl=2 mf=all -> val_f1=0.7094  (iter time 66.1s)
[5/81] md=6 mss=2 msl=2 mf=sqrt -> val_f1=0.7028  (iter time 2.2s)
[6/81] md=6 mss=2 msl=2 mf=10 -> val_f1=0.7051  (iter time 0.7s)
[7/81] md=6 mss=2 msl=4 mf=all -> val_f1=0.7064  (iter time 74.1s)
[8/81] md=6 mss=2 msl=4 mf=sqrt -> val_f1=0.7041  (iter time 2.0s)
[9/81] md=6 mss=2 msl=4 mf=10 -> val_f1=0.7042  (iter time 0.7s)
[10/81] md=6 mss=5 msl=1 mf=all -> val_f1=0.7094  (iter time 65.7s)
[11/81] md=6 mss=5 msl=1 mf=sqrt -> val_f1=0.7061  (iter time 2.1s)
[12/81] md=6 mss=5 msl=1 mf=10 -> val_f1=0.7035  (iter time 0.8s)
[13/81] md=6 mss=5 msl=2 mf=all -> val_f1=0.7094  (iter time 68.4s)
[14/81] md=6 mss=5 msl=2 mf=sqrt -> val_f1=0.7061  (iter time 1.9s)
[15/81] md=6 mss=5 ms