In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import ExponentialLR
import os
import pandas as pd
from MyInceptionV4 import InceptionV4
from MyFramework import InceptionNet
from MyFramework import InceptionResNet
from MyVGG import VGG_A
from MyVGG import VGG_BatchNorm


In [3]:
# Physical index of the GPU this notebook should run on.
device_id = 0
# Restrict CUDA to that single GPU, using PCI bus ordering. These variables
# only take effect if they are set before CUDA is initialised in this process,
# so this cell must run before any other CUDA call.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = str(device_id)

if torch.cuda.is_available():
    # With a single GPU left visible, that GPU is addressed as logical index 0
    # regardless of its physical id, so "cuda:{device_id}" would be an invalid
    # ordinal for any device_id other than 0.
    device = torch.device("cuda:0")
    print(device)
    print(torch.cuda.get_device_name(device))
else:
    # No usable GPU: fall back to the CPU and skip the device-name query,
    # which raises when CUDA is unavailable.
    device = torch.device("cpu")
    print(device)

cuda:0
NVIDIA GeForce RTX 2060


In [4]:
# Training-time preprocessing: random augmentation followed by tensor
# conversion and per-channel normalisation with mean 0.5 / std 0.5.
train_steps = [
    # Random crop covering 80-100% of the image, resized to 32x32
    # (change the size to 299 when using the Inception v4 network).
    transforms.RandomResizedCrop(size=32, scale=(0.8, 1.0)),
    # Random horizontal flip.
    transforms.RandomHorizontalFlip(),
    # Random colour perturbation.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    # Random rotation within (-10, 10) degrees.
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
]
transform_train = transforms.Compose(train_steps)

# Evaluation-time preprocessing: no augmentation, only resize + normalise.
test_steps = [
    # Change the size to 299 when using the Inception v4 network.
    transforms.Resize(size=32),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
]
transform_test = transforms.Compose(test_steps)

# CIFAR-10 is read from ./data; download=False means it must already be there.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=False,
    transform=transform_train,
)
trainloader = DataLoader(
    trainset,
    batch_size=100,
    shuffle=True,
    num_workers=2,
)

testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=False,
    transform=transform_test,
)
testloader = DataLoader(
    testset,
    batch_size=100,
    shuffle=False,
    num_workers=2,
)

In [None]:
# Choose the network to train and place it on the GPU when one is available.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model = VGG_A()
model = model.to(device)

In [None]:
# Loss function shared by the training loop.
criterion = nn.CrossEntropyLoss()

# Adam with L2 weight decay (lr=0.001 is the setting used for VGG).
# Alternatives that were tried, kept for reference:
#   optim.SGD(model.parameters(), lr=0.0001, weight_decay=0.0001)
#   optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.0001)
#   optim.RMSprop(model.parameters(), lr=0.0001, alpha=0.99, weight_decay=0.0001)
optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=0.0001,
)

# Train the model for one epoch.
def train(epoch):
    """Run one optimisation pass over ``trainloader`` and return the mean loss.

    Uses the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``trainloader`` and ``device``.

    Args:
        epoch: Zero-based epoch index; only used to label the progress bar.

    Returns:
        The sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    window_loss = 0.0  # loss accumulated since the last progress-bar refresh
    total_loss = 0.0   # loss accumulated over the whole epoch
    progress = tqdm(trainloader, total=len(trainloader), desc=f"Epoch {epoch+1}")
    for step, (inputs, targets) in enumerate(progress):
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        window_loss += batch_loss
        total_loss += batch_loss

        # Refresh the displayed loss once every 100 batches.
        if (step + 1) % 100 == 0:
            progress.set_postfix({'Loss': window_loss / 100})
            window_loss = 0.0
    return total_loss / len(trainloader)

def test():
    """Evaluate the notebook-level ``model`` on ``testloader``.

    Prints the accuracy and returns it.

    Returns:
        Top-1 accuracy over the test loader, as a percentage in [0, 100].
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, targets in tqdm(testloader, total=len(testloader), desc="Testing"):
            inputs = inputs.to(device)
            targets = targets.to(device)
            # Index of the highest score along dim 1 is the predicted class.
            predicted = model(inputs).max(1)[1]
            n_seen += targets.size(0)
            n_correct += predicted.eq(targets).sum().item()
    accuracy = 100. * n_correct / n_seen
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy

# Histories of the per-epoch training loss and test accuracy.
train_losses = []
test_accuracies = []
epoch_size = 300

# Alternate one training epoch with one evaluation pass, epoch_size times.
for epoch in range(epoch_size):
    train_loss = train(epoch)
    test_accuracy = test()
    train_losses.append(train_loss)
    test_accuracies.append(test_accuracy)

# One figure per metric, drawn against 1-based epoch numbers.
epoch_axis = range(1, epoch_size+1)
for curve, curve_label, y_label, figure_title in (
    (test_accuracies, 'Accuracy', 'Accuracy', 'Testing Accuracy over Epochs'),
    (train_losses, 'Training Loss', 'Loss', 'Training Loss over Epochs'),
):
    plt.figure()
    plt.plot(epoch_axis, curve, label=curve_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(figure_title)
    plt.legend()
    plt.show()


# Report the values from the last epoch.
print(f'Final Training Loss: {train_losses[-1]:.4f}')
print(f'Final Test Accuracy: {test_accuracies[-1]:.2f}%')

损失景观：

In [None]:
# Fresh model, loss and optimizer for the loss-landscape experiment.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model = VGG_A()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
# lr=0.001 is the setting used for VGG.
optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=0.0001,
)

In [None]:
# Learning rates covered by the sweep.
learning_rates = [1e-3, 2e-3, 1e-4, 5e-4]

# Per-learning-rate histories; every key gets its own independent list.
train_losses_dict = dict((rate, []) for rate in learning_rates)
test_accuracies_dict = dict((rate, []) for rate in learning_rates)

# Training function.
def train(epoch, model, optimizer, trainloader, device):
    """Train ``model`` for one epoch and return the mean batch loss.

    The loss function is not a parameter: the notebook-level ``criterion``
    is used.

    Args:
        epoch: Zero-based epoch index; only used to label the progress bar.
        model: Network to optimise; switched to training mode here.
        optimizer: Optimizer stepped once per batch.
        trainloader: Iterable of ``(inputs, targets)`` batches that supports ``len``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The sum of the per-batch losses divided by ``len(trainloader)``.
    """
    model.train()
    window_loss = 0.0  # loss accumulated since the last progress-bar refresh
    total_loss = 0.0   # loss accumulated over the whole epoch
    progress = tqdm(trainloader, total=len(trainloader), desc=f"Epoch {epoch+1}")
    for step, (inputs, targets) in enumerate(progress):
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        window_loss += batch_loss
        total_loss += batch_loss

        # Refresh the displayed loss once every 100 batches.
        if (step + 1) % 100 == 0:
            progress.set_postfix({'Loss': window_loss / 100})
            window_loss = 0.0
    return total_loss / len(trainloader)

# Evaluation function.
def test(model, testloader, device):
    """Evaluate ``model`` on ``testloader``; print and return the accuracy.

    Args:
        model: Network to evaluate; switched to eval mode here.
        testloader: Iterable of ``(inputs, targets)`` batches that supports ``len``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Top-1 accuracy over the loader, as a percentage in [0, 100].
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, targets in tqdm(testloader, total=len(testloader), desc="Testing"):
            inputs = inputs.to(device)
            targets = targets.to(device)
            # Index of the highest score along dim 1 is the predicted class.
            predicted = model(inputs).max(1)[1]
            n_seen += targets.size(0)
            n_correct += predicted.eq(targets).sum().item()
    accuracy = 100. * n_correct / n_seen
    print(f'Test Accuracy: {accuracy:.2f}%')
    return accuracy

# Select the device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train and evaluate one independent model for every learning rate.
epoch_size = 300
for lr in learning_rates:
    # Build a new VGG_A and an optimizer that actually uses `lr` for each run.
    # Reusing the notebook-level model/optimizer would keep training a single
    # network at one fixed learning rate, so the four curves would just be
    # consecutive segments of the same run instead of a learning-rate sweep.
    model = VGG_A().to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=0.0001)

    # Start from empty histories so re-running this cell does not append new
    # epochs after the ones recorded by a previous execution.
    train_losses_dict[lr] = []
    test_accuracies_dict[lr] = []

    for epoch in range(epoch_size):
        train_loss = train(epoch, model, optimizer, trainloader, device)
        train_losses_dict[lr].append(train_loss)
        test_accuracy = test(model, testloader, device)
        test_accuracies_dict[lr].append(test_accuracy)


# For every epoch, take the largest and the smallest training loss recorded
# across all learning rates; the band between the two curves is plotted below.
max_curve = [
    max([train_losses_dict[lr][epoch] for lr in learning_rates])
    for epoch in range(epoch_size)
]
min_curve = [
    min([train_losses_dict[lr][epoch] for lr in learning_rates])
    for epoch in range(epoch_size)
]

# Draw both envelopes against 1-based epoch numbers and shade the gap.
epoch_axis = range(1, epoch_size+1)
plt.figure()
plt.plot(epoch_axis, max_curve, label='Max Loss')
plt.plot(epoch_axis, min_curve, label='Min Loss')
plt.fill_between(epoch_axis, min_curve, max_curve, color='gray', alpha=0.3)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss LandScape')
plt.legend()
plt.show()

In [None]:
# Store both envelope curves, indexed by 1-based epoch number, and pickle them.
df = pd.DataFrame(
    {'max_curve': max_curve, 'min_curve': min_curve},
    index=range(1, epoch_size+1),
)
df.to_pickle("VGG_A_LandScape")