# 实验：植物识别

在我们读入数据集中的图片时，我们需要对图片进行预处理，使其能够从图像转变为能够被模型读取的数据格式，同时对于训练数据，我们可能想对它们进行[数据增广](https://pytorch.org/vision/stable/transforms.html)来提升训练效果，这些操作都可以通过`torchvision.transforms`来实现。在下面，我们定义了一个针对了训练的`train_transform`，其中包含了数据转换与一些数据增广相关的变换，我们还定义了针对验证和测试的`test_transform`，其中只包含了数据转换的步骤，因为我们不需要在测试时进行数据增广。

In [None]:
from torchvision import transforms
import torch

# 下面的代码除了随机将图片裁剪为224x224以外没有使用其他的数据增广
# 你可以随意添加自己想使用的数据增广变换

train_transform = transforms.Compose(
    (
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(torch.float),
        transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        transforms.RandomResizedCrop(
            224, interpolation=transforms.InterpolationMode.BILINEAR
        ),
####################################################################################
#       你可以在这里添加你想要的变换                                                    #
#       transforms.xxx(),                                                          #
#       transforms.xxx(),                                                          #                             
####################################################################################        
    )
)

test_transform = transforms.Compose(
    (
        transforms.ToTensor(),
        transforms.Resize(256, interpolation=transforms.InterpolationMode.BILINEAR),
        transforms.CenterCrop(224),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    )
)

接下来我们需要定义数据集来读取数据，对于训练集和验证集，因为它们的文件结构为主文件夹中包含了对应多个类别的子文件夹。
所以我们可以直接使用`torchvision.datasets.ImageFolder`来建立带标签的数据集。但是对于测试集，我们需要通过继承`VisionDataset`类的方法来构建自己的数据集类。在这里，我们建立了`PlantDataset`，它能够读取指定文件夹下的所有图片，并且在被调用时返回经过转换后的数据值以及作为标签的文件名。
记得把你在上面定义的变换作为参数传给对应的数据集。

In [None]:
from torchvision.datasets import VisionDataset
from typing import Any, Callable, List, Optional, Tuple
from PIL import Image
from pathlib import Path
import re
import os


class PlantDataset(VisionDataset):
    def __init__(
        self,
        root: str,
        transforms: Optional[Callable] = None,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
    ) -> None:
        super().__init__(root, transforms, transform, target_transform)
        data_path = Path(self.root)
        image_list = list(data_path.glob("*.jpg"))
        self.images = [str(i) for i in image_list]

    def __getitem__(self, index: int) -> Any:
        image_path = self.images[index]
        image = Image.open(image_path).convert("RGB")
        image = self.transforms(image)

        label = re.sub(r"\D", "", os.path.sep(image_path)[1])  # 这里会把文件后缀给去掉，将文件名保留作为标签返回
        return image, int(label)

    def __len__(self) -> int:
        return len(self.images)

In [None]:
import os

from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

data_path = './10 plants dataset'
batch_size = 32 # @param
num_workers = 0 # @param 如果你使用的是CPU，建议不要改动
torch.manual_seed(42) # @param 为你的pytorch指定一个随机数种子，使得每一次的结果可以被复现

train_path = (os.path.join(data_path, 'train'))
test_path = (os.path.join(data_path, 'test'))
val_path = (os.path.join(data_path, 'val'))

train_dataset = ImageFolder(
    train_path,
    train_transform)
val_dataset = ImageFolder(
    val_path,
    test_transform)
test_dataset = PlantDataset(
    test_path,
    test_transform)


在训练和测试时，PyTorch通过`DataLoader`类来从定义好的dataset中读取每一个batch的数据，在下面我们建立了三个`DataLoader`对象，并指定训练和验证时的batch size为超参数`batch_size`的值。此处我们还指定测试时的batch size为1，你也可以取一个其他值。

In [None]:
import torch
from torch import nn
from torch.nn import functional as F

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    shuffle=True)
val_loader = DataLoader(
    val_dataset,
    batch_size=len(val_dataset),
    num_workers=num_workers,
    shuffle=True)
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    num_workers=num_workers)

在下面一个单元格中，请你搭建自己的神经网络。具体来说，你需要自定义一个继承自`nn.Module`的类，并在`__init__`方法中定义这个类各层的结构，在`forward`方法中定义网络前向传播的方式（反向传播已经由PyTorch替你安排好了，对此无需担心），如果你对这部分感到有问题，可以参阅PyTorch官方教程[BUILD THE NEURAL NETWORK](https://pytorch.org/tutorials/beginner/basics/buildmodel_tutorial.html)或询问助教。考虑到我们在处理的是二维图像问题，你可能还要参阅[二维卷积核(Conv2d)](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html?highlight=conv2d)这一模块的文档，当然你也可以询问ChatGPT，请注意我们在`transforms`中指定图片的宽高为224x224，在计算卷积核大小时不要忘记这一点。
>注：你可以像官方教程一样将多个神经网络模块串联在`nn.Sequential()`中，也可以将它们分开定义，并在前向传播时指定传递方式，在搭建复杂的网络时往往后者更为实用。

In [None]:
from torch import nn

class ExampleNetwork(nn.Module):
    def __init__(self):
        super().__init__()
####################################################################################
#       你可以在这里定义你的神经网络                                                    #
#       self.conv1 = nn.(3, 5, 3),                                                 #
#       self.relu1 = nn.ReLU(),                                                    #                             
#       ...                                                                        #
####################################################################################  

    def forward(self, x):
####################################################################################
#       你可以在这里定义你的前向传播                                                    #
#       x = self.conv1(x),                                                         #
#       logits = self.relu1(x),                                                    #                             
#       ...                                                                        #
#################################################################################### 
        return logits
    
model = ExampleNetwork().to(device)

在下面，我们指定了模型训练的各超参数，我们默认使用带动量的SGD作为优化器，交叉熵损失作为损失函数，你也可以尝试使用Adam等不同的选择，或是指定学习率衰减策略。
我们还定义了一些用于评估模型训练效果的指标，如topk准确率，它们有助于我们在训练过程中跟踪模型的性能。

In [None]:
import numpy as np

learning_rate = 1e-3  # @param
epochs = 20  # @param
warmup_epoches = 2  # @param

iters_per_epoch = np.ceil(len(train_dataset) / batch_size)
loss_fn = nn.CrossEntropyLoss()

optimizer = torch.optim.SGD(
    model.parameters(), lr=learning_rate, weight_decay=1e-4, momentum=0.9
)

In [None]:
class AverageMeter(object):
  """Computes and stores the average and current value"""
  def __init__(self, name, fmt=':f'):
      self.name = name
      self.fmt = fmt
      self.reset()

  def reset(self):
      self.val = 0
      self.avg = 0
      self.sum = 0
      self.count = 0

  def update(self, val, n=1):
      self.val = val
      self.sum += val * n
      self.count += n
      self.avg = self.sum / self.count

  def __str__(self):
      fmtstr = '{name} {avg' + self.fmt + '}'
      return fmtstr.format(**self.__dict__)

def accuracy(output, target, topk=(1,)):
  """Computes the accuracy over the k top predictions for the specified values of k"""
  with torch.no_grad():
    maxk = max(topk)
    batch_size = target.size(0)

    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res

batch_time = AverageMeter('Time', ':6.3f')
data_time = AverageMeter('Data', ':6.3f')
losses = AverageMeter('Training loss', ':.4e')
top1 = AverageMeter('Acc@1', ':6.2f')
top5 = AverageMeter('Acc@5', ':6.2f')

以下是模型实际的训练代码，注意在每一次iteration中，loss在何时计算，梯度在何时反向传播（别忘记清空优化器中已有的梯度），权重和学习率（如果你有学习率衰减）在何时进行调整。每经过一个完整的epoch，模型会在验证集上进行一次评估，记录其损失和准确率。

In [None]:
import time

val_loss_history = []
loss_history = []
top1_history = []
top5_history = []
best_loss = 0x7fffffff

start = time.time()
for i in range(epochs):
  for batch, (X, y) in enumerate(train_loader):
    X = X.to(device)
    y = y.to(device)
    data_time.update(time.time() - start)
    pred = model(X)
    loss = loss_fn(pred, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # scheduler.step()
  
  batch_time.update(time.time() - start)
  start = time.time()

  with torch.no_grad():
    for batch, (X, y) in enumerate(val_loader):
      X = X.to(device)
      y = y.to(device)
      pred = model(X)
      val_loss = loss_fn(pred, y)
      if val_loss.item() < best_loss:
        best_loss = val_loss.item()
        best_epoch = i
        best_model = model
      acc1, acc5 = accuracy(pred, y, topk=(1, 5))
      loss_history.append(loss.item())
      val_loss_history.append(val_loss.item())
      top1_history.append(acc1[0])
      top5_history.append(acc5[0])
      losses.update(loss.item(), X.size(0))
      top1.update(acc1[0], X.size(0))
      top5.update(acc5[0], X.size(0))
      
  print(f"Epoch:{i + 1}: {batch_time}, {losses}, Validation loss {val_loss.item():.4e}, {top1}, {top5}, learning rate {optimizer.state_dict()['param_groups'][0]['lr']}")
  losses.reset()
  top1.reset()
  top5.reset()

print(f"Best Epoch:{best_epoch + 1}, loss: {best_loss}, Acc@1: {top1_history[best_epoch]}, Acc@5: {top5_history[best_epoch]}")


训练结束之后，我们可以将每一个epoch存储的各项指标在图标上绘制出来，观察它们是怎样变化的。如果你使用tensorboard或wandb，这一过程也可以实时进行。注意执行这个单元格需要你安装了`matplotlib`库，你可以直接执行`pip install matplotlib`来安装（注意安装后可能需要你重启内核并重新训练）。

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from torch import randint
%matplotlib inline
plt.plot(np.arange(1, epochs+1), loss_history, label="loss")
plt.plot(np.arange(1, epochs+1), val_loss_history, label="val_loss")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.legend()
plt.show()
plt.clf()

plt.plot(np.arange(1, epochs+1), [i.item() for i in top1_history])
plt.xlabel("epochs")
plt.ylabel("Acc@1")
plt.show()
plt.clf()

通过将测试集的DataLoader传给下面的函数，我们可以对模型在测试集上进行测试，测试的过程与训练类似，只是不需要更新梯度。`model.eval()`和`with torch.no_grad():`会告诉模型现在正处于测试的状态，注意模型的输出总是一个代表了类别的整数，因此需要构建`IDX_TO_CLASSES`来建立反向的索引，使得我们能从模型输出的数字得到具体的类别名。

In [None]:
CLASSES_TO_IDX = train_dataset.class_to_idx
IDX_TO_CLASSES = {CLASSES_TO_IDX[i]:i for i in train_dataset.classes}


def test(model, test_image, name_prefix):
    model.eval()
    labels = []
    image_numbers = []
    with torch.no_grad(): # 在测试时，需要禁用梯度的计算
        for _, (images, image_number) in enumerate(test_image):
            images = images.to(device)
            y = model(images)
            batch_labels = torch.argmax(y, dim=1) # 取线性层输出最大值的下标作为我们对应的分类下标
            labels.append(batch_labels)
            image_numbers.append(image_number)
    ans = torch.cat(labels, 0).cpu().numpy()
    image_numbers = torch.cat(image_numbers, 0).cpu().numpy()
    res = {str(image_numbers[i]): IDX_TO_CLASSES[j] for i, j in enumerate(ans)}
    print(f"{name_prefix} model result:", res)
    return res


res = test(best_model, test_loader, "best")

接下来就可以输出你的模型测试结果了，不要忘记修改文件名为`<学号>-<姓名>.json`.

In [None]:
import json

filename = "<学号>-<姓名>" # 在此处编辑你的文件名
with open(f"{filename}.json", "w") as f:
    json.dump(res, f, indent=4)