In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install segmentation-models-pytorch

In [None]:
from glob import glob

import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import numpy as np
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.base import SegmentationHead
import segmentation_models_pytorch.utils.metrics as smp_metrics
import segmentation_models_pytorch.utils.train as smp_train
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


In [None]:
# 今回の学習で必要になる処理を入れたDataset
class MyDataset(Dataset):
    def __init__(self, imgs, masks, transform):
        """
        imgs : 画像が入ったlist
        masks : 正解マスクが入ったlist
        transform : 画像やマスクに前処理を行う関数
        """
        self.imgs = imgs
        self.masks = masks
        self.transform = transform

    def __len__(self):
        return len(self.imgs)

    # 処理を行う部分
    def __getitem__(self, idx):
        img = self.imgs[idx]
        mask = (self.masks[idx] > 0).astype(float)

        # 画像とマスクに前処理を実施
        sample = self.transform(image=img, mask=mask)
        img, mask = sample['image'], sample['mask']

        return img, mask.unsqueeze(0) # maskは3チャンネルである必要あり（class, H, W）


In [None]:
# bce lossとdice lossを組み合わせたloss
class BCE_Dice_Loss(smp.utils.base.Loss):
    def __init__(self, alpha, **kwargs):
        # loss = alpha * bce + (1 - alpha) * dice_loss
        super().__init__(**kwargs)
        self.bce = nn.BCEWithLogitsLoss()
        self.dice_loss = smp.losses.DiceLoss(mode="binary",
                                             log_loss=True, from_logits=True)
        self.alpha = alpha

    def forward(self, y_pr, y_gt):
        loss = self.alpha * self.bce(y_pr, y_gt.float()) \
             + (1 - self.alpha) * self.dice_loss(y_pr, y_gt.float())
        return loss

In [None]:
# 前処理の定義
# Composeを使用すると複数の処理を一気に行うことができる

# 訓練用
train_transform = A.Compose([
    A.ShiftScaleRotate(scale_limit=0.1, rotate_limit=10, p=0.5), # ランダムで画像を拡大縮小+回転
    A.Resize(640,640),
    A.HorizontalFlip(p=0.5), #ランダムで画像を水平反転
    A.VerticalFlip(p=0.5), #ランダムで画像を垂直反転
    A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),  # ランダムで画像の明るさとコントラストを変える
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(), #numpy arrayをpytorchで使用するTensorに変換
])

# 推論用
val_transform = A.Compose([
    A.Resize(640,640),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

In [None]:
# data loaderを作成する関数
def make_dataloader(imgs, masks, transforms, batch_size, shuffle=True):
    dataset = MyDataset(imgs, masks, transforms)
    loader = DataLoader(dataset, batch_size=batch_size,
                        drop_last=False, shuffle=shuffle, num_workers=2)

    return loader

# 学習に使用するモデル類を作成する関数
def make_model(device):

    model = smp.Unet(
        encoder_name="tu-convnext_tiny.fb_in1k", #timmのモデルを使う際は先頭にtu-をつける
        encoder_weights="imagenet",
        in_channels=3,
        classes=1,
        encoder_depth=4,
        decoder_attention_type="scse", #追加モジュール
        decoder_channels=[256, 128, 64, 32], #convnextはエンコーダの出力数が4個なのでデコーダの調整が必要
    )
    #convnextを用いると出力サイズが元の1/2になっているので、headの部分だけ置き換えてあげる
    model.segmentation_head = SegmentationHead(in_channels=32, out_channels=1, upsampling=2)  # サイズが2倍になる


    # 損失関数
    loss = BCE_Dice_Loss(0.5) #BCE + Dice lossを使用

    # 評価関数（今回はIoUを使用）
    metrics = [
        smp_metrics.IoU(threshold=0.5, activation="sigmoid"),
    ]

    # 最適化関数（今回はAdamWを使用）
    optimizer = optim.AdamW(params=model.parameters(), lr=5e-5) #学習率を小さめに設定

    # smpに用意されているシンプルなループ用クラス（train用）
    train_epoch = smp_train.TrainEpoch(
        model,
        loss=loss,
        metrics=metrics,
        optimizer=optimizer,
        device=device,
        verbose=True,
    )

    # smpに用意されているシンプルなループ用クラス（valid用）
    valid_epoch = smp_train.ValidEpoch(
        model,
        loss=loss,
        metrics=metrics,
        device=device,
        verbose=True,
    )

    return model, train_epoch, valid_epoch

In [None]:
# テストデータの評価時に使用するループ処理を行う関数 TTAを行って予測
def test_loop(model, data_loader, metric, device):
    """
    model : 学習済みのモデル
    data_loader : テストのデータローダー
    metric : 評価関数（今回はIoU）
    device : GPU or CPU
    """

    metric_list = [] # テスト画像のIoUの結果を保持するためのリスト
    for x, y in data_loader:
        # TTA用の画像を用意する
        # 通常画像、垂直反転画像、水平反転画像の3種類を作成し予測を行う
        x = torch.cat([x, torch.flip(x, [-2]), torch.flip(x, [-1])], 0)
        x, y = x.to(device), y.to(device)
        with torch.no_grad():
            y_pred = torch.sigmoid(model(x)) #事前にsigmoid
        # 作成した画像の内、垂直反転画像、水平反転画像を元に戻して3つの予測の平均を取る
        y_pred = y_pred[0:1] + torch.flip(y_pred[1:2], [-2]) + torch.flip(y_pred[2:3], [-1])
        y_pred = y_pred / 3.0
        # IoU計算
        metric_value = metric(y_pred, y).cpu().detach().numpy()
        metric_list.append(float(metric_value))

    return metric_list

In [None]:
img_path = sorted(glob("/content/drive/MyDrive/止まれセグメンテーション/dataset/image/*.jpg"))
img = [cv2.imread(i)[..., [2,1,0]] for i in img_path] # BGR→RGBで読み込み

mask_path = sorted(glob("/content/drive/MyDrive/止まれセグメンテーション/dataset/mask/*.png"))
mask = [cv2.imread(i, 0) for i in mask_path]

num_tomare = len(img_path) // 3 # 「止まれ」の組数
idxes = list(range(num_tomare)) # 分割に使用するため事前にインデックスのリストを用意


In [None]:
# fold数は6
n_splits = 6

# 「止まれ」の組を6等分する
fold_idxes = []
for i in range(n_splits):
    fold = [idxes[j] for j in range(i, num_tomare, n_splits)]
    fold_idxes.append(fold)
fold_idxes

[[0, 6, 12, 18, 24],
 [1, 7, 13, 19, 25],
 [2, 8, 14, 20, 26],
 [3, 9, 15, 21, 27],
 [4, 10, 16, 22, 28],
 [5, 11, 17, 23, 29]]

In [None]:
# 全ての分割が評価に使用されるように順繰りにしたインデックスを用意
folds = [[i%n_splits for i in range(j, j+n_splits)] for j in range(n_splits)]
folds

[[0, 1, 2, 3, 4, 5],
 [1, 2, 3, 4, 5, 0],
 [2, 3, 4, 5, 0, 1],
 [3, 4, 5, 0, 1, 2],
 [4, 5, 0, 1, 2, 3],
 [5, 0, 1, 2, 3, 4]]

fold 0 は folds[0]を使用し、fold_idxes[[0,1,2,3]]の「止まれ」を学習に、fold_idxes[4]をvalidationに、fold_idxes[5]をテストに使用する。

同様に、
fold 1 は folds[1]を使用し、fold_idxes[[1,2,3,4]]の「止まれ」を学習に、fold_idxes[5]をvalidationに、fold_idxes[0]をテストに使用する。

これをfold 0 〜 5の6回行う。

In [None]:
# バッチサイズは5
batch_size = 5
# 学習epoch数 Data augmentationを追加したので長めに学習
n_epoch = 40

# 使用デバイスの設定（今回はcudaが設定される）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

test_iou_list = [] # 各foldで行われたテストの評価結果を保持

# 順番にfoldの学習・評価を行う
for enum, fold in enumerate(folds):
    print(f"fold : {enum+1}")

    train_img, train_mask = [], []

    # foldのリストの最初の4つは訓練データ
    for f in fold[:4]:
        img_tmp = [img[i*3 + j] for i in fold_idxes[f] for j in range(3)]
        mask_tmp = [mask[i*3 + j] for i in fold_idxes[f] for j in range(3)]

        train_img += img_tmp
        train_mask += mask_tmp

    # 5番目はvalid
    valid_img = [img[i*3 + j] for i in fold_idxes[fold[-2]] for j in range(3)]
    valid_mask = [mask[i*3 + j] for i in fold_idxes[fold[-2]] for j in range(3)]
    # 6番目はtest
    test_img = [img[i*3 + j] for i in fold_idxes[fold[-1]] for j in range(3)]
    test_mask = [mask[i*3 + j] for i in fold_idxes[fold[-1]] for j in range(3)]

    # data loaderを作成
    train_loader = make_dataloader(train_img, train_mask, train_transform,
                                   batch_size, True)
    valid_loader = make_dataloader(valid_img, valid_mask, val_transform,
                                   batch_size, False)
    test_loader = make_dataloader(test_img, test_mask, val_transform,
                                  1, False) # テストの評価はbatch_sizeを1に

    model, train_epoch, valid_epoch = make_model(device)

    # 学習ループの実行
    max_score = 0 # ベストのスコアを保持する用
    # モデルの保存名(適宜名前を変えてください)
    model_save_path = f"/content/drive/MyDrive/止まれセグメンテーション/best_model_{enum}_v2.pth"

    # n_epoch分学習ループを回す
    for e in range(0, n_epoch):
        print(f'Epoch: {e+1}')

        # 学習
        _ = train_epoch.run(train_loader)

        # 評価
        valid_logs = valid_epoch.run(valid_loader)

        # もしvalidのIoUスコアが今までの最大値よりも大きかったらモデルの保存
        if max_score < valid_logs['iou_score']:
            max_score = valid_logs['iou_score']
            torch.save(model.state_dict(), model_save_path)
            print('Model saved!')

    # 保存したモデルの重みをロード
    model.load_state_dict(torch.load(model_save_path, map_location='cpu'))
    model.eval() # 評価用モードに変更

    # テスト評価
    iou = smp_metrics.IoU(threshold=0.5)
    test_iou = test_loop(model, test_loader, iou, device)

    print(np.mean(test_iou))
    test_iou_list.append(test_iou)

In [None]:
# IoUの平均をみる
print(np.array(test_iou_list).mean())

0.9288006006316653
