# Practice case 2 :

In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import numpy as np
import cv2
import matplotlib.pyplot as plt
from os import listdir
from os.path import isfile, join
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
import math
import imutils
import torch
import segmentation_models_pytorch as smp

In [2]:
!pip install -U git+https://github.com/albumentations-team/albumentations
!pip install imagecorruptions

Collecting git+https://github.com/albumentations-team/albumentations
  Cloning https://github.com/albumentations-team/albumentations to c:\users\34619\appdata\local\temp\pip-req-build-zw9mgzay


  Running command git clone -q https://github.com/albumentations-team/albumentations 'C:\Users\34619\AppData\Local\Temp\pip-req-build-zw9mgzay'




Preparing data...

In [3]:
DATA_DIR = 'output/'
# load repo with data if it is not exists
if not os.path.exists(DATA_DIR):
    print('Loading data...')
x_train_dir = os.path.join(DATA_DIR, 'train/Images')
y_train_dir = os.path.join(DATA_DIR, 'train/GT')

x_valid_dir = os.path.join(DATA_DIR, 'val/Images')
y_valid_dir = os.path.join(DATA_DIR, 'val/GT')

x_test_dir = os.path.join(DATA_DIR, 'test/Images')
y_test_dir = os.path.join(DATA_DIR, 'test/GT')

mypath='output/train/Images'
onlyfiles = [ f for f in listdir(mypath) if isfile(join(mypath,f)) ]
images = np.empty(len(onlyfiles), dtype=object)
for n in range(0, len(onlyfiles)):
    images[n] = cv2.imread( join(mypath,onlyfiles[n]) )    
    original=images[n]

Dataset:

In [4]:
class Dataset(BaseDataset):

    CLASSES = ['0']
    def __init__(
            self, 
            images_dir, 
            masks_dir, 
            classes=None, 
            augmentation=None, 
            preprocessing=None,
    ):
        self.ids =os.listdir(images_dir)
        
        
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps =[os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]
        
        self.augmentation = augmentation
        self.preprocessing = preprocessing
    
    def __getitem__(self, i):
        
        #IMAGES
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image=cv2.resize(image, dsize=(1280,384), interpolation=cv2.INTER_CUBIC)
        image=torch.from_numpy(np.array(image.transpose(2,1,0))).float()
        
        
        #GT
        mask = cv2.imread(self.masks_fps[i])
        mask=cv2.resize(mask, dsize=(1280,384), interpolation=cv2.INTER_CUBIC)
        mask=torch.from_numpy(np.array(mask.transpose(2,1,0))).float()
        

        # If you want to use augmentation...
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
        
        # If you apply preprocessing...
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
            
        return image, mask
        
    def __len__(self):
        return len(self.ids)
dataset = Dataset(x_train_dir, y_train_dir, classes=['0'])

Model:

In [5]:
ENCODER = 'se_resnext50_32x4d'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['0']
ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation
DEVICE = 'cpu' 

# create segmentation model with pretrained encoder
model = smp.Unet(encoder_name='resnet50', encoder_weights='imagenet', in_channels=3, classes=1)
#model= smp.FPN('resnet34', in_channels=3)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

Preparing DataLoader

In [6]:
train_dataset = Dataset(
    x_train_dir, 
    y_train_dir, 
    augmentation=None, 
    preprocessing=None,
    classes=CLASSES,
)

valid_dataset = Dataset(
    x_valid_dir, 
    y_valid_dir, 
    augmentation=None, 
    preprocessing=None,
    classes=CLASSES,
)

train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0)

In [7]:
# Dice/F1 score - https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient
# IoU/Jaccard score - https://en.wikipedia.org/wiki/Jaccard_index

loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

optimizer = torch.optim.Adam([ 
    dict(params=model.parameters(), lr=0.0001),
])


Create epoch runners 

In [8]:
# it is a simple loop of iterating over dataloader`s samples
train_epoch = smp.utils.train.TrainEpoch(
    model, 
    loss=loss, 
    metrics=metrics, 
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model, 
    loss=loss, 
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

We go through epoches

In [None]:
max_score = 0
#epoches=10
for i in range(0, 10):
    
    print('\nEpoch: {}'.format(i))
    train_logs = train_epoch.run(train_loader)
    valid_logs = valid_epoch.run(valid_loader)
    
    print(valid_logs['iou_score'])
    # do something (save model, change lr, etc.)
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model, './best_model.pth')
        print('Model saved!')
        
    if i == 25:
        optimizer.param_groups[0]['lr'] = 1e-5
        print('Decrease decoder learning rate to 1e-5!')


Epoch: 0
train: 100%|██████████| 42/42 [03:45<00:00,  5.37s/it, dice_loss - -10.58, iou_score - 22.32]  
valid: 100%|██████████| 19/19 [00:38<00:00,  2.02s/it, dice_loss - -14.92, iou_score - 30.12]
30.115647366172393
Model saved!

Epoch: 1
train: 100%|██████████| 42/42 [04:13<00:00,  6.04s/it, dice_loss - -23.04, iou_score - 54.2] 
valid: 100%|██████████| 19/19 [00:37<00:00,  1.98s/it, dice_loss - -43.82, iou_score - 69.16]
69.15803450032284
Model saved!

Epoch: 2
train: 100%|██████████| 42/42 [04:13<00:00,  6.05s/it, dice_loss - -36.83, iou_score - 78.87]
valid: 100%|██████████| 19/19 [00:37<00:00,  1.97s/it, dice_loss - -16.68, iou_score - 8.176]
8.175912455508582

Epoch: 3
train: 100%|██████████| 42/42 [04:12<00:00,  6.01s/it, dice_loss - -49.01, iou_score - 85.35]
valid: 100%|██████████| 19/19 [00:37<00:00,  1.96s/it, dice_loss - 20.75, iou_score - 39.14] 
39.14050471331728

Epoch: 4
train: 100%|██████████| 42/42 [04:14<00:00,  6.07s/it, dice_loss - 58.04, iou_score - 72.68] 
val

In [None]:
# load best saved checkpoint
best_model = torch.load('./best_model.pth')
# create test dataset
test_dataset = Dataset(
    x_test_dir, 
    y_test_dir, 
    augmentation=None, 
    preprocessing=None,
    classes=CLASSES,
)

test_dataloader = DataLoader(test_dataset)
# evaluate model on test set
test_epoch = smp.utils.train.ValidEpoch(
    model=best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
)

logs = test_epoch.run(test_dataloader)
print(logs)

In [None]:
#helper function for data visualization
def visualize2(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(14, 4))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        (h, w) = image.shape[:2]
        (cX, cY) = (w // 2, h // 2)
        # rotate 
        M = cv2.getRotationMatrix2D((cX, cY), 90, 1.0)
        rotated = cv2.warpAffine(image, M, (w, h))
        plt.imshow(rotated)
    plt.show()
    
def visualize(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(14, 4))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()

# test dataset without transformations for image visualization

test_dataset_vis = Dataset(
    x_test_dir, y_test_dir, 
    classes=CLASSES,
)
for i in range(1):
    n = np.random.choice(len(test_dataset))
    
    image_vis = test_dataset_vis[n][0].numpy().astype('uint8')
    image, gt_mask = test_dataset[n]
    
    gt_mask = gt_mask.numpy().astype('uint8').squeeze()
    
    x_tensor = torch.from_numpy(np.asarray(image)).to(DEVICE).unsqueeze(0)
    pr_mask = best_model.predict(x_tensor)
    pr_mask = (pr_mask.squeeze().cpu().numpy().round())
        
    '''print(image_vis.transpose(2,1,0).shape)
    print(gt_mask.transpose(2,1,0).shape)
    print(pr_mask.shape)'''
        
    visualize(image=image_vis.transpose(2,1,0), 
        ground_truth_mask=gt_mask.transpose(2,1,0),
        predicted_mask=pr_mask
    )
    
    
#mask=torch.from_numpy(np.array(mask.transpose(2,1,0))).float()
    
