### 依赖安装 shell 脚本 repes_install.sh
```bash
# Contents of repes_install.sh
pip install nibabel
pip install albumentations
# -p: do not fail when the directories already exist, so the script can be re-run
mkdir -p logs model
```

In [1]:
# !bash repes_install.sh

### 数据载入与增强

In [2]:
import os, sys, glob, argparse,random
import pandas as pd
import numpy as np
from tqdm import tqdm
from functools import reduce

import cv2
from PIL import Image
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold

import torch
torch.manual_seed(0)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True

import torchvision.models as models
# import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data.dataset import Dataset

import nibabel as nib
from nibabel.viewers import OrthoSlicer3D

from sklearn.metrics import f1_score

# Collect the scan files of each class. glob returns paths in an arbitrary,
# filesystem-dependent order, so sort them first; otherwise the seeded
# shuffles below would not give the same order on every machine.
trn_path1 = sorted(glob.glob('./reset_pet_brain/Train/NC/*'))
trn_path2 = sorted(glob.glob('./reset_pet_brain/Train/MCI/*'))
test_path = sorted(glob.glob('./reset_pet_brain/Test/*'))

np.random.seed(0)
np.random.shuffle(trn_path1)
np.random.shuffle(trn_path2)
np.random.shuffle(test_path)

# Interleave the two classes (NC, MCI, NC, MCI, ...) so that any contiguous
# slice of train_path -- e.g. a fold of the unshuffled KFold used later --
# contains both classes. A comprehension (unlike reduce without an initial
# value) also works when no files were found.
train_path = [path for pair in zip(trn_path1, trn_path2) for path in pair]

# zip stops at the shorter list; make the resulting data loss visible.
n_unpaired = abs(len(trn_path1) - len(trn_path2))
if n_unpaired:
    print(f'Warning: classes are imbalanced, {n_unpaired} training file(s) left unused')

# Process-wide cache: file path -> float32 volume, filled on first access.
DATA_CACHE = {}


class XunFeiDataset(Dataset):
    """Dataset that turns each scan file into a multi-channel 2-D image.

    Args:
        img_path: indexable collection of file paths.
        transform: optional callable invoked as ``transform(image=arr)`` that
            returns a mapping with an ``'image'`` entry (albumentations style).
        tgt_ch: number of slices drawn along the last axis of every volume.
    """

    def __init__(self, img_path, transform=None, tgt_ch=64):
        self.img_path = img_path
        self.chs = tgt_ch
        self.transform = transform

    def __len__(self):
        return len(self.img_path)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the file at ``index``.

        The image is channel-first (slices, x, y); the label is a 0-d tensor
        holding 1 when the path contains ``'NC'`` and 0 otherwise.
        """
        path = self.img_path[index]

        volume = DATA_CACHE.get(path)
        if volume is None:
            # Keep only index 0 of the trailing axis and cache the result.
            volume = nib.load(path).dataobj[..., 0].astype(np.float32)
            DATA_CACHE[path] = volume

        _, _, depth = volume.shape
        # Repeat the slice indices often enough that `chs` of them can be
        # drawn without replacement even when the volume has fewer slices.
        candidates = list(range(depth)) * (self.chs // depth + 1)
        chosen = np.random.choice(candidates, self.chs, replace=False)
        sample = volume[:, :, chosen]

        if self.transform is not None:
            sample = self.transform(image=sample)['image']

        sample = sample.transpose([2, 0, 1])  # (z', x', y')
        label = torch.from_numpy(np.array(int('NC' in path)))
        return sample, label

'''
    因为训练集太少，增加K折交叉验证降低数据集划分时的偶然性（验证集分布与测试集分布差距较大）
    参与训练集数据仅有 15 × 2 = 30 个，验证集数据 10 × 2 = 20 个
'''
import albumentations as A

# Albumentations pipelines, keyed by the split name that make_loader receives.
transforms = dict(
    # Training: resize, then random geometric and intensity augmentation.
    train=A.Compose([
        A.Resize(128, 128),
        A.RandomRotate90(),
        A.RandomCrop(128, 128),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
    ]),
    # Validation: resize only.
    val=A.Compose([
        A.Resize(128, 128),
    ]),
    # Test: resize plus random flip / brightness-contrast, so repeated passes
    # over the test loader see differently augmented inputs.
    test=A.Compose([
        A.Resize(128, 128),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
    ]),
)

def make_loader(paths, loader_type='train', batch_size=2, num_workers=0):
    """Build the DataLoader for one split.

    Args:
        paths: file paths handed to XunFeiDataset.
        loader_type: 'train', 'val' or 'test'; selects the transform pipeline
            of the same name and whether batches are shuffled (train only).
        batch_size: samples per batch.
        num_workers: DataLoader worker processes. Defaults to 0 (load in the
            main process): DATA_CACHE is a plain module-level dict, so with
            worker processes it is filled inside the worker and thrown away
            when the non-persistent worker exits at the end of every epoch.
            Loading in the main process keeps the cache alive across epochs
            and across loaders.

    Returns:
        torch.utils.data.DataLoader over the requested split.

    Raises:
        ValueError: if loader_type is not one of the three known splits
            (previously this silently returned None).
    """
    shuffle_by_type = {'train': True, 'val': False, 'test': False}
    if loader_type not in shuffle_by_type:
        raise ValueError(
            f'unknown loader_type {loader_type!r}; expected one of {sorted(shuffle_by_type)}')

    return torch.utils.data.DataLoader(
        XunFeiDataset(paths, transforms[loader_type]),
        batch_size=batch_size,
        shuffle=shuffle_by_type[loader_type],
        num_workers=num_workers,
        pin_memory=False,
    )

n_splits = 10
kfold = KFold(n_splits=n_splits, shuffle=False, random_state=None)

# ndarray so the index arrays produced by kfold.split can select paths directly.
train_path = np.array(train_path)
test_loader = make_loader(test_path, 'test')

# Fold name -> (train loader, validation loader).
KFold_loaders = {}
for i, (trn_indx, val_indx) in enumerate(kfold.split(train_path)):
    trn_paths, val_paths = train_path[trn_indx], train_path[val_indx]
    trn_loader = make_loader(trn_paths, 'train')
    val_loader = make_loader(val_paths, 'val')
    KFold_loaders[f'KFold{i:02}'] = (trn_loader, val_loader)
    print(f'{i+1:02}/{n_splits} dataset', end='\r')

10/10 dataset

### 训练与验证

In [4]:
def train(train_loader, model, criterion, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Every batch is moved to the GPU, pushed through ``model``, scored with
    ``criterion`` and used for one optimizer step.

    Returns:
        The sum of the per-batch loss values divided by the number of batches.
    """
    model.train()
    batch_losses = []

    for inputs, target in train_loader:
        inputs = inputs.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)

        loss = criterion(model(inputs), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(train_loader)
            
def validate(val_loader, model, criterion):
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Args:
        val_loader: iterable of ``(inputs, target)`` batches that also exposes
            ``.dataset`` (its length is the accuracy denominator).
        model: network returning per-class scores of shape (batch, classes).
        criterion: unused; kept so existing call sites keep working.

    Returns:
        ``(accuracy, f1)`` -- the fraction of correct predictions and the F1
        score computed over the predictions of the whole loader.

    Raises:
        ValueError: if the loader yields no batch (previously this surfaced as
            an unbound-variable error).
    """
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for inputs, target in val_loader:
            output = model(inputs.cuda())
            # Gather on the CPU; the labels never need to visit the GPU.
            preds.append(output.argmax(1).cpu())
            targets.append(target.cpu())

    if not preds:
        raise ValueError('validate() received a loader without any batch')

    pred = torch.cat(preds, dim=0).numpy()
    tgt = torch.cat(targets, dim=0).numpy()

    val_acc = int((pred == tgt).sum()) / len(val_loader.dataset)
    # scikit-learn's signature is f1_score(y_true, y_pred): ground truth first.
    f1_sc = float(f1_score(tgt, pred))
    return val_acc, f1_sc

### 定义模型

In [6]:
# Backbone name -> torchvision constructor.
pretrain_models = dict(
    resnet18=models.resnet18,
    resnet34=models.resnet34,
    resnet50=models.resnet50,
    vit_b_16=models.vit_b_16,
    convnext_tiny=models.convnext_tiny,
)

# Backbone name -> pretrained weight enum passed to the constructor above.
# NOTE(review): torchvision documents the SWAG_E2E ViT weights for 384x384
# inputs, while the transforms in this notebook resize to 128x128 -- confirm
# that the 'vit_b_16' option actually runs end to end.
pretrain_params = dict(
    resnet18=models.ResNet18_Weights.IMAGENET1K_V1,
    resnet34=models.ResNet34_Weights.IMAGENET1K_V1,
    resnet50=models.ResNet50_Weights.IMAGENET1K_V1,
    vit_b_16=models.ViT_B_16_Weights.IMAGENET1K_SWAG_E2E_V1,
    convnext_tiny=models.ConvNeXt_Tiny_Weights.IMAGENET1K_V1,
)

class XunFeiNet(nn.Module):
    """Pretrained torchvision backbone adapted to multi-channel input.

    The backbone's first convolution is replaced by a freshly initialised one
    that accepts ``in_channels`` channels, and its classification layer is
    replaced by a ``num_classes``-way linear layer. All other layers keep the
    pretrained weights.

    Args:
        pretrain: key into ``pretrain_models`` / ``pretrain_params``.
        in_channels: channels of the input image; must equal the number of
            slices the dataset stacks per sample (64 by default).
        num_classes: number of output classes.

    Raises:
        KeyError: if ``pretrain`` is not registered in ``pretrain_models``.
        ValueError: if the backbone is registered but this class does not know
            how to adapt it (previously it was returned unmodified).
    """

    def __init__(self, pretrain='resnet18', in_channels=64, num_classes=2):
        super(XunFeiNet, self).__init__()

        model = pretrain_models[pretrain](weights=pretrain_params[pretrain])

        # Layer widths are read from the layers being replaced instead of
        # being hard-coded per architecture.
        if pretrain in ('resnet18', 'resnet34', 'resnet50'):
            model.conv1 = nn.Conv2d(in_channels, model.conv1.out_channels,
                                    kernel_size=(7, 7), stride=(2, 2),
                                    padding=(3, 3), bias=False)
            model.avgpool = nn.AdaptiveAvgPool2d(1)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif pretrain == 'vit_b_16':
            model.conv_proj = nn.Conv2d(in_channels, model.conv_proj.out_channels,
                                        kernel_size=(16, 16), stride=(16, 16))
            model.heads.head = nn.Linear(in_features=model.heads.head.in_features,
                                         out_features=num_classes)
        elif pretrain in ('convnext_tiny', 'convnext_small'):
            model.features[0][0] = nn.Conv2d(in_channels, model.features[0][0].out_channels,
                                             kernel_size=(4, 4), stride=(4, 4))
            model.classifier[2] = nn.Linear(in_features=model.classifier[2].in_features,
                                            out_features=num_classes)
        else:
            raise ValueError(f'no input/head adaptation defined for backbone {pretrain!r}')

        self.net = model

    def forward(self, img):
        """Return the backbone output for a batch of images."""
        return self.net(img)

### 进行训练并记录
分别使用 resnet18，resnet34，resnet50, vit 和 convnext_tiny 的预训练模型进行训练和识别任务。

In [7]:
from datetime import datetime
class HisRecorder:
    """Collects per-epoch metrics and writes them to a file under ./logs.

    The log file name is ``<log_name>_<YYYY-MM-DD-HH-MM>``, with the timestamp
    taken when the recorder is created.
    """

    def __init__(self, log_name='test'):
        self.name = log_name
        self.history = {}
        t = datetime.now().strftime('%Y-%m-%d-%H-%M')
        self.fname = self.name + '_' + t

    def record_info(self, epoch, train_loss, train_acc, val_acc, trn_f1sc, val_f1sc):
        """Store the metrics of ``epoch`` (overwrites an existing entry)."""
        self.history[epoch] = (train_loss, train_acc, val_acc, trn_f1sc, val_f1sc)

    def save_hist(self):
        """Write ``str(self.history)`` to ./logs/<fname> and print the file name."""
        # The directory is otherwise only created by the optional install
        # script; create it here so a finished run cannot fail at save time.
        os.makedirs('./logs', exist_ok=True)
        # `with` closes the handle even if the write fails.
        with open('./logs/' + self.fname, 'w', encoding='utf-8') as f:
            f.write(str(self.history))
        print(self.fname)

    def clean_hist(self):
        """Forget everything recorded so far."""
        self.history = {}

class CheckSaver:
    """Keeps the best checkpoint of a run at ./model/<name>_best.pth.tar.

    "Best" means the highest validation F1; on an F1 tie the checkpoint with
    validation accuracy at least as high as the stored one wins.
    """

    def __init__(self, name):
        self.val_best_acc = 0.
        self.trn_best_acc = 0.
        self.val_best_f1sc = 0.
        self.name = name

    def save_check(self, model, optim, sche, trn_acc, val_acc, val_f1_sc, epoch):
        """Save a checkpoint if this epoch is at least as good as the best so far.

        Args:
            model, optim, sche: objects whose ``state_dict()`` is stored.
            trn_acc, val_acc, val_f1_sc: metrics of this epoch.
            epoch: epoch number stored in the checkpoint.
        """
        improved = val_f1_sc > self.val_best_f1sc
        tied = val_f1_sc == self.val_best_f1sc and val_acc >= self.val_best_acc
        if not (improved or tied):
            return

        self.val_best_f1sc = val_f1_sc
        self.val_best_acc = val_acc
        self.trn_best_acc = trn_acc

        stat = {
            'epoch': epoch,
            'state_dict': model.state_dict(),
            'best_acc': val_acc,
            'best_f1_sc': val_f1_sc,
            'optimizer': optim.state_dict(),
            'scheduler': sche.state_dict(),
        }
        # The directory is otherwise only created by the optional install
        # script; without it torch.save fails on the very first epoch.
        os.makedirs('./model', exist_ok=True)
        torch.save(stat, f'./model/{self.name}_best.pth.tar')

#### 训练设置
每一折共训练 100 个 epoch。初始学习率设置为 0.0001，训练过程中使用 CosineAnnealingLR 调整学习率；优化器使用 AdamW。


In [None]:
# Train one freshly initialised model per fold; keep the best checkpoint and
# the metric history of every fold.
for i, (key, (train_loader, val_loader)) in enumerate(KFold_loaders.items()):
    pretrain = 'resnet18'
    epochs = 100
    fold_tag = pretrain + f'_kfold{i+1:02}'

    model = XunFeiNet(pretrain).to('cuda')
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = torch.optim.AdamW(model.parameters(), 0.0001)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs+1)
    recorder = HisRecorder(fold_tag)
    che_saver = CheckSaver(fold_tag)

    for epoch in range(1, epochs+1):
        train_loss = train(train_loader, model, criterion, optimizer)
        val_acc, val_f1_sc = validate(val_loader, model, criterion)
        # Training metrics are measured through the (augmenting) train loader.
        train_acc, trn_f1_sc = validate(train_loader, model, criterion)

        che_saver.save_check(model, optimizer, scheduler, train_acc, val_acc, val_f1_sc, epoch)
        scheduler.step()
        recorder.record_info(epoch, train_loss, train_acc, val_acc, trn_f1_sc, val_f1_sc)
        print(f'Epoch: {epoch:03}  Train Loss: {train_loss:.8f}  Train Acc: {train_acc:.6f} Train F1: {trn_f1_sc:.6f}  Val Acc: {val_acc:.6f} Val F1: {val_f1_sc:.6f}',end='\r')

    # Blank out the in-place progress line before printing the fold summary.
    print(' '*150,end='\r')
    print(f'Model: {pretrain}   KFold {i+1:02}/{n_splits}   Val Best Acc: {che_saver.val_best_acc:.6f} Val Best F1: {che_saver.val_best_f1sc:.6f}')
    recorder.save_hist()

### 训练曲线可视化

In [9]:
import matplotlib.pyplot as plt

In [None]:
# Read back the history of the most recent recorder. `with` closes the file
# (the handle used to stay open for the life of the kernel).
with open(f'./logs/{recorder.fname}','r',encoding='utf-8') as f:
    # NOTE(security): eval() executes whatever the file contains. That is only
    # acceptable because this notebook wrote the file itself; never point this
    # at a log obtained from somewhere else.
    hist = eval(f.read())

# Each history entry is (train_loss, train_acc, val_acc, train_f1, val_f1).
ep_arr = list(hist.keys())
train_acc = [info[1] for info in hist.values()]
val_acc = [info[2] for info in hist.values()]
train_f1 = [info[3] for info in hist.values()]
val_f1 = [info[4] for info in hist.values()]

plt.plot(ep_arr,train_acc,marker='x',label='train accuracy')
plt.plot(ep_arr,val_acc,marker='o',label='val_accuracy')
plt.plot(ep_arr,train_f1,marker='x',label='train f1 score')
plt.plot(ep_arr,val_f1,marker='o',label='val f1 score')
plt.xlabel('epoch')
plt.legend()

### 模型测试与提交

In [11]:
def predict(test_loader, model, criterion):
    """Return the raw model outputs for every sample of ``test_loader``.

    The model's train/eval mode is left untouched, so the caller is expected
    to call ``model.eval()`` beforehand.

    Args:
        test_loader: iterable of ``(inputs, target)`` batches; the targets are
            ignored.
        model: network applied to each GPU-resident input batch.
        criterion: unused; kept so existing call sites keep working.

    Returns:
        CPU float tensor with the per-batch outputs stacked along dim 0, in
        loader order.
    """
    test_pred = []
    with torch.no_grad():
        for inputs, _ in test_loader:
            output = model(inputs.cuda())
            test_pred.append(output.cpu().numpy())

    return torch.Tensor(np.vstack(test_pred))

In [None]:
name = pretrain
model = XunFeiNet(name)

# Because the training set is tiny, every fold model scores the randomly
# augmented test set several times to make the prediction less accidental.
pred = None
for i in range(n_splits):
    chep = torch.load(f'./model/{name}_kfold{i+1:02}_best.pth.tar')
    model.load_state_dict(chep['state_dict'])
    model.cuda()
    model.eval()

    # Sum of the raw outputs over the augmented passes of this fold.
    fold_logits = None
    for j in range(20):
        pass_logits = predict(test_loader, model, criterion)
        fold_logits = pass_logits if fold_logits is None else fold_logits + pass_logits
        print(f'{j+1:02}/{20} test data augment.',end='\r')
    print(' '*100,end='\r')
    print(f'KFold {i+1:02}/{n_splits}')

    # Softmax of the summed outputs, then accumulated across folds.
    fold_probs = F.softmax(fold_logits,dim=-1)
    pred = fold_probs if pred is None else pred + fold_probs

pred = pred.cpu().numpy()

In [13]:
# One row per test file. The uuid is the file name up to its first dot;
# os.path.basename copes with either path separator, and splitting on the dot
# also handles multi-part extensions such as '.nii.gz' (slicing off the last
# four characters did not).
submit = pd.DataFrame(
    {
        'uuid': [int(os.path.basename(x).split('.')[0]) for x in test_path],
        'label': pred.argmax(1)
})
# Class index 1 was assigned to paths containing 'NC' by the dataset.
submit['label'] = submit['label'].map({1:'NC', 0: 'MCI'})
submit = submit.sort_values(by='uuid')
submit.to_csv('submit3_3.csv', index=False)