In [1]:
# Mount to Google Drive
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [14]:
%cd gdrive/MyDrive/Cars

/content/gdrive/MyDrive/Cars


In [3]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as T
import torch.optim as optim
from torch.utils.data import Dataset
from PIL import Image
import numpy as np

In [4]:
# Build for UNet
class DoubleConv(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()

    # twice conv2d
    self.conv = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
        nn.BatchNorm2d(out_channels),  # normalize every feature
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    )

  def forward(self, x):
    return self.conv(x)

In [5]:
# Build UNet from scrach
class UNet(nn.Module):
  def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):  # in_channels is RGB, out_channels is i or 0(pixel level)
    super().__init__()

    # module list
    self.downs = nn.ModuleList()
    self.ups = nn.ModuleList()

    # in_channels 3 to 64
    # down-sampling
    for feature in features:
      self.downs.append(DoubleConv(in_channels, feature))
      in_channels = feature

    # 512(the last feature) to 1024
    self.bottleneck = DoubleConv(features[-1], features[-1]*2)

    # up-sampling
    for feature in reversed(features):
      self.ups.append(nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2))
      self.ups.append(DoubleConv(feature*2, feature))  # connect

    # adjust the final kernel
    self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

  def forward(self, x):
    skip_connections = []

    # down sampling
    for down in self.downs:
      x = down(x)
      skip_connections.append(x)  # save for later skip connection
      x = F.max_pool2d(x, (2, 2))  # (kernel_size, stride)

    # bottle neck
    x = self.bottleneck(x)

    # reverse for skip connection
    skip_connections.reverse()

    # two module as a set in ups, step=2
    for i in range(0, len(self.ups), 2):

      # self.ups[i] is nn.ConvTranspose2d
      x = self.ups[i](x)
      skip_connection = skip_connections[i//2]

      # transpose + down
      concat = torch.cat((skip_connection, x), dim=1)

      # self.ups[i] is nn.DoubleConv2d, up-sample
      x = self.ups[i+1](concat)

    return self.final_conv(x)



In [6]:
# Test dimension
model = UNet()
toy_data = torch.ones(16, 3, 240, 160)  # 3 is RGB
out = model(toy_data)
print(out.shape)

torch.Size([16, 1, 240, 160])


In [7]:
model = UNet()
model = model.cuda()

In [8]:
# Load data from Google Drive
class CustomDataset(Dataset):  # pytorch Dataset
  def __init__(self, image_dir, mask_dir, transform):
    super().__init__()
    self.image_dir = image_dir
    self.mask_dir = mask_dir
    self.transform = transform
    self.images = os.listdir(self.image_dir)

  def __len__(self):
    return len(self.images)

  def __getitem__(self, i):
    image_path = os.path.join(self.image_dir, self.images[i])
    mask_path = os.path.join(self.mask_dir, self.images[i].replace('.jpg', '_mask.gif'))
    image = np.array(Image.open(image_path))
    mask = np.array(Image.open(mask_path).convert('L'))  # L is single channel
    return self.transform(image), self.transform(mask)

In [9]:
# Check the device we are using is GPU or CPU
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')
print(device)

cuda


In [11]:
# Constants for UNet model training process
BATCH_SIZE = 16
NUM_EPOCHS = 2
IMG_WIDTH = 240
IMG_HEIGHT = 160

In [15]:
# Load data
all_data = CustomDataset('small_train', 'small_train_masks', T.Compose([T.ToTensor(), T.Resize((IMG_HEIGHT, IMG_WIDTH))]))

In [16]:
# Split data into train and val
train_data, val_data = torch.utils.data.random_split(all_data, [0.7, 0.3])

In [17]:
print(len(all_data))
print(len(train_data))
print(len(val_data))

1600
1120
480


In [18]:
# Create loader for mini-batch gradient descent
from torch.utils.data import DataLoader
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)

In [19]:
loss_function = nn.BCELoss()
optimizer = optim.Adam(model.parameters())

In [20]:
def train(model, num_epochs, train_loader, optimizer, print_every=30):
  for epoch in range(num_epochs):
    for count, (x, y) in enumerate(train_loader):
      model.train()
      x = x.to(device)
      y = y.to(device)
      out = model(x)
      if count % print_every == 0:
        eval(model, val_loader, epoch)
      out = torch.sigmoid(out)
      loss = loss_function(out, y)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

In [21]:
def eval(model, val_loader, epoch):
  model.eval()
  num_correct = 0
  num_pixels = 0
  with torch.no_grad():
    for x, y in val_loader:
      x = x.to(device)
      y = y.to(device)
      out_img = model(x)
      probability = torch.sigmoid(out_img)
      predictions = probability>0.5
      num_correct += (predictions==y).sum()
      num_pixels += BATCH_SIZE*IMG_WIDTH*IMG_HEIGHT
  print(f'Epoch[{epoch+1}] Acc: {num_correct/num_pixels}')


In [22]:
train(model, NUM_EPOCHS, train_loader, optimizer)



Epoch[1] Acc: 0.7845152020454407
Epoch[1] Acc: 0.9618967175483704
Epoch[1] Acc: 0.9401195645332336
Epoch[2] Acc: 0.9822676181793213
Epoch[2] Acc: 0.9828576445579529
Epoch[2] Acc: 0.9641614556312561
