In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install segmentation-models-pytorch

In [None]:
from glob import glob

import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import numpy as np
import segmentation_models_pytorch as smp
import segmentation_models_pytorch.utils.metrics as smp_metrics
import segmentation_models_pytorch.utils.train as smp_train
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


In [None]:
#今回の学習で必要になる処理を入れたDataset
class MyDataset(Dataset):
    def __init__(self, imgs, masks, transform):
        """
        imgs : 画像が入ったlist
        masks : 正解マスクが入ったlist
        transform : 画像やマスクに前処理を行う関数
        """
        self.imgs = imgs
        self.masks = masks
        self.transform = transform

    def __len__(self):
        return len(self.imgs)

    #処理を行う部分
    def __getitem__(self, idx):
        img = self.imgs[idx]
        mask = (self.masks[idx] > 0).astype(float)

        #画像とマスクに前処理を実施
        sample = self.transform(image=img, mask=mask)
        img, mask = sample['image'], sample['mask']

        return img, mask.unsqueeze(0) #maskは3チャンネルである必要あり（class, H, W）


In [None]:
img_path = sorted(glob("/content/drive/MyDrive/止まれセグメンテーション/dataset/image/*.jpg"))
img = [cv2.imread(i)[..., [2,1,0]] for i in img_path] #BGR→RGBで読み込み

mask_path = sorted(glob("/content/drive/MyDrive/止まれセグメンテーション/dataset/mask/*.png"))
mask = [cv2.imread(i, 0) for i in mask_path]

#今回は一旦後ろから5個の「止まれ」合計15枚をvalidationにする
train_img = img[:-15]
train_mask = mask[:-15]
valid_img = img[-15:]
valid_mask = mask[-15:]


In [None]:
#前処理の定義
#Composeを使用すると複数の処理を一気に行うことができる

#訓練用
train_transform = A.Compose([
    A.Resize(512,512),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(), #numpy arrayをpytorchで使用するTensorに変換
])

#推論用
val_transform = A.Compose([
    A.Resize(512,512),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

In [None]:
#バッチサイズは5
batch_size = 5

#Datasetを作成し、それをDataLoaderに渡す
train_dataset = MyDataset(train_img, train_mask, train_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size,
                          drop_last=False, shuffle=True, num_workers=2)

valid_dataset = MyDataset(valid_img, valid_mask, val_transform)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size,
                          drop_last=False, shuffle=False, num_workers=2)

In [None]:
#######################モデル定義###########################

model = smp.Unet(
    encoder_name="tu-efficientnet_b0", #timmのモデルを使う際は先頭にtu-をつける
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,
    encoder_depth=5,
)

# 損失関数
loss = nn.BCEWithLogitsLoss()
loss.__name__ = "bce_loss"

# 評価関数（今回はIoUを使用）
metrics = [
    smp_metrics.IoU(threshold=0.5, activation="sigmoid"),
]

# 最適化関数（今回はAdamを使用）
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

#使用デバイスの設定（今回はcudaが設定される）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# smpに用意されているシンプルなループ用クラス（train用）
train_epoch = smp_train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=device,
    verbose=True,
)

# smpに用意されているシンプルなループ用クラス（valid用）
valid_epoch = smp_train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=device,
    verbose=True,
)

In [None]:
# 学習ループの実行

n_epoch = 20 #学習epoch数
max_score = 0 #ベストのスコアを保持する用
#モデルの保存名
model_save_path = "/content/drive/MyDrive/止まれセグメンテーション/best_model.pth"

#n_epoch分学習ループを回す
for e in range(0, n_epoch):
    print(f'Epoch: {e+1}')

    #学習
    _ = train_epoch.run(train_loader)

    #評価
    valid_logs = valid_epoch.run(valid_loader)

    #もしvalidのIoUスコアが今までの最大値よりも大きかったらモデルの保存
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model.state_dict(), model_save_path)
        print('Model saved!')

In [None]:
#保存したモデルの重みをロード
model.load_state_dict(torch.load(model_save_path, map_location='cpu'))
model.eval() #評価用モードに変更

idx = 0 #見たい画像のインデックス
valid_test, _ = valid_dataset[idx]
valid_test = valid_test.unsqueeze(0).to(device) #モデルに入力するための処理

#推論
with torch.no_grad():
    pred = model(valid_test)

#推論結果をsigmoid関数に通して確率値に変換 → 確率0.5以上の部分の領域を選択
pred = (nn.Sigmoid()(pred[0, 0]) > 0.5).detach().cpu().numpy()
pred = (pred * 255).astype(np.uint8)

img_shape = valid_img[idx].shape
pred = cv2.resize(pred, (img_shape[1], img_shape[0])) #元の画像サイズにリサイズ

In [None]:
plt.subplot(121)
plt.axis("off")
plt.imshow(valid_img[idx])

plt.subplot(122)
plt.axis("off")
plt.imshow(pred)
#plt.savefig("predict.png", bbox_inches="tight") #保存用