#importとseedに固定

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import models, transforms
import torchmetrics
from torchmetrics.functional import accuracy
from PIL import Image
import numpy as np
from glob import glob
import glob
import re
import cv2
from tqdm import tqdm as tqdm
import statistics
import os
import json
import datetime
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from torch.optim.lr_scheduler import StepLR

In [None]:
#seedの固定
def fix_seed(seed):
    # Numpy
    np.random.seed(seed)
    # Pytorch
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

SEED = 1
fix_seed(SEED)

#データセットの定義

In [1]:
#512to256の画像をリストに取得
Tr_image = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Tr_image/*/*.png', recursive=True)
       if re.findall('png', p)]
Tr_label = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Tr_label/*/*.png', recursive=True)
       if re.findall('png', p)]
Va_image = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Va_image/*/*.png', recursive=True)
       if re.findall('png', p)]
Va_label = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Va_label/*/*.png', recursive=True)
       if re.findall('png', p)]
Te_image = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Te_image/*/*.png', recursive=True)
       if re.findall('png', p)]
Te_label = [p for p in glob.glob('../../data/MLO_dataset/512to256DS/Te_label/*/*.png', recursive=True)
       if re.findall('png', p)]

#取得した画像のリストを辞書に格納
tvt_image = {'Tr':Tr_image,'Va':Va_image,'Te':Te_image}
tvt_label = {'Tr':Tr_label,'Va':Va_label,'Te':Te_label}

In [None]:
#前処理

# 256×256,16bit,png形式の原画像をtorch型,3チャンネル形式に変換
class Dataset_16bit(torch.utils.data.Dataset):

    def __init__(self, mode):
        self.images = sorted(tvt_image[mode])        
        self.labels = sorted(tvt_label[mode])
        
    def __getitem__(self, idx):
        image_path = self.images[idx]
        image = cv2.imread(image_path, cv2.IMREAD_ANYDEPTH | cv2.IMREAD_ANYCOLOR) #16bitで読み込む
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) #3チャンネルに変換
        image = image[16:240, 16:240] #中心の224×２２４に切り取る
        image = image/65535 #0~65535を０〜１に変換
        image = image.astype(np.float32) #float型に変換
        image = torch.from_numpy(image).clone() #torchに変換
        image = image.permute((2, 0, 1)) #順番を修正　(H,W,C) → (C,H,W)

        label_path = self.labels[idx]
        label = Image.open(label_path)
        label = np.array(label) #numpy型に変換
        label = label[16:240, 16:240] #中心の224×２２４に切り取る
        label = label/255
        label = torch.tensor(label, dtype=torch.float32) #float型に変換
        return image, label

    def __len__(self):
        return len(self.images)

In [None]:
# データセットの取得
train = Dataset_16bit('Tr')
val = Dataset_16bit('Va')
test = Dataset_16bit('Te')

# バッチサイズの定義
batch_size = 16

# Data Loader を定義
train_loader = torch.utils.data.DataLoader(train, batch_size, shuffle=True, drop_last=True)
val_loader = torch.utils.data.DataLoader(val, batch_size, shuffle=True, drop_last=True)
test_loader = torch.utils.data.DataLoader(test, batch_size)

#損失関数とIoUの定義

In [None]:
#PyTorch DiceLossを定義
class DiceLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        
        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)       
        
        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)
        
        intersection = (inputs * targets).sum() #「正解」かつ「出力結果」のピクセル数を算出
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth) #(「正解」かつ「出力結果」)/(「正解」と「出力結果」の平均の大きさ)   
        
        return 1 - dice

In [None]:
#IoUを定義
def iou_score(output, target):
    smooth = 1e-5

    if torch.is_tensor(output):
        output = torch.sigmoid(output).data.cpu().numpy()
    if torch.is_tensor(target):
        target = target.data.cpu().numpy()
        
    output = output > 0.5 #閾値の設定
    target = target > 0.5
    intersection = (output & target).sum()
    union = (output | target).sum()

    return (intersection + smooth) / (union + smooth)

#ResNet50をbackbornとしたUNetの定義

In [1]:
class ConvBlock(nn.Module):
    """
    入出力チャンネル数を引数に取り、Conv -> BN -> ReLU　を実行する。
    """

    def __init__(self, in_channels, out_channels, padding=1, kernel_size=3, stride=1, with_nonlinearity=True):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, padding=padding, kernel_size=kernel_size, stride=stride)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.with_nonlinearity = with_nonlinearity

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        if self.with_nonlinearity:
            x = self.relu(x)
        return x


class Bridge(nn.Module):
    """
    UNetのエンコーダーとデコーダーを繋ぐ部分
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.bridge = nn.Sequential(
            ConvBlock(in_channels, out_channels),
            ConvBlock(out_channels, out_channels)
        )

    def forward(self, x):
        return self.bridge(x)


class UpBlockForUNetWithResNet50(nn.Module):
    """
    Up block that encapsulates one up-sampling step which consists of Upsample -> ConvBlock -> ConvBlock
    """

    def __init__(self, in_channels, out_channels, up_conv_in_channels=None, up_conv_out_channels=None,
                 upsampling_method="conv_transpose"):
        super().__init__()

        if up_conv_in_channels == None:
            up_conv_in_channels = in_channels
        if up_conv_out_channels == None:
            up_conv_out_channels = out_channels

        if upsampling_method == "conv_transpose":
            self.upsample = nn.ConvTranspose2d(up_conv_in_channels, up_conv_out_channels, kernel_size=2, stride=2)
        elif upsampling_method == "bilinear":
            self.upsample = nn.Sequential(
                nn.Upsample(mode='bilinear', scale_factor=2),
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)
            )
        self.conv_block_1 = ConvBlock(in_channels, out_channels)
        self.conv_block_2 = ConvBlock(out_channels, out_channels)

    def forward(self, up_x, down_x):
        """
        :param up_x: this is the output from the previous up block
        :param down_x: this is the output from the down block
        :return: upsampled feature map
        """
        x = self.upsample(up_x)
        x = torch.cat([x, down_x], 1)
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        return x


class UNetWithResnet50Encoder(nn.Module):
    DEPTH = 6

    def __init__(self, n_classes=1):
        super().__init__()
        resnet = torchvision.models.resnet.resnet50(pretrained=False) #エンコーダー部分は既存のResNet50を読み込む
        down_blocks = []
        up_blocks = []
        self.input_block = nn.Sequential(*list(resnet.children()))[:3]
        self.input_pool = list(resnet.children())[3]
        for bottleneck in list(resnet.children()):
            if isinstance(bottleneck, nn.Sequential):
                down_blocks.append(bottleneck)
        self.down_blocks = nn.ModuleList(down_blocks)
        self.bridge = Bridge(2048, 2048)
        up_blocks.append(UpBlockForUNetWithResNet50(2048, 1024))
        up_blocks.append(UpBlockForUNetWithResNet50(1024, 512))
        up_blocks.append(UpBlockForUNetWithResNet50(512, 256))
        up_blocks.append(UpBlockForUNetWithResNet50(in_channels=128 + 64, out_channels=128,
                                                    up_conv_in_channels=256, up_conv_out_channels=128))
        up_blocks.append(UpBlockForUNetWithResNet50(in_channels=64 + 3, out_channels=64,
                                                    up_conv_in_channels=128, up_conv_out_channels=64))

        self.up_blocks = nn.ModuleList(up_blocks)

        self.out = nn.Conv2d(64, n_classes, kernel_size=1, stride=1)

    def forward(self, x, with_output_feature_map=False):
        pre_pools = dict()
        pre_pools[f"layer_0"] = x
        x = self.input_block(x)
        pre_pools[f"layer_1"] = x
        x = self.input_pool(x)

        for i, block in enumerate(self.down_blocks, 2):
            x = block(x)
            if i == (UNetWithResnet50Encoder.DEPTH - 1):
                continue
            pre_pools[f"layer_{i}"] = x

        x = self.bridge(x)

        for i, block in enumerate(self.up_blocks, 1):
            key = f"layer_{UNetWithResnet50Encoder.DEPTH - 1 - i}"
            x = block(x, pre_pools[key])
        output_feature_map = x
        x = self.out(x)
        del pre_pools
        if with_output_feature_map:
            return x, output_feature_map
        else:
            return x
        
if __name__ == '__main__':
    image = torch.rand((1, 3, 224, 224))
    model = UNetWithResnet50Encoder()
    model(image)

#https://github.com/kevinlu1211/pytorch-unet-resnet-50-encoder/blob/master/u_net_resnet_50_encoder.py
#↑参考にしたサイト

#学習済みパラメータを読み込み、置き換える。

In [2]:
#deviceの確認
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

NameError: ignored

In [None]:
#　置き換える関数を定義
def Convert(model, pre_param, seg_param):
    pre_key = list(pre_param.keys())
    seg_key = list(seg_param.keys())
    for n in range(318): #ResNet50の入力から318個目までのパラメータを使う。
        seg_param[seg_key[n]] = pre_param[pre_key[n]]
        model.load_state_dict(seg_param)
        #print(seg_key[n] + 'を置き換えました。')

In [None]:
#　パラメータの置き換え
SEED = 1
fix_seed(SEED)

net = UNetWithResnet50Encoder().to(device) #モデルの読み込み(初期値)
Seg_param = net.state_dict() #今から置き換えられる重み
pre_param = torch.load('../../parameter/Fractal/RadImageNet_ResNet50_torch_2.pth') #新しく置き換える重み
Convert(net, pre_param, Seg_param) #「net」の重みが置き換えられる

In [None]:
#　置き換わっているかの確認 (一緒だったら置き換え成功)
new_param = net.state_dict()
print(new_param['input_block.0.weight'][0])
print(pre_param['conv1.weight'][0])

#学習

In [None]:
SEED = 1
fix_seed(SEED)

#パラメータを置き換えたモデルを読み込む
model = net

#ハイパーパラメータの設定
max_epochs = 30 # 学習繰り返し回数
lr = 0.001 #learning_rate

param_savedir = ""
param_savedir_last = ""
figure_savedir = ""
LossAcc_savedir = ""

diceloss = DiceLoss() #損失関数はDiceloss
iou = iou_score #評価指標はIoU
optimizer = torch.optim.Adam(model.parameters(), lr=lr) #optimizerはAdam
scheduler = StepLR(optimizer, step_size=10, gamma=0.1) #10エポックごとに学習率を1/10にさせる

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
time_start = datetime.datetime.now()
print('train', time_start)

#確認用のtrainとvalのloss,IoUを入れるリスト
total_train_loss = []
total_train_iou = []
total_valid_loss = []
total_valid_iou = []
epoch = []

#初期値。以降エポックごとにValのiouを算出し、前のエポックと比較してiourateが大きくなればモデルを保存する。
iourate = 0

for iepoch in range(max_epochs):
    #学習率変更の設定
    #scheduler.step() #学習率を変更する時に使う
 
   #訓練の定義
    train_loss_list = [] #繰り返しごとのtrain_lossを入れるリストを作成 
    train_IoU_list = [] #繰り返しごとのtrain_IoUを入れるリストを作成
    model.train() #学習モード
    pbar = tqdm(train_loader, desc = 'description')
    for x_train, t_train in pbar:
        # toGPU (CPUの場合はtoCPU)
        x = x_train.to(device)
        t = t_train.to(device)
        optimizer.zero_grad()
        #　推定
        y = model.forward(x) # y.shape = (batch_size, 1, 224, 224)
        y = y.view(-1, 224, 224) #torch.Size([8, 224, 224]) torch.float32
        #　損失計算
        train_loss = diceloss(y,t)
        train_loss.backward() # backward (勾配計算)
        #精度評価
        train_iou = iou(y,t)
        train_loss_list.append(train_loss.item()) #lossを取得したリストに追加する。
        train_IoU_list.append(train_iou.item()) #iouを取得したリストに追加する。
        optimizer.step() # パラメータの微小移動
        pbar.set_description(f"Epoch: {iepoch+1}")

    #検証の定義
    val_loss_list = [] #繰り返しごとのval_lossを入れるリストを作成
    val_IoU_list = [] #繰り返しごとのval_IoUを入れるリストを作成
    with torch.no_grad():
        model.eval() #検証モード
        for val_x,val_t in val_loader:
            val_x,val_t = val_x.to(device),val_t.to(device)

            #順伝搬の計算
            val_y = model(val_x)
            val_y = val_y.view(-1, 224, 224) #torch.Size([8, 224, 224]) torch.float32
            #損失計算
            val_loss = diceloss(val_y,val_t).item()
            #精度評価
            val_iou = iou(val_y,val_t).item()
            val_loss_list.append(val_loss) #lossを取得したリストに追加する。
            val_IoU_list.append(val_iou) #iouを取得したリストに追加する。   
    
    #後でグラフにするためにlossとepoch数をリストに保存する。
    train_loss_mean = statistics.mean(train_loss_list) #train_lossのepochごとの平均を求める。
    total_train_loss.append(train_loss_mean)
    
    train_iou_mean = statistics.mean(train_IoU_list) #train_iouのepochごとの平均を求める。
    total_train_iou.append(train_iou_mean) 
    
    val_loss_mean = statistics.mean(val_loss_list) #val_lossのepochごとの平均を求める。
    total_valid_loss.append(val_loss_mean)
    
    val_iou_mean = statistics.mean(val_IoU_list) #val_iouのepochごとの平均を求める。
    total_valid_iou.append(val_iou_mean)
    
    epoch.append(iepoch+1)
    
    #途中結果の表示とモデルの保存
    if iourate <= val_iou_mean: #このエポックのval_iouが、これまでのval_iouの最大値より小さくなれば「IoU向上」とプリントし、そのモデルを保存
            iourate = val_iou_mean #val_lossの最小値を更新しておく。
            coment = 'IoU向上!'
            torch.save(model.state_dict(), param_savedir) #同じ名前で保存し、更新のたびに上書きするようにする。 
            
    else: #val_iouが小さくなれば、更新せずに次のエポックへ
            coment = 'IoU低下,,,' #「IoU低下」とプリント
    
    print(f"Train Loss: {total_train_loss[-1]}, Train IOU: {total_train_iou[-1]}")
    print(f"Valid Loss: {total_valid_loss[-1]}, Valid IOU: {total_valid_iou[-1]}, Validの{coment}")
    
    #エポックごとに~_listを空にする。
    train_loss_list.clear() 
    train_IoU_list.clear()
    val_loss_list.clear()
    val_IoU_list.clear()

torch.save(model.state_dict(), param_savedir_last)
time_fin = datetime.datetime.now() 
print(time_fin)
print('---------------------------------------------')

#Loss,IoUの描画

In [None]:
# グラフの表示(trainとval)
def writing_plot(history_tr, history_va, epoch):
    # プロット領域(Figure, Axes)の初期化
    fig = plt.figure(figsize=(20, 8))
    ax1 = fig.add_subplot(121)
    ax2 = fig.add_subplot(122)
    
    ax1.plot(range(1,epoch+1), history_tr['loss'], label='loss(train)')
    ax1.plot(range(1,epoch+1), history_va['loss'], label='loss(val)')
    ax1.set_xlim(1,epoch) #ax1.set_ylim(0,100)
    ax1.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax1.set_xlabel("epoch")
    ax1.set_ylabel("loss")
    ax1.set_title("model loss")
    
    ax2.plot(range(1,epoch+1), history_tr['IoU'], label='IoU(train)')
    ax2.plot(range(1,epoch+1), history_va['IoU'], label='IoU(val)')
    ax2.set_xlim(1,epoch)
    ax2.set_ylim(0,1)
    ax2.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax2.set_xlabel("epoch")
    ax2.set_ylabel("IoU")
    ax2.set_title("model IoU")
    
    ax1.legend()
    ax2.legend()
    plt.show()
    
    fig.savefig(figure_savedir, dpi=300)

In [None]:
#epochごとのlossとiouを描画
history_tr = {'loss':total_train_loss, 'IoU':total_train_iou}
history_va = {'loss':total_valid_loss, 'IoU':total_valid_iou}
writing_plot(history_tr, history_va, max_epochs)

In [None]:
#epochごとのlossとiouの値の保存
history = {'Tr':history_tr, 'Va':history_va}
with open(LossAcc_savedir, 'w') as outfile:
        json.dump(history, outfile, indent=4)