# Modelo KNN para intención de voto
Sube `voter_intentions_3000.csv` (Files > Upload, o mediante el widget de carga de la sección 1) y ejecuta las celdas en orden.

In [None]:
!pip install -q pandas==2.2.2 scikit-learn==1.4.2 matplotlib==3.8.4 joblib==1.4.2

In [None]:
import json
from pathlib import Path
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer, SimpleImputer
from sklearn.metrics import (
    classification_report,
    ConfusionMatrixDisplay,
    f1_score,
    balanced_accuracy_score,
)
from sklearn.model_selection import GroupKFold, GridSearchCV, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
    LabelEncoder,
    OneHotEncoder,
    StandardScaler,
    Normalizer,
)
from sklearn.linear_model import LinearRegression
from sklearn.decomposition import PCA


## 1. Cargar dataset

In [None]:
from google.colab import files

# Ask for the CSV through Colab's upload widget. `files.upload()` returns a
# dict of the uploaded files keyed by file name (empty if nothing was uploaded).
uploaded = files.upload()
DATA_PATH = "voter_intentions_3000.csv"
# If the expected name is not among the uploads (e.g. Colab saved a re-upload
# as "voter_intentions_3000 (1).csv"), read the file that was actually
# uploaded instead of a stale or missing one. Prefer a CSV when several files
# were uploaded. With no upload at all, DATA_PATH is left untouched so a file
# placed via the Files panel still works.
if uploaded and DATA_PATH not in uploaded:
    DATA_PATH = next(
        (name for name in uploaded if name.lower().endswith(".csv")),
        next(iter(uploaded)),
    )
df = pd.read_csv(DATA_PATH)
df.head()

## 2. Definir transformadores

In [None]:
from dataclasses import dataclass
from typing import Iterable, Optional
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OrdinalEncoder

@dataclass
class SecondaryChoiceImputer(BaseEstimator, TransformerMixin):
    """Fill missing values of ``target_feature`` with a classifier's prediction.

    A random forest is trained on the rows where ``target_feature`` is present,
    using the numeric features (missing values replaced by the training
    medians) plus the ordinal-encoded ``primary_feature``. At transform time
    only the rows with a missing ``target_feature`` are overwritten.

    If no row has an observed ``target_feature`` during ``fit``, no model is
    trained and ``transform`` fills missing values with the literal
    ``"Unknown"`` instead.

    Parameters
    ----------
    numeric_features:
        Names of the numeric columns used as predictors.
    primary_feature:
        Name of the categorical column used as an extra predictor.
    target_feature:
        Name of the column whose missing values are imputed.

    Both ``fit`` and ``transform`` expect a pandas ``DataFrame`` containing
    all of the columns named above.
    """

    numeric_features: Iterable[str]
    primary_feature: str = "primary_choice"
    target_feature: str = "secondary_choice"

    def __post_init__(self):
        # Internal model and fitted state; not constructor parameters, so
        # they are rebuilt from scratch whenever the estimator is cloned.
        self._classifier = RandomForestClassifier(
            n_estimators=300,
            max_depth=12,
            random_state=42,
            class_weight="balanced_subsample",
        )
        # Categories of the primary feature not seen in fit are encoded as -1.
        self._primary_encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)
        self._numeric_medians: Optional[pd.Series] = None
        self._numeric_columns: Optional[list] = None
        self._should_skip = False

    def fit(self, X, y=None):
        """Train the internal classifier on rows with an observed target.

        ``y`` is ignored; it is accepted for pipeline compatibility.
        Returns ``self``.
        """
        # Materialise the column names once: a tuple would otherwise be read
        # by pandas as a single key, and a one-shot iterable would be consumed.
        numeric_cols = list(self.numeric_features)
        self._numeric_columns = numeric_cols

        # Reset fitted state so a refit never inherits the outcome of an
        # earlier fit (previously `_should_skip` stayed True forever once set).
        self._numeric_medians = None
        self._should_skip = False

        observed = X[self.target_feature].notna()
        if not observed.any():
            # Nothing to learn from: fall back to the "Unknown" filler.
            self._should_skip = True
            return self

        working = X.loc[observed, numeric_cols + [self.primary_feature, self.target_feature]]
        self._numeric_medians = working[numeric_cols].median()
        X_num = working[numeric_cols].fillna(self._numeric_medians).to_numpy()
        X_cat = self._primary_encoder.fit_transform(working[[self.primary_feature]])
        self._classifier.fit(np.hstack([X_num, X_cat]), working[self.target_feature])
        return self

    def transform(self, X):
        """Return a copy of ``X`` with missing ``target_feature`` values filled."""
        frame = X.copy()
        if self._should_skip:
            frame[self.target_feature] = frame[self.target_feature].fillna("Unknown")
            return frame

        missing = frame[self.target_feature].isna()
        if not missing.any():
            return frame

        numeric_cols = (
            self._numeric_columns
            if self._numeric_columns is not None
            else list(self.numeric_features)
        )
        # Use the medians learned in fit; the frame's own medians are only a
        # fallback for the case where none were stored.
        medians = (
            self._numeric_medians
            if self._numeric_medians is not None
            else frame[numeric_cols].median()
        )
        X_num = frame.loc[missing, numeric_cols].fillna(medians).to_numpy()
        X_cat = self._primary_encoder.transform(frame.loc[missing, [self.primary_feature]])
        frame.loc[missing, self.target_feature] = self._classifier.predict(np.hstack([X_num, X_cat]))
        return frame

## 3. Entrenar modelo

In [None]:
# Target column; every other column is a candidate feature, split by dtype.
TARGET = "intended_vote"
ALL_COLS = df.columns.drop([TARGET])
NUMERIC_FEATURES = list(df[ALL_COLS].select_dtypes(include=[np.number]).columns)
CAT_FEATURES = [name for name in ALL_COLS if name not in NUMERIC_FEATURES]

# Force the key choice columns to be treated as categorical, whatever dtype
# pandas inferred for them.
for col in ("primary_choice", "secondary_choice"):
    if col not in CAT_FEATURES:
        CAT_FEATURES.append(col)
    if col in NUMERIC_FEATURES:
        NUMERIC_FEATURES.remove(col)

print("Num:", len(NUMERIC_FEATURES), NUMERIC_FEATURES[:5], "...")
print("Cat:", len(CAT_FEATURES), CAT_FEATURES[:5], "...")

# Encode the target labels as integers; the encoder is reused later to map
# predictions back to label names.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(df[TARGET])
y = y_encoded
X = df[NUMERIC_FEATURES + CAT_FEATURES]

# Stratified 80/20 hold-out split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Keep each training row's original index as "source_id" so resampled copies
# of a row can later be grouped together.
X_train = X_train.reset_index().rename(columns={"index": "source_id"})
y_train = pd.Series(y_train)


In [None]:
# === Class balancing on the TRAINING split only, keeping group ids ===
train_df = X_train.copy()
train_df["__y__"] = y_train.values

counts = train_df["__y__"].value_counts()
mayor = counts.idxmax()
n_mayor = counts.max()

# Every class is resampled to 40% of the majority-class size.
cap_mayor = int(n_mayor * 0.4)
target = cap_mayor

frames = []
rng = 42
for cls, cnt in counts.items():
    dfc = train_df[train_df["__y__"] == cls]
    # Sample with replacement only when the class has fewer rows than the
    # target. A class that already has enough rows (the majority class, or a
    # large minority class) is downsampled without replacement, so no unique
    # rows are traded for duplicates.
    frames.append(dfc.sample(n=target, replace=bool(cnt < target), random_state=rng))

# Concatenate and shuffle the balanced training set.
train_bal = pd.concat(frames, ignore_index=True).sample(frac=1.0, random_state=rng)

X_train_bal = train_bal.drop(columns=["__y__"])
y_train_bal = train_bal["__y__"].to_numpy()
# "source_id" identifies the original row, so duplicated rows share a group
# and can be kept in the same cross-validation fold.
groups = X_train_bal["source_id"].to_numpy()
X_train_bal = X_train_bal.drop(columns=["source_id"])

print("Distribucion balanceada:\n", pd.Series(y_train_bal).value_counts())


In [None]:
# Numeric columns: model-based imputation followed by standardisation.
numeric_transformer = Pipeline([
    ("imputer", IterativeImputer(estimator=LinearRegression(), max_iter=10, random_state=42)),
    ("scaler", StandardScaler()),
])

# Categorical columns: mode imputation followed by one-hot encoding.
# The encoder emits a dense array on purpose: with the scikit-learn version
# pinned in this notebook (1.4.x), PCA with its default solver rejects sparse
# input, so a sparse preprocessor output would make every PCA candidate in the
# grid fail and be scored as NaN.
cat_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

preprocessor = ColumnTransformer([
    ("num", numeric_transformer, NUMERIC_FEATURES),
    ("cat", cat_transformer, CAT_FEATURES),
])

# "row_norm" and "dimreduce" are placeholders that the grid below either
# leaves as passthrough or swaps for Normalizer / PCA.
pipeline = Pipeline([
    ("secondary_choice_imputer", SecondaryChoiceImputer(NUMERIC_FEATURES)),
    ("preprocessor", preprocessor),
    ("row_norm", "passthrough"),
    ("dimreduce", "passthrough"),
    ("knn", KNeighborsClassifier()),
])

# Two families of candidates: Minkowski distance (p=1 or p=2) on the scaled
# features, and cosine distance on row-normalised features.
# NOTE(review): PCA(n_components=50) needs at least 50 preprocessed features;
# confirm the dataset yields that many, otherwise those candidates will fail.
param_grid = [
    {
        "row_norm": ["passthrough"],
        "dimreduce": ["passthrough", PCA(n_components=50)],
        "knn__metric": ["minkowski"],
        "knn__p": [1, 2],
        "knn__n_neighbors": [5, 15, 35],
        "knn__weights": ["distance"],
    },
    {
        "row_norm": [Normalizer()],
        "dimreduce": ["passthrough", PCA(n_components=50)],
        "knn__metric": ["cosine"],
        "knn__n_neighbors": [5, 15, 35],
        "knn__weights": ["distance"],
    },
]

scoring = {
    "macro_f1": "f1_macro",
    "bal_acc": "balanced_accuracy",
}

# Grouped folds: rows sharing a group id never straddle train and validation.
gkf = GroupKFold(n_splits=5)

search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=gkf,
    scoring=scoring,
    refit="macro_f1",  # the final model is refit on the best macro-F1 candidate
    n_jobs=-1,
    verbose=2,
)

search.fit(X_train_bal, y_train_bal, groups=groups)
print("Mejores hiperparametros:", search.best_params_)
print("Mejor macro_f1 (cv):", search.best_score_)
best_model = search.best_estimator_


## 4. Evaluacion y artefactos

In [None]:
best_model = search.best_estimator_

# Predict on the held-out split and map integer labels back to class names.
y_pred = best_model.predict(X_test)
y_pred_labels = label_encoder.inverse_transform(y_pred)
y_test_labels = label_encoder.inverse_transform(y_test)

print(classification_report(y_test_labels, y_pred_labels, zero_division=0))
ConfusionMatrixDisplay.from_predictions(y_test_labels, y_pred_labels, xticks_rotation=45)
plt.title("Matriz de confusion (todas las clases)")
plt.show()

macro_f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)
bal_acc = balanced_accuracy_score(y_test, y_pred)
print(f"Macro F1: {macro_f1:.3f} | Balanced Acc: {bal_acc:.3f}")

# Secondary report restricted to test rows whose TRUE label is not
# "Undecided". Kept best-effort: any failure here (for instance the class not
# existing in the data) is printed as a warning so the model is still saved.
try:
    undec_idx = np.where(label_encoder.classes_ == "Undecided")[0][0]
    mask_decididos = y_test != undec_idx
    if mask_decididos.any():
        y_test_dec = y_test[mask_decididos]
        y_pred_dec = y_pred[mask_decididos]
        y_test_dec_lab = label_encoder.inverse_transform(y_test_dec)
        y_pred_dec_lab = label_encoder.inverse_transform(y_pred_dec)

        print("\n=== SOLO DECIDIDOS (excluye 'Undecided') ===")
        print(classification_report(y_test_dec_lab, y_pred_dec_lab, zero_division=0))
        ConfusionMatrixDisplay.from_predictions(y_test_dec_lab, y_pred_dec_lab, xticks_rotation=45)
        plt.title("Matriz de confusion (solo decididos)")
        plt.show()

        # Average only over the classes that actually occur among the decided
        # rows. Without `labels`, an "Undecided" prediction adds a zero-support
        # class with F1 = 0 to the macro average and deflates the score.
        decided_labels = np.unique(y_test_dec)
        macro_f1_dec = f1_score(
            y_test_dec,
            y_pred_dec,
            labels=decided_labels,
            average="macro",
            zero_division=0,
        )
        bal_acc_dec = balanced_accuracy_score(y_test_dec, y_pred_dec)
        print(f"Macro F1 (decididos): {macro_f1_dec:.3f} | Balanced Acc (decididos): {bal_acc_dec:.3f}")
except Exception as e:
    print("Aviso decididos:", e)

# Persist the fitted pipeline together with the label encoder.
# NOTE(review): the pipeline contains a class defined in this notebook, so
# loading the file presumably requires that class to be defined first; verify.
joblib.dump({"model": best_model, "label_encoder": label_encoder}, "knn_voter_intentions.joblib")
print("Modelo guardado en knn_voter_intentions.joblib")
files.download("knn_voter_intentions.joblib")


In [None]:
# === Evaluacion post-proceso: Umbral de 'Undecided' ===
import numpy as np
from sklearn.metrics import f1_score, balanced_accuracy_score
import matplotlib.pyplot as plt

proba = best_model.predict_proba(X_test)
classes = label_encoder.classes_
idx_u = list(classes).index("Undecided")  # encoded label of "Undecided"

# Columns of `proba` follow the model's own class order, so map them through
# `classes_` instead of assuming column index == encoded label.
model_labels = best_model.classes_
col_u = int(np.flatnonzero(model_labels == idx_u)[0])

# Best class for each row when "Undecided" is not allowed.
proba_decided = proba.copy()
proba_decided[:, col_u] = -np.inf
best_decided = model_labels[proba_decided.argmax(axis=1)]


def eval_tau(tau):
    """Score the rule "predict Undecided only if its probability exceeds tau".

    Rows that do not clear the threshold get their most probable decided
    class. The previous version started from the plain argmax and only
    switched rows *to* Undecided; for any tau >= 0.5 such rows already had
    Undecided as argmax, so every threshold gave identical scores.

    Returns a ``(macro_f1, balanced_accuracy)`` tuple computed on the test split.
    """
    y_pred_tau = np.where(proba[:, col_u] > tau, idx_u, best_decided)
    f1m = f1_score(y_test, y_pred_tau, average="macro", zero_division=0)
    balc = balanced_accuracy_score(y_test, y_pred_tau)
    return f1m, balc


# NOTE(review): thresholds are compared on the test split, so the best score
# is optimistic; consider choosing tau on a validation split instead.
taus = [0.55, 0.65, 0.75, 0.8, 0.85]
f1_scores, bal_accs = [], []

print("=== Evaluacion de distintos umbrales tau para 'Undecided' ===")
for tau in taus:
    f1m, balc = eval_tau(tau)
    f1_scores.append(f1m)
    bal_accs.append(balc)
    print(f"tau={tau:.2f} -> Macro F1={f1m:.3f} | Balanced Acc={balc:.3f}")

plt.figure(figsize=(6, 4))
plt.bar([str(t) for t in taus], f1_scores)
plt.title("Comparacion de Macro F1 segun umbral tau ('Undecided')")
plt.xlabel("tau (umbral de Undecided)")
plt.ylabel("Macro F1")
plt.grid(axis="y", linestyle="--", alpha=0.6)
plt.show()


In [None]:
# === Extra metric: top-2 accuracy (decided voters only) ===
# For each test row take the two highest-probability columns, then measure how
# often the true label is one of them, over rows whose true label is not
# "Undecided".
top2 = np.argsort(-proba, axis=1)[:, :2]
decided_mask = label_encoder.classes_[y_test] != "Undecided"
top2_hits = (top2 == np.asarray(y_test)[:, None]).any(axis=1)
hit_top2_dec = np.mean(top2_hits[decided_mask])
print(f"Top-2 accuracy (solo decididos): {hit_top2_dec:.3f}")


In [None]:
# Final summary. The model description and the best threshold are taken from
# the objects computed in the cells above rather than hard-coded, so the text
# cannot contradict what this run actually selected.
best_tau_pos = int(np.argmax(f1_scores))
print(f"""
=== RESUMEN FINAL ===
- Modelo base: KNN optimizado; mejores hiperparametros: {search.best_params_}.
- Evaluacion tradicional + post-proceso (umbral tau y Top-2).
- Se observa mejora en Macro F1 y Balanced Accuracy tras corregir fuga de validacion y ajustar balanceo.
- Mejor tau evaluado: {taus[best_tau_pos]} (Macro F1={f1_scores[best_tau_pos]:.3f}).
""")
