In [123]:
import os
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from PIL import Image
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from torchvision.models import resnet18, ResNet18_Weights

In [117]:
# 设置全局参数
modellr = 2e-4
BATCH_SIZE = 64
EPOCHS = 8
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(DEVICE)

cuda


In [118]:
# 数据预处理
transform_train = transforms.Compose([
    transforms.Resize((300, 300)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])  # 注意这里只有一个值
])


transform_test = transforms.Compose([
    transforms.Resize((300, 300)),
    transforms.Grayscale(num_output_channels=3),  # 将图像转换为三通道
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])  # 对每个通道进行归一化
])


In [119]:
# 读取数据
dataset_train = datasets.ImageFolder('Data/train', transform_train)
dataset_validate = datasets.ImageFolder('Data/validate', transform_train)

# 类别名称列表
classes = ('0.0', '0.33', '0.67', '1.0')

# 导入数据
train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset_validate, batch_size=BATCH_SIZE, shuffle=False)


In [120]:
model = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 4)

# 应用 Kaiming He 初始化到最后一层
nn.init.kaiming_normal_(model.fc.weight, mode='fan_out', nonlinearity='relu')
model.fc.bias.data.fill_(0)
model.to(DEVICE)

# 设置损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=modellr)

# 设置学习率调度器
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)


In [121]:
# 定义训练过程
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    sum_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        sum_loss += loss.item()
        if (batch_idx + 1) % 50 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),
                       100. * (batch_idx + 1) / len(train_loader), loss.item()))
    ave_loss = sum_loss / len(train_loader)
    print('Epoch: {}, Training Loss: {}'.format(epoch, ave_loss))

# 定义验证过程
def val(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            test_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader)
    acc = correct / len(test_loader.dataset)
    print('\nVal set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), 100 * acc))
    return test_loss

# 训练和验证循环
for epoch in range(1, EPOCHS + 1):
    train(model, DEVICE, train_loader, optimizer, epoch)
    val_loss = val(model, DEVICE, test_loader)
    scheduler.step(val_loss)  # 更新学习率

# 保存模型
torch.save(model.state_dict(), 'model.pth')


Epoch: 1, Training Loss: 5.417996691620869

Val set: Average loss: 2.9796, Accuracy: 269/601 (45%)

Epoch: 2, Training Loss: 1.1970896688492403

Val set: Average loss: 1.1111, Accuracy: 435/601 (72%)

Epoch: 3, Training Loss: 0.8104978547148083

Val set: Average loss: 0.8662, Accuracy: 428/601 (71%)

Epoch: 4, Training Loss: 0.47706475967298384

Val set: Average loss: 1.0915, Accuracy: 442/601 (74%)

Epoch: 5, Training Loss: 0.40680004753496335

Val set: Average loss: 1.1085, Accuracy: 439/601 (73%)

Epoch: 6, Training Loss: 0.22060699258809505

Val set: Average loss: 1.0922, Accuracy: 447/601 (74%)

Epoch: 7, Training Loss: 0.10781360701050448

Val set: Average loss: 1.1826, Accuracy: 447/601 (74%)

Epoch: 8, Training Loss: 0.07966112940693679

Val set: Average loss: 1.0723, Accuracy: 455/601 (76%)



Task part:

In [124]:
# 加载模型状态字典
model.load_state_dict(torch.load("model.pth"))
model.eval()
model.to(DEVICE)


# 初始化计数器
total_images = 0
correct_predictions = 0

# 测试目录路径
path = 'D:/9444f/Data/test'
# 获取类别目录
class_directories = os.listdir(path)

# 初始化真实标签和预测结果列表
all_targets = []
all_preds = []

# 遍历每个类别目录
for class_index, class_dir in enumerate(class_directories):
    class_dir_path = os.path.join(path, class_dir)
    # 检查是否是一个目录
    if os.path.isdir(class_dir_path):
        # 列出目录中的文件
        for file_name in os.listdir(class_dir_path):
            file_path = os.path.join(class_dir_path, file_name)
            # 检查是否是文件
            if os.path.isfile(file_path):
                total_images += 1
                try:
                    with Image.open(file_path) as img:
                        img = transform_test(img)
                        img = img.unsqueeze(0)
                        img = img.to(DEVICE)
                        with torch.no_grad():
                            out = model(img)
                        _, pred = torch.max(out.data, 1)
                        # 添加预测和真实标签到列表
                        all_preds.append(pred.item())
                        all_targets.append(class_index)
                        # 更新正确预测计数器
                        correct_predictions += (pred.item() == class_index)
                except Exception as e:
                    print(f'Error processing {file_name}: {e}')

# 计算精度并打印
accuracy = correct_predictions / total_images if total_images > 0 else 0
print(f'Accuracy: {accuracy:.4f} ({correct_predictions}/{total_images})')

# 计算并打印混淆矩阵
cm = confusion_matrix(all_targets, all_preds)
print("Confusion Matrix:\n", cm)

# 计算并打印F1分数
f1_micro = f1_score(all_targets, all_preds, average='micro')
f1_macro = f1_score(all_targets, all_preds, average='macro')
print(f'F1 Score (Micro): {f1_micro:.4f}')
print(f'F1 Score (Macro): {f1_macro:.4f}')

Accuracy: 0.7621 (503/660)
Confusion Matrix:
 [[335  20   2  21]
 [ 40  27   1   7]
 [ 18   3   1   5]
 [ 33   6   1 140]]
F1 Score (Micro): 0.7621
F1 Score (Macro): 0.5253
