In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from PIL import Image
import os

# 定义残差块
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(x)
        out = self.relu(out)
        return out

# 定义 ResNet-18 模型
class ResNet(nn.Module):
    def __init__(self, block, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # 定义各个层
        self.layer1 = self._make_layer(block, 64, 2, stride=1)
        self.layer2 = self._make_layer(block, 128, 2, stride=2)
        self.layer3 = self._make_layer(block, 256, 2, stride=2)
        self.layer4 = self._make_layer(block, 512, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

In [7]:
  
# 超参数
batch_size = 64
num_epochs = 20
learning_rate = 0.001

# 数据预处理和增强
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.RandomRotation(10),      # 随机旋转
    transforms.RandomCrop(32, padding=4),  # 随机裁剪，加上填充
    transforms.Resize((224, 224)),      # 将图像调整为224x224
    transforms.ToTensor(),               # 转为Tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),  # 归一化
])

# 下载和加载 CIFAR-10 数据集
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                             download=True, transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                            download=True, transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型
model = ResNet(BasicBlock)

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

Files already downloaded and verified
Files already downloaded and verified


In [8]:
# 保存检查点
def save_checkpoint(model, optimizer, epoch):
    checkpoint_filename = f'checkpoint_epoch_{epoch}.pth'
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, checkpoint_filename)
    print(f"Checkpoint saved: {checkpoint_filename}")

# 加载模型检查点
def load_checkpoint(model, optimizer, checkpoint_path):
    if os.path.isfile(checkpoint_path):
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        print(f"Loaded checkpoint from epoch {epoch}")
    else:
        print("No checkpoint found.")


load_checkpoint(model, optimizer, 'checkpoint_epoch_0.pth')  # 可选的最新检查点

# 训练模型
for epoch in range(num_epochs):
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        # 前向传播
        outputs = model(images)
        loss = criterion(outputs, labels)

        # 后向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
    
    # 保存检查点
    save_checkpoint(model, optimizer, epoch + 1)  # 保存每个 epoch 的检查点


No checkpoint found.
Epoch [1/20], Step [100/782], Loss: 1.9884
Epoch [1/20], Step [200/782], Loss: 1.7790
Epoch [1/20], Step [300/782], Loss: 1.5558
Epoch [1/20], Step [400/782], Loss: 1.3486
Epoch [1/20], Step [500/782], Loss: 1.5597
Epoch [1/20], Step [600/782], Loss: 1.1876
Epoch [1/20], Step [700/782], Loss: 1.5459
Checkpoint saved: checkpoint_epoch_1.pth
Epoch [2/20], Step [100/782], Loss: 1.0549
Epoch [2/20], Step [200/782], Loss: 1.1639
Epoch [2/20], Step [300/782], Loss: 0.8396
Epoch [2/20], Step [400/782], Loss: 1.3172
Epoch [2/20], Step [500/782], Loss: 1.2901
Epoch [2/20], Step [600/782], Loss: 1.1530
Epoch [2/20], Step [700/782], Loss: 1.0441
Checkpoint saved: checkpoint_epoch_2.pth
Epoch [3/20], Step [100/782], Loss: 1.0583
Epoch [3/20], Step [200/782], Loss: 0.8677
Epoch [3/20], Step [300/782], Loss: 0.7281
Epoch [3/20], Step [400/782], Loss: 0.8246
Epoch [3/20], Step [500/782], Loss: 0.8043
Epoch [3/20], Step [600/782], Loss: 0.7541
Epoch [3/20], Step [700/782], Loss: 0

In [11]:
# 测试模型
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Accuracy of the model on the 10000 test images: {100 * correct / total:.2f}%')

Accuracy of the model on the 10000 test images: 87.21%


In [12]:
# # 加载已保存的模型检查点
# def load_model(model, optimizer):
#     if os.path.isfile(checkpoint_path):
#         checkpoint = torch.load(checkpoint_path)
#         model.load_state_dict(checkpoint['model_state_dict'])
#         optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
#         print("Model and optimizer state loaded from checkpoint.")
#     else:
#         print("No checkpoint found.")

# # 图像分类函数
# def classify_image(img_path, model):
#     model.eval()  # 设置为评估模式
#     with torch.no_grad():
#         img = Image.open(img_path)
#         img = transform(img).unsqueeze(0).to(device)  # 添加批次维度并移动到设备
#         outputs = model(img)
#         _, predicted = torch.max(outputs.data, 1)
#         return predicted.item()

# # 示例：加载模型并分类单张图片
# model = ResNet(BasicBlock)
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# # 加载模型
# load_model(model, optimizer)

# # 替换为你的图片路径
# image_path = 'path_to_your_image.jpg'
# predicted_class = classify_image(image_path, model)
# print(f'Predicted class for the image: {predicted_class}')
print('ok')