In [None]:
!nvidia-smi

In [None]:
!pip install -U segmentation-models-pytorch albumentations pytorch-lightning rich

In [3]:
import os
import numpy as np
import cv2
import torch
import matplotlib.pyplot as plt
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, TQDMProgressBar, RichProgressBar, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger
import segmentation_models_pytorch as smp
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
import json
import albumentations as albu
from timeit import default_timer as timer
from typing import Any

# Ignore annoying certificate problem
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

torch.hub.set_dir("/content/drive/MyDrive/Colab Notebooks/weights")
DATA_DIR = '/content/drive/MyDrive/Colab Notebooks/lpr_seg/train_set/'

%load_ext tensorboard

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [35]:
# helper function for data visualization
def visualize(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()


class DefectsDataset(BaseDataset):
    """CamVid Dataset. Read images, apply augmentation and preprocessing transformations.
    
    Args:
        images_dir (str): path to images folder
        masks_dir (str): path to segmentation masks folder
        class_values (list): values of classes to extract from segmentation mask
        augmentation (albumentations.Compose): data transfromation pipeline 
            (e.g. flip, scale, etc.)
        preprocessing (albumentations.Compose): data preprocessing 
            (e.g. noralization, shape manipulation, etc.)
    
    """
    
    CLASSES = ['background', 'plates']
    
    def __init__(
            self, 
            root,
            classes=None, 
            augmentation=None, 
            preprocessing=None,
            new_shape=None,
    ):
        self.root = root
        images_dir = os.path.join(self.root, "images")
        self.ids = os.listdir(images_dir)
        self.ids.sort()
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]

        masks_dir = os.path.join(self.root, "masks")
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]
        
        self.augmentation = augmentation
        self.preprocessing = preprocessing
        self.new_shape = new_shape
    
    def __getitem__(self, i):
        
        # read image and mask
        file_name = self.ids[i]
        # image = cv2.imread(self.images_fps[i], cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], cv2.IMREAD_GRAYSCALE)

        # resize image and mask
        # width and height must be divisible by 32
        if self.new_shape is None:
            new_width = 1024
            factor = new_width / image.shape[1]
            new_height = factor * image.shape[0]
            new_height = int(np.ceil(new_height / 32) * 32)
            new_shape = (new_width, new_height)
        else:
            new_shape = self.new_shape
        image = cv2.resize(image, new_shape)
        mask = cv2.resize(mask, new_shape)
        
        # extract certain classes from mask (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1)
        for i in range(1, mask.shape[-1]):
            mask[:, :, i - 1] = np.bitwise_or(mask[:, :, i - 1], mask[:, :, i])
        
        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
        
        # apply preprocessing
        mask = mask.transpose(2, 0, 1).astype(np.float32)
        image = image.transpose(2, 0, 1).astype(np.float32)
        image /= 255.0
            
        return {'image': image, 'mask': mask}
        
    def __len__(self):
        return len(self.ids)

In [None]:
# Lets look at data we have

dataset = DefectsDataset(DATA_DIR, classes=['plates'])
print(f"{len(dataset)} images in the dataset")

for i in range(5):
    input_dict = dataset[i]
    # print(input_dict)
    image, mask = input_dict['image'], input_dict['mask'] # get some sample
    print(image.shape, mask.shape)
    image = image.transpose(1, 2, 0)
    mask = mask.transpose(1, 2, 0)
    visualize(
        image=image, 
        plates_mask=mask.squeeze(),
    )

In [36]:
def get_training_augmentation():
    train_transform = [

        # albu.HorizontalFlip(p=0.5),

        # albu.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),

        # albu.PadIfNeeded(min_height=320, min_width=320, always_apply=True, border_mode=0),
        albu.OneOf(
            [
                albu.RandomCrop(height=480, width=480, p=0.1),
                albu.CropNonEmptyMaskIfExists(height=480, width=480, p=0.9)
            ],
            p=1.0,
        ),
        

        # albu.GaussNoise(p=0.2),
        # albu.IAAPerspective(p=0.5),

        # albu.OneOf(
        #     [
        #         albu.Sharpen(p=1),
        #         # albu.Blur(p=1),
        #         albu.MotionBlur(blur_limit=5, p=1),
        #         albu.Defocus(radius=4, p=1)
        #     ],
        #     p=0.1,
        # ),

        # albu.OneOf(
        #     [
        #         # albu.CLAHE(p=1),
        #         albu.RandomGamma(p=1),
        #         albu.RandomBrightnessContrast(p=1),
        #         albu.HueSaturationValue(p=1),
        #     ],
        #     p=0.5,
        # ),
    ]
    return albu.Compose(train_transform)

In [37]:
# #### Visualize resulted augmented images and masks

# augmented_dataset = DefectsDataset(
#     DATA_DIR, 
#     augmentation=get_training_augmentation(), 
#     classes=['plates'],
# )

# # same image with different random transforms
# for i in range(5):
#     input_dict = augmented_dataset[3]
#     image, mask = input_dict['image'], input_dict['mask'] # get some sample
#     print(image.shape, mask.shape)
#     image = image.transpose(1, 2, 0)
#     mask = mask.transpose(1, 2, 0)
#     visualize(
#         image=image, 
#         plates_mask=mask.squeeze(),
#     )

In [38]:
classes = ['plates']
# augment = True
augment = False
if augment:
    augmentations = get_training_augmentation()
else:
    augmentations = None
resize = False
if resize:
    new_shape = [960, 960]
else:
    new_shape = None
full_dataset = DefectsDataset(
    DATA_DIR, 
    augmentation=augmentations, 
    classes=classes,
    new_shape=new_shape
)

full_loader = DataLoader(full_dataset, batch_size=1, shuffle=False, num_workers=2)

In [39]:
class DefectsModel(pl.LightningModule):

    def __init__(self, arch, encoder_name, in_channels, out_classes, loss, lr=0.0001, **kwargs):
        super().__init__()
        self.model = smp.create_model(
            arch, encoder_name=encoder_name, in_channels=in_channels, classes=out_classes, **kwargs
        )

        # preprocessing parameteres for image
        params = smp.encoders.get_preprocessing_params(encoder_name)
        self.register_buffer('std', torch.tensor(params['std']).view(1, 3, 1, 1))
        self.register_buffer('mean', torch.tensor(params['mean']).view(1, 3, 1, 1))

        # for image segmentation dice loss could be the best first choice
        self.loss_mode = smp.losses.BINARY_MODE if out_classes == 1 else smp.losses.MULTILABEL_MODE
        self.metrics_mode = 'binary' if out_classes == 1 else 'multilabel'
        
        if loss == 'DiceLoss':
            self.loss_fn = smp.losses.DiceLoss(self.loss_mode, from_logits=True)
        elif loss == 'FocalLoss':
            self.loss_fn = smp.losses.FocalLoss(self.loss_mode)
        elif loss == 'JaccardLoss':
            self.loss_fn = smp.losses.JaccardLoss(self.loss_mode, from_logits=True)
        else:
            raise Exception("Unsupported loss")

        self.lr = lr

    def forward(self, image):
        # normalize image here
        image = (image - self.mean) / self.std
        mask = self.model(image)
        return mask

    def shared_step(self, batch, stage):
        image = batch['image']

        # Shape of the image should be (batch_size, num_channels, height, width)
        # if you work with grayscale images, expand channels dim to have [batch_size, 1, height, width]
        assert image.ndim == 4

        # Check that image dimensions are divisible by 32, 
        # encoder and decoder connected by `skip connections` and usually encoder have 5 stages of 
        # downsampling by factor 2 (2 ^ 5 = 32); e.g. if we have image with shape 65x65 we will have 
        # following shapes of features in encoder and decoder: 84, 42, 21, 10, 5 -> 5, 10, 20, 40, 80
        # and we will get an error trying to concat these features
        h, w = image.shape[2:]
        assert h % 32 == 0 and w % 32 == 0

        mask = batch['mask']

        # Shape of the mask should be [batch_size, num_classes, height, width]
        # for binary segmentation num_classes = 1
        assert mask.ndim == 4

        # Check that mask values in between 0 and 1, NOT 0 and 255 for binary segmentation
        assert mask.max() <= 1.0 and mask.min() >= 0

        logits_mask = self.forward(image)
        prob_mask = logits_mask.sigmoid()
        
        # Predicted mask contains logits, and loss_fn param `from_logits` is set
        loss = self.loss_fn(logits_mask, mask)
        self.log(f"{stage}_loss", loss)

        # Lets compute metrics for some threshold
        # first convert mask values to probabilities, then 
        # apply thresholding
        pred_mask = (prob_mask > 0.5).float()

        # We will compute IoU metric by two ways
        #   1. dataset-wise
        #   2. image-wise
        # but for now we just compute true positive, false positive, false negative and
        # true negative 'pixels' for each image and class
        # these values will be aggregated in the end of an epoch
        tp, fp, fn, tn = smp.metrics.get_stats(pred_mask.long(), mask.long(), mode=self.metrics_mode)

        return {
            "loss": loss,
            "tp": tp,
            "fp": fp,
            "fn": fn,
            "tn": tn,
        }

    def shared_epoch_end(self, outputs, stage):
        # aggregate step metics
        tp = torch.cat([x["tp"] for x in outputs])
        fp = torch.cat([x["fp"] for x in outputs])
        fn = torch.cat([x["fn"] for x in outputs])
        tn = torch.cat([x["tn"] for x in outputs])

        # per image IoU means that we first calculate IoU score for each image 
        # and then compute mean over these scores
        per_image_iou = smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro-imagewise")
        
        # dataset IoU means that we aggregate intersection and union over whole dataset
        # and then compute IoU score. The difference between dataset_iou and per_image_iou scores
        # in this particular case will not be much, however for dataset 
        # with "empty" images (images without target class) a large gap could be observed. 
        # Empty images influence a lot on per_image_iou and much less on dataset_iou.
        dataset_iou = smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro")

        metrics = {
            f"{stage}_per_image_iou": per_image_iou,
            f"{stage}_dataset_iou": dataset_iou,
        }
        
        self.log_dict(metrics, prog_bar=True)

    def training_step(self, batch, batch_idx):
        return self.shared_step(batch, "train")            

    def training_epoch_end(self, outputs):
        return self.shared_epoch_end(outputs, "train")

    def validation_step(self, batch, batch_idx):
        return self.shared_step(batch, "valid")

    def validation_epoch_end(self, outputs):
        return self.shared_epoch_end(outputs, "valid")

    def test_step(self, batch, batch_idx):
        return self.shared_step(batch, "test")  

    def test_epoch_end(self, outputs):
        return self.shared_epoch_end(outputs, "test")

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        # optimizer = torch.optim.SGD(self.parameters(), lr=self.lr, momentum=0.9)
        # scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=25)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler, 'monitor': 'train_loss'}
        # return optimizer

In [40]:
def decode_file_name(file_name):
    """
    Decodes parameters from a checkpoint file_name.
    """
    args = file_name.split('_')
    model_name = args[2]
    if args[3] == 'se':
        encoder_name = f"{args[3]}_{args[4]}_{args[5]}"
    else:
        encoder_name = args[3]
    if args[-3][0] == 'a':
        augment = True if args[-3][4] == 'T' else False
        out_classes = int(args[-4][-1])
    else:
        augment = False
        out_classes = int(args[-3][-1])
    loss = args[-2]

    return model_name, encoder_name, augment, out_classes, loss

file_name = "lpr_lightning_Unet_se_resnext50_32x4d_c=1_aug=True_JaccardLoss_loss-0.9469.ckpt"
model_name, encoder_name, augment, out_classes, loss = decode_file_name(file_name)
print(model_name, encoder_name, out_classes, augment, loss)
best_model = DefectsModel.load_from_checkpoint(
    f"/content/drive/MyDrive/Colab Notebooks/weights/lightning/{file_name}",
    arch=model_name, encoder_name=encoder_name, in_channels=3, out_classes=out_classes, loss=loss)

Unet se_resnext50_32x4d 1 True JaccardLoss


In [41]:
best_model = best_model.model
sigmoid_on = True

In [21]:
# file_name = "FPNmod_se_resnext50_32x4d_loss_0.04456.pth"
# best_model = torch.load(f"/content/drive/MyDrive/Colab Notebooks/weights/lightning/{file_name}")
# # print(best_model)
# encoder_name = "se_resnext50_32x4d"
# out_classes = 1
# sigmoid_on = False

In [42]:
DEVICE = 'cuda'
# DEVICE = 'cpu'
params = smp.encoders.get_preprocessing_params(encoder_name)
std = torch.tensor(params['std']).view(1, 3, 1, 1).to(DEVICE)
mean = torch.tensor(params['mean']).view(1, 3, 1, 1).to(DEVICE)
metrics_mode = 'binary' if out_classes == 1 else 'multilabel'
best_model.to(DEVICE)

best_model.eval()
dataset_iou = 0.0
with torch.no_grad():
    for i in range(len(full_dataset)):
        if i % 100 == 0:
            print(f"Image {i}")
        input_dict = full_dataset[i]
        image, gt_mask = input_dict['image'], input_dict['mask']

        gt_mask = torch.from_numpy(gt_mask).to(DEVICE).unsqueeze(0)

        x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
        x_tensor = (x_tensor - mean) / std
        if sigmoid_on:
            pr_mask = best_model(x_tensor).sigmoid()
        else:
            pr_mask = best_model(x_tensor)
        pr_mask = (pr_mask > 0.5).float()
        tp, fp, fn, tn = smp.metrics.get_stats(pr_mask.long(), gt_mask.long(), mode=metrics_mode)
        dataset_iou += smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro")
        if i % 100 == 0:
            print(f"IOU: {dataset_iou / (i + 1)}")

print(f"Total IOU: {dataset_iou / len(full_dataset)}")

Image 2200
IOU: nan


KeyboardInterrupt: ignored

In [None]:
# lpr_lightning_Unet_se_resnext50_32x4d_c=1_aug=True_JaccardLoss_iou-0.9468.ckpt        0.9180
# FPNmod_se_resnext50_32x4d_loss_0.04456.pth                                            0.8924
# lpr_lightning_Unet_se_resnext50_32x4d_c=1_aug=True_JaccardLoss_loss-0.9469-old.ckpt   0.9177
# 

In [None]:
total_time = 0.0
with torch.no_grad():
  for i in range(min(10000, len(full_dataset))):
      if i % 100 == 0:
          print(f"Image {i}")
      input_dict = full_dataset[i]
      image, gt_mask = input_dict['image'], input_dict['mask']

      gt_mask = gt_mask.squeeze()

      x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
      x_tensor = (x_tensor - mean) / std
      start = timer()
      pr_mask = best_model(x_tensor).sigmoid()
      pr_mask = pr_mask.squeeze().cpu().numpy()
      pr_mask = pr_mask > 0.5
      total_time += timer() - start
      if i % 100 == 0:
            print(f"Time: {total_time / (i + 1)}")

print(f"Total time: {total_time / len(full_dataset)}")

In [None]:
# lpr_lightning_Unet_se_resnext50_32x4d_c=1_aug=True_JaccardLoss_iou-0.9468.ckpt        xxx
# FPNmod_se_resnext50_32x4d_loss_0.04456.pth                                            xxx
# lpr_lightning_Unet_se_resnext50_32x4d_c=1_aug=True_JaccardLoss_loss-0.9469-old.ckpt   0.1157
# 