## CLIPでの主観的価値の推定

In [1]:
import numpy as np
import torch
import clip
from PIL import Image
from torch.utils.data import Dataset
from torch import nn
from torchvision import transforms
import os
import pandas as pd
from matplotlib import pyplot as plt
from tqdm import tqdm

from sklearn.linear_model import Ridge
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import loguniform, pearsonr

from src.const import DATA_PATH

plt.rcParams["font.serif"] = ["noto"]

In [2]:
# Load the per-response table and the per-food metadata table from DATA_PATH.
resp, food_value = (
    pd.read_csv(os.path.join(DATA_PATH, csv_name))
    for csv_name in ("data_responses_NCNP_2types.csv", "food_value.csv")
)


In [3]:
# Boolean group flag: True when the row's BMI is at least 25.
resp["is_obesity"] = resp["BMI"].ge(25)

In [4]:
# Flag subjects whose "res_L" answers are (almost) constant across trials.
# 896 is used as the number of trials per subject
# (NOTE(review): matches the img range 1..896 shown in the output — confirm).
n_trials = 896

# rows: sub_ID (sorted by groupby), columns: rating value, cells: how often the
# subject gave that rating (NaN when the rating was never used).
res_L_counts = resp.groupby("sub_ID")["res_L"].value_counts().unstack()
# Number of distinct ratings each subject used.
n_distinct = resp.groupby("sub_ID")["res_L"].unique().apply(len)

# A subject is an outlier when one rating covers > 75 % of the trials, or when
# at most 4 distinct ratings were used and one of them covers > 65 %.
is_outlier = (res_L_counts > n_trials * 0.75).any(axis=1) | (
    (n_distinct <= 4) & (res_L_counts > n_trials * 0.65).any(axis=1)
)

# Read the IDs from the mask's own index. The mask is ordered by the sorted
# groupby keys, whereas resp["sub_ID"].unique() is in order of appearance, so
# indexing the latter positionally can pick the wrong subjects.
outlier = is_outlier[is_outlier].index.to_numpy()
print("被験者", outlier, len(outlier), "人を除外")

# NOTE(review): the message above says the subjects are excluded, but `resp`
# is not filtered here, so the means below still include them — confirm
# whether the exclusion is meant to happen at this point.
res_L_mean = resp.groupby(["img", "is_obesity"])["res_L"].mean()
res_H_mean = resp.groupby(["img", "is_obesity"])["res_H"].mean()
res_T_mean = resp.groupby(["img", "is_obesity"])["res_T"].mean()
res_L_mean

被験者 [ 50  83 104 121 130 137 138 143 147 150] 10 人を除外


img  is_obesity
1    False         5.945455
     True          5.853933
2    False         6.190909
     True          6.438202
3    False         6.118182
                     ...   
894  True          4.460674
895  False         4.209091
     True          3.258427
896  False         5.581818
     True          4.752809
Name: res_L, Length: 1792, dtype: float64

In [5]:
from src.const import ROOT_PATH


# Build the id -> description lookup once instead of scanning `food_value`
# for every response row. drop_duplicates keeps the first match per id, which
# is what the former `.values[0]` lookup returned.
description_by_id = (
    food_value.drop_duplicates("id").set_index("id")["Item_description"].to_dict()
)

# One row per response: image file path, text caption and the "res_L" rating.
# An unknown image id raises KeyError rather than producing a bad caption.
df = pd.DataFrame(
    {
        "image_path": [
            os.path.join(ROOT_PATH, "Database", f"{str(img).zfill(4)}.jpg")
            for img in resp["img"]
        ],
        # No nested double quotes inside the f-string: that form only parses
        # on Python 3.12+.
        "caption": [
            f"This food is {description_by_id[img]}" for img in resp["img"]
        ],
        "res_L": resp["res_L"],
    }
)


In [6]:
# # -*- coding: utf-8 -*-
# """
# Full training pipeline for CLIP joint regression with:
#   • TorchVision v2 GPU-based augmentation
#   • Multi-view (×5) online data expansion
#   • K-fold CV (6 splits)
#   • LoRA adapters injected into every attention block (qkv + proj)

# Tested on RTX/Ada6000 (48 GB) with bf16.
# """

# # ───────────────────────── 0.  Imports & global config ─────────────────────────
# import os
# from copy import deepcopy
# from pathlib import Path
# from typing import List, Set, Sequence

# import numpy as np
# import pandas as pd
# from PIL import Image

# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# from torch.utils.data import Dataset
# from torchvision import transforms
# import torchvision.transforms.v2 as T
# from sklearn.model_selection import KFold
# from scipy.stats import pearsonr

# from transformers import Trainer, TrainingArguments
# from transformers.utils import logging as hf_logging
# from peft import LoraConfig, get_peft_model

# from open_clip import create_model_and_transforms, get_tokenizer


# hf_logging.set_verbosity_info()

# device = "cuda" if torch.cuda.is_available() else "cpu"
# CLIP_NAME = "hf-hub:timm/resnet50x4_clip.openai"  # ResNet-50×4 backbone

# # ───────────────────────── 1.  Load CLIP preprocess & tokenizer ───────────────
# clip_backbone, _, clip_preprocess = create_model_and_transforms(CLIP_NAME)
# _tokenizer = get_tokenizer(CLIP_NAME)


# def tokenize(text: str):
#     """OpenCLIP tokenizer returns (1,77); squeeze to (77,) → torch.long"""
#     return _tokenizer(text).squeeze(0).to(torch.long)


# # grab mean/std from clip_preprocess.Normalise
# norm_layer = next(
#     t for t in clip_preprocess.transforms if isinstance(t, transforms.Normalize)
# )
# CLIP_MEAN, CLIP_STD = norm_layer.mean, norm_layer.std

# # ───────────────────────── 2. TorchVision-v2 augment pipeline ─────────────────


# def build_train_transform(device: str = "cpu"):
#     return T.Compose(
#         [
#             T.ToImage(),
#             T.Resize((288, 288)),
#             T.CenterCrop(288),
#             T.RandomHorizontalFlip(),
#             T.RandomAffine(
#                 degrees=20, translate=(0.2, 0.2), scale=(0.7, 1.2), fill=(255, 255, 255)
#             ),
#             T.GaussianBlur(kernel_size=5, sigma=(0.01, 4.0)),
#             T.ColorJitter(brightness=0.075, contrast=0.0, saturation=0.03, hue=0.03),
#             T.ToDtype(torch.float32, scale=True),
#             T.Normalize(CLIP_MEAN, CLIP_STD),
#         ]
#     ).to(device)


# train_transform = build_train_transform("cuda")  # GPU augment
# val_transform = clip_preprocess  # original PIL pipeline


# # ───────────────────────── 3.  Dataset classes ────────────────────────────────
# class ClipCnnDataset(Dataset):
#     def __init__(self, df: pd.DataFrame, transform=None):
#         self.df = df.reset_index(drop=True)
#         self.transform = transform or clip_preprocess

#     def __len__(self):
#         return len(self.df)

#     def __getitem__(self, idx):
#         row = self.df.iloc[idx]
#         img = Image.open(row["image_path"]).convert("RGB")
#         img = self.transform(img)
#         return (
#             img,
#             tokenize(row["caption"]),
#             torch.tensor(row["res_L"], dtype=torch.float32),
#         )


# class MultiViewDataset(Dataset):
#     """N-view online expansion (orig + aug×4 =5)."""

#     def __init__(self, df: pd.DataFrame, transforms_list: Sequence):
#         self.df = df.reset_index(drop=True)
#         self.transforms = transforms_list
#         self.n_views = len(transforms_list)

#     def __len__(self):
#         return len(self.df) * self.n_views

#     def __getitem__(self, idx):
#         img_idx, view_idx = divmod(idx, self.n_views)
#         row = self.df.iloc[img_idx]
#         img = Image.open(row["image_path"]).convert("RGB")
#         img = self.transforms[view_idx](img)
#         return (
#             img,
#             tokenize(row["caption"]),
#             torch.tensor(row["res_L"], dtype=torch.float32),
#         )


# # ───────────────────────── 4.  Injected MHA (QKV split) ───────────────────────
# class InjectedMultiHeadAttention(nn.Module):
#     def __init__(self, embed_dim, num_heads, dropout=0.0, bias=True, batch_first=False):
#         super().__init__()
#         self.embed_dim, self.num_heads, self.dropout, self.batch_first = (
#             embed_dim,
#             num_heads,
#             dropout,
#             batch_first,
#         )
#         self.head_dim = embed_dim // num_heads
#         assert self.head_dim * num_heads == embed_dim
#         self.qkv = nn.Linear(embed_dim, embed_dim * 3, bias=bias)
#         self.proj = nn.Linear(embed_dim, embed_dim, bias=bias)
#         self.sdp = F.scaled_dot_product_attention

#     def set_parameters(self, src: nn.MultiheadAttention):
#         self.qkv.weight.data.copy_(src.in_proj_weight.data)
#         self.qkv.bias.data.copy_(src.in_proj_bias.data)
#         self.proj.load_state_dict(src.out_proj.state_dict())

#     def forward(
#         self, q, k, v, attn_mask=None, key_padding_mask=None, is_causal=False, **kw
#     ):
#         if self.batch_first and q.dim() == 3:
#             q = q.transpose(0, 1)
#         L, N, _ = q.shape
#         qkv = (
#             self.qkv(q)
#             .view(L, N, 3, self.num_heads, self.head_dim)
#             .permute(2, 1, 3, 0, 4)
#         )  # 3,N,h,L,d
#         qh, kh, vh = [
#             t.contiguous().view(N * self.num_heads, L, self.head_dim) for t in qkv
#         ]
#         out = self.sdp(
#             qh,
#             kh,
#             vh,
#             attn_mask=attn_mask,
#             dropout_p=self.dropout if self.training else 0.0,
#             is_causal=is_causal,
#         )
#         out = (
#             out.view(N, self.num_heads, L, self.head_dim)
#             .permute(2, 0, 1, 3)
#             .reshape(L, N, self.embed_dim)
#         )
#         out = self.proj(out)
#         if self.batch_first:
#             out = out.transpose(0, 1)
#         return out, None


# def inject_linear_attention(model: nn.Module, encoders: Set[str] = {"transformer"}):
#     """Replace nn.MultiheadAttention with Injected version (LoRA-friendly)."""
#     for enc in encoders:
#         tgt = getattr(model, enc, None)
#         if tgt is None or not hasattr(tgt, "resblocks"):
#             continue
#         for blk in tgt.resblocks:
#             if isinstance(blk.attn, nn.MultiheadAttention):
#                 inj = InjectedMultiHeadAttention(
#                     blk.attn.embed_dim,
#                     blk.attn.num_heads,
#                     blk.attn.dropout,
#                     batch_first=blk.attn.batch_first,
#                 )
#                 inj.set_parameters(blk.attn)
#                 blk.attn = inj
#     return model


# # ───────────────────────── 5.  LoRA injection helper ──────────────────────────


# def add_lora(model: nn.Module, r=8, alpha=32, dropout=0.05):
#     targets = {
#         name.split(".")[-1]
#         for name, m in model.named_modules()
#         if isinstance(m, nn.Linear) and name.split(".")[-1] in {"qkv", "proj", "c_proj"}
#     }
#     cfg = LoraConfig(
#         r=r,
#         lora_alpha=alpha,
#         lora_dropout=dropout,
#         bias="none",
#         target_modules=sorted(targets),
#         task_type="FEATURE_EXTRACTION",
#         # dtype=torch.bfloat16,
#     )
#     model = get_peft_model(model, cfg)
#     model.print_trainable_parameters()  # log
#     return model


# # ───────────────────────── 6.  Joint regression wrapper ───────────────────────
# class ClipJointRegression(nn.Module):
#     def __init__(self, alpha=0.4, huber_delta=1.0):
#         super().__init__()
#         self.clip, _, _ = create_model_and_transforms(CLIP_NAME)
#         inject_linear_attention(self.clip, {"transformer"})
#         self.clip = add_lora(self.clip, r=32, alpha=256)
#         self.clip = self.clip.to(device)
#         self.proj_dim = (
#             self.clip.embed_dim
#             if hasattr(self.clip, "embed_dim")
#             else self.clip.visual.attnpool.c_proj.out_features
#         )
#         self.regressor = nn.Linear(self.proj_dim, 1)
#         self.alpha = alpha
#         self.huber = nn.HuberLoss(delta=huber_delta)

#     def forward(self, pixel_values=None, input_ids=None, labels=None, **kw):
#         img_f = self.clip.encode_image(pixel_values)
#         txt_f = self.clip.encode_text(input_ids)
#         img_f, txt_f = F.normalize(img_f, dim=-1), F.normalize(txt_f, dim=-1)
#         logits = img_f @ txt_f.T
#         loss_itc = F.cross_entropy(
#             logits, torch.arange(img_f.size(0), device=logits.device)
#         )
#         preds = self.regressor(img_f).squeeze(-1)
#         loss_reg = self.huber(preds, labels) if labels is not None else 0.0
#         return {
#             "loss": self.alpha * loss_itc + (1 - self.alpha) * loss_reg,
#             "predictions": preds,
#         }


# # ───────────────────────── 7.  Collate & metric ───────────────────────────────
# PAD_ID = 0


# def collate_fn(batch):
#     imgs, toks, labs = zip(*batch)
#     return {
#         "pixel_values": torch.stack(imgs),
#         "input_ids": torch.stack(toks),
#         "attention_mask": (torch.stack(toks) != PAD_ID).long(),
#         "labels": torch.stack(labs),
#     }


# def compute_metrics(eval_pred):
#     preds, labels = eval_pred
#     return {"corr": pearsonr(labels.flatten(), preds.flatten())[0]}


# # ───────────────────────── 8.  K-fold training routine ────────────────────────


# def kfold_train(df: pd.DataFrame, n_splits=6, seed=42):
#     kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
#     all_corr = []
#     for fold, (tr_idx, vl_idx) in enumerate(kf.split(df), 1):
#         print(f"===== Fold {fold}/{n_splits} =====")
#         tr_ds = MultiViewDataset(
#             df.iloc[tr_idx], [clip_preprocess] + [train_transform] * 5
#         )
#         vl_ds = ClipCnnDataset(df.iloc[vl_idx], val_transform)
#         args = TrainingArguments(
#             output_dir=f"outputs/fold{fold}",
#             per_device_train_batch_size=1200,
#             per_device_eval_batch_size=1200,
#             gradient_accumulation_steps=1,
#             num_train_epochs=20,
#             learning_rate=2e-3,
#             bf16=True,
#             logging_steps=50,
#             eval_strategy="epoch",
#             save_strategy="epoch",
#             remove_unused_columns=False,
#         )
#         model = ClipJointRegression().to(device)
#         trainer = Trainer(
#             model=model,
#             args=args,
#             train_dataset=tr_ds,
#             eval_dataset=vl_ds,
#             data_collator=collate_fn,
#             compute_metrics=compute_metrics,
#         )
#         trainer.train()
#         r = trainer.evaluate()["eval_corr"]
#         print(f"Fold{fold}: r={r:.4f}")
#         all_corr.append(r)
#     print(f"Mean r: {np.mean(all_corr):.4f} ± {np.std(all_corr):.4f}")
#     return all_corr


# # ───────────────────────── 9.  Entry-point helper ─────────────────────────────
# kfold_train(df)


In [None]:
import os
from copy import deepcopy
import numpy as np
import pandas as pd
from PIL import Image
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
from sklearn.model_selection import KFold
from scipy.stats import pearsonr
from transformers import Trainer, TrainingArguments

from open_clip import create_model_and_transforms, get_tokenizer
from transformers.utils import logging as hf_logging

# hf_logging.set_verbosity_info()  # ← ロギングも見やすく
# -----------------------------------------------------------------------------
# 0. Basic configuration
# -----------------------------------------------------------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# OpenCLIP model identifier used for every model/tokenizer created below.
CLIP_NAME = "hf-hub:timm/resnet50x4_clip.openai"

# CLIP model, its preprocessing pipeline, and the matching tokenizer.
# The second return value of create_model_and_transforms is discarded here.
clip_backbone, _, clip_preprocess = create_model_and_transforms(CLIP_NAME)
tokenizer = get_tokenizer(CLIP_NAME)


import torchvision.transforms.v2 as T
import torch

# ───────────────────────────────────────────────
# 0.  Reuse CLIP's normalisation statistics
#     Pull mean / std out of the Normalize step of the clip_preprocess
#     pipeline returned by create_model_and_transforms.
# ───────────────────────────────────────────────
norm_layer = next(
    filter(
        lambda step: isinstance(step, transforms.Normalize),
        clip_preprocess.transforms,
    )
)
CLIP_MEAN = norm_layer.mean
CLIP_STD = norm_layer.std


# ───────────────────────────────────────────────
# 1.  Build the TorchVision v2 training transform
# ───────────────────────────────────────────────
def build_train_transform(device: str = "cpu", image_size: int = 288):
    """Build the TorchVision v2 augmentation pipeline used for training views.

    The pipeline converts a PIL image to a uint8 tensor, resizes it to
    ``image_size`` x ``image_size``, applies random flip / affine / blur /
    colour jitter, then converts to float32 in [0, 1] and normalises with
    ``CLIP_MEAN`` / ``CLIP_STD``.

    Args:
        device: Passed to ``.to(device)`` on the composed transform.
            NOTE(review): ``.to`` only moves module parameters and buffers,
            and these transforms appear to hold none, so a PIL input is most
            likely still augmented on the CPU — confirm before relying on
            GPU-side augmentation.
        image_size: Side length of the square output. Defaults to 288, the
            value that was previously hard-coded.

    Returns:
        The composed transform.
    """

    return (
        T.Compose(
            [
                # --- PIL -> Tensor(uint8, CxHxW) ---
                T.ToImage(),
                # --- basic preprocessing ---
                T.Resize((image_size, image_size)),
                T.CenterCrop(image_size),
                # --- data augmentation ---
                T.RandomHorizontalFlip(),
                T.RandomAffine(
                    degrees=20,
                    translate=(0.2, 0.2),
                    scale=(0.7, 1.2),
                    # areas uncovered by the affine warp are filled with white
                    fill=(255, 255, 255),
                ),
                T.GaussianBlur(kernel_size=5, sigma=(0.01, 4.0)),
                T.ColorJitter(
                    brightness=0.075,
                    contrast=0.0,
                    saturation=0.03,
                    hue=0.03,
                ),
                # --- float32 conversion & CLIP normalisation ---
                T.ToDtype(torch.float32, scale=True),
                T.Normalize(CLIP_MEAN, CLIP_STD),
            ]
        ).to(device)
    )

# Use the device selected above instead of a hard-coded "cuda", so the same
# cell also runs on CPU-only machines.
train_transform = build_train_transform(device)  # with data augmentation
val_transform = clip_preprocess  # validation uses the default CLIP preprocessing

# Aliases used when assembling the multi-view training dataset.
orig_transform = clip_preprocess  # same object as val_transform
aug_transform = train_transform
# -----------------------------------------------------------------------------
# 1. Dataset
# -----------------------------------------------------------------------------
class ClipCnnDataset(Dataset):
    """Dataset yielding ``(image, caption_tokens, label)`` per DataFrame row.

    Args:
        df: Table with the columns ``image_path``, ``caption`` and ``res_L``.
            The index is reset so items are addressed positionally.
        transform: Callable applied to the RGB PIL image. Falls back to the
            module-level ``clip_preprocess`` when ``None``.
    """

    def __init__(self, df: pd.DataFrame, transform=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform if transform is not None else clip_preprocess

    def __len__(self) -> int:
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        # Close the file handle deterministically; convert() returns a new
        # image that stays valid after the source file is closed.
        with Image.open(row["image_path"]) as image_file:
            image = image_file.convert("RGB")
        # The transform output is returned as-is (no batch dimension added).
        image = self.transform(image)

        # Tokenise the caption and drop the leading dimension of size 1.
        text_tokens = tokenizer(row["caption"]).squeeze(0)

        return image, text_tokens, torch.tensor(row["res_L"], dtype=torch.float32)

# -----------------------------------------------------------------------------
# 2. Model
# -----------------------------------------------------------------------------
class ClipJointRegression(nn.Module):
    """CLIP image-text contrastive loss plus a linear regressor on image features.

    Args:
        backbone_name: OpenCLIP model identifier passed to
            ``create_model_and_transforms``.
        alpha: Weight of the contrastive loss; the regression loss is weighted
            by ``1 - alpha``.
        huber_delta: ``delta`` of the Huber loss used for regression.
    """

    def __init__(
        self,
        backbone_name: str = CLIP_NAME,
        alpha: float = 0.4,
        huber_delta: float = 1.0,
    ):
        super().__init__()
        self.clip, _, _ = create_model_and_transforms(backbone_name)
        self.clip = self.clip.to(device)
        self.proj_dim = self._infer_embed_dim(self.clip)
        self.regressor = nn.Linear(self.proj_dim, 1)
        self.alpha = alpha
        self.huber = nn.HuberLoss(delta=huber_delta)

    @staticmethod
    def _infer_embed_dim(clip_model) -> int:
        """Return the size of the features the regressor consumes.

        Attributes that describe the joint embedding are tried first. The
        text transformer width is only a last-resort guess because it differs
        from the embedding size on some backbones.
        """
        if hasattr(clip_model, "embed_dim"):
            return clip_model.embed_dim
        output_dim = getattr(getattr(clip_model, "visual", None), "output_dim", None)
        if isinstance(output_dim, int):
            return output_dim
        text_projection = getattr(clip_model, "text_projection", None)
        if isinstance(text_projection, nn.Linear):
            return text_projection.out_features
        if isinstance(text_projection, torch.Tensor) and text_projection.dim() == 2:
            # stored as (text_width, embed_dim)
            return text_projection.shape[1]
        if hasattr(clip_model, "transformer"):
            return clip_model.transformer.width
        # ResNet CLIP models: take the size from the attention-pool projection.
        return clip_model.visual.attnpool.c_proj.out_features

    def forward(
        self,
        pixel_values=None,  # images (B,3,H,W)
        input_ids=None,  # text tokens (B,77)
        attention_mask=None,  # accepted for collator compatibility; unused
        labels=None,  # continuous labels (B)
    ):
        """Return ``{"loss": ..., "predictions": ...}`` for one batch.

        When ``labels`` is ``None`` the regression term is 0 and the loss is
        the weighted contrastive term only.
        """
        # --- ITC (image-text contrast) ---
        image_features = self.clip.encode_image(pixel_values)
        text_features = self.clip.encode_text(input_ids)

        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # NOTE(review): logits are raw cosine similarities (no temperature /
        # logit scale), and the targets assume the i-th caption matches only
        # the i-th image; batches with repeated captions break that
        # assumption — confirm this is intended.
        logits = image_features @ text_features.T
        itc_labels = torch.arange(image_features.size(0), device=logits.device)
        loss_itc = nn.functional.cross_entropy(logits, itc_labels)

        # --- regression on the normalised image features ---
        preds = self.regressor(image_features).squeeze(-1)
        loss_reg = self.huber(preds, labels) if labels is not None else 0.0

        loss = self.alpha * loss_itc + (1.0 - self.alpha) * loss_reg
        return {"loss": loss, "predictions": preds}

# -----------------------------------------------------------------------------
# 3. Helper functions
# -----------------------------------------------------------------------------
PAD_ID = 0


def collate_fn(batch):
    """Stack a list of ``(image, tokens, label)`` samples into a batch dict.

    Returns a dict with ``pixel_values``, ``input_ids``, ``attention_mask``
    (1 where a token differs from ``PAD_ID``) and ``labels``.
    """
    image_list, token_list, label_list = map(list, zip(*batch))
    input_ids = torch.stack(token_list)
    return {
        "pixel_values": torch.stack(image_list),
        "input_ids": input_ids,
        # optional for the model; kept so the batch layout stays the same
        "attention_mask": input_ids.ne(PAD_ID).long(),
        "labels": torch.stack(label_list),
    }
def compute_metrics(eval_pred):
    """Return ``{"corr": r}``, the Pearson correlation of labels and predictions.

    ``eval_pred`` unpacks into ``(predictions, labels)``; both are flattened
    before the correlation is computed.
    """
    predictions, targets = eval_pred
    corr = pearsonr(targets.flatten(), predictions.flatten())[0]
    return {"corr": corr}

from torch.utils.data import Dataset


class MultiViewDataset(Dataset):
    """Expand every DataFrame row into ``len(transforms_list)`` views.

    Item ``idx`` maps to row ``idx // n_views`` rendered with transform
    ``idx % n_views``, so the dataset length is ``len(df) * n_views``.
    Example: ``[orig_transform] + [aug_transform] * 4`` gives 5 views per row.

    Args:
        df: Table with the columns ``image_path``, ``caption`` and ``res_L``.
        transforms_list: Sequence of callables, one per view.
    """

    def __init__(self, df: pd.DataFrame, transforms_list):
        self.df = df.reset_index(drop=True)
        self.transforms = transforms_list
        self.n_views = len(transforms_list)

    def __len__(self) -> int:
        return len(self.df) * self.n_views

    def __getitem__(self, idx):
        # img_idx: which row; view_idx: which transform to apply
        img_idx, view_idx = divmod(idx, self.n_views)
        row = self.df.iloc[img_idx]

        # Close the file handle deterministically; convert() returns a new
        # image that stays valid after the source file is closed.
        with Image.open(row["image_path"]) as image_file:
            image = image_file.convert("RGB")
        image = self.transforms[view_idx](image)

        text_tokens = tokenizer(row["caption"]).squeeze(0)
        label = torch.tensor(row["res_L"], dtype=torch.float32)
        return image, text_tokens, label
# -----------------------------------------------------------------------------


def kfold_train(df: pd.DataFrame, n_splits: int = 6, seed: int = 42, repeat: int = 5):
    """Train and evaluate ``ClipJointRegression`` with shuffled K-fold CV.

    Args:
        df: Table with the columns ``image_path``, ``caption`` and ``res_L``.
        n_splits: Number of folds.
        seed: Random state of the fold shuffling.
        repeat: Number of augmented views added per training row, on top of
            one un-augmented view. The default of 5 matches the multiplier
            that was previously hard-coded.

    Returns:
        List with the validation Pearson correlation of every fold.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    # 1 original view + `repeat` augmented views per training row.
    transforms_list = [orig_transform] + [aug_transform] * repeat

    # NOTE(review): folds are drawn over rows. If `df` holds several rows per
    # image (e.g. one per subject), the same image can appear in both the
    # training and validation split — confirm whether a group-wise split by
    # image is needed.
    all_corrs = []
    for fold, (train_idx, val_idx) in enumerate(kf.split(np.arange(len(df))), 1):
        print(f"===== Fold {fold}/{n_splits} =====")
        train_df = df.iloc[train_idx].reset_index(drop=True)
        val_df = df.iloc[val_idx].reset_index(drop=True)

        train_dataset = MultiViewDataset(train_df, transforms_list)
        val_dataset = ClipCnnDataset(val_df, transform=val_transform)

        training_args = TrainingArguments(
            output_dir=f"./outputs/clip/fold{fold}",
            per_device_train_batch_size=260,
            per_device_eval_batch_size=260,
            learning_rate=1e-4,
            num_train_epochs=30,
            eval_strategy="epoch",
            save_strategy="epoch",
            save_total_limit=1,  # limit the number of checkpoints kept on disk
            load_best_model_at_end=True,
            metric_for_best_model="corr",
            greater_is_better=True,
            logging_steps=100,
            bf16=torch.cuda.is_available(),
            # keep the collator's keys; they are passed to forward() as-is
            remove_unused_columns=False,
            dataloader_num_workers=12,  # tune to the number of CPU cores
            dataloader_pin_memory=True,
            dataloader_persistent_workers=True,  # reuse workers across epochs
            disable_tqdm=False,
            gradient_accumulation_steps=4,
        )

        model = ClipJointRegression(alpha=0.4, huber_delta=1.0).to(device)

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=collate_fn,
            compute_metrics=compute_metrics,
        )
        trainer.train()
        metrics = trainer.evaluate()
        fold_corr = metrics["eval_corr"]
        print(f"Fold {fold}: Pearson r = {fold_corr:.4f}")
        all_corrs.append(fold_corr)

        # Release this fold's model before the next one is created so GPU
        # memory does not pile up across folds.
        del trainer, model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    mean_corr = np.mean(all_corrs)
    std_corr = np.std(all_corrs)
    print(f"\nFinished {n_splits}-Fold CV. Mean r = {mean_corr:.4f} ± {std_corr:.4f}")
    return all_corrs

# -----------------------------------------------------------------------------
# 5. Run training
# -----------------------------------------------------------------------------
# 6-fold cross-validation over `df`; keeps the per-fold correlations returned
# by kfold_train.
all_corrs = kfold_train(df, n_splits=6, seed=42)

===== Fold 1/6 =====


Epoch,Training Loss,Validation Loss,Corr
1,3.5827,3.496542,0.263356
2,3.0546,3.037796,0.28957
3,2.8703,2.862105,0.328615
4,2.792,2.790551,0.33987
5,2.7554,2.751549,0.343511
6,2.734,2.732594,0.336162
7,2.7102,2.71357,0.340518
8,2.6968,2.699706,0.344164
9,2.6863,2.689655,0.342932
10,2.6781,2.683159,0.342484


Fold 1: Pearson r = 0.3465
===== Fold 2/6 =====


Epoch,Training Loss,Validation Loss,Corr
1,3.5694,3.46078,0.262139
2,3.0539,3.019855,0.28764
3,2.8676,2.854257,0.316389
4,2.7889,2.786146,0.325813
5,2.7522,2.75042,0.330478
6,2.7306,2.727672,0.329574
7,2.7131,2.715343,0.325906
8,2.693,2.701537,0.331666
9,2.6856,2.692324,0.329055
10,2.6788,2.68476,0.330016


In [None]:
from src.eda import save_intermediate_outputs, register_hooks, save_layer_matrixs

# Register hooks on `model` and dump intermediate outputs under ROOT_PATH/tmp.
# NOTE(review): `model` and `dataset` are not defined at module level in the
# visible cells (the `model` inside kfold_train is local) — confirm which
# objects this cell expects.
# presumably register_hooks returns the number of hooked layers; verify in src.eda
layer_num = register_hooks(model)
print(layer_num)
save_dir = os.path.join(
    ROOT_PATH,
    "tmp",
    "clip",
    "intermediate_feature",
)
# layer_num = 81
save_intermediate_outputs(model, dataset, save_dir, device)
save_layer_matrixs(save_dir, layer_num)


In [None]:
# Encode every image of `dataset` with `model.encode_image`, then cache the
# stacked features on disk.
# NOTE(review): `model` and `dataset` come from cells not visible here;
# `dataset` must yield (image, label) pairs — confirm.
VERSION = "v2"
feature_dir = os.path.join(DATA_PATH, "output", "clip", VERSION)
feature_path = os.path.join(feature_dir, "image_features.pt")
# torch.save does not create missing directories.
os.makedirs(feature_dir, exist_ok=True)

# Inference mode, so layers such as batch norm / dropout behave
# deterministically while the features are extracted.
model.eval()
image_features_list = []
with torch.no_grad():
    for image, _label in tqdm(dataset):
        image = image.unsqueeze(0).to(device)  # add the batch dimension
        # Move each result to the CPU so features do not accumulate on the GPU
        # and the saved file can be loaded on a machine without CUDA.
        image_features_list.append(model.encode_image(image).cpu())

image_features = torch.cat(image_features_list)
print(image_features.shape)
torch.save(image_features, feature_path)
# Re-load from disk so later cells work the same whether the features were
# just computed or only read back from the cache.
image_features = torch.load(feature_path)


In [None]:
# ridge regression
from sklearn.discriminant_analysis import StandardScaler
from sklearn.model_selection import KFold, cross_val_score


def pearson_scorer(y_true, y_pred):
    """Return the Pearson correlation coefficient between targets and predictions."""
    statistic = pearsonr(y_true, y_pred)[0]
    return np.mean(statistic)


# Ridge regression from CLIP image features to each per-image target, scored
# with h-fold cross-validation. Per-fold targets and predictions are collected
# in `result[target_name]["normal"]`; the "obesity" lists are created but not
# filled in this cell.
image_dir = os.path.join(ROOT_PATH, "Database")
pearson_sklearn = make_scorer(pearson_scorer, greater_is_better=True)
# NOTE(review): assumes the rows of X are in the same order as the per-image
# targets below — confirm against how image_features was built.
X = image_features.cpu().numpy()
h = 5  # number of CV folds
# skf is only referenced by the commented-out stratified variant below.
skf = StratifiedKFold(n_splits=h, shuffle=True, random_state=1)
kf = KFold(n_splits=h, shuffle=True, random_state=3)

# pca = make_pipeline(StandardScaler(), PCA(n_components=0.98))
# X = pca.fit_transform(image_features.cpu().numpy())
result = {}
# NOTE(review): image_brightness is not defined in the visible cells — confirm
# it is created elsewhere as a Series named "brightness".
for i, res in enumerate([res_L_mean, res_H_mean, res_T_mean, image_brightness]):
    result[res.name] = {
        "obesity": {
            "y_tests": [],
            "y_preds": [],
        },
        "normal": {
            "y_tests": [],
            "y_preds": [],
        },
    }
    # for is_obesity, y in res.groupby("is_obesity"):
    # Average over the is_obesity level to get one value per image; the
    # brightness series is used unchanged.
    y = res.groupby("img").mean() if res.name != "brightness" else res
    # dataset = ImageDataset(image_dir, y)
    labels = y.values
    # n_bins = 8  # number of bins
    # bins = np.linspace(1, 8, n_bins + 1)  # bin edges
    # binned_labels = np.digitize(labels, bins) - 1
    # normal_score = 0
    # if not is_obesity:
    #     normal_score_list = []
    pbar = tqdm(
        # enumerate(skf.split(np.zeros(len(binned_labels)), binned_labels)),
        enumerate(kf.split(np.zeros(len(labels)), labels)),
        total=h,
        leave=False,
    )
    for j, (train_idx, val_idx) in pbar:
        # Train and validation subsets
        print(y.name)
        # print("肥満" if is_obesity else "健常")
        print(len(y))
        X_train, X_test = X[train_idx], X[val_idx]
        y_train, y_test = labels[train_idx], labels[val_idx]
        # Single-step pipeline with a fixed regularisation strength.
        ridge = make_pipeline(Ridge(alpha=1.0))
        # random search
        # param_distributions = {"ridge__alpha": loguniform(1e-3, 1e3)}

        # search = RandomizedSearchCV(
        #     ridge,
        #     param_distributions,
        #     n_iter=20,
        #     cv=5,
        #     n_jobs=4,
        #     random_state=42,
        #     scoring=pearson_sklearn,
        # )

        ridge.fit(X_train, y_train)
        # scores = cross_val_score(ridge, X, labels, cv=5, scoring=pearson_sklearn)
        # print(np.mean(scores))
        # print(ridge.best_params_)
        # print("best score", search.best_score_)
        # best_model = search.best_estimator_
        y_pred = ridge.predict(X_test)
        result[res.name]["normal"]["y_preds"].append(
            y_pred
        )
        result[res.name]["normal"]["y_tests"].append(
            y_test
        )
    # Mean of the per-fold Pearson correlations for this target.
    print(np.mean([pearson_scorer(
                y_test,
                result[res.name]["normal"]["y_preds"][i],
            ) for i, y_test in enumerate(result[res.name]["normal"]["y_tests"])]))


In [None]:
import pickle


# Scatter plot of true vs. predicted values for every target in `result`,
# pooling the per-fold arrays stored under "normal".

# Display titles per target; unknown targets fall back to their own name so a
# new entry in `result` can no longer reuse a stale title or hit an undefined
# one.
titles = {
    "res_L": "likability",
    "res_H": "healthiness",
    "res_T": "tastiness",
    "brightness": "brightness",
}

# One panel per target instead of a fixed 4; squeeze=False keeps `axes`
# indexable even when there is a single panel.
fig, axes = plt.subplots(1, max(len(result), 1), figsize=(16, 9), squeeze=False)
axes = axes[0]
for i, (name, value) in enumerate(result.items()):
    print(name)
    labels = []
    # if name == "res_L":
    #     with open(os.path.join(DATA_PATH, "output", "clip_res_L_score_v2.pkl"), "wb") as f:
    #         pickle.dump(
    #             {
    #                 "obesity": {
    #                     "y_tests": value["obesity"]["y_tests"],
    #                     "y_preds": value["obesity"]["y_preds"],
    #                     "score": pearson_scorer(
    #                         np.concatenate(value["obesity"]["y_tests"]),
    #                         np.concatenate(value["obesity"]["y_preds"]),
    #                     ),
    #                 },
    #                 "normal": {
    #                     "y_tests": value["normal"]["y_tests"],
    #                     "y_preds": value["normal"]["y_preds"],
    #                     "score": pearson_scorer(
    #                         np.concatenate(value["normal"]["y_tests"]),
    #                         np.concatenate(value["normal"]["y_preds"]),
    #                     ),
    #                 },
    #             },
    #             f,
    #         )
    # for type_name, y_dict in value.items():
    # if name == "brightness" and type_name == "obesity":
    #     continue
    y_dict = value["normal"]
    # Pool the held-out folds so each sample appears once.
    y_test = np.concatenate(y_dict["y_tests"])
    y_pred = np.concatenate(y_dict["y_preds"])
    score = pearson_scorer(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    labels.append(f"normal : {round(score, 3)}")

    axes[i].scatter(
        y_test,
        y_pred,
        label=f"{name}_{'健常'}",
        alpha=0.5,
        s=10,
    )
    handle, _ = axes[i].get_legend_handles_labels()

    # The legend text shows the pooled correlation instead of the scatter label.
    axes[i].legend(
        handle,
        labels,
        loc="upper left",
    )
    axes[i].set_xlabel("True")
    axes[i].set_ylabel("Predicted")
    # if y.name == "brightness":
    #     axes[i].set_xticks(
    #         np.arange(
    #             round(y_test.min() - 0.1, 2), round(y_test.max() + 0.1, 2), 0.1
    #         )
    #     )
    #     axes[i].set_yticks(
    #         np.arange(
    #             round(y_test.min() - 0.1, 2), round(y_test.max() + 0.1, 2), 0.1
    #         )
    #     )
    # else:
    #     axes[i].set_xticks(np.arange(1, 9, 1))
    #     axes[i].set_yticks(np.arange(1, 9, 1))
    print(round(mse, 3), round(score, 3))

    title = titles.get(name, name)
    axes[i].set_title(title + " MSE " + str(round(mse, 3)))
plt.show()
