In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
import os
import pickle
import time
from torch.utils.data import Dataset, DataLoader

# 使用CPU
device = torch.device("cpu")
print("Using CPU for training")

Using CPU for training


In [3]:
# 定义ResNet基本块
class OptimizedBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(OptimizedBasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        out = torch.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = torch.relu(out)
        return out

In [5]:
# 定义CPU优化的ResNet网络
class CPUOptimizedResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(CPUOptimizedResNet, self).__init__()
        self.in_channels = 32  # 减少初始通道数以降低计算量

        
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.layer1 = self._make_layer(block, 32, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 64, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 128, num_blocks[2], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.linear = nn.Linear(128 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = torch.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


In [7]:
# 定义可pickle的数据集类
class CIFAR10Dataset(Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]
        label = self.labels[idx]

        if self.transform:
            img = self.transform(img)

        return img, label

In [9]:
# 加载CIFAR10数据
def load_cifar10_cpu(data_dir):
    # 训练数据
    train_data = []
    train_labels = []
    for i in range(1, 6):
        batch_file = os.path.join(data_dir, f'data_batch_{i}')
        with open(batch_file, 'rb') as fo:
            batch_dict = pickle.load(fo, encoding='bytes')
        train_data.append(batch_dict[b'data'])
        train_labels += batch_dict[b'labels']

    train_data = np.concatenate(train_data, dtype=np.uint8)
    train_data = train_data.reshape((-1, 3, 32, 32)).transpose(0, 2, 3, 1)

    # 测试数据
    test_file = os.path.join(data_dir, 'test_batch')
    with open(test_file, 'rb') as fo:
        test_dict = pickle.load(fo, encoding='bytes')
    test_data = test_dict[b'data']
    test_labels = test_dict[b'labels']
    test_data = np.array(test_data, dtype=np.uint8).reshape((-1, 3, 32, 32)).transpose(0, 2, 3, 1)

    return train_data, train_labels, test_data, test_labels


In [11]:
# CPU优化版获取数据转换
def get_cpu_transforms():CPU优化版
    train_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.RandomHorizontalFlip(),  # 仅保留水平翻转，移除随机裁剪以减少计算
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    test_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    return train_transform, test_transform

IndentationError: unexpected indent (4013043470.py, line 3)

In [13]:
# CPU优化的训练函数
def cpu_train_epoch(model, train_loader, criterion, optimizer):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    train_loss = running_loss / len(train_loader)
    train_acc = 100. * correct / total

    return train_loss, train_acc


In [15]:
# CPU优化的测试函数
def cpu_test_epoch(model, test_loader, criterion):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    test_loss = running_loss / len(test_loader)
    test_acc = 100. * correct / total

    return test_loss, test_acc


In [17]:
# 主函数
def main():
    # 设置参数
    data_dir = "D:\\QG训练\\小组\\week3"
    batch_size = 64  # 减小批大小以适应CPU内存
    initial_lr = 0.05  
    num_epochs = 30  

    # 加载数据
    train_data, train_labels, test_data, test_labels = load_cifar10_cpu(data_dir)
    train_transform, test_transform = get_cpu_transforms()

    # 创建数据集
    train_dataset = CIFAR10Dataset(train_data, train_labels, train_transform)
    test_dataset = CIFAR10Dataset(test_data, test_labels, test_transform)

    # 创建数据加载器(禁用多进程以在Windows上更好工作)
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        num_workers=0, pin_memory=False
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False,
        num_workers=0, pin_memory=False
    )

    # 初始化模型(更轻量级的版本)
    model = CPUOptimizedResNet(OptimizedBasicBlock, [1, 1, 1]).to(device)  # 减少层数

    # 使用交叉熵损失
    criterion = nn.CrossEntropyLoss()

    # 使用带动量的SGD优化器
    optimizer = optim.SGD(
        model.parameters(),
        lr=initial_lr,
        momentum=0.9,
        weight_decay=5e-4
    )

    # 使用简单的学习率衰减
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    # 训练循环
    best_acc = 0.0
    start_time = time.time()

    for epoch in range(num_epochs):
        epoch_start = time.time()

        train_loss, train_acc = cpu_train_epoch(model, train_loader, criterion, optimizer)
        test_loss, test_acc = cpu_test_epoch(model, test_loader, criterion)
        scheduler.step()

        epoch_time = time.time() - epoch_start

        # 保存最佳模型
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), 'best_model_cpu.pth')

        print(f'Epoch [{epoch + 1}/{num_epochs}] | '
              f'Time: {epoch_time:.2f}s | '
              f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | '
              f'Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2f}% | '
              f'Best Acc: {best_acc:.2f}%')

    total_time = time.time() - start_time
    print(f'Training completed in {total_time:.2f} seconds')
    print(f'Best Test Accuracy: {best_acc:.2f}%')

In [None]:
if __name__ == '__main__':
    main()