# 任务介绍
**猫狗图像二分类问题**。本次实验基于 [PyTorch](https://pytorch.org/) 实现。

## 导入依赖库

In [7]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import random
import shutil

from tensorboardX import SummaryWriter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torchvision.utils

from matplotlib import pyplot as plt
from PIL import Image

## 准备数据集，将数据集划分为训练集、验证集和测试集
源数据集[来源](https://www.kaggle.com/competitions/dogs-vs-cats/data)，这是一个包含25000张图像的**猫狗**数据集。本次实验采用的数据集是源数据集的子集。
* 训练集：每个类别均有1000张图像
* 验证集：每个类别均有500张图像
* 测试集：每个类别均有500张图像

In [1]:
# Copy a fixed subset of the original images into a train/validation/test
# directory tree. Source files are looked up as "cat.<i>.jpg" / "dog.<i>.jpg"
# inside `original_dataset_dir`.
original_dataset_dir = './dataset/original_data/train'

base_dir = './dataset'

train_dir = os.path.join(base_dir, 'train')
validation_dir = os.path.join(base_dir, 'validation')
test_dir = os.path.join(base_dir, 'test')

train_cats_dir = os.path.join(train_dir, 'cats')
train_dogs_dir = os.path.join(train_dir, 'dogs')
validation_cats_dir = os.path.join(validation_dir, 'cats')
validation_dogs_dir = os.path.join(validation_dir, 'dogs')
test_cats_dir = os.path.join(test_dir, 'cats')
test_dogs_dir = os.path.join(test_dir, 'dogs')

# (filename prefix, destination directory, image indices to copy).
# Per class: indices 0-999 -> train, 1000-1499 -> validation, 1500-1999 -> test.
split_plan = [
    ('cat', train_cats_dir, range(1000)),
    ('cat', validation_cats_dir, range(1000, 1500)),
    ('cat', test_cats_dir, range(1500, 2000)),
    ('dog', train_dogs_dir, range(1000)),
    ('dog', validation_dogs_dir, range(1000, 1500)),
    ('dog', test_dogs_dir, range(1500, 2000)),
]

for prefix, dst_dir, indices in split_plan:
    # makedirs also creates the parent split directory, and exist_ok=True lets
    # this cell be re-run without failing on directories that already exist.
    os.makedirs(dst_dir, exist_ok=True)
    for i in indices:
        fname = '{}.{}.jpg'.format(prefix, i)
        src = os.path.join(original_dataset_dir, fname)
        dst = os.path.join(dst_dir, fname)
        shutil.copyfile(src, dst)

In [2]:
# Sanity check: report how many files ended up in each split/class directory.
for caption, directory in (
    ('total training cat images:', train_cats_dir),
    ('total training dog images:', train_dogs_dir),
    ('total validation cat images:', validation_cats_dir),
    ('total validation dog images:', validation_dogs_dir),
    ('total test cat images:', test_cats_dir),
    ('total test dog images:', test_dogs_dir),
):
    print(caption, len(os.listdir(directory)))

total training cat images: 1000
total training dog images: 1000
total validation cat images: 500
total validation dog images: 500
total test cat images: 500
total test dog images: 500


In [None]:
"""数据预处理
Dataset - Pytorch使用的数据集的创建,传递给DataLoader
DataLoader - 迭代产生训练数据提供给模型
"""

dataset_dir = './dataset'


class Imageset(Dataset):
    """Image-classification dataset read from ``<root>/<mode>/<category>/<file>``.

    Each sub-directory of ``<root>/<mode>`` is one category. Categories are
    numbered in sorted name order so that every split maps the same category
    name to the same integer label.

    Args:
        root: Directory that contains the ``train``, ``validation`` and
            ``test`` sub-directories.
        mode: Which split to load; one of ``'train'``, ``'validation'``,
            ``'test'``.

    Raises:
        ValueError: If ``mode`` is not one of the supported split names.
    """

    def __init__(self, root, mode):
        super(Imageset, self).__init__()

        if mode not in ('train', 'validation', 'test'):
            raise ValueError(
                "mode must be 'train', 'validation' or 'test', got {!r}".format(mode)
            )
        self.image_dir = os.path.join(root, mode)

        # TODO: data augmentation could be enabled for the training split,
        # e.g. RandomResizedCrop((150, 150), (1.0, 1.12), interpolation=Image.BICUBIC)
        # + RandomHorizontalFlip() + ToTensor() + Normalize((0.5,)*3, (0.5,)*3),
        # with Resize((150, 150)) + ToTensor() + the same Normalize elsewhere.
        # Until then every split uses the same resize-only preprocessing.
        self.transform = transforms.Compose([
            transforms.Resize((150, 150)),
            transforms.ToTensor(),
        ])

        self.image_list, self.target_list = self._scan_directory(self.image_dir)

    @staticmethod
    def _scan_directory(image_dir):
        """Return ``(paths, labels)`` for every file below ``image_dir``.

        ``os.listdir`` returns entries in arbitrary order, so the category
        names are sorted before being enumerated; otherwise the label of a
        category could differ between splits. Entries of ``image_dir`` that
        are not directories are ignored.
        """
        categories = sorted(
            entry for entry in os.listdir(image_dir)
            if os.path.isdir(os.path.join(image_dir, entry))
        )
        paths = []
        labels = []
        for label, category in enumerate(categories):
            category_dir = os.path.join(image_dir, category)
            for name in sorted(os.listdir(category_dir)):
                paths.append(os.path.join(category_dir, name))
                labels.append(label)
        return paths, labels

    def __getitem__(self, index):
        """Load the image at ``index`` and return ``(transformed_image, label)``."""
        path = self.image_list[index]
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of waiting for garbage collection.
        with Image.open(path) as raw_image:
            image = self.transform(raw_image.convert('RGB'))
        label = self.target_list[index]
        return image, label

    def __len__(self):
        """Return the number of images in this split."""
        return len(self.image_list)


# One dataset object per split, all rooted at `dataset_dir`.
train_dataset, validation_dataset, test_dataset = (
    Imageset(dataset_dir, split) for split in ('train', 'validation', 'test')
)

# Only the training loader shuffles; the test loader yields one image per batch.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=20,
    shuffle=True,
    num_workers=8,
)
validation_dataloader = DataLoader(
    dataset=validation_dataset,
    batch_size=20,
    shuffle=False,
    num_workers=8,
    drop_last=False,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=8,
    drop_last=False,
)

## 构建网络

In [None]:
class convnet(nn.Module):
    """Four-stage CNN followed by a two-layer classifier with a sigmoid output.

    ``forward`` returns the activations after the first conv stage, the
    activations after the last conv stage, and the sigmoid output of the
    final single-unit linear layer.
    """

    def __init__(self):
        super().__init__()
        stage = self._conv_stage
        self.conv_1 = nn.Sequential(*stage(3, 32))
        self.conv_2 = nn.Sequential(
            *stage(32, 64),
            *stage(64, 128),
            *stage(128, 128),
        )
        self.flatten = nn.Flatten()
        # 6272 = 128 channels * 7 * 7, the conv_2 output size for 150x150 inputs.
        self.fc1 = nn.Linear(in_features=6272, out_features=512)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=512, out_features=1)
        self.sigmoid = nn.Sigmoid()

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Return the layers of one stage: 3x3 conv (no padding), ReLU, 2x2 max-pool."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]

    def forward(self, x):
        """Return ``(conv_1 features, conv_2 features, sigmoid output)`` for ``x``."""
        conv_1_out = self.conv_1(x)
        conv_2_out = self.conv_2(conv_1_out)
        flat = self.flatten(conv_2_out)
        hidden = self.relu(self.fc1(flat))
        probability = self.sigmoid(self.fc2(hidden))
        return conv_1_out, conv_2_out, probability

# Instantiate the network and move its parameters to the GPU.
model = convnet().cuda()

# Disabled smoke test kept as a string literal: runs the network on a random
# batch of four 150x150 RGB images and prints the output.
"""用于测试网络是否可行
model = convnet()
x = torch.randn((4, 3, 150, 150))
output = model(x)
print(output.size())
print(output)
"""

# Disabled alternative kept as a string literal: use a pretrained VGG16 as a
# frozen feature extractor and train only the final classifier.
"""使用预训练的网络(vgg16)来提取特征，只训练最后的分类器
from torchvision.models.vgg import vgg16

class convnet(nn.Module):
    def __init__(self):
        super(convnet, self).__init__()
        self.vgg = vgg16(pretrained=True).features
        for param in self.parameters():
            param.requires_grad = False
        self.flatten = nn.Flatten()
        self.classifier = nn.Sequential(
            nn.Linear(in_features=4 * 4 * 512, out_features=512),
            nn.ReLU(),
            nn.Linear(in_features=512, out_features=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.classifier(self.flatten(self.vgg(x)))

optimizer = optim.RMSprop(model.classifier.parameters(), lr=learning_rate)
outputs = model(images)
"""

## 训练

In [None]:
# Loss function: binary cross-entropy on the model's sigmoid output.
loss_fn = nn.BCELoss()
# Optimizer
learning_rate = 1e-4
optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)

# Directory for the per-epoch checkpoints.
os.makedirs('./checkpoints', exist_ok=True)

# Number of optimizer steps taken so far.
total_train_step = 0
# Number of completed validation passes (x-axis of the validation curves).
total_validation_step = 0
# Number of training epochs.
epochs = 30

# TensorBoard logging.
writer = SummaryWriter("logs")

try:
    for epoch in range(epochs):
        print("--------第 {} 轮训练开始--------".format(epoch + 1))

        # ---- training ----
        model.train()
        for images, labels in train_dataloader:
            # Labels become float tensors of shape (batch, 1) to match the
            # model output expected by BCELoss.
            images, labels = images.cuda(), labels.cuda().unsqueeze(-1).to(torch.float)
            _, _, outputs = model(images)
            loss = loss_fn(outputs, labels)

            optimizer.zero_grad()  # clear gradients from the previous step
            loss.backward()
            optimizer.step()

            total_train_step = total_train_step + 1
            if total_train_step % 100 == 0:
                print("训练次数: {}, Loss: {}".format(total_train_step, loss.item()))
                writer.add_scalar("train_loss", loss.item(), total_train_step)

        # ---- validation ----
        model.eval()
        # Sum of the per-batch mean losses over the validation set.
        total_validation_loss = 0
        # Number of correctly classified validation images, kept as a Python
        # int so that it prints and logs as a plain number, not a CUDA tensor.
        total_accuracy = 0

        # No gradients are needed here, so skip building the autograd graph.
        with torch.no_grad():
            for images, labels in validation_dataloader:
                images, labels = images.cuda(), labels.cuda().unsqueeze(-1).to(torch.float)
                _, _, outputs = model(images)
                loss = loss_fn(outputs, labels)
                total_validation_loss = total_validation_loss + loss.item()
                predicts = (outputs > 0.5).to(torch.long)
                accuracy = (predicts == labels.to(torch.long)).sum().item()
                total_accuracy = total_accuracy + accuracy

        validation_accuracy = total_accuracy / len(validation_dataset)
        print("整体验证集上的Loss: {}".format(total_validation_loss))
        print("整体验证集上的正确率: {}".format(validation_accuracy))
        writer.add_scalar("validation_loss", total_validation_loss, total_validation_step)
        writer.add_scalar("validation_accuracy", validation_accuracy, total_validation_step)
        total_validation_step = total_validation_step + 1

        torch.save(model.state_dict(), os.path.join('./checkpoints', "model_{}.pth".format(epoch + 1)))
        print("模型已保存")
finally:
    # Close the writer even if training is interrupted, so that events
    # logged so far are flushed to disk.
    writer.close()

## 测试 & 可视化中间特征

In [None]:
def random_num(size, end):
    """Return ``size`` distinct integers drawn at random from ``range(end)``.

    Args:
        size: How many numbers to draw.
        end: Exclusive upper bound of the candidate range ``0 .. end - 1``.

    Returns:
        A list of ``size`` distinct integers in random order.

    Raises:
        ValueError: If ``size`` is negative or larger than ``end``.
    """
    # random.sample draws without replacement directly from the range, so no
    # candidate list has to be built and shrunk element by element.
    return random.sample(range(end), size)

# Rebuild the network on the GPU and restore the previously saved weights.
model = convnet().cuda()
model.load_state_dict(torch.load('./checkpoints/model_20.pth'))

with torch.no_grad():
    model.eval()
    for i, (images, _) in enumerate(test_dataloader):
        images = images.cuda()
        features, _, outputs = model(images)
        # Threshold the sigmoid output at 0.5 to get the predicted class index.
        predicts = torch.where(outputs > 0.5, 1, 0)
        print(predicts)

        # Only the first test image gets its low-level features visualised.
        if i != 0:
            continue

        torchvision.utils.save_image(images, f'{i}.jpg')
        # Drop the batch dimension and move the conv_1 feature maps to the CPU.
        features = features.data.squeeze(0).cpu()
        # Pick 25 distinct channels at random and draw them on a 5x5 grid.
        channel_num = random_num(25, features.shape[0])
        plt.figure(figsize=(10, 10))
        for index, channel in enumerate(channel_num):
            plt.subplot(5, 5, index + 1)
            plt.imshow(features[channel, :, :])
        plt.savefig(f"feature_{i}.jpg", dpi=300)