In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split, Subset
from torch.utils.tensorboard import SummaryWriter


#### 数据预处理与保存测试集

In [10]:
# # 定义数据预处理
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# ])

# # 加载数据
# dataset = datasets.ImageFolder(root='../CUB_200_2011/images')

# # 取其中20%的数据作为测试集保存，并保存测试集不再改动
# test_part = 0.2

# total_count = len(dataset)
# train_count = int((1-test_part) * total_count)
# test_count = total_count - train_count

# # 随机分割并定义处理方式
# train_dataset, test_dataset = random_split(dataset, [train_count, test_count])
# train_dataset.dataset.transform = transform
# test_dataset.dataset.transform = transform

# # 保存数据集和索引
# def save_datasets(dataset, train_dataset, test_dataset):
#     torch.save(dataset, 'dataset/full_dataset.pth')
#     indices = (train_dataset.indices, test_dataset.indices)
#     torch.save(indices, 'dataset/dataset_indices.pth')

# save_datasets(dataset, train_dataset, test_dataset)

#### 加载分割的数据集和索引文件

In [11]:
# Load the persisted dataset and its train/test index split
def load_datasets():
    """Rebuild the train and test subsets from the files under ``dataset/``.

    Reads ``dataset/full_dataset.pth`` and ``dataset/dataset_indices.pth``
    (a ``(train_indices, test_indices)`` pair) and wraps the loaded dataset in
    one ``Subset`` per index list.

    Returns:
        tuple: ``(train_dataset, test_dataset)``; both are ``Subset`` views
        over the same underlying dataset object.
    """
    # The commented-out preprocessing cell stores the whole dataset object with
    # torch.save, i.e. an arbitrary pickle rather than tensors. The restricted
    # unpickler (default since PyTorch 2.6) rejects such files, so opt out
    # explicitly.
    # SECURITY: weights_only=False runs pickle code on load -- only use this on
    # files produced by this notebook itself.
    full_dataset = torch.load('dataset/full_dataset.pth', weights_only=False)
    train_indices, test_indices = torch.load('dataset/dataset_indices.pth')
    train_dataset = Subset(full_dataset, train_indices)
    test_dataset = Subset(full_dataset, test_indices)
    return train_dataset, test_dataset

#### 加载随机初始化的 ResNet 模型，并替换线性层

In [12]:

class MyResNet_New(nn.Module):
    """Randomly initialised torchvision ResNet with a replaced classification head.

    Args:
        chosen_model: Name of the backbone; must be one of ``SUPPORTED_MODELS``.
        num_classes: Output size of the new final linear layer (default 200).

    Raises:
        ValueError: If ``chosen_model`` is not a supported backbone name.
    """

    # Backbone names accepted by the constructor; each is also the name of the
    # corresponding builder function in ``torchvision.models``.
    SUPPORTED_MODELS = ("resnet18", "resnet34", "resnet50", "resnet101", "resnet152")

    def __init__(self, chosen_model, num_classes=200):
        super(MyResNet_New, self).__init__()
        # Fail early with a clear message. Previously an unknown name fell
        # through the if/elif chain and crashed later on `self.resnet`.
        if chosen_model not in self.SUPPORTED_MODELS:
            raise ValueError(
                "Unsupported chosen_model {!r}; expected one of {}".format(
                    chosen_model, ", ".join(self.SUPPORTED_MODELS)
                )
            )

        # weights=None requests random initialisation (no pretrained weights).
        # `weights=False` is not a valid value for the multi-weight API.
        build_backbone = getattr(models, chosen_model)
        self.resnet = build_backbone(weights=None)

        # Replace the original fc layer. The nn.Sequential wrapper is kept on
        # purpose so state_dict keys stay `resnet.fc.0.*` and checkpoints
        # saved by earlier runs remain loadable.
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Linear(num_ftrs, num_classes)
        )

    def forward(self, x):
        """Run ``x`` through the wrapped ResNet and return its output."""
        return self.resnet(x)

#### 定义训练 Pipeline

In [13]:
# Evaluation helper shared by the training loop
def _evaluate(model, criterion, data_loader, device):
    """Return the sample-weighted mean loss and accuracy of ``model`` on ``data_loader``.

    Switches the model to eval mode and leaves it there; the caller is
    responsible for switching back to training mode.

    Returns:
        tuple | None: ``(mean_loss, accuracy)``, or ``None`` when the loader
        produced no samples (avoids a division by zero).
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Each batch loss is weighted by its batch size before averaging
            # (assumes criterion returns a per-batch mean -- TODO confirm for
            # criteria other than the default CrossEntropyLoss).
            loss_sum += criterion(outputs, labels).item() * images.size(0)
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()
            n_seen += labels.size(0)
    if n_seen == 0:
        return None
    return loss_sum / n_seen, n_correct / n_seen


# Model training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10, log_freq=5, saved_name='', device=torch.device('cpu')):
    """Train ``model`` and log training/validation metrics to TensorBoard.

    About ``log_freq`` times per epoch (and always after the last batch of an
    epoch) the function prints and logs the training loss/accuracy accumulated
    since the previous log point, then runs a full pass over ``val_loader``
    and logs the validation loss/accuracy at the same global step.

    Args:
        model: Module to train; it is updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        train_loader: Iterable of ``(images, labels)`` training batches; must
            support ``len()``.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        num_epochs: Number of passes over ``train_loader``.
        log_freq: Desired number of log points per epoch; must be positive.
        saved_name: Run name; logs are written to ``'../model_logs/' + saved_name``.
        device: Device each batch is moved to before the forward pass.

    Raises:
        ValueError: If ``log_freq`` is not positive.
    """
    if log_freq <= 0:
        raise ValueError("log_freq must be positive, got {!r}".format(log_freq))

    batches_per_epoch = len(train_loader)
    # Clamp to at least 1: with fewer batches than log_freq the quotient is 0
    # and the modulo test below would raise ZeroDivisionError.
    log_steps = max(1, int(batches_per_epoch / log_freq))

    writer = SummaryWriter(log_dir = '../model_logs/'+saved_name)
    try:
        for epoch in range(num_epochs):
            model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for i, (images, labels) in enumerate(train_loader):
                images, labels = images.to(device), labels.to(device)

                outputs = model(images)
                loss = criterion(outputs, labels)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                running_loss += loss.item() * images.size(0)
                predicted = outputs.detach().argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                is_log_point = (i + 1) % log_steps == 0 or i == batches_per_epoch - 1
                if not is_log_point:
                    continue

                global_step = epoch * batches_per_epoch + i

                # Training metrics cover only the batches since the last log point.
                current_loss = running_loss / total
                current_accuracy = correct / total
                print('Epoch [{}/{}], Step [{}/{}], Training Loss: {:.4f}, Training Accuracy: {:.4f}'.format(epoch+1, num_epochs, i+1, batches_per_epoch, current_loss, current_accuracy))
                writer.add_scalars('Loss', {'Training': current_loss}, global_step)
                writer.add_scalars('Accuracy', {'Training': current_accuracy}, global_step)

                running_loss = 0.0
                correct = 0
                total = 0

                # Full validation pass at the same global step.
                val_metrics = _evaluate(model, criterion, val_loader, device)
                if val_metrics is not None:
                    val_loss, val_accuracy = val_metrics
                    print(f'Validation at Epoch {epoch+1}, Step {i+1}/{batches_per_epoch}, Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}')
                    writer.add_scalars('Loss', {'Validation': val_loss}, global_step)
                    writer.add_scalars('Accuracy', {'Validation': val_accuracy}, global_step)

                # _evaluate left the model in eval mode; resume training mode.
                model.train()
    finally:
        # Close the writer even when training is interrupted or raises, so
        # pending events are not left unflushed.
        writer.close()


#### 执行训练

In [14]:
# Settings
chosen_model = 'resnet50'
lr_list = [0.01]
regularization_list = [1e-5]
batch_size = 64
num_epoches = 60
log_freq = 10
train_val_split = 0.9
split_seed = 42  # fixes the train/validation partition across kernel restarts

# Load the persisted train/test subsets, then carve a validation set out of
# the training portion. The generator is seeded so the same samples end up in
# the validation set on every run; an unseeded split changes per restart.
train_dataset, test_dataset = load_datasets()
train_dataset, val_dataset = random_split(
    train_dataset,
    [train_val_split, 1-train_val_split],
    generator=torch.Generator().manual_seed(split_seed),
)
# Create the DataLoaders (only the training loader is shuffled)
train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

# Use the GPU when available; the device does not change between runs, so it
# is resolved once instead of once per hyperparameter combination.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train each (learning rate, weight decay) combination in turn.
# NOTE: `model` and `saved_name` are overwritten on every iteration, so after
# the loop they refer to the LAST combination only; the cells below use them.
for lr in lr_list:
    for regularization in regularization_list:
        saved_name =chosen_model+f'_LR-{lr}_Reg-{regularization}'
        print('='*50)
        print('Training '+saved_name+'......')
        print('='*50)

        print("Using device:", device)
        # Release cached GPU memory left over from the previous combination.
        if device.type == "cuda":
            torch.cuda.empty_cache()

        # Fresh, randomly initialised model for this combination
        model = MyResNet_New(chosen_model)
        model.to(device)

        # Loss function
        criterion = nn.CrossEntropyLoss()

        # Optimizer; `regularization` is passed as Adam's weight_decay
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=regularization)

        # Train
        train_model(model, criterion, optimizer, train_loader, val_loader, num_epoches, log_freq, saved_name, device)
        

Training resnet50_LR-0.01_Reg-1e-05......
Using device: cuda
Epoch [1/60], Step [13/133], Training Loss: 13.0113, Training Accuracy: 0.0024
Validation at Epoch 1, Step 13/133, Validation Loss: 626822952.4926, Accuracy: 0.0042
Epoch [1/60], Step [26/133], Training Loss: 6.3206, Training Accuracy: 0.0036
Validation at Epoch 1, Step 26/133, Validation Loss: 12527.5965, Accuracy: 0.0096
Epoch [1/60], Step [39/133], Training Loss: 5.4368, Training Accuracy: 0.0024
Validation at Epoch 1, Step 39/133, Validation Loss: 147.4347, Accuracy: 0.0064
Epoch [1/60], Step [52/133], Training Loss: 5.3071, Training Accuracy: 0.0024
Validation at Epoch 1, Step 52/133, Validation Loss: 8.7147, Accuracy: 0.0042
Epoch [1/60], Step [65/133], Training Loss: 5.3013, Training Accuracy: 0.0072
Validation at Epoch 1, Step 65/133, Validation Loss: 5.8601, Accuracy: 0.0053
Epoch [1/60], Step [78/133], Training Loss: 5.2892, Training Accuracy: 0.0036
Validation at Epoch 1, Step 78/133, Validation Loss: 5.5134, Accur

In [15]:
# Persist only the state_dict of the model left over from the training cell.
# The `weights/` directory must already exist.
torch.save(model.state_dict(), f'weights/{saved_name}.pth')

In [16]:
# Reload the saved weights and measure accuracy on the held-out test set.
# map_location places the tensors on the active device, so a checkpoint saved
# on a GPU still loads when this cell runs on a CPU-only session.
model.load_state_dict(torch.load('weights/'+ saved_name +'.pth', map_location=device))
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for images, labels in test_loader:
        # Move the batch to the same device as the model
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        predicted = outputs.argmax(dim=1)  # index of the highest-scoring class
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print('Test Accuracy: {}%'.format(accuracy))


Test Accuracy: 24.257845631891435%
