# 训练

In [None]:
# 检查是否有可用的CUDA设备,这是针对mac的NPU,如果是英伟达的显卡则把mps改为cuda
if torch.cuda.is_available():
    device = torch.device("mps")
else:
    print("CUDA is not available. Using CPU.")
    device = torch.device("cpu")
    
## resnet18
# net = torchvision.models.resnet18(weights='IMAGENET1K_V1')
net = torchvision.models.resnet34(weights='IMAGENET1K_V1')
net.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
net.to(device)  # 将模型移动到CUDA设备上
print(next(net.parameters()).device)  # 打印模型的设备信息

# 自定义图片图片读取方式，可以自行增加resize、数据增强等操作
def MyLoader(path):
#    return Image.open(path)
#     return Image.open(path).convert('RGB')
    return np.load(path)
    
class MyDataset (Dataset):
    # 构造函数设置默认参数
    def __init__(self, txt, transform=None, target_transform=None, loader=MyLoader):
        with open(txt, 'r') as fh:
            imgs = []
            for line in fh:
                line = line.strip('\n')  # 移除字符串首尾的换行符
                line = line.rstrip()  # 删除末尾空
                words = line.split( )  # 以空格为分隔符 将字符串分成
                imgs.append((words[0], int(words[1]))) # imgs中包含有图像路径和标签
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader

    def __getitem__(self, index):
        fn, label = self.imgs[index]
        #调用定义的loader方法
        img = self.loader(fn)
        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        return len(self.imgs)


root = r"/images/"
train_data = MyDataset(txt=root + 'train.txt', transform=transforms.ToTensor())
test_data = MyDataset(txt=root + 'test.txt', transform=transforms.ToTensor())

test_num = len(test_data)

trainloader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)
testloader = DataLoader(dataset=test_data, batch_size=32, shuffle=False)

print('加载成功！')

# CrossEntropyLoss
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# Parameters
patience = 30  # for early stopping
n_folds = 3   # for k-fold cross-validation
num_epochs = 100

# K-Fold Cross Validation
# kfold = KFold(n_splits=n_folds, shuffle=True, random_state=5)
kfold = KFold(n_splits=n_folds, shuffle=True)
early_stop_counter = 0
best_loss = float('inf')

for fold, (train_ids, val_ids) in enumerate(kfold.split(trainloader.dataset)):
    print(f"FOLD {fold + 1}/{n_folds}")
    
    # Define your network and optimizer here, they should be re-initialized for each fold
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    val_subsampler = torch.utils.data.SubsetRandomSampler(val_ids)
    
    
    trainloader_fold = torch.utils.data.DataLoader(trainloader.dataset, batch_size=32, sampler=train_subsampler)
    valloader_fold = torch.utils.data.DataLoader(trainloader.dataset, batch_size=32, sampler=val_subsampler)
    
    for epoch in range(num_epochs):
        print('-'*30, '\n','epoch', epoch)
        net.train()
        loss100 = 0.0
        correct = 0
        total = 0
        for i, data in enumerate(trainloader_fold):
            # Training code remains largely the same...
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device) # 注意需要复制到GPU
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels.to(device))
            loss.backward()
            optimizer.step()

            loss100 += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Validation
        net.eval()
        val_loss = 0.0
        with torch.no_grad():
            for i, data in enumerate(valloader_fold):
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(valloader_fold)
        print(f"Validation Loss: {avg_val_loss:.4f}")

        # Check for early stopping
        if avg_val_loss < best_loss:
            best_loss = avg_val_loss
            early_stop_counter = 0
        else:
            early_stop_counter += 1

        if early_stop_counter >= patience:
            print("Early stopping!")
            break

# Finally, you could report the average performance across all folds.
#plt.plot(train_accuracies)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training Accuracy')
#plt.show()

# 测试

In [None]:
predicted_list = []
labels_list = []
# 构造测试的dataloader
dataiter = iter(testloader)
# 预测正确的数量和总数量
correct = 0
total = 0
net.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        # 预测
        outputs = net(images)
        # 输出概率分布，最大概率的一项作为预测分类
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        # 将预测结果和真实结果添加到列表中
        predicted_list.extend(predicted.cpu().numpy())
        labels_list.extend(labels.cpu().numpy())
        # 计算损失
        loss = criterion(outputs, labels)

confusion_mat = confusion_matrix(labels_list, predicted_list)
print('Accuracy : %f %%' % (
    100 * correct / test_num))
print('Loss : %f %%' % (
    loss.item()))
balanced_accuracy = balanced_accuracy_score(labels_list, predicted_list, adjusted=False)
print('Balanced Accuracy : %f %%' %(100 * balanced_accuracy))
print(confusion_mat)