In [None]:
import os
import torch
import torch.nn as nn
import pandas as pd
from PIL import Image
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from IPython.display import clear_output
import torchvision
import numpy as np
from torch.utils.data import random_split
from torch.utils.data import Subset
import glob
import re
from tqdm import tqdm
import csv

In [None]:
# Fix the random seed (CPU and CUDA generators) so that results are reproducible.
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# One-to-one mapping from class name to integer class index.
# The index of a class is simply its position in the tuple below.
label_map = {
    class_name: class_index
    for class_index, class_name in enumerate((
        'airplane',
        'automobile',
        'bird',
        'cat',
        'deer',
        'dog',
        'frog',
        'horse',
        'ship',
        'truck',
    ))
}

class MyCIFAR10Dataset(Dataset):
    """Image dataset whose labels are read from a CSV file.

    The CSV is loaded with ``pandas.read_csv``. For every row, column 0 is
    used as the image id (the image is read from ``<img_dir>/<id>.png``) and
    column 1 as the class name, which is translated to an integer through the
    module-level ``label_map``.
    """

    def __init__(self, img_dir, csv_path, transform=None):
        """Store the image folder, load the label table and keep the transform.

        Args:
            img_dir: Directory that contains the ``<id>.png`` files.
            csv_path: Path of the CSV file holding ``(id, label name)`` rows.
            transform: Optional callable applied to each PIL image.
        """
        self.img_dir = img_dir
        self.labels = pd.read_csv(csv_path)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        Raises:
            KeyError: If the label name of the row is not in ``label_map``.
        """
        img_id = self.labels.iloc[idx, 0]
        label_str = self.labels.iloc[idx, 1]
        label = label_map[label_str]
        img_path = os.path.join(self.img_dir, f'{img_id}.png')
        # The context manager closes the file handle deterministically, and
        # convert('RGB') both forces the pixel data to be read before the file
        # is closed and guarantees three channels, which the 3-channel
        # normalisation used downstream relies on.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label

    def reset_transform(self, new_transform):
        """Replace the transform applied in ``__getitem__``."""
        self.transform = new_transform

In [None]:
# Locations of the training images and of the CSV file with their labels.
# NOTE(review): these are absolute paths on one specific machine; adjust them
# before running the notebook elsewhere.
img_dir = r"C:\Users\Lenovo\AI\project\data\train"
csv_path = r"C:\Users\Lenovo\AI\project\data\trainLabels.csv"

# The training transform needs the (approximate) mean and standard deviation
# of the data, but building a dataset already requires a transform. A plain
# ToTensor transform is therefore used for a first, statistics-only dataset.
transform_for_stats = transforms.ToTensor()
dataset_for_stats = MyCIFAR10Dataset(
    img_dir,
    csv_path,
    transform=transform_for_stats,
)
def get_mean_std(dataset, ratio=0.01):
    """Estimate per-channel mean and standard deviation from a random sample.

    A single shuffled batch containing roughly ``ratio * len(dataset)`` items
    (at least one) is drawn, and the statistics are computed over axes
    ``(0, 2, 3)`` of that batch.

    Args:
        dataset: Dataset whose items are ``(image_tensor, ...)`` tuples; the
            first element of each item is the one that is measured.
        ratio: Fraction of the dataset to sample.

    Returns:
        Tuple ``(mean, std)`` of NumPy arrays with one entry per channel.

    Raises:
        ValueError: If ``dataset`` is empty.
    """
    if len(dataset) == 0:
        raise ValueError("cannot compute mean/std of an empty dataset")
    # int(len * ratio) truncates to 0 for small datasets, which DataLoader
    # rejects as a batch size, so always sample at least one item.
    sample_size = max(1, int(len(dataset) * ratio))
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=sample_size, shuffle=True, num_workers=0
    )
    sample = next(iter(dataloader))[0].numpy()
    mean = np.mean(sample, axis=(0, 2, 3))
    std = np.std(sample, axis=(0, 2, 3))
    return mean, std

# Approximate per-channel statistics used by both pipelines for normalisation.
data_mean, data_std = get_mean_std(dataset_for_stats)

# Validation pipeline: tensor conversion and normalisation only.
transform_for_val = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=data_mean, std=data_std),
])

# Training pipeline: random crop (with padding) and random horizontal flip as
# data augmentation, followed by the same conversion and normalisation.
transform_for_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=data_mean, std=data_std),
])

In [None]:
# Two views of the same files: dataset1 applies the augmenting training
# transform, dataset2 the plain validation transform.
dataset1 = MyCIFAR10Dataset(img_dir, csv_path, transform=transform_for_train)
dataset2 = MyCIFAR10Dataset(img_dir, csv_path, transform=transform_for_val)
train_size = int(0.9 * len(dataset1))
val_size = len(dataset1) - train_size

# Seeded generator so that the 90/10 split is reproducible.
indices = list(range(len(dataset1)))
train_split, val_split = random_split(
    indices, [train_size, val_size], generator=torch.Generator().manual_seed(seed)
)
# random_split returns Subset objects wrapping `indices`; flatten them into
# plain integer lists so that the Subsets below index the datasets directly
# instead of going through a second Subset on every sample.
train_indices = [indices[i] for i in train_split.indices]
val_indices = [indices[i] for i in val_split.indices]

# Build the subsets; each one is bound to the dataset carrying its transform.
trainset = Subset(dataset1, train_indices)
valset = Subset(dataset2, val_indices)
train_dataloader = DataLoader(trainset, batch_size=50, shuffle=True, num_workers=0)
val_dataloader = DataLoader(valset, batch_size=50, shuffle=False, num_workers=0)


In [None]:
def build_resnet(num_classes):
    """Build a torchvision ResNet-18, lightly modified for the small CIFAR-10 images.

    The network is created without pretrained weights, then:
      * ``conv1`` is replaced by a 3x3, stride-1, padding-1 convolution
        (3 input channels, 64 output channels, no bias),
      * ``maxpool`` is replaced by an identity layer,
      * ``fc`` is replaced by a linear layer with ``num_classes`` outputs.

    Args:
        num_classes: Number of output classes of the final linear layer.

    Returns:
        The modified ResNet-18 module.
    """
    net = models.resnet18(weights=None)
    net.conv1 = nn.Conv2d(
        in_channels=3,
        out_channels=64,
        kernel_size=3,
        stride=1,
        padding=1,
        bias=False,
    )
    # Drop the max-pooling step of the original stem.
    net.maxpool = nn.Identity()
    # New classification head with the same input width as the old one.
    head_in_features = net.fc.in_features
    net.fc = nn.Linear(head_in_features, num_classes)
    return net

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Ten-class network, moved to the selected device.
num_classes = 10
model = build_resnet(num_classes).to(device)
print(device)


In [None]:
# Classification loss.
criterion = nn.CrossEntropyLoss()

# Adam with a small weight decay acting as regularisation.
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=1e-5,
)

# Cosine learning-rate decay down to eta_min over T_max scheduler steps.
# NOTE(review): T_max=60 is shorter than the 100-epoch budget of the training
# loop; confirm that the schedule's behaviour beyond step 60 is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=60,
    eta_min=0,
)

In [None]:
def plot_training_progress(losses, accuracies, label='Train'):
    """Show the per-epoch loss and accuracy curves side by side.

    Args:
        losses: One loss value per epoch (floats or tensors).
        accuracies: One accuracy value per epoch (floats or tensors). The
            values are plotted as given on an axis labelled 'Accuracy (%)'.
        label: Prefix used in the titles and legend entries.
    """
    def _to_float(value):
        # Tensors are brought to the CPU first; anything else goes straight to float().
        if isinstance(value, torch.Tensor):
            return float(value.cpu())
        return float(value)

    loss_values = list(map(_to_float, losses))
    accuracy_values = list(map(_to_float, accuracies))
    epochs = range(1, len(loss_values) + 1)

    _, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: loss curve.
    loss_ax.plot(epochs, loss_values, 'r-', label=f'{label} Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title(f'{label} Loss')
    loss_ax.legend()

    # Right panel: accuracy curve.
    accuracy_ax.plot(epochs, accuracy_values, 'b-', label=f'{label} Accuracy')
    accuracy_ax.set_xlabel('Epoch')
    accuracy_ax.set_ylabel('Accuracy (%)')
    accuracy_ax.set_title(f'{label} Accuracy')
    accuracy_ax.legend()

    plt.show()


In [None]:
def train(model,train_loader,criterion,optimizer,device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the parameters of ``model``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy)``: the batch-size-weighted sum of the batch
        losses divided by the number of samples, and the fraction of samples
        whose highest-scoring class equals the label.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        batch_size = labels.size(0)

        # Parameter update: clear old gradients, forward, backward, step.
        optimizer.zero_grad()
        logits = model(images)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        # Running totals for the epoch-level loss and accuracy.
        loss_sum += loss.item() * batch_size
        n_correct += (logits.max(dim=1).indices == labels).sum().item()
        n_seen += batch_size
    return loss_sum / n_seen, n_correct / n_seen

In [None]:
def evaluate(model, data_loader, criterion, device):
    """Measure loss and accuracy of ``model`` on ``data_loader`` without training.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        data_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy)``: the batch-size-weighted sum of the batch
        losses divided by the number of samples, and the fraction of samples
        whose highest-scoring class equals the label.
    """
    model.eval()
    # One (weighted loss, correct predictions, sample count) entry per batch.
    per_batch = []
    with torch.no_grad():
        for imgs, labels in data_loader:
            imgs = imgs.to(device)
            labels = labels.to(device)
            logits = model(imgs)
            weighted_loss = criterion(logits, labels).item() * imgs.size(0)
            hits = (logits.max(dim=1).indices == labels).sum().item()
            per_batch.append((weighted_loss, hits, labels.size(0)))

    loss_sum = sum(entry[0] for entry in per_batch)
    n_correct = sum(entry[1] for entry in per_batch)
    n_seen = sum(entry[2] for entry in per_batch)
    return loss_sum / n_seen, n_correct / n_seen



In [None]:
num_epochs = 100
patience = 7  # consecutive epochs without a new best validation accuracy before stopping
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
val_best_acc = 0
trigger_times = 0  # epochs elapsed since the last improvement
for epoch in range(num_epochs):
    train_avg_loss, train_acc = train(model, train_dataloader, criterion, optimizer, device)
    val_avg_loss, val_acc = evaluate(model, val_dataloader, criterion, device)
    # Record the average loss and accuracy of this epoch on both splits.
    train_losses.append(train_avg_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_avg_loss)
    val_accuracies.append(val_acc)

    # Report before the early-stopping check so that the metrics of the final
    # epoch are printed too (first line: training split, second: validation).
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_avg_loss:.4f}, Accuracy: {train_acc*100:.2f}%")
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {val_avg_loss:.4f}, Accuracy: {val_acc*100:.2f}%")

    if val_acc > val_best_acc:
        # New best validation accuracy: checkpoint the weights and reset the counter.
        torch.save(model.state_dict(), "resnet_05.pth")
        val_best_acc = val_acc
        trigger_times = 0
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print('Early Stopping!')
            break
    scheduler.step()  # update the learning rate

# train() and evaluate() return accuracies as fractions in [0, 1], while the
# plot labels its accuracy axis 'Accuracy (%)', so convert to percent here.
plot_training_progress(train_losses, [acc * 100 for acc in train_accuracies], label='train')
plot_training_progress(val_losses, [acc * 100 for acc in val_accuracies], label='val')

In [None]:
# Evaluate the saved checkpoint on the torchvision CIFAR-10 test split, using
# the same preprocessing as for validation.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_for_val)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=50, shuffle=False, num_workers=0)

net = build_resnet(10)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load(r"resnet_05.pth", map_location=device))
net.to(device)

# evaluate() switches the network to eval mode, disables gradients and returns
# (average loss, accuracy as a fraction); only the accuracy is reported here.
_, test_acc = evaluate(net, test_dataloader, criterion, device)
print(f'Accuracy of the network on the 10000 test images: {100 * test_acc:.2f}%')



In [None]:
## Predict the unlabeled test images and save the results (for submission)

# Folder containing the test images.
test_image_dir = r"C:\Users\Lenovo\AI\project\data\test\test"
# Sort numerically by the first run of digits in each file name.
test_images = sorted(glob.glob(os.path.join(test_image_dir, "*.png")),
                     key=lambda x: int(re.findall(r"(\d+)", os.path.basename(x))[0]))

# Reverse label mapping (class index -> class name).
label_map_reverse = {v: k for k, v in label_map.items()}

# Same preprocessing as used for validation.
transform_for_test = transform_for_val

# Reload the best checkpoint; map_location lets a checkpoint saved on a GPU be
# loaded on a CPU-only machine.
net = build_resnet(10)
net.load_state_dict(torch.load(r"resnet_05.pth", map_location=device))
net.to(device)
net.eval()

# Predict one image at a time.
results = []
with torch.no_grad():
    for img_path in tqdm(test_images):
        # Close each file as soon as it has been read, and force three
        # channels so the input matches the 3-channel normalisation.
        with Image.open(img_path) as img:
            img_tensor = transform_for_test(img.convert("RGB"))
        output = net(img_tensor.unsqueeze(0).to(device))
        _, pred = torch.max(output, 1)
        img_id = os.path.splitext(os.path.basename(img_path))[0]  # file name without extension
        label_str = label_map_reverse[int(pred.item())]
        results.append((img_id, label_str))

# Write the predictions to a CSV file.
with open("submission.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["id", "label"])  # header row
    writer.writerows(results)