#### 实验二 猫狗分类
|姓名|石钧予|学号|2022E8013282111|培养单位|计算技术研究所|
|---|---|---|---|---|---|
- 实验目的
  1. 进一步理解和掌握卷积神经网络中卷积层、卷积步长、卷积核、池化层、池
化核、微调(Fine-tune)等概念。
  2. 进一步掌握使用深度学习框架进行图像分类任务的具体流程：如读取数据、
构造网络、训练和测试模型等等。
- 实验要求
  - 原则上要求人为划分的数据集中，训练集图像总数不少于 2000 张，测试集图像总数不少于 500 张，最终模型的准确率要求不低于 75%。
  - 基于 Python 语言和任意一种深度学习框架，从零开始一步步完成数据读取、网络构建、模型训练和模型
测试等过程，最终实现一个可以进行猫狗图像分类的分类器。
  - 按规定时间在课程网站提交实验报告、代码以及 PPT。
- 实验结果
  - 在划分出的 5000 张测试集图像上，模型准确率为 95.8%，平均损失为 0.126072，满足准确率不低于 75% 的要求。
- 实验过程

In [1]:
import os
import torch
import random
import numpy as np
from torch import nn
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
torch.cuda.empty_cache() # 清除 cache

- 数据预处理
  - 数据集：Kaggle猫狗分类实验数据集
  - 数据增强：使用transform对图像进行缩放、随机水平翻转、随机旋转、归一化
  - 数据划分：使用torch.utils.data.random_split进行训练集&测试集切分

In [2]:
class DogCatDataset(Dataset):
    """Image dataset over a flat directory of cat/dog pictures.

    Every entry of ``data_dir_path`` is treated as an image file.

    Args:
        data_dir_path: Directory whose entries are the image files.
        training: When true, items are ``(image, label)`` pairs and random
            flip/rotation augmentation is applied.  When false, items are
            bare image tensors produced by a deterministic pipeline, so
            repeated inference on the same file gives the same input.

    Labels are two-element float tensors: ``[0, 1]`` when the file name
    contains ``'cat'`` and ``[1, 0]`` otherwise.
    """

    def __init__(self, data_dir_path, training=True):
        self.data_dir_path = data_dir_path
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs.
        self.file_list = [os.path.join(data_dir_path, name)
                          for name in sorted(os.listdir(data_dir_path))]
        self.training = training

        steps = [transforms.Resize((128, 128))]
        if training:
            # Random augmentation only makes sense while fitting the model.
            steps += [
                transforms.RandomHorizontalFlip(),
                transforms.RandomRotation(10),
            ]
        steps += [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ]
        self.transform = transforms.Compose(steps)

    @staticmethod
    def _label_for(path):
        """Return the one-hot label derived from the file name of ``path``."""
        # Look at the file name only: a parent directory that happens to
        # contain "cat" must not turn every sample into a cat.
        if 'cat' in os.path.basename(path):
            return torch.tensor([0.0, 1.0], dtype=torch.float32)
        return torch.tensor([1.0, 0.0], dtype=torch.float32)

    def __getitem__(self, index):
        path = self.file_list[index]
        # The context manager releases the file handle right away, and the
        # RGB conversion keeps grayscale/RGBA/palette files compatible with
        # the three-channel normalisation.
        with Image.open(path) as raw:
            image = self.transform(raw.convert('RGB'))
        if self.training:
            return image, self._label_for(path)
        return image

    def __len__(self):
        return len(self.file_list)

In [4]:
BATCH_SIZE = 64
DATA_TRAIN_PATH = './data/DogCatData/train'
data_set = DogCatDataset(data_dir_path=DATA_TRAIN_PATH)
# 80/20 split.  The test size is derived by subtraction so the two lengths
# always add up to len(data_set); truncating both 0.8*n and 0.2*n can lose a
# sample, and random_split rejects lengths that do not sum to the total.
train_size = int(0.8 * len(data_set))
test_size = len(data_set) - train_size
train_set, test_set = torch.utils.data.random_split(
    data_set, [train_size, test_size])
# Shuffle only the training batches; evaluation order does not matter.
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)
print('>>>>>>数据预处理')
print('train set size : ', len(train_set))
print('test  set size : ', len(test_set))

>>>>>>数据预处理
train set size :  20000
test  set size :  5000


- 模型构建
  - 残差块定义

In [16]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv/batch-norm layers with an additive shortcut.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the block.
        stride: Spatial stride of the block.  It is applied once on the main
            path (first convolution) and once on the shortcut, so both
            branches produce feature maps of the same size.

    The shortcut is the identity when the block changes neither the channel
    count nor the resolution; otherwise it is a 3x3 convolution followed by
    batch normalisation.
    """

    def __init__(self, in_channels, out_channels, stride=1) -> None:
        super().__init__()
        self.conv_1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.batch_norm_1 = nn.BatchNorm2d(out_channels)
        self.relu_1 = nn.ReLU()

        # Only the first convolution downsamples.  Striding here as well
        # would shrink the main path by stride**2 and no longer match the
        # shortcut branch.
        self.conv_2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.batch_norm_2 = nn.BatchNorm2d(out_channels)
        self.relu_2 = nn.ReLU()

        self.stride = stride
        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels:
            # The projection must follow the main path's stride so the two
            # tensors can be added.  The kernel size stays 3x3 so parameter
            # shapes (and therefore saved checkpoints) are unchanged.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3,
                          stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, input):
        """Return ``relu(main_path(input) + shortcut(input))``."""
        output = self.conv_1(input)
        output = self.batch_norm_1(output)
        output = self.relu_1(output)
        output = self.conv_2(output)
        output = self.batch_norm_2(output)
        output += self.shortcut(input)
        output = self.relu_2(output)
        return output

In [17]:
class DogCatClassification(nn.Module):
    """Convolutional classifier that outputs two scores per image.

    The feature extractor is two conv/batch-norm/ReLU/max-pool stages
    followed by one residual block and a final max-pool.  The first linear
    layer expects ``256 * 16 * 16`` features, which is what three 2x2
    poolings leave of a 128x128 input.
    """

    def __init__(self) -> None:
        super().__init__()

        feature_layers = [
            # Stage 1: 3 -> 64 channels, resolution halved by the pooling.
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
            # Stage 2: 64 -> 128 channels, resolution halved again.
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
            # Stage 3: residual block 128 -> 256 channels, then last pooling.
            ResidualBlock(128, 256),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*feature_layers)

        classifier_layers = [
            nn.Linear(256 * 16 * 16, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 128),
            nn.LeakyReLU(),
            nn.Linear(128, 2),
        ]
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, input):
        """Run the feature extractor, then the fully connected head."""
        return self.fc(self.cnn(input))


In [33]:
def train(model, loss_fn, optimizer, dataloader, epochs):
    """Fit ``model`` on ``dataloader`` for ``epochs`` passes.

    Args:
        model: Module to optimise; batches are moved to the device its
            parameters live on.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar
            tensor.
        optimizer: Optimiser already bound to ``model``'s parameters.
        dataloader: Iterable of ``(X, y)`` batches exposing ``.dataset``.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        The loss values (floats) logged at every 100th batch, in order.
    """
    size = len(dataloader.dataset)
    # Follow the model instead of assuming a GPU, so the same loop also
    # works for a model that lives on the CPU.
    device = next(model.parameters()).device
    loss_set = []
    print('>>>>>> Model Train Beginning......')
    for idx_epoch in range(epochs):
        print(f'Epoch : {idx_epoch + 1}\n')
        model.train()
        for idx_batch, (X, y) in enumerate(dataloader):
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if idx_batch % 100 == 0:
                # Separate name for the Python float so the loss tensor is
                # not shadowed.
                loss_value = loss.item()
                current = (idx_batch + 1) * len(X)
                loss_set.append(loss_value)

                print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")
    return loss_set

def evaluation(model, dataloader, result_path):
    """Predict a class for every sample and write the results as CSV.

    Args:
        model: Module producing one score per class; batches are moved to
            the device its parameters live on.
        dataloader: Iterable of unlabeled input batches.
        result_path: Destination file.  It is overwritten with a header line
            ``Id,Category`` followed by one ``<index>,<class>`` line per
            sample, where ``<index>`` is the running position in
            ``dataloader`` order.
    """
    model.eval()
    # Follow the model instead of assuming a GPU is present.
    device = next(model.parameters()).device
    prediction = []
    print('>>>>>> Model Evaluation Beginning......\n')
    with torch.no_grad():
        for X in dataloader:
            pred = model(X.to(device))
            prediction.extend(pred.argmax(dim=1).tolist())
    print('>>>>>> Saving Result......\n')
    with open(result_path, 'w', encoding='utf-8') as f:
        f.write('Id,Category\n')
        f.writelines(f'{idx},{predict}\n'
                     for idx, predict in enumerate(prediction))


def test(model, loss_fn, dataloader):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    print('>>>>>> Model Test Beginning......')
    with torch.no_grad():
        for X, y in dataloader:
            X = X.cuda()
            y = y.cuda()
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += \
                (pred.argmax(1) == y.argmax(1)).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n Accuracy:{(100*correct):>0.1f} % , Avg loss : {test_loss:>8f} \n")


In [34]:
LR = 1e-3
EPOCHS = 10
# is_available has to be called: the bare function object is always truthy,
# so the un-called form selected CUDA even on machines without a GPU.
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
MODEL_PATH = './DogCatClassification.pt'

model = DogCatClassification().to(DEVICE)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# To persist freshly trained weights:
#   torch.save(model.state_dict(), MODEL_PATH)
# map_location lets a checkpoint saved from a GPU load on whichever device
# was selected above.
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))


<All keys matched successfully>

In [35]:
# Report accuracy and average loss of the loaded model on the held-out split.
test(model=model, loss_fn=loss_fn, dataloader=test_loader)

>>>>>> Model Test Beginning......
Test Error: 
 Accuracy:95.8 % , Avg loss : 0.126072 

