# より少ないデータで試してみよう

### 問題設定
トレーニング用のデータが5枚しかない状況を想定してみましょう．
ただし，val, testはこれまでと同様の枚数とします．

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from PIL import Image
import glob

class ChestDataset(Dataset):
    def __init__(self, TrainValTest="train"):
        super().__init__()
        """
        どのデータを使うのかを記述する部分
        """
        if TrainValTest == "train":
            self.img_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/small_train/org/*"))
            self.label_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/small_train/label/*"))
        elif TrainValTest == "val":
            self.img_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/val/org/*"))
            self.label_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/val/label/*"))
        elif TrainValTest == "test":
            self.img_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/test/org/*"))
            self.label_path_list = sorted(glob.glob("/content/drive/MyDrive/segmentation/data/test/label/*"))    
    
    def __len__(self):
        """
        データがいくつあるのかを数える
        """
        return len(self.img_path_list)
    
    def __getitem__(self, index):
        """
        データをどのような形で取り出すのか記述する
        """
        image_path = self.img_path_list[index] # ファイル名
        label_path = self.label_path_list[index] # ファイル名
        
        img = Image.open(image_path) # ファイル名を与えて画像を取り出す
        img = np.array(img) # 画像をnumpy形式の行列へ変換
        img = np.expand_dims(img, 0) # 1チャンネルであることを明示する（256, 256）→ (1, 256, 256)
        img = torch.tensor(img) # 行列をpytorchで扱える形式（tensor型）に変換する
        img = img / 255 # 0~255までの値を0~1までの値に変換する
        
        label = Image.open(label_path) # ファイル名を与えてラベルを取り出す
        label = np.array(label) # ラベルをnumpy形式の行列へ変換
        label = np.expand_dims(label, 0)
        label = torch.tensor(label)
        label = label = label / 255 # 肺野領域は255，それ以外は0となっているので，255で割って0 or 1に変換する
        label = label.float() # 行列の値をfloat型に変換する（pytorchの都合）
        
        return img, label

In [None]:
chest_train = ChestDataset(TrainValTest="train")
chest_val = ChestDataset(TrainValTest="val")
chest_test = ChestDataset(TrainValTest="test")

train_loader = DataLoader(chest_train, batch_size=5, shuffle=True)
val_loader = DataLoader(chest_val, batch_size=5, shuffle=False)
test_loader = DataLoader(chest_test, batch_size=5, shuffle=False)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):

    def __init__(self, n_class=2, input_channel=1, output_channel=1):
        super(UNet, self).__init__()
        self.n_class = n_class
        
        self.input_channel = input_channel
        self.output_channel = output_channel
        
        self.enco1_1 = nn.Conv2d(self.input_channel, 64, kernel_size=3, stride=1, padding=1)
        self.enco1_2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        
        self.enco2_1 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.enco2_2 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)

        self.enco3_1 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.enco3_2 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)

        self.enco4_1 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
        self.enco4_2 = nn.Conv2d(512, 512, kernel_size=3, stride=1, padding=1)

        self.enco5_1 = nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1)
        self.enco5_2 = nn.Conv2d(1024, 512, kernel_size=3, stride=1, padding=1)

        self.deco6_1 = nn.Conv2d(1024, 512, kernel_size=3, stride=1, padding=1)
        self.deco6_2 = nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1)

        self.deco7_1 = nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1)
        self.deco7_2 = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1)

        self.deco8_1 = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1)
        self.deco8_2 = nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)

        self.deco9_1 = nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)
        self.deco9_2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)

        self.final_layer = nn.Conv2d(64, self.output_channel, kernel_size=1)

        self.bn1_1 = nn.BatchNorm2d(  64)
        self.bn1_2 = nn.BatchNorm2d(  64)

        self.bn2_1 = nn.BatchNorm2d(  128)
        self.bn2_2 = nn.BatchNorm2d(  128)

        self.bn3_1 = nn.BatchNorm2d(  256)
        self.bn3_2 = nn.BatchNorm2d(  256)

        self.bn4_1 = nn.BatchNorm2d(  512)
        self.bn4_2 = nn.BatchNorm2d(  512)

        self.bn5_1 = nn.BatchNorm2d(  1024)
        self.bn5_2 = nn.BatchNorm2d(  512)

        self.bn6_1 = nn.BatchNorm2d(  512)
        self.bn6_2 = nn.BatchNorm2d(  256)

        self.bn7_1 = nn.BatchNorm2d(  256)
        self.bn7_2 = nn.BatchNorm2d(  128)

        self.bn8_1 = nn.BatchNorm2d(  128)
        self.bn8_2 = nn.BatchNorm2d(  64)

        self.bn9_1 = nn.BatchNorm2d(  64)
        self.bn9_2 = nn.BatchNorm2d(  64)

    def forward(self, x): 
        
        h1_1 = F.relu(self.bn1_1(self.enco1_1(x)))
        h1_2 = F.relu(self.bn1_2(self.enco1_2(h1_1)))
        pool1, pool1_indice = F.max_pool2d(h1_2, 2, stride=2, return_indices=True) 

        h2_1 = F.relu(self.bn2_1(self.enco2_1(pool1)))
        h2_2 = F.relu(self.bn2_2(self.enco2_2(h2_1)))
        pool2, pool2_indice = F.max_pool2d(h2_2, 2, stride=2, return_indices=True)  

        h3_1 = F.relu(self.bn3_1(self.enco3_1(pool2)))
        h3_2 = F.relu(self.bn3_2(self.enco3_2(h3_1)))
        pool3, pool3_indice = F.max_pool2d(h3_2, 2, stride=2, return_indices=True)  

        h4_1 = F.relu(self.bn4_1(self.enco4_1(pool3)))
        h4_2 = F.relu(self.bn4_2(self.enco4_2(h4_1)))
        pool4, pool4_indice = F.max_pool2d(h4_2, 2, stride=2, return_indices=True) 

        h5_1 = F.relu(self.bn5_1(self.enco5_1(pool4)))
        h5_2 = F.relu(self.bn5_2(self.enco5_2(h5_1)))
        
        up5 = F.max_unpool2d(h5_2, pool4_indice, kernel_size=2, stride=2, output_size=(pool3.shape[2], pool3.shape[3]))
        h6_1 = F.relu(self.bn6_1(self.deco6_1(torch.cat((up5, h4_2), dim=1))))
        h6_2 = F.relu(self.bn6_2(self.deco6_2(h6_1)))

        up6 = F.max_unpool2d(h6_2, pool3_indice, kernel_size=2, stride=2, output_size=(pool2.shape[2], pool2.shape[3]))
        h7_1 = F.relu(self.bn7_1(self.deco7_1(torch.cat((up6, h3_2), dim=1))))
        h7_2 = F.relu(self.bn7_2(self.deco7_2(h7_1)))

        up7 = F.max_unpool2d(h7_2, pool2_indice, kernel_size=2, stride=2, output_size=(pool1.shape[2], pool1.shape[3]))
        h8_1 = F.relu(self.bn8_1(self.deco8_1(torch.cat((up7, h2_2), dim=1))))
        h8_2 = F.relu(self.bn8_2(self.deco8_2(h8_1)))

        up8 = F.max_unpool2d(h8_2, pool1_indice, kernel_size=2, stride=2, output_size=(x.shape[2], x.shape[3]))
        h9_1 = F.relu(self.bn9_1(self.deco9_1(torch.cat((up8, h1_2), dim=1))))
        h9_2 = F.relu(self.bn9_2(self.deco9_2(h9_1)))

        predict = self.final_layer(h9_2)
        

        return torch.sigmoid(predict)

In [None]:
from torch.nn import BCELoss
import torch.optim as optim

# モデルの定義
model = UNet()

# GPUを使う場合は，下記のコメントを外す
#model = model.to("cuda")

# optimizer（勾配降下法のアルゴリズム）の準備
optimizer = optim.RAdam(model.parameters())

# 誤差関数の定義
criterion = BCELoss()

# 学習ループ
epochs = 100 #ミニバッチのサンプリングが一巡 = 1 epoch

train_loss_list = [] # epoch毎のtrain_lossを保存しておくための入れ物
val_loss_list = [] # epoch毎のvalidation_lossを保存しておくための入れ物

loss_min = 100000 # validation_lossが小さくなった場合にのみモデルを保存しておくためのメモ

# ここからループ開始
for epoch in range(epochs):
    
    train_loss_add = 0 # 1エポック分の誤差を累積しておくための変数
    
    model.train() #学習モードであることを明示
   
    for i, data in enumerate(train_loader):
        
        x, t = data # ①データの読み込み
        
        #GPU環境で動かす際は，下記2行のコメントを外す
        #x = x.to("cuda")
        #t = t.to("cuda")
        
        predict = model(x) # ②順伝播計算
        
        loss = criterion(predict, t) # ③誤差の計算
        
        model.zero_grad()# 誤差逆伝播法のための準備
        loss.backward() # ④誤差逆伝播法による誤差の計算
        
        optimizer.step() # ⑤勾配を用いてパラメータを更新
        
        train_loss_add += loss.data # あとで平均を計算するために，誤差を累積しておく
        
    train_loss_mean = train_loss_add / int(len(chest_train)/train_loader.batch_size) # 1epochでの誤差の平均を計算
    print("epoch" + str(epoch+1))
    print("train_loss:" + str(train_loss_mean))
    train_loss_list.append(train_loss_mean.cpu())# 1epoch毎の平均を格納しておく
    
    # validation
    model.eval() # 評価モード（学習を行わない）であることを明示
    
    val_loss_add = 0
    for i, data in enumerate(val_loader):
            
        x, t = data
        
        #GPU環境で動かす際は，下記2行のコメントを外す
        #x = x.to("cuda")
        #t = t.to("cuda")
        
        predict = model(x) # 順伝播計算
        
        loss = criterion(predict, t) # 誤差の計算
        val_loss_add += loss.data
        
    val_loss_mean = val_loss_add / int(len(chest_val)/val_loader.batch_size)
    print("val_loss:" + str(val_loss_mean))
    val_loss_list.append(val_loss_mean.cpu())
    
    if val_loss_mean < loss_min: # 前に保存したモデルよりもvalidation lossが小さければ，モデルを保存する
        torch.save(model.state_dict(), "/content/drive/MyDrive/segmentation/models/small_best.model")
        print("saved best model!")
        loss_min = val_loss_mean # 今回保存したモデルのvalidation lossをメモしておく

In [None]:
import matplotlib.pyplot as plt
plt.plot(train_loss_list, label="train") # train時のlossをplot
plt.plot(val_loss_list, label="loss") # validation時のlossをplot
plt.xlabel("epoch") # X軸のラベルを設定
plt.ylabel("loss") # Y軸のラベルを設定
plt.legend() # 凡例を追加（plot時に指定したlabelが使われる）

In [None]:
from sklearn.metrics import confusion_matrix # TP, TN, FP, FNを求めるための機能をインポート

def thresholding(inference,  threshold=0.5):
    # U-Netが出力するのは各ピクセルに対する確率値（0~1）．
    #ある閾値(threshold)を超えていたらそのピクセル値を255に変更する
    
    inference = inference.data.cpu() # 最後の一枚の１チャンネル目

    mask1 = inference >= threshold
    inference[mask1] = 255

    mask0 = inference < threshold
    inference[mask0] = 0
    
    return inference

def calc_all(tp, pp):
    # TP, TN, FP, FN, Accuracy, Precision, Recall, DSC, IoUを計算する関数
    
    mask = pp != 0
    pp[mask] = 1
    tn, fp, fn, tp = confusion_matrix(tp.flatten(), pp.flatten()).ravel() # TP, TN, FP, FNを計算
    presicion = tp / (tp + fp)
    recall = tp / (tp + fn)
    dice = tp / (tp + ((1/2)*(fp+fn)))
    iou = tp / (tp + fp + fn)
    return presicion, recall, dice, iou


model = UNet(input_channel=1, output_channel=1) # テストに使うためのU-Netを改めて定義

#GPU環境で動かす際は，下記のコメントを外す
#model = model.to("cuda")

#高屋が予め用意した学習済みモデルを使う場合は，small_best.modelをsmall_takaya.modelに書き換えてください
model_path = "/content/drive/MyDrive/segmentation/models/small_takaya.model"

model.load_state_dict(torch.load(model_path)) # 学習時に保存しておいたモデルのパラメータをコピー

model.eval() # 評価モードであることを明示

# 各画像のPrecision, Recall, Dice, IoUを保存しておくためのリスト
precision_list = [] 
recall_list = []
dice_list = []
iou_list = []

for i, data in enumerate(test_loader):
    
    x, t = data
    
    #GPU環境で動かす際は，下記2行のコメントを外す
    #x = x.to("cuda")
    #t = t.to("cuda")
        
    predict = model(x) # 学習済みU-Netによる推論
    predict_imgs = thresholding(predict, threshold=0.5) #出力された確率値（0~1）を8bitに(0~255)に変換
    
    for j in range(test_loader.batch_size):
        img = np.array(predict_imgs[j][0]) # 1チャンネル目だけ取り出して，(256, 256)の形にする
        xx = np.array(x[j][0].cpu()) # 入力画像を表示するための準備（numpy行列への変換）
        tt = np.array(t[j][0].cpu()) # ラベルを表示したり，指標を計算するための準備（numpy行列への変換）
        
        precision, recall, dice, iou = calc_all(img/255, tt) # 評価指標の計算
        
        #計算された指標を各リストへ格納
        precision_list.append(precision)
        recall_list.append(recall)
        dice_list.append(dice)
        iou_list.append(iou)
        
        # 質的評価のための画像出力（入力画像，ラベル，出力画像を並べて表示したい）
        plt.subplot(1, 3, 1)#1桁目 -- グラフの行数、2桁目 -- グラフの列数、3桁目 -- グラフの番号、subplot(2,3,1)の記載でも良い。
        plt.axis("off") # 軸のメモリを表示しない
        plt.title("input")
        plt.imshow(xx, cmap = "gray")
        
        plt.subplot(1, 3, 2)
        plt.axis("off") # 軸のメモリを表示しない
        plt.title("target")
        plt.imshow(tt, cmap = "gray")
        
        plt.subplot(1, 3, 3)
        plt.axis("off") # 軸のメモリを表示しない
        plt.title("output\n(iou=" + str(round(iou, 4)) + ")") #小数第4位までのIoUを図の上に表示
        plt.imshow(img, cmap = "gray")
        plt.savefig("/content/drive/MyDrive/segmentation/results/small_results/" + str(i) + "_" + str(j) + ".png")

precision_list = np.array(precision_list)
recall_list = np.array(recall_list)
dice_list = np.array(dice_list)
iou_list = np.array(iou_list)

precision_mean = np.array(precision_list).mean()
recall_mean = np.array(recall_list).mean()
dice_mean = np.array(dice_list).mean()
iou_mean = np.array(iou_list).mean()

# 各種指標を表示
print("Precision: " + str(precision_mean))
print("Recall: " + str(recall_mean))
print("DSC: " + str(dice_mean))
print("IoU: " + str(iou_mean))