# 02_train
- Author: 
- Date: 2025-10-23
- Goal: 모델링/학습/통계/시각화
- Input: 
- Output: 
- Metrics: acc@val, loss@train
- Repro: seed=42, device=auto, config=../configs/


In [2]:
# This cell is only needed if xgboost is not installed in your environment.
# In Colab, run this once and then restart the runtime if necessary.
# Optional cell.
# If xgboost is not installed, uncomment the line below and run it.
#!pip install -q xgboost

In [3]:
# ============================================================
# 02_train_classical_ml.ipynb
#
# Train classical ML models (KNN, SVM, Decision Tree,
# Random Forest, XGBoost) on Colored MNIST.
#
# - Uses preprocessed features from 01_preprocessing_colored_mnist.ipynb
# - Supports 3 tasks:
#     1) Digit classification (0-9)
#     2) Foreground color classification (7 classes, ROYGBIV)
#     3) Background color classification (7 classes, ROYGBIV)
# ============================================================

import os  # path handling
import numpy as np  # numerical operations
import matplotlib.pyplot as plt  # visualization
import seaborn as sns  # nicer plots

from sklearn.metrics import (  # evaluation metrics
    accuracy_score,
    precision_recall_fscore_support,
    classification_report,
    confusion_matrix,
)
from sklearn.model_selection import GridSearchCV  # hyperparameter tuning

from sklearn.neighbors import KNeighborsClassifier  # KNN model
from sklearn.svm import SVC  # SVM model
from sklearn.tree import DecisionTreeClassifier  # Decision Tree model
from sklearn.ensemble import RandomForestClassifier  # Random Forest model

import xgboost as xgb  # XGBoost model

# Plot style (English-only labels).
# NOTE: the seaborn theme call rewrites matplotlib rcParams, including
# "font.family", so it has to run BEFORE the explicit rcParams overrides.
# In the opposite order the font choice below is silently replaced by
# seaborn's default font family.
sns.set(style="whitegrid")  # seaborn theme first
plt.rcParams["font.family"] = "DejaVu Sans"  # default font (applied after the theme)
plt.rcParams["axes.unicode_minus"] = False  # draw minus signs with a plain hyphen

RANDOM_STATE = 42  # global random seed for reproducibility

print("[OK] Libraries imported.")


# ------------------------------------------------------------
# Locate the repository root and the preprocessed npz archive
# ------------------------------------------------------------
cwd = os.getcwd()  # directory the notebook was launched from

# When launched from inside a 'notebooks' folder the repo root is one level
# up; in every other case the launch directory itself is used as the root.
BASE_DIR = os.path.dirname(cwd) if os.path.basename(cwd) == "notebooks" else cwd

PROCESSED_PATH = os.path.join(
    BASE_DIR,
    "data",
    "processed",
    "colored_mnist",
    "colored_mnist.npz",
)

print(f"[INFO] BASE_DIR       : {BASE_DIR}")
print(f"[INFO] PROCESSED_PATH : {PROCESSED_PATH}")

# Fail early, pointing at the notebook that is expected to produce the file.
if not os.path.exists(PROCESSED_PATH):
    raise FileNotFoundError(
        f"[ERROR] Processed file not found at {PROCESSED_PATH}.\n"
        f"Please run 01_preprocessing_colored_mnist.ipynb first."
    )

# np.load on an .npz returns an archive object; each array is read when its
# key is accessed, so `data` must stay open for the cells below.
data = np.load(PROCESSED_PATH)
print("[OK] Loaded npz keys:", list(data.keys()))


# ------------------------------------------------------------
# Pull the feature matrices out of the archive as float32
#   - X_*     : the "scaled" features used by every model below
#               (standardized in 01_preprocessing, per the original note)
#   - X_*_raw : the "raw" features (flattened RGB in [0,1], per the
#               original note); not used by any cell in this notebook
# ------------------------------------------------------------
X_train, X_val, X_test = (
    data[key].astype(np.float32) for key in ("X_train", "X_val", "X_test")
)

X_train_raw, X_val_raw, X_test_raw = (
    data[key].astype(np.float32)
    for key in ("X_train_raw", "X_val_raw", "X_test_raw")
)

print("[INFO] Feature shapes (scaled):")
print("  X_train:", X_train.shape)
print("  X_val  :", X_val.shape)
print("  X_test :", X_test.shape)

print("[INFO] Raw features are loaded as well, reserved for future use (e.g., PCA).")


# ------------------------------------------------------------
# Color class names (order is expected to match 01_preprocessing)
# ------------------------------------------------------------
COLOR_NAMES = ["red", "orange", "yellow", "green", "blue", "indigo", "violet"]  # 7 colors


# ------------------------------------------------------------
# Task selection: "digit", "fg", or "bg"
# ------------------------------------------------------------
TASK = "digit"  # change to "fg" or "bg" depending on experiment

# Archive keys holding the (train, val, test) labels for each task.
_TASK_LABEL_KEYS = {
    "digit": ("y_digit_train", "y_digit_val", "y_digit_test"),
    "fg": ("y_fg_train", "y_fg_val", "y_fg_test"),
    "bg": ("y_bg_train", "y_bg_val", "y_bg_test"),
}

# Banner printed once the task has been resolved.
_TASK_BANNERS = {
    "digit": "[TASK] Digit classification (0-9).",
    "fg": "[TASK] Foreground color classification.",
    "bg": "[TASK] Background color classification.",
}

# Reject unknown task names before touching the archive.
if TASK not in _TASK_LABEL_KEYS:
    raise ValueError("TASK must be one of: 'digit', 'fg', 'bg'.")

y_train, y_val, y_test = (data[key] for key in _TASK_LABEL_KEYS[TASK])

# Digits are named "0".."9"; both color tasks share the color name list.
class_names = [str(i) for i in range(10)] if TASK == "digit" else COLOR_NAMES
print(_TASK_BANNERS[TASK])

print("[INFO] Label shapes:")
print("  y_train:", y_train.shape)
print("  y_val  :", y_val.shape)
print("  y_test :", y_test.shape)
print("  num_classes:", len(class_names))

[OK] Libraries imported.
[INFO] BASE_DIR       : /Users/jaehun_jung/colored-mnist-classification
[INFO] PROCESSED_PATH : /Users/jaehun_jung/colored-mnist-classification/data/processed/colored_mnist/colored_mnist.npz
[OK] Loaded npz keys: ['X_train', 'X_val', 'X_test', 'X_train_raw', 'X_val_raw', 'X_test_raw', 'y_digit_train', 'y_digit_val', 'y_digit_test', 'y_fg_train', 'y_fg_val', 'y_fg_test', 'y_bg_train', 'y_bg_val', 'y_bg_test', 'y_source_train', 'y_source_val', 'y_source_test']
[INFO] Feature shapes (scaled):
  X_train: (56000, 2352)
  X_val  : (7000, 2352)
  X_test : (7000, 2352)
[INFO] Raw features are loaded as well, reserved for future use (e.g., PCA).
[TASK] Digit classification (0-9).
[INFO] Label shapes:
  y_train: (56000,)
  y_val  : (7000,)
  y_test : (7000,)
  num_classes: 10


In [4]:
# ------------------------------------------------------------
# [Cell 2] Utility function to evaluate a classifier
#   - Trains the model on (X_train, y_train)
#   - Evaluates on both val and test sets
#   - Prints accuracy / precision / recall / F1
#   - Shows confusion matrix for each split
# ------------------------------------------------------------

def _report_split(model, model_name, split, X, y_true, class_names, cmap):
    """Score an already-fitted model on one split and display the results.

    Prints accuracy and weighted precision/recall/F1, prints the
    classification report, and shows the confusion matrix as a heatmap.

    Parameters
    ----------
    model : estimator
        Fitted classifier implementing ``predict``.
    model_name : str
        Human-readable model name, used in the plot title.
    split : str
        Split label inserted into the printed headers and the plot title
        (e.g. ``"Val"`` or ``"Test"``).
    X, y_true : array-like
        Features and ground-truth labels of the split.
    class_names : list of str
        Class names used for the report and as heatmap tick labels.
    cmap : str
        Matplotlib colormap name for the heatmap.

    Returns
    -------
    tuple of (float, float)
        ``(accuracy, weighted_f1)`` for the split.
    """
    y_pred = model.predict(X)
    accuracy = accuracy_score(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )  # weighted precision/recall/F1; support is not needed here

    print(f"\n[{split}] Metrics:")
    print(f"  Accuracy : {accuracy:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall   : {recall:.4f}")
    print(f"  F1-score : {f1:.4f}")
    print(f"\n[{split}] Classification report:")
    print(classification_report(y_true, y_pred, target_names=class_names, zero_division=0))

    matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(6, 5))
    sns.heatmap(matrix, annot=True, fmt="d", cmap=cmap,
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f"{model_name} - {split} Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.show()

    return accuracy, f1


def evaluate_classifier(model,  # sklearn/xgboost-like estimator
                        model_name: str,
                        X_train, y_train,
                        X_val, y_val,
                        X_test, y_test,
                        class_names):
    """Train a classifier, then evaluate it on the validation and test sets.

    The model is always (re)fitted on ``(X_train, y_train)`` first, even if
    the caller passes an estimator that has already been fitted. For each of
    the two splits the metrics, the classification report and a confusion
    matrix heatmap are printed/shown.

    Parameters
    ----------
    model : estimator
        Classifier implementing fit/predict. It is fitted in place.
    model_name : str
        Human-readable model name for printing and plot titles.
    X_train, y_train : array-like
        Training features and labels.
    X_val, y_val : array-like
        Validation features and labels.
    X_test, y_test : array-like
        Test features and labels.
    class_names : list of str
        Names of classes, used for reports and confusion matrices.

    Returns
    -------
    dict
        ``{"model", "acc_val", "f1_val", "acc_test", "f1_test"}`` where
        ``model`` is the fitted estimator passed in and the F1 values are
        weighted averages.
    """
    banner = "=" * 60
    print(banner)
    print(f"[MODEL] {model_name}")
    print(banner)

    # --------- Train ---------
    model.fit(X_train, y_train)
    print("[OK] Training finished.")

    # --------- Evaluate (validation first, then test) ---------
    acc_val, f1_val = _report_split(
        model, model_name, "Val", X_val, y_val, class_names, cmap="Blues"
    )
    acc_test, f1_test = _report_split(
        model, model_name, "Test", X_test, y_test, class_names, cmap="Greens"
    )

    # Compact result record consumed by the summary cell.
    return {
        "model": model,
        "acc_val": acc_val,
        "f1_val": f1_val,
        "acc_test": acc_test,
        "f1_test": f1_test,
    }

In [5]:
# ------------------------------------------------------------
# [Cell 3] KNN with hyperparameter tuning (GridSearchCV)
#   - Uses scaled features X_train / X_val / X_test
#   - Parameter search over k, weights, and distance metric
#   - NOTE: the recorded run of this cell shows roughly 2 minutes per
#     CV fit (48 fits in total), so expect a long search.
# ------------------------------------------------------------

# Define base KNN classifier
knn_base = KNeighborsClassifier()  # default KNN classifier

# Define hyperparameter search space for KNN
param_grid_knn = {
    "n_neighbors": [3, 5, 7, 9],         # number of neighbors
    "weights": ["uniform", "distance"],  # uniform weights vs distance weights
    "p": [1, 2],                         # 1: Manhattan, 2: Euclidean
}

# GridSearchCV configuration
knn_grid = GridSearchCV(
    estimator=knn_base,         # base KNN model
    param_grid=param_grid_knn,  # parameter search space
    scoring="accuracy",         # optimization metric (can change to 'f1_weighted')
    cv=3,                       # 3-fold cross-validation on training set
    n_jobs=-1,                  # use all available CPU cores
    verbose=2,                  # print progress
    refit=False,                # evaluate_classifier fits the winner; skip the duplicate fit
)

print("[KNN] Starting hyperparameter search...")
knn_grid.fit(X_train, y_train)  # run the cross-validated search on training data

print("\n[KNN] Best parameters:", knn_grid.best_params_)
print("[KNN] Best CV accuracy:", f"{knn_grid.best_score_:.4f}")

# Build a fresh, unfitted estimator: base configuration + winning parameters.
# (With refit=False the search object does not expose best_estimator_.)
knn_best = KNeighborsClassifier(**{**knn_base.get_params(), **knn_grid.best_params_})

# Fit on the full training set and evaluate using the common evaluation function
results_knn = evaluate_classifier(
    model=knn_best,
    model_name=f"KNN (best grid: {knn_grid.best_params_})",
    X_train=X_train,
    y_train=y_train,
    X_val=X_val,
    y_val=y_val,
    X_test=X_test,
    y_test=y_test,
    class_names=class_names,
)

[KNN] Starting hyperparameter search...
Fitting 3 folds for each of 16 candidates, totalling 48 fits
[CV] END ................n_neighbors=3, p=2, weights=uniform; total time= 2.1min
[CV] END ...............n_neighbors=3, p=2, weights=distance; total time= 2.1min
[CV] END ................n_neighbors=3, p=2, weights=uniform; total time= 2.1min
[CV] END ................n_neighbors=3, p=2, weights=uniform; total time= 2.1min
[CV] END ...............n_neighbors=3, p=2, weights=distance; total time= 2.0min
[CV] END ...............n_neighbors=3, p=2, weights=distance; total time= 2.0min


KeyboardInterrupt: 

In [None]:
# ------------------------------------------------------------
# [Cell 4] SVM with hyperparameter tuning (GridSearchCV)
#   - Uses RBF kernel (non-linear decision boundary)
#   - Uses scaled features X_train / X_val / X_test
#   - Parameter search over C and gamma
# ------------------------------------------------------------

# Base SVM classifier (RBF kernel)
svm_base = SVC(
    kernel="rbf",        # radial basis function kernel
    probability=False,   # set True if probability estimates are needed
    random_state=RANDOM_STATE,
)

# Hyperparameter search space for SVM
# Note: Keep grid small to avoid very long training time
param_grid_svm = {
    "C": [1.0, 5.0, 10.0],            # regularization strength
    "gamma": ["scale", 0.01, 0.001],  # kernel coefficient
}

svm_grid = GridSearchCV(
    estimator=svm_base,         # base SVM model
    param_grid=param_grid_svm,  # search space for C and gamma
    scoring="accuracy",         # optimization metric
    cv=3,                       # 3-fold cross-validation
    n_jobs=-1,                  # parallel jobs
    verbose=2,                  # print progress
    # evaluate_classifier fits the selected model on the full training set,
    # so letting the search refit it as well would train the final SVM twice.
    refit=False,
)

print("[SVM] Starting hyperparameter search...")
svm_grid.fit(X_train, y_train)  # run the cross-validated search

print("\n[SVM] Best parameters:", svm_grid.best_params_)
print("[SVM] Best CV accuracy:", f"{svm_grid.best_score_:.4f}")

# Build a fresh, unfitted estimator: base configuration + winning parameters.
# (With refit=False the search object does not expose best_estimator_.)
svm_best = SVC(**{**svm_base.get_params(), **svm_grid.best_params_})

# Fit on the full training set and evaluate the selected SVM
results_svm = evaluate_classifier(
    model=svm_best,
    model_name=f"SVM (best grid: {svm_grid.best_params_})",
    X_train=X_train,
    y_train=y_train,
    X_val=X_val,
    y_val=y_val,
    X_test=X_test,
    y_test=y_test,
    class_names=class_names,
)

In [None]:
# ------------------------------------------------------------
# [Cell 5] Decision Tree (single-tree baseline)
#   - Fed the scaled features for parity with the other models;
#     tree splits do not strictly require scaling
#   - Intended as an interpretable reference point
# ------------------------------------------------------------

dt_clf = DecisionTreeClassifier(
    random_state=RANDOM_STATE,
    criterion="entropy",   # split quality measure ('gini' is the alternative)
    max_depth=None,        # no depth cap
    min_samples_leaf=1,    # a leaf may hold a single sample
    min_samples_split=2,   # a node with two samples may still be split
)

# Arguments follow evaluate_classifier's positional order:
# model, name, train pair, val pair, test pair, class names.
results_dt = evaluate_classifier(
    dt_clf,
    "Decision Tree",
    X_train, y_train,
    X_val, y_val,
    X_test, y_test,
    class_names,
)

In [None]:
# ------------------------------------------------------------
# [Cell 6] Random Forest
#   - Ensemble of decision trees
#   - Typically steadier than the single-tree baseline
# ------------------------------------------------------------

rf_clf = RandomForestClassifier(
    random_state=RANDOM_STATE,
    n_jobs=-1,             # build trees on all CPU cores
    n_estimators=200,      # forest size (raise if time allows)
    criterion="entropy",   # split quality measure
    max_depth=None,        # no depth cap; limit it to trade accuracy for speed
    min_samples_leaf=1,    # a leaf may hold a single sample
    min_samples_split=2,   # a node with two samples may still be split
)

# Arguments follow evaluate_classifier's positional order:
# model, name, train pair, val pair, test pair, class names.
results_rf = evaluate_classifier(
    rf_clf,
    "Random Forest (200 trees)",
    X_train, y_train,
    X_val, y_val,
    X_test, y_test,
    class_names,
)

In [None]:
# ------------------------------------------------------------
# [Cell 7] XGBoost (gradient boosting trees)
#   - Uses xgboost.XGBClassifier
#   - Step 1: an early-stopping run on the validation set picks the
#             number of boosting rounds
#   - Step 2: a fresh model capped at that number of rounds is trained
#             and evaluated by evaluate_classifier
#
# Why two steps: evaluate_classifier always calls fit(X_train, y_train)
# without an eval_set, so handing it the early-stopped model directly would
# retrain all 300 rounds and discard the early-stopping result.
#
# Requires an xgboost release that accepts `early_stopping_rounds` in the
# estimator constructor (passing it to fit() is rejected by current releases).
# ------------------------------------------------------------

# Number of classes for the current task
num_classes = len(class_names)  # number of output classes

# Settings shared by the round-selection run and the final model
xgb_params = dict(
    objective="multi:softprob",  # multi-class probability output
    num_class=num_classes,       # number of classes
    learning_rate=0.1,           # step size shrinkage
    max_depth=6,                 # depth of individual trees
    subsample=0.8,               # row sampling
    colsample_bytree=0.8,        # column sampling
    tree_method="hist",          # fast histogram-based method
    eval_metric="mlogloss",      # evaluation metric
    random_state=RANDOM_STATE,
    n_jobs=-1,                   # parallel threads
)

print("[XGB] Starting training with early stopping...")

# Step 1: find the best number of boosting rounds
xgb_search = xgb.XGBClassifier(
    n_estimators=300,            # maximum number of boosting rounds
    early_stopping_rounds=20,    # stop if no improvement for 20 rounds
    **xgb_params,
)
xgb_search.fit(
    X_train,
    y_train,
    eval_set=[(X_val, y_val)],  # validation set monitored for early stopping
    verbose=True,               # print evaluation metric for each round
)

print("[XGB] Best iteration:", xgb_search.best_iteration)

# Step 2: final model limited to the selected rounds.
# best_iteration is zero-based, hence the +1.
xgb_clf = xgb.XGBClassifier(
    n_estimators=xgb_search.best_iteration + 1,
    **xgb_params,
)

# Evaluate using common function (this performs the single final fit).
# NOTE: the validation set was used to choose the round count, so the
# validation metrics are optimistic; rely on the test metrics for comparison.
results_xgb = evaluate_classifier(
    model=xgb_clf,
    model_name="XGBoost (early stopping)",
    X_train=X_train,
    y_train=y_train,
    X_val=X_val,
    y_val=y_val,
    X_test=X_test,
    y_test=y_test,
    class_names=class_names,
)

In [None]:
# ------------------------------------------------------------
# [Cell 8] Optional: validation / test metric summary table
#   - Includes only the models whose result variable exists,
#     i.e. whose cell above has been executed in this session
# ------------------------------------------------------------

summary = []

# (display name, name of the module-level result dict) in table order
_RESULT_VARS = (
    ("KNN", "results_knn"),
    ("SVM", "results_svm"),
    ("Decision Tree", "results_dt"),
    ("Random Forest", "results_rf"),
    ("XGBoost", "results_xgb"),
)

for display_name, var_name in _RESULT_VARS:
    if var_name not in globals():
        continue  # that model's cell has not been run
    res = globals()[var_name]
    summary.append(
        [display_name, res["acc_val"], res["f1_val"], res["acc_test"], res["f1_test"]]
    )

if not summary:
    print("[INFO] No models have been run yet. Execute KNN/SVM/DT/RF/XGB cells first.")
else:
    header = ["Model", "Val Acc", "Val F1", "Test Acc", "Test F1"]
    print("\n=== Summary (Val / Test) ===")
    row_fmt = "{:<15} {:>7.4f} {:>7.4f} {:>9.4f} {:>9.4f}"
    print("{:<15} {:>7} {:>7} {:>9} {:>9}".format(*header))
    for row in summary:
        print(row_fmt.format(*row))