In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
%cd '/content/drive/My Drive/SemSeg'

/content/drive/My Drive/SemSeg


In [3]:
#!unzip archive.zip

In [4]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
import torch.utils.data as data
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [5]:
#def c2g(colors):
#  cn = np.reshape(colors, (1,1,3))
#  cn = cv2.cvtColor(cn, cv2.COLOR_BGR2GRAY)
#  return cn
# Pixel classes
colors = np.array([[0,0,0], [111,74,0], [81,0,81], [128,64,128], [244, 35, 232]
                   ,[250, 170, 160], [230, 150, 140], [70,70,70], [102,102, 156]
                   , [190, 153, 153], [180, 165, 180], [150, 100, 100]
                   , [150, 120, 90], [153, 153, 153], [250, 170, 30], [220, 220, 0]
                   , [107, 142, 35], [152, 251, 152], [70, 130, 180], [220, 20, 60]
                   , [255, 0, 0], [0, 0, 142], [0, 0, 70], [0, 60, 100], [0, 0, 90]
                   , [0, 0, 110], [0, 80, 100], [0, 0, 230], [119, 11, 32], [0, 0, 142]], dtype = np.uint8)
def c2g(colors):
  c2g = []
  for color in colors:
    color = np.reshape(color, (1,1,3))
    gray = cv2.cvtColor(color, cv2.COLOR_BGR2GRAY)
    c2g.append(gray)
  return c2g
colors = c2g(colors)

In [6]:
colors

[array([[0]], dtype=uint8),
 array([[56]], dtype=uint8),
 array([[33]], dtype=uint8),
 array([[90]], dtype=uint8),
 array([[118]], dtype=uint8),
 array([[176]], dtype=uint8),
 array([[156]], dtype=uint8),
 array([[70]], dtype=uint8),
 array([[118]], dtype=uint8),
 array([[157]], dtype=uint8),
 array([[171]], dtype=uint8),
 array([[106]], dtype=uint8),
 array([[114]], dtype=uint8),
 array([[153]], dtype=uint8),
 array([[137]], dtype=uint8),
 array([[154]], dtype=uint8),
 array([[106]], dtype=uint8),
 array([[210]], dtype=uint8),
 array([[138]], dtype=uint8),
 array([[55]], dtype=uint8),
 array([[29]], dtype=uint8),
 array([[42]], dtype=uint8),
 array([[21]], dtype=uint8),
 array([[65]], dtype=uint8),
 array([[27]], dtype=uint8),
 array([[33]], dtype=uint8),
 array([[77]], dtype=uint8),
 array([[69]], dtype=uint8),
 array([[30]], dtype=uint8),
 array([[42]], dtype=uint8)]

In [7]:
# Perform Data Loading

transform_img = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])
transform_label = transforms.Compose([transforms.ToTensor()])

In [8]:
def load_img(directory, gray, input):
  images = []
  for img in os.listdir(directory):
    img = cv2.imread(os.path.join(directory, img))
    if input:
      img = cv2.resize(img, (572, 572), interpolation = cv2.INTER_AREA)
    else:
      img = cv2.resize(img, (388, 388), interpolation = cv2.INTER_AREA)
    if gray:
      img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    images.append(img)
  return images

def class_pix(labelimg, colors):
  class_pix = np.ones([388,388,1], dtype = int)
  for index, c in enumerate(colors):
    class_pix[labelimg == c] = index
  return class_pix

def label(imagelist):
  images = []
  for img in imagelist:
    images.append(class_pix(img, colors))
  return images

In [9]:
class train(data.Dataset):
  def __init__(self, transform = None, imgdir = None, labeldir = None, transformlabel = None):
    self.train_img = load_img(imgdir, gray = False, input = True)
    self.transform = transform
    self.transformlabel = transformlabel
    self.train_label = label(load_img(labeldir, gray = True, input = False))
  def __len__(self):
    return len(self.train_img)
  def __getitem__(self, index):
    img = self.transform(self.train_img[index])
    label = self.transformlabel(self.train_label[index])
    return img, label

class validation(data.Dataset):
  def __init__(self, transform = None, imgdir = None, labeldir = None, transformlabel = None):
    self.train_img = load_img(imgdir, gray = False, input = True)
    self.transform = transform
    self.transformlabel = transformlabel
    self.train_label = label(load_img(labeldir, gray = True, input = False))
  def __len__(self):
    return len(self.train_img)
  def __getitem__(self, index):
    img = self.transform(self.train_img[index])
    label = self.transformlabel(self.train_label[index])
    return img, label

class test(data.Dataset):
  def __init__(self, transform = None, imgdir = None, labeldir = None, transformlabel = None):
    self.train_img = load_img(imgdir, gray = False, input = True)
    self.transform = transform
    self.transformlabel = transformlabel
    self.train_label = label(load_img(labeldir, gray = True, input = False))
  def __len__(self):
    return len(self.train_img)
  def __getitem__(self, index):
    img = self.transform(self.train_img[index])
    label = self.transformlabel(self.train_label[index])
    return img, label

In [10]:
os.getcwd()

'/content/drive/My Drive/SemSeg'

In [11]:
trainset = train(transform_img, os.path.join(os.getcwd(),'CamVid/train'),
                 os.path.join(os.getcwd(), 'CamVid/train_labels'),transform_label)

In [12]:
valset = validation(transform_img, os.path.join(os.getcwd(),'CamVid/val'),
                 os.path.join(os.getcwd(), 'CamVid/val_labels'),transform_label)

In [13]:
testset = test(transform_img, os.path.join(os.getcwd(),'CamVid/test'),
                 os.path.join(os.getcwd(), 'CamVid/test_labels'),transform_label)

In [14]:
train_loader = data.DataLoader(trainset, batch_size = 1, shuffle = True, num_workers = 4)
val_loader = data.DataLoader(valset, batch_size = 1, shuffle = True, num_workers = 4)
test_loader = data.DataLoader(testset, batch_size = 1, shuffle = True, num_workers = 4)

In [15]:
# Defining U - Net Architecture
def double_conv(in_channel, out_channel):
  conv = nn.Sequential(
      nn.Conv2d(in_channel, out_channel, kernel_size = 3),
      nn.ReLU(inplace = True),
      nn.Conv2d(out_channel, out_channel, kernel_size = 3),
      nn.ReLU(inplace = True)
  )
  return conv

def crop_img(original, target):
  target_size = target.size()[2]
  original_size = original.size()[2]
  delta = original_size - target_size
  delta = delta // 2
  return original[:, :, delta:original_size - delta, delta:original_size - delta]

class UNet(nn.Module):
  def __init__(self):
    super(UNet, self).__init__()
    self.max_pool = nn.MaxPool2d(kernel_size = 2, stride = 2)
    self.down_conv_1 = double_conv(3, 64)
    self.down_conv_2 = double_conv(64, 128)
    self.down_conv_3 = double_conv(128, 256)
    self.down_conv_4 = double_conv(256, 512)
    self.down_conv_5 = double_conv(512, 1024)

    self.up_trans_1 = nn.ConvTranspose2d(1024, 512, kernel_size = 2, stride = 2)
    self.up_conv_1 = double_conv(1024, 512)

    self.up_trans_2 = nn.ConvTranspose2d(512, 256, kernel_size = 2, stride = 2)
    self.up_conv_2 = double_conv(512, 256)

    self.up_trans_3 = nn.ConvTranspose2d(256, 128, kernel_size = 2, stride = 2)
    self.up_conv_3 = double_conv(256, 128)

    self.up_trans_4 = nn.ConvTranspose2d(128, 64, kernel_size = 2, stride = 2)
    self.up_conv_4 = double_conv(128, 64)

    self.out = nn.Conv2d(64, 32, kernel_size = 1)

  
  def forward(self, image):
    # Batch size, channel, height, width
    # Encoder
    x1 = self.down_conv_1(image)
    x2 = self.max_pool(x1)

    x3 = self.down_conv_2(x2)
    x4 = self.max_pool(x3)

    x5 = self.down_conv_3(x4)
    x6 = self.max_pool(x5)

    x7 = self.down_conv_4(x6)
    x8 = self.max_pool(x7)

    x9 = self.down_conv_5(x8)

    # Decoder
    x = self.up_trans_1(x9)
    y = crop_img(x7, x)
    x = self.up_conv_1(torch.cat([x, y], 1))

    x = self.up_trans_2(x)
    y = crop_img(x5, x)
    x = self.up_conv_2(torch.cat([x, y], 1))

    x = self.up_trans_3(x)
    y = crop_img(x3, x)
    x = self.up_conv_3(torch.cat([x, y], 1))

    x = self.up_trans_4(x)
    y = crop_img(x1, x)
    x = self.up_conv_4(torch.cat([x, y], 1))

    x = self.out(x)
    return x

In [16]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [17]:
torch.cuda.get_device_name(0)

'Tesla T4'

In [18]:
UNET = UNet()
UNET.to(device)

UNet(
  (max_pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (down_conv_1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_conv_2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_conv_3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_conv_4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_conv_5): Sequential(
    (0): Conv2d(512, 1024, kernel_size

In [19]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(UNET.parameters(), lr = 0.001, betas = (0.9, 0.999), eps = 1e-08, weight_decay = 0, amsgrad = False)

In [None]:
#import os
#os._exit(00)

In [None]:
#UNET = torch.load(os.path.join(os.getcwd(), unet20.pth))

In [None]:
train_loss = []
val_loss = []
epochs = 100
for ep in range(epochs):
  UNET.train()
  running_loss = 0.0
  counter = 0
  for i, data in enumerate(train_loader):
    inputs, labels = data
    inputs, labels = inputs.to(device), labels.to(device)
    if labels.size() == torch.Size([1, 1, 388, 388]):
      labels = labels.reshape(1, 388, 388)
    optimizer.zero_grad()
    outputs = UNET(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    running_loss = running_loss + loss.item()
    counter = counter + 1
  epoch_loss = running_loss/counter
  print('Epoch {}/{}, {} Loss: {:.4f}'.format(ep, epochs, 'Training', epoch_loss))
  train_loss = train_loss + [epoch_loss]
  UNET.eval()
  running_loss = 0.0
  counter = 0
  for i, data in enumerate(val_loader):
    inputs, labels = data
    inputs, labels = inputs.to(device), labels.to(device)
    if labels.size() == torch.Size([1, 1, 388, 388]):
      labels = labels.reshape(1, 388, 388)
    optimizer.zero_grad()
    outputs = UNET(inputs)
    loss = criterion(outputs, labels)
    running_loss = running_loss + loss.item()
    counter = counter + 1
  epoch_loss = running_loss/counter
  print('Epoch {}/{}, {} Loss: {:.4f}'.format(ep, epochs, 'Validation', epoch_loss))
  val_loss = val_loss + [epoch_loss]
  if ep%10 == 0:
    print("Saving Model after epoch {}".format(ep))
    torch.save(UNET.state_dict(), os.path.join(os.getcwd(), 'unet{}.pth'.format(ep)))

Epoch 0/100, Training Loss: 0.8068
Epoch 0/100, Validation Loss: 0.6059
Saving Model after epoch 0
Epoch 1/100, Training Loss: 0.5924
Epoch 1/100, Validation Loss: 0.5862
Epoch 2/100, Training Loss: 1.2909
Epoch 2/100, Validation Loss: 0.7040
Epoch 3/100, Training Loss: 0.6021
Epoch 3/100, Validation Loss: 0.5558
