In [1]:
import torch
# Print the installed PyTorch version string.
print(torch.__version__)

2.0.1+cu117


In [2]:
# %load eval.py
from argparse import ArgumentParser

from pytorch_lightning import seed_everything, Trainer

from dataset.mpii_face_gaze_dataset import get_dataloaders
from train import Model

import numpy as np
import torch
from utils import calc_angle_error,compute_bias_eq3,calc_angle_error_for_eval
from sklearn.linear_model import LinearRegression

import random

INFO:albumentations.check_version:A new version of Albumentations is available: 2.0.7 (you have 1.4.11). Upgrade using: pip install --upgrade albumentations


In [3]:
def select_MGTC_calibration_samples(test_set, s=9, grid_bins=4):
    """
    Pick up to ``s`` samples that come from different gaze-direction cells,
    for simulating MGTC (multi gaze target calibration).

    Every sample's pitch and yaw are shifted by ``pi / 2`` and divided by the
    cell width ``pi / grid_bins``; the truncated results form a 2-D cell key.
    Occupied cells are then visited in random order and one random sample is
    drawn from each visited cell, so at most one sample per cell is returned.

    :param test_set: iterable of dict-like samples exposing 'gaze_pitch' and
        'gaze_yaw' values that ``float()`` can convert
    :param s: number of samples requested
    :param grid_bins: divisor defining the cell width ``pi / grid_bins`` on
        both axes (default 4)
    :return: list of the chosen samples; it is shorter than ``s`` when fewer
        than ``s`` cells are occupied, in which case a warning is printed
    """
    shift = np.pi / 2
    cell_width = np.pi / grid_bins

    # Group the samples by the grid cell their gaze direction falls into.
    cells = {}
    for sample in test_set:
        pitch = float(sample['gaze_pitch'])
        yaw = float(sample['gaze_yaw'])
        cell = (int((pitch + shift) / cell_width), int((yaw + shift) / cell_width))
        cells.setdefault(cell, []).append(sample)

    # Visit the occupied cells in random order, drawing one sample per cell.
    visiting_order = list(cells)
    random.shuffle(visiting_order)

    selected_samples = []
    for cell in visiting_order:
        members = cells[cell]
        if members:
            selected_samples.append(random.choice(members))
        if not len(selected_samples) < s:
            break

    if s > len(selected_samples):
        print(f"‚ö†Ô∏è MGTC ÊäΩÊ®£Â§±ÊïóÔºåÂè™ÂèñÂæó {len(selected_samples)} ÂºµÊ®£Êú¨")

    return selected_samples


In [4]:
def evaluate_with_eq3_repeat(model, test_set, s=9, repeat_times=10, target_pitch=0.0, target_yaw=0.0, device='cuda', use_mgtc=False):
    """
    Repeat the Eq. 3 bias calibration with randomly drawn calibration samples
    and report the mean and standard deviation of the per-trial mean angular
    error (the "mean +/- std" style statistic).

    :param model: trained model; must provide ``get_subject_independent_output``
    :param test_set: iterable of dict-like samples with 'gaze_pitch',
        'gaze_yaw', 'file_name' and the three image entries read below
    :param s: number of calibration samples drawn per trial
    :param repeat_times: number of calibration trials
    :param target_pitch: pitch of the calibration gaze target (SGTC only, default 0.0)
    :param target_yaw: yaw of the calibration gaze target (SGTC only, default 0.0)
    :param device: device to run on (default 'cuda'); CPU is used instead when
        a CUDA device is requested but CUDA is not available
    :param use_mgtc: when True, calibration samples are drawn from different
        gaze directions via ``select_MGTC_calibration_samples`` (MGTC);
        otherwise only samples within 0.1 of the target pitch/yaw are used (SGTC)
    :return: ``(mean_err, std_err)`` over the successful trials, or ``None``
        when every trial failed to collect ``s`` calibration samples
    """
    # Honour the caller's device choice, falling back to CPU when CUDA is unavailable.
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        device = torch.device("cpu")
    model = model.to(device)
    # Enter evaluation mode before the first bias estimate so that every trial
    # (including the first one) computes its bias in the same mode.
    model.eval()
    print(f"üîç Ê†°Ê≠£Ê®°ÂºèÔºö{'MGTCÔºàÂ§öÊñπÂêëÔºâ' if use_mgtc else 'SGTCÔºàÂñÆÊñπÂêëÔºâ'}")

    # The SGTC candidate pool depends only on the targets, not on the trial,
    # so it is built once instead of re-scanning the dataset in every trial.
    # NOTE(review): assumes iterating test_set yields the same labels each time.
    sgtc_candidates = None
    if not use_mgtc:
        sgtc_candidates = [
            item for item in test_set
            if abs(item['gaze_pitch'] - target_pitch) < 0.1 and abs(item['gaze_yaw'] - target_yaw) < 0.1
        ]

    all_mean_errors = []

    for trial in range(repeat_times):
        # Step 1: gather the calibration candidates (SGTC / MGTC)
        if use_mgtc:
            calibration_samples = select_MGTC_calibration_samples(test_set, s=s)
            if len(calibration_samples) < s:
                print(f"‚ö†Ô∏è Á¨¨ {trial+1} Ê¨° MGTC ÂÉÖÂèñÂæó {len(calibration_samples)} ÂºµÔºåÂ∞áÈö®Ê©üË£úÊªø")
                # Compare by file_name because samples holding tensors cannot be compared directly.
                calibration_names = set(item['file_name'] for item in calibration_samples)
                remaining = [item for item in test_set if item['file_name'] not in calibration_names]
                extra = random.sample(remaining, s - len(calibration_samples))
                calibration_samples += extra
        else:
            calibration_samples = sgtc_candidates
            if len(calibration_samples) < s:
                print(f"‚ùå Á¨¨ {trial+1} Ê¨°Ê†°Ê≠£Â§±ÊïóÔºåÁ¨¶ÂêàÊ¢ù‰ª∂ÁöÑÂúñÂÉè‰∏çË∂≥ {s} Âºµ")
                continue

        selected = random.sample(calibration_samples, s)
        cal_names = set(item['file_name'] for item in selected)

        # Step 2: estimate the bias from the calibration samples
        b_hat = compute_bias_eq3(model, selected, device)

        # Step 3: predict every remaining sample with the bias applied
        preds, labels = [], []

        with torch.no_grad():
            for item in test_set:
                # Calibration samples are excluded from the evaluation.
                if item['file_name'] in cal_names:
                    continue

                full_face = item['full_face_image'].unsqueeze(0).to(device)
                right_eye = item['right_eye_image'].unsqueeze(0).to(device)
                left_eye = item['left_eye_image'].unsqueeze(0).to(device)

                t_hat = model.get_subject_independent_output(full_face, right_eye, left_eye)
                g_hat = t_hat + b_hat.to(t_hat.device)

                preds.append(g_hat.squeeze(0).cpu())
                labels.append(torch.tensor([
                    float(item['gaze_pitch']),
                    float(item['gaze_yaw'])
                ], dtype=torch.float32))

        # Step 4: mean angular error of this trial
        preds = torch.stack(preds)
        labels = torch.stack(labels)
        error = calc_angle_error_for_eval(labels, preds).mean().item()
        all_mean_errors.append(error)

    # Step 5: aggregate the trials into mean and standard deviation
    if len(all_mean_errors) == 0:
        print("‚ö†Ô∏è ÊâÄÊúâË©¶È©óÈÉΩÂ§±ÊïóÔºåÁÑ°Ê≥ïÁî¢Áîü ¬± Áµ±Ë®à")
        return None

    mean_err = np.mean(all_mean_errors)
    std_err = np.std(all_mean_errors)
    print(f"üìê Angular Error (S={s}, N={repeat_times}): {mean_err:.2f} ¬± {std_err:.2f} degrees")

    return mean_err, std_err


In [5]:
# ‚úÖ Êñ∞Â¢ûÈÄôÂÄãËºîÂä©ÂáΩÊï∏
def calculate_error_distribution(error_list):
    """
    Summarise per-image angular errors.

    Prints the mean and standard deviation of ``error_list`` (two decimals)
    and returns them as a ``(mean, std)`` tuple.
    """
    mean_angle, std_angle = np.mean(error_list), np.std(error_list)
    print(f"üìê Angular Error: {mean_angle:.2f} ¬± {std_angle:.2f} degrees")
    return (mean_angle, std_angle)
# ‚úÖ ‰∏ªÂáΩÊï∏
def evaluate_with_eq3(model, test_set, device='cuda', s=9, target_pitch=0.0, target_yaw=0.0):
    """
    Evaluate after an Eq. 3 bias calibration, simulating SGTC
    (Single Gaze Target Calibration).

    The first ``s`` samples whose pitch and yaw are both within 0.1 of the
    target are used for calibration; all other samples are evaluated.

    :param model: trained model; must provide ``get_subject_independent_output``
    :param test_set: iterable of dict-like samples with 'gaze_pitch',
        'gaze_yaw', 'file_name' and the three image entries read below
    :param device: device to run on (default 'cuda'); CPU is used instead when
        a CUDA device is requested but CUDA is not available
    :param s: number of calibration samples
    :param target_pitch: pitch of the calibration gaze target
    :param target_yaw: yaw of the calibration gaze target
    :return: mean per-image angular error, or ``None`` when fewer than ``s``
        calibration samples were found
    """
    # Step 1: take the first s samples close to the target gaze direction.
    # Scanning stops as soon as s matches are found instead of filtering the
    # whole dataset and slicing afterwards.
    calibration_samples = []
    for item in test_set:
        if len(calibration_samples) >= s:
            break
        if abs(item['gaze_pitch'] - target_pitch) < 0.1 and abs(item['gaze_yaw'] - target_yaw) < 0.1:
            calibration_samples.append(item)

    if len(calibration_samples) < s:
        print(f"‚ö†Ô∏è Ê†°Ê≠£ÂúñÊï∏‰∏çË∂≥ {s} ÂºµÔºåÂè™ÊâæÂà∞ {len(calibration_samples)} Âºµ")
        return None

    # Fall back to CPU when CUDA is requested but unavailable.
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        device = torch.device("cpu")
    model = model.to(device)
    # Evaluation mode is enabled before the bias estimate so that calibration
    # and testing run the model in the same mode.
    model.eval()
    calibration_filenames = set(item['file_name'] for item in calibration_samples)

    # Step 2: estimate the Eq. 3 bias from the calibration samples
    b_hat = compute_bias_eq3(model, calibration_samples, device)

    # Step 3: predict with the bias applied and record each image's angular error
    angle_errors = []

    with torch.no_grad():
        for item in test_set:
            # Calibration samples are excluded from the evaluation.
            if item['file_name'] in calibration_filenames:
                continue

            full_face = item['full_face_image'].unsqueeze(0).to(device)
            right_eye = item['right_eye_image'].unsqueeze(0).to(device)
            left_eye = item['left_eye_image'].unsqueeze(0).to(device)

            t_hat = model.get_subject_independent_output(full_face, right_eye, left_eye)
            g_hat = t_hat + b_hat.to(t_hat.device)

            label_tensor = torch.tensor([
                float(item['gaze_pitch']),
                float(item['gaze_yaw'])
            ], device=device, dtype=torch.float32)

            angle = calc_angle_error(label_tensor.unsqueeze(0), g_hat.squeeze(0).unsqueeze(0))
            angle_errors.append(angle.item())

    # Step 4: print the mean and standard deviation of the per-image errors
    calculate_error_distribution(angle_errors)

    return np.mean(angle_errors)

In [6]:
def simulate_calibration_lr(model, dataset, S=1, device='cuda'):
    """
    Simulate a gaze calibration that corrects predictions with a linear
    regression fitted on ``S`` randomly chosen samples.

    The regression maps the model's predictions to the ground-truth labels of
    the calibration samples; it is then applied to the predictions of all
    remaining samples, whose mean angular error is printed and returned.

    :param model: trained model called as ``model(person_idx, face, right_eye, left_eye)``
    :param dataset: indexable dataset of dict-like samples with 'gaze_pitch',
        'gaze_yaw', 'person_idx' and the three image entries read below
    :param S: number of calibration samples (drawn without replacement)
    :param device: device to run on (default 'cuda'); CPU is used instead when
        a CUDA device is requested but CUDA is not available
    :return: mean angular error over the non-calibration samples
    """
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        device = torch.device("cpu")
    model = model.to(device)
    model.eval()

    def _predict(sample):
        # Inputs are moved to the device only for this forward pass, so the
        # whole dataset does not have to stay resident on the device.
        output = model(
            sample['person_idx'].to(device),
            sample['face'].to(device),
            sample['right_eye'].to(device),
            sample['left_eye'].to(device)
        )
        return output.squeeze(0).cpu().numpy()

    with torch.no_grad():
        all_data = []
        for i in range(len(dataset)):
            item = dataset[i]
            all_data.append({
                'left_eye': item['left_eye_image'].unsqueeze(0),
                'right_eye': item['right_eye_image'].unsqueeze(0),
                'face': item['full_face_image'].unsqueeze(0),
                'label': torch.tensor(
                    [float(item['gaze_pitch']), float(item['gaze_yaw'])],
                    dtype=torch.float32
                ),
                'person_idx': torch.tensor([[item['person_idx']]])
            })

        # Choose S samples as calibration data; the rest is the test split.
        indices = np.random.choice(len(all_data), size=S, replace=False)
        calibration_ids = set(indices.tolist())  # set gives O(1) membership tests
        calibration_set = [all_data[i] for i in indices]
        test_set = [d for i, d in enumerate(all_data) if i not in calibration_ids]

        # Predictions and labels of the calibration samples
        preds, labels = [], []
        for d in calibration_set:
            preds.append(_predict(d))
            labels.append(d['label'].numpy())

        # Fit the linear regression (prediction -> label)
        reg = LinearRegression().fit(preds, labels)

        # Apply the regression to every test prediction
        angle_errors = []
        for d in test_set:
            corrected = reg.predict([_predict(d)])[0]
            angle = calc_angle_error(
                # Cast explicitly so the corrected prediction has the same
                # float32 dtype as the label, whatever the regressor returns.
                torch.tensor(corrected, dtype=torch.float32).unsqueeze(0).to(device),
                d['label'].unsqueeze(0).to(device)
            )
            angle_errors.append(angle.item())

        mean_err = np.mean(angle_errors)
        print(f"üìê Linear Calibration (S={S}): {mean_err:.2f} degrees")
        return mean_err


In [7]:
def simulate_calibration(model, dataset, S=1, device='cuda'):
    """
    Simulate a calibration with ``S`` randomly chosen samples and test on the
    remaining samples.

    The bias is the mean calibration label minus the mean calibration
    prediction; it is added to the prediction of every remaining sample, and
    the mean angular error of those corrected predictions is printed and
    returned.

    :param model: trained model called as
        ``model(person_idx, full_face, right_eye, left_eye)``
    :param dataset: indexable dataset of dict-like samples with 'gaze_pitch',
        'gaze_yaw', 'person_idx' and the three image entries read below
    :param S: number of calibration samples (drawn without replacement)
    :param device: device to run on (default 'cuda'); CPU is used instead when
        a CUDA device is requested but CUDA is not available
    :return: mean angular error over the non-calibration samples
    """
    device = torch.device(device)
    if device.type == "cuda" and not torch.cuda.is_available():
        device = torch.device("cpu")
    model = model.to(device)
    model.eval()

    def _forward(sample):
        # Inputs are moved to the device only for this forward pass, so the
        # whole dataset does not have to stay resident on the device.
        return model(
            sample['person_idx'].to(device),
            sample['face'].to(device),
            sample['right_eye'].to(device),
            sample['left_eye'].to(device)
        ).squeeze(0)

    with torch.no_grad():
        all_data = []
        for i in range(len(dataset)):
            item = dataset[i]
            all_data.append({
                'left_eye': item['left_eye_image'].unsqueeze(0),
                'right_eye': item['right_eye_image'].unsqueeze(0),
                'face': item['full_face_image'].unsqueeze(0),
                'label': torch.tensor(
                    [float(item['gaze_pitch']), float(item['gaze_yaw'])],
                    dtype=torch.float32
                ),
                'person_idx': torch.tensor([[item['person_idx']]])  # shape [1, 1]
            })

        # Step 1: choose S samples as calibration data; the rest is the test split.
        indices = np.random.choice(len(all_data), size=S, replace=False)
        calibration_ids = set(indices.tolist())  # set gives O(1) membership tests
        calibration_set = [all_data[i] for i in indices]
        test_set = [d for i, d in enumerate(all_data) if i not in calibration_ids]

        # Step 2: bias = mean label - mean prediction over the calibration samples
        cal_outputs, cal_labels = [], []
        for d in calibration_set:
            cal_outputs.append(_forward(d))
            cal_labels.append(d['label'].to(device))

        bias = torch.stack(cal_labels).mean(dim=0) - torch.stack(cal_outputs).mean(dim=0)

        # Step 3: apply the bias to every test prediction
        angle_errors = []
        for d in test_set:
            pred = _forward(d) + bias
            angle = calc_angle_error(pred.unsqueeze(0), d['label'].to(device).unsqueeze(0))
            angle_errors.append(angle.item())

        mean_err = np.mean(angle_errors)
        print(f"üìê Calibration (S={S}): {mean_err:.2f} degrees")
        return mean_err

In [8]:
if __name__ == '__main__':

    ### Fixed settings (not meant to change between runs)
    k = [9, 128]
    adjust_slope = False
    ### Relates to the "grid calibration" entry of the authors' table; the
    ### original note says to set True for random — TODO confirm the semantics.
    grid_calibration_samples = True
    batch_size = 32
    path_to_data = './data/mpiifacegaze_preprocessed'

    #### Per-run settings
    # NOTE(review): the checkpoint lives under "p00" while person_idx is 1 —
    # confirm that checkpoint and test person belong together.
    path_to_checkpoints = './saved_models/p00/p00_best-v46.ckpt'
    person_idx = 1
    validate_on_person = 0

    seed_everything(42)

    print(f"{path_to_checkpoints}")

    # Pass the settings defined above instead of repeating their literal
    # values, so editing the configuration actually affects the loaded model.
    model = Model.load_from_checkpoint(
        path_to_checkpoints,
        k=k,
        adjust_slope=adjust_slope,
        grid_calibration_samples=grid_calibration_samples,
    )

    trainer = Trainer(
        gpus=1,
        benchmark=True,
    )

    _, _, test_dataloader = get_dataloaders(
        path_to_data,
        validate_on_person,
        person_idx,
        batch_size
    )

    trainer.test(model, test_dataloader)

    # Take the underlying test dataset (complete data of the single test person)
    test_dataset = test_dataloader.dataset

    ##### SGTC runs
    #evaluate_with_eq3_repeat(model, test_dataset, s=1, repeat_times=10)
    #evaluate_with_eq3_repeat(model, test_dataset, s=5, repeat_times=10)
    #evaluate_with_eq3_repeat(model, test_dataset, s=9, repeat_times=10)
    #evaluate_with_eq3_repeat(model, test_dataset, s=16, repeat_times=10)

    ###### MGTC runs
    #evaluate_with_eq3_repeat(model, test_dataset, s=1, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=5, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=9, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=16, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=32, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=64, repeat_times=10, use_mgtc=True)
    #evaluate_with_eq3_repeat(model, test_dataset, s=128, repeat_times=10, use_mgtc=True)



    #evaluate_with_eq3(model, test_dataset, s=1, target_pitch=0.0, target_yaw=0.0)
    #evaluate_with_eq3(model, test_dataset, s=5, target_pitch=0.0, target_yaw=0.0)
    #evaluate_with_eq3(model, test_dataset, s=9, target_pitch=0.0, target_yaw=0.0)
    #evaluate_with_eq3(model, test_dataset, s=16, target_pitch=0.0, target_yaw=0.0)


Global seed set to 42


./saved_models/p00/p00_best-v46.ckpt


  rank_zero_warn(
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


train on persons [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
valid on person 0
test on person 1
len(dataset_train) 60784
len(dataset_valid) 2927


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


len(dataset_test) 2904
Testing DataLoader 0: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 181/181 [00:26<00:00,  6.78it/s]
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
             Test metric                         DataLoader 0
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
   test/offset(k=0)/angular_error              3.207423686981201
        test/offset(k=0)/loss       