# 树叶图像分类

Kaggle: https://www.kaggle.com/c/classify-leaves

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

import os
import numpy as np
import pandas as pd
from PIL import Image
import pytorch_lightning as pl

## 整理数据集

### 读取和预处理

In [2]:
# Load the competition's train/test CSV files for a first look at the data
train, test = (
    pd.read_csv(csv_file)
    for csv_file in ('./Leaves/data/train.csv', './Leaves/data/test.csv')
)

train.head()

Unnamed: 0,image,label
0,images/0.jpg,maclura_pomifera
1,images/1.jpg,maclura_pomifera
2,images/2.jpg,maclura_pomifera
3,images/3.jpg,maclura_pomifera
4,images/4.jpg,maclura_pomifera


In [3]:
# Distinct class names found in the training CSV, in sorted order
labels = list(set(train['label']))
labels.sort()
n_classes = len(labels)
print(n_classes)
labels[:10]

176


['abies_concolor',
 'abies_nordmanniana',
 'acer_campestre',
 'acer_ginnala',
 'acer_griseum',
 'acer_negundo',
 'acer_palmatum',
 'acer_pensylvanicum',
 'acer_platanoides',
 'acer_pseudoplatanus']

In [4]:
# Class name -> integer index (position in the sorted label list)
class2num = {name: idx for idx, name in enumerate(labels)}
# Integer index -> class name
num2class = dict(enumerate(labels))

In [5]:
# Custom Dataset for the labelled training images
class TrainDataset(Dataset):
    """Labelled images listed in a CSV file.

    Args:
        csv_path: Path to a CSV whose first column holds image paths
            (relative to ``file_path``) and whose second column holds the
            class name. The first line is treated as a header and skipped.
        file_path: Directory that the image paths in the CSV are relative to.
        transform: Optional callable applied to each RGB PIL image. When
            omitted, the default training augmentation is used: random
            resized crop to 224, random horizontal flip, tensor conversion
            and per-channel normalization.

    Indexing returns ``(transformed_image, class_index)``; the class index is
    looked up in the module-level ``class2num`` mapping.
    """

    def __init__(self, csv_path, file_path, transform=None):
        self.file_path = file_path
        # Not used by this class; kept so code reading the attribute still works.
        self.to_tensor = transforms.ToTensor()
        if transform is None:
            # Build the default pipeline once instead of on every __getitem__ call.
            transform = transforms.Compose([
                transforms.RandomResizedCrop(224),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        self.transform = transform

        # The CSV is read without a header, so the header line lands in row 0
        # and the actual samples start at row 1.
        self.info = pd.read_csv(csv_path, header=None)
        self.image_arr = np.asarray(self.info.iloc[1:, 0])  # image paths
        self.label_arr = np.asarray(self.info.iloc[1:, 1])  # class names
        # Derive the length from the sample array so it can never go negative.
        self.length = len(self.image_arr)

    def __getitem__(self, index):
        single_image_name = self.image_arr[index]
        # Close the file handle promptly and force 3 channels: the default
        # Normalize step expects RGB input, but files may be grayscale/RGBA.
        with Image.open(os.path.join(self.file_path, single_image_name)) as raw:
            img = raw.convert('RGB')
        img = self.transform(img)
        # Translate the class name into its integer index
        label = self.label_arr[index]
        num_label = class2num[label]
        return (img, num_label)

    def __len__(self):
        return self.length

In [6]:
# Custom Dataset for the unlabelled test images
class TestDataset(Dataset):
    """Unlabelled images listed in a CSV file.

    Args:
        csv_path: Path to a CSV whose first column holds image paths
            (relative to ``file_path``). The first line is treated as a
            header and skipped.
        file_path: Directory that the image paths in the CSV are relative to.
        transform: Optional callable applied to each RGB PIL image. When
            omitted, the default evaluation pipeline is used: resize to 256,
            center crop to 224, tensor conversion and per-channel
            normalization.

    Indexing returns only the transformed image (there is no label).
    """

    def __init__(self, csv_path, file_path, transform=None):
        self.file_path = file_path
        # Not used by this class; kept so code reading the attribute still works.
        self.to_tensor = transforms.ToTensor()
        if transform is None:
            # Build the default pipeline once instead of on every __getitem__ call.
            transform = transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ])
        self.transform = transform

        # The CSV is read without a header, so the header line lands in row 0
        # and the actual samples start at row 1. Only the image-path column
        # is read here.
        self.info = pd.read_csv(csv_path, header=None)
        self.image_arr = np.asarray(self.info.iloc[1:, 0])  # image paths
        # Derive the length from the sample array so it can never go negative.
        self.length = len(self.image_arr)

    def __getitem__(self, index):
        single_image_name = self.image_arr[index]
        # Close the file handle promptly and force 3 channels: the default
        # Normalize step expects RGB input, but files may be grayscale/RGBA.
        with Image.open(os.path.join(self.file_path, single_image_name)) as raw:
            img = raw.convert('RGB')
        img = self.transform(img)
        return img

    def __len__(self):
        return self.length

In [7]:
# The CSV rows already contain the 'images/...' prefix, so the image root
# is the directory that holds the CSV files.
img_path = './Leaves/data/'
train_path = './Leaves/data/train.csv'
test_path = './Leaves/data/test.csv'

train_set = TrainDataset(csv_path=train_path, file_path=img_path)
test_set = TestDataset(csv_path=test_path, file_path=img_path)
# Show the raw tables each dataset loaded (header line appears as row 0)
print(train_set.info, test_set.info, sep='\n')

                      0                        1
0                 image                    label
1          images/0.jpg         maclura_pomifera
2          images/1.jpg         maclura_pomifera
3          images/2.jpg         maclura_pomifera
4          images/3.jpg         maclura_pomifera
...                 ...                      ...
18349  images/18348.jpg          aesculus_glabra
18350  images/18349.jpg  liquidambar_styraciflua
18351  images/18350.jpg            cedrus_libani
18352  images/18351.jpg      prunus_pensylvanica
18353  images/18352.jpg          quercus_montana

[18354 rows x 2 columns]
                     0
0                image
1     images/18353.jpg
2     images/18354.jpg
3     images/18355.jpg
4     images/18356.jpg
...                ...
8796  images/27148.jpg
8797  images/27149.jpg
8798  images/27150.jpg
8799  images/27151.jpg
8800  images/27152.jpg

[8801 rows x 1 columns]


## 定义模型

In [8]:
class Classifier(pl.LightningModule):
    """Pretrained ResNet-50 with a replaced classification head, trained with Adam.

    Args:
        hparams: Mapping with the keys 'num_classes', 'lr', 'batch_size'
            and 'weight_decay'.

    The dataloaders are built from the module-level ``train_set`` and
    ``test_set`` objects.
    """

    def __init__(self, hparams):
        super().__init__()

        self.params = hparams
        self.num_classes = hparams['num_classes']
        self.lr = hparams['lr']
        self.batch_size = hparams['batch_size']
        self.weight_decay = hparams['weight_decay']

        # Backbone: pretrained ResNet-50 whose final fully connected layer is
        # replaced by a fresh one with `num_classes` outputs.
        backbone = torchvision.models.resnet50(pretrained=True)
        backbone.fc = nn.Linear(backbone.fc.in_features, self.num_classes)
        self.arch = backbone

    def forward(self, x):
        """Return the backbone's output for the batch ``x``."""
        return self.arch(x)

    def configure_optimizers(self):
        """Adam over all parameters with the configured lr and weight decay."""
        return torch.optim.Adam(
            self.parameters(),
            lr=self.lr,
            weight_decay=self.weight_decay,
        )

    def training_step(self, batch, batch_idx):
        """Compute cross-entropy loss and batch accuracy for one batch."""
        inputs, targets = batch
        logits = self.forward(inputs)
        loss = F.cross_entropy(logits, targets)

        # Predicted class = index of the largest output per sample
        pred = torch.max(logits, dim=1)[1]
        n_correct = torch.sum(pred == targets)
        acc = n_correct / float(targets.shape[0])
        return {'loss': loss, 'train_acc': acc}

    def training_epoch_end(self, outputs):
        """Log the epoch number and the mean of the per-batch loss/accuracy."""
        losses = [out['loss'] for out in outputs]
        accs = [out['train_acc'] for out in outputs]
        self.log('step', self.trainer.current_epoch)
        self.log('avg_loss', torch.stack(losses).mean())
        self.log('avg_acc', torch.stack(accs).mean())

    def train_dataloader(self):
        """Shuffled loader over the module-level ``train_set``."""
        return DataLoader(train_set, batch_size=self.batch_size, shuffle=True)

    def test_dataloader(self):
        """Ordered (unshuffled) loader over the module-level ``test_set``."""
        return DataLoader(test_set, batch_size=self.batch_size, shuffle=False)

In [9]:
# Seed the RNGs so shuffling, augmentation and head initialisation repeat across runs
pl.seed_everything(42)
# Release cached GPU memory left over from earlier runs in this kernel
torch.cuda.empty_cache()

hparams = {'num_classes': n_classes, 'lr': 1e-4, 'batch_size': 32, 'weight_decay': 1e-3}
model = Classifier(hparams)

# Use one GPU when CUDA is available; otherwise fall back to CPU instead of failing
trainer = pl.Trainer(gpus=1 if torch.cuda.is_available() else 0, max_epochs=30)
trainer.fit(model)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name | Type   | Params
--------------------------------
0 | arch | ResNet | 23.9 M
--------------------------------
23.9 M    Trainable params
0         Non-trainable params
23.9 M    Total params
95.475    Total estimated model params size (MB)
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [13]:
# Run inference over the test loader. The result is a list with one element per
# batch; each element is a tensor covering that batch's samples.
predictions = trainer.predict(model, model.test_dataloader())

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
  rank_zero_warn(


Predicting: 288it [00:00, ?it/s]

In [26]:
# Join the per-batch outputs and take the highest-scoring class index per image
preds = torch.cat(predictions).argmax(dim=-1).cpu().tolist()

# Map class indices back to their label names
classes = [num2class[i] for i in preds]

test_data = pd.read_csv(test_path)
# Fail loudly on a count mismatch rather than writing a misaligned submission
assert len(classes) == len(test_data), (
    f'{len(classes)} predictions for {len(test_data)} test images'
)
test_data['label'] = classes
submission = test_data[['image', 'label']]
submission.to_csv('submission.csv', index=False)

In [None]:
# (Re)load the TensorBoard notebook extension and open it on the ./lightning_logs directory
%reload_ext tensorboard
%tensorboard --logdir ./lightning_logs