<a href="https://colab.research.google.com/github/ShikharGhimire/Diabetic-Retinopathy/blob/main/SimpleUnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
### Segmenting retina blood vessels using UNET

In [None]:
#Accessing Google Drive: mount it inside the Colab runtime at /content/gdrive/
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
#Importing libraries
import os
import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import imageio
from albumentations import HorizontalFlip, VerticalFlip, Rotate
import time
import random
import torch
from torch.utils.data import Dataset
import torch.nn as nn

In [None]:
#Loading the dataset
def load_path(path):
  """Collect the sorted image and mask file paths below a dataset root folder.

  Args:
    path: Root folder that contains the 'train' and 'test' sub-folders.

  Returns:
    A pair ``((train_x, train_y), (test_x, test_y))`` where every element is a
    sorted list of file paths: ``*.tif`` images and ``*.gif`` masks.
  """
  def _sorted_files(*parts):
    #Sorting gives a stable order; images and masks are later paired by position
    return sorted(glob(os.path.join(path,*parts)))

  #Training folder
  train_x = _sorted_files('train','training','images','*.tif') #Image folder
  train_y = _sorted_files('train','markup','*.gif') #Mask images y
  #Testing folder
  test_x = _sorted_files('test','images','*.tif') #Image folder
  test_y = _sorted_files('test','mask','*.gif') #Mask images y

  return (train_x,train_y), (test_x,test_y)

In [None]:
#Loading the dataset: folder_path is the dataset folder on the mounted Google Drive;
#load_path returns the (images, masks) path lists for the train and test splits
folder_path = '/content/gdrive/My Drive/DBR/DRIVE'
(train_x,train_y),(test_x,test_y) = load_path(folder_path)

In [None]:
#Sanity check: each split should have as many masks as images
print('Length of training data is {}-{}'.format(*map(len,(train_x,train_y))))
print('Length of testing data is {}-{}'.format(*map(len,(test_x,test_y))))

Length of training data is 20-20
Length of testing data is 20-20


In [None]:
#Independent copies of the training image / mask path lists; the image
#dimensions themselves are inspected in the next cell
training_images = list(train_x)
mask_images = list(train_y)

In [None]:
#Distinct array shapes found among the training images
image_size = {cv2.imread(image_file).shape for image_file in training_images}
print('All the image sizes are',image_size)

#Recorded output: every image is 584x565 with 3 channels

#Same check for the masks (first frame of each gif)
mask_size = {imageio.mimread(mask_file)[0].shape for mask_file in mask_images}
print('All the mask sizes are',mask_size)

#Recorded output: every mask is 584x565

All the image sizes are {(584, 565, 3)}
All the mask sizes are {(584, 565)}


In [None]:
# #Data Augmentation for the retinal eye images
# def create_dir(path):
#   if not os.path.exists(path):
#     os.makedirs(path)

# #Creating directories to save the augmented data
# create_dir('/content/gdrive/My Drive/DBR/DRIVE/aug_train/images')
# create_dir('/content/gdrive/My Drive/DBR/DRIVE/aug_train/mask')
# create_dir('/content/gdrive/My Drive/DBR/DRIVE/aug_test/images')
# create_dir('/content/gdrive/My Drive/DBR/DRIVE/aug_test/mask')

In [None]:
#Let's create some augmented image
def augmented_data(images,masks,save_path,augment = True):
  """Resize every image/mask pair to 560x560 and write it out as PNG.

  With ``augment`` enabled, each pair is stored four times: the original, a
  horizontal flip, a vertical flip and a random rotation of up to 45 degrees.
  Files are written to ``<save_path>/images`` and ``<save_path>/mask`` as
  ``<name>_<index>.png``; both folders are created when missing.

  Args:
    images: List of image file paths.
    masks: List of mask file paths, paired with ``images`` by position.
    save_path: Output root folder.
    augment: When falsy, only the resized original pair is written.

  Raises:
    IOError: If OpenCV reports that an output file could not be written.
  """
  size = (560,560)
  image_dir = os.path.join(save_path,"images")
  mask_dir = os.path.join(save_path,"mask")
  #cv2.imwrite only returns False when the folder is missing, so create it up front
  os.makedirs(image_dir,exist_ok = True)
  os.makedirs(mask_dir,exist_ok = True)

  for x,y in tqdm(zip(images,masks),total = len(images)):
    image_name = x.split('/')[-1].split('.')[0] #Extracting the name of the file
    #Reading the image and the mask (first frame of the gif)
    x = cv2.imread(x,cv2.IMREAD_COLOR)
    y = imageio.mimread(y)[0]

    X = [x]
    Y = [y]
    if augment: #If we want to do the augmentation
      #Each transform is applied to the original pair, never to a previous result
      transforms = (HorizontalFlip(p = 1.0), VerticalFlip(p = 1.0), Rotate(limit = 45, p = 1.0))
      for aug in transforms:
        augmented = aug(image = x, mask = y)
        X.append(augmented['image'])
        Y.append(augmented['mask'])

    for index,(i,m) in enumerate(zip(X,Y)):
      i = cv2.resize(i,size)
      #Nearest-neighbour keeps the mask labels crisp instead of blending them at vessel edges
      m = cv2.resize(m,size,interpolation = cv2.INTER_NEAREST)

      #Image and mask share the same file name inside their own folder
      file_name = f"{image_name}_{index}.png"
      image_path = os.path.join(image_dir,file_name)
      mask_path = os.path.join(mask_dir,file_name)

      if not cv2.imwrite(image_path,i):
        raise IOError('Could not write image file: {}'.format(image_path))
      if not cv2.imwrite(mask_path,m):
        raise IOError('Could not write mask file: {}'.format(mask_path))


In [None]:
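# augmented_data(train_x, train_y,'/content/gdrive/My Drive/DBR/DRIVE/aug_train/', augment = True) #For training data
# NOTE(review): the call below passes train_x/train_y, so 'aug_test' receives resized copies of the TRAINING images rather than held-out images - confirm this is intended before using that folder for validation.
# augmented_data(train_x,train_y,'/content/gdrive/My Drive/DBR/DRIVE/aug_test/',augment = False) #For the testing data: no augmentation is needed, so augment is set to False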

### Pytorch Unet architecture


In [None]:
class conv_block(nn.Module):
  """Two 3x3 convolutions (padding 1), each followed by batch norm and ReLU."""

  def __init__(self,in_c,out_c):
    super().__init__()
    #First convolution stage: in_c -> out_c channels
    self.conv1 = nn.Conv2d(in_channels = in_c, out_channels = out_c, kernel_size = 3, padding = 1)
    self.bn1 = nn.BatchNorm2d(num_features = out_c)

    #Second convolution stage: out_c -> out_c channels
    self.conv2 = nn.Conv2d(in_channels = out_c, out_channels = out_c, kernel_size = 3, padding = 1)
    self.bn2 = nn.BatchNorm2d(num_features = out_c)

    #One activation module shared by both stages
    self.relu = nn.ReLU()

  def forward(self,inputs):
    first_stage = self.relu(self.bn1(self.conv1(inputs)))
    return self.relu(self.bn2(self.conv2(first_stage)))

In [None]:
#Using encoder block
class encoder_block(nn.Module):
  """Encoder stage: a conv_block followed by 2x2 max pooling."""

  def __init__(self,in_c,out_c):
    super().__init__()
    self.conv = conv_block(in_c,out_c)
    self.pool = nn.MaxPool2d(kernel_size = (2,2))

  def forward(self,inputs):
    """Return (features before pooling, pooled features)."""
    features = self.conv(inputs)
    return features, self.pool(features)

In [None]:
# DECODER block
class decoder_block(nn.Module):
  """Decoder stage: 2x transposed-conv up-sampling, skip concatenation, conv_block.

  Args:
    in_c: Channels of the incoming (lower resolution) feature map.
    out_c: Channels after up-sampling; the skip tensor must have the same
      number of channels because the following conv_block expects 2*out_c.
  """

  def __init__(self,in_c,out_c):
    super().__init__()
    self.up = nn.ConvTranspose2d(in_c,out_c,kernel_size = 2,stride = 2,padding = 0)
    self.conv = conv_block(out_c+out_c,out_c)

  def forward(self,inputs,skip):
    x = self.up(inputs)

    #Max pooling floors odd sizes, so the up-sampled map can be smaller than the
    #skip tensor. Pad (or crop, for negative values) to the skip size so that
    #inputs whose height/width are not divisible by 16 can still be concatenated.
    diff_h = skip.size(2) - x.size(2)
    diff_w = skip.size(3) - x.size(3)
    if diff_h != 0 or diff_w != 0:
      x = nn.functional.pad(x,[diff_w//2, diff_w - diff_w//2, diff_h//2, diff_h - diff_h//2])

    x = torch.cat([x,skip], dim = 1)
    x = self.conv(x)
    return x

In [None]:
class build_unet(nn.Module):
  """U-Net with four encoder stages, a bottleneck and four decoder stages.

  Args:
    in_channels: Channels of the input image (default 3, i.e. a colour image).
    out_channels: Channels of the output map (default 1, a single mask).

  The forward pass returns the raw output of the final 1x1 convolution; no
  activation is applied inside the model.
  """

  def __init__(self,in_channels = 3,out_channels = 1):
    super().__init__()
    #Encoder: channel width doubles at every stage
    self.e1 = encoder_block(in_channels,64)
    self.e2 = encoder_block(64,128)
    self.e3 = encoder_block(128,256)
    self.e4 = encoder_block(256,512)

    #Bottleneck of the UNET architecture
    self.b = conv_block(512,1024)

    #Decoder: mirrors the encoder
    self.d1 = decoder_block(1024,512)
    self.d2 = decoder_block(512,256)
    self.d3 = decoder_block(256,128)
    self.d4 = decoder_block(128,64)

    #Classifier: 1x1 convolution that generates the segmentation map
    self.outputs = nn.Conv2d(64, out_channels, kernel_size = 1, padding = 0)

  def forward(self, inputs):
    #Encoder: s* are the skip features, p* the pooled features passed downwards
    s1,p1 = self.e1(inputs)
    s2,p2 = self.e2(p1)
    s3,p3 = self.e3(p2)
    s4,p4 = self.e4(p3)

    #Bottleneck
    b = self.b(p4)

    #Decoder: each stage consumes the skip features of the matching encoder stage
    d1 = self.d1(b,s4)
    d2 = self.d2(d1,s3)
    d3 = self.d3(d2,s2)
    d4 = self.d4(d3,s1)

    outputs = self.outputs(d4)

    return outputs


In [None]:
#Loss function

import torch
import torch.nn as nn
import torch.nn.functional as F

class DiceLoss(nn.Module):
    """Soft Dice loss computed on raw model outputs (logits).

    ``weight`` and ``size_average`` are accepted for call compatibility but
    are not used.
    """

    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return ``1 - dice`` over all elements of the batch.

        Args:
            inputs: Raw model outputs; a sigmoid is applied here.
            targets: Ground-truth tensor with the same number of elements.
            smooth: Added to numerator and denominator to avoid division by zero.
        """
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors; reshape (unlike view) also
        # accepts non-contiguous tensors such as transposed or sliced ones
        inputs = inputs.reshape(-1)
        targets = targets.reshape(-1)

        intersection = (inputs * targets).sum()
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)

        return 1 - dice

class DiceBCELoss(nn.Module):
    """Sum of binary cross-entropy and soft Dice loss, computed on logits.

    ``weight`` and ``size_average`` are accepted for call compatibility but
    are not used.
    """

    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        """Return ``BCE + (1 - dice)`` over all elements of the batch.

        Args:
            inputs: Raw model outputs (logits).
            targets: Ground-truth tensor with the same number of elements.
            smooth: Added to the Dice numerator and denominator.
        """
        # flatten label and prediction tensors; reshape (unlike view) also
        # accepts non-contiguous tensors
        logits = inputs.reshape(-1)
        targets = targets.reshape(-1)

        # The Dice term works on probabilities
        probs = torch.sigmoid(logits)
        intersection = (probs * targets).sum()
        dice_loss = 1 - (2.*intersection + smooth)/(probs.sum() + targets.sum() + smooth)

        # BCE is taken directly on the logits: applying sigmoid first saturates
        # for large |logit| and makes the gradient vanish for confidently wrong
        # predictions
        BCE = F.binary_cross_entropy_with_logits(logits, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE

In [None]:
# ##Building a UNET structure

# if __name__ == "__main__":
#   x = torch.randn((2,3,512,512)) #batch size,number_of_channels,height,width 
#   b = build_unet()
#   y = b(x)
#   print(y.shape)

In [None]:
#Creating the data pipelines

class DriveDataset(Dataset):
    """Dataset that pairs image files with mask files by list position.

    Args:
        images_path: List of image file paths.
        masks_path: List of mask file paths, in the same order as the images.
    """

    def __init__(self, images_path, masks_path):

        self.images_path = images_path
        self.masks_path = masks_path
        self.n_samples = len(images_path) # Number of images

    def __getitem__(self, index):
        """Return ``(image, mask)`` as float32 torch tensors.

        The image is read in colour, divided by 255 and moved to
        channels-first layout; the mask is read as grayscale, divided by 255
        and given a leading channel axis.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        # Reading image
        image_file = self.images_path[index]
        image = cv2.imread(image_file, cv2.IMREAD_COLOR)
        if image is None: # cv2.imread returns None instead of raising
            raise FileNotFoundError('Could not read image file: {}'.format(image_file))
        image = image/255.0
        image = np.transpose(image, (2, 0, 1)) # (H, W, C) -> (C, H, W)
        image = image.astype(np.float32)
        image = torch.from_numpy(image) # Converting it into a torch tensor

        # Reading mask
        mask_file = self.masks_path[index]
        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError('Could not read mask file: {}'.format(mask_file))
        mask = mask/255.0
        mask = np.expand_dims(mask, axis=0) # (H, W) -> (1, H, W)
        mask = mask.astype(np.float32)
        mask = torch.from_numpy(mask)

        return image, mask

    def __len__(self): # To return the number of samples
        return self.n_samples

In [None]:
if __name__=='__main__':
  #Seed every random number generator in use: weight initialisation and
  #DataLoader shuffling draw from torch, so seeding numpy alone is not enough
  random.seed(42)
  np.random.seed(42)
  torch.manual_seed(42)

  #Training data: only the first 20 files (in sorted order) are used
  train_X = sorted(glob('/content/gdrive/My Drive/DBR/DRIVE/aug_train/images/*'))[:20]
  train_y = sorted(glob('/content/gdrive/My Drive/DBR/DRIVE/aug_train/mask/*'))[:20]

  #Validation data
  valid_X = sorted(glob('/content/gdrive/My Drive/DBR/DRIVE/aug_test/images/*'))
  valid_Y = sorted(glob('/content/gdrive/My Drive/DBR/DRIVE/aug_test/mask/*'))

In [None]:
#Number of image files found for training and for validation
print('Length of training data is {} and length of validation data is {}'.format(*map(len,(train_X,valid_X))))

Length of training data is 20 and length of validation data is 20


In [None]:
#Using hyper parameters
H = 565
W = 565
size = (H,W)
batch_size = 2
num_epochs = 50
lr = 1e-4
checkpoint_path = '/content/gdrive/My Drive/DBR/DRIVE'


#Datasets for training and validation
train_image = DriveDataset(train_X,train_y)
#The validation set pairs the validation images with their masks
#(previously the training masks were passed here as the images)
test_image = DriveDataset(valid_X,valid_Y)

#Using loader
from torch.utils.data import DataLoader

#Train data loader: reshuffled every epoch
train_loader = DataLoader(
    dataset = train_image,
    batch_size = batch_size,
    shuffle = True,
    num_workers = 2 #Number of background worker processes that load batches
    )

#Validation data loader: the averaged loss does not depend on order, so no shuffling

valid_loader = DataLoader(
    dataset = test_image,
    batch_size = batch_size,
    shuffle = False,
    num_workers = 2 #Number of background worker processes that load batches
    )

In [None]:
#Model, optimiser, learning-rate scheduler and loss function used for training
model = build_unet()
optimizer = torch.optim.Adam(params = model.parameters(), lr = lr)
#NOTE(review): `verbose` is reported as deprecated in recent PyTorch releases - confirm against the installed version
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = 'min', patience = 5, verbose = True)
loss_fn = DiceBCELoss()

In [None]:
#Creating a training function
def train(model,loader,optimizer,loss_fn):
  """Run one training epoch and return the mean loss per batch.

  Args:
    model: Module to optimise; it is switched to training mode.
    loader: Iterable of ``(inputs, targets)`` batches that supports ``len()``.
    optimizer: Optimiser that owns the model parameters.
    loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.

  Returns:
    The sum of the batch losses divided by the number of batches.
  """
  #Batches are moved to wherever the model lives, so a model placed on a GPU
  #works without changes to the data pipeline
  first_param = next(model.parameters(), None)

  epoch_loss = 0.0
  model.train()
  for x,y in loader:
    if first_param is not None:
      x = x.to(first_param.device)
      y = y.to(first_param.device)

    optimizer.zero_grad()
    y_pred = model(x)
    loss = loss_fn(y_pred,y)
    loss.backward()
    optimizer.step()
    epoch_loss = epoch_loss+loss.item() #Accumulate as a Python float

  epoch_loss = epoch_loss/len(loader)
  return epoch_loss

def evaluate(model,loader,loss_fn):
  """Compute the mean loss per batch without updating the model.

  Args:
    model: Module to evaluate; it is switched to evaluation mode.
    loader: Iterable of ``(inputs, targets)`` batches that supports ``len()``.
    loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.

  Returns:
    The sum of the batch losses divided by the number of batches.
  """
  #Batches are moved to wherever the model lives
  first_param = next(model.parameters(), None)

  epoch_loss = 0.0
  model.eval()
  with torch.no_grad(): #No gradients are needed for evaluation
    for x,y in loader:
      if first_param is not None:
        x = x.to(first_param.device)
        y = y.to(first_param.device)
      y_pred = model(x)
      loss = loss_fn(y_pred,y)
      epoch_loss = epoch_loss+loss.item()

    epoch_loss = epoch_loss/len(loader)
  return epoch_loss


#Training the model
for epoch in range(num_epochs):
  start_time = time.time()
  train_loss = train(model,train_loader,optimizer,loss_fn)
  valid_loss = evaluate(model,valid_loader,loss_fn)

  #ReduceLROnPlateau only lowers the learning rate when it is fed the monitored
  #value; without this call the scheduler never does anything
  scheduler.step(valid_loss)

  end_time = time.time()

  print('Epoch {}/{} took {:.1f}s'.format(epoch+1,num_epochs,end_time-start_time))
  print('Train loss: {}'.format(train_loss))
  print('Valid loss : {}'.format(valid_loss))