In [1]:
import json
import os
import random
import zipfile

import numpy as np
import paddle
from PIL import Image
from matplotlib import pyplot as plt
from paddle import nn

In [None]:
# Central configuration for the notebook: input geometry, dataset locations and
# training hyper-parameters. Every later cell reads its settings from this dict.
train_parameters = {
    # Network input as [channels, height, width].
    "input_size": [3, 224, 224],
    # Number of target classes.
    "class_dim": 102,
    # Zipped dataset and the directory it is extracted into.
    "src_path": "E:\\Python\\CaltechClassification\\data\\dataset.zip",
    "target_path": "E:\\Python\\CaltechClassification\\data\\dataset\\",
    # Directory holding the extracted images.
    "image_path": "E:\\Python\\CaltechClassification\\data\\dataset\\dataset\\images\\",
    # Generated sample lists.
    "train_list_path": "E:\\Python\\CaltechClassification\\data\\dataset\\train.txt",
    "eval_list_path": "E:\\Python\\CaltechClassification\\data\\dataset\\eval.txt",
    # Class-name/label mapping shipped with the dataset.
    "class_list_path": "E:\\Python\\CaltechClassification\\data\\dataset\\dataset\\class.txt",
    # JSON summary written by get_data_list.
    "readme_path": "E:\\Python\\CaltechClassification\\data\\dataset\\readme.json",
    "label_dict": {},
    # Optimisation settings.
    "num_epochs": 2,
    "train_batch_size": 32,
    "learning_strategy": {"lr": 0.0005},
}

## 1.数据准备

In [None]:
def unzip_data(src_path, target_path):
    """
    Extract the raw dataset archive.

    The archive at ``src_path`` is extracted into ``target_path``. Nothing is
    done when ``target_path`` already exists as a directory, so re-running the
    cell does not extract the data again.

    :param src_path: path of the zip archive
    :param target_path: directory to extract into
    :return: None
    """
    if os.path.isdir(target_path):
        return
    # The context manager closes the archive even when extraction fails.
    with zipfile.ZipFile(src_path, 'r') as archive:
        archive.extractall(path=target_path)


def get_data_list(target_path, train_list_path, image_path, class_list_path, readme_path):
    """
    Generate the shuffled training list and a JSON summary of the dataset.

    Reads the listing ``dataset\\train.txt`` below ``target_path`` (one
    tab-separated "file name, label" pair per line), prefixes every file name
    with ``image_path`` and writes the shuffled result to ``train_list_path``.
    A summary holding the total image count and the per-class image counts is
    written to ``readme_path``.

    The training list is overwritten on every call, so running the cell again
    does not duplicate samples. Blank lines in either input file are skipped.

    :param target_path: directory the dataset was extracted into (with trailing separator)
    :param train_list_path: output file for the generated training list
    :param image_path: directory containing the images (with trailing separator)
    :param class_list_path: file mapping class names to labels, tab separated
    :param readme_path: output file for the JSON summary
    :return: None
    """
    # Read both inputs up front; the context managers close the handles.
    with open(target_path + "dataset\\train.txt", "r") as f:
        train_data_list = f.readlines()
    with open(class_list_path, "r") as f:
        class_data_list = f.readlines()

    # Lines of the generated training list.
    train_list = []
    # Number of training images per label.
    class_num = {}

    # Backslashes of the image directory are doubled in the generated list,
    # exactly as before.
    image_prefix = image_path.replace("\\", "\\\\")

    for train_data in train_data_list:
        if not train_data.strip():
            continue
        fields = train_data.rstrip("\n").split("\t")
        train_data_path = image_prefix + fields[0]
        train_data_label = fields[1]

        train_list.append(f"{train_data_path}\t{train_data_label}\n")
        class_num[train_data_label] = class_num.get(train_data_label, 0) + 1

    all_class_images = len(train_list)

    # Shuffle so that consecutive samples do not share a class.
    random.shuffle(train_list)

    # "w" instead of "a": appending duplicated every sample on a re-run.
    # utf8 matches the encoding CaltechDataset uses to read this file back.
    with open(train_list_path, "w", encoding="utf8") as f:
        f.writelines(train_list)

    # Per-class details for the summary.
    class_detail = []
    for class_data in class_data_list:
        if not class_data.strip():
            continue
        fields = class_data.rstrip("\n").split("\t")
        class_name = fields[0]
        class_label = fields[1]
        class_detail.append({
            # A class without any training image is reported as 0 instead of
            # raising KeyError.
            "class_train_images": class_num.get(class_label, 0),
            "class_label": int(class_label),
            "class_name": class_name,
        })

    readme_json = {"all_class_images": all_class_images, "class_detail": class_detail}
    with open(readme_path, 'w') as f:
        f.write(json.dumps(readme_json, sort_keys=True, indent=4, separators=(',', ': ')))

    print("数据生成完成")

In [None]:
# Extract the archive (skipped when the target directory already exists) ...
unzip_data(
    src_path=train_parameters["src_path"],
    target_path=train_parameters["target_path"],
)
# ... then build the training list and the dataset summary from it.
get_data_list(
    target_path=train_parameters["target_path"],
    train_list_path=train_parameters["train_list_path"],
    image_path=train_parameters["image_path"],
    class_list_path=train_parameters["class_list_path"],
    readme_path=train_parameters["readme_path"],
)

## 2.模型搭建

In [None]:
class CaltechDataset(paddle.io.Dataset):
    """
    Dataset serving images listed in a tab-separated ``train.txt`` index.

    Every line of the index holds an image path and its label separated by a
    tab. Images are decoded lazily in ``__getitem__``.
    """

    def __init__(self, data_path, mode="train"):
        """
        Read the sample index.

        :param data_path: directory that contains ``train.txt``
        :param mode: only "train" loads samples; any other value leaves the dataset empty
        """
        super().__init__()
        self.data_path = data_path
        self.image_paths = []
        self.labels = []

        # Nothing to load for modes other than "train".
        if mode != "train":
            return

        index_file = os.path.join(self.data_path, "train.txt")
        with open(index_file, "r", encoding="utf8") as f:
            self.info = f.readlines()
        for record in self.info:
            path, tag = record.strip().split("\t")
            self.image_paths.append(path)
            self.labels.append(tag)

    def __getitem__(self, index):
        """
        Load one sample.

        :param index: position of the sample in the index
        :return: tuple of the float32 image in CHW layout scaled by 1/255 and
                 the label as an int64 array with a single element
        """
        picture = Image.open(self.image_paths[index])
        if picture.mode != "RGB":
            picture = picture.convert("RGB")
        picture = picture.resize((224, 224), Image.BILINEAR)
        # HWC -> CHW, then scale the pixel values.
        pixels = np.array(picture).astype("float32").transpose((2, 0, 1)) / 255
        target = np.array([self.labels[index]], dtype="int64")
        return pixels, target

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.image_paths)

In [17]:
class Residual(paddle.nn.Layer):
    """
    Residual block made of two 3x3 convolutions with batch normalisation.

    The block computes ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``
    where the shortcut is either the identity or a 1x1 convolution.
    """

    def __init__(self, in_channel, out_channel, use_conv1=False, stride=1):
        """
        :param in_channel: number of input channels
        :param out_channel: number of output channels
        :param use_conv1: when True the shortcut is a 1x1 convolution with the
                          given stride; required whenever the main path changes
                          the channel count or the spatial size
        :param stride: stride of the first convolution (and of the 1x1 shortcut)
        """
        super(Residual, self).__init__()
        self.conv1 = nn.Conv2D(in_channels=in_channel, out_channels=out_channel, kernel_size=3, stride=stride,
                               padding=1)
        self.bn1 = nn.BatchNorm2D(out_channel)
        self.relu = nn.ReLU()

        self.conv2 = nn.Conv2D(in_channels=out_channel, out_channels=out_channel, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2D(out_channel)

        if use_conv1:
            self.skip = nn.Conv2D(in_channels=in_channel, out_channels=out_channel, kernel_size=1, stride=stride)
        else:
            self.skip = None

    def forward(self, x):
        """
        :param x: input feature map
        :return: output feature map of the block
        """
        # The debugging prints of the input/output shapes were removed: they
        # ran on every forward pass of every block.
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Explicit None check instead of relying on the truthiness of a layer.
        if self.skip is not None:
            x = self.skip(x)
        return paddle.nn.functional.relu(out + x)

In [18]:
def build_res_block(in_channel, out_channel, num_layers, is_first=False):
    """
    Build one stage of the network as a sequence of ``Residual`` blocks.

    Unless ``is_first`` is set, the first block uses stride 2 and a 1x1
    convolution shortcut and maps ``in_channel`` to ``out_channel``. All
    remaining blocks keep the channel count at ``out_channel``.

    :param in_channel: number of channels entering the stage
    :param out_channel: number of channels leaving the stage
    :param num_layers: number of residual blocks in the stage
    :param is_first: True for the first stage, which keeps resolution and
                     channel count (``in_channel`` must equal ``out_channel``)
    :return: ``nn.Sequential`` containing the blocks
    """
    if is_first:
        assert in_channel == out_channel
    block_list = []
    for i in range(num_layers):
        if i == 0 and not is_first:
            block_list.append(Residual(in_channel, out_channel, use_conv1=True, stride=2))
        else:
            # These blocks receive out_channel channels: either from the
            # previous block, or because in_channel == out_channel in the
            # first stage. Passing in_channel here produced a channel
            # mismatch whenever the stage changes the channel count.
            block_list.append(Residual(out_channel, out_channel))
    res_net_block = nn.Sequential(*block_list)
    return res_net_block

In [19]:
class ResNet18(nn.Layer):
    """
    Classifier assembled from a convolutional stem, four residual stages built
    by ``build_res_block`` and a linear head.
    """

    def __init__(self, num_class):
        """
        :param num_class: number of output classes of the final linear layer
        """
        super().__init__()
        # Stem: 7x7 convolution followed by max pooling.
        self.conv1 = nn.Conv2D(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2D(64)
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool2D(kernel_size=3, stride=2, padding=1)
        # Residual stages; only the first one keeps the channel count.
        self.layer1 = build_res_block(64, 64, 2, is_first=True)
        self.layer2 = build_res_block(64, 128, 2)
        self.layer3 = build_res_block(128, 256, 2)
        self.layer4 = build_res_block(256, 512, 2)
        # Head: global average pooling, flatten, linear classifier.
        self.avg_pool = nn.AdaptiveAvgPool2D(1)
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(512, num_class)

    def forward(self, input):
        """
        :param input: batch of images
        :return: output of the final linear layer
        """
        stages = (
            self.conv1, self.bn1, self.relu, self.pool1,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avg_pool, self.flatten, self.fc,
        )
        x = input
        for stage in stages:
            x = stage(x)
        return x

In [22]:
# Inspect the resnet34 model that ships with paddle.vision: print the
# layer-by-layer summary for a 3x64x64 input, then the layer structure.
summary_input_shape = (-1, 3, 64, 64)
model = paddle.vision.models.resnet34()
paddle.summary(model, summary_input_shape)
print(model)

-------------------------------------------------------------------------------
   Layer (type)         Input Shape          Output Shape         Param #    
    Conv2D-141        [[1, 3, 64, 64]]     [1, 64, 32, 32]         9,408     
  BatchNorm2D-123    [[1, 64, 32, 32]]     [1, 64, 32, 32]          256      
      ReLU-64        [[1, 64, 32, 32]]     [1, 64, 32, 32]           0       
    MaxPool2D-8      [[1, 64, 32, 32]]     [1, 64, 16, 16]           0       
    Conv2D-142       [[1, 64, 16, 16]]     [1, 64, 16, 16]        36,864     
  BatchNorm2D-124    [[1, 64, 16, 16]]     [1, 64, 16, 16]          256      
      ReLU-65        [[1, 64, 16, 16]]     [1, 64, 16, 16]           0       
    Conv2D-143       [[1, 64, 16, 16]]     [1, 64, 16, 16]        36,864     
  BatchNorm2D-125    [[1, 64, 16, 16]]     [1, 64, 16, 16]          256      
   BasicBlock-9      [[1, 64, 16, 16]]     [1, 64, 16, 16]           0       
    Conv2D-144       [[1, 64, 16, 16]]     [1, 64, 16, 16]    

In [None]:
class ConvPool(paddle.nn.Layer):
    """
    ``groups`` repetitions of convolution + ReLU followed by one max pooling.

    The sub-layers are registered as ``conv_<i>``, ``relu_<i>`` and
    ``max_pool`` and are applied in registration order.
    """

    def __init__(self, num_channels, num_filters, filter_size, pool_size, pool_stride, groups, conv_stride=1,
                 conv_padding=1):
        """
        :param num_channels: input channels of the first convolution
        :param num_filters: output channels of every convolution
        :param filter_size: convolution kernel size
        :param pool_size: pooling kernel size
        :param pool_stride: pooling stride
        :param groups: number of convolution + ReLU pairs
        :param conv_stride: convolution stride
        :param conv_padding: convolution padding
        """
        super().__init__()

        in_ch = num_channels
        for idx in range(groups):
            conv = paddle.nn.Conv2D(in_channels=in_ch, out_channels=num_filters, kernel_size=filter_size,
                                    stride=conv_stride, padding=conv_padding)
            self.add_sublayer("conv_%d" % idx, conv)
            self.add_sublayer("relu_%d" % idx, paddle.nn.ReLU())
            # Every convolution after the first consumes num_filters channels.
            in_ch = num_filters

        pool = paddle.nn.MaxPool2D(kernel_size=pool_size, stride=pool_stride)
        self.add_sublayer("max_pool", pool)

    def forward(self, input):
        """
        :param input: input feature map
        :return: result of applying every registered sub-layer in order
        """
        x = input
        for _, stage in self.named_children():
            x = stage(x)
        return x

In [None]:
class CaltechModel(paddle.nn.Layer):
    """
    Image classifier with five ``ConvPool`` stages and three linear layers.

    The number of classes is read from ``train_parameters['class_dim']``.
    """

    def __init__(self):
        super(CaltechModel, self).__init__()
        self.conv_pool1 = ConvPool(3, 64, 3, 2, 2, 2)
        self.conv_pool2 = ConvPool(64, 128, 3, 2, 2, 2)
        self.conv_pool3 = ConvPool(128, 256, 3, 2, 2, 3)
        self.conv_pool4 = ConvPool(256, 512, 3, 2, 2, 3)
        self.conv_pool5 = ConvPool(512, 512, 3, 2, 2, 3)
        # The flattened feature size 7 * 7 * 512 fixes the input resolution at
        # 224x224 (five poolings with stride 2).
        self.fc1 = paddle.nn.Linear(7 * 7 * 512, 4096)
        self.fc2 = paddle.nn.Linear(4096, 4096)
        self.fc3 = paddle.nn.Linear(4096, train_parameters['class_dim'])

    def forward(self, input, label=None):
        """
        :param input: batch of images
        :param label: optional labels; when given the batch accuracy is returned too
        :return: the logits, or the tuple (logits, accuracy) when ``label`` is given
        """
        x = self.conv_pool1(input)
        x = self.conv_pool2(x)
        x = self.conv_pool3(x)
        x = self.conv_pool4(x)
        x = self.conv_pool5(x)

        x = paddle.reshape(x, shape=[-1, 512 * 7 * 7])
        # Without a non-linearity the three linear layers collapse into a
        # single affine map. ReLU has no parameters, so the state_dict keys
        # are unchanged; checkpoints trained without it should be retrained.
        x = paddle.nn.functional.relu(self.fc1(x))
        x = paddle.nn.functional.relu(self.fc2(x))
        x = self.fc3(x)

        if label is not None:
            acc = paddle.metric.accuracy(input=x, label=label)
            return x, acc
        else:
            return x

## 3.模型训练

In [None]:
# Running batch counter and the curves recorded by the training loop.
Batch = 0
Batchs = []
all_train_accs = []
all_train_loss = []


def _draw_curve(batches, values, title, y_label, color):
    """
    Plot one training curve against the batch counter and show the figure.

    :param batches: x values (batch counters)
    :param values: y values recorded at those batches
    :param title: figure title, also used as the legend label
    :param y_label: label of the y axis
    :param color: line colour
    """
    plt.title(title, fontsize=24)
    plt.xlabel("batch", fontsize=14)
    plt.ylabel(y_label, fontsize=14)
    plt.plot(batches, values, color=color, label=title)
    plt.legend()
    plt.grid()
    plt.show()


def draw_train_acc(Batchs, train_accs):
    """
    Plot the training accuracy curve.

    :param Batchs: batch counters
    :param train_accs: accuracy recorded at those batches
    """
    _draw_curve(Batchs, train_accs, "training accs", "acc", 'green')


def draw_train_loss(Batchs, train_loss):
    """
    Plot the training loss curve.

    :param Batchs: batch counters
    :param train_loss: loss recorded at those batches
    """
    _draw_curve(Batchs, train_loss, "training loss", "loss", 'red')


In [None]:
# Training samples are read from train.txt inside the extracted dataset directory.
train_dataset = CaltechDataset(train_parameters["target_path"])
# Use the configured batch size (it was defined but never applied, so the
# loader fell back to its default of one sample per batch) and reshuffle the
# samples every epoch.
train_dataloader = paddle.io.DataLoader(
    train_dataset,
    batch_size=train_parameters["train_batch_size"],
    shuffle=True,
)

In [None]:
# Train CaltechModel with SGD and cross entropy, recording loss and accuracy
# every 10 batches for the curves drawn at the end.
model = CaltechModel()
model.train()
cross_entropy = paddle.nn.CrossEntropyLoss()
opt = paddle.optimizer.SGD(learning_rate=train_parameters["learning_strategy"]["lr"], parameters=model.parameters())
epoch_num = train_parameters["num_epochs"]

for epoch in range(epoch_num):
    for batch_id, data in enumerate(train_dataloader):
        img = data[0]
        label = data[1]
        predict, acc = model(img, label)
        loss = cross_entropy(predict, label)

        if batch_id != 0 and batch_id % 10 == 0:
            Batch = Batch + 10
            Batchs.append(Batch)
            # float() handles both one-element and 0-D tensors, whereas
            # `.numpy()[0]` fails on the 0-D tensors newer Paddle returns.
            # Convert once and reuse the values.
            loss_value = float(loss)
            acc_value = float(acc)
            all_train_loss.append(loss_value)
            all_train_accs.append(acc_value)
            print("epoch:{},step:{},train_loss:{},train_acc:{}".format(epoch, batch_id, loss_value, acc_value))
        loss.backward()
        opt.step()
        opt.clear_grad()

    # Intermediate checkpoint every 5 epochs (never for epoch 0).
    if epoch != 0 and epoch % 5 == 0:
        paddle.save(model.state_dict(), f"./checkpoints/caltech_dataset_{str(epoch)}")

# Save the final weights, then draw the recorded curves.
paddle.save(model.state_dict(), f"./checkpoints/caltech_dataset_last")
draw_train_acc(Batchs, all_train_accs)
draw_train_loss(Batchs, all_train_loss)

## 4.结果预测

In [None]:
def load_image(img_path):
    """
    Preprocess one image for prediction.

    The image is converted to RGB when necessary, resized to 224x224 with
    bilinear interpolation, converted to float32, rearranged from HWC to CHW
    and divided by 255.

    :param img_path: path of the image file
    :return: float32 array in CHW layout
    """
    picture = Image.open(img_path)
    if picture.mode != 'RGB':
        picture = picture.convert('RGB')
    resized = picture.resize((224, 224), Image.BILINEAR)
    chw = np.array(resized).astype('float32').transpose((2, 0, 1))  # HWC to CHW
    return chw / 255  # scale the pixel values

In [None]:
# Restore the trained weights and switch the model to inference mode.
para_state_dict = paddle.load("./checkpoints/caltech_dataset_last")
model = CaltechModel()
model.set_state_dict(para_state_dict)
model.eval()

# test.txt lists one image file name per line; blank lines are ignored.
infer_image_path = r"E:\Python\CaltechClassification\data\dataset\dataset\test.txt"
with open(infer_image_path, 'r') as f:
    infer_images = [line.replace("\n", "") for line in f if line.strip()]

outs = []

# Images are loaded one at a time instead of being preloaded into a list,
# and no gradients are tracked during prediction.
with paddle.no_grad():
    for infer_image in infer_images:
        image_data = load_image(os.path.join(train_parameters["image_path"], infer_image))
        # Add the batch dimension: CHW -> 1xCHW.
        dy_x_data = image_data[np.newaxis, :, :, :]
        img = paddle.to_tensor(dy_x_data)
        out = model(img)
        # argmax() returns the index of the largest logit, i.e. the predicted label.
        lab = np.argmax(out.numpy())
        outs.append(f"{infer_image}\t{lab}\n")

# One tab-separated "file name, predicted label" line per image.
with open("result.txt", "w") as f:
    f.writelines(outs)
print("结束")