# 転移学習　画像認識編

Pytorchを使って、事前学習済みモデルを用いた画像認識（転移学習）を行います。
今回は、蟻と蜂の画像データを用いて、2値分類を行います。

基本的にはreferenceにあるサイトを参考に行なっていますが、
一部箇所をオリジナルで分かりやすいコードに変更しています。

今回のコードのデータセットやモデル構造をいじることで自分が取り組みたい課題に応用できると思うので、
参考にしてみてください。

### reference
* [pytorch tutorial transfer learning](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html)
* [日本語訳](https://torch.classcat.com/2021/04/07/pytorch-1-8-tutorials-beginner-transfer-learning/)

# 設定

In [1]:
import os

import glob

from PIL import Image

import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import models, transforms

In [2]:
device = 'cpu'
# GPUが使えるならGPUをデバイスとして設定
if torch.cuda.is_available:
    device = 'cuda'

# データのダウンロード

今回は、アリとハチの2種類の画像を含むデータを利用します

データのダウンロードは、以下の通り

1. ``` mkdir data ```
2. ``` curl https://download.pytorch.org/tutorial/hymenoptera_data.zip -o data/hymenoptera_data.zip ```
3. ``` unzip data/hymenoptera_data.zip -d data ```

上記のコマンドを実行すると、以下のようなディレクトリ構造になります
```
./
| - data
  | - hymenoptera_data
    | - train
      | - ants
        | - ~.jpg
        | - ~.jpg
        | - ~.jpg
      | - bees
        | - ~.jpg
        | - ~.jpg
        | - ~.jpg
    | - test
      | - ants
        | - ~.jpg
        | - ~.jpg
        | - ~.jpg
      | - bees
        | - ~.jpg
        | - ~.jpg
        | - ~.jpg
```

# データの読み込み

## データセット

In [3]:
class AntBeeDataset(torch.utils.data.Dataset):
    def __init__(self, data_root_path, is_train, transforms):
        super().__init__()

        # ラベルを文字列から数字へ変更する
        self.label_str_to_int = {'ants':0, 'bees':1}
        # 画像の変形等を行う処理
        self.transforms = transforms

        if is_train:
            # トレーニングデータの親ディレクトリへのパスを作成
            # ex : ~/data/hymenoptera_data/train/
            train_data_path = os.path.join(data_root_path, 'train')

            # ants(蟻)のファイルへのパスを全て取得、ラベルをセットで付与する
            # ex : [[~/data/hymenoptera_data/train/ants/01.jpg, 'ants'], ...]
            train_ants_path = os.path.join(train_data_path, 'ants', '*.jpg')
            train_ants_list = [[file_path, 'ants'] for file_path in glob.glob(train_ants_path)]
            # bees(蜂)のファイルへのパスを全て取得、ラベルをセットで付与する
            # ex : [[~/data/hymenoptera_data/train/bees/01.jpg, 'bees'], ...]
            train_bees_path = os.path.join(train_data_path, 'bees', '*.jpg')
            train_bees_list = [[file_path, 'bees'] for file_path in glob.glob(train_bees_path)]

            # 蟻と蜂のデータを結合して1つのデータにする
            self.data_list = train_bees_list + train_ants_list
        else:
            # テストデータの親ディレクトリへのパスを作成
            # ex : ~/data/hymenoptera_data/test/
            test_data_path = os.path.join(data_root_path, 'val')

            # ants(蟻)のファイルへのパスを全て取得、ラベルをセットで付与する
            # ex : [[~/data/hymenoptera_data/test/ants/01.jpg, 'ants'], ...]
            test_ants_path = os.path.join(test_data_path, 'ants', '*.jpg')
            test_ants_list = [[file_path, 'ants'] for file_path in glob.glob(test_ants_path)]
            # bees(蜂)のファイルへのパスを全て取得、ラベルをセットで付与する
            # ex : [[~/data/hymenoptera_data/test/bees/01.jpg, 'bees'], ...]
            test_bees_path = os.path.join(test_data_path, 'bees', '*.jpg')
            test_bees_list = [[file_path, 'bees'] for file_path in glob.glob(test_bees_path)]

            # 蟻と蜂のデータを結合して1つのデータにする
            self.data_list = test_bees_list + test_ants_list

    def load_image(self, file_path):
        # file_pathの画像を読み込む
        img = Image.open(file_path)
        img = img.convert("RGB")
        # Pillowからnpyに変換
        #img = np.array(img)
        
        return self.transforms(img)

    def __len__(self):
        # データの総数
        return len(self.data_list)

    def __getitem__(self, index):
        # データの取得
        # ex : [~/data/hymenoptera_data/test/ants/01.jpg, 'ants']
        data = self.data_list[index]
        # ファイルパスの取得
        # ex : ~/data/hymenoptera_data/test/ants/01.jpg
        file_path = data[0]
        # ラベルの取得
        # ex : 'ants'
        label = self.label_str_to_int[data[1]]

        # 画像の読み込み
        image = self.load_image(file_path)

        return image, torch.tensor(label)


In [4]:
# 訓練データに対する画像の変換処理
train_transforms = transforms.Compose([
                        transforms.RandomResizedCrop(224),
                        transforms.RandomHorizontalFlip(),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                    ])
# テストデータに対する画像の変換処理
test_transforms = transforms.Compose([
                        transforms.Resize(256),
                        transforms.CenterCrop(224),
                        transforms.ToTensor(),
                        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                    ])

In [5]:
# train, test データセットを作成
train_dataset = AntBeeDataset(data_root_path='./data/hymenoptera_data', is_train=True, transforms=train_transforms)
test_dataset = AntBeeDataset(data_root_path='./data/hymenoptera_data', is_train=False, transforms=test_transforms)

In [6]:
# データローダーを設定する
train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=16,
                                           shuffle=True,
                                           num_workers=1,
                                           pin_memory=True)

test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=16,
                                          shuffle=False,
                                          num_workers=1,
                                          pin_memory=True)

# モデルの定義

In [7]:
# 事前学習済みのResNet152を読み込み
# 他のモデルに変更することもできる
# ex : model = models.vit_b_16(pretrained=True)
# 使えるモデル一覧は、https://pytorch.org/vision/0.14/models.html#classification
model = models.resnet152(pretrained=True)

In [8]:
# モデル構造を可視化
model

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [9]:
# モデルの最終層のクラス数が今回の問題に適さないので変更
# モデルをResNetから変えたときは、model.fcが変更になる可能性有
model.fc = nn.Linear(2048, 2)

# CPUからGPUへ
model = model.to(device=device)

# 実験

## 実験設定

In [10]:
#エポック数
epochs = 30
# 損失関数
criterion = nn.CrossEntropyLoss()
# 最適化関数
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

## 訓練

In [11]:
def train(data_loader, model, epochs, optimizer, criterion):
    # 訓練モードに変更
    model.train()
    
    # 学習時の誤差を蓄積する
    train_loss = []
    # エポック数分訓練を行う
    for epoch in range(epochs):

        # 1エポックの誤差
        train_batch_loss = [] 
        for images, labels in data_loader: # データローダーからバッチサイズ分のデータを取得
            images = images.to(device)     # 画像をGPUに
            labels = labels.to(device)     # ラベルをGPUに

            # 1. 勾配の初期化
            optimizer.zero_grad()
            # 2. 推論
            outputs = model(images)
            # 3. 誤差計算
            loss = criterion(outputs, labels)
            # 4. 誤差逆伝搬
            loss.backward()
            # 5. パラメータ更新
            optimizer.step()

            # 誤差の蓄積
            train_batch_loss.append(loss.item())

        train_loss.append(np.mean(train_batch_loss))

        # 10エポックごとに損失を確認
        if epoch % 10 == 0:
            print(f'epoch : {epoch}/{epochs}, train loss : {train_loss[-1]}')

    return model, train_loss

In [12]:
# 訓練を行う
model, train_loss = train(train_loader, model, epochs, optimizer, criterion)

epoch : 0/30, train loss : 0.5312371198087931
epoch : 10/30, train loss : 0.11506553619983606
epoch : 20/30, train loss : 0.10735079352161847


# テスト

In [None]:
def test(model, data_loader):
    # モデルを評価用に設定
    model.eval()

    # 予測結果とラベルを保存する用
    predictions = []
    labels = []
    with torch.no_grad(): # 勾配計算をしない
        for image, label in data_loader: # dataloaderから画像とラベルを取得
            image = image.to(device)     # 画像をGPUに
            label = label.to(device)     # ラベルをGPUに

            # 推論
            outputs = model(image)
            # 推論結果をラベルに変更
            _, preds = torch.max(outputs, 1)

            # 結果とラベルを蓄積
            predictions.append(preds)
            labels.append(label)

    # 予測とラベルを一次元に
    predictions = torch.cat(predictions, axis=0)
    labels = torch.cat(labels, axis=0)

    # .cpu()    : GPUからCPUに変更
    # .detach() : データを複製
    # .numpy()  : npy配列に変換
    predictions = predictions.cpu().detach().numpy()
    labels = labels.cpu().detach().numpy()

    return predictions, labels


In [None]:
# テストデータに対して推論を行う
predictions, labels = test(model, test_loader)

In [None]:
# 精度を計算
accuracy = np.sum(predictions == labels) / len(predictions)
print(f'accuracy : {accuracy}')

accuracy : 0.954248366013072
