In [1]:
import torch
from torch import nn
import time
import torchvision
import torchvision.transforms as transforms
import torch.utils.data
import torch.optim as optim
import os
from PIL import Image

In [2]:
# 设置参数
BATCH_SIZE = 32
EPOCHS = 1
NUM_CLASSES = 60
LEARNING_RATE = 0.02
MOMENTUM = 0.99
WEIGHT_DECAY = 0.0005
NUM_PRINT = 100
LOG_DIR = './pth_file/densenet_model.pth'
confirm_flag = True

# 设置设备
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.set_printoptions(edgeitems=10000)
# 载入数据
train_data = torchvision.datasets.ImageFolder(root='./Datasets/train', transform=transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
test_data = torchvision.datasets.ImageFolder(root='./Datasets/test', transform=transforms.ToTensor())
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
image = Image.open("./Datasets/confirm/0/0001-3001.png")
transform = transforms.ToTensor()
confirm_data = transform(image)
print(confirm_data)

# 定义网络模型
# 定义conv_block
def conv_block(in_channel, out_channel):
    layer = nn.Sequential(
        nn.BatchNorm2d(in_channel),
        nn.ReLU(),
        nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, bias=False)
    )
    return layer


# 定义dense_block
class dense_block(nn.Module):
    def __init__(self, in_channel, growth_rate, num_layers):
        super(dense_block, self).__init__()
        block = []
        channel = in_channel
        for i in range(num_layers):
            block.append(conv_block(channel, growth_rate))
            channel += growth_rate
        self.net = nn.Sequential(*block)

    def forward(self, x):
        for layer in self.net:
            out = layer(x)
            x = torch.cat((out, x), dim=1)
        return x


# 定义transition
def transition(in_channel, out_channel):
    trans_layer = nn.Sequential(
        nn.BatchNorm2d(in_channel),
        nn.ReLU(),
        nn.Conv2d(in_channel, out_channel, 1),
        nn.AvgPool2d(2, 2)
    )
    return trans_layer


# 定义densenet
class densenet(nn.Module):
    def __init__(self, in_channel, num_classes, growth_rate=32, block_layers=[6, 12, 24, 16]):
        super(densenet, self).__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(in_channel, 64, 7, 2, 3),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.MaxPool2d(3, 2, padding=1)
        )
        self.DB1 = self._make_dense_block(64, growth_rate, num=block_layers[0])
        self.TL1 = self._make_transition_layer(256)
        self.DB2 = self._make_dense_block(128, growth_rate, num=block_layers[1])
        self.TL2 = self._make_transition_layer(512)
        self.DB3 = self._make_dense_block(256, growth_rate, num=block_layers[2])
        self.TL3 = self._make_transition_layer(1024)
        self.DB4 = self._make_dense_block(512, growth_rate, num=block_layers[3])
        self.global_average = nn.Sequential(
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
        )
        self.classifier = nn.Linear(1024, num_classes)

    def forward(self, x):
        x = self.block1(x)

        x = self.DB1(x)
        x = self.TL1(x)
        x = self.DB2(x)
        x = self.TL2(x)
        x = self.DB3(x)
        x = self.TL3(x)
        x = self.DB4(x)

        x = self.global_average(x)
        x = x.view(x.shape[0], -1)
        x = self.classifier(x)
        return x

    def _make_dense_block(self, channels, growth_rate, num):
        block = []
        block.append(dense_block(channels, growth_rate, num))
        channels += num * growth_rate
        return nn.Sequential(*block)

    def _make_transition_layer(self, channels):
        block = []
        block.append(transition(channels, channels // 2))
        return nn.Sequential(*block)


# 定义Densenet来初始化densenet的输入通道和划分类别参数
def Densenet(num_classes):
    return densenet(in_channel=3, num_classes=num_classes)  # 输入数据通道数为in_channel=3


# 初始化模型并将其移到目标设备上
model = Densenet(NUM_CLASSES).to(DEVICE)
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=LEARNING_RATE,
    momentum=MOMENTUM,
    weight_decay=WEIGHT_DECAY,
    nesterov=True
)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)


# 定义get_cur_lr函数，用于存放lr(学习率)参数
def get_cur_lr():
    for param_group in optimizer.param_groups:  # optimizer.param_groups为数据字典
        return param_group['lr']


# 模型训练
def train():
    model.train()
    record_train = list()
    total, correct, train_loss = 0, 0, 0
    start = time.time()

    for i, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)  # target.long()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        total += target.size(0)
        correct += (output.argmax(dim=1) == target).sum().item()
        train_acc = 100.0 * correct / total

        if (i + 1) % NUM_PRINT == 0:
            print("step: [{}/{} ({:.0f}%)], train_loss: {:.3f} | train_acc: {:6.3f}% | lr: {:.6f}"
                  .format(i + 1, len(train_loader), NUM_PRINT * i / len(train_loader), train_loss / (i + 1),
                          train_acc, get_cur_lr()))

    print("--- cost time: {:.4f}s ---".format(time.time() - start))

    record_train.append(train_acc)

    return record_train


# 定义test函数用于测试数据集
def test():
    model.eval()
    record_test = list()
    total, correct = 0, 0
    start = time.time()
    with torch.no_grad():
        print("*************** test ***************")
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)

            output = model(data)
            loss = criterion(output, target.long())
            total += target.size(0)
            correct += (output.argmax(dim=1) == target).sum().item()
            predict = nn.functional.softmax(output,dim = 1)
    test_acc = 100.0 * correct / total

    print("test_loss: {:.3f} | test_acc: {:6.3f}%".format(loss.item(), test_acc))
    print("************************************\n")
    print("--- cost time: {:.4f}s ---".format(time.time() - start))
    record_test.append(test_acc)
    
    return record_test 

def confirm(data):
    model.eval()
    record_test = list()
    total, correct =1, 0
    start = time.time()
    with torch.no_grad():
        print("*************** confirm ***************")
        data = data.to(DEVICE)

        output = model(data)
        predict = output.argmax(dim=1)
    test_acc = 100.0 * correct / total

    print(predict)
    record_test.append(test_acc)
    
    return record_test 

tensor([[[1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
          1.0000, 1.0000, 1.0000, 1.00

In [3]:
def main():
    # 如果test_flag=True,则加载已保存的模型
    if confirm_flag:
        # 加载保存的模型直接进行测试机验证，不进行此模块以后的步骤
        checkpoint = torch.load(LOG_DIR)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        start_epoch = checkpoint['epoch']
        confirm(confirm_data)
    else:
        # 如果有保存的模型，则加载模型，并在其基础上继续训练
        if os.path.exists(LOG_DIR):
            checkpoint = torch.load(LOG_DIR)
            model.load_state_dict(checkpoint['model'])
            optimizer.load_state_dict(checkpoint['optimizer'])
            start_epoch = checkpoint['epoch']
            print('加载 epoch {} 成功！'.format(start_epoch))
        else:
            start_epoch = 0
            print('无保存模型，将从头开始训练！')

        for epoch in range(EPOCHS):
            print("========== epoch: [{}/{}] ==========".format(epoch + 1 + start_epoch, start_epoch + EPOCHS))
            train()
            if lr_scheduler is not None:
                lr_scheduler.step()
            test()
            # 保存模型
            state = {'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'epoch': epoch + 1 + start_epoch}
            torch.save(state, LOG_DIR)


# 调用main函数
if __name__ == '__main__':
    main()

*************** confirm ***************


ValueError: expected 4D input (got 3D input)