In [2]:
# Import all the libraries needed
# NOTE(review): the recorded run of this cell ended with
# "ImportError: cannot import name 'PILLOW_VERSION' from 'PIL'", so none of the
# cells below could run in that environment. Presumably the installed
# torchvision is too old for the installed Pillow -- confirm and align the two
# versions before relying on this notebook.

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader


import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
import copy
# Print the library versions so the environment of a run is recorded in the output
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

ImportError: cannot import name 'PILLOW_VERSION' from 'PIL' (C:\Users\fabia\anaconda3\envs\pt-labi\lib\site-packages\PIL\__init__.py)

In [None]:
# Define the transformations/augmentations for the data.
#
# Only the training pipeline augments. Validation and test pipelines are
# deterministic so that repeated evaluations of the same model give the same
# numbers; random rotations/flips there would also move the input without
# moving its label.
# NOTE(review): the training augmentations are geometric and are applied to the
# input only -- if the labels are spatial (custom_loss looks up positions in the
# target), the label has to be transformed the same way. Confirm before passing
# array_transforms['train'] to a dataset.
array_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(degrees=180),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ToTensor(),
    ]),
    'val': transforms.Compose([
        transforms.ToTensor(),
    ]),
    'test': transforms.Compose([
        transforms.ToTensor(),
    ]),
}


# Root folder of each dataset split
traindir = 'Data/traindir'
valdir = 'Data/valdir'
testdir = 'Data/testdir'

# Every split keeps its two input channels in the `r_data` and `g_data` sub-folders
train_channels = [f'{traindir}/{sub}' for sub in ('r_data', 'g_data')]
val_channels = [f'{valdir}/{sub}' for sub in ('r_data', 'g_data')]
test_channels = [f'{testdir}/{sub}' for sub in ('r_data', 'g_data')]

# Accumulators that the load_* functions append the loaded arrays to
train_inputs, train_labels = [], []
val_inputs, val_labels = [], []
test_inputs, test_labels = [], []


def load_train_inputs():
    """Append every ``.npy`` array of the training channel folders to ``train_inputs``.

    The folders listed in ``train_channels`` are visited in order, so all arrays
    of the first channel come before all arrays of the second one.

    File names are sorted before loading: ``os.listdir`` returns entries in
    arbitrary order, while the later re-stacking step pairs the i-th array of
    each channel by position. Sorting keeps that pairing stable.
    NOTE(review): this assumes corresponding samples carry the same file name in
    every channel folder -- confirm against the data layout.
    """
    for channel in train_channels:
        path_to_channel = os.fsdecode(channel)
        for filename in sorted(os.fsdecode(entry) for entry in os.listdir(channel)):
            if filename.endswith('.npy'):
                train_inputs.append(np.load(f'{path_to_channel}/{filename}'))

def load_train_labels():
    """Append every ``.npy`` array in ``<traindir>/labels`` to ``train_labels``.

    File names are sorted before loading because ``os.listdir`` returns entries
    in arbitrary order and the labels are matched to the inputs by position.
    NOTE(review): this assumes a label file sorts to the same position as its
    input files -- confirm against the data layout.
    """
    labelfolder = f'{traindir}/labels'
    for filename in sorted(os.fsdecode(entry) for entry in os.listdir(labelfolder)):
        if filename.endswith('.npy'):
            train_labels.append(np.load(f'{labelfolder}/{filename}'))


# Fill the training lists (inputs first, then labels)
load_train_inputs()
load_train_labels()



def load_val_inputs():
    """Append every ``.npy`` array of the validation channel folders to ``val_inputs``.

    The folders listed in ``val_channels`` are visited in order, so all arrays
    of the first channel come before all arrays of the second one.

    File names are sorted before loading: ``os.listdir`` returns entries in
    arbitrary order, while the later re-stacking step pairs the i-th array of
    each channel by position. Sorting keeps that pairing stable.
    NOTE(review): this assumes corresponding samples carry the same file name in
    every channel folder -- confirm against the data layout.
    """
    for channel in val_channels:
        path_to_channel = os.fsdecode(channel)
        for filename in sorted(os.fsdecode(entry) for entry in os.listdir(channel)):
            if filename.endswith('.npy'):
                val_inputs.append(np.load(f'{path_to_channel}/{filename}'))

def load_val_labels():
    """Append every ``.npy`` array in ``<valdir>/labels`` to ``val_labels``.

    File names are sorted before loading because ``os.listdir`` returns entries
    in arbitrary order and the labels are matched to the inputs by position.
    NOTE(review): this assumes a label file sorts to the same position as its
    input files -- confirm against the data layout.
    """
    labelfolder = f'{valdir}/labels'
    for filename in sorted(os.fsdecode(entry) for entry in os.listdir(labelfolder)):
        if filename.endswith('.npy'):
            val_labels.append(np.load(f'{labelfolder}/{filename}'))


# Fill the validation lists (inputs first, then labels)
load_val_inputs()
load_val_labels()



def load_test_inputs():
    """Append every ``.npy`` array of the test channel folders to ``test_inputs``.

    The folders listed in ``test_channels`` are visited in order, so all arrays
    of the first channel come before all arrays of the second one.

    File names are sorted before loading: ``os.listdir`` returns entries in
    arbitrary order, while the later re-stacking step pairs the i-th array of
    each channel by position. Sorting keeps that pairing stable.
    NOTE(review): this assumes corresponding samples carry the same file name in
    every channel folder -- confirm against the data layout.
    """
    for channel in test_channels:
        path_to_channel = os.fsdecode(channel)
        for filename in sorted(os.fsdecode(entry) for entry in os.listdir(channel)):
            if filename.endswith('.npy'):
                test_inputs.append(np.load(f'{path_to_channel}/{filename}'))

def load_test_labels():
    """Append every ``.npy`` array in ``<testdir>/labels`` to ``test_labels``.

    File names are sorted before loading because ``os.listdir`` returns entries
    in arbitrary order and the labels are matched to the inputs by position.
    NOTE(review): this assumes a label file sorts to the same position as its
    input files -- confirm against the data layout.
    """
    labelfolder = f'{testdir}/labels'
    for filename in sorted(os.fsdecode(entry) for entry in os.listdir(labelfolder)):
        if filename.endswith('.npy'):
            test_labels.append(np.load(f'{labelfolder}/{filename}'))


# Fill the test lists (inputs first, then labels)
load_test_inputs()
load_test_labels()

class CreateDataset(Dataset):
    """Map-style dataset pairing pre-loaded input arrays with their labels.

    Args:
        inputs: indexable collection of samples (tensor or ndarray); sample
            ``i`` is ``inputs[i]``.
        labels: labels in the same order as ``inputs`` (tensor, ndarray or a
            list of equally shaped arrays); stored as an int64 tensor.
        transform: optional callable applied to each sample after it has been
            converted to a PIL image.
    """

    def __init__(self, inputs, labels, transform=None):
        self.inputs = inputs
        if torch.is_tensor(labels):
            self.labels = labels.long()
        else:
            # Stack into one ndarray first: building a tensor straight from a
            # list of arrays is slow and not supported by every torch version.
            self.labels = torch.as_tensor(np.asarray(labels), dtype=torch.long)
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(sample, label)``; the sample is transformed if a transform is set."""
        x = self.inputs[index]
        y = self.labels[index]

        if self.transform:
            # Tensors have no ``.astype``; go through NumPy so that both tensor
            # and ndarray inputs can be turned into a PIL image.
            if torch.is_tensor(x):
                sample = x.detach().cpu().numpy()
            else:
                sample = np.asarray(x)
            # Channel-first -> channel-last, as expected by Image.fromarray.
            # NOTE(review): the uint8 cast truncates values -- confirm the
            # inputs really are 0..255 image data.
            x = Image.fromarray(sample.astype(np.uint8).transpose(1, 2, 0))
            x = self.transform(x)

        return x, y

    def __len__(self):
        """Number of samples."""
        return len(self.inputs)

# The loaders appended all arrays of the first (red) channel followed by all
# arrays of the second (green) one. Cut each stack back into its two channel
# halves and swap the first two axes so that both halves of a sample sit next
# to each other: (channel, sample, ...) -> (sample, channel, ...).
def _pair_channels(stacked):
    """Split `stacked` into two equal halves along axis 0 and put the halves on axis 1."""
    halves = np.split(np.asarray(stacked), 2, axis=0)
    return np.asarray(halves).transpose(1, 0, 2, 3)


train_inputs = _pair_channels(train_inputs)
print(np.shape(train_inputs))

val_inputs = _pair_channels(val_inputs)
print(np.shape(val_inputs))

test_inputs = _pair_channels(test_inputs)
print(np.shape(test_inputs))

batch_size = 8

# Wrap the arrays in datasets.
# Train and validation samples are served as raw float tensors (no transform).
# The test set goes through array_transforms['test'], which needs a PIL image:
# CreateDataset builds it with ``.astype(np.uint8)``, a NumPy-only method, so
# the test inputs are passed as the ndarray rather than as a FloatTensor.
# NOTE(review): only the test samples pass through the transform pipeline, so
# their value range may differ from the untransformed train/val samples --
# confirm this is intended.
data = {
    'train':
    CreateDataset(torch.FloatTensor(train_inputs), train_labels, transform=None),
    'val':
    CreateDataset(torch.FloatTensor(val_inputs), val_labels, transform=None),
    'test':
    CreateDataset(test_inputs, test_labels, transform=array_transforms['test']),
}

# Load the data in batches. Only the training set is shuffled; evaluation does
# not depend on sample order, and a fixed order keeps runs comparable.
dataloaders = {
    'train': DataLoader(data['train'], batch_size=batch_size, shuffle=True),
    'val': DataLoader(data['val'], batch_size=batch_size, shuffle=False),
    'test': DataLoader(data['test'], batch_size=batch_size, shuffle=False),
}

In [None]:
# Load the pretrained VGG16, keep only its convolutional `features` stack and
# swap the first layer for one that receives 2 input channels instead of 3.
# The replacement layer is freshly initialised; all later layers keep their
# pretrained weights.
vgg16 = models.vgg16(pretrained=True)
vgg16_mod = vgg16.features
vgg16_mod[0] = nn.Conv2d(
    in_channels=2,
    out_channels=64,
    kernel_size=3,
    stride=1,
    padding=1,
)

In [None]:
# Check if a GPU is available and pick the device the model is moved to
if torch.cuda.is_available():
    # The device string must not contain whitespace: "cuda: 0" is not a valid
    # device specification, "cuda:0" is.
    device = torch.device("cuda:0")
    gpu_name = torch.cuda.get_device_name()
    print(f"Running on your {gpu_name} (GPU)")
else:
    device = torch.device("cpu")
    print("Running on your CPU")

# Move the modified VGG16 feature stack to the selected device; `net` is the
# name the optimizer and the training loop use for the model.
net = vgg16_mod.to(device)

In [None]:
# define the loss function and the optimizer

def distance(x1, y1, x2, y2):
    """Return the Euclidean distance between the points (x1, y1) and (x2, y2)."""
    dx = x2 - x1
    dy = y2 - y1
    squared = dx**2 + dy**2
    return squared**0.5

def custom_loss(output, target):
    """Return the distance between the first 1-valued entry of `output` and of `target`.

    Both arguments are searched for elements that compare equal to 1; the index
    of the first match in each is passed to `distance`.

    Raises:
        IndexError: if `output` or `target` has no element equal to 1.
        ValueError: if a match does not have exactly two indices, i.e. the
            argument is not 2-D.

    NOTE(review): the training loop calls `.backward()` on the returned value,
    but the value is built from NumPy index lookups, so it is presumably not an
    autograd tensor -- confirm, and replace with a differentiable loss if so.
    NOTE(review): `output == 1` is an exact comparison; a raw network output is
    unlikely to contain exact ones -- confirm what `output` is expected to hold.
    """
    # np.where yields one index array per axis; zip(*...) turns them into index
    # tuples and [0] keeps the first match.
    x1,y1 = list(zip(*np.where(output==1)))[0]
    x2,y2 = list(zip(*np.where(target==1)))[0]
    return distance(x1,y1,x2,y2)


# criterion = custom_loss(output, target)
# Adam over all parameters of `net`, with a learning rate of 0.001
optimizer = optim.Adam(net.parameters(), lr=0.001)

In [None]:
# Define the training function

# Number of passes over the training data
epochs = 1

# Early stopping: counter of consecutive epochs without a better validation
# loss, and the number of such epochs after which training stops
epochs_no_improve = 0
epochs_no_improve_limit = 7

# Best validation loss seen so far; starts at +inf so the first epoch always
# counts as an improvement. `np.inf` is used because the `np.Inf` alias was
# removed in NumPy 2.0.
min_val_loss = np.inf


def train():
    """Train ``net`` for up to ``epochs`` epochs with validation-based early stopping.

    Each epoch runs one optimisation pass over ``dataloaders['train']`` and then
    averages ``custom_loss`` over the batches of ``dataloaders['val']``.
    Whenever the mean validation loss improves, the whole model is saved to the
    file ``'Nets'`` and its weights are remembered. After
    ``epochs_no_improve_limit`` consecutive epochs without improvement the best
    weights are restored into ``net`` and training stops.

    The module-level ``min_val_loss`` and ``epochs_no_improve`` only provide the
    starting values. Progress is tracked in local variables: assigning to those
    names inside the function without ``global`` would make them locals and
    raise ``UnboundLocalError`` on the first comparison.
    """
    checkpoint_path = 'Nets'  # file the best model is written to
    best_val_loss = min_val_loss
    stale_epochs = epochs_no_improve
    best_state = None  # in-memory copy of the best weights seen so far

    for epoch in range(epochs):
        # --- optimisation pass ---
        net.train()
        for batch_inputs, batch_labels in dataloaders['train']:
            optimizer.zero_grad()
            output = net(batch_inputs.to(device))
            loss = custom_loss(output, batch_labels)
            loss.backward()
            optimizer.step()

        # --- validation pass: no gradients needed, keep only plain numbers ---
        net.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_inputs, batch_labels in dataloaders['val']:
                output = net(batch_inputs.to(device))
                val_loss += float(custom_loss(output, batch_labels))
        val_loss = val_loss / len(dataloaders['val'])

        if val_loss < best_val_loss:
            torch.save(net, checkpoint_path)
            best_state = copy.deepcopy(net.state_dict())
            stale_epochs = 0
            best_val_loss = val_loss
        else:
            stale_epochs += 1
            if stale_epochs >= epochs_no_improve_limit:
                print('Early stopping initiated')
                if best_state is not None:
                    # Roll the live model back to the best validated weights
                    net.load_state_dict(best_state)
                    print('Best model so far has been loaded')
                break
    print('Finished training')


train()