<a href="https://colab.research.google.com/github/EmilijaSvirsk/DeepLearning-ImageSegmentation/blob/main/3lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from datetime import datetime
import math
import os
import random
import numpy as np
from PIL import Image
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from albumentations.pytorch import ToTensorV2
import albumentations as A
from torch.optim import Adam
from tqdm import tqdm

# Use the first CUDA GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'Device: {device}')


Device: cpu


In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# NOTE(review): the dataset paths used later are relative ('drive/MyDrive/...'),
# so they presumably rely on the working directory being /content - confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Dataset

In [None]:
class PhotoDataset(torch.utils.data.Dataset):
    """Image/mask pairs read from ``<root_dir>/CameraPng`` and ``<root_dir>/CameraMask``.

    Every ``.png`` file found in ``CameraPng`` is one sample; its mask is the
    file with the same name inside ``CameraMask``.

    Args:
        root_dir: Directory that contains the ``CameraPng`` and ``CameraMask``
            folders.
        transforms: Callable invoked as ``transforms(image=..., mask=...)`` that
            returns a mapping with ``'image'`` and ``'mask'`` entries, or
            ``None`` to get the decoded NumPy arrays back unchanged.
    """

    def __init__(self, root_dir, transforms):
        self.transforms = transforms
        self.img_dir = os.path.join(root_dir, 'CameraPng')
        self.mask_dir = os.path.join(root_dir, 'CameraMask')

        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping identical between runs and machines.
        self.img_list = sorted(
            fn for fn in os.listdir(self.img_dir)
            if os.path.splitext(fn)[1] == '.png'
        )

    def __len__(self):
        """Return the number of ``.png`` images found in ``CameraPng``."""
        return len(self.img_list)

    def __getitem__(self, idx):
        """Return ``(img, mask)`` for sample ``idx``.

        Without transforms both values are NumPy arrays. With transforms the
        image is whatever the transform returns and the mask is additionally
        reduced with a maximum over its dimension 2.
        """
        file_name = self.img_list[idx]

        # Context managers close the underlying files as soon as the pixel
        # data has been copied into NumPy arrays.
        with Image.open(os.path.join(self.img_dir, file_name)) as img_file:
            img = np.array(img_file)
        with Image.open(os.path.join(self.mask_dir, file_name)) as mask_file:
            mask = np.array(mask_file)

        if self.transforms is not None:
            aug = self.transforms(image=img, mask=mask)
            img = aug['image']
            mask = aug['mask']
            # Collapse dimension 2 of the transformed mask to a single label map
            # (assumes the mask tensor is H x W x C at this point - TODO confirm).
            mask = torch.max(mask, dim=2)[0]
        return img, mask

## Mokymo ir testavimo funkcijos

In [None]:
def seconds_to_time(seconds):
    """Format a duration given in seconds as ``'<h>h<m>m<s>s'``.

    The value is truncated to whole seconds first. The hour part is omitted
    when it would be below 1, and the minute part as well when the whole
    duration is shorter than one minute.
    """
    minutes, secs = divmod(int(seconds), 60)
    if minutes < 1:
        return f'{secs}s'

    hours, minutes = divmod(minutes, 60)
    if hours < 1:
        return f'{minutes}m{secs}s'
    return f'{hours}h{minutes}m{secs}s'

In [None]:
def train_epoch(optimizer, loss_func, scaler, model, loader):
  """Run one optimisation pass over ``loader`` and return the mean batch loss.

  Args:
    optimizer: Optimizer that updates ``model``'s parameters.
    loss_func: Callable ``loss_func(pred, mask)`` returning a scalar tensor.
    scaler: Gradient scaler exposing ``scale``, ``step`` and ``update``
      (a ``torch.cuda.amp.GradScaler`` in this notebook).
    model: Network to train; it is switched to training mode.
    loader: Iterable of batches whose item 0 is the image tensor and item 1
      the target mask.

  Returns:
    The mean of the per-batch losses as a float32 NumPy scalar.
  """
  model.train()
  batch_losses = []

  for data in loader:
    images = data[0].to(device)
    mask = data[1].to(device)
    mask = mask.type(torch.long)

    # CUDA autocast only applies to CUDA tensors; asking for it on CPU tensors
    # just triggers a warning, so it is enabled per batch device.
    with torch.cuda.amp.autocast(enabled=images.is_cuda):
      pred = model(images)
      loss = loss_func(pred, mask)

    # backward
    optimizer.zero_grad()
    # Why the scaler: under float16 autocast small gradients can underflow to
    # zero. The scaler multiplies the loss before backward(), unscales the
    # gradients inside step() (skipping the update if they contain inf/NaN)
    # and update() then adjusts the scale factor for the next iteration.
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

    # A plain list append is O(1); np.append would copy the whole history on
    # every batch.
    batch_losses.append(loss.item())

  return np.mean(np.asarray(batch_losses, dtype=np.float32))

In [None]:
def _mean_class_dice(preds, mask, num_classes):
  """Return the Dice coefficient averaged over the classes present in a batch.

  For a class ``c`` the score is
  ``2 * |pred == c and mask == c| / (|pred == c| + |mask == c|)``.
  Classes that occur in neither tensor are left out of the average, so the
  result always lies in [0, 1].
  """
  flat_pred = preds.reshape(-1).long()
  flat_mask = mask.reshape(-1).long()
  if flat_pred.numel() == 0:
    return 0.0

  # Size the histograms by the largest label actually seen so that the three
  # bincount results always have the same length.
  size = max(num_classes, int(flat_pred.max()) + 1, int(flat_mask.max()) + 1)
  pred_count = torch.bincount(flat_pred, minlength=size)
  mask_count = torch.bincount(flat_mask, minlength=size)
  overlap = torch.bincount(flat_pred[flat_pred == flat_mask], minlength=size)

  denom = pred_count + mask_count
  present = denom > 0
  dice = 2.0 * overlap[present] / denom[present]
  return dice.mean().item()


def evaluate(model, loader):
  """Measure pixel accuracy (and report a Dice score) of ``model`` on ``loader``.

  The model is switched to evaluation mode and run without gradients. Two
  lines are printed: correct/total pixels with the accuracy in percent, and
  the Dice score averaged over batches (per batch: mean over the classes
  present in that batch).

  Args:
    model: Network returning per-class scores along dimension 1.
    loader: Iterable of ``(img, mask)`` batches; ``mask`` holds class indices.

  Returns:
    Pixel accuracy ``num_correct / num_pixels`` as a 0-d NumPy array. Note
    that this is plain accuracy, not an IoU.

  Raises:
    ValueError: If ``loader`` yields no pixels at all.
  """
  num_correct = 0
  num_pixels = 0
  dice_total = 0.0
  batch_count = 0
  model.eval()

  with torch.no_grad():
      for img, mask in loader:
          img = img.to(device)
          mask = mask.to(device)

          # Softmax is monotonic, so the arg-max of the raw scores already is
          # the predicted class; no need to normalise first.
          scores = model(img)
          preds = torch.argmax(scores, dim=1)

          num_correct += (preds == mask).sum()
          num_pixels += torch.numel(preds)
          # The binary formula 2*sum(p*m)/sum(p+m) is meaningless for class
          # indices (it can exceed 1), hence the per-class computation.
          dice_total += _mean_class_dice(preds, mask, scores.shape[1])
          batch_count += 1

  if num_pixels == 0:
    raise ValueError('evaluate() needs a loader that yields at least one batch')

  print(f"Got {num_correct}/{num_pixels} with acc {num_correct/num_pixels*100:.2f}")
  print(f"Dice score: {dice_total/batch_count}\n")

  mean = num_correct/num_pixels
  mean = mean.cpu().detach().numpy()
  return mean

In [None]:
def train_and_eval(model, loader_train, loader_valid, epoch_count = 10, lr = 1e-3):
  """Train ``model`` with Adam and cross-entropy, evaluating after every epoch.

  After each epoch the model is evaluated on both loaders with ``evaluate``
  and a progress line is printed. ``evaluate`` returns pixel accuracy, so the
  values collected and logged here are accuracies (fractions in [0, 1]) even
  though callers in this notebook store them under "iou" names.

  Args:
    model: Network to train (already moved to the target device).
    loader_train: Loader used for training and for the training-set score.
    loader_valid: Loader used for the validation score.
    epoch_count: Number of passes over ``loader_train``.
    lr: Learning rate passed to Adam.

  Returns:
    ``(train_history, valid_history)``: two lists with one pixel-accuracy
    value per epoch.
  """
  loss_func = torch.nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr = lr)
  # Gradient scaling only matters for CUDA mixed precision; a disabled scaler
  # passes the loss through and simply calls optimizer.step().
  use_amp = any(p.is_cuda for p in model.parameters())
  scaler = torch.cuda.amp.GradScaler(enabled = use_amp)

  start_time = datetime.now()

  train_acc_history = []
  valid_acc_history = []
  for epoch in range(epoch_count):
    loss = train_epoch(optimizer, loss_func, scaler, model, loader_train)

    train_acc = evaluate(model, loader_train)
    train_acc_history.append(train_acc)
    valid_acc = evaluate(model, loader_valid)
    valid_acc_history.append(valid_acc)

    current_time = datetime.now()
    elapsed = seconds_to_time((current_time - start_time).total_seconds())
    print(f'Epoch: {epoch}, Time: {elapsed}, Training loss: {loss}')
    # Labelled as accuracy because that is what evaluate() measures.
    print(f'Training accuracy: {(train_acc * 100)}, Validation accuracy: {(valid_acc * 100)}')

  return train_acc_history, valid_acc_history

In [None]:
def plot_iou(train_iou, valid_iou, metric_name = 'IoU'):
  """Plot per-epoch training (blue) and validation (red) curves.

  The current figure is cleared first and the y-axis is fixed to [0, 1].

  Args:
    train_iou: Sequence of training scores, one per epoch.
    valid_iou: Sequence of validation scores, one per epoch.
    metric_name: Name shown in the legend. Defaults to ``'IoU'``; pass e.g.
      ``'accuracy'`` when the plotted values are a different metric.
  """
  plt.clf()
  plt.plot(train_iou, 'b', label = f'Training {metric_name}')
  plt.plot(valid_iou, 'r', label = f'Validation {metric_name}')
  plt.ylim(0.0, 1.0)
  plt.legend()
  plt.show()

## Modelis ir duomenys

In [None]:
class encoding_block(torch.nn.Module):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    Both convolutions use stride 1, padding 1 and no bias. The first maps
    ``in_channels`` to ``out_channels``, the second keeps ``out_channels``.
    """

    def __init__(self,in_channels, out_channels):
        super().__init__()

        def stage(stage_in):
            # One conv/BN/ReLU triple producing `out_channels` feature maps.
            return (
                torch.nn.Conv2d(stage_in, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
                torch.nn.BatchNorm2d(out_channels),
                torch.nn.ReLU(inplace=True),
            )

        self.conv = torch.nn.Sequential(*stage(in_channels), *stage(out_channels))

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.conv(x)

In [None]:
class unet_model(torch.nn.Module):
    """U-Net style encoder/decoder built from ``encoding_block`` stages.

    Four encoder stages (each followed by 2x2 max pooling), a bottleneck, and
    four decoder stages. Every decoder stage upsamples with a stride-2
    transposed convolution, concatenates the matching encoder output along the
    channel dimension and fuses the result with an ``encoding_block``.

    Because of the four 2x poolings, input height and width should be
    divisible by 16; otherwise the upsampled maps no longer match the skip
    tensors and the concatenation fails.

    Args:
        out_channels: Number of output channels of the final 1x1 convolution.
        features: Exactly four channel widths, one per encoder stage.
        in_channels: Number of channels of the input image.

    Raises:
        ValueError: If ``features`` does not contain exactly four entries.
    """

    def __init__(self,out_channels=23,features=(64, 128, 256, 512),in_channels=3):
        super(unet_model,self).__init__()
        if len(features) != 4:
            raise ValueError(f'features must list exactly 4 channel widths, got {len(features)}')
        f0, f1, f2, f3 = features

        self.pool = torch.nn.MaxPool2d(kernel_size=(2,2),stride=(2,2))
        # Encoder stages.
        self.conv1 = encoding_block(in_channels,f0)
        self.conv2 = encoding_block(f0,f1)
        self.conv3 = encoding_block(f1,f2)
        self.conv4 = encoding_block(f2,f3)
        # Decoder stages: each one receives the upsampled map concatenated with
        # the skip tensor of the same width, i.e. twice that width. Computing
        # it this way (rather than reusing the next-wider feature) also works
        # when the widths do not double from stage to stage.
        self.conv5 = encoding_block(f3*2,f3)
        self.conv6 = encoding_block(f2*2,f2)
        self.conv7 = encoding_block(f1*2,f1)
        self.conv8 = encoding_block(f0*2,f0)
        self.tconv1 = torch.nn.ConvTranspose2d(f3*2, f3, kernel_size=2, stride=2)
        self.tconv2 = torch.nn.ConvTranspose2d(f3, f2, kernel_size=2, stride=2)
        self.tconv3 = torch.nn.ConvTranspose2d(f2, f1, kernel_size=2, stride=2)
        self.tconv4 = torch.nn.ConvTranspose2d(f1, f0, kernel_size=2, stride=2)
        self.bottleneck = encoding_block(f3,f3*2)
        self.final_layer = torch.nn.Conv2d(f0,out_channels,kernel_size=1)

    def forward(self,x):
        """Return the per-pixel class scores for the image batch ``x``."""
        skip_connections = []
        for encoder in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = encoder(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)

        decoder = (
            (self.tconv1, self.conv5),
            (self.tconv2, self.conv6),
            (self.tconv3, self.conv7),
            (self.tconv4, self.conv8),
        )
        # The deepest skip tensor is consumed first.
        for (upsample, fuse), skip in zip(decoder, reversed(skip_connections)):
            x = upsample(x)
            x = torch.cat((skip, x), dim=1)
            x = fuse(x)

        return self.final_layer(x)

In [None]:
class FCNNet(torch.nn.Module):
  """Small fully-convolutional encoder/decoder with additive skip connections.

  Three 2x2 max-pooling steps shrink the input and three 2x upsampling steps
  restore it; each upsampled map is added to the encoder output of the same
  resolution. The last layer is a 1x1 convolution to one channel followed by a
  sigmoid, so every output value lies in [0, 1].

  Args:
    in_channels: Number of channels of the input image.
  """

  def __init__(self, in_channels):
    super().__init__()

    # Block 1: full resolution, 16 channels.
    self.conv1_1 = torch.nn.Conv2d(in_channels, 16, (3, 3), padding = 'same')
    self.relu1_1 = torch.nn.ReLU()
    self.conv1_2 = torch.nn.Conv2d(16, 16, (3, 3), padding = 'same')
    self.relu1_2 = torch.nn.ReLU()

    # Block 2: 1/2 resolution, 32 channels.
    self.pool2 = torch.nn.MaxPool2d((2, 2), (2, 2))
    self.conv2_1 = torch.nn.Conv2d(16, 32, (3, 3), padding = 'same')
    self.relu2_1 = torch.nn.ReLU()
    self.conv2_2 = torch.nn.Conv2d(32, 32, (3, 3), padding = 'same')
    self.relu2_2 = torch.nn.ReLU()

    # Block 3: 1/4 resolution, 64 channels.
    self.pool3 = torch.nn.MaxPool2d((2, 2), (2, 2))
    self.conv3_1 = torch.nn.Conv2d(32, 64, (3, 3), padding = 'same')
    self.relu3_1 = torch.nn.ReLU()
    self.conv3_2 = torch.nn.Conv2d(64, 64, (3, 3), padding = 'same')
    self.relu3_2 = torch.nn.ReLU()

    # Block 4: 1/8 resolution, 128 channels, reduced to 64 and upsampled.
    self.pool4 = torch.nn.MaxPool2d((2, 2), (2, 2))
    self.conv4_1 = torch.nn.Conv2d(64, 128, (3, 3), padding = 'same')
    self.relu4_1 = torch.nn.ReLU()
    self.conv4_2 = torch.nn.Conv2d(128, 128, (3, 3), padding = 'same')
    self.relu4_2 = torch.nn.ReLU()
    self.conv4_3 = torch.nn.Conv2d(128, 64, (3, 3), padding = 'same')
    self.relu4_3 = torch.nn.ReLU()
    self.upscale4 = torch.nn.Upsample(scale_factor = 2)

    # Block 5: 64 -> 32 channels, then upsampled.
    self.conv5_1 = torch.nn.Conv2d(64, 64, (3, 3), padding = 'same')
    self.relu5_1 = torch.nn.ReLU()
    self.conv5_2 = torch.nn.Conv2d(64, 32, (3, 3), padding = 'same')
    self.relu5_2 = torch.nn.ReLU()
    self.upscale5 = torch.nn.Upsample(scale_factor = 2)

    # Block 6: 32 -> 16 channels, then upsampled.
    self.conv6_1 = torch.nn.Conv2d(32, 32, (3, 3), padding = 'same')
    self.relu6_1 = torch.nn.ReLU()
    self.conv6_2 = torch.nn.Conv2d(32, 16, (3, 3), padding = 'same')
    self.relu6_2 = torch.nn.ReLU()
    self.upscale6 = torch.nn.Upsample(scale_factor = 2)

    # Block 7: full resolution, 16 channels.
    self.conv7_1 = torch.nn.Conv2d(16, 16, (3, 3), padding = 'same')
    self.relu7_1 = torch.nn.ReLU()
    self.conv7_2 = torch.nn.Conv2d(16, 16, (3, 3), padding = 'same')
    self.relu7_2 = torch.nn.ReLU()

    # Output head: one channel squashed into [0, 1].
    self.conv8 = torch.nn.Conv2d(16, 1, (1, 1))
    self.sigmoid8 = torch.nn.Sigmoid()

  @staticmethod
  def _chain(x, *layers):
    """Apply ``layers`` to ``x`` one after another and return the result."""
    for layer in layers:
      x = layer(x)
    return x

  def forward(self, x):
    """Return the single-channel sigmoid map for the image batch ``x``.

    The layers are chained directly instead of wrapping them in new
    ``torch.nn.Sequential`` containers on every call.
    """
    block1 = self._chain(
        x,
        self.conv1_1, self.relu1_1,
        self.conv1_2, self.relu1_2,
    )
    block2 = self._chain(
        block1,
        self.pool2,
        self.conv2_1, self.relu2_1,
        self.conv2_2, self.relu2_2,
    )
    block3 = self._chain(
        block2,
        self.pool3,
        self.conv3_1, self.relu3_1,
        self.conv3_2, self.relu3_2,
    )
    # Each decoder block is added to the encoder output of equal resolution.
    block4 = self._chain(
        block3,
        self.pool4,
        self.conv4_1, self.relu4_1,
        self.conv4_2, self.relu4_2,
        self.conv4_3, self.relu4_3,
        self.upscale4,
    ) + block3
    block5 = self._chain(
        block4,
        self.conv5_1, self.relu5_1,
        self.conv5_2, self.relu5_2,
        self.upscale5,
    ) + block2
    block6 = self._chain(
        block5,
        self.conv6_1, self.relu6_1,
        self.conv6_2, self.relu6_2,
        self.upscale6,
    ) + block1
    block7 = self._chain(
        block6,
        self.conv7_1, self.relu7_1,
        self.conv7_2, self.relu7_2,
    )
    block8 = self._chain(block7, self.conv8, self.sigmoid8)
    return block8

## Mokymas ir testavimas

In [None]:
# Preprocessing applied to every sample: resize to 160 x 240 (height x width),
# normalise each of the three channels with mean 0.5 / std 0.5, then convert
# to a torch tensor.
transf = A.Compose([
    A.Resize(height=160, width=240),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2(),
])

In [None]:
# Loader settings shared by both splits.
num_workers = 2
batch_size = 10

# dataA is used for training, dataB for validation; both get the same transform.
train_dataset = PhotoDataset('drive/MyDrive/GMM_images/dataA', transf)
valid_dataset = PhotoDataset('drive/MyDrive/GMM_images/dataB', transf)

# Training batches are reshuffled every epoch.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)
# Validation runs one image at a time, in a fixed order.
valid_loader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=num_workers,
)

print(f'Train: {len(train_dataset)}, Test: {len(valid_dataset)}')

Train: 100, Test: 100


In [None]:
# Build the U-Net with its default settings and move it to the selected device.
model = unet_model()
model = model.to(device)

# Count only the parameters that will actually be trained.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Parameter count: {trainable_params:,}')

# Train for 10 epochs, then plot the per-epoch training/validation scores.
train_iou, valid_iou = train_and_eval(model, train_loader, valid_loader, epoch_count = 10, lr = 1e-3)
plot_iou(train_iou, valid_iou)

Parameter count: 31,039,063




Got 1195634/3840000 with acc 31.14
Dice score: 5.988448619842529

Got 1192262/3840000 with acc 31.05
Dice score: 5.986215591430664

Epoch: 0, Time: 10m21s, Training loss: 2.2197508811950684
Training IoU: 31.136301159858704, Validation IoU: 31.04848861694336
Got 1627608/3840000 with acc 42.39
Dice score: 6.408050537109375

Got 1623931/3840000 with acc 42.29
Dice score: 6.409144401550293

Epoch: 1, Time: 20m13s, Training loss: 1.420250654220581
Training IoU: 42.3856258392334, Validation IoU: 42.28987097740173
Got 3023533/3840000 with acc 78.74
Dice score: 7.565220832824707

Got 3015306/3840000 with acc 78.52
Dice score: 7.558035850524902

Epoch: 2, Time: 29m56s, Training loss: 0.977212131023407
Training IoU: 78.73783707618713, Validation IoU: 78.52359414100647
Got 3344035/3840000 with acc 87.08
Dice score: 7.806251525878906

Got 3341814/3840000 with acc 87.03
Dice score: 7.797047138214111

Epoch: 3, Time: 39m46s, Training loss: 0.6995503902435303
Training IoU: 87.0842456817627, Validatio

## Statistics

In [None]:
def evaluate_with_stats(model, loader, stats):
  """Run ``model`` over ``loader`` and feed every batch into ``stats``.

  For each batch the predicted class per pixel is the arg-max over dimension 1
  of the softmaxed model output. Flattened targets and predictions, their
  per-class bincounts and the pixel count are passed to
  ``stats.appendConfusionMatrix``. Nothing is returned; ``stats`` is updated
  in place.
  """
  model.eval()
  to_probs = torch.nn.Softmax(dim=1)

  with torch.no_grad():
    for img, mask in loader:
      img, mask = img.to(device), mask.to(device)

      preds = to_probs(model(img)).argmax(dim=1)

      # One long vector of labels per tensor.
      flat_mask = mask.flatten()
      flat_preds = preds.flatten()

      # Everything handed to the statistics object lives on the CPU as NumPy.
      stats.appendConfusionMatrix(
          flat_mask.cpu().detach().numpy(),
          flat_preds.cpu().detach().numpy(),
          torch.bincount(flat_mask).cpu().detach().numpy(),
          torch.bincount(flat_preds).cpu().detach().numpy(),
          flat_preds.numel(),
      )

In [None]:
class Statistics():
  """Accumulates per-class confusion counts over batches and derives F1 scores.

  For every class index ``0 .. class_number - 1`` the running totals of true
  positives, true negatives, false positives and false negatives are kept in
  the arrays ``TP``, ``TN``, ``FP`` and ``FN``.

  Naming note: ``f1_micro`` holds one F1 value *per class* and ``f1_macro`` is
  the unweighted mean of those values over all ``class_number`` classes. The
  names are kept because callers use them.

  Args:
    class_number: Number of classes tracked.
  """

  def __init__(self, class_number=13):
    self.class_number = class_number
    self.TP = np.zeros(class_number)
    self.TN = np.zeros(class_number)
    self.FP = np.zeros(class_number)
    self.FN = np.zeros(class_number)

    self.f1_micro = np.zeros(class_number)
    self.f1_macro = 0

  def _fit_counts(self, counts):
    """Return ``counts`` as a float array of exactly ``class_number`` entries.

    Shorter inputs are zero-padded; entries for class indices at or beyond
    ``class_number`` are dropped because those classes are not tracked.
    """
    counts = np.asarray(counts, dtype=float).ravel()[:self.class_number]
    return np.pad(counts, (0, self.class_number - len(counts)), mode='constant')

  def appendConfusionMatrix(self, mask, pred, total_mask_b, total_pred_b, total_num):
    """Add the confusion counts of one batch to the running totals.

    Args:
      mask: Flat sequence of ground-truth class indices.
      pred: Flat sequence of predicted class indices, aligned with ``mask``.
      total_mask_b: Per-class pixel counts of ``mask`` (e.g. a bincount); may
        be shorter or longer than ``class_number``.
      total_pred_b: Per-class pixel counts of ``pred``, same conventions.
      total_num: Total number of pixels in the batch.
    """
    total_mask = self._fit_counts(total_mask_b)
    total_pred = self._fit_counts(total_pred_b)

    TP = np.array(
        [self.count_matches(mask, pred, i) for i in range(self.class_number)],
        dtype=float,
    )

    # Predicted as the class but not actually the class -> false positive.
    FP = total_pred - TP

    # Actually the class but predicted as something else -> false negative.
    FN = total_mask - TP

    # Everything else is a true negative for that class.
    TN = np.full(self.class_number, total_num) - TP - FN - FP

    self.TP += TP
    self.TN += TN
    self.FP += FP
    self.FN += FN

  def count_matches(self, list1, list2, num):
    """Count the positions at which both sequences equal ``num``."""
    arr1 = np.array(list1)
    arr2 = np.array(list2)
    count = np.sum((arr1 == num) & (arr2 == num))
    return count

  def getConfusionMatrixClass(self, class_name):
    """Return the accumulated TP/TN/FP/FN counts for class index ``class_name``."""
    return {"TP":self.TP[class_name],"TN":self.TN[class_name],"FP":self.FP[class_name],"FN":self.FN[class_name]}

  def calcF1Statistics(self):
    """Recompute ``f1_micro`` (per-class F1) and ``f1_macro`` (their mean).

    Precision, recall and F1 are defined as 0 whenever their denominator is 0.
    """
    zeros = np.zeros(self.class_number)

    actual = self.TP + self.FN
    predicted = self.TP + self.FP
    recall = np.divide(self.TP, actual, out=zeros.copy(), where=actual != 0)
    precision = np.divide(self.TP, predicted, out=zeros.copy(), where=predicted != 0)

    pr_sum = precision + recall
    f1 = np.divide(2 * (precision * recall), pr_sum, out=zeros.copy(), where=pr_sum != 0)

    # Written in place so the array object handed out earlier stays current.
    self.f1_micro[:] = f1

    #calc macro
    self.f1_macro = np.sum(self.f1_micro)/self.class_number

  def getF1Micro(self):
    """Return the array of per-class F1 scores."""
    return self.f1_micro

  def getF1Macro(self):
    """Return the mean of the per-class F1 scores."""
    return self.f1_macro

  def printF1Micro(self):
    """Print one line with the F1 score of every class."""
    for i in range(self.class_number):
      print(f'Class:{i} f1={self.f1_micro[i]}')



In [None]:
def testing(model, loader, stats):
  """Evaluate ``model`` on ``loader``, accumulating the results into ``stats``.

  Thin wrapper around ``evaluate_with_stats``; returns nothing.
  """
  evaluate_with_stats(model=model, loader=loader, stats=stats)

In [None]:
# Collect confusion counts over the validation set, then derive the F1 scores.
# NOTE(review): Statistics() defaults to 13 classes while unet_model defaults
# to 23 output channels - confirm which class count is intended.
stats = Statistics()
testing(model, valid_loader, stats)
stats.calcF1Statistics()

In [None]:
# Show the accumulated TP/TN/FP/FN counts for class index 0.
print(stats.getConfusionMatrixClass(0))

In [None]:
# Despite the "F1-Micro" label, printF1Micro() prints one F1 value per class;
# the "F1-Macro" value is the mean of those per-class scores.
print("F1-Micro")
stats.printF1Micro()
print("\nF1-Macro")
print(f'f1={stats.getF1Macro()}')

In [None]:
# Show the first `num_of_img` validation samples, one per row:
# input image | predicted label map | ground-truth mask.
num_of_img = 3
ind = 0

model.eval()

# A single figure with one row per sample. squeeze=False keeps `ax`
# two-dimensional even when num_of_img == 1.
fig, ax = plt.subplots(num_of_img, 3, figsize=(18, 6 * num_of_img), squeeze=False)

ax[0,0].set_title('Image')
ax[0,1].set_title('Prediction')
ax[0,2].set_title('Mask')

shown = 0
with torch.no_grad():
    # zip with range() stops after num_of_img samples without fetching an
    # extra batch from the loader.
    for ind, (x, y) in zip(range(num_of_img), valid_loader):
        x = x.to(device)
        # The arg-max of the raw scores equals the arg-max of their softmax.
        preds = torch.argmax(model(x), dim=1).to('cpu')

        img1 = np.transpose(np.array(x[0,:,:,:].to('cpu')),(1,2,0))
        # Undo the mean=0.5 / std=0.5 normalisation so imshow gets values in [0, 1].
        img1 = np.clip(img1 * 0.5 + 0.5, 0.0, 1.0)
        preds1 = np.array(preds[0,:,:])
        mask1 = np.array(y[0,:,:])

        ax[ind][0].axis("off")

        ax[ind][0].imshow(img1)
        ax[ind][1].imshow(preds1)
        ax[ind][2].imshow(mask1)

        shown = ind + 1

# Hide rows that stayed empty because the loader had fewer samples.
for unused_row in ax[shown:]:
    for unused_axis in unused_row:
        unused_axis.axis("off")

plt.show()