In [4]:
import os
import time

import matplotlib.pyplot as plt  # pyplot (not the top-level matplotlib package) provides subplots/show
import numpy as np  # numpy (not pandas) provides np.zeros
import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision.datasets import ImageFolder
from torchvision.models import resnet50
from torchvision.ops import FeaturePyramidNetwork
from torchvision.transforms import transforms

Epoch [1/10], Step [100/160], Loss: 2.68076753616333
Validation accuracy: 55.25773195876289%, Best accuracy: 55.25773195876289%
Epoch [2/10], Step [100/160], Loss: 1.2907205820083618
Validation accuracy: 68.05841924398625%, Best accuracy: 68.05841924398625%
Epoch [3/10], Step [100/160], Loss: 0.7911751866340637
Validation accuracy: 72.13058419243985%, Best accuracy: 72.13058419243985%
Epoch [4/10], Step [100/160], Loss: 0.28634151816368103
Validation accuracy: 74.50171821305842%, Best accuracy: 74.50171821305842%
Epoch [5/10], Step [100/160], Loss: 0.11835987865924835
Validation accuracy: 75.61855670103093%, Best accuracy: 75.61855670103093%
Epoch [6/10], Step [100/160], Loss: 0.04259290546178818
Validation accuracy: 76.3573883161512%, Best accuracy: 76.3573883161512%
Epoch [7/10], Step [100/160], Loss: 0.024014437571167946
Validation accuracy: 76.61512027491409%, Best accuracy: 76.61512027491409%
Epoch [8/10], Step [100/160], Loss: 0.015729762613773346
Validation accuracy: 76.56357388

In [None]:
# plot 4 metrics during training progress
def plot_training_progress(results_path):
    """Plot per-epoch loss and accuracy curves stored in a results file.

    The two loss curves are drawn against the left y-axis and the two
    accuracy curves against a twin right y-axis; both share the x-axis.

    Args:
        results_path: Path of a file written with ``torch.save`` that holds a
            dict with the keys ``"train_loss"``, ``"val_loss"``,
            ``"train_accuracy"`` and ``"val_accuracy"``, each a sequence with
            one value per epoch.
    """
    # map_location="cpu": files written by the training cells also contain the
    # model state dict, which may have been saved from a GPU. Plotting must not
    # need CUDA to deserialize it.
    data = torch.load(results_path, map_location="cpu")

    # Draw through the Axes objects explicitly instead of relying on pyplot's
    # implicit "current axes", so each curve lands on the intended y-axis.
    fig, ax1 = plt.subplots()
    ax1.plot(data["train_loss"], 'r', label='train_loss')
    ax1.plot(data["val_loss"], 'm', label='val_loss')
    ax1.legend(loc='upper right')
    ax1.set_xlabel('epoch')
    ax1.set_ylabel('loss')

    # Second y-axis sharing the same x-axis, used for the accuracy curves
    ax2 = ax1.twinx()
    ax2.plot(data["train_accuracy"], 'g', label='train_accuracy')
    ax2.plot(data["val_accuracy"], 'b', label='val_accuracy')
    ax2.legend(loc='lower right')
    ax2.set_ylabel('accuracy')

    plt.show()

# ResNet

In [None]:
# Define the ResNet model
class ResNet(nn.Module):
    """Pretrained ResNet-50 whose final fully connected layer outputs ``num_classes`` values."""

    def __init__(self, num_classes):
        """Build the backbone and replace its classification layer.

        Args:
            num_classes: Number of outputs of the new final layer.
        """
        super().__init__()

        # Load the pretrained ResNet-50
        pretrained_net = resnet50(pretrained=True)

        # Replace the final fully connected layer (2048 input features)
        pretrained_net.fc = nn.Linear(2048, num_classes)

        self.backbone = pretrained_net

    def forward(self, x):
        """Run ``x`` through the backbone and return its output."""
        return self.backbone(x)
    
# Data preprocessing: resize, convert to a tensor, then normalise each channel
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the dataset
dataset = ImageFolder(root="./MO_106/", transform=transform)

# Split the dataset 70% / 20% / remainder into training, validation and test sets.
# A fixed seed makes the split reproducible after a kernel restart and identical
# for every experiment that uses the same seed, so their metrics stay comparable.
split_seed = 42
train_size = int(0.7 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Hyperparameters
num_epochs = 10
batch_size = 128
lr = 0.00005

# One DataLoader per split; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Set the device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()

# Build the model and move it to the selected device
model = ResNet(num_classes=106)
model = model.to(device)

# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

# Per-epoch loss and accuracy on the training and validation data
train_loss = np.zeros(num_epochs)
train_accuracy = np.zeros(num_epochs)
val_loss = np.zeros(num_epochs)
val_accuracy = np.zeros(num_epochs)

# Start the clock here so the reported time covers the epochs only, not
# dataset loading or model construction.
start_time = time.time()

# Train the model
for epoch in range(num_epochs):
    model.train()

    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size to
        # accumulate a per-sample sum (the last batch may be smaller).
        running_loss += loss.item() * images.size(0)

        predicted = outputs.argmax(dim=1)
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)

    # Evaluate on the validation set after every epoch
    model.eval()
    val_loss_epoch = 0.0
    correct_predictions_val = 0
    total_predictions_val = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss_epoch += loss.item() * images.size(0)

            predicted = outputs.argmax(dim=1)
            correct_predictions_val += (predicted == labels).sum().item()
            total_predictions_val += labels.size(0)

    # Calculate average epoch loss and accuracy
    train_loss[epoch] = running_loss / len(train_dataset)
    train_accuracy[epoch] = correct_predictions / total_predictions
    val_loss[epoch] = val_loss_epoch / len(val_dataset)
    val_accuracy[epoch] = correct_predictions_val / total_predictions_val

    # Print training and validation statistics
    print(f'Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss[epoch]:.4f}, Train Accuracy: {train_accuracy[epoch]:.4f}, Val Loss: {val_loss[epoch]:.4f}, Val Accuracy: {val_accuracy[epoch]:.4f}')

end_time = time.time()
total_time = end_time - start_time
print(f"Training took {total_time:.2f} seconds.")

# Metrics are stored as plain Python lists rather than numpy arrays so the file
# can be read back by torch.load with its restricted (weights_only) unpickler.
model_metrics = {
    "model_state_dict": model.state_dict(),
    "train_loss": train_loss.tolist(),
    "train_accuracy": train_accuracy.tolist(),
    "val_loss": val_loss.tolist(),
    "val_accuracy": val_accuracy.tolist(),
    'total_time': total_time
}

# Save the model parameters and metrics to a file
ResultPath = "./results/"
results_path = ResultPath + f"resnet_epoch{num_epochs}_lr{lr}_bs{batch_size}.pt"

# Create the output directory first; otherwise the save fails after the whole
# training run when the directory does not exist yet.
os.makedirs(ResultPath, exist_ok=True)
torch.save(model_metrics, results_path)

# After all training is finished, run the final evaluation on the test set
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test accuracy: {correct / total * 100}%')


In [None]:
# Plot the loss/accuracy curves from the results file saved by the ResNet training cell
plot_training_progress(results_path)

# ResNetFPN

In [None]:
class ResNetFPN(nn.Module):
    """Pretrained ResNet-50 with a Feature Pyramid Network and one linear head per level.

    The outputs of the four residual stages are passed through the FPN, each
    resulting feature map is global-average-pooled and classified by its own
    linear layer, and the four sets of logits are summed.

    Args:
        num_classes: Number of outputs of each classification head.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Load the pretrained ResNet-50
        self.backbone = resnet50(pretrained=True)

        # Expose the backbone stages so forward() can tap the intermediate
        # feature maps. These attributes reference the same module objects as
        # self.backbone (shared parameters, not copies).
        self.conv1 = self.backbone.conv1
        self.bn1 = self.backbone.bn1
        self.relu = self.backbone.relu
        self.maxpool = self.backbone.maxpool
        self.layer1 = self.backbone.layer1
        self.layer2 = self.backbone.layer2
        self.layer3 = self.backbone.layer3
        self.layer4 = self.backbone.layer4

        # Feature Pyramid Network over the outputs of layer1..layer4; every
        # level is projected to 256 channels.
        self.fpn = FeaturePyramidNetwork(in_channels_list=[256, 512, 1024, 2048],
                                          out_channels=256)

        # One classification head per pyramid level
        self.fc1 = nn.Linear(256, num_classes)
        self.fc2 = nn.Linear(256, num_classes)
        self.fc3 = nn.Linear(256, num_classes)
        self.fc4 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return the summed logits of the four per-level heads for input ``x``."""
        # ResNet stem and residual stages
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x1 = self.layer1(x)
        x2 = self.layer2(x1)
        x3 = self.layer3(x2)
        x4 = self.layer4(x3)

        # FPN over the four stage outputs (dict order is the pyramid order)
        features = self.fpn({"0": x1, "1": x2, "2": x3, "3": x4})

        # Classify each level: global average pool -> flatten -> linear head
        heads = (self.fc1, self.fc2, self.fc3, self.fc4)
        logits = [
            head(torch.flatten(F.adaptive_avg_pool2d(features[key], (1, 1)), 1))
            for key, head in zip(("0", "1", "2", "3"), heads)
        ]

        # Sum the per-level logits
        out = logits[0]
        for level_logits in logits[1:]:
            out = out + level_logits

        return out

    
# Data preprocessing: resize, convert to a tensor, then normalise each channel
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the dataset
dataset = ImageFolder(root="./MO_106/", transform=transform)

# Split the dataset 70% / 20% / remainder into training, validation and test sets.
# A fixed seed makes the split reproducible after a kernel restart and identical
# for every experiment that uses the same seed, so their metrics stay comparable.
split_seed = 42
train_size = int(0.7 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Hyperparameters
num_epochs = 10
batch_size = 128
lr = 0.00005

# One DataLoader per split; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Set the device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()

# Build the FPN model (not the plain ResNet) and move it to the selected device;
# the results below are saved under a "resnet_fpn_..." file name.
model = ResNetFPN(num_classes=106)
model = model.to(device)

# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

# Per-epoch loss and accuracy on the training and validation data
train_loss = np.zeros(num_epochs)
train_accuracy = np.zeros(num_epochs)
val_loss = np.zeros(num_epochs)
val_accuracy = np.zeros(num_epochs)

# Start the clock here so the reported time covers the epochs only, not
# dataset loading or model construction.
start_time = time.time()

# Train the model
for epoch in range(num_epochs):
    model.train()

    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size to
        # accumulate a per-sample sum (the last batch may be smaller).
        running_loss += loss.item() * images.size(0)

        predicted = outputs.argmax(dim=1)
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)

    # Evaluate on the validation set after every epoch
    model.eval()
    val_loss_epoch = 0.0
    correct_predictions_val = 0
    total_predictions_val = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss_epoch += loss.item() * images.size(0)

            predicted = outputs.argmax(dim=1)
            correct_predictions_val += (predicted == labels).sum().item()
            total_predictions_val += labels.size(0)

    # Calculate average epoch loss and accuracy
    train_loss[epoch] = running_loss / len(train_dataset)
    train_accuracy[epoch] = correct_predictions / total_predictions
    val_loss[epoch] = val_loss_epoch / len(val_dataset)
    val_accuracy[epoch] = correct_predictions_val / total_predictions_val

    # Print training and validation statistics
    print(f'Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss[epoch]:.4f}, Train Accuracy: {train_accuracy[epoch]:.4f}, Val Loss: {val_loss[epoch]:.4f}, Val Accuracy: {val_accuracy[epoch]:.4f}')

end_time = time.time()
total_time = end_time - start_time
print(f"Training took {total_time:.2f} seconds.")

# Metrics are stored as plain Python lists rather than numpy arrays so the file
# can be read back by torch.load with its restricted (weights_only) unpickler.
model_metrics = {
    "model_state_dict": model.state_dict(),
    "train_loss": train_loss.tolist(),
    "train_accuracy": train_accuracy.tolist(),
    "val_loss": val_loss.tolist(),
    "val_accuracy": val_accuracy.tolist(),
    'total_time': total_time
}

# Save the model parameters and metrics to a file
ResultPath = "./results/"
results_path = ResultPath + f"resnet_fpn_epoch{num_epochs}_lr{lr}_bs{batch_size}.pt"

# Create the output directory first; otherwise the save fails after the whole
# training run when the directory does not exist yet.
os.makedirs(ResultPath, exist_ok=True)
torch.save(model_metrics, results_path)

# After all training is finished, run the final evaluation on the test set
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Test accuracy: {correct / total * 100}%')


In [None]:
# Plot the loss/accuracy curves from the results file saved by the ResNetFPN training cell
plot_training_progress(results_path)

In [None]:
# from gradcam.utils import visualize_cam
# from gradcam import GradCAM, GradCAMpp

# # 选择要进行Grad-CAM的层
# target_layer = model.layer4[-1]

# # 初始化Grad-CAM和Grad-CAM++
# # cam = GradCAM(model=model, target_layer=target_layer, use_cuda=device.type=='cuda')
# # cam_pp = GradCAMpp(model=model, target_layer=target_layer, use_cuda=device.type=='cuda')
# cam = GradCAM(model, target_layer)
# cam_pp = GradCAMpp(model, target_layer)

# # 获取一个批次的数据
# images, labels = next(iter(test_loader))
# images = images.to(device)
# labels = labels.to(device)

# # 为输入图像生成掩码
# mask = cam(images, class_idx=labels)
# mask_pp = cam_pp(images, class_idx=labels)

# # 可视化
# heatmap, result = visualize_cam(mask, images)
# heatmap_pp, result_pp = visualize_cam(mask_pp, images)

# plt.imshow(transforms.ToPILImage()(result))
# plt.show()

# plt.imshow(transforms.ToPILImage()(result_pp))
# plt.show()
