<a href="https://colab.research.google.com/github/dteso/AI-Mini-Trainer/blob/main/AI_mini_trainer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gradio scikit-learn pandas plotly atomicwrites

In [None]:
### ML Mini Trainer - v1.0.1
### DAVID TESO POZO - 2025

import os
import json
import shutil
import pickle
import warnings

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gradio as gr
from atomicwrites import atomic_write
from sklearn.exceptions import ConvergenceWarning
from sklearn.preprocessing import LabelEncoder
from sklearn.datasets import (
    load_iris, load_wine, load_breast_cancer, load_digits,
    load_diabetes, fetch_california_housing,
    make_regression, make_friedman1
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, classification_report,
    r2_score, mean_squared_error
)
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.decomposition import PCA

# Keep scikit-learn convergence warnings out of the notebook output.
warnings.filterwarnings(action="ignore", category=ConvergenceWarning)

# --- Paths and folders ---
# Working folders used for uploaded datasets and pickled models.
os.makedirs(name="datasets", exist_ok=True)
os.makedirs(name="models", exist_ok=True)
# JSON index files that map a name to a stored dataset / model.
DATASET_REGISTRY = "dataset_registry.json"
MODEL_REGISTRY = "model_registry.json"

# ---------------------------
# DATASETS PREDEFINIDOS
# ---------------------------
def _synthetic_dataset(maker, n_features, **maker_kwargs):
    """Build a dataset mapping from a scikit-learn sample generator.

    The generator is called exactly once (200 samples, fixed seed 42) and its
    ``(X, y)`` result is split into the ``data`` / ``target`` entries, instead
    of generating the same samples twice to read each half.
    """
    X, y = maker(
        n_samples=200, n_features=n_features, random_state=42, **maker_kwargs
    )
    return {
        "data": X,
        "target": y,
        "feature_names": [f"X{i}" for i in range(n_features)],
    }


# Display name -> zero-argument loader. Each loader returns a mapping that is
# read through the "data", "target", "feature_names" and (optionally)
# "target_names" keys.
available_datasets = {
    "Iris (Clasificación)": load_iris,
    "Wine (Clasificación)": load_wine,
    "Breast Cancer (Clasificación)": load_breast_cancer,
    "Digits (Clasificación)": load_digits,
    "Diabetes (Regresión)": load_diabetes,
    "California Housing (Regresión)": fetch_california_housing,
    "Friedman1 (Sintético)": lambda: _synthetic_dataset(make_friedman1, 10),
    "Make Regression (Sintético)": lambda: _synthetic_dataset(
        make_regression, 8, noise=0.1
    ),
}

# Display name -> estimator class for classification tasks.
classification_models = {
    "Logistic Regression": LogisticRegression,
    "KNN Classifier": KNeighborsClassifier,
    "Decision Tree Classifier": DecisionTreeClassifier,
    "Random Forest Classifier": RandomForestClassifier,
    "SVC": SVC,
    "Naive Bayes": GaussianNB,
}

# Display name -> estimator class for regression tasks.
regression_models = {
    "Linear Regression": LinearRegression,
    "Random Forest Regressor": RandomForestRegressor,
}

# Combined lookup table: classifiers first, then regressors.
available_models = dict(classification_models)
available_models.update(regression_models)

# ---------------------------
# REGISTRO DE DATASETS
# ---------------------------
def load_dataset_registry():
    """Return the names of every selectable dataset.

    Built-in dataset names come first, followed by the names stored in the
    JSON registry file. A missing, unreadable or malformed registry is
    ignored on purpose so the built-in datasets always stay available.

    Returns:
        list: dataset names, built-ins first.
    """
    names = list(available_datasets)
    try:
        # The context manager closes the handle even if parsing fails.
        with open(DATASET_REGISTRY, encoding="utf-8") as fh:
            registry = json.load(fh)
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: invalid JSON or bytes.
        return names
    if isinstance(registry, dict):
        names.extend(registry)
    return names

def register_dataset(file, name):
    """Copy an uploaded CSV/Excel file into ``datasets/`` and register it.

    Args:
        file: uploaded file; either an object exposing the path as ``.name``
            or the path itself. ``None`` when nothing was uploaded.
        name: name under which the dataset is registered.

    Returns:
        str: a user-facing status message (success or rejection reason).
    """
    name = (name or "").strip()
    # A name containing directory components would let the copy land outside
    # the "datasets" folder, so it is rejected together with empty names and
    # names that clash with a built-in dataset.
    if (
        not name
        or name in available_datasets
        or os.path.basename(name) != name
    ):
        return "❌ Nombre inválido o ya existe."
    if file is None:
        return "❌ Solo CSV o Excel."
    src = getattr(file, "name", file)
    ext = os.path.splitext(src)[1].lower()
    if ext not in (".csv", ".xlsx", ".xls"):
        return "❌ Solo CSV o Excel."
    dest = os.path.join("datasets", f"{name}{ext}")
    shutil.copy(src, dest)

    reg = {}
    if os.path.exists(DATASET_REGISTRY):
        with open(DATASET_REGISTRY, encoding="utf-8") as fh:
            reg = json.load(fh)
    reg[name] = dest
    # Atomic replace so an interrupted write cannot truncate the registry.
    with atomic_write(DATASET_REGISTRY, overwrite=True, encoding="utf-8") as f:
        json.dump(reg, f, indent=4)
    return f"✅ Dataset '{name}' registrado."

# ---------------------------
# CARGA Y VISTA PREVIA
# ---------------------------
def load_dataset(name):
    """Load a dataset by name.

    Built-in datasets are produced by their loader in ``available_datasets``;
    any other name is looked up in the JSON registry and read from the stored
    CSV/Excel file, which must contain a ``target`` column.

    Args:
        name: dataset name as shown in the UI.

    Returns:
        tuple: ``(features, target, target_names)`` where ``features`` is a
        DataFrame, ``target`` a Series and ``target_names`` is whatever the
        built-in loader provides under that key (``None`` for registered
        files or loaders without it).

    Raises:
        KeyError: if ``name`` is not a built-in and not in the registry.
        ValueError: if a registered file has no ``target`` column.
    """
    if name in available_datasets:
        data = available_datasets[name]()
        df = pd.DataFrame(data["data"], columns=data.get("feature_names"))
        target = pd.Series(data["target"], name="target")
        return df, target, data.get("target_names")

    # Close the registry handle deterministically instead of leaking it.
    with open(DATASET_REGISTRY, encoding="utf-8") as fh:
        reg = json.load(fh)
    path = reg[name]
    if path.endswith(".csv"):
        df_full = pd.read_csv(path)
    else:
        df_full = pd.read_excel(path)
    if "target" not in df_full.columns:
        raise ValueError("❌ El dataset debe tener columna 'target'.")
    return df_full.drop(columns=["target"]), df_full["target"], None

def show_dataset_with_target(name):
    """Return a preview table (features plus a ``class`` column) and a caption.

    Target values are replaced by their names only when every value is an
    integral index into ``target_names``. Otherwise (for example a numeric
    regression target on a dataset that still ships target names) the raw
    target values are shown instead of raising or mislabelling rows.

    Args:
        name: dataset name accepted by ``load_dataset``.

    Returns:
        tuple: ``(preview_dataframe, caption_string)``.
    """
    df, target, tns = load_dataset(name)
    labels = target.tolist()
    if tns is not None:
        try:
            indices = [int(x) for x in target]
            usable = all(
                i == x and 0 <= i < len(tns) for i, x in zip(indices, target)
            )
        except (TypeError, ValueError):
            # Non-numeric or NaN targets cannot be class indices.
            usable = False
        if usable:
            labels = [tns[i] for i in indices]
    preview = df.copy()
    preview["class"] = labels
    return preview, f"Vista del Dataset ({len(preview)} filas)"

# ---------------------------
# Detección de clasificación
# ---------------------------
def is_classification_task(target):
    """Heuristic: the task is classification when the target has < 20 distinct values."""
    distinct_values = pd.Series(target).nunique()
    return distinct_values < 20

# ---------------------------
# ENTRENAMIENTO MÚLTIPLE
# ---------------------------
def update_model_choices(ds):
    """Return a Gradio update listing the models that fit the dataset's task.

    The choices become the classifier names or the regressor names depending
    on ``is_classification_task``; the first name is set as the value.
    """
    _, target, _ = load_dataset(ds)
    if is_classification_task(target):
        catalogue = classification_models
    else:
        catalogue = regression_models
    names = list(catalogue.keys())
    return gr.update(choices=names, value=names[0])

def train_multiple(ds, mods, ts):
    """Train each selected model with default settings and score it.

    Args:
        ds: dataset name accepted by ``load_dataset``.
        mods: iterable of model names (keys of ``available_models``).
        ts: test-set fraction passed to ``train_test_split``.

    Returns:
        list[dict]: one entry per model with keys ``"Modelo"``,
        ``"Precisión"`` (accuracy or R2) and ``"Reporte"``.
    """
    features, target, _ = load_dataset(ds)
    is_clf = is_classification_task(target)
    # Classification targets are label-encoded; regression targets pass through.
    y = LabelEncoder().fit_transform(target) if is_clf else target
    X_train, X_test, y_train, y_test = train_test_split(
        features, y, test_size=ts, random_state=42
    )

    def evaluate(model_name):
        # Fit one default-configured estimator and compute its metrics.
        estimator = available_models[model_name]()
        estimator.fit(X_train, y_train)
        predicted = estimator.predict(X_test)
        if is_clf:
            score = accuracy_score(y_test, predicted)
            report = classification_report(y_test, predicted, output_dict=True)
        else:
            score = r2_score(y_test, predicted)
            report = {"R2": score, "MSE": mean_squared_error(y_test, predicted)}
        return {"Modelo": model_name, "Precisión": score, "Reporte": report}

    return [evaluate(model_name) for model_name in mods]

def plot_compare(res):
    """Draw a bar chart of each model's score and return the figure.

    The y-axis is labelled "Accuracy" (and clamped to 0..1) when every score
    lies in that interval, otherwise "R2 Score".
    """
    model_names = [entry["Modelo"] for entry in res]
    scores = [entry["Precisión"] for entry in res]
    fig, ax = plt.subplots()
    sns.barplot(x=model_names, y=scores, ax=ax)
    within_unit_interval = all(0 <= score <= 1 for score in scores)
    if within_unit_interval:
        ax.set_ylim(0, 1)
        ax.set_ylabel("Accuracy")
    else:
        ax.set_ylabel("R2 Score")
    ax.tick_params(axis="x", rotation=45)
    plt.tight_layout()
    return fig

def export_csv(res):
    """Serialise the model name and score of each result as CSV text."""
    rows = []
    for entry in res:
        rows.append({key: entry[key] for key in ("Modelo", "Precisión")})
    table = pd.DataFrame(rows)
    return table.to_csv(index=False)

def full_training(ds, mods, ts):
    """Train the selected models and build the comparison outputs.

    Args:
        ds: dataset name.
        mods: iterable of model names to compare.
        ts: test-set fraction.

    Returns:
        tuple: ``(results_table, comparison_figure, csv_report_path)``; the
        table is sorted by score, best first.
    """
    res = train_multiple(ds, mods, ts)
    df_res = pd.DataFrame(
        [{"Modelo": r["Modelo"], "Precisión": round(r["Precisión"], 4)} for r in res]
    ).sort_values("Precisión", ascending=False)
    fig = plot_compare(res)
    path = "report.csv"
    # Explicit UTF-8 because the header contains non-ASCII text; newline=""
    # keeps the CSV line endings exactly as produced. The context manager
    # guarantees the file is flushed and closed before the path is returned.
    with open(path, "w", encoding="utf-8", newline="") as fh:
        fh.write(export_csv(res))
    return df_res, fig, path

def run_eda(ds):
    """Return ``describe()`` statistics of the dataset's features as a flat table."""
    features = load_dataset(ds)[0]
    summary = features.describe()
    return summary.reset_index()

def plot_eda(ds):
    """Return a figure with one box plot per feature of the dataset."""
    features = load_dataset(ds)[0]
    # Long format: one row per (feature, value) pair so seaborn can group them.
    long_form = features.melt(var_name="feature", value_name="valor")
    fig, ax = plt.subplots(figsize=(12, 6))
    sns.boxplot(
        data=long_form, x="feature", y="valor",
        hue="feature", dodge=False, ax=ax,
    )
    legend = ax.get_legend()
    if legend:
        # The hue duplicates the x axis, so the legend adds nothing.
        legend.remove()
    ax.tick_params(axis="x", rotation=30)
    plt.tight_layout()
    return fig

def run_pca(ds):
    """Project the dataset's features onto two principal components and plot them."""
    features = load_dataset(ds)[0]
    reducer = PCA(n_components=2)
    projected = pd.DataFrame(
        reducer.fit_transform(features), columns=["PC1", "PC2"]
    )
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.scatterplot(data=projected, x="PC1", y="PC2", s=60, ax=ax)
    plt.tight_layout()
    return fig

# ---------------------------
# ENTRENAMIENTO INDIVIDUAL & GUARDADO
# ---------------------------
# Slider definitions per model: (label, minimum, maximum, step, default).
# The iteration order of this mapping determines the order in which the UI
# creates the sliders, so entries must not be reordered.
model_params = {
    "Logistic Regression": [
        ("C", 0.01, 10, 0.01, 1),
        ("max_iter", 100, 2000, 100, 1000),
    ],
    "KNN Classifier": [
        ("n_neighbors", 1, 30, 1, 5),
    ],
    "Decision Tree Classifier": [
        ("max_depth", 1, 20, 1, 10),
        ("min_samples_split", 2, 20, 1, 2),
    ],
    "Random Forest Classifier": [
        ("n_estimators", 10, 200, 10, 100),
        ("max_depth", 1, 20, 1, 10),
    ],
    "SVC": [
        ("C", 0.1, 10, 0.1, 1),
        ("gamma", 0.001, 1, 0.001, 0.01),
    ],
    "Naive Bayes": [],
    "Linear Regression": [],
    "Random Forest Regressor": [
        ("n_estimators", 10, 200, 10, 100),
        ("max_depth", 1, 20, 1, 5),
    ],
}

def save_model_locally(model, name, headers, tns):
    """Pickle a trained model under ``models/`` and record it in the registry.

    Args:
        model: fitted estimator to persist.
        name: file name for the model; ``.pkl`` is appended when missing.
        headers: feature column names the model was trained on.
        tns: class names indexed by encoded label, or ``None``.

    Returns:
        str: path of the written pickle file.
    """
    if not name.endswith(".pkl"):
        name += ".pkl"
    path = os.path.join("models", name)

    if tns is not None:
        # Target names may arrive as a NumPy array or as NumPy scalars, which
        # json.dump cannot serialise; convert them to plain Python values so
        # the registry write below does not fail after the pickle was written.
        tns = [t.item() if hasattr(t, "item") else t for t in tns]

    with open(path, "wb") as fh:
        pickle.dump({"model": model, "headers": headers, "target_names": tns}, fh)

    reg = {}
    if os.path.exists(MODEL_REGISTRY):
        with open(MODEL_REGISTRY, encoding="utf-8") as fh:
            reg = json.load(fh)
    reg[name] = {"model_path": path, "headers": headers, "target_names": tns}
    # Atomic replace so an interrupted write cannot truncate the registry.
    with atomic_write(MODEL_REGISTRY, overwrite=True, encoding="utf-8") as f:
        json.dump(reg, f, indent=4)
    return path

def train_one_model_with_save(
    ds, mname, ts,
    v1,v2,v3,v4,v5,v6,v7,v8,v9,v10,v11
):
    """Train one model with the chosen hyperparameters and summarise the result.

    Args:
        ds: dataset name accepted by ``load_dataset``.
        mname: model name (one of the keys handled below).
        ts: test-set fraction passed to ``train_test_split``.
        v1, v2: Logistic Regression ``C`` and ``max_iter``.
        v3: KNN ``n_neighbors``.
        v4, v5: Decision Tree ``max_depth`` and ``min_samples_split``.
        v6, v7: Random Forest Classifier ``n_estimators`` and ``max_depth``.
        v8, v9: SVC ``C`` and ``gamma``.
        v10, v11: Random Forest Regressor ``n_estimators`` and ``max_depth``.

    Returns:
        tuple: ``(metric_text, figure, results_table, state, hyper)`` where
        ``state`` holds the fitted model, its feature headers and target
        names, and ``hyper`` is the hyperparameter dict that was used.

    Raises:
        ValueError: if ``mname`` is not a supported model.
    """
    from sklearn.metrics import confusion_matrix

    df, target, tns = load_dataset(ds)
    clf_task = is_classification_task(target)
    if clf_task:
        le = LabelEncoder()
        y = le.fit_transform(target)
        if tns is None:
            tns = le.classes_
        # Store plain Python values: NumPy arrays/scalars are not JSON
        # serialisable when the model is later written to the registry.
        tns = [t.item() if hasattr(t, "item") else t for t in tns]
    else:
        y = target
        # Target names only make sense as class labels. Keeping them for a
        # regression model would let prediction map numeric outputs to them.
        tns = None
    Xtr, Xte, ytr, yte = train_test_split(df, y, test_size=ts, random_state=42)

    # Hyperparameters for the selected model (slider values arrive as numbers).
    if mname == "Logistic Regression":
        hyper = {"C": v1, "max_iter": int(v2)}
        Cls = LogisticRegression
    elif mname == "KNN Classifier":
        # n_neighbors may not exceed the number of training samples.
        hyper = {"n_neighbors": min(int(v3), len(Xtr))}
        Cls = KNeighborsClassifier
    elif mname == "Decision Tree Classifier":
        hyper = {"max_depth": int(v4), "min_samples_split": int(v5)}
        Cls = DecisionTreeClassifier
    elif mname == "Random Forest Classifier":
        hyper = {"n_estimators": int(v6), "max_depth": int(v7)}
        Cls = RandomForestClassifier
    elif mname == "SVC":
        hyper = {"C": v8, "gamma": v9}
        Cls = SVC
    elif mname == "Naive Bayes":
        hyper = {}
        Cls = GaussianNB
    elif mname == "Random Forest Regressor":
        hyper = {"n_estimators": int(v10), "max_depth": int(v11)}
        Cls = RandomForestRegressor
    elif mname == "Linear Regression":
        hyper = {}
        Cls = LinearRegression
    else:
        raise ValueError("Modelo no soportado")

    model = Cls(**hyper)
    model.fit(Xtr, ytr)
    y_pred = model.predict(Xte)

    if clf_task:
        metric = accuracy_score(yte, y_pred)
        title = "Accuracy"
        cm = confusion_matrix(yte, y_pred)
        fig, ax = plt.subplots()
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
        ax.set_title("Confusion Matrix")
        table_df = pd.DataFrame({"y_true": yte, "y_pred": y_pred})
    else:
        metric = r2_score(yte, y_pred)
        title = "R2 Score"
        fig, ax = plt.subplots()
        ax.scatter(yte, y_pred)
        # Dashed diagonal marks where predictions equal the actual values.
        ax.plot([yte.min(), yte.max()], [yte.min(), yte.max()], "k--", lw=2)
        ax.set_title("Actual vs Predicted")
        table_df = pd.DataFrame({"Actual": yte, "Predicted": y_pred})

    state = {"model": model, "headers": df.columns.tolist(), "target_names": tns}
    return f"{title}: {metric:.4f}", fig, table_df, state, hyper

def save_trained_model(state, save_name):
    """Persist the model held in ``state`` under the given name.

    Args:
        state: dict with ``"model"``, ``"headers"`` and ``"target_names"``,
            or ``None``/empty when nothing has been trained yet.
        save_name: name typed by the user for the saved model.

    Returns:
        str: a user-facing status message.
    """
    if not state or "model" not in state:
        return "❌ No hay modelo entrenado para guardar."
    # Drop any directory components so the file always lands inside the
    # models folder, and refuse names that end up empty (which would
    # otherwise be saved as just ".pkl").
    safe_name = os.path.basename((save_name or "").strip())
    if not safe_name:
        return "❌ Indica un nombre para guardar el modelo."
    path = save_model_locally(
        state["model"],
        safe_name,
        state["headers"],
        state["target_names"]
    )
    return f"✅ Modelo guardado en: `{path}`"

# ---------------------------
# PREDICCIÓN
# ---------------------------
# Number of feature text boxes created for individual prediction.
MAX_F = 20

def load_model_list():
    """Return the names of the saved models listed in the model registry.

    A missing, unreadable or malformed registry yields an empty list, which
    matches how the dataset registry loader treats a broken file.
    """
    try:
        with open(MODEL_REGISTRY, encoding="utf-8") as fh:
            registry = json.load(fh)
    except (OSError, ValueError):
        return []
    return list(registry) if isinstance(registry, dict) else []

def refresh_models():
    """Return a Gradio update that reloads the saved-model choices."""
    saved_models = load_model_list()
    return gr.update(choices=saved_models)

def update_feats(m):
    """Return ``MAX_F`` Gradio updates configuring the feature inputs for model ``m``.

    The first ``len(headers)`` inputs are shown, labelled with the model's
    feature names and cleared; the remaining ones are hidden. When no model
    is selected, or the model is not in the registry, every input is hidden
    instead of raising.
    """
    hdr = []
    if m and os.path.exists(MODEL_REGISTRY):
        with open(MODEL_REGISTRY, encoding="utf-8") as fh:
            reg = json.load(fh)
        hdr = reg.get(m, {}).get("headers", [])
    return [
        gr.update(label=hdr[i], visible=True, value="")
        if i < len(hdr)
        else gr.update(visible=False)
        for i in range(MAX_F)
    ]

def predict_model_combined(selected_model_name, excel_file, *features):
    """Predict with a saved model, either in bulk from a file or for one row.

    Args:
        selected_model_name: registry key of the saved model.
        excel_file: uploaded CSV/Excel file for bulk prediction (an object
            exposing the path as ``.name`` or the path itself), or ``None``
            to predict a single row from ``features``.
        *features: raw text values, one per model feature, in header order.

    Returns:
        tuple: ``(dataframe, message)``. Bulk success returns the input frame
        with an added ``"Predicción"`` column and an empty message; every
        other outcome returns ``None`` plus a prediction or error message.
    """
    try:
        with open(MODEL_REGISTRY, encoding="utf-8") as fh:
            reg = json.load(fh)
    except (OSError, ValueError):
        reg = {}
    entry = reg.get(selected_model_name) if isinstance(reg, dict) else None
    if entry is None:
        return None, "❌ Selecciona un modelo guardado."

    # SECURITY NOTE: pickle.load executes arbitrary code embedded in the file.
    # Only model files written by this application should ever be loaded.
    with open(entry["model_path"], "rb") as f:
        loaded = pickle.load(f)
    model = loaded["model"]
    headers = loaded["headers"]
    tns = loaded.get("target_names", None)

    def map_pred(p):
        # Translate an encoded class index into its name. Anything that is
        # not an integral, in-range index (e.g. a regression output or a
        # negative value) is returned unchanged.
        if tns is None:
            return p
        try:
            idx = int(p)
            exact = float(p) == idx
        except (TypeError, ValueError):
            return p
        if not exact or not 0 <= idx < len(tns):
            return p
        return tns[idx]

    if excel_file is not None:
        fn = getattr(excel_file, "name", excel_file)
        ext = os.path.splitext(fn)[1].lower()
        if ext in [".xls", ".xlsx"]:
            df = pd.read_excel(fn, engine="openpyxl")
        else:
            df = pd.read_csv(fn)
        missing = [h for h in headers if h not in df.columns]
        if missing:
            return None, f"❌ Faltan columnas: {missing}"
        try:
            preds = model.predict(df[headers])
        except Exception as e:  # UI boundary: report instead of crashing
            return None, f"⚠️ Error en predicción: {e}"
        df["Predicción"] = [map_pred(p) for p in preds]
        return df, ""

    vals = []
    for i, header in enumerate(headers):
        raw = features[i] if i < len(features) else ""
        try:
            vals.append(float(raw))
        except (TypeError, ValueError):
            return None, f"❌ '{header}' debe ser numérico."
    row = pd.DataFrame([vals], columns=headers)
    try:
        p = model.predict(row)[0]
    except Exception as e:  # UI boundary: report instead of crashing
        return None, f"⚠️ Error en predicción: {e}"
    return None, f"Predicción: {map_pred(p)}"

def predict_ind(m,*f):
    """Predict a single row with model ``m`` and return the message as an HTML heading.

    The message can contain feature names taken from uploaded data and
    exception text, so it is HTML-escaped before being embedded in markup.
    """
    import html

    _, txt = predict_model_combined(m, None, *f)
    return f"<h2>{html.escape(str(txt))}</h2>"

def predict_bulk(m,excel):
    """Run bulk prediction with model ``m`` on the uploaded file.

    Returns:
        The DataFrame produced by ``predict_model_combined``.

    Raises:
        gr.Error: when no table was produced and an explanatory message is
            available, so the reason reaches the user instead of being
            silently dropped.
    """
    df, msg = predict_model_combined(m, excel)
    if df is None and msg:
        raise gr.Error(msg)
    return df

def gen_template(m):
    """Write an empty Excel template with the feature columns of model ``m``.

    Args:
        m: registry key of the saved model; falsy when none is selected.

    Returns:
        The template file name, or ``None`` when no model is selected.

    Raises:
        KeyError: if ``m`` is not present in the model registry.
    """
    if not m:
        return None
    with open(MODEL_REGISTRY, encoding="utf-8") as fh:
        reg = json.load(fh)
    hdr = reg[m]["headers"]
    df = pd.DataFrame(columns=hdr)
    fn = f"{m.replace(' ','_')}_plantilla.xlsx"
    df.to_excel(fn, index=False)
    return fn

# ---------------------------
# INTERFAZ
# ---------------------------
# Dataset and model names available when the interface is built; the
# "Refrescar" buttons below reload them on demand.
initial_datasets=load_dataset_registry()
initial_models=load_model_list()

# Build the three-tab Gradio interface. Components are declared in layout
# order and each event handler is wired right after the components it uses.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as demo:

    gr.Markdown("# 🤖 AI Mini Trainer\n_Una herramienta ligera para ML_")

    # TAB 1: DATASET + COMPARISON
    with gr.Tab("📊 Dataset + Modelos ML"):
        gr.Markdown("### Explora, registra y compara modelos")

        with gr.Row():
            # LEFT COLUMN: selector + refresh
            with gr.Column(scale=1):
                ds_sel = gr.Dropdown(initial_datasets, value=initial_datasets[0], label="Dataset")
                ref_btn = gr.Button("Refrescar")
                # Re-read the registry so newly registered datasets show up.
                ref_btn.click(lambda: gr.update(choices=load_dataset_registry()),
                              [], [ds_sel])

            # RIGHT COLUMN: upload, name, save
            with gr.Column(scale=1):
                up_ds = gr.File(label="Subir CSV/Excel")
                nm_ds = gr.Textbox(label="Nombre")
                sv_btn = gr.Button("Guardar")
                msg_ds= gr.Markdown()
                sv_btn.click(register_dataset, [up_ds, nm_ds], [msg_ds])

        # Model multi-select for the comparison run; starts with every
        # classifier selected.
        models_cb = gr.CheckboxGroup(list(classification_models.keys()),
                                     value=list(classification_models.keys()),
                                     label="Modelos")
        ts_cb     = gr.Slider(0.1,0.5,0.3,0.05, label="Test size")
        btn_tr    = gr.Button("Entrenar y Comparar")
        tbl_res   = gr.Dataframe(headers=["Modelo","Precisión"])
        plt_res   = gr.Plot()
        csv_res   = gr.File()
        btn_tr.click(full_training, [ds_sel, models_cb, ts_cb], [tbl_res, plt_res, csv_res])

        # Preview of the first dataset, computed once while building the UI.
        df0, lbl0 = show_dataset_with_target(initial_datasets[0])
        tbl0  = gr.Dataframe(value=df0)
        lbl0c = gr.Markdown(lbl0)
        ds_sel.change(show_dataset_with_target, [ds_sel], [tbl0, lbl0c])
        # NOTE(review): update_model_choices sets `value` to a single model
        # name while models_cb is a CheckboxGroup — confirm the resulting
        # selection is what is intended.
        ds_sel.change(update_model_choices, [ds_sel], [models_cb])

        with gr.Tabs():
            with gr.Tab("EDA"):
                # Initial values are computed at build time; later dataset
                # changes recompute them through the change handlers.
                eda_t = gr.Dataframe(value=run_eda(initial_datasets[0]))
                eda_p = gr.Plot(value=plot_eda(initial_datasets[0]))
                ds_sel.change(run_eda, [ds_sel], [eda_t])
                ds_sel.change(plot_eda, [ds_sel], [eda_p])
            with gr.Tab("PCA"):
                pca_p = gr.Plot(value=run_pca(initial_datasets[0]))
                ds_sel.change(run_pca, [ds_sel], [pca_p])

    # TAB 2: INDIVIDUAL TRAINING
    with gr.Tab("⚙️ Entrenamiento"):
        gr.Markdown("### Ajusta hiperparámetros y guarda modelo")
        ds_tr = gr.Dropdown(initial_datasets, value=initial_datasets[0], label="Dataset")
        ref2 = gr.Button("Refrescar")
        ref2.click(lambda: gr.update(choices=load_dataset_registry()), [], [ds_tr])
        # Pick the initial model list from the task type of the first dataset.
        _, tgt0, _ = load_dataset(initial_datasets[0])
        init_mods = list(classification_models.keys()) if is_classification_task(tgt0) else list(regression_models.keys())
        mdl_tr = gr.Dropdown(init_mods, value=init_mods[0], label="Modelo")
        ds_tr.change(update_model_choices, [ds_tr], [mdl_tr])
        ts_tr = gr.Slider(0.1,0.5,0.3,0.05, label="Test size")

        # One hidden slider per hyperparameter, created in model_params
        # order. That order is what lines the sliders up with the positional
        # v1..v11 arguments of train_one_model_with_save.
        sliders = []
        for m, params in model_params.items():
            for (_n,mn,mx,stp,dfv) in params:
                sliders.append(gr.Slider(mn,mx,value=dfv,step=stp,label=_n,visible=False))

        def show_params(name):
            """Return one visibility update per slider, showing only those of model ``name``."""
            return [gr.update(visible=(name==mod)) for mod in model_params for _ in model_params[mod]]

        mdl_tr.change(show_params, [mdl_tr], sliders)

        train_btn = gr.Button("Entrenar")
        out_md    = gr.Markdown("Sin entrenar")
        out_pl    = gr.Plot()
        out_tb    = gr.Dataframe()
        # Holds the trained model between the "train" and "save" clicks.
        st        = gr.State()
        # NOTE(review): train_one_model_with_save returns five values while
        # only four output components are listed here — confirm how the
        # installed Gradio version treats the extra value.
        train_btn.click(train_one_model_with_save,
                        [ds_tr, mdl_tr, ts_tr] + sliders,
                        [out_md, out_pl, out_tb, st])

        save_name = gr.Textbox(label="Nombre para guardar")
        gr.Button("Guardar modelo").click(save_trained_model, [st, save_name], [out_md])

    # TAB 3: PREDICTION
    with gr.Tab("🔮 Predicción"):
        gr.Markdown("### Predicción individual y masiva")
        mdl_dd = gr.Dropdown(initial_models, value=None, label="Modelos guardados")
        ref3   = gr.Button("Refrescar")
        ref3.click(refresh_models, [], [mdl_dd])

        with gr.Tabs():
            with gr.Tab("Individual"):
                # MAX_F hidden text boxes; update_feats reveals and labels
                # the ones the selected model needs.
                feats = [gr.Textbox(visible=False) for _ in range(MAX_F)]
                mdl_dd.change(update_feats, [mdl_dd], feats)
                btn_i = gr.Button("Predecir")
                out_i = gr.HTML()
                btn_i.click(predict_ind, [mdl_dd] + feats, [out_i])

            with gr.Tab("Masiva"):
                tpl_btn = gr.Button("Generar plantilla Excel")
                tpl_f   = gr.File(label="Descargar plantilla")
                tpl_btn.click(gen_template, [mdl_dd], [tpl_f])
                up_xl   = gr.File(label="Cargar Excel")
                btn_b   = gr.Button("Predecir Masivo")
                out_b   = gr.Dataframe()
                btn_b.click(predict_bulk, [mdl_dd, up_xl], [out_b])

# NOTE(review): launched with share=True; this app writes uploaded files and
# loads pickled models, so confirm that sharing it beyond the local machine
# is intended.
demo.launch(share=True, debug=True)
