## 7.4 案例应用

### 7.4.1 MNIST手写体识别

【例7-14】

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchvision import datasets


def main():
    # 超参数设置
    BATCH_SIZE = 64  # 批次大小
    EPOCHS = 10  # 迭代次数
    learning_rate = 0.01  # 学习率
    # 如果cuda可用，则使用cuda，否则使用CPU训练
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # 实例化LeNet5模型
    lenet5 = LeNet5().to(DEVICE)
    # 实例化交叉熵损失函数（包含了softmax损失函数）
    criterion = nn.CrossEntropyLoss().to(DEVICE)
    # 实例化Adam优化器
    optimizer = optim.Adam(lenet5.parameters(), lr=learning_rate)
    # 获取训练集数据加载器
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('data', train=True, download=True, transform=transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize(mean=(0.5,), std=(0.5,))])),
        batch_size=BATCH_SIZE, shuffle=True)
    # 获取测试集数据加载器
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('data', train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ])),
        batch_size=BATCH_SIZE, shuffle=True)
    # 训练模型
    print('---------训练模型---------')
    train(EPOCHS, lenet5, criterion, DEVICE, train_loader, optimizer)
    # 测试模型
    print('---------测试模型---------')
    test(test_loader, lenet5, criterion, DEVICE)


# 定义AlexNet模型
class LeNet5(nn.Module):
    def __init__(self):
        super(LeNet5, self).__init__()
        # 搭建由卷积层、池化层和激活函数组成的特征提取器
        self.features = nn.Sequential(
            nn.Conv2d(1, 6, 5),
            nn.Tanh(),
            nn.AvgPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.Tanh(),
            nn.AvgPool2d(2, 2),
        )
        # 创建分类器
        self.classifier = nn.Sequential(
            nn.Linear(16 * 4 * 4, 120),
            nn.Linear(120, 84),
            nn.Linear(84, 10),
        )

    def forward(self, x):
        x = self.features(x)
        # 平铺特征图
        x = torch.flatten(x, start_dim=1)
        x = self.classifier(x)
        return x


def train(num_epochs, model, criterion, device, train_loader, optimizer):
    for epoch in range(num_epochs):
        # 设置模型为训练模式
        model.train()
        # test_loss为训练时的总损失
        total_test_loss = 0
        # 从迭代器抽取图片和标签
        for i, (images, labels) in enumerate(train_loader):
            samples,labels = images.to(device),labels.to(device)
            # 获取当前模型的输出
            output = model(samples.reshape(-1, 1, 28, 28))
            # 计算损失函数值
            loss = criterion(output, labels)
            # 参数更新前优化器内部参数梯度设置为0
            optimizer.zero_grad()
            # 损失值前向传播
            loss.backward()
            # 更新模型参数
            optimizer.step()
            # 转成Python数据类型
            loss = loss.item()
            # 累计损失
            total_test_loss += loss
        model.eval()  # 设置模型进入预测模式
        correct = 0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            # 获得模型输出
            output = model(data.reshape(-1, 1, 28, 28))
            # 获得模型分类结果
            pred = output.data.max(1)[1]
            # 计算分类正确的样本数
            correct += pred.eq(target.data.view_as(pred)).cpu().sum()
        # 打印迭代训练过程中训练集的损失和准确率
        print("Epoch: {}/{}".format(epoch + 1, num_epochs))
        print("Train-lose: {:.4f}".format(total_test_loss / len(train_loader.dataset)))
        print('Accuracy: {:.3f}%\n'.format(100. * correct / len(train_loader.dataset)))


def test(test_loader, model, criterion, device):
    # 设置模型为验证模式
    model.eval()
    correct = 0
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data.reshape(-1, 1, 28, 28))
        # 找到概率最大的下标，为输出值
        pred = output.data.max(1)[1]
        # 正确的样本数
        correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()
    # 打印测试结果
    print('Total number of test samples: ', len(test_loader.dataset))
    print('Number of correct classifications: ', correct)
    print('Accuracy: {:.3f}% '.format(100. * correct / len(test_loader.dataset)))


if __name__ == '__main__':
    main()

ModuleNotFoundError: No module named 'torch'

### 7.4.2 猫狗大战

【例7-16】

In [5]:
import os
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset
from torchvision import transforms, datasets, models
from torchvision import transforms
from torchvision import datasets


# 制作数据集
def data_preprocess():
    # kaggle原始数据中‘train’的地址
    original_dataset_dir = 'train'
    total_num = int(len(os.listdir(original_dataset_dir)) / 2)
    index = np.array(range(total_num))
    # 待处理的数据集地址
    base_dir = 'cats_and_dogs'
    if not os.path.exists(base_dir):
        os.mkdir(base_dir)
    # 训练集、测试集的划分
    sub_dirs = ['train', 'test']
    animals = ['cats', 'dogs']
    train_index = index[:int(total_num * 0.9)]
    test_index = index[int(total_num * 0.9):]
    numbers = [train_index, test_index]
    for idx, sub_dir in enumerate(sub_dirs):
        dir = os.path.join(base_dir, sub_dir)
        if not os.path.exists(dir):
            os.mkdir(dir)
        for animal in animals:
            animal_dir = os.path.join(dir, animal)
            if not os.path.exists(animal_dir):
                os.mkdir(animal_dir)
            fnames = [animal[:-1] + '.{}.jpg'.format(i) for i in numbers[idx]]
            for fname in fnames:
                src = os.path.join(original_dataset_dir, fname)
                dst = os.path.join(animal_dir, fname)
                shutil.copyfile(src, dst)
            # 验证训练集、测试集的划分的照片数目
            print(animal_dir + ' total images : %d' % (len(os.listdir(animal_dir))))


def get_dataloader(train=True, root='cats_and_dogs/train/', batch_size=8):
    loader = None
    dataset = None
    data_transform = transforms.Compose([
        transforms.Scale(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])
    if train:
        dataset = datasets.ImageFolder(root=root, transform=data_transform)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True, )
    else:
        dataset = datasets.ImageFolder(root=root, transform=data_transform)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True, )
    return loader


# 创建模型
class LeNet5(nn.Module):
    def __init__(self):
        super(LeNet5, self).__init__()
        # 搭建由卷积层、池化层和激活函数组成的特征提取器
        self.features = nn.Sequential(
            nn.Conv2d(3, 6, 5),
            nn.Tanh(),
            nn.AvgPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.Tanh(),
            nn.AvgPool2d(2, 2),
        )
        # 创建分类器
        self.classifier = nn.Sequential(
            nn.Linear(44944, 120),
            nn.Linear(120, 84),
            nn.Linear(84, 2),
        )

    def forward(self, x):
        x = self.features(x)
        # 平铺特征图
        x = torch.flatten(x, start_dim=1)
        x = self.classifier(x)
        return x


# 使用LeNet-5进行猫狗预测
def main():
    # 超参数设置
    BATCH_SIZE = 256  # 批次大小
    EPOCHS = 10  # 迭代次数
    learning_rate = 0.01  # 学习率
    # 如果cuda可用，则使用cuda，否则使用CPU训练
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # 实例化LeNet5模型
    lenet5 = LeNet5().to(DEVICE)
    # 实例化交叉熵损失函数（包含了softmax损失函数）
    criterion = nn.CrossEntropyLoss().to(DEVICE)
    # 实例化Adam优化器
    optimizer = optim.Adam(lenet5.parameters(), lr=learning_rate)
    # 获取训练集数据加载器
    train_loader = get_dataloader(train=True, root='cats_and_dogs/train/', batch_size=BATCH_SIZE)
    # 获取测试集数据加载器
    test_loader = get_dataloader(train=False, root='cats_and_dogs/test/', batch_size=BATCH_SIZE)
    # 训练模型
    print('---------训练模型---------')
    train(EPOCHS, lenet5, criterion, DEVICE, train_loader, optimizer)
    # 测试模型
    print('---------测试模型---------')
    test(test_loader, lenet5, criterion, DEVICE)


# ResNet迁移学习
def main2():
    # 超参数设置
    BATCH_SIZE = 16  # 批次大小
    EPOCHS = 10  # 迭代次数
    learning_rate = 0.01  # 学习率
    # 如果cuda可用，则使用cuda，否则使用CPU训练
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # 使用预训练的resnet50，进行基于模型的迁移
    resnet = models.resnet50(pretrained=True).to(DEVICE)
    # 实例化交叉熵损失函数（包含了softmax损失函数）
    criterion = nn.CrossEntropyLoss().to(DEVICE)
    # 实例化Adam优化器
    optimizer = optim.Adam(resnet.parameters(), lr=learning_rate)
    # 获取训练集数据加载器
    train_loader = get_dataloader(train=True, root='cats_and_dogs/train/', batch_size=BATCH_SIZE)
    # 获取测试集数据加载器
    test_loader = get_dataloader(train=False, root='cats_and_dogs/test/', batch_size=BATCH_SIZE)
    # 训练模型
    print('---------训练模型---------')
    train(EPOCHS, resnet, criterion, DEVICE, train_loader, optimizer)
    # 测试模型
    print('---------测试模型---------')
    test(test_loader, resnet, criterion, DEVICE)


# 训练
def train(num_epochs, model, criterion, device, train_loader, optimizer):
    for epoch in range(num_epochs):
        # 设置模型为训练模式
        model.train()
        # test_loss为训练时的总损失
        total_test_loss = 0
        # 从迭代器抽取图片和标签
        for i, (images, labels) in enumerate(train_loader):
            samples = images.to(device)
            labels = labels.to(device)
            # 获取当前模型的输出
            output = model(samples)
            # 计算损失函数值
            loss = criterion(output, labels)
            # 参数更新前优化器内部参数梯度设置为0
            optimizer.zero_grad()
            # 损失值前向传播
            loss.backward()
            # 更新模型参数
            optimizer.step()
            # 转成Python数据类型
            loss = loss.item()
            # 累计损失
            total_test_loss += loss
        model.eval()  # 设置模型进入预测模式
        correct = 0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            # 获得模型输出
            output = model(data)
            # 获得模型分类结果
            pred = output.data.max(1)[1]
            # 计算分类正确的样本数
            correct += pred.eq(target.data.view_as(pred)).cpu().sum()
        # 打印迭代训练过程中训练集的损失和准确率
        print("Epoch: {}/{}".format(epoch + 1, num_epochs))
        print("Train-lose: {:.4f}".format(total_test_loss / len(train_loader.dataset)))
        print('Accuracy: {:.3f}%\n'.format(100. * correct / len(train_loader.dataset)))


# 测试
def test(test_loader, model, criterion, device):
    # 设置模型为验证模式
    model.eval()
    correct = 0
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        # 找到概率最大的下标，为输出值
        pred = output.data.max(1)[1]
        # 正确的样本数
        correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()
    # 打印测试结果
    print('Total number of test samples: ', len(test_loader.dataset))
    print('Number of correct classifications: ', correct)
    print('Accuracy: {:.3f}% '.format(100. * correct / len(test_loader.dataset)))


if __name__ == '__main__':
    if not os.path.exists('cats_and_dogs'):
        data_preprocess()
    main()
    main2()


  "please use transforms.Resize instead.")


---------训练模型---------


KeyboardInterrupt: 