In [1]:
import glob
import importlib
import json
import os
import subprocess
import time
import uuid

import cv2
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

In [None]:
def get_x(path, width):
    """Return the x label encoded in an image filename, rescaled around 0.

    The second ``_``-separated field of ``path`` is parsed as an integer
    pixel column and mapped so that ``0`` -> ``-1.0``, ``width / 2`` -> ``0.0``
    and ``width`` -> ``1.0``.
    """
    half_width = width / 2
    pixel_x = int(path.split("_")[1])
    return (float(pixel_x) - half_width) / half_width

def get_y(path, height):
    """Return the y label encoded in an image filename, rescaled around 0.

    The third ``_``-separated field of ``path`` is parsed as an integer
    pixel row and mapped so that ``0`` -> ``-1.0``, ``height / 2`` -> ``0.0``
    and ``height`` -> ``1.0``.
    """
    half_height = height / 2
    pixel_y = int(path.split("_")[2])
    return (float(pixel_y) - half_height) / half_height
            
class XYDataset(torch.utils.data.Dataset):
    """Dataset of ``*.jpg`` images whose (x, y) target is encoded in the filename.

    Each item is a ``(image, target)`` pair where ``image`` is a normalised
    224x224 float tensor and ``target`` is a float tensor ``[x, y]`` obtained
    from ``get_x`` / ``get_y`` on the file's basename.

    Args:
        directory: Folder that is searched (non-recursively) for ``*.jpg`` files.
        random_hflips: When true, each sample is mirrored horizontally with
            probability 0.5 and its ``x`` label is negated accordingly.
            When false, no flipping is performed.
    """

    def __init__(self, directory, random_hflips=False):
        self.directory = directory
        self.random_hflips = random_hflips
        # glob returns files in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs.
        self.image_paths = sorted(glob.glob(os.path.join(self.directory, '*.jpg')))
        self.color_jitter = transforms.ColorJitter(0.3, 0.3, 0.3, 0.3)

    def __len__(self):
        """Number of images found in ``directory``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, augment and normalise the image at ``idx``; return ``(image, [x, y])``."""
        image_path = self.image_paths[idx]
        file_name = os.path.basename(image_path)

        # Open inside a context manager so the file handle is released, and
        # force three channels so the 3-value normalisation below always applies.
        with PIL.Image.open(image_path) as source_image:
            image = source_image.convert('RGB')

        width, height = image.size
        x = float(get_x(file_name, width))
        y = float(get_y(file_name, height))

        # Honour the constructor flag: flipping is opt-in. Mirroring the image
        # mirrors the horizontal label as well.
        if self.random_hflips and np.random.rand() > 0.5:
            image = transforms.functional.hflip(image)
            x = -x

        # NOTE(review): colour jitter is applied to every sample, including
        # those that end up in a held-out subset of this dataset.
        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        # Reverse the first (channel) axis of the tensor. Presumably this is
        # RGB -> BGR to match the inference pipeline -- TODO confirm.
        image = image.flip(0)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        return image, torch.tensor([x, y]).float()


# Index every JPEG under ``jetbot_dataset`` and report how many samples were found.
dataset = XYDataset(directory='jetbot_dataset', random_hflips=False)
num_samples = len(dataset)
print(num_samples)

In [3]:
# Hold-out split: reserve a fixed fraction of the samples for evaluation.
test_percent = 0.2
num_test = int(test_percent * len(dataset))
num_train = len(dataset) - num_test

# A dedicated, seeded generator makes the split identical after every
# "Restart Kernel -> Run All" without touching the global RNG state.
split_generator = torch.Generator().manual_seed(0)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [num_train, num_test],
    generator=split_generator,
)

# NOTE(review): both subsets wrap the same ``dataset`` object, so held-out
# samples go through the same random augmentation as training samples.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)

# Evaluation averages over every batch, so sample order is irrelevant;
# keep it fixed instead of reshuffling each epoch.
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=0
)

In [None]:
# The module file name contains a hyphen, which is not a valid identifier for
# an ``import`` statement (it is a SyntaxError); load it by string instead.
rdnet18_ca = importlib.import_module("rdnet18-ca").rdnet18_ca

model_path = "resnet18-5c106cde.pth"
# NOTE(security): torch.load unpickles the file -- only load checkpoints from
# a trusted source. map_location keeps loading independent of the device the
# checkpoint was saved from; the model is moved to its target device later.
pretrained_dict = torch.load(model_path, map_location="cpu")

model = rdnet18_ca()
model_dict = model.state_dict()

# Keep only pretrained tensors that exist in the custom model under the same
# name AND with the same shape; anything else keeps its fresh initialisation
# instead of making load_state_dict fail on a size mismatch.
pretrained_dict = {
    name: tensor
    for name, tensor in pretrained_dict.items()
    if name in model_dict and tensor.shape == model_dict[name].shape
}
model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)

In [5]:
# Replace the final layer with a 2-output linear head (the x and y targets).
# assumes the backbone feeds 512 features into ``fc`` -- TODO confirm against rdnet18_ca
model.fc = torch.nn.Linear(512, 2)

# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [6]:
class LiSHTLS(nn.Module):
    """Scaled ``error * tanh(error)`` regression loss.

    For ``error = predictions - labels`` the per-element loss is
    ``(error / scale) * tanh(error)``, averaged over all elements.
    """

    def __init__(self):
        super().__init__()

    def forward(self, predictions, labels, scale):
        """Compute the mean loss.

        Args:
            predictions: Model outputs.
            labels: Targets, broadcastable against ``predictions``.
            scale: Divisor applied to the error (tensor or number).

        Returns:
            Scalar tensor with the mean of ``(error / scale) * tanh(error)``.
        """
        # Use the function argument, not a same-named global from the training loop.
        error = predictions - labels
        # torch.tanh is mathematically the (e^x - e^-x) / (e^x + e^-x) ratio,
        # but stays finite for large |error| where exp() overflows to inf
        # and the hand-written ratio turns into NaN.
        loss = (error / scale) * torch.tanh(error)
        return loss.mean()


criterion = LiSHTLS()


In [None]:
# --- Training configuration -------------------------------------------------
NUM_EPOCHS = 80
BEST_MODEL_PATH = 'best_steering_model_xy.pth'
best_loss = 1e9
best_epoch = -1
best_train_loss = 0.0
best_test_loss = 0.0

optimizer = optim.Adam(model.parameters())

# Learnable scalar handed to the criterion as its ``scale`` argument; it is
# updated by its own SGD optimizer, separately from the model weights.
# NOTE(review): the criterion divides by this value, so the reported losses
# (and the best-model selection below) depend on the current scale, not only
# on the prediction error -- confirm this is intended.
initial_scale = torch.tensor(1.0, dtype=torch.float32, device=device, requires_grad=True)
learning_rate = 0.05
optimizer_delta_scale = optim.SGD([initial_scale], lr=learning_rate)

# --- Per-epoch / per-step history -------------------------------------------
train_losses = []
test_losses = []
scale_values = []
gradient_norms = []  # global L2 norm of the model gradients, one entry per training step

for epoch in range(NUM_EPOCHS):
    epoch_start_time = time.time()

    # ---- training pass ----
    model.train()
    train_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        optimizer_delta_scale.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels, initial_scale)
        train_loss += float(loss)

        loss.backward()

        # Record the overall gradient magnitude of the model for this step.
        squared_norms = [
            param.grad.detach().pow(2).sum()
            for param in model.parameters()
            if param.grad is not None
        ]
        if squared_norms:
            gradient_norms.append(float(torch.sqrt(torch.stack(squared_norms).sum())))
        else:
            gradient_norms.append(0.0)

        optimizer_delta_scale.step()
        scale_values.append(initial_scale.item())
        optimizer.step()
    train_loss /= len(train_loader)
    train_losses.append(train_loss)
    # Running mean over every scale value recorded so far (all epochs).
    mean_scale = sum(scale_values) / len(scale_values)

    # ---- evaluation pass ----
    model.eval()
    test_loss = 0.0
    # No gradients are needed for evaluation; skipping graph construction
    # saves memory and time.
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels, initial_scale)
            test_loss += float(loss)
    test_loss /= len(test_loader)
    test_losses.append(test_loss)

    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time

    print('Epoch %d completed | Training Loss: %.6f | Test Loss: %.6f | Epoch Time: %.4f seconds' % (epoch + 1, train_loss, test_loss, epoch_time))

    # Checkpoint whenever the held-out loss improves.
    if test_loss < best_loss:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_loss = test_loss
        # Stored 1-based so it matches the epoch numbers printed above.
        best_epoch = epoch + 1
        best_train_loss = train_loss
        best_test_loss = test_loss
print('Best model saved at epoch %d with Train Loss = %f and Test Loss = %f' % (best_epoch, best_train_loss, best_test_loss))

# --- Persist the training history -------------------------------------------
with open('losses.json', 'w') as f:
    json.dump({'train_losses': train_losses, 'test_losses': test_losses}, f)

with open('scales.json', 'w') as f:
    json.dump({'scales': scale_values}, f)

with open('gradient_norms.json', 'w') as f:
    json.dump({'gradient_norms': gradient_norms}, f)

# --- Loss curves ------------------------------------------------------------
plt.plot(range(1, NUM_EPOCHS + 1), train_losses, label='Train Loss')
plt.plot(range(1, NUM_EPOCHS + 1), test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss')
plt.legend()
plt.ylim(0, 0.1)
plt.yticks([i * 0.005 for i in range(21)])
plt.savefig('rdnet18-ca.png')
plt.show()