1. 定义一个数据集的Class, 制作一个dataloader
2. 建立ViT模型
3. 数据预处理（随机旋转，随机缩放，随机扭曲）
4. 划分训练集和验证集
5. 定义模型，定义损失函数和优化器
6. 训练模型
7. 将训练好的模型放在测试集和验证集上进行预测
8. 将结果保存并可视化

In [1]:
!pip install timm



In [2]:
#!cd /content/dataset

In [3]:
import torch
# 查看 pytorch 版本
print(torch.__version__)             
# 查看 GPU 是否可用
print(torch.cuda.is_available())     # True
# 查看GPU数量，索引号从0开始
print(torch.cuda.current_device())   # 0
# 根据索引号查看GPU名字
print(torch.cuda.get_device_name(0)) # NVIDIA GeForce GTX 1070


2.0.0
False


AssertionError: Torch not compiled with CUDA enabled

需要安装pytorch，timm和torchvision，然后可以使用预训练的Vision Transformer模型。在这个例子中，我将使用timm库，它包含了许多预训练模型，包括Vision Transformer。也需要安装Pandas来处理数据集，以及使用PIL来处理图像。

我将使用预训练的Vision Transformer（ViT）进行迁移学习，而且在训练模型时，我将为错误诊断有疾病的人分配更高的权重，以尽量避免出现这种情况。这种方法被称为成本敏感的学习

我的医学数据集涉及到了一种称为"类别不平衡"的常见问题，特别是在医学图像处理中，更倾向于避免假阴性（将疾病分类为无疾病）而不是假阳性（将无疾病分类为有疾病）。可以通过为每个类别分配不同的权重来修改损失函数，使模型更倾向于正确分类疾病类别。

下面是一个例子，我将对疾病类别（1-4）分配更高的权重：


```
# 指定每个类别的权重，增大疾病类别的权重
weights = [0.5, 2.0, 2.0, 2.0, 2.0]
class_weights = torch.FloatTensor(weights).to(device)

# 使用加权交叉熵损失
criterion = nn.CrossEntropyLoss(weight=class_weights)

```

这将使得模型在错误分类疾病类别时受到更大的惩罚，因此模型更倾向于将图像分类为疾病类别，而不是无疾病类别。

注意，这种方法可能会导致更多的假阳性错误，因为模型可能会过于慎重，将一些无疾病的图像错误地分类为有疾病。因此，我们可能需要对这些权重进行调整，以找到正确分类和避免假阴性之间的最佳平衡。


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import timm
from sklearn.model_selection import train_test_split
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
import time
from tqdm import tqdm

# 设定超参数
batch_size = 32
num_epochs = 60
learning_rate = 0.001
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 数据集类
class APTOSDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None, has_labels=True):
        self.labels_frame = dataframe
        self.root_dir = root_dir
        self.transform = transform
        self.has_labels = has_labels

    def __len__(self):
        return len(self.labels_frame)

    def __getitem__(self, idx):
        img_name = self.labels_frame.iloc[idx, 0]
        image = Image.open(f"{self.root_dir}/{img_name}.png")

        if self.transform:
            image = self.transform(image)
        if self.has_labels:
          label = self.labels_frame.iloc[idx, 1]
          return image, label
        else:
          return image, _

# Vision Transformer模型类
class VisionTransformer(nn.Module):
    def __init__(self):
        super(VisionTransformer, self).__init__()
        self.model = timm.create_model('vit_base_patch16_224', pretrained=True)
        num_ftrs = self.model.head.in_features
        self.model.head = nn.Linear(num_ftrs, 5)  # 目标类别数为5

    def forward(self, x):
        x = self.model(x)
        return x

# 数据预处理
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

transform = transforms.Compose([
    transforms.RandomRotation(degrees=30),  # 随机旋转角度在 -30 到 30 度之间
    transforms.RandomResizedCrop(size=(224, 224), scale=(0.9, 1.1)),  # 随机缩放在 0.9 到 1.1 之间，然后随机裁剪到 224x224 大小
    transforms.RandomAffine(degrees=0, translate=(0.2, 0.2), shear=0.2),  # 随机扭曲，并将图像平移 x 和 y 方向的比例在 0.2 以内
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_csv_root = './train.csv'
test_csv_root = './test.csv'
train_img_root = './train_images'
test_img_root = './test_images'

# 划分训练集和验证集
train_labels_frame = pd.read_csv(train_csv_root)
train_df, val_df = train_test_split(train_labels_frame, test_size=0.1)

# 数据集和数据加载器
train_dataset = APTOSDataset(dataframe=train_df, root_dir=train_img_root, transform=transform)
val_dataset = APTOSDataset(dataframe=val_df, root_dir=train_img_root, transform=test_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# 模型、损失函数和优化器
model = VisionTransformer().to(device)

# 指定每个类别的权重，增大疾病类别的权重
weights = [0.5, 2.0, 2.0, 2.0, 2.0]
class_weights = torch.FloatTensor(weights).to(device)
# 使用加权交叉熵损失
criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# 训练模型
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

for epoch in range(num_epochs):
    train_loss = 0.0
    val_loss = 0.0
    train_correct = 0
    val_correct = 0

    start_time = time.time()  # 记录一个epoch的程序开始时间
    # print(f"epoch {epoch} training!")
    model.train()
    # for images, labels in train_loader:
    for images, labels in tqdm(train_loader, desc='train'):
        images = images.to(device)
        labels = labels.to(device)

        # 前向传播
        outputs = model(images)
        loss = criterion(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()  # 把模型的梯度清0
        loss.backward() # 损失反向传播计算新的梯度
        optimizer.step() # 用上一步计算的新梯度来更新网络的权重

        train_loss += loss.item() * images.size(0)

        _, predicted = torch.max(outputs.data, 1)
        train_correct += (predicted == labels).sum().item()

    # print(f"epoch {epoch} valid!")
    model.eval()
    with torch.no_grad():
        # for images, labels in val_loader:
        for images, labels in tqdm(val_loader, desc='valid'):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * images.size(0)

            _, predicted = torch.max(outputs.data, 1)
            val_correct += (predicted == labels).sum().item()

    train_loss = train_loss/len(train_loader.sampler)
    val_loss = val_loss/len(val_loader.sampler)
    train_accuracy = train_correct / len(train_loader.sampler)
    val_accuracy = val_correct / len(val_loader.sampler)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_accuracy)
    val_accuracies.append(val_accuracy)


    end_time = time.time()  # 记录程序结束时间
    run_time = end_time - start_time  # 计算程序运行时间，单位为秒

    print(f'Epoch: {epoch+1} \t running time: {run_time:.2f} s \t Training Loss: {train_loss:.6f} \tValidation Loss: {val_loss:.6f} \tTraining Accuracy: {train_accuracy:.6f} \tValidation Accuracy: {val_accuracy:.6f}')
    if (epoch+1)%20==0:
      torch.save(model.state_dict(), f'model_weights_epoch{epoch+1}.pth')


# 绘制损失曲线和准确率曲线
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(train_losses, label='Training loss')
ax1.plot(val_losses, label='Validation loss')
ax1.legend(frameon=False)
ax1.set_title("Loss curves")

ax2.plot(train_accuracies, label='Training accuracy')
ax2.plot(val_accuracies, label='Validation accuracy')
ax2.legend(frameon=False)
ax2.set_title("Accuracy curves")

# 保存图像到文件
plt.savefig('loss_and_accuracy_curves.png', dpi=300)

# 显示图像
plt.show()


## 保存测试集的预测结果

In [None]:
# 保存测试集的预测结果
test_labels_frame = pd.read_csv(test_csv_root)

test_dataset = APTOSDataset(dataframe=test_labels_frame, root_dir=test_img_root, transform=test_transform, has_labels=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

model.eval()
predictions = []
with torch.no_grad():
    # for images, _ in test_loader:
    for images, labels in tqdm(test_loader, desc='test'):
        images = images.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.cpu().numpy())

# 保存预测结果到csv文件
submission = pd.DataFrame({'id_code': test_dataset.labels_frame['id_code'], 'diagnosis': predictions})
submission.to_csv('submission.csv', index=False)