In [None]:
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import torchvision.transforms.functional as func
import torch
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
from google.colab import drive
drive.mount('/content/drive')
from google.colab import files
import time
import glob

Mounted at /content/drive


In [None]:
# Root folder on the mounted Drive that holds the Train/ and Test/ splits.
path = "/content/drive/MyDrive/VIP/Custom ML Model/" #Path of train/test data
train_path = path + "Train/"
test_path = path + "Test/"

# os.listdir() returns entries in arbitrary order, so sort every listing to
# keep sample order (and therefore saved results) reproducible between runs.
train_data = sorted(os.listdir(train_path + "Train Data"))
train_label = sorted(os.listdir(train_path + "Train Label"))
test_data = sorted(os.listdir(test_path + "Test Data"))
test_label = sorted(os.listdir(test_path + "Test Label"))

In [None]:
class dataset(Dataset):
  """Image / mask pairs read from the train or test folders.

  Each item is ``(image, label, file_name)``. The label is looked up under
  the *same file name* as the image in the matching label folder, so the
  two folders must contain identically named files.

  Args:
    test: when True read from the test folders, otherwise from the train
      folders.
    transf: callable applied to both the resized image and the resized
      label (defaults to ``transforms.ToTensor()``).
  """

  # Spatial size every image and label is resized to before the transform.
  IMAGE_SIZE = (320, 320)

  def __init__(self,test=False,transf=transforms.ToTensor()):
    super().__init__()
    self.test=test
    self.trans = transf
    if test:
      self.data = test_data
      self.labels = test_label
      self.data_dir = test_path + "Test Data/"
      self.label_dir = test_path + "Test Label/"
    else:
      self.data = train_data
      self.labels = train_label
      self.data_dir = train_path + "Train Data/"
      self.label_dir = train_path + "Train Label/"

  def __len__(self):
    """Number of image files in the selected split."""
    return len(self.data)

  def __getitem__(self, idx):
    """Load, resize and transform the ``idx``-th image and its label.

    Returns:
      ``(transformed_image, transformed_label, file_name)``.
    """
    data_name = self.data[idx]

    # Force three channels so RGBA / greyscale / palette files still match
    # the 3-channel first convolution of the network.
    with Image.open(self.data_dir + data_name) as img:
      data = img.convert("RGB").resize(self.IMAGE_SIZE)

    # data name same as label name
    # NOTE(review): resize() is called with Pillow's default resampling
    # filter, which may introduce intermediate grey values in a mask --
    # confirm this is intended.
    with Image.open(self.label_dir + data_name) as img:
      label = func.rgb_to_grayscale(img, num_output_channels=1) # convert label to grayscale
      label = label.resize(self.IMAGE_SIZE)

    return self.trans(data), self.trans(label), data_name

In [None]:
def train_model(model, data_loader, loss, optimizer, epochs, device=None):
  """Train ``model`` and return the mean loss of every epoch.

  Args:
    model: network whose forward pass returns a 3-tuple; only the first
      element is fed to the loss.
    data_loader: iterable yielding ``(data, label, name)`` batches.
    loss: callable ``loss(output, label)`` returning a scalar tensor.
    optimizer: optimizer already bound to ``model``'s parameters.
    epochs: number of passes over ``data_loader``.
    device: device to train on. Defaults to ``cuda:0`` when CUDA is
      available and to the CPU otherwise.

  Returns:
    A list with one float per epoch: the mean batch loss of that epoch
    (``nan`` for an epoch in which the loader produced no batches).
  """
  if device is None:
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
  model.to(device)

  loss_list = []
  for epoch in range(epochs):
    print("Epoch: ", (epoch + 1))
    model.train()
    # Collected per epoch so the reported value is this epoch's mean rather
    # than a running mean over every batch seen since training started.
    batch_losses = []
    for data, label, _ in tqdm(data_loader):
      data = data.to(device)
      label = label.to(device)
      output, _, _ = model(data)
      batch_loss = loss(output, label)
      optimizer.zero_grad()
      batch_loss.backward()
      optimizer.step()
      batch_losses.append(batch_loss.item())

    if batch_losses:
      epoch_loss = sum(batch_losses) / len(batch_losses)
    else:
      epoch_loss = float("nan") # empty loader: nothing to average
    loss_list.append(epoch_loss)
    print("Loss: ", epoch_loss) # Prints value of loss
  return loss_list

In [None]:
class model(nn.Module):
  """Two-branch convolutional network producing a 1-channel 320x320 map.

  * Branch 1 uses stride-3 convolutions and 3x3 pooling (coarse, "general"
    features) on the whole image.
  * Branch 2 uses 2x2 kernels (fine, "local" features) and only sees the
    image below its top sixth; the removed rows are re-inserted as zeros in
    feature space.

  Both branches end in ``Upsample(320)``. Their outputs are blended with a
  different weighting for the top quarter, second quarter and bottom half of
  the rows, then passed through a small smoothing stack.

  ``forward`` returns the output of the final BatchNorm; ``self.sig`` is
  defined but is not applied inside ``forward``.
  """

  def __init__(self):
    super().__init__()

    # Branch 1: (larger stride/pool size/filter size) (General Feature)
    self.b1_conv1 = nn.Sequential(
                      nn.Conv2d(3, 64, kernel_size=3, stride=3),
                      nn.LeakyReLU(),
                      nn.AvgPool2d(kernel_size=3, stride=3),
                      nn.Dropout2d(),
                      nn.BatchNorm2d(64),
                      nn.Upsample(size=128)
                    )

    self.b1_conv2 = nn.Sequential(
                      nn.Conv2d(64, 256, kernel_size=3, stride=3),
                      nn.LeakyReLU(),
                      nn.AvgPool2d(kernel_size=3, stride=3),
                      nn.BatchNorm2d(256)
                    )

    self.b1_resize = nn.Sequential(
                        nn.ConvTranspose2d(256, 64, kernel_size=3, padding=(2,2)),
                        nn.Upsample(size=64),
                        nn.ConvTranspose2d(64, 1, kernel_size=3, padding=(2,2)),
                        nn.Upsample(size=320)
                     )

    # Branch 2: local, smaller convolution kernels, or even 1x1 convolutions
    self.b2_conv1 = nn.Sequential(
                      nn.Conv2d(3, 128, kernel_size=2, stride=1),
                      nn.LeakyReLU(),
                      nn.AvgPool2d(kernel_size=2, stride=1),
                      nn.BatchNorm2d(128)
                    )

    self.b2_conv2 = nn.Sequential(
                      nn.Conv2d(128, 512, kernel_size=2, stride=2, padding=(2,2)),
                      nn.LeakyReLU(),
                      nn.AvgPool2d(kernel_size=2, stride=1),
                      nn.BatchNorm2d(512)
                    )

    self.b2_resize = nn.Sequential(
                      nn.ConvTranspose2d(512, 128, kernel_size=3, stride=3, padding=(3,3)),
                      nn.ConvTranspose2d(128, 1, kernel_size=2, stride=2, padding=(2,2)), # only 1 feature channel --> grey scale
                      nn.Upsample(320)
                    )

    # Final smoothen layer
    self.fin_conv = nn.Sequential(
                      nn.Conv2d(1, 1, kernel_size=2, stride=1, padding=1),
                      nn.ConvTranspose2d(1, 1, kernel_size=4, stride=1, padding=(2,2)),
                      nn.BatchNorm2d(1)
                    )

    self.sig = nn.Sigmoid()

  def forward(self,x):
    """Run both branches and blend them.

    Args:
      x: image batch of shape ``(N, 3, H, W)``. The notebook feeds
        320x320 images.

    Returns:
      ``(blended, x1, x2)``: the smoothed blend, the branch-1 map and the
      branch-2 map, each of shape ``(N, 1, 320, 320)``.
    """
    # Branch 1
    x1 = self.b1_conv1(x)
    x1 = self.b1_conv2(x1)
    x1 = self.b1_resize(x1)

    # Branch 2
    # Crop image (1/6 from top), e.g. [8, 3, 320, 320] -> [8, 3, 267, 320]
    height = x.shape[2]
    x2 = x[:,:,height//6:,:]

    # Feed the *cropped* tensor to the branch; passing `x` here would
    # silently discard the crop and make the zero-fill below a no-op.
    x2 = self.b2_conv1(x2)
    x2 = self.b2_conv2(x2)

    # fill cropped part with 0: pad the feature map back to as many rows as
    # it has columns. new_zeros() keeps the device and dtype of x2, so this
    # works on CPU as well as on any GPU.
    missing_rows = max(x2.shape[3] - x2.shape[2], 0)
    fill_tensor = x2.new_zeros((x2.shape[0], x2.shape[1], missing_rows, x2.shape[3]))
    x2 = torch.cat((fill_tensor, x2), 2) # concatenate top with 0

    x2 = self.b2_resize(x2)

    # Crop x1 and x2 into top, middle, bottom
    x1_top = x1[:,:,:80,:] # 0 ~ 79 (top 1/4)
    x1_mid = x1[:,:,80:160,:] # middle 1/4
    x1_bot = x1[:,:,160:,:] # bottom 1/2

    x2_top = x2[:,:,:80,:] # 0 ~ 79 (top 1/4)
    x2_mid = x2[:,:,80:160,:] # middle 1/4
    x2_bot = x2[:,:,160:,:] # bottom 1/2

    # Apply weights to each branch and add them up
    x_top = 0.7 * x1_top + 0.3 * x2_top # global feature heavy (7:3)
    x_mid = 0.4 * x1_mid + 0.6 * x2_mid # 4:6
    x_bot = 0.3 * x1_bot + 0.7 * x2_bot # local feature heavy (3:7)

    # Concatenate top, mid, and bottom
    x = torch.cat((x_top, x_mid, x_bot), 2)

    # Smoothen convolutional layer
    x = self.fin_conv(x)

    return x, x1, x2

In [None]:
# predict all images in test folder
# For every test image two files are written to `result_path`:
#   mask_<name>   - the thresholded prediction
#   result_<name> - the same mask with the input image drawn on top
path = "/content/drive/MyDrive/VIP/Custom ML Model/" #Path of train/test data
# train_path = path + "Train/"
test_img_path = path + "Test/Test Data/"
result_path = "result/"
os.makedirs(result_path, exist_ok=True) # savefig does not create missing folders

# NOTE(review): `model` must be a network *instance* at this point
# (presumably rebound in a cell that is not part of this section) -- confirm.
model.eval()
test_data_obj=dataset(test=True)
test_data_loader=DataLoader(test_data_obj, batch_size=8, shuffle=False)

# Run on whichever device the network's weights already live on.
device = next(model.parameters()).device

# NOTE(review): forward() does not apply a sigmoid, so this is a threshold on
# the raw network output -- confirm it matches the loss used for training.
thresh_1 = 0.5

with torch.no_grad(): # inference only: skip autograd bookkeeping
  for data, _, img_name in test_data_loader:
    data = data.to(device)
    predictions, _, _ = model(data)

    for i in range(len(predictions)):
      # Single-channel prediction as a 2-D (H, W) array.
      pred = predictions[i, 0].cpu().numpy()

      # post-processing: binarise (values equal to the threshold count as 1)
      thresh_1_pred = (pred >= thresh_1).astype(pred.dtype)

      fig, ax = plt.subplots()
      ax.imshow(thresh_1_pred, cmap='gray')
      ax.tick_params(left = False, right = False , labelleft = False , labelbottom = False, bottom = False)
      fig.savefig(result_path + "mask_" + img_name[i])

      # Draw the semi-transparent input image over the mask on the same axes.
      test_img = data[i].permute(1,2,0).cpu().numpy()
      ax.imshow(test_img, alpha = 0.6)
      fig.savefig(result_path + "result_" + img_name[i])

      plt.close(fig)

In [None]:
# Recursively zip /content/result into /content/result.zip, then hand the
# archive to google.colab's files.download().
# NOTE(review): the prediction cell writes to the relative folder "result/";
# this only matches /content/result if the working directory is /content --
# confirm.
!zip -r /content/result.zip /content/result
files.download("/content/result.zip")