In [113]:
import matplotlib.image as mpimg
import numpy as np
import matplotlib.pyplot as plt
import os,sys
from PIL import Image
import torch
import torchvision
import shutil
import stat
import glob

from sklearn.metrics import f1_score

In [114]:
if torch.cuda.is_available():
    print("Name of the Graphics card", torch.cuda.get_device_name())
    print("Number of GPU available", torch.cuda.device_count())
    device = "cuda"
else:
    print("No GPU available on this machine")
    device = "cpu"

No GPU available on this machine


In [115]:
# Helper functions

def load_image(infilename):
    data = mpimg.imread(infilename)
    return data

def img_float_to_uint8(img):
    rimg = img - np.min(img)
    rimg = (rimg / np.max(rimg) * 255).round().astype(np.uint8)
    return rimg

# Concatenate an image and its groundtruth
def concatenate_images(img, gt_img):
    nChannels = len(gt_img.shape)
    w = gt_img.shape[0]
    h = gt_img.shape[1]
    if nChannels == 3:
        cimg = np.concatenate((img, gt_img), axis=1)
    else:
        gt_img_3c = np.zeros((w, h, 3), dtype=np.uint8)
        gt_img8 = img_float_to_uint8(gt_img)          
        gt_img_3c[:,:,0] = gt_img8
        gt_img_3c[:,:,1] = gt_img8
        gt_img_3c[:,:,2] = gt_img8
        img8 = img_float_to_uint8(img)
        cimg = np.concatenate((img8, gt_img_3c), axis=1)
    return cimg

def img_crop(im, w, h):
    list_patches = []
    imgwidth = im.shape[0]
    imgheight = im.shape[1]
    is_2d = len(im.shape) < 3
    for i in range(0,imgheight,h):
        for j in range(0,imgwidth,w):
            if is_2d:
                im_patch = im[j:j+w, i:i+h]
            else:
                im_patch = im[j:j+w, i:i+h, :]
            list_patches.append(im_patch)
    return list_patches

def accuracy(predicted_logits, reference):
    """
    Compute the ratio of correctly predicted labels
    
    @param predicted_logits: float32 tensor of shape (batch size, num classes)
    @param reference: int64 tensor of shape (batch_size) with the class number
    """
    labels = torch.argmax(predicted_logits, 1)
    correct_predictions = labels.eq(reference)
    return correct_predictions.sum().float() / correct_predictions.nelement()

In [116]:
def test_accuracy():
    #predictions = torch.tensor([[0.5, -0.6], [1.0, -0.3], [0.7, 0]]) #0
    predictions = torch.tensor([[0.5, 1.0, 0.7], [-0.6, -0.3, 0]]) #1
    correct_labels = torch.tensor([0, 2])  # first is wrong, second is correct
    print()
    assert accuracy(predictions, correct_labels).allclose(torch.tensor([0.5]))

    #predictions = torch.tensor([[0.5, -0.6, -1], [1.0, -0.3, 0], [0.7, 0, 1]]) #0
    predictions = torch.tensor([[0.5, 1.0, 0.7], [-0.6, -0.3, 0], [-1, 0, 1]]) #1
    correct_labels = torch.tensor([1, 1, 2])  # correct, wrong, correct
    assert accuracy(predictions, correct_labels).allclose(torch.tensor([2/3]))

    print("Tests passed")
  
test_accuracy()


Tests passed


In [117]:
# Loaded a set of images
root_dir = "data/training/"
image_test = "data/test_set_images"
image_dir = root_dir + "images/"
files = os.listdir(image_dir)
n = min(20, len(files)) # Load maximum 20 images
print("Loading " + str(n) + " images")
imgs = [load_image(image_dir + files[i]) for i in range(n)]
print(files[0])

gt_dir = root_dir + "groundtruth/"
print("Loading " + str(n) + " images")
gt_imgs = [load_image(gt_dir + files[i]) for i in range(n)]
print(files[0])

n = 10 # Only use 10 images for training

Loading 20 images
satImage_001.png
Loading 20 images
satImage_001.png


#### Training images patch extraction

In [118]:
# Extract patches from input training images
patch_size = 16 # each patch is 16*16 pixels

img_patches = [img_crop(imgs[i], patch_size, patch_size) for i in range(n)]
gt_patches = [img_crop(gt_imgs[i], patch_size, patch_size) for i in range(n)]

# Linearize list of patches
img_patches = np.asarray([img_patches[i][j] for i in range(len(img_patches)) for j in range(len(img_patches[i]))])
gt_patches =  np.asarray([gt_patches[i][j] for i in range(len(gt_patches)) for j in range(len(gt_patches[i]))])

In [119]:
# Extract 6-dimensional features consisting of average RGB color as well as variance
def extract_features(img):
    feat_m = np.mean(img, axis=(0,1))
    feat_v = np.var(img, axis=(0,1))
    feat = np.append(feat_m, feat_v)
    return feat

# Extract 2-dimensional features consisting of average gray color as well as variance
def extract_features_2d(img):
    feat_m = np.mean(img)
    feat_v = np.var(img)
    feat = np.append(feat_m, feat_v)
    return feat

# Extract features for a given image
def extract_img_features(filename):
    img = load_image(filename)
    img_patches = img_crop(img, patch_size, patch_size)
    X = np.asarray([ extract_features_2d(img_patches[i]) for i in range(len(img_patches))])
    return X 

#### Test files extraction

In [132]:
DATA_DIR = 'data'

x_train_dir = os.path.join(DATA_DIR, 'training/images')
y_train_dir = os.path.join(DATA_DIR, 'training/groundtruth')

#x_valid_dir = os.path.join(DATA_DIR, 'val')
#y_valid_dir = os.path.join(DATA_DIR, 'val_labels')

# extracting images from subfolders
x_test_dir = os.path.join(DATA_DIR, 'test_set_images/')
os.chmod(x_test_dir , stat.S_IWRITE)
folder = x_test_dir
subfolders = [f.path for f in os.scandir(folder) if f.is_dir()]

for sub in subfolders:
    for f in os.listdir(sub):
        if '.png' in f:
            src = os.path.join(sub, f)
            dst = os.path.join(folder, f)
            shutil.move(src, dst)
        else:
            to_delete = os.path.join(sub,f)
            os.remove(to_delete)

# remove unnecessary files
for file in os.listdir(x_test_dir):
    if '.ini' in file:
        file_to_remove_dir = os.path.join(x_test_dir, 'desktop.ini')
        os.remove(file_to_remove_dir)
print("Test files extracted from subfolders | Other unnecessary files removed")

test_files = os.listdir(x_test_dir)
m = min(20, len(files)) # Load maximum 20 images
print("Loading " + str(m) + " images")
test_imgs = list(glob.iglob(x_test_dir + '*.png', recursive=True))
print(test_imgs[0])

Test files extracted from subfolders | Other unnecessary files removed
Loading 20 images
data\test_set_images\test_1.png


#### Extracting patches from test images

In [133]:
# Extract patches from input test images
patch_size = 16 # each patch is 16*16 pixels

test_img_patches = [img_crop(imgs[i], patch_size, patch_size) for i in range(m)]
test_img_patches = np.asarray([test_img_patches[i][j] for i in range(len(test_img_patches)) for j in range(len(test_img_patches[i]))])

#### Train & Test classes

In [40]:
class RoadsTrainset(torch.utils.data.Dataset):

    """Read images, apply augmentation and preprocessing transformations.
    
    Args:
        images_dir (str): path to images folder
        masks_dir (str): path to segmentation masks folder
        class_rgb_values (list): RGB values of select classes to extract from segmentation mask
        augmentation (albumentations.Compose): data transfromation pipeline 
            (e.g. flip, scale, etc.)
        preprocessing (albumentations.Compose): data preprocessing 
            (e.g. noralization, shape manipulation, etc.)
    
    """
    
    def __init__(
            self, 
            X, 
            y, 
    ):        
        self.X = X
        self.y = y
    
    def __getitem__(self, i):
        
        # read images and masks
        image = self.X[i]
        X = self.transform(image)
        mask = self.y[i]
        y = torch.from_numpy(np.asarray(mask)).long()
        return X, y
        
        return image, mask
    
    #Transformation, used for data augmentation
    transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor()])
        
    def __len__(self):
        # return length of 
        return len(self.X)

In [41]:
class RoadsTestset(torch.utils.data.Dataset):

    """Read images, apply augmentation and preprocessing transformations.
    
    Args:
        images_dir (str): path to images folder
        masks_dir (str): path to segmentation masks folder
        class_rgb_values (list): RGB values of select classes to extract from segmentation mask
        augmentation (albumentations.Compose): data transfromation pipeline 
            (e.g. flip, scale, etc.)
        preprocessing (albumentations.Compose): data preprocessing 
            (e.g. noralization, shape manipulation, etc.)
    
    """
    
    def __init__(
            self, 
            X, 
    ):        
        self.X = X
    
    def __getitem__(self, i):
        
        # read images and masks        
        image = self.X[i]
        X = self.transform(image)
        return X
        
        return image
    
    #Transformation, used for data augmentation
    transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor()])
        
    def __len__(self):
        # return length of 
        return len(self.X)

In [42]:
# Compute features for each image patch
foreground_threshold = 0.25 # percentage of pixels > 1 required to assign a foreground label to a patch

def value_to_class(v):
    df = np.sum(v)
    if df > foreground_threshold:
        return 1
    else:
        return 0

X = np.asarray([ extract_features_2d(img_patches[i]) for i in range(len(img_patches))])
Y = np.asarray([value_to_class(np.mean(gt_patches[i])) for i in range(len(gt_patches))])

#Onehot encode
Y_onehot = np.array([[(1-i), i] for i in Y])
Y_onehot.shape, gt_patches.shape

# Balancing the data
roads = img_patches[Y == 1]
nb_roads = len(roads)
backgrounds = img_patches[Y == 0]
nb_backgrounds = len(backgrounds)

training = []
labels = []
for i in range (min(nb_roads, nb_backgrounds)):
    training.append(roads[i % nb_roads])
    training.append(backgrounds[i % nb_backgrounds])
    labels.append(1)
    labels.append(0)

((6250, 2), (6250, 16, 16))

In [43]:
#Load datasets
batch_size = 10
dataset_train = torch.utils.data.DataLoader(RoadsTrainset(img_patches, Y), batch_size=batch_size, shuffle=True)
dataset_test = torch.utils.data.DataLoader(RoadsTestset(img_patches), batch_size=50, shuffle=True)

In [50]:
class LeNetModel(torch.nn.Module):
  def __init__(self):
    """From: LeCun et al., 1998. Gradient-Based Learning Applied to Document Recognition"""
    super().__init__()
    self.conv1 = torch.nn.Conv2d(3, 200, kernel_size=5)
    self.conv2 = torch.nn.Conv2d(200, 400, kernel_size=5)
    self.conv2_drop = torch.nn.Dropout2d(0.5)
    self.fc1 = torch.nn.Linear(400, 100)
    self.fc2 = torch.nn.Linear(100, 2)

  def forward(self, x):
    relu = torch.nn.functional.relu
    max_pool2d = torch.nn.functional.max_pool2d

    x = relu(max_pool2d(self.conv1(x), 2))
    x = relu(max_pool2d(self.conv2_drop(self.conv2(x)), 2))
    x = x.view(batch_size, -1)
    x = relu(self.fc1(x))
    x = torch.nn.functional.dropout(x, training=self.training)
    x = self.fc2(x)
    return x # Previously there was torch.nn.functional.log_softmax(x, dim=1) here which was incorrect (although the network could still train)

In [77]:
def train(model, criterion, dataset_train, dataset_test, optimizer, num_epochs):
  """
  @param model: torch.nn.Module
  @param criterion: torch.nn.modules.loss._Loss
  @param dataset_train: torch.utils.data.DataLoader
  @param dataset_test: torch.utils.data.DataLoader
  @param optimizer: torch.optim.Optimizer
  @param num_epochs: int
  """
  print("Starting training")
  for epoch in range(num_epochs):
    # Train an epoch
    model.train()
    averageLoss = 0
    for batch_x, batch_y in dataset_train:
      batch_x, batch_y = batch_x.to(device), batch_y.to(device)

      prediction = model(batch_x)
      loss = criterion(prediction, batch_y)
    
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      averageLoss += loss.item()/len(dataset_train)


    # Test the quality on the test set
    model.eval()
    accuracies_test = []
    i=0
    # to calculate f1 score
    target_true = 0
    predicted_true = 0
    correct_true = 0
    for batch_x, batch_y in dataset_test:
      with torch.no_grad():
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        # Evaluate the network (forward pass)
        prediction = model(batch_x)

        # calculating f1 score
        predicted_classes = torch.argmax(prediction, dim=1) == 0
        target_classes = batch_y
        target_true += torch.sum(target_classes == 0).float()
        predicted_true += torch.sum(predicted_classes).float()

        correct_true_array = torch.logical_and(predicted_classes == target_classes, predicted_classes == 0)
        correct_true_array = correct_true_array.type(torch.ByteTensor)
        correct_true += torch.sum(correct_true_array).float()
        if i==-5:
              print(prediction, batch_y)
        accuracies_test.append(accuracy(prediction, batch_y))
        i+=1
      
  recall = correct_true / target_true
  precision = correct_true / predicted_true
  f1_score = 2 * precision * recall / (precision + recall)

  print("Test accuracy: {:.5f} | F1 Score: {:.3f}".format(sum(accuracies_test).item()/len(accuracies_test), f1_score))

In [78]:
num_epochs = 20
learning_rate = 1e-3
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = torch.nn.CrossEntropyLoss()
model_lenet = LeNetModel().to(device)
optimizer = torch.optim.Adam(model_lenet.parameters(), lr=learning_rate)

train(model_lenet, criterion, dataset_train, dataset_train, optimizer, num_epochs)

Starting training
Test accuracy: 0.90128 | F1 Score: 0.043
