构建瓶颈块

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm

# Bottleneck block definition
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a skip path.

    Each convolution is followed by batch normalisation. The 3x3 convolution
    carries ``stride``; the last 1x1 convolution widens the output to
    ``out_channels * expansion`` channels.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Width of the first two convolutions.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the block input to produce the
            skip-path tensor; when ``None`` the input itself is added back.
    """

    # Ratio between the block's output channels and ``out_channels``.
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        widened = out_channels * self.expansion

        # 1x1 convolution: in_channels -> out_channels
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # 3x3 convolution: the only layer of the main path that applies the stride
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        # 1x1 convolution: out_channels -> out_channels * expansion
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Run the three conv/BN stages, add the skip path, then apply the final ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Skip path: the raw input unless a downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

ResNet

In [None]:
# ResNet architecture definition
class ResNet(nn.Module):
    """ResNet made of a convolutional stem, four residual stages and a linear classifier.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and be constructible as
            ``block(in_channels, out_channels, stride, downsample)``.
        layers: Number of blocks in each stage; entries 0-3 are used.
        num_classes: Output size of the final linear layer.
        input_channels: Number of channels of the input images. Defaults to 1
            (single-channel images), the value that used to be hard-coded.
    """

    def __init__(self, block, layers, num_classes=10, input_channels=1):
        super().__init__()
        # Channel count fed to the next block; _make_layer updates it as stages are built.
        self.in_channels = 64
        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages; every stage after the first uses stride 2 in its first block.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Global average pooling makes the classifier independent of the spatial size.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks and return it as ``nn.Sequential``.

        Only the first block receives ``stride`` and, when the skip path would
        not match the block output (stride != 1 or a channel mismatch), a
        1x1-conv + batch-norm ``downsample`` module. ``self.in_channels`` is
        updated to the stage's output width.
        """
        stage_width = out_channels * block.expansion

        downsample = None
        if stride != 1 or self.in_channels != stage_width:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(stage_width),
            )

        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = stage_width
        # Remaining blocks keep the resolution and already receive stage_width channels.
        stage.extend(block(self.in_channels, out_channels) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the ``num_classes`` outputs of the classifier for a batch of images ``x``."""
        # Stem
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        # Residual stages
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Classifier head
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

ResNet50, ResNet101

In [None]:
def resnet50(num_classes=10):
    """Build a ResNet of ``Bottleneck`` blocks with 3, 4, 6 and 3 blocks per stage."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)
def resnet101(num_classes=10):
    """Build a ResNet of ``Bottleneck`` blocks with 3, 4, 23 and 3 blocks per stage."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)

实例化 ResNet50、ResNet101 并检查输出形状

In [None]:
# Random dummy input: a batch of one single-channel 96x96 image
input_tensor = torch.randn(1, 1, 96, 96)

# Instantiate the models
model_50 = resnet50(num_classes=10)
model_101 = resnet101(num_classes=10)

# Run a forward pass through each model
output_50 = model_50(input_tensor)
output_101 = model_101(input_tensor)

# Print the output shapes
print(f"ResNet50 output shape: {output_50.shape}")
print(f"ResNet101 output shape: {output_101.shape}")

加载数据集

In [None]:
class FashionMNIST:
    """The Fashion-MNIST dataset.

    Loads the torchvision Fashion-MNIST train and test splits (downloading
    them when they are missing) and builds ``DataLoader`` objects for them.

    Args:
        batch_size: Batch size used by the loaders from ``get_dataloader``.
        resize: Target size passed to ``transforms.Resize``.
        root: Dataset root directory handed to torchvision, which keeps the
            files in a ``FashionMNIST/`` sub-folder of it. The argument used
            to be stored but ignored (``"../data"`` was always used); it is
            now honoured, and its default is ``'../data'`` so the default
            on-disk location stays ``../data/FashionMNIST``.
    """

    # Class names, indexed by integer label.
    _LABELS = ('t-shirt', 'trouser', 'pullover', 'dress', 'coat',
               'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot')

    def __init__(self, batch_size=64, resize=(28, 28), root='../data'):
        self.batch_size = batch_size
        self.root = root
        self.resize = resize
        # Preprocessing: resize, then convert to a tensor.
        self.transform = transforms.Compose([
            transforms.Resize(resize),
            transforms.ToTensor()
        ])
        # Training and validation splits, both read from (or downloaded to) self.root.
        self.train = torchvision.datasets.FashionMNIST(
            root=self.root, train=True, transform=self.transform, download=True)
        self.val = torchvision.datasets.FashionMNIST(
            root=self.root, train=False, transform=self.transform, download=True)

    def text_labels(self, indices):
        """Return the class names for an iterable of integer-like label indices."""
        return [self._LABELS[int(i)] for i in indices]

    def get_dataloader(self, train=True):
        """Return a loader over the training split (shuffled) or the validation split (in order)."""
        data = self.train if train else self.val
        return torch.utils.data.DataLoader(data, self.batch_size, shuffle=train)

# Image-grid plotting helper
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):
    """Plot images on a ``num_rows`` x ``num_cols`` grid in grayscale.

    Args:
        imgs: Iterable of image tensors (a list of tensors or a batched
            tensor). At most ``num_rows * num_cols`` images are drawn; if
            fewer are supplied the remaining cells are simply not created.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        titles: Optional sequence of per-image titles; images beyond its
            length are drawn without a title.
        scale: Figure size per grid cell, passed to ``figsize``.
    """
    plt.figure(figsize=(num_cols * scale, num_rows * scale))
    # zip stops at the shorter of "grid cells" and "images", so a partly
    # filled grid is drawn instead of raising IndexError.
    for i, img in zip(range(num_rows * num_cols), imgs):
        ax = plt.subplot(num_rows, num_cols, i + 1)
        if titles is not None and i < len(titles):
            ax.set_title(titles[i])
        # detach + cpu: .numpy() alone fails for CUDA tensors and for tensors tracking gradients.
        ax.imshow(img.detach().cpu().numpy().squeeze(), cmap='gray')
        ax.axis('off')
    plt.tight_layout()
    plt.show()

训练

In [None]:
import matplotlib.pyplot as plt

# Module-level histories shared by train_and_test_model (which appends one
# entry per epoch) and plot_metrics (which plots them): training loss,
# test loss and test accuracy.
train_losses, test_losses, test_accuracies = [], [], []

def train_and_test_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, device='cpu'):
    """Train ``model`` for ``num_epochs`` epochs, evaluating it after each one.

    After every epoch the mean per-batch training loss and the loss/accuracy
    returned by ``test_model`` on ``val_loader`` are appended to the
    module-level ``train_losses``, ``test_losses`` and ``test_accuracies``
    lists and printed. Once all epochs are done, ``plot_metrics()`` draws the
    collected curves.

    Args:
        model: Network to optimise; expected to already live on ``device``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` evaluation batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device every batch is moved to before the forward pass.
    """
    for epoch in range(num_epochs):
        # test_model switches the model to eval mode, so training mode is restored each epoch.
        model.train()
        progress = tqdm(train_loader, desc=f'Training Epoch [{epoch + 1}/{num_epochs}]')

        # Training phase
        loss_sum = 0.0
        for inputs, targets in progress:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), targets)
            batch_loss.backward()
            optimizer.step()

            loss_sum += batch_loss.item()

        train_loss = loss_sum / len(train_loader)
        test_loss, test_accuracy = test_model(model, val_loader, criterion, device)

        # Record this epoch's losses and accuracy
        for history, value in (
            (train_losses, train_loss),
            (test_losses, test_loss),
            (test_accuracies, test_accuracy),
        ):
            history.append(value)

        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

    # Plot the loss and accuracy curves
    plot_metrics()

def plot_metrics():
    """Plot the recorded train/test loss curves and the test-accuracy curve side by side.

    Reads the module-level ``train_losses``, ``test_losses`` and
    ``test_accuracies`` lists; takes no arguments and returns nothing.
    """
    _, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: loss curves
    loss_ax.plot(train_losses, label='Train Loss', color='blue')
    loss_ax.plot(test_losses, label='Test Loss', color='red')
    loss_ax.set_title('Loss Curves')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Right panel: test accuracy
    acc_ax.plot(test_accuracies, label='Test Accuracy', color='green')
    acc_ax.set_title('Test Accuracy Curve')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    plt.tight_layout()
    plt.show()

def test_model(model, val_loader, criterion, device):
    model.eval()  # 进入评估模式
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # 禁用梯度计算
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)  # 计算损失
            running_loss += loss.item()

            # 计算正确预测的数量
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_loss = running_loss / len(val_loader)
    test_accuracy = correct / total
    return test_loss, test_accuracy

def visualize_images(model, dataloader, device):
    """Show up to 10 images from the first batch of ``dataloader`` with predicted and true labels.

    The model is switched to evaluation mode (and left in it), the first
    batch is run through it without gradients, and the images are drawn on a
    2x5 grid titled ``Pred: <index>, True: <index>``. When the batch holds
    fewer than 10 samples the unused cells are left blank instead of raising
    ``IndexError``.

    Args:
        model: Network used to predict the labels.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        device: Device the batch is moved to before the forward pass.
    """
    model.eval()  # evaluation mode
    images, labels = next(iter(dataloader))
    images, labels = images.to(device), labels.to(device)

    with torch.no_grad():
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)

    # Visualise the results
    num_rows, num_cols = 2, 5
    _, axes = plt.subplots(num_rows, num_cols, figsize=(15, 6))
    # The first batch may be smaller than the grid (e.g. batch_size < 10).
    num_shown = min(num_rows * num_cols, images.size(0))
    for i, ax in enumerate(axes.flat):
        ax.axis('off')
        if i >= num_shown:
            continue  # no sample for this cell: keep it empty
        ax.imshow(images[i].cpu().numpy().squeeze(), cmap='gray')  # draw the image
        ax.set_title(f'Pred: {predicted[i].item()}, True: {labels[i].item()}')
    plt.tight_layout()
    plt.show()

进行训练

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Create the FashionMNIST data module
fashion_mnist = FashionMNIST(batch_size=64)

# Get the training and validation data loaders
train_loader = fashion_mnist.get_dataloader(train=True)
val_loader = fashion_mnist.get_dataloader(train=False)

# Initialise the model, loss function and optimiser
model = resnet50(num_classes=10).to(device)  # move the model to the selected device (GPU or CPU)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Train and test the model
train_and_test_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, device=device)

# Visualise images together with the model's predictions
# NOTE(review): this draws a batch from train_loader, i.e. training data; if the
# intent is to check held-out predictions, val_loader looks like the better choice - confirm.
visualize_images(model, train_loader, device)
