# 衛星データの画像分類

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/RyoWakabayashi/elixir-learning/blob/main/colab/PyTorch_EfficientNetV2_TransferLearning.ipynb)

衛星データ（可視光による衛星写真）を以下の種類に分類する

- cloudy: くもり
- desert: 砂漠
- green_area: 緑地
- water: 水

## ONNX のインストール

In [None]:
!pip install onnx

## モジュールのインポート

In [None]:
import copy
import math
import os
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import shutil
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision

from glob import glob
from google.colab import drive

from torchvision import datasets, models, transforms

## データの準備

Google Drive 上の space/satellite.zip に Kaggle のデータを配置しておく

https://www.kaggle.com/datasets/mahmoudreda55/satellite-image-classification

In [None]:
drive.mount("/content/drive")

In [None]:
!cp /content/drive/MyDrive/space/satellite.zip .
!unzip satellite.zip

クラス名一覧を取得する

In [None]:
classes = [os.path.basename(directory) for directory in sorted(glob("./data/*"))]

classes

テストデータを各クラスから25枚取得する

In [None]:
os.makedirs("./test_data")

for class_name in classes:
  os.makedirs(f"./test_data/{class_name}")
  target_files = random.sample(glob(f"./data/{class_name}/*.jpg"), 25)
  for target_file in target_files:
    basename = os.path.basename(target_file)
    shutil.move(target_file, f"./test_data/{class_name}/{basename}")

残りのデータから各クラス8割をトレーニングデータとして取得する

In [None]:
os.makedirs("./train_data")

for class_name in classes:
  os.makedirs(f"./train_data/{class_name}")
  all_files = glob(f"./data/{class_name}/*.jpg")
  num_of_targets = math.floor(len(all_files) * 0.8)
  target_files = random.sample(all_files, num_of_targets)
  for target_file in target_files:
    basename = os.path.basename(target_file)
    shutil.move(target_file, f"./train_data/{class_name}/{basename}")

残りを評価データにする

In [None]:
shutil.move("./data", "./val_data")

## モデル定義

EfficientNet V2 の ImageNet データセット学習済モデルを転移学習元とする

In [None]:
model = models.efficientnet_v2_m(weights=models.EfficientNet_V2_M_Weights.IMAGENET1K_V1)

In [None]:
for param in model.parameters():
    param.requires_grad = False

In [None]:
model.classifier

出力層を4種類のクラス分類用に変更する

In [None]:
num_ftrs = model.classifier[1].in_features
model.classifier[1] = nn.Linear(num_ftrs, len(classes))

In [None]:
model.classifier

## データロード定義

データの読み込み方を定義する

- EfficientNet V2 M の入力層に合わせた画像サイズに変換する
- トレーニングデータは水平・垂直反転、回転・移動・拡大・縮小、台形変換、明度・コントラスト・彩度変化をランダムに発生させる

In [None]:
data_transforms = {
    "train": transforms.Compose([
        transforms.Resize(256),
        transforms.RandomHorizontalFlip(0.5),
        transforms.RandomVerticalFlip(0.5),
        transforms.RandomAffine(degrees=[-10, 10], translate=(0.1, 0.1), scale=(0.9, 1.1)),
        transforms.RandomPerspective(distortion_scale=0.1, p=0.9),
        transforms.ColorJitter(brightness=0.3, contrast=0.2, saturation=0.2),
        transforms.CenterCrop(224),
        transforms.ToTensor()
    ]),
    "val": transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor()
    ]),
    "test": transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor()
    ]),
}

In [None]:
data_kind = ["train", "val", "test"]

image_datasets = {x: datasets.ImageFolder(f"{x}_data", data_transforms[x])
                  for x in data_kind}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                             shuffle=(x != "test"), num_workers=2)
              for x in data_kind}
dataset_sizes = {x: len(image_datasets[x]) for x in data_kind}
class_names = image_datasets["train"].classes

In [None]:
def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)

ランダム変化の様子を確認する

In [None]:
inputs, classes = next(iter(dataloaders['train']))

out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

## トレーニング定義

デバイスを取得する（GPU が使えれば GPU 、そうでなければ CPU）

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

過学習を防ぐため、早期終了を定義する

In [None]:
class EarlyStopping:
    def __init__(self, patience=5, verbose=0):
        self.epoch = 0
        self.pre_loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def __call__(self, current_loss):
        if self.pre_loss < current_loss:
            self.epoch += 1

            if self.epoch > self.patience:
                if self.verbose:
                    print('early stopping')
                return True

        else:
            self.epoch = 0
            self.pre_loss = current_loss

        return False

誤差、精度の推移をグラフとして保存する

In [None]:
def save_plots(value_dict, label):
    plt.figure(figsize=(10, 7))
    plt.plot(
        value_dict['train'], color='blue', linestyle='-',
        label=f"train {label}"
    )
    plt.plot(
        value_dict['val'], color='orange', linestyle='-',
        label=f"validataion {label}"
    )
    plt.xlabel('Epochs')
    plt.ylabel(label.capitalize())
    plt.legend()
    plt.savefig(f"./{label}.png")

トレーニングを定義する

In [None]:
def train_model(model, criterion, optimizer, early_stopping, num_epochs=25):
    since = time.time()

    loss_dict = {'train': [], 'val': []}
    acc_dict = {'train': [], 'val': []}
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    stop = False

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            loss_dict[phase].append(epoch_loss)
            acc_dict[phase].append(epoch_acc.to('cpu').detach().numpy().copy())

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val':
                if early_stopping(epoch_loss):
                    stop = True

                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

        if stop:
            break

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)
    return model, loss_dict, acc_dict

## トレーニング実行

In [None]:
model = model.to(device)

# 損失関数
criterion = nn.CrossEntropyLoss()

# 最適化関数
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)

# 早期終了
early_stopping = EarlyStopping(patience=5, verbose=1)

# トレーニング実行
model, loss_dict, acc_dict = train_model(model, criterion, optimizer, early_stopping, num_epochs=25)

モデルを保存する

In [None]:
torch.save(model.state_dict(), "./efficientnet_v2_m.pth")

誤差推移のグラフを保存する

In [None]:
save_plots(loss_dict, "loss")

精度推移のグラフを保存する

In [None]:
save_plots(acc_dict, "accuracy")

各出力を Google Drive に保存する

In [None]:
!mkdir "./drive/MyDrive/space/model"
!cp "./efficientnet_v2_m.pth" "./drive/MyDrive/space/model/"
!cp "./loss.png" "./drive/MyDrive/space/model/"
!cp "./accuracy.png" "./drive/MyDrive/space/model/"

テストデータを Google Drive に保存する

In [None]:
!zip -r "test_data.zip" "test_data"
!cp "test_data.zip" "./drive/MyDrive/space/"

## 精度検証

In [None]:
def visualize_model(model, dataset, num_images=12):
    model.eval()
    images_so_far = 0
    correct = 0
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders[dataset]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                if labels[j] == preds[j]:
                    correct += 1
                images_so_far += 1
                print(f"correct: {class_names[labels[j]]}")
                print(f"predicted: {class_names[preds[j]]}")
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    return correct / num_images

評価データに対する精度

In [None]:
visualize_model(model, "val")

保存したモデルをロードする

In [None]:
saved_model = models.efficientnet_v2_m()

In [None]:
num_ftrs = saved_model.classifier[1].in_features
saved_model.classifier[1] = nn.Linear(num_ftrs, len(classes))

In [None]:
saved_model.load_state_dict(torch.load("efficientnet_v2_m.pth"))

In [None]:
saved_model = saved_model.to(device)

テストデータに対する精度

In [None]:
visualize_model(saved_model, "test", 100)

## ONNX 形式への変換

In [None]:
onnx_file = "efficientnet_v2_m.onnx"

dummy_img = torch.zeros(1, 3, 224, 224)

cpu_device = torch.device("cpu")

cpu_model = saved_model.to(cpu_device)
dummy_img.to(cpu_device)

torch.onnx.export(cpu_model, dummy_img, onnx_file, verbose=False, opset_version=12, input_names=['images'],
                  output_names=['predictions'],
                  dynamic_axes={'images': {0: 'batch_size'},})

onnx_model = onnx.load(onnx_file)
onnx.checker.check_model(onnx_model)

In [None]:
!cp "./efficientnet_v2_m.onnx" "./drive/MyDrive/space/model/"