In [1]:
import numpy as np
from google.colab import drive
import skimage.io as io
import numpy as np
import os
import torch
import torchvision.transforms as T
from PIL import Image, ImageColor
from scipy import ndimage
from torch.utils.data import Dataset
from skimage.transform import resize
import random
from torch import nn
import torch.nn.functional as F
!pip install torchmetrics
from torchmetrics.classification import Dice
import torch.nn.init as init

Collecting torchmetrics
  Downloading torchmetrics-1.2.0-py3-none-any.whl (805 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m805.2/805.2 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.10.0-py3-none-any.whl (24 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.10.0 torchmetrics-1.2.0


In [2]:
drive.mount('/content/drive')
drive_path = 'drive/My Drive/'

Mounted at /content/drive


In [3]:
parts = {10: {'col': 'orange', 'name':'hood'},
         20: {'col':'darkgreen', 'name':'front door'},
         30: {'col':'yellow', 'name':'rear door'},
         40: {'col':'cyan', 'name':'frame'},
         50: {'col':'purple', 'name':'rear quarter panel'},
         60: {'col':'lightgreen', 'name':'trunk lid'},
         70: {'col':'blue', 'name':'fender'},
         80: {'col':'pink', 'name':'bumper'},
         90: {'col':'darkgray', 'name':'rest of car'},
         0 : {'col':'black', 'name':'background'}}

def display_car(data_arr):
    # Can take both full data and already split data
    if type(data_arr) == torch.Tensor: data_arr = np.moveaxis(data_arr.numpy().astype(np.uint8), 0, 2)
    elif data_arr.shape[0] == 3: data_arr = np.moveaxis(data_arr.astype(np.uint8), 0, 2)
    elif data_arr.shape[2] > 3: data_arr = data_arr[:,:,:3]
    img = Image.fromarray(data_arr)
    display(img) # img.show() for jupyter

def display_labels(data_arr):
    # Can take both full data and already split data
    if type(data_arr) == torch.Tensor: data_arr = data_arr.numpy()
    if data_arr.dtype != np.uint8: data_arr = data_arr.astype(np.uint8)*10
    if data_arr.ndim > 2: data_arr = data_arr[:,:,3]
    img = Image.fromarray(data_arr)
    pixels = list(img.getdata())
    pixels = [ImageColor.getrgb(parts.get(pixel)['col']) for pixel in pixels]
    image = Image.new("RGB", (256, 256), (0,0,0))
    image.putdata(pixels)
    display(image)

def numpy_to_tensor(arr):
    return np.moveaxis(arr, 2, 0).astype(np.float32)

def tensor_to_numpy(tens):
    arr = np.moveaxis(tens, 0, 2).astype(np.uint8)
    return arr

In [4]:
def center_square(img):
    """Returns the cropped central square of an image (crops the largest dimension to match the smallest one)"""
    if img.size[0] == img.size[1]: return img
    smallest_dim = np.argmin(img.size)
    largest_dim = np.argmax(img.size)
    square_dim = img.size[smallest_dim]
    crop_dims = [0,0,0,0]
    crop_dims[largest_dim] = int(img.size[largest_dim]/2-square_dim/2)
    crop_dims[largest_dim+2] = int(img.size[largest_dim]/2+square_dim/2)
    crop_dims[smallest_dim] = 0
    crop_dims[smallest_dim+2] = img.size[smallest_dim]
    crop_img = img.crop(crop_dims)

    return crop_img

def set_background(car_arr, labels_arr, img):
    """Places all non-0 pixels of the car on the background img"""
    center_img = center_square(img)
    back_arr = np.array(center_img.resize(labels_arr.shape))
    # Use both car and labels just in case
    back_arr[labels_arr!=0] = car_arr[labels_arr!=0]

    # In the black car dataset, label pixel count should be similar to non-black pixel count
    if np.sum(car_arr!=0)/3 < np.sum(labels_arr!=0)*1.2:
        # In the black dataset, part of the car isn't correctly labeled, so also use car data for setting background
        back_arr[car_arr!=0] = car_arr[car_arr!=0]

    return back_arr

def move_full_car(arr, x, y, angle=0, zoom=1):
    """Moves the center of the car to (x, y). Takes the whole array (car AND labels)"""
    car_idxs = np.where(arr!=0)
    car_bbox = [max(0,np.min(car_idxs[1])-10), max(0,np.min(car_idxs[0])-10), min(255, np.max(car_idxs[1])+10), min(255,np.max(car_idxs[0])+10)]
    # Array with just the car
    car_arr = arr[car_bbox[1]:car_bbox[3],car_bbox[0]:car_bbox[2]]
    # Rotate the car
    car_arr = ndimage.rotate(car_arr, angle, reshape=True, order=0)
    car_arr = ndimage.zoom(car_arr, (zoom, zoom, 1), order=0)
    # Edges of the car in the new array (without taking into account new image borders)
    edges = [y-np.ceil(car_arr.shape[0]/2),y+np.floor(car_arr.shape[0]/2),x-np.ceil(car_arr.shape[1]/2),x+np.floor(car_arr.shape[1]/2)]
    # Where to crop the car if it goes off bounds
    car_limits = [max(0,-1*int(edges[0])), 255-int(edges[1]) if 255-int(edges[1]) < 0 else car_arr.shape[0], max(0,-1*int(edges[2])), 255-int(edges[3]) if 255-int(edges[3]) < 0 else car_arr.shape[1]]
    edges = [max(0,int(edges[0])), min(255, int(edges[1])), max(0,int(edges[2])), min(255, int(edges[3]))]

    new_arr = np.zeros(arr.shape)
    new_arr[edges[0]:edges[1],edges[2]:edges[3]] = car_arr[car_limits[0]:car_limits[1],car_limits[2]:car_limits[3]]

    return new_arr.astype(np.uint8)


In [5]:
def load_images_from_folder(folder_path, resize_shape=(256, 256), limit=100):
    background_list = []
    count = 0

    for filename in os.listdir(folder_path):
        # Check if the file is an image file
        file_path = os.path.join(folder_path, filename)

        background = Image.open(file_path).convert('RGB')
        background_list.append(background)

        count += 1
        if count >= limit:
            break

    return background_list

# Example usage:
folder_path = 'drive/My Drive/carseg_data/images/landscapes'
background_list = load_images_from_folder(folder_path, limit=250)

In [6]:
class CarDataset(Dataset):
    def __init__(self, root, file_list: list=None, backgrounds: list=[], move_car: bool=False, rotate_car: bool=False, zoom_car: bool=False):
        """
        Initializes the dataset.
        Parameters:
            file_list: a list of filenames from 'root' to use. If not specified, all files will be used.
            background: list with backgrounds. If not specified, no backgrounds will be used.
            move_car: specifies if the cars should be moved to a random location in the image
            rotate_car: specifies if the cars should be given a random rotation (within a range)
        Backgrounds, rotations and translations are random. There is a chance that none will be performed at all.
        This chance is higher for 'photo' images, which will only be rotated/translated when the background is changed (to avoid black bars)
        """
        self.root = root
        self.filenames = os.listdir(self.root) if file_list is None else file_list
        self.backgrounds = backgrounds
        self.move_car = move_car
        self.rotate_car = rotate_car
        self.zoom_car = zoom_car

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, index):
        filename = self.filenames[index]
        arr = np.load(os.path.join(self.root, filename))
        photo_mod = True
        if 'photo' in filename:
            # Photos only get new background with 33% chance
            photo_mod = random.randrange(0,3)==1

        if self.move_car:
            x = random.randrange(80,255-80)
            y = random.randrange(80,255-80)
            angle = random.randrange(-30,30) if self.rotate_car else 0
            zoom = random.uniform(0.8,1.4) if self.zoom_car else 1
            arr = move_full_car(arr, x, y, angle, zoom)

        car = arr[:,:,0:3]
        labels = arr[:,:,3]

        if len(self.backgrounds) > 0 and photo_mod:
            rand_idx = random.randrange(0,len(self.backgrounds))

            # Some backgrounds are RGB
            img = self.backgrounds[rand_idx]
            car = set_background(car, labels, img)

        car = np.moveaxis(car, 2, 0)

        return car, labels/10

In [7]:
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

black_car = []
orange_car = []
photos = []
for file in os.listdir(f'{drive_path}carseg_data/arrays'):
    if 'orange' in file: orange_car.append(file)
    elif 'black' in file: black_car.append(file)
    elif 'photo' in file and '(' not in file: photos.append(file)

root = f'{drive_path}carseg_data/arrays'


photo_test = photos[:30]
photos = photos[30:]

black_train, _ = train_test_split(black_car, test_size=0.1, random_state=42, shuffle=True)
orange_train, _ = train_test_split(orange_car, test_size=0.1, random_state=42, shuffle=True)
photos_train, photos_val = train_test_split(photos, test_size=0.2, random_state=42, shuffle=True)


photos_train_ds = CarDataset(root, photos_train*3+black_train[200:400]+orange_train[200:400])
train1_ds = CarDataset(root, photos_train*2, rotate_car=True, move_car=True, backgrounds=background_list)
train2_ds = CarDataset(root, black_train[:200]+orange_train[:200], backgrounds=background_list)

val_ds = CarDataset(root, photos_val)
test_ds = CarDataset(root, photo_test)


train_loader = DataLoader(photos_train_ds+train1_ds+train2_ds, batch_size=16, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=16)
test_loader =  DataLoader(test_ds, batch_size=16)


In [8]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Dropout2d(0.3),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Dropout2d(0.3)
        )

        for layer in self.block:
            if isinstance(layer, nn.Conv2d):
                init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        return self.block(x)

class UNet(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(UNet, self).__init__()
        self.encoder0 = nn.Sequential(ConvBlock(in_channels, 64))
        self.encoder1 = nn.Sequential(nn.MaxPool2d(2,2), ConvBlock(64, 128))
        self.encoder2 = nn.Sequential(nn.MaxPool2d(2,2), ConvBlock(128, 256))
        self.encoder3 = nn.Sequential(nn.MaxPool2d(2,2), ConvBlock(256, 512))
        self.bottleneck = nn.Sequential(nn.MaxPool2d(2,2), ConvBlock(512,1024), nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2))
        self.decoder0 = nn.Sequential(ConvBlock(1024,512), nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2))
        self.decoder1 = nn.Sequential(ConvBlock(512,256), nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2))
        self.decoder2 = nn.Sequential(ConvBlock(256,128), nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2))
        self.decoder3 = nn.Sequential(ConvBlock(128,64), nn.Conv2d(in_channels=64, out_channels=num_classes, kernel_size=1))

    def forward(self, x):
        x0 = self.encoder0(x)
        x1 = self.encoder1(x0)
        x2 = self.encoder2(x1)
        x3 = self.encoder3(x2)
        x4 = self.bottleneck(x3)
        x4 = self.decoder0(torch.cat([x3,x4],dim=1))
        x4 = self.decoder1(torch.cat([x2,x4],dim=1))
        x4 = self.decoder2(torch.cat([x1,x4],dim=1))
        x4 = self.decoder3(torch.cat([x0,x4],dim=1))

        return x4

In [9]:
device = "cuda"
model = UNet(3, 10).to(device)

In [10]:
import torch.optim as optim
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

In [11]:
num_epochs = 200
dice = Dice(average='micro')

for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0.0
    dice_scores_train = []


    for batch in train_loader:
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)

        inputs = inputs.float()
        labels = labels.long().to(device)

        # Forward pass
        outputs = model(inputs)

        # Calculate loss
        loss = criterion(outputs, labels)

        total_train_loss += loss.item()

        # Calculate dice
        _, pred = torch.max(outputs, 1)
        pred_cpu = pred.to('cpu')
        labels_cpu = labels.to('cpu')

        dice_scores_train.append(dice(pred_cpu, labels_cpu))

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Validation loop
    model.eval()
    total_val_loss = 0.0
    dice_scores_val = []

    for batch in val_loader:
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)

        inputs = inputs.float()
        labels = labels.long().to(device)

        with torch.no_grad():
            outputs = model(inputs)
            val_loss = criterion(outputs, labels)
            total_val_loss += val_loss.item()

            # Calculate dice
            _, pred = torch.max(outputs, 1)
            pred_cpu = pred.to('cpu')
            labels_cpu = labels.to('cpu')

            dice_scores_val.append(dice(pred_cpu, labels_cpu))


    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {(total_train_loss / len(train_loader)):.4f}, Train dice: {np.mean(dice_scores_train):.4f}, Val Loss: {(total_val_loss / len(val_loader)):.4f}, Val dice: {np.mean(dice_scores_val):.4f}")


Epoch 1/200, Train Loss: 1.3272, Train dice: 0.6424, Val Loss: 0.9920, Val dice: 0.7149
Epoch 2/200, Train Loss: 0.8767, Train dice: 0.7326, Val Loss: 0.9901, Val dice: 0.7018
Epoch 3/200, Train Loss: 0.8196, Train dice: 0.7451, Val Loss: 0.9673, Val dice: 0.7068
Epoch 4/200, Train Loss: 0.7753, Train dice: 0.7591, Val Loss: 0.9442, Val dice: 0.7157
Epoch 5/200, Train Loss: 0.7512, Train dice: 0.7623, Val Loss: 0.9702, Val dice: 0.7038
Epoch 6/200, Train Loss: 0.7284, Train dice: 0.7676, Val Loss: 0.8733, Val dice: 0.7296
Epoch 7/200, Train Loss: 0.7094, Train dice: 0.7734, Val Loss: 0.8391, Val dice: 0.7393
Epoch 8/200, Train Loss: 0.6920, Train dice: 0.7773, Val Loss: 0.8592, Val dice: 0.7378
Epoch 9/200, Train Loss: 0.7047, Train dice: 0.7762, Val Loss: 0.8367, Val dice: 0.7502
Epoch 10/200, Train Loss: 0.6815, Train dice: 0.7792, Val Loss: 0.8250, Val dice: 0.7497
Epoch 11/200, Train Loss: 0.6726, Train dice: 0.7829, Val Loss: 0.8206, Val dice: 0.7535
Epoch 12/200, Train Loss: 0.66

In [12]:
def test_model(model, loader):
  """Test a model on a test dataset"""
  dice = Dice(average='micro')
  model.eval()
  dice_scores = []

  for batch in loader:
      inputs, labels = batch
      inputs, labels = batch
      inputs = inputs.to(device)
      labels = labels.to(device)

      inputs = inputs.float()
      labels = labels.long().to(device)

      with torch.no_grad():
          outputs = model(inputs)

          # Calculate accuracy on the test set
          _, pred = torch.max(outputs, 1)

          # Move tensors to CPU before performing numpy operations
          pred_cpu = pred.to('cpu')
          labels_cpu = labels.to('cpu')

          dice_scores.append(dice(pred_cpu, labels_cpu))

  return np.mean(dice_scores)



In [13]:
test_model(model, test_loader)

0.9012814

In [14]:
torch.save(model.state_dict(), f'{drive_path}carseg_data/model_90_dice.pth')