### 载入环境

In [None]:
%pip install scikit-learn
%pip install tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import DataLoader,Dataset
from sklearn.model_selection import train_test_split

import random
from tqdm import tqdm
import os
from PIL import Image
import numpy as np


### 数据预处理

In [None]:
class CustomImageDataset(Dataset):
    """Map-style dataset pairing image file paths with their labels.

    Images are opened on access, converted to RGB and, when a ``transform``
    is supplied, passed through it before being returned.
    """

    def __init__(self, image_paths, labels, transform=None):
        """Store the path sequence, the parallel label sequence and the transform."""
        self.image_paths, self.labels = image_paths, labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples, i.e. the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        sample = Image.open(self.image_paths[idx]).convert('RGB')
        target = self.labels[idx]

        # Without a (truthy) transform the RGB PIL image is returned as-is.
        if not self.transform:
            return sample, target
        return self.transform(sample), target
    
# Root directory that the relative paths from the annotation file are joined onto.
base_path = '../dataset/01-猫咪分类/data/' 
# Directory of the training images (not referenced again in the visible code).
dataset_path = '../dataset/01-猫咪分类/data/cat_12_train'
# Annotation list; each line is parsed as "<relative image path><TAB><integer label>".
annotation_file = '../dataset/01-猫咪分类/data/train_list.txt'

def read_annotation_file(file_path, base_dir=None):
    """Parse a tab-separated annotation file into image paths and labels.

    Every non-blank line must hold a relative image path and an integer
    label separated by a single tab. Blank lines (for example a trailing
    newline at the end of the file) are skipped.

    Args:
        file_path: Path of the annotation file to read.
        base_dir: Directory each relative image path is joined onto.
            Defaults to the module-level ``base_path``.

    Returns:
        A ``(image_paths, labels)`` tuple of two parallel lists: joined
        image paths and integer labels, in file order.

    Raises:
        ValueError: If a non-blank line is not ``<path><TAB><int label>``.
    """
    if base_dir is None:
        base_dir = base_path

    image_paths = []
    labels = []
    # Stream the file instead of materialising every line up front.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_no, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                path, label = line.split('\t')
                label_value = int(label)
            except ValueError as err:
                # Point at the offending line rather than surfacing a bare unpack error.
                raise ValueError(
                    f"{file_path}:{line_no}: expected '<path><TAB><label>', got {line!r}"
                ) from err
            image_paths.append(os.path.join(base_dir, path))
            labels.append(label_value)
    return image_paths, labels

# Load every (image path, label) pair listed in the annotation file.
image_paths, labels = read_annotation_file(annotation_file)

### 构建数据集和加载器
使用transform进行数据增广

In [None]:
# ImageNet channel statistics, shared by the training and test transforms.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training transform: light augmentation, then resize, tensor conversion and normalisation.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0),  # jitter brightness/contrast/saturation
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),  # normalise with the ImageNet mean/std
])

# Test transform: no augmentation, only resize, tensor conversion and normalisation.
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # convert the PIL image to a PyTorch tensor
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),  # normalise with the ImageNet mean/std
])

# Stratified 90/10 split of paths and labels. The fixed random_state keeps the
# held-out set identical across runs, so a checkpoint saved by an earlier run
# is never evaluated on images it was trained on.
train_image_paths, test_image_paths, train_labels, test_labels = train_test_split(
    image_paths, labels, test_size=0.1, stratify=labels, random_state=42)

# Wrap each split in a dataset with its own transform.
train_dataset = CustomImageDataset(train_image_paths, train_labels, transform=train_transform)
test_dataset = CustomImageDataset(test_image_paths, test_labels, transform=test_transform)

# Batch the datasets; only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


通过打印数据集中的图片和标签来验证数据集是否加载成功

In [None]:
import matplotlib.pyplot as plt

# Fetch one batch. Dedicated names are used so the module-level `labels` list
# (still needed by the stratified split) is not overwritten by a batch tensor.
sample_images, sample_labels = next(iter(train_loader))

# Take the first image of the batch and its label.
image, label = sample_images[0], sample_labels[0]

# The image is a normalised CHW tensor: move channels last so the per-channel
# std/mean broadcast over the final axis, then invert the normalisation.
image = image.permute(1, 2, 0)
image = image * torch.tensor([0.229, 0.224, 0.225]) + torch.tensor([0.485, 0.456, 0.406])
# Clamp before the uint8 cast: casting out-of-range floats to uint8 is not well defined.
image = (image.clamp(0, 1) * 255).byte()

# Show the image.
plt.imshow(image)
plt.show()

# Print its label.
print("Label:", label.item())

### 搭建CNN网络（ResNet50）

In [None]:
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions plus a skip add.

    The block emits ``out_channels * expansion`` channels. ``stride`` is
    applied by the 3x3 convolution. ``downsample``, when given, is applied to
    the block input before it is added back onto the main path.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        widened = out_channels * self.expansion

        # 1x1 convolution: change the channel count to the bottleneck width.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # 3x3 convolution: the only place where the spatial stride is applied.
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        # 1x1 convolution: widen to out_channels * expansion.
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Run the three conv/BN stages, add the shortcut and apply the final ReLU."""
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))

        # Use the input unchanged as the shortcut unless a projection was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        y += shortcut
        return self.relu(y)

class ResNet50(nn.Module):
    """ResNet-50 assembled from ``Bottleneck`` blocks.

    ``num_classes`` sets the width of the final fully connected layer and
    defaults to 12.
    """

    def __init__(self, num_classes=12):
        super().__init__()
        # Channel count fed into the next stage; updated by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU and 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages as (bottleneck width, block count, stride of first block).
        stage_specs = ((64, 3, 1), (128, 4, 2), (256, 6, 2), (512, 3, 2))
        for number, (width, depth, stride) in enumerate(stage_specs, start=1):
            setattr(self, f'layer{number}', self._make_layer(Bottleneck, width, depth, stride=stride))

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.fc = nn.Linear(512 * Bottleneck.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` blocks; only the first may stride or project."""
        widened = out_channels * block.expansion

        # A projection shortcut is needed whenever the first block changes the
        # spatial size or the channel count.
        downsample = None
        if not (stride == 1 and self.in_channels == widened):
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, widened, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(widened),
            )

        first = block(self.in_channels, out_channels, stride, downsample)
        self.in_channels = widened
        remaining = [block(self.in_channels, out_channels) for _ in range(1, blocks)]

        return nn.Sequential(first, *remaining)

    def forward(self, x):
        """Run stem, the four stages, global average pooling and the classifier."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(pooled)


# Instantiate the CNN with its default 12-way classifier.
model_cnn = ResNet50()



将ResNet50的预训练模型参数导入自己定义的网络中

In [None]:
# Build torchvision's ImageNet-pretrained ResNet-50 and read out its parameters.
pretrained_model_cnn = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
pretrained_dict_cnn = pretrained_model_cnn.state_dict()

# Parameters of the locally defined network.
model_dict_cnn = model_cnn.state_dict()

# Keep only pretrained tensors whose name and shape both match the local
# network; anything else keeps the local network's own initial values.
pretrained_dict_cnn = {
    name: tensor
    for name, tensor in pretrained_dict_cnn.items()
    if name in model_dict_cnn and model_dict_cnn[name].shape == tensor.shape
}

# Overlay the matching pretrained tensors and load the merged state.
model_dict_cnn.update(pretrained_dict_cnn)
model_cnn.load_state_dict(model_dict_cnn)

### 搭建Transformer网络（ViT）

In [None]:
# Split the image into patches and embed each patch linearly.
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of patch embeddings.

    A convolution whose kernel size and stride both equal ``patch_size``
    produces one ``hidden_dim``-wide vector per patch.
    """

    def __init__(self, img_size, patch_size, in_channels, hidden_dim):
        super().__init__()
        self.img_size, self.patch_size = img_size, patch_size
        patches_per_side = img_size // patch_size
        self.num_patches = patches_per_side ** 2

        self.projection = nn.Conv2d(
            in_channels, hidden_dim, kernel_size=patch_size, stride=patch_size
        )

    def forward(self, x):
        """Map ``(B, C, H, W)`` to ``(B, num_patches, hidden_dim)``."""
        grid = self.projection(x)             # (B, hidden_dim, H/P, W/P)
        sequence = grid.flatten(start_dim=2)  # (B, hidden_dim, num_patches)
        return sequence.transpose(1, 2)       # (B, num_patches, hidden_dim)
    
# Multi-head self-attention.
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a token sequence.

    Args:
        hidden_dim: Width of each input/output token.
        num_heads: Number of attention heads; must divide ``hidden_dim``.
        dropout_rate: Dropout probability applied to the attention weights
            and to the output projection.

    Raises:
        ValueError: If ``hidden_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, hidden_dim, num_heads, dropout_rate):
        super().__init__()
        if hidden_dim % num_heads != 0:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads
        # Logits are scaled by 1/sqrt(d_k) where d_k is the per-head width, not
        # the full model width. This matches nn.MultiheadAttention, which is
        # what the pretrained ViT weights were trained with.
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(hidden_dim, hidden_dim * 3)
        self.attention_dropout = nn.Dropout(dropout_rate)
        self.out = nn.Linear(hidden_dim, hidden_dim)
        self.out_dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape ``(B, N, hidden_dim)``; shape is preserved."""
        B, N, C = x.shape
        # One projection yields q, k and v; reshape to (3, B, heads, N, head_dim).
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)

        attn = (q @ k.transpose(-2, -1)) * self.scale  # (B, heads, N, N)
        attn = attn.softmax(dim=-1)
        attn = self.attention_dropout(attn)

        # Merge the heads back into a single hidden_dim-wide vector per token.
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.out(x)
        x = self.out_dropout(x)
        return x
    
# Transformer encoder block.
class Encoder(nn.Module):
    """One encoder block: self-attention then an MLP, each behind a LayerNorm.

    Both sub-layers are wrapped in a residual connection, with the LayerNorm
    applied to the sub-layer input.
    """

    def __init__(self, hidden_dim, num_heads, mlp_dim, dropout_rate):
        super().__init__()
        self.ln_1 = nn.LayerNorm(hidden_dim)
        self.attention = MultiHeadAttention(hidden_dim, num_heads, dropout_rate)
        self.ln_2 = nn.LayerNorm(hidden_dim)

        # Position-wise feed-forward network: expand, GELU, project back.
        feed_forward = [
            nn.Linear(hidden_dim, mlp_dim),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(mlp_dim, hidden_dim),
            nn.Dropout(dropout_rate),
        ]
        self.mlp = nn.Sequential(*feed_forward)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        attended = x + self.attention(self.ln_1(x))
        return attended + self.mlp(self.ln_2(attended))

class VisionTransformer(nn.Module):
    """Vision Transformer classifier.

    The image is split into patch embeddings, a learnable class token is
    prepended, learnable position embeddings are added, and the sequence runs
    through ``num_layers`` encoder blocks. The final-normalised class token is
    fed to a linear head with ``num_classes`` outputs.
    """

    def __init__(self, img_size, patch_size, in_channels, num_layers, num_heads, hidden_dim, mlp_dim, num_classes, dropout_rate=0.1):
        super().__init__()

        self.patch_embedding = PatchEmbedding(img_size, patch_size, in_channels, hidden_dim)
        # One extra sequence position is reserved for the class token.
        sequence_length = 1 + self.patch_embedding.num_patches
        self.class_token = nn.Parameter(torch.zeros(1, 1, hidden_dim))
        self.position_embedding = nn.Parameter(torch.zeros(1, sequence_length, hidden_dim))

        blocks = [Encoder(hidden_dim, num_heads, mlp_dim, dropout_rate) for _ in range(num_layers)]
        self.encoder = nn.Sequential(*blocks)

        self.ln = nn.LayerNorm(hidden_dim)
        self.head = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(B, num_classes)`` for an image batch."""
        patches = self.patch_embedding(x)
        # Broadcast the single class token across the batch.
        cls = self.class_token.expand(patches.shape[0], -1, -1)
        tokens = torch.cat((cls, patches), dim=1) + self.position_embedding

        tokens = self.ln(self.encoder(tokens))

        # Classify from the class-token position only.
        return self.head(tokens[:, 0])
    
# Instantiate the ViT with 16x16 patches on 224x224 inputs, 12 layers, 12 heads and width 768.
# NOTE(review): the head is 1000-way here while ResNet50 defaults to 12 classes — confirm this is intended.
model_trans = VisionTransformer(img_size=224, patch_size=16, in_channels=3, num_layers=12, num_heads=12, hidden_dim=768, mlp_dim=3072, num_classes=1000)

将ViT的预训练模型参数导入自己定义的网络中

In [None]:
# Load the pretrained ViT checkpoint. map_location keeps the load working on
# CPU-only machines regardless of the device the tensors were saved from.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
pretrained_dict_trans = torch.load('../state/01-cat/vit_b_16-c867db91.pth', map_location='cpu')

# Parameters of the locally defined network.
model_dict_trans = model_trans.state_dict()

# Ordered (checkpoint substring, local substring) renames. All rules are
# applied to every key, so a key may be rewritten by several of them
# (e.g. the per-layer prefix and the attention projection name).
_VIT_KEY_RENAMES = (
    ("conv_proj", "patch_embedding.projection"),
    ("encoder.pos_embedding", "position_embedding"),
    ("encoder.layers.encoder_layer_", "encoder."),
    ("self_attention.in_proj_weight", "attention.qkv.weight"),
    ("self_attention.in_proj_bias", "attention.qkv.bias"),
    ("self_attention.out_proj", "attention.out"),
    ("mlp.linear_1", "mlp.0"),
    ("mlp.linear_2", "mlp.3"),
    ("encoder.ln.", "ln."),
    ("heads.head", "head"),
)


def _remap_vit_key(key):
    """Translate one checkpoint parameter name into this file's naming scheme."""
    for old, new in _VIT_KEY_RENAMES:
        key = key.replace(old, new)
    return key


# Rename checkpoint parameters that the local network stores under another name.
pretrained_dict_fix = {}
for k, v in pretrained_dict_trans.items():
    if k in model_dict_trans:
        # Same name on both sides (e.g. class_token): take it unchanged.
        pretrained_dict_fix[k] = v
        continue

    key = _remap_vit_key(k)
    if key == k:
        # No rename rule applies: report the key instead of dropping it silently.
        print(k)
        print("Not found")
        print()
        continue

    pretrained_dict_fix[key] = v

# Overlay the renamed pretrained tensors on the local parameters.
model_dict_trans.update(pretrained_dict_fix)


# Load the merged state; a wrongly mapped key or shape raises here.
model_trans.load_state_dict(model_dict_trans)


### 定义损失函数及优化器

In [None]:


# Pick the compute device; the model and loss below are created first and
# then moved onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device.type}')

# Which model to train: 'CNN', or anything else for the Transformer.
model_name = 'CNN'

# Only the selected model is moved to the device. Suggested settings:
# CNN -> 30 epochs at lr 0.000025, Transformer -> 50 epochs at lr 0.00002.
model = (model_cnn if model_name == 'CNN' else model_trans).to(device)
epochs, learning_rate = (30, 0.000025) if model_name == 'CNN' else (50, 0.00002)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

### 训练模型

In [None]:
if device.type == 'cuda':
    torch.cuda.empty_cache()

# Checkpoint file for the selected model; make sure its directory exists
# before the first save.
if model_name == 'CNN':
    checkpoint_path = '../state/01-cat/model_cnn.pt'
else:
    checkpoint_path = '../state/01-cat/model_trans.pt'
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

min_loss = float('inf')
checkpoint_saved = False

model.train()

for epoch in range(epochs):
    running_loss = 0.0
    correct = 0
    total = 0

    # tqdm progress bar for the batches of this epoch.
    progress_bar = tqdm(enumerate(train_loader), total=len(train_loader), desc="Epoch: %d" % (epoch+1))
    for i, data in progress_bar:
        inputs, labels = data[0].to(device), data[1].to(device)
        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update.
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # detach() replaces the deprecated `.data` access; argmax gives the predicted class.
        predicted = outputs.detach().argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Show the running mean loss and accuracy on the progress bar.
        progress_bar.set_postfix({'loss': running_loss / (i + 1), 'acc': f'{100. * correct / total:.2f}%'})

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    print(f'Epoch {epoch+1}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%')

    # Keep the lowest-loss weights among epochs whose training accuracy exceeds 95%.
    if epoch_loss < min_loss and epoch_acc > 95:
        min_loss = epoch_loss
        torch.save(model.state_dict(), checkpoint_path)
        checkpoint_saved = True

    # Release cached GPU memory after each epoch.
    if device.type == 'cuda':
        torch.cuda.empty_cache()

# If accuracy never crossed the 95% threshold nothing was written above. Save
# the final weights so the evaluation step loads this run's model, not a
# missing or stale file.
if not checkpoint_saved:
    torch.save(model.state_dict(), checkpoint_path)
    print(f'No epoch exceeded 95% training accuracy; saved final weights to {checkpoint_path}')

### 测试模型

In [None]:
# Reload the checkpoint written during training. map_location places the
# tensors on the current device even if they were saved from another one.
if model_name == 'CNN':
    model.load_state_dict(torch.load('../state/01-cat/model_cnn.pt', map_location=device))
else:
    model.load_state_dict(torch.load('../state/01-cat/model_trans.pt', map_location=device))


# Evaluate loss and accuracy on the held-out set.
model.eval()
correct = 0
total = 0
running_loss = 0.0
progress_bar = tqdm(enumerate(test_loader), total=len(test_loader), desc="Testing")
with torch.no_grad():
    for i, data in progress_bar:
        images, labels = data[0].to(device), data[1].to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        progress_bar.set_postfix({'loss': running_loss / (i + 1), 'acc': f'{100. * correct / total:.2f}%'})

print('Accuracy of the network on test images: %.2f %%' % (100 * correct / total))


# Pick 3 random samples from the test set.
indices = random.sample(range(len(test_dataset)), 3)
samples = [test_dataset[i] for i in indices]
images = torch.stack([s[0] for s in samples])
labels = torch.tensor([s[1] for s in samples])

# Inference only: no autograd graph is needed for the preview predictions.
with torch.no_grad():
    outputs = model(images.to(device))
predicted = outputs.argmax(dim=1).to('cpu')

# Show the images with predicted and true labels. The normalisation is
# inverted with the same mean/std used by the transforms, so colours match
# the source images; a per-image min-max stretch would distort them and
# divide by zero on a constant image.
display_mean = torch.tensor([0.485, 0.456, 0.406])
display_std = torch.tensor([0.229, 0.224, 0.225])
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(9, 3))
for i, ax in enumerate(axes):
    image = images[i].permute(1, 2, 0)
    image = (image * display_std + display_mean).clamp(0, 1)  # back to the [0, 1] range
    ax.imshow(image)
    ax.title.set_text(f'Predicted: {predicted[i]}, Truth: {labels[i]}')
plt.show()