In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
import IPython
import PIL.Image
import glob
import os
import cv2
import copy
import numpy as np
import matplotlib.pyplot as plt
!pip install albumentations==0.5.2
import albumentations as albu

In [None]:
!pip install efficientnet_pytorch
from efficientnet_pytorch import EfficientNet

# Prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Input locations, all relative to the notebook's working directory.

# Image folder read by the stop classification dataset, and the directory next to it.
data_stop_path = "../input/sensor-stop/dataset/dataset"
model_stop_path = "../input/sensor-stop/"

# Images for the xy regression task, the hand-picked focus images used for
# testing/inspection, and the directory holding the pretrained steering checkpoint.
data_xy_path = "../input/dataset-xy/dataset_xy/dataset_xy"
focus_xy_path = "../input/dataset-xy/test_focus/test_focus/"
model_xy_path = "../input/dataset-xy/"

# Directory holding the saved backbone / head checkpoints of the multi-head model.
multi_path = "../input/multi-model/"

### For efficiency the classifier should reuse the backbone of the regression model

#### Loading a backbone pretrained on the regression task

In [None]:
# Load the road-following model: EfficientNet-B0 whose final layer outputs 2 values (x, y).
model = EfficientNet.from_name('efficientnet-b0')
model._fc = torch.nn.Linear(model._fc.in_features, 2)
# map_location makes the checkpoint loadable regardless of the device it was saved from.
model.load_state_dict(
    torch.load(model_xy_path + 'efficientnet_b0_steering_model_xy_tape.pth', map_location=device)
)

# Keep a standalone copy of the trained final layer before it is stripped from the model.
model_xy_top = torch.nn.Linear(model._fc.in_features, 2)
model_xy_top.load_state_dict(model._fc.state_dict())
model_xy_top = model_xy_top.to(device)

# Turn the model into a backbone/feature extractor by replacing its final layer with Identity.
# Note: `backbone` and `model` refer to the same module object from here on.
backbone = model.to(device)
backbone._fc = torch.nn.Identity()
backbone.eval()
print("")

#### Loading the backbone only pretrained on ImageNet

In [None]:
# ImageNet-pretrained EfficientNet-B0, moved to the target device straight away.
backbone = EfficientNet.from_pretrained('efficientnet-b0').to(device)
# Regression head: sized from the stock final layer *before* that layer is removed below.
regressor = torch.nn.Linear(in_features=backbone._fc.in_features, out_features=2).to(device)
# Drop the final layer so the network returns features instead of class logits.
backbone._fc = torch.nn.Identity()
print("")

### Initialize the classifier

In [None]:
# model0: 64, no dropout

# Two-layer classification head: features -> 256 hidden units (SiLU, dropout 0.2) -> 2 logits.
# The input width is taken from the copied regression layer so both heads accept the same features.
classifier = torch.nn.Sequential(
    torch.nn.Linear(in_features=model_xy_top.in_features, out_features=256),
    torch.nn.SiLU(),
    torch.nn.Dropout(p=0.2),
    torch.nn.Linear(in_features=256, out_features=2),
).to(device)

In [None]:
# Single linear classification head with the same input width as the regression head.
classifier = torch.nn.Linear(in_features=regressor.in_features, out_features=2).to(device)

### Create the multi-headed network

In [None]:
class MultiHeadNetwork(torch.nn.Module):
    '''
    Shared backbone followed by two task-specific heads.

    This class only serves for training purposes,
    the components get saved and deployed separately.

    Args:
        backbone: module that maps an input batch to the shared features.
        head_xy: module applied to the shared features when ``head == "xy"``.
        head_cl: module applied to the shared features when ``head == "cl"``.
    '''
    def __init__(self, backbone, head_xy, head_cl):
        super(MultiHeadNetwork, self).__init__()

        # Shared layers that run before either head; any module works here.
        self.shared = backbone

        # Task heads; each one can be any module that accepts the shared features.
        self.head_xy = head_xy
        self.head_cl = head_cl

    def forward(self, x, head):
        '''
        Run ``x`` through the shared layers and then through the selected head.

        Args:
            x: input batch passed to the shared layers.
            head: ``"xy"`` to use ``head_xy`` or ``"cl"`` to use ``head_cl``.

        Returns:
            The output of the selected head.

        Raises:
            ValueError: if ``head`` is neither ``"xy"`` nor ``"cl"``.
        '''
        # Pick the head first so an invalid name fails before the backbone is run.
        if head == "xy":
            selected_head = self.head_xy
        elif head == "cl":
            selected_head = self.head_cl
        else:
            raise ValueError('unknown head %r; expected "xy" or "cl"' % (head,))

        return selected_head(self.shared(x))


#multi_model = MultiHeadNetwork(backbone,regressor,classifier)

In [None]:
def motion_blur():
    """Build an albumentations pipeline whose only step is MotionBlur(blur_limit=30, p=1)."""
    return albu.Compose([albu.MotionBlur(blur_limit=30, p=1)])


# Module-level pipeline instance, used by CustomImageFolder.__getitem__.
motion_blur_composed = motion_blur()

class CustomImageFolder(datasets.ImageFolder):
    """ImageFolder variant that randomly motion-blurs samples before the regular transforms."""

    def __getitem__(self, index: int):
        """Load sample ``index``, possibly blur it, then apply the configured transforms.

        Args:
            index: position of the sample in ``self.samples``.

        Returns:
            ``(sample, target)`` after ``self.transform`` / ``self.target_transform``
            have been applied (when they are set).
        """
        path, target = self.samples[index]
        sample = self.loader(path)
        # np.random.rand() yields a plain float; float(np.random.rand(1)) relied on
        # array-to-scalar conversion, which NumPy >= 1.25 deprecates.
        # NOTE(review): this blurs roughly 90% of the samples, whereas XYDataset blurs with
        # probability p_blur (default 0.1) -- confirm the asymmetry is intended.
        if np.random.rand() > 0.1:
            blurred = motion_blur_composed(image=np.array(sample))
            sample = PIL.Image.fromarray(blurred['image'])
        if self.transform is not None:
            sample = self.transform(sample)
        if self.target_transform is not None:
            target = self.target_transform(target)

        return sample, target


def motion_blur():
    """Build an albumentations pipeline whose only step is MotionBlur(blur_limit=30, p=1).

    Note: this re-declares the ``motion_blur`` already defined earlier in this cell,
    with the same behaviour.
    """
    blur_step = albu.MotionBlur(blur_limit=30, p=1)
    return albu.Compose([blur_step])

def get_x(path, width):
    """Return the x value encoded in the image filename, rescaled by the image width.

    The second ``_``-separated field of ``path`` is read as an integer pixel
    column and mapped so that 0 -> -1, width/2 -> 0 and width -> 1.
    """
    half_width = width / 2
    pixel_x = float(int(path.split("_")[1]))
    return (pixel_x - half_width) / half_width

def get_y(path, height):
    """Return the y value encoded in the image filename, rescaled by the image height.

    The third ``_``-separated field of ``path`` is read as an integer pixel
    row and mapped so that 0 -> -1, height/2 -> 0 and height -> 1.
    """
    half_height = height / 2
    pixel_y = float(int(path.split("_")[2]))
    return (pixel_y - half_height) / half_height

class XYDataset(torch.utils.data.Dataset):
    """Regression dataset of ``*.jpg`` images whose filenames encode the target (x, y).

    Args:
        directory: folder that is searched (non-recursively) for ``*.jpg`` files.
        random_hflips: when True, each sample is mirrored horizontally with
            probability 0.5 and its x target is negated accordingly.
        col_params: ``(brightness, contrast, saturation, hue)`` passed to ColorJitter.
        p_blur: probability of applying motion blur to a sample.
    """

    def __init__(self, directory, random_hflips=False, col_params=(0.3,0,0,0), p_blur=0.1):
        self.directory = directory
        self.random_hflips = random_hflips
        # glob returns files in arbitrary order; sorting fixes the index -> file mapping so
        # that a seeded random_split over this dataset is reproducible.
        self.image_paths = sorted(glob.glob(os.path.join(self.directory, '*.jpg')))
        self.color_jitter = transforms.ColorJitter(brightness=col_params[0],
                                                   contrast=col_params[1],
                                                   saturation=col_params[2],
                                                   hue=col_params[3])
        self.motion_blur = motion_blur()
        self.p_blur = p_blur

    def __len__(self):
        """Number of images found in ``directory``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``image`` is a normalised float tensor resized to 224x224 and ``target`` is a
        float tensor ``[x, y]`` computed from the filename by get_x / get_y.
        """
        image_path = self.image_paths[idx]

        image = PIL.Image.open(image_path)
        width, height = image.size
        filename = os.path.basename(image_path)
        x = float(get_x(filename, width))
        y = float(get_y(filename, height))

        # Honour the constructor flag: previously the flip was applied even when
        # random_hflips was False.
        if self.random_hflips and np.random.rand() > 0.5:
            image = transforms.functional.hflip(image)
            x = -x

        if np.random.rand() < self.p_blur:
            blurred = self.motion_blur(image=np.array(image))
            image = PIL.Image.fromarray(blurred['image'])

        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        # Reverse the channel axis of the CxHxW tensor.
        # NOTE(review): presumably converts RGB to BGR to match the deployed pipeline -- confirm.
        image = image.numpy()[::-1].copy()
        image = torch.from_numpy(image)
        # ImageNet mean and stdev
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        return image, torch.tensor([x, y]).float()

class InspectionDataset(torch.utils.data.Dataset):
    """Like XYDataset, but each sample also carries the un-normalised image for plotting.

    Args:
        directory: folder that is searched (non-recursively) for ``*.jpg`` files.
        random_hflips: when True, each sample is mirrored horizontally with
            probability 0.5 and its x target is negated accordingly.
        col_params: ``(brightness, contrast, saturation, hue)`` passed to ColorJitter.
        p_blur: probability of applying motion blur to a sample.
    """

    def __init__(self, directory, random_hflips=False, col_params=(0.3,0,0,0), p_blur=0.1):
        self.directory = directory
        self.random_hflips = random_hflips
        # glob returns files in arbitrary order; sorting keeps the sample order stable
        # between runs.
        self.image_paths = sorted(glob.glob(os.path.join(self.directory, '*.jpg')))
        self.color_jitter = transforms.ColorJitter(brightness=col_params[0],
                                                   contrast=col_params[1],
                                                   saturation=col_params[2],
                                                   hue=col_params[3])
        self.motion_blur = motion_blur()
        self.p_blur = p_blur

    def __len__(self):
        """Number of images found in ``directory``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image_norm, target, image)`` for sample ``idx``.

        ``image_norm`` is the channel-reversed, normalised tensor fed to the model,
        ``target`` is a float tensor ``[x, y]`` computed from the filename, and ``image``
        is the augmented 224x224 tensor before channel reversal and normalisation.
        """
        image_path = self.image_paths[idx]

        image = PIL.Image.open(image_path)
        width, height = image.size
        filename = os.path.basename(image_path)
        x = float(get_x(filename, width))
        y = float(get_y(filename, height))

        # Honour the constructor flag: previously the flip was applied even when
        # random_hflips was False.
        if self.random_hflips and np.random.rand() > 0.5:
            image = transforms.functional.hflip(image)
            x = -x

        if np.random.rand() < self.p_blur:
            blurred = self.motion_blur(image=np.array(image))
            image = PIL.Image.fromarray(blurred['image'])

        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        # Reverse the channel axis of the CxHxW tensor for the model input only.
        # NOTE(review): presumably converts RGB to BGR to match the deployed pipeline -- confirm.
        image_norm = image.numpy()[::-1].copy()
        image_norm = torch.from_numpy(image_norm)
        # ImageNet mean and stdev
        image_norm = transforms.functional.normalize(image_norm, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        return image_norm, torch.tensor([x, y]).float(), image
    



In [None]:
# Regression samples (targets parsed from the filenames).
dataset_xy = XYDataset(data_xy_path, random_hflips=True)

# Classification samples: brightness jitter, resize to 224x224, tensor conversion and
# ImageNet mean/std normalisation (motion blur is added inside CustomImageFolder).
dataset_stop = CustomImageFolder(
    root=data_stop_path,
    transform=transforms.Compose([
        transforms.ColorJitter(brightness=0.3, contrast=0, saturation=0, hue=0),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
)
# Show which class name maps to which label index.
dataset_stop.class_to_idx

In [None]:
# Hold out 20% of each dataset; the RNG is re-seeded before every split.
test_percent = 0.2

# Classification split.
num_test = int(len(dataset_stop) * test_percent)
torch.manual_seed(2021)
train_stop_dataset, test_stop_dataset = torch.utils.data.random_split(
    dataset_stop, lengths=[len(dataset_stop) - num_test, num_test]
)

# Regression split.
num_test = int(len(dataset_xy) * test_percent)
torch.manual_seed(2021)
train_xy_dataset, test_xy_dataset = torch.utils.data.random_split(
    dataset_xy, lengths=[len(dataset_xy) - num_test, num_test]
)

# Add hand picked test images to make sure it fixes errors
test_extra_xy_dataset = torch.utils.data.ConcatDataset(
    [test_xy_dataset, XYDataset(focus_xy_path, random_hflips=True)]
)

In [None]:
# All loaders use batches of 16 and load in the main process (num_workers=0).

# Classification loaders.
train_stop_loader = torch.utils.data.DataLoader(
    dataset=train_stop_dataset, batch_size=16, shuffle=True, num_workers=0)
test_stop_loader = torch.utils.data.DataLoader(
    dataset=test_stop_dataset, batch_size=16, shuffle=True, num_workers=0)

# Regression loaders; the test loader also covers the hand-picked focus images.
train_xy_loader = torch.utils.data.DataLoader(
    dataset=train_xy_dataset, batch_size=16, shuffle=True, num_workers=0)
test_xy_loader = torch.utils.data.DataLoader(
    dataset=test_extra_xy_dataset, batch_size=16, shuffle=True, num_workers=0)

# Unshuffled loader over the focus images with motion blur always applied (p_blur=1),
# used for visual inspection of the predictions.
inspection_loader = torch.utils.data.DataLoader(
    dataset=InspectionDataset(focus_xy_path, random_hflips=True, col_params=(0.3,0,0,0), p_blur=1),
    batch_size=16,
    shuffle=False,
    num_workers=0,
)

### Training regression and classifier heads together

In [None]:
# Rebuild the three components with the shapes used when the checkpoints were saved:
# an EfficientNet-B0 without its final layer plus two linear heads on 1280 features.
backbone = EfficientNet.from_name('efficientnet-b0')
regressor = torch.nn.Linear(1280, 2)
classifier = torch.nn.Linear(1280, 2)
backbone._fc = torch.nn.Identity()

backbone = backbone.to(device)
regressor = regressor.to(device)
classifier = classifier.to(device)

# map_location makes the checkpoints loadable regardless of the device they were saved from.
backbone.load_state_dict(torch.load(multi_path + 'backbone_0.pth', map_location=device))
regressor.load_state_dict(torch.load(multi_path + 'head_xy_0.pth', map_location=device))
classifier.load_state_dict(torch.load(multi_path + 'head_stop_0.pth', map_location=device))
multi_model = MultiHeadNetwork(backbone, regressor, classifier)


In [None]:
NUM_EPOCHS = 300
BEST_SHARED_PATH = 'backbone_1.pth'
BEST_XY_PATH = 'head_xy_1.pth'
BEST_STOP_PATH = 'head_stop_1.pth'
# Joint test loss that has to be beaten before any checkpoint is written.
best_loss = 0.191133

optimizer = optim.Adam(multi_model.parameters(), lr=1e-4)
# Weight of the regression loss in the joint metric used for model selection.
xy_weight = 20.0


def _train_pass(model, optimizer, loader, head, loss_fn):
    """Run one optimisation pass over `loader`, routing every batch through `head`."""
    for images, labels in loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        loss = loss_fn(model(images, head), labels)
        loss.backward()
        optimizer.step()


def _mean_test_loss(model, loader, head, loss_fn):
    """Return the mean per-batch loss over `loader`, computed without autograd graphs."""
    total = 0.0
    # no_grad: evaluation needs no gradients, so skip building graphs to save memory.
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            total += float(loss_fn(model(images, head), labels))
    return total / len(loader)


out = display(IPython.display.Pretty('Start training'), display_id=True)
for epoch in range(NUM_EPOCHS):
    multi_model.train()

    # Per epoch: regression pass, classification pass, then a second regression pass.
    _train_pass(multi_model, optimizer, train_xy_loader, "xy", F.mse_loss)
    _train_pass(multi_model, optimizer, train_stop_loader, "cl", F.cross_entropy)
    _train_pass(multi_model, optimizer, train_xy_loader, "xy", F.mse_loss)

    multi_model.eval()
    test_xy_loss = _mean_test_loss(multi_model, test_xy_loader, "xy", F.mse_loss)
    test_stop_loss = _mean_test_loss(multi_model, test_stop_loader, "cl", F.cross_entropy)
    test_multi_loss = xy_weight * test_xy_loss + test_stop_loss

    out.update(IPython.display.Pretty('Epoch %d | xy: %f, stop: %f, joint: %f' % (epoch,test_xy_loss, test_stop_loss, test_multi_loss)))

    # Save the three components separately whenever the joint loss improves.
    if test_multi_loss < best_loss:
        torch.save(multi_model.shared.state_dict(), BEST_SHARED_PATH)
        torch.save(multi_model.head_xy.state_dict(), BEST_XY_PATH)
        torch.save(multi_model.head_cl.state_dict(), BEST_STOP_PATH)
        best_loss = test_multi_loss
        print('Epoch %d | xy: %f, stop: %f, joint: %f' % (epoch,test_xy_loss, test_stop_loss, test_multi_loss))

### Training classifier head on regression backbone

In [None]:
#classifier.load_state_dict(torch.load(model_path+'stop_model.pth'))
#classifier.load_state_dict(torch.load('./'+'stop_model_3.pth'))

In [None]:
NUM_EPOCHS = 300
BEST_MODEL_PATH = 'stop_model_3.pth'
# Test loss that has to be beaten before a checkpoint is written.
best_loss = 0.131893
# 0.069680
# 0.123112
# 0.133830
# 0.131893

# Only the classifier's parameters are optimised here.
optimizer = optim.Adam(classifier.parameters(), lr=1e-5)
# The backbone is used as a fixed feature extractor: eval mode keeps its dropout and
# batch-norm statistics from changing while the head is trained.
backbone.eval()
out = display(IPython.display.Pretty('Start training'), display_id=True)
for epoch in range(NUM_EPOCHS):

    classifier.train()
    for images, labels in train_stop_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        # No gradients are needed for the backbone, so its forward pass skips autograd.
        with torch.no_grad():
            features = backbone(images)
        outputs = classifier(features)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

    classifier.eval()
    test_loss = 0.0
    with torch.no_grad():
        for images, labels in test_stop_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = classifier(backbone(images))
            loss = F.cross_entropy(outputs, labels)
            test_loss += float(loss)
    # Average over the loader that was actually iterated (the original referenced an
    # undefined `test_loader`, which raised NameError).
    test_loss /= len(test_stop_loader)
    out.update(IPython.display.Pretty('%d: %f' % (epoch, test_loss)))

    if test_loss < best_loss:
        torch.save(classifier.state_dict(), BEST_MODEL_PATH)
        best_loss = test_loss
        print('%d: %f' % (epoch, test_loss))

# Reuse backbone for road following!

### Testing

In [None]:
def scale_y(y):
    """Return ``(0.5 - y) / 2``."""
    offset_from_half = 0.5 - y
    return offset_from_half / 2.0
def scale_to_pixels(x, width=224):
    """Map a normalised coordinate to a pixel position on an axis of length ``width``.

    Inverse of the get_x / get_y scaling: -1 -> 0, 0 -> width/2, 1 -> width.
    The result is truncated towards zero by ``int``.

    Args:
        x: normalised coordinate.
        width: length of the axis in pixels (previously ignored in favour of a
            hard-coded 224).

    Returns:
        The pixel position as an int.
    """
    half_width = width / 2
    return int(x * half_width + half_width)
#model = EfficientNet.from_name('efficientnet-b0')
#model._fc=torch.nn.Linear(model_road._fc.in_features, 2)
#model.load_state_dict(torch.load(model_path+'efficientnet_b0_steering_model_xy_tape.pth'))
# Draw label and prediction circles on the inspection images and save them as a grid.
multi_model.eval()
for images_norm, labels, images in inspection_loader:
    images_norm = images_norm.to(device)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        outputs = multi_model(images_norm, "xy")
    xy_batch = outputs.detach().float().cpu().numpy()
    labels_np = labels.detach().float().cpu().numpy()
    # Show at most 12 images; a batch may hold fewer, which made the fixed
    # range(12) raise IndexError.
    n_shown = min(12, images.shape[0])
    for i in range(n_shown):
        x_hat = scale_to_pixels(xy_batch[i, 0])
        y_hat = scale_to_pixels(xy_batch[i, 1])
        x = scale_to_pixels(labels_np[i][0])
        y = scale_to_pixels(labels_np[i][1])
        # convert tensors back to images to add circles
        image_np = images[i].permute(1, 2, 0).numpy() * 255
        image_np = image_np.astype(np.uint8)
        # The (0, 255, 0) circle marks the label, the (255, 0, 0) circle the prediction.
        overlay1 = cv2.circle(image_np.copy(), (x, y), 8, (0, 255, 0), 3)
        overlay2 = cv2.circle(image_np.copy(), (x_hat, y_hat), 8, (255, 0, 0), 3)
        alpha = 0.5
        image_np = cv2.addWeighted(overlay2, alpha, overlay1, 1 - alpha, 0)
        # and then back to tensors for plotting
        images[i] = transforms.functional.to_tensor(image_np)
    grid_img = torchvision.utils.make_grid(images[:n_shown], nrow=4)
    fig, ax = plt.subplots(figsize=(18, 10))
    ax.imshow(grid_img.permute(1, 2, 0))
    plt.tight_layout()
    # NOTE(review): every batch writes to the same file, so only the last batch's grid is kept.
    fig.savefig("regression_example.pdf")