In [None]:
#This cell defines the U-Net architecture
#PyTorch image tensors are laid out as (batch size, channels, height, width)
import torch 
import torch.nn as nn
import torchvision.transforms.functional as TF

class DoubleConv(nn.Module):
  """Two consecutive Conv2d -> BatchNorm2d -> ReLU stages.

  Every convolution uses a 3x3 kernel with stride 1 and padding 1, so only the
  channel count changes: the first stage maps in_channels -> out_channels and
  the second maps out_channels -> out_channels.
  """
  def __init__(self,in_channels,out_channels):
    super().__init__()
    stages=[]
    #First stage consumes in_channels, second stage consumes the first stage's output
    for stage_in in (in_channels,out_channels):
      stages+=[
        #bias=False: each convolution is immediately followed by a BatchNorm2d layer
        nn.Conv2d(stage_in,out_channels,kernel_size=3,stride=1,padding=1,bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
      ]
    self.conv=nn.Sequential(*stages)

  def forward(self,x):
    """Apply both stages to x and return the result."""
    return self.conv(x)
  
class UNET(nn.Module):
  """U-Net style encoder/decoder that returns one logit map per output channel.

  Args:
    in_channels: number of channels in the input tensor.
    out_channels: number of channels produced by the final 1x1 convolution.
    features: channel width of each encoder level, from the shallowest to the
      deepest. The bottleneck uses twice the deepest width. The default
      (64,128,256,512) gives the classic four-level layout.

  Raises:
    ValueError: if features is empty.

  No activation is applied to the output; callers are expected to apply
  sigmoid (or a logits-based loss) themselves.
  """
  def __init__(self,in_channels=3,out_channels=1,features=(64,128,256,512)):
    super(UNET,self).__init__()
    features=tuple(features)
    if not features:
      raise ValueError("features must contain at least one channel width")
    self.ups=nn.ModuleList()
    self.downs=nn.ModuleList()
    self.pool=nn.MaxPool2d(kernel_size=2,stride=2)
    #Downsampling portion of UNET: one DoubleConv per level
    channels=in_channels
    for feature in features:
      self.downs.append(DoubleConv(channels,feature))
      channels=feature
    #Upsampling portion of UNET: ups holds (ConvTranspose2d, DoubleConv) pairs,
    #i.e. even indices upsample and odd indices fuse with the skip connection
    bottleneck_channels=features[-1]*2
    channels=bottleneck_channels
    for feature in reversed(features):
      self.ups.append(nn.ConvTranspose2d(channels,feature,kernel_size=2,stride=2))
      #The DoubleConv sees skip (feature) + upsampled (feature) channels after concatenation
      self.ups.append(DoubleConv(feature*2,feature))
      channels=feature
    #Middle (Horizontal) portion of UNET
    self.middle=DoubleConv(features[-1],bottleneck_channels)
    #Final 1x1 convolution; honours out_channels instead of a hard-coded single channel
    self.final_conv=nn.Conv2d(features[0],out_channels,kernel_size=1)

  def forward(self,x):
    """Run the encoder, bottleneck and decoder on x and return the logits."""
    skip_connections=[]
    #Encoder: remember each level's output before pooling so the decoder can reuse it
    for down in self.downs:
      x=down(x)
      skip_connections.append(x)
      x=self.pool(x)
    x=self.middle(x)
    #Decoder: consume the skip connections from deepest to shallowest
    for idx in range(0,len(self.ups),2):
      x=self.ups[idx](x)
      skip=skip_connections.pop()
      if x.shape!=skip.shape:  #Shapes can differ when a pooled dimension was odd, e.g. a 101 x 101 input
        x=TF.resize(x,size=skip.shape[2:])
      concat_skip=torch.cat((skip,x),dim=1)
      x=self.ups[idx+1](concat_skip)
    return self.final_conv(x)

#Sanity check: the U-Net output must have the same shape as its input

def test():   
  x=torch.randn((3,1,160,160))
  model=UNET(in_channels=1,out_channels=1)
  predictions=model(x)
  print("Input shape: ",x.shape)
  print("Output shape: ",predictions.shape)
  assert predictions.shape==x.shape


test()





    


    




Input shape:  torch.Size([3, 1, 160, 160])
Output shape:  torch.Size([3, 1, 160, 160])


In [None]:
#In this cell we do data preparation
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
import os
from torch.utils.data import Dataset
from PIL import Image

class Carvana(Dataset):
  """Image/mask pairs read from two directories.

  Args:
    image_dir: directory whose entries are the input images.
    mask_dir: directory holding one mask per image; the mask file name is the
      image file name with ".jpg" replaced by "_mask.gif".
    transform: optional callable invoked as transform(image=..., mask=...) that
      returns a mapping with "image" and "mask" keys (the call style used by
      albumentations pipelines).
  """
  def __init__(self,image_dir,mask_dir,transform=None):
    self.image_dir=image_dir
    self.mask_dir=mask_dir
    self.transform=transform
    #os.listdir returns entries in arbitrary order; sort so that a given index
    #always refers to the same file across runs and machines
    self.images=sorted(os.listdir(image_dir))

  def __len__(self):
    """Number of entries found in image_dir."""
    return len(self.images)

  def __getitem__(self,index):
    """Return (image, mask) for the index-th file name.

    The image is loaded as an RGB array. The mask is loaded as a single-channel
    float32 array in which every pixel equal to 255 is rewritten to 1.0.
    """
    image_name=self.images[index]
    img_path=os.path.join(self.image_dir,image_name)
    mask_path=os.path.join(self.mask_dir,image_name.replace(".jpg","_mask.gif"))
    image=np.array(Image.open(img_path).convert("RGB"))
    #dtype=np.float32 already yields a float array, no further conversion is needed
    mask=np.array(Image.open(mask_path).convert("L"),dtype=np.float32)
    mask[mask==255.0]=1.0

    if self.transform is not None:
      augmentations=self.transform(image=image,mask=mask)
      image=augmentations["image"]
      mask=augmentations["mask"]
    return image,mask









In [None]:
#In this cell we define utils like saving and loading checkpoint
import torch
import torchvision
#from dataset import CarvanaDataset
from torch.utils.data import DataLoader
#def get_loaders(train_dir,train_maskdir,val_dir,val_maskdir,batch_size,train_transform,val_transform,num_workers=4,pin_memory=True):
def get_loaders(train_dir,train_maskdir,batch_size,train_transform,num_workers=4,pin_memory=True):
  """Build the shuffled training DataLoader.

  Args:
    train_dir: directory passed to Carvana as image_dir.
    train_maskdir: directory passed to Carvana as mask_dir.
    batch_size: forwarded to DataLoader.
    train_transform: transform passed to Carvana.
    num_workers: forwarded to DataLoader.
    pin_memory: forwarded to DataLoader.

  Returns:
    A DataLoader over the Carvana dataset with shuffle=True.
  """
  loader_options=dict(batch_size=batch_size,num_workers=num_workers,pin_memory=pin_memory,shuffle=True)
  dataset=Carvana(image_dir=train_dir,mask_dir=train_maskdir,transform=train_transform)
  return DataLoader(dataset,**loader_options)

def check_accuracy(loader,model,device="cuda"):
  """Print pixel accuracy and the mean per-batch Dice score of model on loader.

  Each batch's logits go through a sigmoid and are thresholded at 0.5 before
  being compared with the targets.

  Args:
    loader: iterable yielding (x, y) batches; y receives an extra axis at dim 1
      before the comparison.
    model: module to evaluate. It is switched to eval mode for the pass and put
      back into whichever mode it was in on entry.
    device: device the batches are moved to.

  Returns:
    None. Results are printed. If loader yields no batches a notice is printed
    instead of the metrics.
  """
  was_training=model.training #Remember the caller's mode so it can be restored afterwards
  num_correct=0
  num_pixels=0 #In segmentation, we are giving class for each individual pixel
  dice_score=0
  num_batches=0
  model.eval()

  try:
    with torch.no_grad():
      for x,y in loader:
        x=x.to(device)
        y=y.to(device).unsqueeze(1)
        predictions=torch.sigmoid(model(x))
        predictions=(predictions>0.5).float()
        num_correct+=(predictions==y).sum()
        num_pixels+=torch.numel(predictions)
        #1e-8 keeps the ratio finite when both prediction and target are empty
        dice_score+=(2*(predictions*y).sum())/((predictions+y).sum()+1e-8)
        num_batches+=1
  finally:
    #Restore the original mode even if evaluation raised
    model.train(was_training)

  if num_batches==0:
    #Nothing to average over; avoid dividing by zero
    print("No batches to evaluate")
    return
  print(f"Got {num_correct}/{num_pixels} pixels correct, accuracy: {num_correct/num_pixels*100:.2f}%")
  print(f"Dice score: {dice_score/num_batches}")


In [None]:
#In this cell we do the training loop
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
#from model import UNET #Use this if you are writing this as a separate program
#from utils import (load_checkpoint,save_checkpoint,get_loaders,check_accuracy,save_predictions_as_imgs)
#Hyperparameters and paths read by train() and main() below
LEARNING_RATE=1e-4 #Learning rate handed to the Adam optimizer in main()
DEVICE="cuda" if torch.cuda.is_available() else "cpu" #Use the GPU when torch can see one, otherwise the CPU
BATCH_SIZE=32 #Batch size forwarded to the training DataLoader
NUM_EPOCHS=3 #Number of train + check_accuracy rounds run by main()
NUM_WORKERS=2 #num_workers forwarded to the training DataLoader
IMAGE_HEIGHT=160 #Resize target height; original image ht is 1280
IMAGE_WIDTH=240 #Resize target width; original image width is 1918
PIN_MEMORY=True #pin_memory forwarded to the training DataLoader
LOAD_MODEL=False #NOTE(review): not read anywhere in the visible cells
#Image and mask folders on the mounted Google Drive
TRAIN_IMG_DIR="/content/drive/MyDrive/Projects/Semantic Segmentation/carvana-image-masking-challenge/train"
TRAIN_MASK_DIR="/content/drive/MyDrive/Projects/Semantic Segmentation/carvana-image-masking-challenge/train_masks"
#VAL_IMG_DIR=
#VAL_MASK_DIR=

def train(loader,model,optimizer,loss_fn,scaler):
  """Run one pass over loader, updating model after every batch.

  Args:
    loader: iterable of (data, targets) batches; it is wrapped in a tqdm bar.
    model: network being optimised.
    optimizer: optimizer stepped through the scaler.
    loss_fn: called as loss_fn(predictions, targets).
    scaler: gradient scaler providing scale(), step() and update().

  The forward pass and loss run inside torch.cuda.amp.autocast(); the current
  batch loss is shown as the progress bar's postfix.
  """
  progress=tqdm(loader)

  for inputs,labels in progress:
    inputs=inputs.to(device=DEVICE)
    #Cast to float and add an axis at dim 1 before moving the targets to DEVICE
    labels=labels.float().unsqueeze(1).to(device=DEVICE)

    #Forward pass under automatic mixed precision
    with torch.cuda.amp.autocast():
      loss=loss_fn(model(inputs),labels)

    #Backward pass on the scaled loss, then let the scaler drive the optimizer step
    optimizer.zero_grad()
    scaled_loss=scaler.scale(loss)
    scaled_loss.backward()
    scaler.step(optimizer)
    scaler.update()

    #Update tqdm loop
    progress.set_postfix(loss=loss.item())


def main():
  """Train a 3-channel-in, 1-channel-out UNET on the training folders and save its weights.

  Builds the augmentation pipeline, model, loss, optimizer, data loader and
  gradient scaler, then runs NUM_EPOCHS rounds of train() followed by
  check_accuracy() on the same training loader. The final state_dict is
  written to the checkpoint path on the mounted Drive.
  """
  import os #Function-scope import so this cell does not rely on another cell's imports

  checkpoint_path="/content/drive/MyDrive/Projects/Semantic Segmentation/Models/car_model.pt"
  #torch.save does not create missing parent folders. Create the folder up front so a
  #bad path surfaces before training starts instead of after the last epoch.
  os.makedirs(os.path.dirname(checkpoint_path),exist_ok=True)

  train_transform=A.Compose([
                             A.Resize(height=IMAGE_HEIGHT,width=IMAGE_WIDTH),
                             A.Rotate(limit=35,p=1.0),
                             A.HorizontalFlip(p=0.5),
                             A.VerticalFlip(p=0.1),
                             A.Normalize(mean=[0.0,0.0,0.0],std=[1.0,1.0,1.0],max_pixel_value=255.0), #Divide by 255
                             ToTensorV2()])
  #A validation pipeline (currently unused) would keep only the deterministic steps:
  #val_transform=A.Compose([
  #                         A.Resize(height=IMAGE_HEIGHT,width=IMAGE_WIDTH),
  #                         A.Normalize(mean=[0.0,0.0,0.0],std=[1.0,1.0,1.0],max_pixel_value=255.0), #Divide by 255
  #                         ToTensorV2()])

  model=UNET(in_channels=3,out_channels=1).to(DEVICE)
  loss_fn=nn.BCEWithLogitsLoss() #We use Logits because we are not applying sigmoid activation in the output of the model
  optimizer=optim.Adam(model.parameters(),lr=LEARNING_RATE)
  train_loader=get_loaders(TRAIN_IMG_DIR,TRAIN_MASK_DIR,BATCH_SIZE,train_transform,NUM_WORKERS,PIN_MEMORY)
  scaler=torch.cuda.amp.GradScaler()
  for epoch in range(NUM_EPOCHS):
    train(train_loader,model,optimizer,loss_fn,scaler)
    #NOTE(review): this scores the model on the augmented training loader, not on held-out data
    check_accuracy(train_loader,model,device=DEVICE)
  torch.save(model.state_dict(),checkpoint_path)

main()

100%|██████████| 159/159 [1:09:03<00:00, 26.06s/it, loss=0.192]
  0%|          | 0/159 [00:00<?, ?it/s]

Dice score: 0.973755419254303


100%|██████████| 159/159 [05:43<00:00,  2.16s/it, loss=0.141]
  0%|          | 0/159 [00:00<?, ?it/s]

Dice score: 0.9773179292678833


100%|██████████| 159/159 [05:44<00:00,  2.17s/it, loss=0.104]


Dice score: 0.9799780249595642
