### ライブラリのインポート

In [1]:
#使わないものも含む
import os, glob
from google.colab import drive
import random
from typing import Tuple, List, Dict
import cv2
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.nn import Linear,ReLU,CrossEntropyLoss,Sequential,Conv2d,MaxPool2d,Module,Softmax,BatchNorm2d,Dropout
from torch.optim import Adam,SGD
from torchvision import transforms,utils
from torch.utils.data import Dataset, DataLoader,random_split

import pandas as pd
import time
import copy
from collections import defaultdict
import shutil
from skimage import io,transform
from PIL  import Image
import matplotlib.pyplot as plt
import albumentations as A
from tqdm import tqdm as tqdm
from albumentations import ShiftScaleRotate,Normalize,Resize,Compose,GaussNoise
from albumentations.pytorch import ToTensorV2 as ToTensor

import random

### DatasetとDataloaderの作成

In [2]:
#GPUを使う
cuda = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#Google ドライブのファイルに連携
drive.mount("/content/drive")

Mounted at /content/drive


In [3]:
#それぞれのパス
DATA_TRAIN_img_ROOT = "/content/drive/MyDrive/Ntask/task/dataset/Train/Images"
DATA_TRAIN_seg_ROOT = "/content/drive/MyDrive/Ntask/task//dataset/Train/Segmentations"

DATA_VALID_img_ROOT = "/content/drive/MyDrive/Ntask/task//dataset/Valid/Images"
DATA_VALID_seg_ROOT = "/content/drive/MyDrive/Ntask/task//dataset/Valid/Segmentations"

DATA_TEST_img_ROOT = "/content/drive/MyDrive/Ntask/task//dataset/Test/Images"
DATA_TEST_seg_ROOT = "/content/drive/MyDrive/Ntask/task//dataset/Test/Segmentations"

In [4]:
def path_append(path1,path2,list1):
  a=os.listdir(path1)
  a.sort()
  for x in a:
    for y in os.listdir(os.path.join(path1,x)):
      if len(os.listdir(os.path.join(path1,x))) == len(os.listdir(os.path.join(path2,x))):
         list1.append(os.path.join(path1,x,y))
         list1.sort()

In [5]:
train_img_path = []
train_seg_path = []

valid_img_path=[]
valid_seg_path=[]

test_img_path=[]
test_seg_path=[]

In [6]:
#訓練、バリデーション、テスト、それぞれのパスを取得
path_append(DATA_TRAIN_img_ROOT,DATA_TRAIN_seg_ROOT,train_img_path)
path_append(DATA_TRAIN_seg_ROOT,DATA_TRAIN_img_ROOT,train_seg_path)

path_append(DATA_VALID_img_ROOT,DATA_VALID_seg_ROOT,valid_img_path)
path_append(DATA_VALID_seg_ROOT,DATA_VALID_img_ROOT,valid_seg_path)

path_append(DATA_TEST_img_ROOT,DATA_TEST_seg_ROOT,test_img_path)
path_append(DATA_TEST_seg_ROOT,DATA_TEST_img_ROOT,test_seg_path)

In [7]:
#画像データ拡張の関数 *改善の余地あり
def get_train_transform():
   return A.Compose(
       [
        #正規化(albumentations.augmentations.transforms.Normalizeのデフォルトの値を適用)
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensor()
        ])

In [8]:
class MyDataset(Dataset):

    def __init__(self,im_root,mask_root,transform=None):
      self.image_path = im_root
      self.mask_path = mask_root
      self.transforms = get_train_transform()

    def __len__(self):
        return len(self.image_path)

    def __getitem__(self, idx):
        #パスを取得
        image_path = self.image_path
        mask_path = self.mask_path

        # 画像の読込
        img = io.imread(image_path[idx])[:,:,:3].astype("float32")
        mask = io.imread(mask_path[idx]).astype("float32")
        #縮小
        img = transform.resize(img,(256,256))
        mask = transform.resize(mask,(256,256))

        #次元を合わせる
        mask = np.expand_dims(mask,axis=-1)
        mask = np.maximum(mask, np.zeros((256, 256, 1), dtype=np.bool))
        mask=mask.transpose(2,0,1)

        augmented = self.transforms(image=img, mask=mask)
        img = augmented['image']
        mask = augmented['mask']
        return (img,mask)

train_dataset = MyDataset(train_img_path,train_seg_path,transform=get_train_transform())
valid_dataset=MyDataset(valid_img_path,valid_seg_path,transform=get_train_transform())
test_dataset=MyDataset(test_img_path,test_seg_path,transform=get_train_transform())


In [9]:
train_dataloader = DataLoader(dataset=train_dataset, batch_size=30, shuffle=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=30, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=True)

### セグメンテーションモデルの実装

In [10]:

class UNet(nn.Module):
    def __init__(self, input_channels, output_channels):
        super().__init__()

        self.conv1 = conv_bn_relu(input_channels,64)
        self.conv2 = conv_bn_relu(64, 128)
        self.conv3 = conv_bn_relu(128, 256)
        self.conv4 = conv_bn_relu(256, 512)
        self.conv5 = conv_bn_relu(512, 1024)
        self.down_pooling = nn.MaxPool2d(2)

        self.up_pool6 = up_pooling(1024, 512)
        self.conv6 = conv_bn_relu(1024, 512)
        self.up_pool7 = up_pooling(512, 256)
        self.conv7 = conv_bn_relu(512, 256)
        self.up_pool8 = up_pooling(256, 128)
        self.conv8 = conv_bn_relu(256, 128)
        self.up_pool9 = up_pooling(128, 64)
        self.conv9 = conv_bn_relu(128, 64)
        self.conv10 = nn.Conv2d(64, output_channels, 1)


        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight.data, a=0, mode='fan_out')
                if m.bias is not None:
                    m.bias.data.zero_()

    def forward(self, x):
        # 正規化
        x = x/255.


        x1 = self.conv1(x)
        p1 = self.down_pooling(x1)
        x2 = self.conv2(p1)
        p2 = self.down_pooling(x2)
        x3 = self.conv3(p2)
        p3 = self.down_pooling(x3)
        x4 = self.conv4(p3)
        p4 = self.down_pooling(x4)
        x5 = self.conv5(p4)


        p6 = self.up_pool6(x5)
        x6 = torch.cat([p6, x4], dim=1)
        x6 = self.conv6(x6)

        p7 = self.up_pool7(x6)
        x7 = torch.cat([p7, x3], dim=1)
        x7 = self.conv7(x7)

        p8 = self.up_pool8(x7)
        x8 = torch.cat([p8, x2], dim=1)
        x8 = self.conv8(x8)

        p9 = self.up_pool9(x8)
        x9 = torch.cat([p9, x1], dim=1)
        x9 = self.conv9(x9)


        output = self.conv10(x9)
        output = torch.sigmoid(output)

        return output

#畳み込みとバッチ正規化と活性化関数Reluをまとめている
def conv_bn_relu(in_channels, out_channels, kernel_size=3, stride=1, padding=1):
    return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            Dropout(0.02)
    )

def down_pooling():
    return nn.MaxPool2d(2)

def up_pooling(in_channels, out_channels, kernel_size=2, stride=2):
    return nn.Sequential(
        #転置畳み込み
        nn.ConvTranspose2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )


- モデルの引用

In [11]:

random_state = 42
random.seed(random_state)
np.random.RandomState(random_state)
torch.manual_seed(random_state)
torch.cuda.manual_seed(random_state)



###損失関数の定義

In [12]:
class DiceBCELoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE


class DiceLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        #inputs = F.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)

        return 1 - dice

###IoUの定義

In [13]:
class IoU(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(IoU, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        inputs = inputs.view(-1)
        targets = targets.view(-1)
        intersection = (inputs * targets).sum()
        total = (inputs + targets).sum()
        union = total - intersection

        IoU = (intersection + smooth)/(union + smooth)

        return IoU

### モデルトレーニング

In [14]:
#Google colaboratoryで実行
#ランタイム→ランタイムのタイプを変更→NoneからGPUに変更

model = UNet(3,1).cuda()
num_epochs=20

optimizer = torch.optim.Adam(model.parameters(),lr = 1e-3)
criterion = DiceLoss()
accuracy_metric = IoU()
valid_loss_min = np.Inf


total_train_loss = []
total_train_score = []
total_valid_loss = []
total_valid_score = []

losses_value = 0

for epoch in range(num_epochs):
  #<---------------トレーニング---------------------->
    train_loss = []
    train_score = []
    valid_loss = []
    valid_score = []
    pbar = tqdm(train_dataloader, desc = 'description')

    for x_train, y_train in pbar:
      x_train = torch.autograd.Variable(x_train).cuda()
      y_train = torch.autograd.Variable(y_train).cuda()
      optimizer.zero_grad()
      output = model(x_train)

      # 損失計算
      loss = criterion(output, y_train)
      losses_value = loss.item()

      # 精度評価
      score = accuracy_metric(output,y_train)
      loss.backward()
      optimizer.step()
      train_loss.append(losses_value)
      train_score.append(score.item())
      pbar.set_description(f"Epoch: {epoch+1}, loss: {losses_value}, IoU: {score}")

    #<---------------評価---------------------->
    with torch.no_grad():
      for image,mask in valid_dataloader:
        image = torch.autograd.Variable(image).cuda()
        mask = torch.autograd.Variable(mask).cuda()
        output = model(image)
        ## 損失計算
        loss = criterion(output, mask)
        losses_value = loss.item()
        ## 精度評価
        score = accuracy_metric(output,mask)
        valid_loss.append(losses_value)
        valid_score.append(score.item())

    total_train_loss.append(np.mean(train_loss))
    total_train_score.append(np.mean(train_score))
    total_valid_loss.append(np.mean(valid_loss))
    total_valid_score.append(np.mean(valid_score))

    print(f"Train Loss: {total_train_loss[-1]}, Train IoU: {total_train_score[-1]}")
    print(f"Valid Loss: {total_valid_loss[-1]}, Valid IoU: {total_valid_score[-1]}")


    if total_valid_loss[-1] <= valid_loss_min:
      best_model=model

    print("")


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  mask = np.maximum(mask, np.zeros((256, 256, 1), dtype=np.bool))
description:   0%|          | 0/26 [01:03<?, ?it/s]


OutOfMemoryError: ignored

In [None]:
#学習率を下げて再度学習
optimizer = torch.optim.Adam(model.parameters(),lr = 1e-4)
for epoch in range(20,35):
  #<---------------トレーニング---------------------->
    train_loss = []
    train_score = []
    valid_loss = []
    valid_score = []
    pbar = tqdm(train_dataloader, desc = 'description')

    for x_train, y_train in pbar:
      x_train = torch.autograd.Variable(x_train).cuda()
      y_train = torch.autograd.Variable(y_train).cuda()
      optimizer.zero_grad()
      output = model(x_train)
      # 損失計算
      loss = criterion(output, y_train)
      losses_value = loss.item()
      # 精度評価
      score = accuracy_metric(output,y_train)
      loss.backward()
      optimizer.step()
      train_loss.append(losses_value)
      train_score.append(score.item())
      pbar.set_description(f"Epoch: {epoch+1}, loss: {losses_value}, IoU: {score}")

    #<---------------評価---------------------->
    with torch.no_grad():
      for image,mask in valid_dataloader:
        image = torch.autograd.Variable(image).cuda()
        mask = torch.autograd.Variable(mask).cuda()
        output = model(image)
        ## 損失計算
        loss = criterion(output, mask)
        losses_value = loss.item()
        ## 精度評価
        score = accuracy_metric(output,mask)
        valid_loss.append(losses_value)
        valid_score.append(score.item())

    total_train_loss.append(np.mean(train_loss))
    total_train_score.append(np.mean(train_score))
    total_valid_loss.append(np.mean(valid_loss))
    total_valid_score.append(np.mean(valid_score))

    print(f"Train Loss: {total_train_loss[-1]}, Train IoU: {total_train_score[-1]}")
    print(f"Valid Loss: {total_valid_loss[-1]}, Valid IoU: {total_valid_score[-1]}")



    print("")
