# 自动零售柜商品识别

## 【赛题任务】

针对自动零售柜的应用场景，给定一个零售商品的训练集，训练一个模型，对测试集中给定的商品的照片，要求识别出它的类别ID。以识别准确率作为评价指标，准确率定义为正确识别的次数除以识别总次数。零售柜商品采集图像如下图所示，流程是先检测每个商品，然后判断它的类别ID。本次比赛不包含商品检测部分，仅需要针对一个已经检测到的商品，识别它的ID。

## 【操作流程】

数据集目录如下：

![tree.png](../imagelib/tree.png)

为方便后续调用数据，我们先对数据进行预处理，将图像路径及其标签保存至txt文件。

考虑到中文目录名和文件名不利于读取和训练，我们先将中文标签映射成数字标签，并保存映射表（index.json）；然后依据映射表，将文件复制到以数字命名的新目录下并重命名；最后将新文件的路径和标签保存成txt文件（total.txt），每行用制表符分隔路径与标签，其格式为：

图像路径0   图像标签0

图像路径1   图像标签1

图像路径2   图像标签2

……

In [None]:
import os
import shutil
from tqdm import tqdm
import json

# Source tree (one sub-directory per category) and the destination tree that
# receives the numerically renamed copies.
data_root_path = r"D:/repos/product_identification/data_before"
new_data_root_path = r"D:/repos/product_identification/data"

# os.listdir returns entries in arbitrary order, so sort to make the
# category -> id mapping reproducible. Non-directory entries are dropped
# because they cannot be listed as a category folder below.
categories = sorted(
    name
    for name in os.listdir(data_root_path)
    if os.path.isdir(os.path.join(data_root_path, name))
)
# Maps the numeric label back to the original category (directory) name.
index = {}

os.makedirs(new_data_root_path, exist_ok=True)
os.makedirs("txt_data", exist_ok=True)

# `with` closes the listing even on error and, unlike opening inside
# try/finally, cannot raise NameError from the cleanup path when open() fails.
with open('txt_data/total.txt', 'w', encoding="utf-8") as total_txt:
    for i, category in enumerate(tqdm(categories)):
        index[i] = category
        category_path = os.path.join(data_root_path, category)
        new_category_path = os.path.join(new_data_root_path, str(i))
        os.makedirs(new_category_path, exist_ok=True)

        # Sorted so that the <label>_<n>.jpg numbering is reproducible too.
        files = sorted(os.listdir(category_path))
        for j, file in enumerate(tqdm(files, leave=False)):
            file_path = os.path.join(category_path, file)
            # Every copy gets a ".jpg" suffix regardless of the source name.
            new_file_path = os.path.join(new_category_path, str(i) + '_' + str(j) + '.jpg')
            shutil.copyfile(file_path, new_file_path)
            # One sample per line: "<new path>\t<label>".
            total_txt.write(new_file_path + '\t' + str(i) + '\n')

# ensure_ascii=False keeps the category names readable in the UTF-8 file
# instead of \uXXXX escapes; json.load yields the same mapping either way.
with open('txt_data/index.json', 'w', encoding="utf-8") as f:
    json.dump(index, f, ensure_ascii=False)

将预处理好的数据按八二开划分成训练集和验证集，分别保存成两个文件train.txt和val.txt。

In [None]:
from sklearn.model_selection import train_test_split

# Read the full "<path>\t<label>" listing produced by the preprocessing step.
with open("txt_data/total.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()

# 80/20 split; the fixed random_state makes the split reproducible.
train, val = train_test_split(lines, test_size=0.2, random_state=42)

# A single `with` closes both files even on error. With the previous
# try/finally, a failure in the second open() left `f_val` undefined and the
# cleanup raised NameError, hiding the real exception.
with open("txt_data/train.txt", "w", encoding="utf-8") as f_train, \
        open("txt_data/val.txt", "w", encoding="utf-8") as f_val:
    f_train.writelines(train)
    f_val.writelines(val)

定义Dataset类

我们注意到数据集中的图像大小不一致，所以在加载图像后，将其统一resize到宽96、高192。

In [None]:
from torch.utils.data import Dataset
import cv2

class MyDataset(Dataset):
    """Image-classification dataset backed by a tab-separated listing file.

    Each non-blank line of the listing holds an image path and an integer
    label separated by a tab character. Images are loaded with ``cv2.imread``
    and resized to a fixed size every time an item is accessed.

    Args:
        txt_path: Path to the UTF-8 encoded listing file.
        transform: Optional callable applied to the resized image; its return
            value is what ``__getitem__`` yields as the image.
        size: Target ``(width, height)`` handed to ``cv2.resize``. Defaults to
            ``(96, 192)``, the value that was previously hard-coded.

    Raises:
        ValueError: If a non-blank line has no tab separator or its label is
            not an integer.
    """

    def __init__(self, txt_path, transform=None, size=(96, 192)):
        self.imgs = []
        self.labels = []
        with open(txt_path, "r", encoding="utf-8") as f:
            for line_no, raw in enumerate(f, start=1):
                line = raw.rstrip()
                if not line:
                    # Tolerate blank / trailing empty lines in the listing.
                    continue
                # Split on the last tab so only the final field is the label.
                fields = line.rsplit("\t", 1)
                if len(fields) != 2:
                    raise ValueError(
                        f"{txt_path}:{line_no}: expected '<path>\\t<label>', got {line!r}"
                    )
                path, label = fields
                self.imgs.append(path)
                self.labels.append(int(label))
        self.transform = transform
        self.size = tuple(size)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``.

        Raises:
            OSError: If the image file is missing or cannot be decoded.
        """
        img_path = self.imgs[index]
        label = self.labels[index]
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cv2.resize.
            raise OSError(f"cannot read image (missing or unreadable): {img_path}")
        img = cv2.resize(img, self.size)
        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        """Return the number of samples listed in the file."""
        return len(self.imgs)

定义神经网络模型

你可以自己编写深度学习神经网络，让模型更好地适应数据集。这里我们使用torchvision提供的resnet34模型结构（下面的代码没有加载预训练权重，而是从头开始训练），它的网络层数较少，性能却并不逊色。

In [None]:
from torchvision import models

# Number of product categories the network has to distinguish.
NUM_CLASSES = 101

# ResNet-34 from torchvision sized for NUM_CLASSES outputs; no `weights`
# argument is passed in this call.
ResNet = models.resnet34(num_classes=NUM_CLASSES)

训练模型

In [None]:
import os
import torch
from torch import nn
from torchvision import transforms
from torch.utils.data import DataLoader

# --- Hyper-parameters and run configuration ---------------------------------
epochs = 200
is_cuda = True
best_acc = 0
batch_size = 32

# Always define `device`: previously it was only assigned when is_cuda was
# True (NameError otherwise), and CUDA availability was never checked.
device = torch.device("cuda" if is_cuda and torch.cuda.is_available() else "cpu")
os.makedirs("output", exist_ok=True)

# --- Data --------------------------------------------------------------------
train_dataset = MyDataset("txt_data/train.txt", transform=transforms.ToTensor())
val_dataset = MyDataset("txt_data/val.txt", transform=transforms.ToTensor())
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Accuracy does not depend on sample order, so validation is not shuffled.
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# --- Optimisation ------------------------------------------------------------
optimizer = torch.optim.AdamW(ResNet.parameters(), lr=0.01, weight_decay=1e-3)
# Multiply the learning rate by 0.1 at epochs 100, 120 and 150.
scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer, milestones=[100, 120, 150], gamma=0.1
)
loss_func = nn.CrossEntropyLoss()

ResNet.to(device)

for epoch in range(epochs):
    # ---- train ----
    ResNet.train()
    running_loss = 0.0
    seen = 0
    for img, label in train_loader:
        img = img.to(device)
        label = label.to(device)
        output = ResNet(img)
        loss = loss_func(output, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Weight by batch size so the epoch mean is exact even when the last
        # batch is smaller than the others.
        running_loss += loss.item() * label.size(0)
        seen += label.size(0)
    scheduler.step()
    # Report the mean training loss of the epoch rather than the loss of
    # whichever mini-batch happened to come last; NaN if there was no data.
    epoch_loss = running_loss / seen if seen else float("nan")

    # ---- validate ----
    ResNet.eval()
    total = 0
    correct = 0
    with torch.no_grad():
        for img, label in val_loader:
            img = img.to(device)
            label = label.to(device)
            output = ResNet(img)
            predicted = output.argmax(dim=1)
            total += label.size(0)
            correct += (predicted == label).sum().item()
    # Guard against an empty validation set (would divide by zero).
    accuracy = correct / total if total else 0.0
    print(f"Epoch: {epoch}, Loss: {epoch_loss}, Accuracy: {accuracy}")

    # Keep a checkpoint each time validation accuracy reaches a new best.
    if accuracy > best_acc:
        best_acc = accuracy
        torch.save(ResNet.state_dict(), "output/best_model_" + f"{best_acc:.4f}" + ".pt")