In [1]:
!pip3 install -q segmentation-models-pytorch
!pip3 install -q torchsummary

In [None]:
# Step 1: importing all the necessary libraries for the assignment
import glob
import cv2
import random
import os
import time
import glob
import torch

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch.nn as nn
from torchvision import transforms as T
import torchvision
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision import models

from PIL import Image
import albumentations as A
import segmentation_models_pytorch as smp
import torchvision
from glob import glob
from tqdm import tqdm
import matplotlib.pyplot as plt
import torchvision.transforms as transform
from torch.utils.data import DataLoader,Dataset
from torch.utils.tensorboard import SummaryWriter
from torchvision.utils import make_grid


In [3]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# (The device string must be "cuda"; a misspelt name makes torch.device raise.)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# STEP 2: locate the dataset files inside the saved project folder
label_colors_path = 'Cam101/label_colors.txt'
train_images_path = 'Cam101/train'
test_images_path = 'Cam101/test'

# Map every class name to its (R, G, B) label colour.
# Each non-empty line of the colour file is parsed as three integers followed
# by the class name. Splitting on any whitespace tolerates repeated tabs or
# spaces between the colour and the name, and blank lines are skipped.
class_to_color = {}

with open(label_colors_path, 'r') as f:
    for line_number, line in enumerate(f, start=1):
        fields = line.split()
        if not fields:
            continue  # blank line
        if len(fields) < 4:
            raise ValueError(
                f"{label_colors_path}:{line_number}: expected 'R G B name', got {line!r}"
            )
        r, g, b = map(int, fields[:3])
        class_name = ' '.join(fields[3:])
        class_to_color[class_name] = (r, g, b)

print("Class to color mapping:")
for class_name, color in class_to_color.items():
    print(f"{class_name}: {color}")

In [None]:
# Collect the image/label path pairs of both splits and print their sizes.
#
# NOTE: the import cell ends with `from glob import glob`, which rebinds the
# name `glob` to the function, so it is called directly here
# (`glob.glob(...)` would raise AttributeError on the function object).
# Results are sorted because glob returns files in arbitrary order.

train_images = sorted(glob(os.path.join(train_images_path, '*.png')))
train_images = [img for img in train_images if not img.endswith('_L.png')]
# Only swap the file extension; str.replace would also rewrite any other
# '.png' occurring earlier in the path.
train_labels = [os.path.splitext(img)[0] + '_L.png' for img in train_images]

test_images = sorted(glob(os.path.join(test_images_path, '*.png')))
test_images = [img for img in test_images if not img.endswith('_L.png')]
test_labels = [os.path.splitext(img)[0] + '_L.png' for img in test_images]

print(f"Number of training samples: {len(train_images)}")
print(f"Number of testing samples: {len(test_images)}")

In [None]:
# the code plots a figure from the data set to visualise a sample

def visualize_sample(image_path, label_path):
    """Show an image and its label image side by side.

    Args:
        image_path: path of the input image file.
        label_path: path of the matching label image file.

    Raises:
        FileNotFoundError: if either file cannot be read by OpenCV.
    """
    img = cv2.imread(image_path)
    label_img = cv2.imread(label_path)

    # cv2.imread reports failure by returning None rather than raising, which
    # would otherwise surface as an opaque cvtColor error further down.
    for path, loaded in ((image_path, img), (label_path, label_img)):
        if loaded is None:
            raise FileNotFoundError(f"Could not read image file: {path}")

    # OpenCV loads colour images as BGR; matplotlib expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    label_img = cv2.cvtColor(label_img, cv2.COLOR_BGR2RGB)

    fig, ax = plt.subplots(1, 2, figsize=(10, 5))
    ax[0].imshow(img)
    ax[0].set_title("Original Image")
    ax[1].imshow(label_img)
    ax[1].set_title("Labeled Image")
    plt.show()

# Preview the first training sample together with its label image.
for i in range(1):
    visualize_sample(image_path=train_images[i], label_path=train_labels[i])

# Empty placeholders for the datasets.
train_dataset, validation_dataset = [], []

In [None]:
# performing data augmentation on the images and the labels
def augment_data(images, labels, augment=True):
    """Load image/label pairs, optionally augment them, and resize to 512x512.

    For every pair the original is kept; when ``augment`` is true a
    horizontally flipped copy and a randomly rotated copy are added as well.
    The same transform is applied to the image and its label.

    Args:
        images: sequence of image file paths.
        labels: sequence of label file paths, parallel to ``images``.
        augment: whether to add the flipped and rotated copies.

    Returns:
        A tuple ``(out_images, out_labels)`` of two parallel lists of resized
        arrays as returned by OpenCV.

    Raises:
        FileNotFoundError: if a file cannot be read by OpenCV.
    """
    new_size = (512, 512)

    # `A` is the albumentations alias from the import cell; the bare names
    # HorizontalFlip / Rotate are never imported in this notebook.
    flip = A.HorizontalFlip(p=1.0)
    # limit=45 samples the angle from [-45, 45] degrees.
    rotate = A.Rotate(limit=45, p=1.0)

    out_images, out_labels = [], []

    for x_path, y_path in tqdm(zip(images, labels), total=len(images)):
        # reading the image and the label using OpenCV
        x = cv2.imread(x_path, cv2.IMREAD_COLOR)
        y = cv2.imread(y_path, cv2.IMREAD_COLOR)
        if x is None:
            raise FileNotFoundError(f"Could not read image file: {x_path}")
        if y is None:
            raise FileNotFoundError(f"Could not read label file: {y_path}")

        variants = [(x, y)]
        if augment:
            for aug in (flip, rotate):
                augmented = aug(image=x, mask=y)
                variants.append((augmented['image'], augmented['mask']))

        for img, lab in variants:
            out_images.append(cv2.resize(img, new_size))
            # Nearest-neighbour keeps label colours exact; the default
            # bilinear resize would blend neighbouring class colours.
            out_labels.append(cv2.resize(lab, new_size, interpolation=cv2.INTER_NEAREST))

    return out_images, out_labels


In [8]:
# Dataset that loads the image and label pair stored at a given index
class SegmentationDataset(Dataset):
    """Pairs of image and label files, addressed by position.

    The image is opened as RGB and the label as single-channel greyscale
    ('L'). Optional callables are applied to each after loading.
    """

    def __init__(self, image_paths, label_paths, transform=None, label_transform=None):
        self.image_paths, self.label_paths = image_paths, label_paths
        self.transform, self.label_transform = transform, label_transform

    def __len__(self):
        """Number of samples, taken from the list of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at position ``idx``."""
        image_path, label_path = self.image_paths[idx], self.label_paths[idx]
        image = Image.open(image_path).convert('RGB')
        label = Image.open(label_path).convert('L')

        image = self.transform(image) if self.transform else image
        label = self.label_transform(label) if self.label_transform else label
        return image, label

# Transformations applied to the input images: resize, convert to a tensor
# and normalise each channel with the given mean / std.
mytransformsImage = transform.Compose([
    transform.Resize((736, 960)),
    transform.ToTensor(),
    transform.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Transformations applied to the labels. Nearest-neighbour interpolation is
# used so resizing never invents label values that are not in the original
# mask (the default bilinear mode blends neighbouring labels at boundaries).
mytransformsLabel = transform.Compose([
    transform.Resize((736, 960), interpolation=transform.InterpolationMode.NEAREST),
    transform.ToTensor()
])

In [None]:
# splitting the training data into training and validation (90% / 10%)
#
# NOTE: `glob` is bound to the function by `from glob import glob` in the
# import cell, so it is called directly; `glob.glob(...)` raises AttributeError.
train_val_images = sorted(glob(os.path.join(train_images_path, '*.png')))
train_val_images = [img for img in train_val_images if not img.endswith('_L.png')]
train_val_labels = [os.path.splitext(img)[0] + '_L.png' for img in train_val_images]
train_val = list(zip(train_val_images, train_val_labels))
if len(train_val) < 2:
    raise RuntimeError(
        f"Need at least 2 samples in {train_images_path!r} to build a train/validation split, "
        f"found {len(train_val)}"
    )

# A dedicated, seeded generator makes the split identical on every run
# without touching the global random state.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(train_val)
train_size = int(0.9 * len(train_val))
train_images, train_labels = zip(*train_val[:train_size])
val_images, val_labels = zip(*train_val[train_size:])

# preparing the data for segmentation tasks
train_dataset = SegmentationDataset(train_images, train_labels, transform=mytransformsImage, label_transform=mytransformsLabel)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)

# Evaluation loaders are not shuffled: order does not affect averaged
# metrics, and a fixed order keeps the visualised batches comparable.
val_dataset = SegmentationDataset(val_images, val_labels, transform=mytransformsImage, label_transform=mytransformsLabel)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=2)

test_dataset = SegmentationDataset(test_images, test_labels, transform=mytransformsImage, label_transform=mytransformsLabel)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=2)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")
print(f"Number of testing samples: {len(test_dataset)}")

BUILDING THE MODELS - METRICS, TRAINING HELPERS AND THE UNet, DeepLab AND FCN MODELS

In [26]:
# calculates the pixel accuracy of the segmentation
def pixel_accuracy(output, mask):
    """Fraction of positions where the arg-max over dim 1 of ``output`` equals ``mask``.

    Returns a Python float. No gradients are tracked.
    """
    with torch.no_grad():
        predicted_labels = output.max(dim=1).indices
        hits = (predicted_labels == mask).float()
        return hits.mean().item()

In [27]:
# used to calculate the mIoU of the segmentation
def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    """Mean intersection-over-union across the classes present in ``mask``.

    Args:
        pred_mask: per-class scores with the class dimension at dim 1.
        mask: ground-truth class labels; must hold as many elements as the
            arg-max of ``pred_mask``.
        smooth: small constant added to numerator and denominator.
        n_classes: number of class ids (0 .. n_classes - 1) to evaluate.

    Returns:
        The mean IoU over the classes that occur in ``mask``. Classes that do
        not occur in ``mask`` are ignored, and the result is NaN if none occur.
    """
    with torch.no_grad():
        # Softmax is monotonic per pixel, so taking the arg-max of the raw
        # scores gives the predicted class without the extra pass.
        pred_labels = torch.argmax(pred_mask, dim=1).reshape(-1)
        # reshape (unlike view) also accepts non-contiguous tensors.
        true_labels = mask.reshape(-1)

        iou_per_class = []
        for clas in range(n_classes):  # loop per pixel class
            pred_is_class = pred_labels == clas
            true_is_class = true_labels == clas

            if not true_is_class.any():  # no existing label in this class
                iou_per_class.append(np.nan)
                continue

            intersect = (pred_is_class & true_is_class).sum().float().item()
            union = (pred_is_class | true_is_class).sum().float().item()
            iou_per_class.append((intersect + smooth) / (union + smooth))
        return np.nanmean(iou_per_class)

In [28]:
def get_lr(optimizer):
    """Return the 'lr' of the optimizer's first parameter group (None if it has none)."""
    first_lr = (group['lr'] for group in optimizer.param_groups)
    return next(first_lr, None)

def _prepare_batch(image_tiles, mask_tiles, patch):
    """Move one batch to ``device`` and bring the mask into a channel-free layout."""
    if patch:
        # fold the tiles of every image into the batch dimension
        bs, n_tiles, c, h, w = image_tiles.size()
        image_tiles = image_tiles.view(-1, c, h, w)
        mask_tiles = mask_tiles.view(-1, h, w)

    image = image_tiles.to(device)
    mask = mask_tiles.to(device)
    # Drop only the channel dimension. A bare squeeze() would also remove the
    # batch dimension whenever a batch holds a single sample.
    if mask.dim() == 4:
        mask = mask.squeeze(1)
    return image, mask


def fit(epochs, model, train_loader, val_loader, criterion, optimizer, scheduler, patch=False):
    """Train ``model`` and validate it after every epoch.

    Args:
        epochs: maximum number of epochs.
        model: network to train; it is moved to the global ``device``.
        train_loader / val_loader: iterables yielding ``(images, masks)``.
        criterion: loss called as ``criterion(output, mask.long())``.
        optimizer: optimizer stepped after every training batch.
        scheduler: learning-rate scheduler stepped after every training batch.
        patch: when true, batches are ``(bs, n_tiles, c, h, w)`` and the tiles
            are folded into the batch dimension.

    Returns:
        dict with per-epoch lists 'train_loss', 'val_loss', 'train_miou',
        'val_miou', 'train_acc', 'val_acc' and the per-step list 'lrs'.

    Side effects:
        Saves the whole model on every 5th improvement of the validation loss
        and prints progress. Training stops once the validation loss has gone
        up 7 times relative to the running reference loss.
    """
    torch.cuda.empty_cache()
    train_losses = []
    test_losses = []
    val_iou = []
    val_acc = []
    train_iou = []
    train_acc = []
    lrs = []
    min_loss = np.inf
    decrease = 1
    not_improve = 0

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0

        # training loop
        model.train()
        for image_tiles, mask_tiles in tqdm(train_loader):
            image, mask = _prepare_batch(image_tiles, mask_tiles, patch)
            # forward
            output = model(image)
            loss = criterion(output, mask.long())
            # evaluation metrics
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)
            # backward
            loss.backward()
            optimizer.step()  # update weight
            optimizer.zero_grad()  # reset gradient

            # step the learning rate
            lrs.append(get_lr(optimizer))
            scheduler.step()

            running_loss += loss.item()

        # validation loop
        model.eval()
        test_loss = 0
        test_accuracy = 0
        val_iou_score = 0
        with torch.no_grad():
            for image_tiles, mask_tiles in tqdm(val_loader):
                image, mask = _prepare_batch(image_tiles, mask_tiles, patch)
                output = model(image)
                # evaluation metrics
                val_iou_score += mIoU(output, mask)
                test_accuracy += pixel_accuracy(output, mask)
                # loss: same integer target conversion as in training
                loss = criterion(output, mask.long())
                test_loss += loss.item()

        # mean over the batches of this epoch
        epoch_train_loss = running_loss / len(train_loader)
        epoch_val_loss = test_loss / len(val_loader)
        epoch_val_iou = val_iou_score / len(val_loader)

        # Record everything before the early-stopping check so all history
        # lists keep the same length even when training stops early.
        train_losses.append(epoch_train_loss)
        test_losses.append(epoch_val_loss)
        val_iou.append(epoch_val_iou)
        train_iou.append(iou_score / len(train_loader))
        train_acc.append(accuracy / len(train_loader))
        val_acc.append(test_accuracy / len(val_loader))
        print("Epoch:{}/{}..".format(e + 1, epochs),
              "Train Loss: {:.3f}..".format(epoch_train_loss),
              "Val Loss: {:.3f}..".format(epoch_val_loss),
              "Train mIoU:{:.3f}..".format(iou_score / len(train_loader)),
              "Val mIoU: {:.3f}..".format(epoch_val_iou),
              "Train Acc:{:.3f}..".format(accuracy / len(train_loader)),
              "Val Acc:{:.3f}..".format(test_accuracy / len(val_loader)),
              "Time: {:.2f}m".format((time.time() - since) / 60))

        if min_loss > epoch_val_loss:
            print('Loss Decreasing.. {:.3f} >> {:.3f} '.format(min_loss, epoch_val_loss))
            min_loss = epoch_val_loss
            decrease += 1
            if decrease % 5 == 0:
                print('saving model...')
                torch.save(model, 'Unet-Mobilenet_v2_mIoU-{:.3f}.pt'.format(epoch_val_iou))
        elif epoch_val_loss > min_loss:
            not_improve += 1
            # the reference loss follows the latest epoch, so every rise is
            # measured against the previous epoch's loss
            min_loss = epoch_val_loss
            print(f'Loss Not Decrease for {not_improve} time')
            if not_improve == 7:
                print('Loss not decrease for 7 times, Stop Training')
                break

    history = {'train_loss': train_losses, 'val_loss': test_losses,
               'train_miou': train_iou, 'val_miou': val_iou,
               'train_acc': train_acc, 'val_acc': val_acc,
               'lrs': lrs}
    print('Total time: {:.2f} m'.format((time.time() - fit_time) / 60))
    return history



In [29]:
def calculate_metrics(y_pred, y_true):
    """Binary IoU (Jaccard index) and pixel accuracy of a thresholded prediction.

    Both tensors are binarised at 0.5 and flattened before comparison. The
    metrics are computed with numpy because no scikit-learn metric functions
    are imported in this notebook.

    Args:
        y_pred: tensor of predicted scores.
        y_true: tensor of ground-truth values with the same number of elements.

    Returns:
        ``(IoU, accuracy)`` as Python floats. IoU is 0.0 when neither tensor
        contains a positive element.

    Raises:
        ValueError: if the two tensors hold a different number of elements.
    """
    pred = (y_pred.cpu().detach().numpy() > 0.5).reshape(-1)
    true = (y_true.cpu().detach().numpy() > 0.5).reshape(-1)
    if pred.shape != true.shape:
        raise ValueError(
            f"y_pred and y_true must have the same number of elements, got {pred.size} and {true.size}"
        )

    # Jaccard / Intersection over Union of the positive class
    intersection = np.logical_and(pred, true).sum()
    union = np.logical_or(pred, true).sum()
    IoU = float(intersection / union) if union else 0.0

    # fraction of positions where prediction and ground truth agree
    accuracy = float(np.mean(pred == true))

    return IoU, accuracy

def run_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into whole minutes and seconds."""
    elapsed = end_time - start_time
    whole_minutes = int(elapsed / 60)
    leftover_seconds = int(elapsed - 60 * whole_minutes)
    return whole_minutes, leftover_seconds


Training and Testing the models 

In [23]:
# visualising the results
def show(img, output, label, denorm=False):
    """Plot input, ground truth and prediction for every sample of a batch.

    Args:
        img: batch of input images, channels first.
        output: batch of model outputs, channels first.
        label: batch of ground-truth labels, channels first.
        denorm: when true and the images have 3 channels, undo the ImageNet
            mean/std normalisation used by the image transform before plotting.
    """
    img, output, label = img.detach().cpu(), output.detach().cpu(), label.detach().cpu()
    n_rows = len(output)
    if n_rows == 0:
        return  # nothing to draw; plt.subplots rejects zero rows

    if denorm and img.dim() == 4 and img.shape[1] == 3:
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        img = (img * std + mean).clamp(0, 1)

    # squeeze=False always yields a 2-D axes array, so the same indexing
    # works for every batch size (including 1).
    fig, axes = plt.subplots(n_rows, 3, figsize=(15, 30), squeeze=False)
    cols = ['Input Image', 'Actual Output', 'Predicted Output']
    for col_ax, col in zip(axes[0], cols):
        col_ax.set_title(col)

    for row in range(n_rows):
        axes[row][0].imshow(img[row].permute(1, 2, 0).numpy())
        axes[row][1].imshow(label[row].permute(1, 2, 0).numpy())
        axes[row][2].imshow(output[row].permute(1, 2, 0).numpy())
    plt.show()

UNet model

In [24]:
class Convblock(nn.Module):
    """Two stacked convolutions: conv -> batch norm -> ReLU -> conv -> ReLU.

    The first convolution uses the given kernel size, stride and padding.
    The second keeps the channel count and uses the same kernel size with
    PyTorch's default stride and padding.
    """

    def __init__(self, input_channel, output_channel, kernal=3, stride=1, padding=1):
        super().__init__()
        layers = [
            nn.Conv2d(input_channel, output_channel,
                      kernel_size=kernal, stride=stride, padding=padding),
            nn.BatchNorm2d(output_channel),
            nn.ReLU(inplace=True),
            nn.Conv2d(output_channel, output_channel, kernel_size=kernal),
            nn.ReLU(inplace=True),
        ]
        self.convblock = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the block."""
        return self.convblock(x)

In [25]:
class UNet(nn.Module):
    """U-Net style encoder/decoder built from ``Convblock`` stages.

    Args:
        input_channel: number of channels of the input images.
        retain: when true, the output is resized back to the spatial size of
            the input (the convolutions in between shrink the feature maps).
        output_channel: number of channels produced by the final 1x1
            convolution; defaults to 3.
    """

    def __init__(self, input_channel, retain=True, output_channel=3):

        super().__init__()

        # Encoder
        self.conv1 = Convblock(input_channel, 32)
        self.conv2 = Convblock(32, 64)
        self.conv3 = Convblock(64, 128)
        self.conv4 = Convblock(128, 256)
        # Bottleneck
        self.neck = nn.Conv2d(256, 512, 3, 1)
        # Decoder: each stage upsamples, then fuses the matching encoder
        # feature map (hence the doubled input channels of the dconv blocks)
        self.upconv4 = nn.ConvTranspose2d(512, 256, 3, 2, 0, 1)
        self.dconv4 = Convblock(512, 256)
        self.upconv3 = nn.ConvTranspose2d(256, 128, 3, 2, 0, 1)
        self.dconv3 = Convblock(256, 128)
        self.upconv2 = nn.ConvTranspose2d(128, 64, 3, 2, 0, 1)
        self.dconv2 = Convblock(128, 64)
        self.upconv1 = nn.ConvTranspose2d(64, 32, 3, 2, 0, 1)
        self.dconv1 = Convblock(64, 32)
        self.out = nn.Conv2d(32, output_channel, 1, 1)
        self.retain = retain

    def forward(self, x):
        """Return the network output for the image batch ``x``."""

        # Encoder Network

        # Conv down 1
        conv1 = self.conv1(x)
        pool1 = F.max_pool2d(conv1, kernel_size=2, stride=2)
        # Conv down 2
        conv2 = self.conv2(pool1)
        pool2 = F.max_pool2d(conv2, kernel_size=2, stride=2)
        # Conv down 3
        conv3 = self.conv3(pool2)
        pool3 = F.max_pool2d(conv3, kernel_size=2, stride=2)
        # Conv down 4
        conv4 = self.conv4(pool3)
        pool4 = F.max_pool2d(conv4, kernel_size=2, stride=2)

        # Bottleneck
        neck = self.neck(pool4)

        # Decoder Network

        # Upconv 1
        upconv4 = self.upconv4(neck)
        croped = self.crop(conv4, upconv4)
        # Making the skip connection 1
        dconv4 = self.dconv4(torch.cat([upconv4, croped], 1))
        # Upconv 2
        upconv3 = self.upconv3(dconv4)
        croped = self.crop(conv3, upconv3)
        # Making the skip connection 2
        dconv3 = self.dconv3(torch.cat([upconv3, croped], 1))
        # Upconv 3
        upconv2 = self.upconv2(dconv3)
        croped = self.crop(conv2, upconv2)
        # Making the skip connection 3
        dconv2 = self.dconv2(torch.cat([upconv2, croped], 1))
        # Upconv 4
        upconv1 = self.upconv1(dconv2)
        croped = self.crop(conv1, upconv1)
        # Making the skip connection 4
        dconv1 = self.dconv1(torch.cat([upconv1, croped], 1))
        # Output Layer
        out = self.out(dconv1)

        if self.retain:
            # resize back to the input's height and width
            out = F.interpolate(out, list(x.shape)[2:])

        return out

    def crop(self, input_tensor, target_tensor):
        """Centre-crop ``input_tensor`` to the height and width of ``target_tensor``.

        Used to make an encoder feature map match the decoder feature map it
        is concatenated with.
        """
        _, _, H, W = target_tensor.shape
        return transform.CenterCrop([H, W])(input_tensor)
    
# Build a U-Net for 3-channel input, cast it to float32 and place it on the device.
model = UNet(input_channel=3).float().to(device)

In [None]:
# initialising the UNet model
# The network is bound to `model`, the name used by the optimizer and by the
# loops below. Rebuilding it here makes re-running the cell start from scratch.
model = UNet(3).float().to(device)

lr = 0.01
epochs = 20

# Choosing the loss function to be Mean Square Error Loss
# NOTE(review): the dataset loads labels as single-channel ('L') tensors while
# the network emits 3 channels, so MSELoss relies on broadcasting here.
# Confirm this is intended; class-index masks with CrossEntropyLoss are the
# usual choice for segmentation.
lossfunc = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

train_acc = []
val_acc = []
train_loss = []
val_loss = []

# Single-process validation loader (keeps RAM usage down); built once rather
# than on every epoch.
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=4, num_workers=0)

for i in range(epochs):
    trainloss = 0
    valloss = 0

    # training the model
    model.train()
    for img, label in tqdm(train_dataloader):
        optimizer.zero_grad()
        img = img.to(device)
        label = label.to(device)
        output = model(img)
        loss = lossfunc(output, label)
        loss.backward()
        optimizer.step()
        trainloss += loss.item()

    # visualise the last training batch every 5th epoch
    if i % 5 == 0:
        show(img, output, label)

    train_loss.append(trainloss / len(train_dataloader))

    # validating the model: eval mode for batch norm, and no autograd graph
    # so activations are not kept in memory
    model.eval()
    with torch.no_grad():
        for img, label in tqdm(val_dataloader):
            img = img.to(device)
            label = label.to(device)
            output = model(img)
            loss = lossfunc(output, label)
            valloss += loss.item()

    val_loss.append(valloss / len(val_dataloader))

    print("epoch: {}, train loss: {}, valid loss: {}".format(i, train_loss[-1], val_loss[-1]))


In [None]:
# plotting the training vs validation loss curve for UNet
plt.plot(train_loss, color='b', label='train loss')
plt.plot(val_loss, color='r', label='val_loss')
# label the axes so the figure can be read on its own
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('UNet: training vs validation loss')
plt.legend()

# visualising the results
def show(img, output, label, denorm=False):
    """Plot input, ground truth and prediction for every sample of a batch.

    Args:
        img: batch of input images, channels first.
        output: batch of model outputs, channels first.
        label: batch of ground-truth labels, channels first.
        denorm: when true and the images have 3 channels, undo the ImageNet
            mean/std normalisation used by the image transform before plotting.
    """
    img, output, label = img.detach().cpu(), output.detach().cpu(), label.detach().cpu()
    n_rows = len(output)
    if n_rows == 0:
        return  # nothing to draw; plt.subplots rejects zero rows

    if denorm and img.dim() == 4 and img.shape[1] == 3:
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        img = (img * std + mean).clamp(0, 1)

    # squeeze=False always yields a 2-D axes array, so the same indexing
    # works for every batch size (including 1).
    fig, axes = plt.subplots(n_rows, 3, figsize=(15, 30), squeeze=False)
    cols = ['Input Image', 'Actual Output', 'Predicted Output']
    for col_ax, col in zip(axes[0], cols):
        col_ax.set_title(col)

    for row in range(n_rows):
        axes[row][0].imshow(img[row].permute(1, 2, 0).numpy())
        axes[row][1].imshow(label[row].permute(1, 2, 0).numpy())
        axes[row][2].imshow(output[row].permute(1, 2, 0).numpy())
    plt.show()
    

In [None]:
# Plot the validation loss per epoch for UNet
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(val_loss, label='Validation Loss')

ax.set_xlabel('Epoch')
# the plotted quantity is the loss, not an iteration count
ax.set_ylabel('Loss')
ax.set_title('UNet: validation loss')
ax.legend(loc='best')
plt.show()

DeepLab model

In [None]:
class DecoderBlock(nn.Module):
    """Bottlenecked upsampling block with an optional skip connection.

    A 1x1 convolution reduces the channels to ``in_channels // 4``, a
    stride-2 transposed convolution upsamples, and a second 1x1 convolution
    maps to ``out_channels``. Each convolution is followed by batch norm.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        mid_channels = in_channels // 4
        self.conv1 = nn.Conv2d(in_channels, mid_channels, kernel_size=1)
        self.bn1 = nn.BatchNorm2d(mid_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.ConvTranspose2d(mid_channels, mid_channels, kernel_size=3,
                                        stride=2, padding=1, output_padding=1)
        self.bn2 = nn.BatchNorm2d(mid_channels)
        self.conv3 = nn.Conv2d(mid_channels, out_channels, kernel_size=1)
        self.bn3 = nn.BatchNorm2d(out_channels)

    def forward(self, x, skip=None):
        """Upsample ``x``; if ``skip`` is given, resize it to match and append its channels."""
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.bn3(self.conv3(x))

        if skip is not None:
            # bring the skip tensor to the current spatial size, then stack along channels
            resized_skip = F.interpolate(skip, size=x.shape[2:], mode='bilinear', align_corners=True)
            x = torch.cat((x, resized_skip), dim=1)

        return self.relu(x)
    

class DeepLabHead(nn.Sequential):
    """Classifier head: 3x3 conv -> batch norm -> ReLU -> 1x1 conv."""

    def __init__(self, in_ch, out_ch):
        layers = (
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.Conv2d(out_ch, out_ch, kernel_size=1),
        )
        super().__init__(*layers)

class DeepLab(nn.Module):
    """Pretrained ResNet-50 feature extractor followed by a ``DeepLabHead``.

    The head's output is resized bilinearly to the input's height and width.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # keep every child module of the ResNet except the last two
        feature_layers = list(backbone.children())[:-2]
        self.base_model = nn.Sequential(*feature_layers)
        self.classifier = DeepLabHead(2048, num_classes)

    def forward(self, x):
        """Return ``num_classes`` score maps at the spatial size of ``x``."""
        features = self.base_model(x)
        scores = self.classifier(features)
        return F.interpolate(scores, size=x.shape[2:], mode='bilinear', align_corners=False)

# Select the compute device, build a DeepLab model with 3 output channels on it
# and print its layers.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
deeplab = DeepLab(num_classes=3).to(device)
print(deeplab)


In [None]:
# initialising the DeepLab model
model = deeplab.to(device)

lr = 0.01
epochs = 20

# choosing the loss function to be Mean Square Error Loss
# NOTE(review): the dataset loads labels as single-channel ('L') tensors while
# the network emits 3 channels, so MSELoss relies on broadcasting here.
# Confirm this is intended.
lossfunc = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

train_acc = []
val_acc = []
train_loss = []
val_loss = []

# Single-process validation loader (keeps RAM usage down); built once rather
# than on every epoch.
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=4, num_workers=0)

for i in range(epochs):
    trainloss = 0
    valloss = 0

    # training the model
    model.train()
    for img, label in tqdm(train_dataloader):
        optimizer.zero_grad()
        img = img.to(device)
        label = label.to(device)
        output = model(img)
        loss = lossfunc(output, label)
        loss.backward()
        optimizer.step()
        trainloss += loss.item()

    train_loss.append(trainloss / len(train_dataloader))

    # validating the model: exactly one pass per epoch, in eval mode and
    # without building an autograd graph. Batches are used as loaded;
    # squeezing dim 0 would drop the batch dimension of a size-1 batch.
    model.eval()
    with torch.no_grad():
        for img, label in tqdm(val_dataloader):
            img = img.to(device)
            label = label.to(device)
            output = model(img)
            loss = lossfunc(output, label)
            valloss += loss.item()

    # one entry per epoch so val_loss lines up with train_loss
    val_loss.append(valloss / len(val_dataloader))

    # visualise the last validation batch of every epoch
    show(img, output, label)

    print("epoch: {}, train loss: {}, valid loss: {}".format(i, train_loss[-1], val_loss[-1]))

In [None]:
# plotting the training vs validation loss curve for DeepLab
plt.plot(train_loss, color='b', label='train loss')
plt.plot(val_loss, color='r', label='val_loss')
# label the axes so the figure can be read on its own
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('DeepLab: training vs validation loss')
plt.legend()

# visualising the results
def show(img, output, label, denorm=False):
    """Plot input, ground truth and prediction for every sample of a batch.

    Args:
        img: batch of input images, channels first.
        output: batch of model outputs, channels first.
        label: batch of ground-truth labels, channels first.
        denorm: when true and the images have 3 channels, undo the ImageNet
            mean/std normalisation used by the image transform before plotting.
    """
    img, output, label = img.detach().cpu(), output.detach().cpu(), label.detach().cpu()
    n_rows = len(output)
    if n_rows == 0:
        return  # nothing to draw; plt.subplots rejects zero rows

    if denorm and img.dim() == 4 and img.shape[1] == 3:
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        img = (img * std + mean).clamp(0, 1)

    # squeeze=False always yields a 2-D axes array, so the same indexing
    # works for every batch size (including 1).
    fig, axes = plt.subplots(n_rows, 3, figsize=(15, 30), squeeze=False)
    cols = ['Input Image', 'Actual Output', 'Predicted Output']
    for col_ax, col in zip(axes[0], cols):
        col_ax.set_title(col)

    for row in range(n_rows):
        axes[row][0].imshow(img[row].permute(1, 2, 0).numpy())
        axes[row][1].imshow(label[row].permute(1, 2, 0).numpy())
        axes[row][2].imshow(output[row].permute(1, 2, 0).numpy())
    plt.show()

In [None]:
# Plot the validation loss per epoch for DeepLab
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(val_loss, label='Validation Loss')

ax.set_xlabel('Epoch')
# the plotted quantity is the loss, not an iteration count
ax.set_ylabel('Loss')
ax.set_title('DeepLab: validation loss')
ax.legend(loc='best')
plt.show()

Fully Convolutional Network (FCN)

In [None]:
class FCN(nn.Module):
    """Small fully convolutional network.

    A four-stage convolutional feature extractor (max-pooled after the first
    three stages) feeds a convolutional classifier. The scores are resized
    bilinearly to the input's height and width.
    """

    def __init__(self, num_classes=3):
        super().__init__()

        # base feature extraction layers: conv + ReLU per stage, with a 2x2
        # max-pool after every stage except the last
        widths = [3, 64, 128, 256, 512]
        last_stage = len(widths) - 2
        stages = []
        for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            stages.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            stages.append(nn.ReLU(inplace=True))
            if stage != last_stage:
                stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.base = nn.Sequential(*stages)

        # classifier layers
        head = [
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Dropout2d(),
            nn.Conv2d(1024, 1024, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Dropout2d(),
            nn.Conv2d(1024, num_classes, kernel_size=1),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return ``num_classes`` score maps at the spatial size of ``x``."""
        spatial_size = x.size()[2:]
        scores = self.classifier(self.base(x))
        return F.interpolate(scores, spatial_size, mode='bilinear', align_corners=False)

# Build the FCN with its default settings, move it to the device and print its layers.
fcn = FCN()
fcn = fcn.to(device)
print(fcn)

In [None]:
# initialising the FCN model
model1 = fcn.to(device)

lr = 0.01
epochs = 20

# Choosing the loss function to be Mean Square Error Loss
# NOTE(review): the dataset loads labels as single-channel ('L') tensors while
# the network emits 3 channels, so MSELoss relies on broadcasting here.
# Confirm this is intended.
lossfunc = nn.MSELoss()
# The optimizer must own the FCN's parameters (`model1`); `model` refers to a
# different network defined in an earlier cell.
optimizer = torch.optim.Adam(model1.parameters(), lr=lr)

train_acc = []
val_acc = []
train_loss = []
val_loss = []

# Single-process validation loader (keeps RAM usage down); built once rather
# than on every epoch.
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=4, num_workers=0)

for i in range(epochs):
    trainloss = 0
    valloss = 0

    # training the model
    model1.train()
    for img, label in tqdm(train_dataloader):
        optimizer.zero_grad()
        img = img.to(device)
        label = label.to(device)
        output = model1(img)
        loss = lossfunc(output, label)
        loss.backward()
        optimizer.step()
        trainloss += loss.item()

    train_loss.append(trainloss / len(train_dataloader))

    # validating the FCN itself: exactly one pass per epoch, in eval mode
    # (this also disables dropout) and without building an autograd graph.
    # Batches are used as loaded; squeezing dim 0 would drop the batch
    # dimension of a size-1 batch.
    model1.eval()
    with torch.no_grad():
        for img, label in tqdm(val_dataloader):
            img = img.to(device)
            label = label.to(device)
            output = model1(img)
            loss = lossfunc(output, label)
            valloss += loss.item()

    # one entry per epoch so val_loss lines up with train_loss
    val_loss.append(valloss / len(val_dataloader))

    # visualise the last validation batch of every epoch
    show(img, output, label)

    print("epoch: {}, train loss: {}, valid loss: {}".format(i, train_loss[-1], val_loss[-1]))

In [None]:
# plotting the training vs validation loss curve for FCN
plt.plot(train_loss, color='b', label='train loss')
plt.plot(val_loss, color='r', label='val_loss')
# label the axes so the figure can be read on its own
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('FCN: training vs validation loss')
plt.legend()

# visualising the results
def show(img, output, label, denorm=False):
    """Plot input, ground truth and prediction for every sample of a batch.

    Args:
        img: batch of input images, channels first.
        output: batch of model outputs, channels first.
        label: batch of ground-truth labels, channels first.
        denorm: when true and the images have 3 channels, undo the ImageNet
            mean/std normalisation used by the image transform before plotting.
    """
    img, output, label = img.detach().cpu(), output.detach().cpu(), label.detach().cpu()
    n_rows = len(output)
    if n_rows == 0:
        return  # nothing to draw; plt.subplots rejects zero rows

    if denorm and img.dim() == 4 and img.shape[1] == 3:
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        img = (img * std + mean).clamp(0, 1)

    # squeeze=False always yields a 2-D axes array, so the same indexing
    # works for every batch size (including 1).
    fig, axes = plt.subplots(n_rows, 3, figsize=(15, 30), squeeze=False)
    cols = ['Input Image', 'Actual Output', 'Predicted Output']
    for col_ax, col in zip(axes[0], cols):
        col_ax.set_title(col)

    for row in range(n_rows):
        axes[row][0].imshow(img[row].permute(1, 2, 0).numpy())
        axes[row][1].imshow(label[row].permute(1, 2, 0).numpy())
        axes[row][2].imshow(output[row].permute(1, 2, 0).numpy())
    plt.show()

In [None]:
# Plot the validation loss per epoch for FCN
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(val_loss, label='Validation Loss')

ax.set_xlabel('Epoch')
# the plotted quantity is the loss, not an iteration count
ax.set_ylabel('Loss')
ax.set_title('FCN: validation loss')
ax.legend(loc='best')
plt.show()