In [1]:
# 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchsummary import summary
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# 数据增强
transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.2))
])

test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# 加载CIFAR-10数据集
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# 划分训练集和验证集（80%训练，20%验证）
train_size = int(0.8 * len(trainset))
val_size = len(trainset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(trainset, [train_size, val_size])

# 创建数据加载器
trainloader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
valloader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2)
testloader = DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)

# 类别名称（用于报告）
classes = ('airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified
Files already downloaded and verified


In [3]:
# 定义残差块
class BasicBlock(nn.Module):
    expansion = 1
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(x)
        out = self.relu(out)
        return out

# 定义ResNet变体
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 48  # 初始通道从 32 增加到 64
        self.conv1 = nn.Conv2d(3, 48, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(48)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 48, num_blocks[0], stride=1)
        self.layer2 = self.make_layer(block, 196, num_blocks[1], stride=2)
        self.layer3 = self.make_layer(block, 192, num_blocks[2], stride=2)
        self.layer4 = self.make_layer(block, 384, num_blocks[3], stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.5)  # 添加 Dropout 防止过拟合
        self.fc = nn.Linear(384 * block.expansion, num_classes)

    def make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avg_pool(out)
        out = self.dropout(out.view(out.size(0), -1))  # 在全连接层前应用 Dropout
        out = self.fc(out)
        return out

In [4]:
# 创建模型实例（减少层数和通道数以控制参数<500万）
model = ResNet(BasicBlock, [2, 2, 2, 1])  # 每个阶段2个残差块
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 检查模型参数量
summary(model, (3, 32, 32))  # 确保参数<500万

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 48, 32, 32]           1,296
       BatchNorm2d-2           [-1, 48, 32, 32]              96
              ReLU-3           [-1, 48, 32, 32]               0
            Conv2d-4           [-1, 48, 32, 32]          20,736
       BatchNorm2d-5           [-1, 48, 32, 32]              96
              ReLU-6           [-1, 48, 32, 32]               0
            Conv2d-7           [-1, 48, 32, 32]          20,736
       BatchNorm2d-8           [-1, 48, 32, 32]              96
              ReLU-9           [-1, 48, 32, 32]               0
       BasicBlock-10           [-1, 48, 32, 32]               0
           Conv2d-11           [-1, 48, 32, 32]          20,736
      BatchNorm2d-12           [-1, 48, 32, 32]              96
             ReLU-13           [-1, 48, 32, 32]               0
           Conv2d-14           [-1, 48,

In [None]:
def train_model(model, trainloader, valloader, num_epochs=40, device=device):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)  # 添加权重衰减
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # 每 10 epoch 降低学习率
    
    train_losses = []
    val_accuracies = []
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if i % 100 == 99:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(trainloader)}], Loss: {running_loss / 100:.3f}')
                running_loss = 0.0
        
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data in valloader:
                inputs, labels = data[0].to(device), data[1].to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        val_acc = 100 * correct / total
        print(f'Validation Accuracy: {val_acc:.2f}%')
        train_losses.append(running_loss / len(trainloader))
        val_accuracies.append(val_acc)
        scheduler.step()
        
        if val_acc >= 94:  # 目标提升到 90%
            break
    
    torch.save(model.state_dict(), 'resnet_model.pth')
    print("Model saved as 'resnet_model.pth'")
    
    return train_losses, val_accuracies

train_losses, val_accuracies = train_model(model, trainloader, valloader)

# 绘制损失和准确率曲线
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss')
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(val_accuracies, label='Validation Accuracy')
plt.title('Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.show()

Epoch [1/40], Step [100/625], Loss: 2.053
Epoch [1/40], Step [200/625], Loss: 1.845
Epoch [1/40], Step [300/625], Loss: 1.752
Epoch [1/40], Step [400/625], Loss: 1.654
Epoch [1/40], Step [500/625], Loss: 1.599
Epoch [1/40], Step [600/625], Loss: 1.551
Validation Accuracy: 46.96%
Epoch [2/40], Step [100/625], Loss: 1.460
Epoch [2/40], Step [200/625], Loss: 1.409
Epoch [2/40], Step [300/625], Loss: 1.379
Epoch [2/40], Step [400/625], Loss: 1.333
Epoch [2/40], Step [500/625], Loss: 1.296
Epoch [2/40], Step [600/625], Loss: 1.260
Validation Accuracy: 50.52%
Epoch [3/40], Step [100/625], Loss: 1.210
Epoch [3/40], Step [200/625], Loss: 1.197
Epoch [3/40], Step [300/625], Loss: 1.188
Epoch [3/40], Step [400/625], Loss: 1.126
Epoch [3/40], Step [500/625], Loss: 1.105
Epoch [3/40], Step [600/625], Loss: 1.105
Validation Accuracy: 58.97%
Epoch [4/40], Step [100/625], Loss: 1.034
Epoch [4/40], Step [200/625], Loss: 1.063
Epoch [4/40], Step [300/625], Loss: 1.008
Epoch [4/40], Step [400/625], Loss

In [None]:
# 在 train_model 函数末尾添加
torch.save(model.state_dict(), 'resnet_model.pth')
print("Model saved as 'resnet_model.pth'")

import pickle
import numpy as np
import matplotlib.pyplot as plt

def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

# 加载自定义测试集
test_pkl_path = './test_data/cifar_test_nolabel.pkl'
data_dict = unpickle(test_pkl_path)
images = data_dict[b'data']
print(f"Raw data shape: {images.shape}")
print(f"Data type: {images.dtype}")

# 检查数值范围
print(f"Min value: {images.min()}, Max value: {images.max()}")

def predict_with_tta(model, testloader, device, num_augment=5):
    model.eval()
    predictions = []
    ids = []
    aug_transform = transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(10),
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    with torch.no_grad():
        for inputs, batch_ids in testloader:
            batch_preds = torch.zeros(inputs.size(0), 10, device=device)
            for _ in range(num_augment):
                aug_inputs = torch.stack([aug_transform(inputs[i].cpu()) for i in range(inputs.size(0))])
                aug_inputs = aug_inputs.to(device)
                outputs = model(aug_inputs)
                batch_preds += outputs.softmax(dim=1)
            _, predicted = torch.max(batch_preds, 1)
            predictions.extend(predicted.cpu().numpy())
            ids.extend(batch_ids.numpy())
    return ids, predictions

In [None]:
import pickle
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import pandas as pd
import torch
import time

def unpickle(file):
    print(f"Loading {file}...")
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    print("File loaded successfully.")
    return dict

In [None]:
# 修正后的 CustomTestDataset
class CustomTestDataset(Dataset):
    def __init__(self, pkl_path, transform=None):
        data_dict = unpickle(pkl_path)
        self.images = data_dict[b'data']  # 直接使用 (10000, 32, 32, 3)
        print(f"Loaded images shape: {self.images.shape}")
        print(f"Data type: {self.images.dtype}")
        print(f"Min value: {self.images.min()}, Max value: {self.images.max()}")
        self.ids = list(range(len(self.images)))
        self.transform = transform
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img = self.images[idx]  # (32, 32, 3)
        # 如果确认通道顺序是 BGR，可在此处反转为 RGB
        # img = img[..., ::-1]
        # img = self.images[..., ::-1]  # 取消注释此行如果需要 BGR -> RGB
        if self.transform:
            img = self.transform(img)
        return img, self.ids[idx]

# 测试集变换，与训练一致
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),  # 转换为 (C, H, W)
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 与 CIFAR-10 训练一致
])

# 加载测试数据
test_pkl_path = './test_data/cifar_test_nolabel.pkl'
print(f"Initializing dataset with {test_pkl_path}")
test_dataset = CustomTestDataset(pkl_path=test_pkl_path, transform=test_transform)
testloader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=0)

# 加载模型
model.load_state_dict(torch.load('resnet_model.pth', weights_only=True))
model.eval()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print(f"Running on: {device}")


# # 使用 TTA 进行预测
# start_time = time.time()
# print("Starting predictions with TTA...")
# ids, predictions = predict_with_tta(model, testloader, device, num_augment=5)

# print(f"Prediction completed in {time.time() - start_time:.2f} seconds")
# submission = pd.DataFrame({'ID': ids, 'Labels': predictions})
# submission.to_csv('submission_tta.csv', index=False)
# print("Submission file 'submission_tta.csv' generated.")

In [None]:
# 预测
num_samples = len(test_dataset)
predictions = []
ids = []

start_time = time.time()
print("Starting predictions...")
with torch.no_grad():
    for i, data in enumerate(testloader):
        inputs, batch_ids = data[0].to(device), data[1]
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        predictions.extend(predicted.cpu().numpy())
        ids.extend(batch_ids.numpy())
        if i % 10 == 0:  # 每 10 个 batch 打印一次
            print(f"Processed batch {i}, time elapsed: {time.time() - start_time:.2f} seconds")

print(f"Prediction completed in {time.time() - start_time:.2f} seconds")
submission = pd.DataFrame({'ID': ids, 'Labels': predictions})
submission.to_csv('submission.csv', index=False)
print("Submission file 'submission.csv' generated.")