<a href="https://colab.research.google.com/github/Debrup-commits/Image-Segmentaion-UNET/blob/main/Image_segmentaation_UNET.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Setup**

In [1]:
# Mount Google Drive at /content/drive; the copy commands in the next cell read
# the dataset archives from under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Copy the image archive (train.zip) and the mask archive (train_masks.zip)
# from the mounted Drive folder into /content/.
!cp /content/drive/MyDrive/NN\ Tasks/Semantic\ Segmentation\ UNET/train.zip /content/
!cp /content/drive/MyDrive/NN\ Tasks/Semantic\ Segmentation\ UNET/train_masks.zip /content/


In [3]:
# Extract both archives quietly (-q) into the current working directory.
# NOTE(review): later cells read /content/train and /content/train_masks, so this
# presumably runs with /content as the working directory - confirm.
!unzip -q /content/train.zip
!unzip -q /content/train_masks.zip

In [4]:
# Sanity check: every training image should have a matching "<name>_mask.gif"
# in the mask folder (the two folders were observed to have unequal sizes).
import os
from os import listdir

# folder holding the training images
folder_dir = "/content/train"
count=0

# walk the training images and report each one whose mask file is missing
for image_name in os.listdir(folder_dir):

    # mask name = image name without its 4-character extension + "_mask.gif"
    expected_mask = os.path.join('/content/train_masks/', image_name[:-4]+'_mask.gif')

    if os.path.isfile(expected_mask):
      continue

    # running total of missing masks, printed as each one is found
    count+=1
    print("{} Mask(s) absent".format(count))



In [5]:
# Generate the val and val_masks folders by copying a fixed number of
# image/mask pairs out of the training folders.
import os
import shutil

# How many image/mask pairs go into the validation folders.
num_val_images = 48

val_path='/content/val'
val_mask_path='/content/val_masks'

# Create each folder independently: the previous "create both only if val is
# missing" check crashed when exactly one of the two folders already existed.
os.makedirs(val_path, exist_ok=True)
os.makedirs(val_mask_path, exist_ok=True)

train = '/content/train'
train_mask = '/content/train_masks'

# os.listdir returns entries in arbitrary order, so sort before slicing to get
# the same validation files on every run.
# NOTE(review): files are copied, not moved, so these images also stay in the
# training folder and the two sets overlap - confirm this is intended.
for images in sorted(os.listdir(train))[:num_val_images]:
  mask_name = os.path.splitext(images)[0]+'_mask.gif'

  shutil.copyfile(os.path.join(train, images), os.path.join(val_path, images))
  shutil.copyfile(os.path.join(train_mask, mask_name), os.path.join(val_mask_path, mask_name))

# **Dataset**

In [6]:
import os
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

class CarvanaDataset(Dataset):
  """Dataset of (image, mask) pairs read from two parallel folders.

  Every file name found in ``img_dir`` is treated as an image; its mask is
  looked up in ``mask_dir`` under the same name with ``".jpg"`` replaced by
  ``"_mask.gif"``.

  Args:
    img_dir: folder containing the input images.
    mask_dir: folder containing the corresponding mask files.
    transform: optional callable invoked as ``transform(image=..., mask=...)``
      that returns a mapping with ``"image"`` and ``"mask"`` entries.
  """

  def __init__(self, img_dir, mask_dir, transform=None):
    self.img_dir=img_dir
    self.mask_dir=mask_dir
    self.transform=transform
    self.images=os.listdir(img_dir)

  def __len__(self):
    """Return the number of image files found in ``img_dir``."""
    return len(self.images)

  def __getitem__(self, index):
    """Load the ``index``-th pair and return ``(image, mask)``.

    The image is an RGB array; the mask is a float32 single-channel array in
    which the value 255 has been rewritten to 1. If a transform was given,
    both are replaced by the transform's ``"image"`` / ``"mask"`` outputs.
    """
    img_path=os.path.join(self.img_dir, self.images[index])
    mask_path=os.path.join(self.mask_dir, self.images[index].replace(".jpg", "_mask.gif"))

    # Context managers release the file handles as soon as the pixels are copied.
    with Image.open(img_path) as img_file:
      image=np.array(img_file.convert("RGB")) # RGB numpy array
    with Image.open(mask_path) as mask_file:
      mask=np.array(mask_file.convert("L"), dtype=np.float32) # grayscale numpy array

    # map the foreground value 255 to 1 so the mask is a 0/1 target
    mask[mask==255.0]=1.0

    if self.transform is not None:
      # fixed: the attribute was misspelled "tansform", which raised
      # AttributeError whenever a transform was supplied
      transformations=self.transform(image=image, mask=mask)
      image=transformations["image"]
      mask=transformations["mask"]

    return image, mask

# **Our Model**

In [7]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as f

# making the UNET structure
class DoubleConv(nn.Module):
  """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

  The first stage maps ``in_channels`` to ``out_channels``; the second keeps
  ``out_channels``. Stride 1 with padding 1 leaves the spatial size unchanged,
  and the convolutions carry no bias term.
  """

  def __init__(self, in_channels, out_channels):
    super().__init__()
    stages = []
    # stage 1 consumes in_channels, stage 2 consumes the out_channels it produced
    for source_channels in (in_channels, out_channels):
      stages += [
          nn.Conv2d(source_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
          nn.BatchNorm2d(out_channels),
          nn.ReLU(inplace=True),
      ]
    self.conv = nn.Sequential(*stages)

  def forward(self, x):
    """Run ``x`` through both stages and return the result."""
    return self.conv(x)

class UNET(nn.Module):
  """U-Net built from ``DoubleConv`` blocks.

  Args:
    in_channels: channel count of the input tensor.
    out_channels: channel count of the output produced by the final 1x1 conv.
    features: channel widths of the encoder levels, shallowest first; the
      decoder mirrors them in reverse.
  """

  def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):
    super().__init__()
    self.downs=nn.ModuleList()
    self.ups=nn.ModuleList()
    self.pool=nn.MaxPool2d(kernel_size=2, stride=2)

    # Encoder: each level takes the previous level's width as its input width.
    level_inputs = [in_channels] + list(features[:-1])
    for width_in, width_out in zip(level_inputs, features):
      self.downs.append(DoubleConv(width_in, width_out))

    # Decoder: self.ups alternates [upsample, DoubleConv] per level. The
    # DoubleConv takes 2*width channels because the skip connection is
    # concatenated with the upsampled tensor.
    for width in reversed(features):
      self.ups.extend([
          nn.ConvTranspose2d(2*width, width, kernel_size=2, stride=2),
          DoubleConv(2*width, width),
      ])

    deepest = features[-1]
    self.bottle_neck=DoubleConv(deepest, 2*deepest)

    self.final_conv=nn.Conv2d(features[0], out_channels, kernel_size=1)

  def forward(self, x):
    """Encode ``x``, pass it through the bottleneck, decode with skips, project."""
    encoder_outputs=[]

    for encoder in self.downs:
      x=encoder(x)
      encoder_outputs.append(x)  # kept for the matching decoder level
      x=self.pool(x)

    x=self.bottle_neck(x)

    # Deepest skip first, pairing level k with ups[2k] (upsample) and ups[2k+1].
    for level, skip in enumerate(reversed(encoder_outputs)):
      upsample=self.ups[2*level]
      refine=self.ups[2*level+1]

      x=upsample(x)

      # Pooling floors odd sizes, so the upsampled map can come out smaller
      # than the skip; resize it to the skip's spatial size before concatenating.
      if x.shape != skip.shape:
        x = f.resize(x, size=skip.shape[2:])

      x=refine(torch.cat((skip, x), dim=1))

    return self.final_conv(x)


def test():
  """Smoke test: a single-channel UNET must return a tensor shaped like its input."""
  batch_shape=(3, 1, 160, 160)
  x=torch.randn(batch_shape)
  model=UNET(in_channels=1, out_channels=1)
  preds=model(x)

  print(x.shape, preds.shape)

  assert preds.shape==x.shape

# run the smoke test when this cell/script is executed directly
if __name__=='__main__':
  test()
    

torch.Size([3, 1, 160, 160]) torch.Size([3, 1, 160, 160])


# **Helper functions**

In [8]:
import torch
import torchvision
from torch.utils.data import DataLoader

def save_checkpoint(state, filename="my_checkpoint.pth.tar"):
  """Print a notice, then serialize ``state`` to ``filename`` with ``torch.save``."""
  print("Saving CheckPoint!")
  torch.save(obj=state, f=filename)

def load_checkpoint(checkpoint, model):
  """Print a notice, then load ``checkpoint["state_dict"]`` into ``model``."""
  print("Loading CheckPoint!")
  weights=checkpoint["state_dict"]
  model.load_state_dict(weights)

def get_loaders(
    train_dir,
    train_mask_dir,
    val_dir,
    val_mask_dir,
    batch_size,
    train_transform,
    val_transform,
    num_workers=4,
    pin_memory=True):
  """Build the training and validation ``DataLoader`` objects.

  Args:
    train_dir, train_mask_dir: image and mask folders for the training set.
    val_dir, val_mask_dir: image and mask folders for the validation set.
    batch_size: number of samples per batch for both loaders.
    train_transform, val_transform: transforms handed to each ``CarvanaDataset``.
    num_workers: number of loader worker processes.
    pin_memory: forwarded to ``DataLoader(pin_memory=...)``.

  Returns:
    ``(train_loader, val_loader)``; only the training loader shuffles.
  """
  # DataLoader's positional order is (dataset, batch_size, shuffle, sampler, ...),
  # so num_workers/pin_memory must be passed by keyword. Passed positionally they
  # landed in the shuffle/sampler slots and clashed with shuffle=..., raising
  # TypeError.
  train_dataset=CarvanaDataset(train_dir, train_mask_dir, train_transform)
  train_loader=DataLoader(
      train_dataset,
      batch_size=batch_size,
      num_workers=num_workers,
      pin_memory=pin_memory,
      shuffle=True,
  )

  val_dataset=CarvanaDataset(val_dir, val_mask_dir, val_transform)
  val_loader=DataLoader(
      val_dataset,
      batch_size=batch_size,
      num_workers=num_workers,
      pin_memory=pin_memory,
      shuffle=False,
  )

  return train_loader, val_loader

def check_accuracy(loader, model, device="cuda"):
  """Print the pixel-wise accuracy (in percent) of ``model`` over ``loader``.

  Logits are passed through a sigmoid and thresholded at 0.5 before being
  compared with the targets. The model is switched to eval mode for the pass
  and back to train mode afterwards.

  Args:
    loader: iterable yielding ``(x, y)`` batches.
    model: network whose predictions are scored.
    device: device the batches are moved to.
  """
  num_correct=0
  num_pixels=0
  model.eval()

  with torch.no_grad():
    for x, y in loader:
      x=x.to(device)
      y=y.to(device)

      preds=torch.sigmoid(model(x))
      preds=(preds>0.5).float()

      # Targets without a channel axis (one dim fewer than preds) would
      # broadcast against preds and inflate the count; add the channel axis
      # so the comparison is element-wise.
      if y.dim()==preds.dim()-1:
        y=y.unsqueeze(1)

      num_correct+=(preds==y).sum().item()
      num_pixels+=torch.numel(preds)

  # An empty loader has no pixels to score; report NaN instead of dividing by zero.
  accuracy=num_correct/num_pixels*100 if num_pixels else float("nan")
  # fixed: the format spec was written outside the braces ("{}:.2f")
  print("Accuracy: {:.2f}".format(accuracy))
  model.train()

def save_predictions_as_imgs(loader, model, folder='/content/segmented_masks', device="cuda"):
  """Write the thresholded predictions and the target masks of each batch as PNGs.

  For batch ``idx`` the prediction grid is saved to ``{folder}/pred_{idx}.png``
  and the target grid to ``{folder}/{idx}.png``. The model is put in eval mode
  for the pass and returned to train mode afterwards.

  Args:
    loader: iterable yielding ``(x, y)`` batches.
    model: network used to produce the predictions.
    folder: existing directory that receives the images.
    device: device the inputs are moved to.
  """
  model.eval()
  for idx, (x, y) in enumerate(loader):
    x=x.to(device)

    with torch.no_grad():
      preds=torch.sigmoid(model(x))
      preds=(preds>0.5).float()

    torchvision.utils.save_image(
        preds, f"{folder}/pred_{idx}.png"
    )
    # fixed: the path lacked the "/" separator, so the targets were written
    # next to the folder (e.g. ".../segmented_masks0.png") instead of inside it
    torchvision.utils.save_image(y.unsqueeze(1), f"{folder}/{idx}.png")

  model.train()


# **Training**

In [10]:
# Install a pinned albumentations release (0.4.6). The training cell imports
# albumentations, so this cell has to run before it.
! pip install albumentations==0.4.6

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting albumentations==0.4.6
  Downloading albumentations-0.4.6.tar.gz (117 kB)
[K     |████████████████████████████████| 117 kB 5.1 MB/s 
Collecting imgaug>=0.4.0
  Downloading imgaug-0.4.0-py2.py3-none-any.whl (948 kB)
[K     |████████████████████████████████| 948 kB 36.1 MB/s 
Building wheels for collected packages: albumentations
  Building wheel for albumentations (setup.py) ... [?25l[?25hdone
  Created wheel for albumentations: filename=albumentations-0.4.6-py3-none-any.whl size=65174 sha256=0b25e1c1d0e80790401a2f6e8505928a5f75374415773d2de8cda785890e0010
  Stored in directory: /root/.cache/pip/wheels/cf/34/0f/cb2a5f93561a181a4bcc84847ad6aaceea8b5a3127469616cc
Successfully built albumentations
Installing collected packages: imgaug, albumentations
  Attempting uninstall: imgaug
    Found existing installation: imgaug 0.2.9
    Uninstalling imgaug-0.2.9:
      Successfully uni

In [9]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

#HyperParameters
learning_rate=1e-4  # step size given to the Adam optimizer in main()
device="cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is visible
batch_size=16  # samples per batch for both loaders
num_epochs=100  # number of passes over the training loader in main()
num_workers=2  # forwarded to get_loaders
image_height=160  # A.Resize target height
image_width=240  # A.Resize target width
pin_memory=True  # forwarded to get_loaders
load_model=False  # NOTE(review): not read anywhere in the visible code - confirm whether checkpoint loading was intended

# Folders produced by the setup cells above.
train_img_path='/content/train'
train_mask_path='/content/train_masks'
val_img_path='/content/val'
val_mask_path='/content/val_masks'

def train(loader, model, optimizer, loss_fn, scaler):
  """Run one pass over ``loader``, updating ``model`` with scaled mixed-precision steps.

  Args:
    loader: iterable of ``(data, targets)`` batches; it is wrapped in a tqdm bar.
    model: network being optimized.
    optimizer: optimizer that owns the model's parameters.
    loss_fn: callable ``loss_fn(predictions, targets)`` returning a scalar loss.
    scaler: gradient scaler exposing ``scale`` / ``step`` / ``update``.
  """
  progress=tqdm(loader)

  for data, targets in progress:
    data=data.to(device)
    # add a channel axis so the targets line up with the model output
    targets=targets.float().unsqueeze(1).to(device=device)

    # forward pass under autocast
    with torch.cuda.amp.autocast():
      batch_loss=loss_fn(model(data), targets)

    # backward pass: clear stale gradients, backprop the scaled loss, step, rescale
    optimizer.zero_grad()
    scaler.scale(batch_loss).backward()
    scaler.step(optimizer)
    scaler.update()

    # show the current batch loss on the progress bar
    progress.set_postfix(loss=batch_loss.item())



def main():
  """Train the UNET for ``num_epochs`` epochs.

  After every epoch the model and optimizer state are checkpointed, validation
  accuracy is printed, and the validation predictions are written as images to
  ``/content/segmented_masks``.
  """
  # fixed: the albumentations class is ``A.Compose``; ``A.compose`` does not exist
  train_transforms=A.Compose([
                              A.Resize(height=image_height, width=image_width),
                              A.Rotate(limit=35, p=1.0),
                              A.HorizontalFlip(p=0.5),
                              A.VerticalFlip(p=0.1),
                              # mean 0 / std 1 with max_pixel_value=255 simply scales pixels to [0, 1]
                              A.Normalize(
                                  mean=[0.0, 0.0, 0.0],
                                  std=[1.0, 1.0, 1.0],
                                  max_pixel_value=255.0,
                              ),
                              ToTensorV2()
  ])

  # validation uses the same resize/scale steps but no random augmentation
  val_transforms = A.Compose([
                              A.Resize(height=image_height, width=image_width),
                              A.Normalize(
                                  mean=[0.0, 0.0, 0.0],
                                  std=[1.0, 1.0, 1.0],
                                  max_pixel_value=255.0,
                              ),
                              ToTensorV2()
  ])

  model = UNET(in_channels=3, out_channels=1).to(device)

  # Binary cross entropy on raw logits (the model output has no sigmoid)
  loss_fn = nn.BCEWithLogitsLoss()
  optimizer = optim.Adam(model.parameters(), lr=learning_rate)

  train_loader, val_loader = get_loaders(
      train_img_path,
      train_mask_path,
      val_img_path,
      val_mask_path,
      batch_size,
      train_transforms,
      val_transforms,
      num_workers,
      pin_memory
  )

  scaler = torch.cuda.amp.GradScaler()

  # The output folder does not change between epochs, so create it once up front.
  saved_img_path='/content/segmented_masks'
  os.makedirs(saved_img_path, exist_ok=True)

  for epoch in range(num_epochs):
    train(train_loader, model, optimizer, loss_fn, scaler)
    checkpoint = {
        "state_dict" : model.state_dict(),
        "optimizer" : optimizer.state_dict()
    }
    save_checkpoint(checkpoint)

    # check accuracy
    check_accuracy(val_loader, model, device=device)

    # save some predictions
    # fixed: the required ``model`` argument was missing from this call (TypeError)
    save_predictions_as_imgs(val_loader, model, folder=saved_img_path, device=device)


if __name__=='__main__':
  main()



ImportError: ignored