# Image Classification Source Code
**Studeas 2023 fall**

In [5]:
import numpy as np
import pandas as pd
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, Dataset, random_split

## 数据集预处理
为了便于使用pytorch进行后续计算与训练，需要预处理数据集（图片、标签文件）。class CustomDataset继承torch.utils.data中的Dataset类，允许我们处理自定义数据集。

In [6]:
# 自定义数据集
class CustomDataset(Dataset):
    # 初始化数据集方法
    def __init__(self, root_dir, csv_file, characteristics, num_classes, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.characteristics = characteristics
        self.num_classes = num_classes
        self.characteristic_indices = self._get_characteristic_indices(csv_file)
        self.labels = self._load_labels_from_csv(csv_file)

    # 获取指定索引的图片和标签
    def __getitem__(self, idx):
        img_name = sorted(os.listdir(self.root_dir))[idx]
        img_path = os.path.join(self.root_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = self.labels[img_name]
        return image, torch.tensor(label, dtype=torch.long)

    # 求size of dataset的方法
    def __len__(self):
        return len(os.listdir(self.root_dir))

    # 获取我们关心的特征列表在csv文件中的索引
    def _get_characteristic_indices(self, csv_file):
        df = pd.read_csv(csv_file)
        columns = df.columns[1:].tolist()
        indices = [columns.index(c) for c in self.characteristics[:self.num_classes - 1]]
        return indices

    # 从csv文件中加载标签
    def _load_labels_from_csv(self, csv_file):
        df = pd.read_csv(csv_file)
        labels = {row[0]: self._determine_label(row[1:].values.tolist()) for _, row in df.iterrows()}
        return labels

    # 根据特征确定标签
    def _determine_label(self, label_vector):
        for rank, i in enumerate(self.characteristic_indices):
            if label_vector[i] == 1:
                return rank
        return self.num_classes - 1
 
# 数据格式调整
transform_pipeline = transforms.Compose([
    # reshape/resize
    transforms.Resize((224, 224)), 
    # 转化为张量
    transforms.ToTensor(), 
    # 正则化
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) 
])


# 实例化，得到dataset
dataset = CustomDataset(
    # 图片文件夹、标签文件地址
    root_dir='./data_face_imgs/images',
    csv_file='./data_face_imgs/anno.csv',
    # 二分类问题，是否微笑
    characteristics=['Smiling', 'Others'],
    num_classes=2,
    # 五分类问题，发色分类
    #characteristics=['Black_Hair','Blond_Hair','Brown_Hair','Gray_Hair','Others'],
    #num_classes=5,
    # 格式调整
    transform=transform_pipeline
)

# 打印数据集前10项，包含图片编号和对应标签值（模块功能测试）
print(f"Dataset size: {len(dataset)}")
for i in range(10):
    image, label = dataset[i]
    print(f"Image {i}: Label - {label}")


Dataset size: 50000
Image 0: Label - 1
Image 1: Label - 1
Image 2: Label - 0
Image 3: Label - 0
Image 4: Label - 0
Image 5: Label - 1
Image 6: Label - 0
Image 7: Label - 1
Image 8: Label - 0
Image 9: Label - 0


## 定义一个卷积神经网络CNN
CNN包含两个卷积池化层和两个全连接层。模型并不复杂，但是足够处理本次图像分类问题。

In [7]:
class CNN(nn.Module):
    # 初始化方法，CNN包含两个卷积池化层和两个全连接层
    def __init__(self, num_classes):
        super(CNN, self).__init__()
        # 卷积层1，输入通道3，输出通道16，卷积核3*3
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        # 最大池化层，窗口大小和步长都是2
        self.pool = nn.MaxPool2d(2, 2)
        # 卷积层2，输入通道16，输出通道32，卷积核3*3
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        # 全连接层1
        self.fc1 = nn.Linear(32 * 56 * 56, 512)  
        # 全连接层2
        self.fc2 = nn.Linear(512, num_classes)

    # 前向传播方法
    def forward(self, x):
        # 卷积与池化
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # 数据展平 Flatten
        x = x.view(-1, 32 * 56 * 56) 
        # 全连接层
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


## 模型训练、测试
设置训练参数，划分训练集测试集，训练与评估。

In [8]:
class Model4Classify:
    # 初始化方法
    def __init__(self) -> None:
        # 任务种类（二分类、五分类），需要手动设置修改
        self.num_classes = 2
        # 设置device，cuda or cpu
        self.setup_device()
        # 设置模型训练参数
        self.setup_model_parameters()
        # 载入数据集
        self.setup_dataset()
        # 设置数据集分割，得到训练集、测试集
        self.setup_dataset_split()
        # 设置训练结果存储位置
        self.setup_address()
        # 设置随机数
        self.setup_random_seeds()

    # 设置device，cuda or cpu
    def setup_device(self):
        # 自动选择
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print("My device is: ", self.device)
  
    # 设置模型训练参数
    def setup_model_parameters(self):  
        # 手动设置batch_size和num_epochs    
        self.batch_size = 64
        self.num_epochs = 5

        # 选择模型、损失函数、优化器
        self.model = CNN(self.num_classes).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters())
    
    # 载入数据集
    def setup_dataset(self):
        # 数据集已经在CustomDataset实例化过程中完成预处理，此处只需载入
        self.all_data = dataset

    # 数据集分割，得到训练集和测试集
    def setup_dataset_split(self):
        # 设定分割比例，计算训练集和测试集的大小
        total_size = len(self.all_data)
        train_ratio = 0.8
        test_ratio = 0.2  
        train_size = int(total_size * train_ratio)
        test_size = total_size - train_size  

        # 使用 random_split 进行分割
        self.train_dataset, self.test_dataset = random_split(self.all_data, [train_size, test_size])
        
        # 创建 DataLoader--train_loader and test_loader
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)
        self.test_loader = DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)

    # 设置训练结果存储位置
    def setup_address(self):
        # 训练结果
        self.result_path = 'result.txt'

    # 设置随机数
    def setup_random_seeds(self):
        # 随机数种子（手动设置）
        self.seed = 42
        # Python 随机数生成器
        random.seed(self.seed)  
        # Numpy 随机数生成器
        np.random.seed(self.seed)
        # PyTorch 随机数生成器
        torch.manual_seed(self.seed)   
        # PyTorch CUDA 随机数生成器          
        torch.cuda.manual_seed(self.seed)       
        # PyTorch CUDA（所有GPU）随机数生成器 
        torch.cuda.manual_seed_all(self.seed)    
        # CUDA确定性算法
        torch.backends.cudnn.deterministic = True  
        torch.backends.cudnn.benchmark = False     

    # 模型训练方法
    def train_model(self):
        epochs = self.num_epochs
        for epoch in range(epochs):
            total_loss = 0
            for images, labels in tqdm(self.train_loader, desc=f"Epoch {epoch + 1}/{epochs}", unit="batch"):
                images, labels = images.to(self.device), labels.to(self.device)
                # 梯度清零
                self.optimizer.zero_grad()
                # 前向传播
                outputs = self.model(images)
                # 计算损失
                loss = self.criterion(outputs, labels)
                # 反向传播与优化
                loss.backward()
                self.optimizer.step()
                # 累计损失
                total_loss += loss.item()
            # 计算平均损失并打印
            avg_loss = total_loss / len(self.train_loader)
            print(f"Average training loss for Epoch {epoch + 1}: {avg_loss:.4f}")
            # 计算准确率Accuracy以评估模型并打印
            accuracy = self.evaluate_model()
            print(f'Accuracy: {accuracy:.2f}%')

            # 在result中写入累计损失、平均损失和准确率
            with open(self.result_path, 'a') as result_file:
                result_file.write(f'Epoch [{epoch+1}/{epochs}]\n')
                result_file.write(f'Total Loss: {total_loss}, Average Loss: {avg_loss}, Accuracy: {accuracy}\n')

    # 模型评估方法
    def evaluate_model(self):
        model = self.model
        correct = 0
        total = 0
        with torch.no_grad():
            # 遍历测试集，无梯度计算
            for images, labels in self.test_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                # 模型预测
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                # 累计总样本数
                total += labels.size(0)
                # 累计正确预测数
                correct += (predicted == labels).sum().item()
        # 计算并返回准确率
        accuracy = 100 * correct / total
        return accuracy


# 实例化Model4Classify，训练模型         
model_01 = Model4Classify()
model_01.train_model()

My device is:  cuda


Epoch 1/5: 100%|██████████| 625/625 [14:34<00:00,  1.40s/batch]


Average training loss for Epoch 1: 0.4587
Accuracy: 90.64%


Epoch 2/5: 100%|██████████| 625/625 [14:55<00:00,  1.43s/batch]


Average training loss for Epoch 2: 0.2021
Accuracy: 90.74%


Epoch 3/5: 100%|██████████| 625/625 [14:40<00:00,  1.41s/batch]


Average training loss for Epoch 3: 0.1647
Accuracy: 90.66%


Epoch 4/5: 100%|██████████| 625/625 [14:47<00:00,  1.42s/batch]


Average training loss for Epoch 4: 0.1257
Accuracy: 90.53%


Epoch 5/5: 100%|██████████| 625/625 [14:37<00:00,  1.40s/batch]


Average training loss for Epoch 5: 0.0855
Accuracy: 90.32%
