In [2]:
import os
from datetime import datetime, timedelta
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
from tqdm import tqdm
import seaborn as sns
from torchvision import datasets, models, transforms
from sklearn.metrics import roc_curve, roc_auc_score,auc
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision.models as models
import random

In [12]:
# Locations of the challenge data and of the folder that receives submissions.
data_dir = '/mnt/ssd/2023challenge/algonauts_2023_challenge_data'
parent_submission_dir = '/mnt/ssd/2023challenge/algonauts_2023_challenge_submission'

# Use the first CUDA device when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

8个人(subject)的图片数据数量  
train:  [9841, 9841, 9082, 8779, 9841, 9082, 9841, 8779]  
test:   [159, 159, 293, 395, 159, 293, 159, 395]

In [13]:
subj = 1            # 8 subjects in total; subject 1 is selected here

In [14]:
class argObj:
  """Per-subject path container.

  Constructing an instance also makes sure the subject's submission
  directory exists on disk.

  Attributes:
    subj: Subject number formatted as a zero-padded, two-character string
      (e.g. ``1`` -> ``'01'``).
    data_dir: ``<data_dir>/subj<NN>``.
    parent_submission_dir: The submission root, stored as given.
    subject_submission_dir: ``<parent_submission_dir>/subj<NN>``.
  """
  def __init__(self, data_dir, parent_submission_dir, subj):

    self.subj = format(subj, '02')
    self.data_dir = os.path.join(data_dir, 'subj'+self.subj)
    self.parent_submission_dir = parent_submission_dir
    self.subject_submission_dir = os.path.join(self.parent_submission_dir,
        'subj'+self.subj)

    # Create the submission directory if it does not exist yet. exist_ok=True
    # avoids the check-then-create race of a separate isdir() test.
    os.makedirs(self.subject_submission_dir, exist_ok=True)

# Build the per-subject paths (this also creates the subject's submission directory).
args = argObj(data_dir, parent_submission_dir, subj)

### 脑部活动向量，即真实标签  
分为左脑和右脑 (Training stimulus images × LH/RH vertices)  
subj=1中，左右脑活动矩阵大小为(9841, 19004)和(9841, 20544)

In [16]:
# Load the training-split fMRI responses: one array per hemisphere
# (lh = left, rh = right).
fmri_dir = os.path.join(args.data_dir, 'training_split', 'training_fmri')
lh_path = os.path.join(fmri_dir, 'lh_training_fmri.npy')
rh_path = os.path.join(fmri_dir, 'rh_training_fmri.npy')
lh_fmri, rh_fmri = np.load(lh_path), np.load(rh_path)

## 图片数据
例如受试者1的第一张训练图片 `train-0001_nsd-00013.png`，文件名中包含两个索引：  
1. `train-0001`：此顺序是为了匹配 fMRI 训练数据集。  
2. `nsd-00013`：对应于 73,000 张 NSD 图像的 ID（按 NSD 实际实验的先后顺序编号），可映射到 `COCO` 数据集中的图像。

In [17]:
# Image folders of the training and test splits for the selected subject.
train_img_dir = os.path.join(args.data_dir, 'training_split', 'training_images')
test_img_dir = os.path.join(args.data_dir, 'test_split', 'test_images')

# Lists with all training and test image file names, sorted.
train_img_list = sorted(os.listdir(train_img_dir))
test_img_list = sorted(os.listdir(test_img_dir))

# Report how many images each split contains.
print('Training images: ' + str(len(train_img_list)))
print('Test images: ' + str(len(test_img_list)))

Training images: 9841
Test images: 159


## 连接图片数据和脑响应

# 训练参数设置

In [None]:
# Training hyperparameters.
# num_classes must be defined: the model-construction cell passes it to
# DINOv2ViTModel. 1 matches that class's default (single-output head).
num_classes = 1
img_size = 224          # image side length in pixels
batch_size = 8
epoch = 70              # number of training epochs
learning_rate = 1e-4

In [None]:
# Preprocessing applied to every image before it is fed to the network.
transform = transforms.Compose([
    # Square resize driven by the configured img_size instead of a repeated literal.
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    # Normalize with the ImageNet channel mean/std: the pretrained DINOv2
    # backbone is frozen, so inputs should match its pretraining statistics.
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])


# NOTE(review): the dataset construction is elided here ("#......"), so this line is
# not valid Python as written. The real train/validation Dataset objects must be
# assigned to these two names before the DataLoaders can be built.
train_dataset, val_dataset = #......

# Wrap both splits in DataLoaders. They share every setting except shuffling:
# only the training split is shuffled.
_loader_kwargs = dict(batch_size=batch_size, pin_memory=True, num_workers=8)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)

### dino模型

In [None]:
class DINOv2ViTModel(nn.Module):
    """DINOv2 ViT-S/14 backbone with frozen weights, a trainable MLP head and a final linear layer.

    Args:
        num_classes: Output width of the MLP head, which is also the input
            width of the final linear layer. Defaults to 1.
    """

    def __init__(self, num_classes=1):
        super().__init__()

        # Pretrained backbone, fetched through torch.hub.
        backbone = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14')

        # Freeze every parameter the backbone has at this point. The head
        # attached below is created afterwards, so it remains trainable.
        backbone.requires_grad_(False)

        # Replace the backbone's head: 384 -> 128 -> num_classes with a ReLU in between.
        backbone.head = nn.Sequential(
            nn.Linear(384, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )
        self.dinov2_vit = backbone

        # Final linear layer mapping the head output to one value per sample.
        self.linear_regression = nn.Linear(num_classes, 1)

    def forward(self, x):
        """Pass ``x`` through the backbone (including its replaced head), then the final linear layer."""
        return self.linear_regression(self.dinov2_vit(x))

In [None]:
# Loss function, moved to the training device.
# NOTE(review): CrossEntropyLoss is a classification loss, while the model defined in
# this notebook ends in a single-output linear layer — confirm this pairing is intended.
criterion = nn.CrossEntropyLoss().to(device)

# Model: the DINOv2-based network, wrapped for multi-GPU data parallelism.
model = nn.DataParallel(DINOv2ViTModel(num_classes)).to(device)

# Optimizer (an SGD alternative is kept below for reference).
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.1)

# Learning-rate scheduler: disabled (None), so the learning rate is never stepped.
# NOTE(review): this name shadows the `lr_scheduler` module imported at the top of the
# notebook, so the commented-out StepLR line cannot run after this assignment.
lr_scheduler = None
#lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=1) 

In [None]:
# Model training: one training pass and one validation pass per epoch.
# The per-epoch mean loss / accuracy of both passes is recorded in the four lists.
train_loss_list = []
train_acc_list = []
val_loss_list = []
val_acc_list = []
for i in range(epoch):
    print('-'*5, 'Epoch [{}/{}] start'.format(i, epoch-1), '-'*5)
    # Running means for this epoch. The loss is accumulated as a Python float
    # (via .item()) so no autograd graph or GPU tensor is kept alive across
    # batches. Accuracy stays a tensor because the plotting cell calls .cpu() on it.
    epoch_loss = 0.0
    epoch_accuracy = 0
    # Switch the model to training mode.
    model.train()

    # Iterate over the whole training DataLoader.
    for image, target in tqdm(train_loader):
        # Move inputs and targets to the training device.
        image, target = image.to(device), target.to(device)
        # Forward pass. squeeze(-1) removes only a trailing singleton dimension,
        # so a batch of size 1 keeps its batch dimension (plain squeeze() drops it).
        output = model(image).squeeze(-1)
        # Loss between prediction and target.
        loss = criterion(output, target)
        # Reset gradients, backpropagate, update the trainable parameters.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Mean accuracy of this batch.
        # NOTE(review): argmax(dim=1) needs 2-D (batch, classes) scores; with a
        # single-output model the squeezed output is 1-D — confirm the metric
        # against the dataset's targets.
        acc = (output.argmax(dim=1) == target).float().mean()
        epoch_accuracy += acc / len(train_loader)       # running mean accuracy over the epoch
        epoch_loss += loss.item() / len(train_loader)   # running mean loss over the epoch
    # Learning rate in effect during this epoch (read before any scheduler step).
    lr = optimizer.param_groups[0]['lr']
    if lr_scheduler is not None:
        lr_scheduler.step()     # advance the learning-rate scheduler

    # Switch the model to evaluation mode for validation.
    model.eval()
    # Iterate over the whole validation DataLoader without tracking gradients.
    with torch.no_grad():
        epoch_val_accuracy = 0
        epoch_val_loss = 0.0
        for image, target in val_loader:
            # Move inputs and targets to the training device.
            image, target = image.to(device), target.to(device)
            # Forward pass (same squeeze handling as in training).
            output = model(image).squeeze(-1)
            # Validation loss and accuracy for this batch.
            loss = criterion(output, target)
            acc = (output.argmax(dim=1) == target).float().mean()
            epoch_val_accuracy += acc / len(val_loader)
            epoch_val_loss += loss.item() / len(val_loader)


    # Print this epoch's accuracy, loss and learning rate in a single line.
    print(f' train_acc: {epoch_accuracy:.4f}, train_loss: {epoch_loss:.4f},\
    val_acc: {epoch_val_accuracy:.4f}, val_loss: {epoch_val_loss:.4f}, LR: {lr:e}')

    train_loss_list.append(epoch_loss)
    train_acc_list.append(epoch_accuracy)
    val_loss_list.append(epoch_val_loss)
    val_acc_list.append(epoch_val_accuracy)


In [None]:
# Plot the learning curves (loss and accuracy per epoch).
def _as_float(value):
    """Return ``value`` as a Python float, whether it is a tensor or a plain number."""
    if isinstance(value, torch.Tensor):
        # Detach and move to the CPU first so graph-attached / GPU tensors convert cleanly.
        return value.detach().cpu().item()
    return float(value)

# Convert every recorded metric the same way; matplotlib cannot plot GPU or
# graph-attached tensors directly.
train_acc = [_as_float(v) for v in train_acc_list]
train_loss = [_as_float(v) for v in train_loss_list]
val_acc = [_as_float(v) for v in val_acc_list]
val_loss = [_as_float(v) for v in val_loss_list]

# x-axis values: 1-based epoch numbers, sized from what was actually recorded
# so that a run stopped before `epoch` epochs can still be plotted.
x_arr = range(1, len(train_loss) + 1)

# Create a figure and a set of subplots
fig, ax = plt.subplots(1, 2, figsize=(12, 4))

# Plot the training and validation loss
ax[0].plot(x_arr, train_loss, '-o', label='Train loss')
ax[0].plot(x_arr, val_loss, '--<', label='Validation loss')
ax[0].legend(fontsize=15)
ax[0].set_xlabel('Epoch', size=15)
ax[0].set_ylabel('Loss', size=15)

# Plot the training and validation accuracy on the second subplot
ax[1].plot(x_arr, train_acc, '-o', label='Train accuracy')
ax[1].plot(x_arr, val_acc, '--<', label='Validation accuracy')
ax[1].legend(fontsize=15)
ax[1].set_xlabel('Epoch', size=15)
ax[1].set_ylabel('Accuracy', size=15)


# NOTE(review): `save_path` is not defined in the cells visible here; it must
# name an existing directory before this cell runs — confirm where it is set.
plt.savefig(f'{save_path}/learning_curve.png')
plt.show()

# # 读取保存的图像
# saved_image = Image.open(save_path)
