In [None]:
#%pip install opencv-python pillow numpy pandas scikit-learn tensorflow xgboost matplotlib joblib albumentations

Collecting opencv-python
  Using cached opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (19 kB)
Collecting pillow
  Using cached pillow-11.3.0-cp311-cp311-win_amd64.whl.metadata (9.2 kB)
Collecting numpy
  Downloading numpy-2.3.3-cp311-cp311-win_amd64.whl.metadata (60 kB)
Collecting pandas
  Using cached pandas-2.3.2-cp311-cp311-win_amd64.whl.metadata (19 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp311-cp311-win_amd64.whl.metadata (11 kB)
Collecting tensorflow
  Using cached tensorflow-2.20.0-cp311-cp311-win_amd64.whl.metadata (4.6 kB)
Collecting xgboost
  Downloading xgboost-3.0.5-py3-none-win_amd64.whl.metadata (2.1 kB)
Collecting matplotlib
  Using cached matplotlib-3.10.6-cp311-cp311-win_amd64.whl.metadata (11 kB)
Collecting joblib
  Using cached joblib-1.5.2-py3-none-any.whl.metadata (5.6 kB)
Collecting albumentations
  Using cached albumentations-2.0.8-py3-none-any.whl.metadata (43 kB)
Collecting numpy
  Using cached numpy-2.2.6-cp311-cp311-win_amd64.w

In [None]:
"""
train_pipeline_eye.py
ปรับปรุง:
- normalize target -> train stable
- MobileNetV2 with preprocess_input (input layer)
- train head (frozen backbone) -> partial fine-tune (unfreeze top N)
- extract features from same backbone for RF/XGB
- evaluate MAE in pixels and save best models
"""
import os, glob, random, math
import cv2
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler
import joblib
import matplotlib.pyplot as plt
import albumentations as A
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks, applications

# -------------------------
# Config
# -------------------------
# Input folders: every image in IMAGE_DIR is paired with "<image stem>.txt" in ANNO_DIR.
IMAGE_DIR = "./data/Right/Image224x224"      # adjust to match your dataset
ANNO_DIR = "./data/Right/Annotation224x224"  # adjust to match your dataset
IMG_SIZE = (144, 144)    # final training size (width, height)
RANDOM_SEED = 42         # shared seed for numpy, random, TensorFlow and the sklearn/XGBoost models
TEST_SIZE = 0.10         # fraction of the full dataset held out for testing
VAL_SIZE = 0.10          # fraction of the full dataset held out for validation
BATCH_SIZE = 32
EPOCHS_HEAD = 30     # train head first
EPOCHS_FINETUNE = 20 # partial fine-tune
AUG_TIMES = 2        # augmentation copies per image in train
UNFREEZE_TOP_N = 20  # unfreeze last N layers of backbone for fine-tuning (tune this)

# Seed every random number generator used by the pipeline.
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# -------------------------
# Helpers
# -------------------------
def read_txt_safe(txt_path, orig_w=None, orig_h=None):
    """Read an (x, y) coordinate pair from a whitespace-separated text file.

    The file is read as raw bytes, NUL bytes are stripped (this makes files
    that contain interleaved ``\\x00`` bytes readable), and the rest is decoded
    as UTF-8 with undecodable bytes dropped. Tokens that do not parse as a
    float are skipped; the first two numeric tokens are taken as x and y.

    Args:
        txt_path: Path of the annotation file.
        orig_w: Width of the source image in pixels (optional).
        orig_h: Height of the source image in pixels (optional).

    Returns:
        ``(cx, cy)`` as floats. When both ``orig_w`` and ``orig_h`` are given
        and both values lie in ``[0, 1]`` they are treated as normalized and
        scaled by the image size; otherwise they are returned unchanged as
        pixel coordinates. ``None`` is returned when the file cannot be read
        or holds fewer than two numbers. This function never raises: read
        failures are reported with ``print`` and mapped to ``None``.
    """
    try:
        with open(txt_path, "rb") as f:
            raw = f.read().replace(b"\x00", b"")
        s = raw.decode("utf-8", errors="ignore").strip()
        if not s:
            return None

        nums = []
        for token in s.split():
            try:
                nums.append(float(token))
            except ValueError:
                # Non-numeric token (label, separator, ...): skip it.
                continue

        # A coordinate needs both x and y. Checking this up front avoids an
        # IndexError on single-value files in the normalized branch below.
        if len(nums) < 2:
            return None

        x, y = nums[0], nums[1]
        # Detect normalized (0..1) coordinates when the image size is known.
        if orig_w and orig_h and 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0:
            return x * orig_w, y * orig_h
        # Otherwise assume the values are already pixel coordinates.
        return float(x), float(y)
    except Exception as e:
        # Best-effort reader: an unreadable file must not abort dataset loading.
        print("read_txt_safe error:", txt_path, e)
    return None

def load_dataset(image_dir, anno_dir):
    """Load images and their (x, y) annotations, resized to ``IMG_SIZE``.

    Every file matching ``*.*`` in ``image_dir`` is paired with
    ``<stem>.txt`` in ``anno_dir``. A file is skipped when its annotation is
    missing, when OpenCV cannot decode it, or when the annotation yields no
    coordinate. Kept images are resized to ``IMG_SIZE`` and converted from BGR
    to RGB; coordinates are rescaled to the resized image.

    Returns:
        ``(X, y, used_files)``: a uint8 array of images, a float32 array of
        ``[x, y]`` rows in resized-image pixels, and the list of image file
        names (base names) that were kept, in sorted path order.
    """
    target_w, target_h = IMG_SIZE
    images, centers, kept_names = [], [], []

    for img_path in sorted(glob.glob(os.path.join(image_dir, "*.*"))):
        file_name = os.path.basename(img_path)
        stem = os.path.splitext(file_name)[0]
        anno_path = os.path.join(anno_dir, stem + ".txt")
        if not os.path.exists(anno_path):
            continue

        bgr = cv2.imread(img_path)
        if bgr is None:
            # Not an image OpenCV can decode.
            continue

        src_h, src_w = bgr.shape[:2]
        center = read_txt_safe(anno_path, orig_w=src_w, orig_h=src_h)
        if center is None:
            continue
        cx, cy = center

        rgb = cv2.cvtColor(cv2.resize(bgr, IMG_SIZE), cv2.COLOR_BGR2RGB)
        images.append(rgb)
        # Map the coordinate from source-image pixels to resized-image pixels.
        centers.append([cx * (target_w / src_w), cy * (target_h / src_h)])
        kept_names.append(file_name)

    X = np.array(images, dtype=np.uint8)
    y = np.array(centers, dtype=np.float32)
    print(f"Loaded {len(X)} images -> X {X.shape}, y {y.shape}")
    return X, y, kept_names

# Albumentations augmentations (keypoint-aware)
def get_augmentations():
    """Build the keypoint-aware Albumentations pipeline used for training.

    The pipeline applies a random horizontal flip, a small
    shift/scale/rotate with reflected borders, brightness/contrast jitter,
    mild Gaussian noise and an occasional 3-pixel blur. Keypoints are given
    in ``xy`` pixel format and are kept even when a transform moves them
    outside the image (``remove_invisible=False``).

    Returns:
        An ``A.Compose`` object to call as ``aug(image=..., keypoints=[...])``.
    """
    # Albumentations 2.x does not accept GaussNoise(var_limit=...): the
    # argument is ignored with a warning and the (much stronger) default noise
    # is used instead. `std_range` is the supported parameter; it is a
    # standard deviation expressed as a fraction of the maximum pixel value,
    # so the former variance range (5.0, 20.0) on the 0..255 scale becomes
    # (sqrt(5)/255, sqrt(20)/255).
    noise_std_range = (math.sqrt(5.0) / 255.0, math.sqrt(20.0) / 255.0)
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.06, scale_limit=0.08, rotate_limit=8, p=0.7, border_mode=cv2.BORDER_REFLECT),
        A.RandomBrightnessContrast(p=0.4),
        A.GaussNoise(std_range=noise_std_range, p=0.25),
        A.OneOf([A.MotionBlur(blur_limit=3), A.MedianBlur(blur_limit=3), A.Blur(blur_limit=3)], p=0.2),
    ], keypoint_params=A.KeypointParams(format='xy', remove_invisible=False))

def augment_dataset(X, y, times=1):
    """Append ``times`` augmented copies of every sample to the dataset.

    Each image is passed through the pipeline from ``get_augmentations()``
    together with its single keypoint. The transformed keypoint is clamped to
    the valid pixel range of ``IMG_SIZE``. Samples for which the pipeline
    returns no keypoint are dropped.

    Args:
        X: Array of images.
        y: Array of ``[x, y]`` keypoints, one row per image.
        times: Number of augmented copies to generate per image.

    Returns:
        ``(X_all, y_all)``: the originals followed by the augmented samples
        (uint8 images, float32 keypoints). When nothing was generated the
        inputs are returned unchanged.
    """
    pipeline = get_augmentations()
    new_images, new_points = [], []

    for idx, image in enumerate(X):
        point = y[idx]
        for _ in range(times):
            result = pipeline(image=image, keypoints=[tuple(point)])
            out_kps = result['keypoints']
            if len(out_kps) == 0:
                continue
            first = out_kps[0]
            # Keep the label inside the image even if the transform pushed it out.
            clamped_x = min(max(first[0], 0), IMG_SIZE[0]-1)
            clamped_y = min(max(first[1], 0), IMG_SIZE[1]-1)
            new_images.append(result['image'])
            new_points.append([clamped_x, clamped_y])

    if not new_images:
        return X, y

    X_all = np.concatenate([X, np.array(new_images, dtype=np.uint8)], axis=0)
    y_all = np.concatenate([y, np.array(new_points, dtype=np.float32)], axis=0)
    print(f"Augmented: added {len(new_images)} samples -> total {len(X_all)}")
    return X_all, y_all

# Build CNN regressor with MobileNetV2 backbone and preprocessing
def build_cnn_regressor(input_shape=(IMG_SIZE[1], IMG_SIZE[0], 3)):
    """Build and compile a MobileNetV2-based (x, y) regressor.

    The model takes raw 0..255 images, rescales them to [-1, 1] inside the
    graph, runs the ImageNet-pretrained MobileNetV2 backbone (frozen), and
    regresses two linear outputs through a small dense head. It is compiled
    with Adam(1e-3), Huber loss and a mean-absolute-error metric.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.

    Returns:
        ``(model, base)``: the full compiled model and the MobileNetV2 model
        object, so callers can later unfreeze backbone layers or tap its
        output for feature extraction.
    """
    inp = layers.Input(shape=input_shape)
    # Same arithmetic as mobilenet_v2.preprocess_input (x / 127.5 - 1), but as
    # a built-in layer. A Lambda wrapping a Python function is stored as
    # bytecode in the .keras file, which Keras refuses to deserialize in safe
    # mode, so the saved checkpoint could not be loaded back.
    x = layers.Rescaling(scale=1.0 / 127.5, offset=-1.0, name="preprocess")(inp)
    base = applications.MobileNetV2(include_top=False, input_tensor=x, weights='imagenet')

    # Regression head.
    h = layers.GlobalAveragePooling2D()(base.output)
    h = layers.Dense(512, activation='relu')(h)
    h = layers.Dropout(0.3)(h)
    h = layers.Dense(128, activation='relu')(h)
    # Linear output: the caller trains this on targets normalized to 0..1.
    out = layers.Dense(2, activation='linear', name='out_xy')(h)
    model = models.Model(inputs=inp, outputs=out)

    # Freeze the backbone before compiling so only the head is trained at first.
    for layer in base.layers:
        layer.trainable = False

    model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss=tf.keras.losses.Huber(),
                  metrics=[tf.keras.metrics.MeanAbsoluteError()])
    return model, base

def extract_features_from_backbone(model, backbone, X_array, batch_size=32):
    """Compute globally average-pooled backbone features for a batch of images.

    A temporary model is built from ``model.input`` to a new
    ``GlobalAveragePooling2D`` layer applied to ``backbone.output``, and
    ``X_array`` is run through it.

    Args:
        model: Full CNN whose input tensor is used as the extractor input.
        backbone: Backbone model whose output tensor is pooled.
        X_array: Images to run through the extractor.
        batch_size: Batch size passed to ``predict``.

    Returns:
        The array returned by ``predict`` on the extractor model.
    """
    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    extractor = models.Model(inputs=model.input, outputs=pooled)
    return extractor.predict(X_array, batch_size=batch_size, verbose=1)

# -------------------------
# Main pipeline
# -------------------------
def main():
    """Run the full training and evaluation pipeline.

    Steps:
      1. Load the dataset and split it into train / validation / test.
      2. Augment the training split (keypoint-aware).
      3. Train the CNN head with the backbone frozen, then fine-tune the last
         ``UNFREEZE_TOP_N`` backbone layers at a lower learning rate.
      4. Evaluate the best checkpoint on the test split (MAE in pixels).
      5. Train RandomForest and XGBoost regressors on backbone features.
      6. Average the three predictors and report the ensemble MAE.

    Files written to the working directory: ``best_cnn.keras``,
    ``final_cnn.keras``, ``feature_scaler.joblib``, ``rf_x/rf_y.joblib``,
    ``xgb_x/xgb_y.joblib``, ``results_mae_summary.csv``, the ``*.npy``
    prediction dumps and ``cnn_mae_history.png``.
    """
    # Load
    X, y, _ = load_dataset(IMAGE_DIR, ANNO_DIR)
    if len(X) == 0:
        print("No data loaded. Check paths.")
        return

    # Split (train/val/test). VAL_SIZE is a fraction of the full dataset, so it
    # is rescaled to a fraction of what is left after removing the test split.
    X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=RANDOM_SEED)
    X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=VAL_SIZE/(1-TEST_SIZE), random_state=RANDOM_SEED)
    print("Splits -> train:", X_train.shape[0], "val:", X_val.shape[0], "test:", X_test.shape[0])

    # Augment the training set only (keypoint-aware); val/test stay untouched.
    X_train_aug, y_train_aug = augment_dataset(X_train, y_train, times=AUG_TIMES)

    # Images stay in the 0..255 range as float32; the model's own preprocessing
    # layer performs the normalization.
    X_train_norm = X_train_aug.astype(np.float32)
    X_val_norm = X_val.astype(np.float32)
    X_test_norm = X_test.astype(np.float32)

    # Normalize targets to 0..1 per axis: x by width, y by height. For square
    # images this equals dividing both by the width.
    target_scale = np.array([IMG_SIZE[0], IMG_SIZE[1]], dtype=np.float32)
    y_train_aug_norm = y_train_aug.astype(np.float32) / target_scale
    y_val_norm = y_val.astype(np.float32) / target_scale

    # Build CNN
    cnn, backbone = build_cnn_regressor(input_shape=(IMG_SIZE[1], IMG_SIZE[0], 3))
    cnn.summary()

    # Callbacks (shared by both training phases).
    es = callbacks.EarlyStopping(monitor="val_mean_absolute_error", patience=8, restore_best_weights=True, mode='min', verbose=1)
    mc = callbacks.ModelCheckpoint("best_cnn.keras", monitor="val_mean_absolute_error", save_best_only=True, mode='min', verbose=1)
    rl = callbacks.ReduceLROnPlateau(monitor='val_mean_absolute_error', factor=0.5, patience=4, min_lr=1e-6, verbose=1)

    # Train head (backbone frozen)
    print("=== Train head (backbone frozen) ===")
    history_head = cnn.fit(
        X_train_norm, y_train_aug_norm,
        validation_data=(X_val_norm, y_val_norm),
        epochs=EPOCHS_HEAD,
        batch_size=BATCH_SIZE,
        callbacks=[es, mc, rl],
        verbose=2
    )

    # Partial fine-tune: unfreeze top N layers of backbone only
    print(f"=== Partial fine-tune: unfreeze last {UNFREEZE_TOP_N} layers of backbone ===")
    for layer in backbone.layers[-UNFREEZE_TOP_N:]:
        layer.trainable = True
    # Recompile so the trainable change takes effect; use a lower learning rate.
    cnn.compile(optimizer=tf.keras.optimizers.Adam(1e-5),
                loss=tf.keras.losses.Huber(),
                metrics=[tf.keras.metrics.MeanAbsoluteError()])

    history_ft = cnn.fit(
        X_train_norm, y_train_aug_norm,
        validation_data=(X_val_norm, y_val_norm),
        epochs=EPOCHS_FINETUNE,
        batch_size=BATCH_SIZE,
        callbacks=[es, mc, rl],
        verbose=2
    )

    # Load best saved CNN (by val MAE); fall back to the in-memory model.
    try:
        best_cnn = models.load_model("best_cnn.keras", compile=False)
        # compile to evaluate
        best_cnn.compile(optimizer=tf.keras.optimizers.Adam(1e-5), loss=tf.keras.losses.Huber(), metrics=[tf.keras.metrics.MeanAbsoluteError()])
        print("Loaded best_cnn.keras")
    except Exception as e:
        print("Could not load best_cnn.keras, using current model. Error:", e)
        best_cnn = cnn

    # Predict with CNN on test and convert back to pixels.
    preds_cnn_norm = best_cnn.predict(X_test_norm, batch_size=BATCH_SIZE, verbose=1)  # normalized
    preds_cnn_pixels = preds_cnn_norm * target_scale
    mae_cnn = mean_absolute_error(y_test, preds_cnn_pixels)
    print("CNN MAE (pixels) on test:", mae_cnn)

    # Save final CNN model (native Keras format)
    best_cnn.save("final_cnn.keras")

    # -------------------------
    # Classical models: extract features (from backbone) and train RF/XGB
    # -------------------------
    print("=== Extract features for RandomForest/XGBoost ===")
    feats_train = extract_features_from_backbone(cnn, backbone, X_train_norm, batch_size=BATCH_SIZE)
    feats_val = extract_features_from_backbone(cnn, backbone, X_val_norm, batch_size=BATCH_SIZE)
    feats_test = extract_features_from_backbone(cnn, backbone, X_test_norm, batch_size=BATCH_SIZE)

    # Scale features (fit on train only).
    scaler = StandardScaler()
    feats_train_s = scaler.fit_transform(feats_train)
    feats_val_s = scaler.transform(feats_val)
    feats_test_s = scaler.transform(feats_test)
    joblib.dump(scaler, "feature_scaler.joblib")

    # RF/XGB are trained directly on pixel targets, so their MAE is in pixels.
    y_train_pixels_aug = y_train_aug.astype(np.float32)
    y_val_pixels = y_val.astype(np.float32)
    y_test_pixels = y_test.astype(np.float32)

    # RandomForest (separate regressors x and y)
    print("Training RandomForest ...")
    rf_x = RandomForestRegressor(n_estimators=300, random_state=RANDOM_SEED, n_jobs=-1)
    rf_y = RandomForestRegressor(n_estimators=300, random_state=RANDOM_SEED, n_jobs=-1)
    rf_x.fit(feats_train_s, y_train_pixels_aug[:, 0])
    rf_y.fit(feats_train_s, y_train_pixels_aug[:, 1])
    preds_rf = np.vstack([rf_x.predict(feats_test_s), rf_y.predict(feats_test_s)]).T
    mae_rf = mean_absolute_error(y_test_pixels, preds_rf)
    print("RandomForest MAE on test:", mae_rf)
    joblib.dump(rf_x, "rf_x.joblib")
    joblib.dump(rf_y, "rf_y.joblib")

    # XGBoost (separate regressors)
    print("Training XGBoost ...")
    # early_stopping_rounds is a constructor argument: XGBoost 2.0+ no longer
    # accepts it in fit() and raises TypeError there.
    xgb_params = dict(n_estimators=500, learning_rate=0.05, random_state=RANDOM_SEED,
                      verbosity=0, early_stopping_rounds=20)
    xgb_x = xgb.XGBRegressor(**xgb_params)
    xgb_y = xgb.XGBRegressor(**xgb_params)
    xgb_x.fit(feats_train_s, y_train_pixels_aug[:, 0], eval_set=[(feats_val_s, y_val_pixels[:, 0])], verbose=False)
    xgb_y.fit(feats_train_s, y_train_pixels_aug[:, 1], eval_set=[(feats_val_s, y_val_pixels[:, 1])], verbose=False)
    preds_xgb = np.vstack([xgb_x.predict(feats_test_s), xgb_y.predict(feats_test_s)]).T
    mae_xgb = mean_absolute_error(y_test_pixels, preds_xgb)
    print("XGBoost MAE on test:", mae_xgb)
    joblib.dump(xgb_x, "xgb_x.joblib")
    joblib.dump(xgb_y, "xgb_y.joblib")

    # -------------------------
    # Ensemble (simple average)
    # -------------------------
    # All three prediction arrays are (N, 2) and in pixel space.
    ensemble_preds = (preds_cnn_pixels + preds_rf + preds_xgb) / 3.0
    mae_ens = mean_absolute_error(y_test_pixels, ensemble_preds)
    print("Ensemble (CNN+RF+XGB) MAE (pixels):", mae_ens)

    # Save results
    results = {
        'cnn': float(mae_cnn),
        'random_forest': float(mae_rf),
        'xgboost': float(mae_xgb),
        'ensemble': float(mae_ens)
    }
    pd.Series(results).to_csv("results_mae_summary.csv")
    print("Results saved to results_mae_summary.csv", results)

    # Save predictions and ground truth for later analysis
    np.save("y_test_pixels.npy", y_test_pixels)
    np.save("preds_cnn_pixels.npy", preds_cnn_pixels)
    np.save("preds_rf.npy", preds_rf)
    np.save("preds_xgb.npy", preds_xgb)
    np.save("preds_ensemble.npy", ensemble_preds)

    # Plot training histories. Plotting is best-effort: a failure here must not
    # discard the models and results that were already saved above.
    try:
        plt.figure(figsize=(6,4))
        hist = history_head.history
        plt.plot(hist.get('mean_absolute_error', []), label='train_mae_head')
        plt.plot(hist.get('val_mean_absolute_error', []), label='val_mae_head')
        hist2 = history_ft.history
        plt.plot(hist2.get('mean_absolute_error', []), label='train_mae_ft')
        plt.plot(hist2.get('val_mean_absolute_error', []), label='val_mae_ft')
        plt.legend(); plt.title("CNN MAE history"); plt.xlabel("epoch"); plt.ylabel("MAE")
        plt.savefig("cnn_mae_history.png")
        plt.close()
    except Exception as e:
        print("Plotting history error:", e)

    print("Done. Best MAEs:", results)

# Run the pipeline when executed as a script or as a notebook cell.
if __name__ == "__main__":
    main()


Loaded 5554 images -> X (5554, 144, 144, 3), y (5554, 2)
Splits -> train: 4442 val: 556 test: 556


  original_init(self, **validated_kwargs)
  A.GaussNoise(var_limit=(5.0, 20.0), p=0.25),
