Detection and localization of anomalous objects in Video Sequences using Vision Transformer and U-Net Model |
Stage 2

* ViT & Vanilla U-Net

In [None]:
import os #to joint the path
from glob import glob #used to extract images and masks path
from sklearn.model_selection import train_test_split #to split the dataset to train and validation
from tqdm import tqdm #The progress bar
import cv2

In [None]:
# Mount Google Drive under /content/drive; the dataset is copied from there
# into the local Colab filesystem further below.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [None]:
def create_dir(path):
  """Create directory *path*, including missing parents, if it is not there yet.

  Calling this for a directory that already exists is a no-op. The creation
  is done in a single `os.makedirs(..., exist_ok=True)` call rather than a
  separate existence check, so nothing can create the directory between the
  check and the creation.

  Raises:
    FileExistsError: if *path* exists but is not a directory.
  """
  os.makedirs(path, exist_ok=True)

## Prepare train and valid dataset

In [None]:
# Local working folders, created relative to the current working directory:
# the raw dataset copy and the prepared train/validation split.
create_dir('Data')
create_dir('Data/Dataset')
create_dir('Data/TrainValid')

In [None]:
# Paths
# Root of the local ped1 copy; load_data() reads the FramesComb/ and Frames_GT/
# sub-folders below it.
dataset_path = '/content/Data/Dataset/ped1'

In [None]:
# Copy the Preprocess folder and the ped1 dataset from the mounted Drive
# into the local /content/Data tree.
!cp -r '/content/drive/MyDrive/Colab Notebooks/ADViTUNet/Data/Preprocess/' '/content/Data/'
!cp -r '/content/drive/MyDrive/Colab Notebooks/ADViTUNet/Dataset/ped1/' '/content/Data/Dataset/'

In [None]:
# Put the preprocessed FramesComb folder inside the ped1 dataset folder,
# which is where load_data() globs for the input frames.
!cp -r '/content/Data/Preprocess/FramesComb/' '/content/Data/Dataset/ped1'

In [None]:
from sklearn.utils.validation import sp
def load_data(path,split=0.2):
  """Collect frame and mask paths under *path* and split them for training.

  Frames are read from ``<path>/FramesComb/*.jpg`` and masks from
  ``<path>/Frames_GT/*.bmp``. Both listings are sorted, so the i-th frame is
  paired with the i-th mask.

  Args:
    path: Dataset root folder.
    split: Fraction of the samples reserved for validation.

  Returns:
    ``(train_x, train_y), (valid_x, valid_y)`` where the ``x`` lists hold
    frame paths and the ``y`` lists hold the matching mask paths.

  Raises:
    ValueError: if the number of frames and masks differ.
  """
  frames = sorted(glob(f"{path}/FramesComb/*.jpg"))
  masks = sorted(glob(f"{path}/Frames_GT/*.bmp"))

  # Pairing is purely positional, so a count mismatch would silently attach
  # frames to the wrong masks.
  if len(frames) != len(masks):
    raise ValueError(
        f"Found {len(frames)} frames but {len(masks)} masks under {path!r}"
    )

  split_size = int(len(frames) * split)

  # Split both lists in one call so every frame and its mask always land in
  # the same subset (same indices, same random_state as before).
  train_x, valid_x, train_y, valid_y = train_test_split(
      frames, masks, test_size=split_size, random_state=42
  )

  return (train_x, train_y), (valid_x, valid_y)

In [1]:
""" Load tha dataset """
# Hold out 20% of the frame/mask pairs for validation.
(train_x, train_y), (valid_x, valid_y) = load_data(dataset_path,split=0.2)

' Load tha dataset '

In [None]:
# Output folders for the prepared split; augment_data() writes into the
# "frames/" and "masks/" sub-folders of the path it is given.
create_dir("/content/Data/TrainValid/train/frames/")
create_dir("/content/Data/TrainValid/train/masks/")
create_dir("/content/Data/TrainValid/valid/frames/")
create_dir("/content/Data/TrainValid/valid/masks/")

In [None]:
# Paths
# Both end with "/" because later cells build glob patterns by plain string
# concatenation (e.g. TrainPath + "frames/*").
TrainPath = '/content/Data/TrainValid/train/'
ValidPath = '/content/Data/TrainValid/valid/'

In [None]:
from albumentations import HorizontalFlip, VerticalFlip, Rotate #Data augmentation

In [None]:
def augment_data(images, masks, save_path, augment=False):
  """Resize frame/mask pairs to 512x512 and write them below *save_path*.

  Frames go to ``<save_path>/frames/`` and masks to ``<save_path>/masks/``
  under the same file name. Masks are binarised to 0/255 after resizing.

  Args:
    images: Frame file paths.
    masks: Mask file paths, positionally paired with *images*.
    save_path: Output root; its ``frames/`` and ``masks/`` folders must exist.
    augment: When true, each pair is additionally saved as a horizontal flip,
      a vertical flip and a ``Rotate(limit=45)`` variant; files then get a
      ``_0`` .. ``_3`` suffix (``_0`` is the untouched pair).

  Raises:
    OSError: if a frame or mask cannot be read by OpenCV.
  """
  H = 512
  W = 512

  for frame_path, mask_path in tqdm(zip(images, masks), total=len(images)):
    # Output stem is "<third path component from the end>_<file stem>".
    parts = frame_path.split("/")
    name = parts[-3] + "_" + parts[-1].split(".")[0]

    frame = cv2.imread(frame_path, cv2.IMREAD_COLOR)
    mask = cv2.imread(mask_path, cv2.IMREAD_COLOR)

    # cv2.imread signals failure by returning None instead of raising;
    # fail here with the offending path rather than later inside resize.
    if frame is None:
      raise OSError(f"Missing or unreadable frame: {frame_path}")
    if mask is None:
      raise OSError(f"Missing or unreadable mask: {mask_path}")

    frame_variants = [frame]
    mask_variants = [mask]

    if augment:
      # p is the probability of applying the transform; 1 means always.
      transforms = (
          HorizontalFlip(p=1.0),
          VerticalFlip(p=1),
          Rotate(limit=45, p=1.0),
      )
      for transform in transforms:
        augmented = transform(image=frame, mask=mask)
        frame_variants.append(augmented["image"])
        mask_variants.append(augmented["mask"])

    single = len(frame_variants) == 1

    for variant, (img, msk) in enumerate(zip(frame_variants, mask_variants)):
      img = cv2.resize(img, (W, H))
      msk = cv2.resize(msk, (W, H))

      # Resizing interpolates, so re-threshold to a strict 0/255 mask and keep
      # it as uint8, the depth the image writer expects.
      msk = (msk / 255.0 > 0.5).astype("uint8") * 255

      # NOTE(review): masks are stored as .jpg, which is a lossy format, so the
      # saved mask may not be strictly binary when read back - confirm that
      # downstream code tolerates this or switch to a lossless format.
      file_name = f"{name}.jpg" if single else f"{name}_{variant}.jpg"

      cv2.imwrite(os.path.join(save_path, "frames/", file_name), img)
      cv2.imwrite(os.path.join(save_path, "masks/", file_name), msk)

    # break


In [2]:
# augment_data(train_x, train_y,"new_data/train/", augment=True)
# Both subsets are only resized and binarised here (augment=False).
augment_data(train_x, train_y, TrainPath, augment=False)
augment_data(valid_x, valid_y, ValidPath, augment=False) # no data augmentation is applied to the validation data

## Vanilla U-Net Model

In [None]:
from torch import nn

In [None]:
class DoubleConv(nn.Module):
    """Two consecutive Conv3x3 -> BatchNorm -> ReLU stages.

    The first stage maps `in_ch` to `out_ch` channels, the second keeps
    `out_ch`. With a 3x3 kernel and padding of 1 the spatial size of the
    input is preserved.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()

        # Layers are appended in the order conv, norm, activation for each
        # stage, so the sequential container holds six modules (0..5).
        stages = []
        for stage_in in (in_ch, out_ch):
            stages.append(nn.Conv2d(stage_in, out_ch, 3, padding=1))
            stages.append(nn.BatchNorm2d(out_ch))
            stages.append(nn.ReLU(inplace=True))

        self.conv = nn.Sequential(*stages)

    def forward(self, input):
        """Run `input` through both convolution stages."""
        features = self.conv(input)
        return features

class InputConvolution(nn.Module):
    """First block of the network: a single DoubleConv applied to the input."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.inp_conv = DoubleConv(in_ch, out_ch)

    def forward(self, x):
        """Return the DoubleConv features of `x`."""
        return self.inp_conv(x)

class Up(nn.Module):
    """Decoder step: upsample, concatenate the skip connection, then DoubleConv.

    The transposed convolution (kernel 2, stride 2) maps `in_ch` to `out_ch`
    channels. The following DoubleConv expects `in_ch` channels, i.e. the
    skip tensor has to contribute `in_ch - out_ch` channels.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up_conv = nn.ConvTranspose2d(in_ch, out_ch, kernel_size=2, stride=2)
        self.conv = DoubleConv(in_ch, out_ch)

    def forward(self, x1, x2):
        """Upsample `x1`, join it with the skip tensor `x2` and convolve.

        The concatenation puts the skip channels first. Both tensors must
        have matching batch and spatial sizes after upsampling.
        """
        upsampled = self.up_conv(x1)
        merged = torch.cat([x2, upsampled], dim=1)
        return self.conv(merged)

class Down(nn.Module):
    """Encoder step: 2x2 max pooling followed by a DoubleConv."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.pool = nn.MaxPool2d(2)
        self.conv = DoubleConv(in_ch, out_ch)

    def forward(self, x):
        """Pool `x`, then map it from `in_ch` to `out_ch` channels."""
        pooled = self.pool(x)
        return self.conv(pooled)

class LastConvolution(nn.Module):
    """Output head: a 1x1 convolution mapping features to `out_ch` channels.

    No activation is applied here, so the result is a raw score map.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=1)

    def forward(self, x):
        """Return the per-pixel projection of `x`."""
        return self.conv1(x)

In [None]:
class VUnet(nn.Module):
    """Vanilla U-Net with four encoder and four decoder stages.

    Args:
        channels: Number of channels of the input image.
        classes: Number of channels of the output score map.
    """

    def __init__(self, channels, classes):
        super().__init__()

        # Encoder: channel width doubles at every stage (64 -> 1024).
        self.inp = InputConvolution(channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)

        # Decoder: mirrors the encoder and halves the width again.
        self.up1 = Up(1024, 512)
        self.up2 = Up(512, 256)
        self.up3 = Up(256, 128)
        self.up4 = Up(128, 64)

        self.out = LastConvolution(64, classes)

    def forward(self, x):
        """Return the raw (un-activated) output map for the batch `x`."""
        # Encoder path; every intermediate result is kept as a skip connection.
        skip1 = self.inp(x)
        skip2 = self.down1(skip1)
        skip3 = self.down2(skip2)
        skip4 = self.down3(skip3)
        bottleneck = self.down4(skip4)

        # Decoder path; each step consumes the skip of the matching depth.
        decoded = self.up1(bottleneck, skip4)
        decoded = self.up2(decoded, skip3)
        decoded = self.up3(decoded, skip2)
        decoded = self.up4(decoded, skip1)

        return self.out(decoded)

In [None]:
import torch
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
# U-Net for 3-channel frames with a single output channel.
vmodel = VUnet(3,1).to(device)

from torchsummary import summary
# Print the per-layer output shapes and parameter counts for a 3x512x512 input.
summary(vmodel, input_size=(3, 512, 512))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 512, 512]           1,792
       BatchNorm2d-2         [-1, 64, 512, 512]             128
              ReLU-3         [-1, 64, 512, 512]               0
            Conv2d-4         [-1, 64, 512, 512]          36,928
       BatchNorm2d-5         [-1, 64, 512, 512]             128
              ReLU-6         [-1, 64, 512, 512]               0
        DoubleConv-7         [-1, 64, 512, 512]               0
  InputConvolution-8         [-1, 64, 512, 512]               0
         MaxPool2d-9         [-1, 64, 256, 256]               0
           Conv2d-10        [-1, 128, 256, 256]          73,856
      BatchNorm2d-11        [-1, 128, 256, 256]             256
             ReLU-12        [-1, 128, 256, 256]               0
           Conv2d-13        [-1, 128, 256, 256]         147,584
      BatchNorm2d-14        [-1, 128, 2

In [None]:
## Training dataset
from os.path import ismount
import os
# os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision


In [None]:
# for V-Unet
# Re-list the prepared files from disk. augment_data() saves every frame and
# its mask under the same file name, so sorting both listings keeps them
# aligned index by index.
train_x = sorted(glob(TrainPath + "frames/*"))
train_y = sorted(glob(TrainPath + "masks/*"))

valid_x = sorted(glob(ValidPath + "frames/*"))
valid_y = sorted(glob(ValidPath + "masks/*"))

# Summary string of the subset sizes (only printed when the line below is enabled).
data_str = f"Dataset Size:\nTrain x: {len(train_x)} - Valid: {len(valid_x)}\n"

# print(data_str)

In [None]:
from torch.utils.data import Dataset

class DriveDataset(Dataset):
    """Dataset of (frame, mask) tensor pairs read from image files.

    Args:
        images_path: Sequence of frame file paths.
        masks_path: Sequence of mask file paths, positionally paired with
            `images_path`.
    """

    def __init__(self, images_path, masks_path):

        self.images_path = images_path
        self.masks_path = masks_path
        # The dataset length is taken from the frame list only.
        self.n_samples = len(images_path)

    @staticmethod
    def _read(path, flag):
        """Read `path` with OpenCV and fail loudly when that is not possible."""
        data = cv2.imread(path, flag)
        # cv2.imread returns None on failure instead of raising, which would
        # otherwise surface as an unrelated TypeError on the first arithmetic.
        if data is None:
            raise OSError(f"Missing or unreadable image file: {path}")
        return data

    def __getitem__(self, index):
        """Return `(image, mask)` as float32 tensors scaled to [0, 1].

        The image is channel-first (3, H, W); the mask gets a leading channel
        axis (1, H, W). No resizing happens here - files are used as stored.

        Raises:
            OSError: if the frame or the mask cannot be read.
        """
        # Frame: 8-bit colour image, (H, W, 3) -> (3, H, W).
        image = self._read(self.images_path[index], cv2.IMREAD_COLOR)
        image = image/255.0
        image = np.transpose(image, (2, 0, 1))
        image = image.astype(np.float32)
        image = torch.from_numpy(image)

        # Mask: 8-bit single channel image, (H, W) -> (1, H, W).
        mask = self._read(self.masks_path[index], cv2.IMREAD_GRAYSCALE)
        mask = mask/255.0
        mask = np.expand_dims(mask, axis=0)
        mask = mask.astype(np.float32)
        mask = torch.from_numpy(mask)

        return image, mask

    def __len__(self):
        """Number of samples (length of the frame list)."""
        return self.n_samples

In [None]:
# for V-Unet
# train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
# valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)
# PyTorch datasets over the prepared frame/mask path lists.
train_dataset = DriveDataset(train_x, train_y)
valid_dataset = DriveDataset(valid_x, valid_y)

##Metrics

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import  backend as k

In [None]:
def iou(y_true, y_pred):
  """Intersection-over-union of two tensors, computed in NumPy.

  The computation is wrapped with `tf.numpy_function` and returns a float32
  scalar. A constant of 1e-15 is added to numerator and denominator.
  """
  def _numpy_iou(truth, pred):
    overlap = (truth * pred).sum()
    combined = truth.sum() + pred.sum() - overlap
    ratio = (overlap + 1e-15) / (combined + 1e-15)
    return ratio.astype(np.float32)

  return tf.numpy_function(_numpy_iou, [y_true, y_pred], tf.float32)

# Additive constant used in both numerator and denominator of the Dice score.
smooth = 1e-15

def dice_coef(y_true, y_pred):
  """Dice coefficient of two tensors: 2*|A.B| / (|A| + |B|), smoothed."""
  flat_true = tf.keras.layers.Flatten()(y_true)
  flat_pred = tf.keras.layers.Flatten()(y_pred)
  overlap = tf.reduce_sum(flat_true * flat_pred)
  numerator = 2. * overlap + smooth
  denominator = tf.reduce_sum(flat_true) + tf.reduce_sum(flat_pred) + smooth
  return numerator / denominator

def dice_loss(y_true, y_pred):
  """Dice loss: one minus the Dice coefficient of the two tensors."""
  score = dice_coef(y_true, y_pred)
  return 1.0 - score

##Training the Vanilla U-Net model

In [None]:
from torchsummary import summary
import time
from torch.utils.data import DataLoader

In [None]:
""" Calculate the time taken """
def epoch_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into minutes and seconds.

    Returns:
        `(minutes, seconds)` as integers, both truncated toward zero.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds

In [None]:
def train(model, loader, optimizer, loss_fn, device):
    """Run one optimisation pass over `loader` and return the mean batch loss.

    The model is switched to training mode. Every batch is moved to `device`
    as float32, and the loss is called as `loss_fn(prediction, target)`.
    The mean is taken over `len(loader)`.
    """
    model.train()

    running_total = 0.0
    for inputs, targets in loader:
        inputs = inputs.to(device, dtype=torch.float32)
        targets = targets.to(device, dtype=torch.float32)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        batch_loss = loss_fn(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        running_total += batch_loss.item()

    return running_total / len(loader)

In [None]:
def evaluate(model, loader, loss_fn, device):
    """Return the mean batch loss of `model` over `loader` without training.

    The model is switched to evaluation mode and gradients are disabled.
    Batches are moved to `device` as float32 and the loss is called as
    `loss_fn(prediction, target)`. The mean is taken over `len(loader)`.
    """
    model.eval()

    running_total = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device, dtype=torch.float32)
            targets = targets.to(device, dtype=torch.float32)

            batch_loss = loss_fn(model(inputs), targets)
            running_total += batch_loss.item()

    return running_total / len(loader)


In [None]:
import torch.nn.functional as F
class DiceLoss(nn.Module):
    """Soft Dice loss on raw scores (a sigmoid is applied internally).

    `weight` and `size_average` are accepted for call compatibility but are
    not used.
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return `1 - dice` for the whole batch.

        Args:
            inputs: Raw model outputs (before any sigmoid).
            targets: Target values with the same number of elements.
            smooth: Constant added to numerator and denominator.
        """
        probs = torch.sigmoid(inputs)

        # reshape (unlike view) also works for non-contiguous tensors, e.g.
        # transposed or sliced inputs.
        probs = probs.reshape(-1)
        targets = targets.reshape(-1)

        intersection = (probs * targets).sum()
        dice = (2.*intersection + smooth)/(probs.sum() + targets.sum() + smooth)

        return 1 - dice

class DiceBCELoss(nn.Module):
    """Sum of binary cross-entropy and soft Dice loss on raw scores.

    `weight` and `size_average` are accepted for call compatibility but are
    not used.
    """

    def __init__(self, weight=None, size_average=True):
        super().__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return `BCE + (1 - dice)` for the whole batch.

        Args:
            inputs: Raw model outputs (before any sigmoid).
            targets: Target values in [0, 1] with the same number of elements.
            smooth: Constant added to numerator and denominator of the Dice term.
        """
        # reshape (unlike view) also works for non-contiguous tensors.
        logits = inputs.reshape(-1)
        targets = targets.reshape(-1)

        probs = torch.sigmoid(logits)
        intersection = (probs * targets).sum()
        dice_loss = 1 - (2.*intersection + smooth)/(probs.sum() + targets.sum() + smooth)

        # Compute the cross-entropy directly from the scores: applying a
        # sigmoid first saturates to exactly 0/1 for large magnitudes, which
        # caps the loss and kills its gradient.
        BCE = F.binary_cross_entropy_with_logits(logits, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE

In [None]:
# Folder that receives the model checkpoint written during training.
create_dir("files")

In [None]:
import tensorflow as tf
# Seeding
np.random.seed(42)
tf.random.set_seed(42)
# The model, data loading and training below run in PyTorch, so its random
# number generator has to be seeded as well for repeatable runs.
torch.manual_seed(42)

In [4]:
# Frame size as (height, width) and training hyper-parameters.
H = 512
W = 512
size = (H, W)
batch_size = 2
num_epochs = 20
lr = 1e-4
checkpoint_path = "files/checkpoint.pth"

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=False)

# Use the GPU when available instead of failing on CPU-only runtimes.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
vmodel = VUnet(3, 1)
vmodel = vmodel.to(device)

optimizer = torch.optim.Adam(vmodel.parameters(), lr=lr)
# NOTE(review): recent PyTorch releases deprecate the `verbose` argument of
# learning-rate schedulers - confirm against the installed version.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, verbose=True)
loss_fn = DiceBCELoss()

best_valid_loss = float("inf")

for epoch in range(num_epochs):
    start_time = time.time()
    train_loss = train(vmodel, train_loader, optimizer, loss_fn, device)
    valid_loss = evaluate(vmodel, valid_loader, loss_fn, device)

    # A plateau scheduler only adjusts the learning rate when it is stepped
    # with the monitored metric; without this call it never has any effect.
    scheduler.step(valid_loss)

    # Keep only the weights of the best epoch so far (lowest validation loss).
    if valid_loss < best_valid_loss:
        data_str = f"Valid loss improved from {best_valid_loss:2.4f} to {valid_loss:2.4f}. Saving checkpoint: {checkpoint_path}"
        print(data_str)

        best_valid_loss = valid_loss
        torch.save(vmodel.state_dict(), checkpoint_path)

    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    data_str = f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s\n'
    data_str += f'\tTrain Loss: {train_loss:.3f}\n'
    data_str += f'\t Val. Loss: {valid_loss:.3f}\n'
    print(data_str)

In [None]:
# Copy the local checkpoint folder to Drive.
!cp -r '/content/files' '/content/drive/MyDrive/Colab Notebooks/ADViTUNet/Models/V-Unet/'

##Evaluation

In [None]:
# Output folder for the side-by-side result images written during evaluation.
create_dir("Results")

In [None]:
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score,recall_score

In [None]:
def calculate_metrics(y_true, y_pred):
    """Compute pixel-wise binary metrics for a ground-truth/prediction pair.

    Both tensors are moved to the CPU, thresholded at 0.5 and flattened
    before scoring.

    Returns:
        `[jaccard, f1, recall, precision, accuracy]`.
    """
    def _to_binary_vector(tensor):
        # Tensor -> flat uint8 vector of 0/1 labels.
        values = tensor.cpu().numpy()
        return (values > 0.5).astype(np.uint8).reshape(-1)

    truth = _to_binary_vector(y_true)
    prediction = _to_binary_vector(y_pred)

    scorers = (jaccard_score, f1_score, recall_score, precision_score, accuracy_score)
    return [scorer(truth, prediction) for scorer in scorers]


def mask_parse(mask):
    """Replicate a single-channel mask into three identical channels.

    A trailing axis of length 3 is added, e.g. (H, W) becomes (H, W, 3), so
    the mask can be placed next to a colour image.
    """
    return np.stack((mask, mask, mask), axis=-1)

In [3]:
from operator import add
# Frame size as (height, width).
H = 512
W = 512
size = (H, W)

checkpoint_path = "files/checkpoint.pth"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Rebuild the network and restore the weights saved during training.
vmodel = VUnet(3,1)
vmodel = vmodel.to(device)
vmodel.load_state_dict(torch.load(checkpoint_path, map_location=device))
vmodel.eval()

# Running sums of [jaccard, f1, recall, precision, accuracy] over all frames.
metrics_score = [0.0, 0.0, 0.0, 0.0, 0.0]
time_taken = []

for i, (x, y) in tqdm(enumerate(zip(valid_x, valid_y)), total=len(valid_x)):
    # File stem of the frame; reused as the name of the result image.
    name = x.split("/")[-1].split(".")[0]

    image = cv2.imread(x, cv2.IMREAD_COLOR)
    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
    # cv2.imread returns None on failure instead of raising.
    if image is None or mask is None:
        raise OSError(f"Missing or unreadable frame/mask pair: {x!r}, {y!r}")

    # Frame: (H, W, 3) uint8 -> (1, 3, H, W) float32 in [0, 1].
    x = np.transpose(image, (2, 0, 1))
    x = x/255.0
    x = np.expand_dims(x, axis=0)
    x = x.astype(np.float32)
    x = torch.from_numpy(x)
    x = x.to(device)

    # Mask: (H, W) uint8 -> (1, 1, H, W) float32 in [0, 1].
    y = np.expand_dims(mask, axis=0)
    y = y/255.0
    y = np.expand_dims(y, axis=0)
    y = y.astype(np.float32)
    y = torch.from_numpy(y)
    y = y.to(device)

    with torch.no_grad():
        # CUDA work is queued asynchronously, so wait for the device before
        # starting and before stopping the clock; otherwise only the launch
        # overhead is measured and the FPS figure is too optimistic.
        if device.type == 'cuda':
            torch.cuda.synchronize()
        start_time = time.time()
        pred_y = vmodel(x)
        pred_y = torch.sigmoid(pred_y)
        if device.type == 'cuda':
            torch.cuda.synchronize()
        total_time = time.time() - start_time
        time_taken.append(total_time)

        score = calculate_metrics(y, pred_y)
        metrics_score = list(map(add, metrics_score, score))
        print("-------",metrics_score)

        # Probability map -> binary (H, W) uint8 mask.
        pred_y = pred_y[0].cpu().numpy()
        pred_y = np.squeeze(pred_y, axis=0)
        pred_y = pred_y > 0.5
        pred_y = np.array(pred_y, dtype=np.uint8)

    # Save frame | ground truth | prediction side by side, separated by grey
    # bars. The bar height follows the frame so the pieces always line up.
    ori_mask = mask_parse(mask)
    pred_y = mask_parse(pred_y)
    line = np.ones((image.shape[0], 10, 3)) * 128

    cat_images = np.concatenate(
        [image, line, ori_mask, line, pred_y * 255], axis=1
    )
    cv2.imwrite(f"Results/{name}.png", cat_images)


# Average every metric over the number of validation frames.
num_frames = len(valid_x)
jaccard, f1, recall, precision, acc = (total/num_frames for total in metrics_score)
print(f"Jaccard: {jaccard:1.4f} - F1: {f1:1.4f} - Recall: {recall:1.4f} - Precision: {precision:1.4f} - Acc: {acc:1.4f}")

fps = 1/np.mean(time_taken)
print("FPS: ", fps)

In [None]:
# Copy the local Results folder to Drive.
!cp -r '/content/Results' '/content/drive/MyDrive/Colab Notebooks/ADViTUNet/VResults'