In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
import glob
import PIL.Image
import os
import numpy as np

In [None]:
# Unpack the dataset archive into the current working directory (-q keeps unzip quiet).
# NOTE(review): the archive name says "right", while the dataset cell below reads
# 'dataset_all' and 'dataset_left' -- confirm this archive actually provides those folders.
!unzip -q lego_city_dataset_right_2019-03-15_13-23-46.zip

In [None]:
# Number of steering bins.
# NOTE(review): no visible cell reads NBINS (SteeringDataset has its own nbins=21
# default) -- confirm whether this constant is still needed.
NBINS = 21

In [None]:
def get_steering(path):
    """Decode a steering value from characters 9..11 of ``path``.

    The three characters are parsed as an integer and rescaled so that
    0 -> -1.0, 50 -> 0.0 and 100 -> 1.0.

    Args:
        path: String containing the encoded value at positions 9, 10 and 11.
            Callers in this notebook pass the file's base name.
            NOTE(review): assumes a fixed-width naming scheme -- confirm
            against the dataset's file names.

    Returns:
        The rescaled steering value as a Python float.

    Raises:
        ValueError: If ``path[9:12]`` cannot be parsed by ``int``.
    """
    encoded = path[9:12]
    centred = float(int(encoded)) - 50.0
    return centred / 50.0

def get_gaussian(mean, stdev, nbins):
    """Evaluate an unnormalised bell curve on ``nbins`` points spanning [-1, 1].

    Computes ``exp(-(x - mean)**2 / stdev**2)`` for ``x`` evenly spaced from
    -1.0 to 1.0 inclusive. The peak value is 1 where ``x == mean``; the result
    is not scaled to sum to one.

    Args:
        mean: Centre of the curve.
        stdev: Width parameter; appears squared in the denominator.
        nbins: Number of sample points.

    Returns:
        A NumPy array of length ``nbins``.
    """
    centres = np.linspace(-1.0, 1.0, num=nbins)
    squared_offset = (centres - mean)**2
    return np.exp(-squared_offset / (stdev**2))

def get_bin(value, nbins):
    """Return the ``np.digitize`` index of ``value`` against [-1, 1] edges.

    The edges are ``nbins`` evenly spaced points from -1.0 to 1.0 inclusive.
    With ``np.digitize``'s default ``right=False``, values below -1.0 map to 0
    and values at or above 1.0 map to ``nbins``.

    Args:
        value: Scalar or array-like of values to bin.
        nbins: Number of edge points.

    Returns:
        The bin index (or array of indices) as produced by ``np.digitize``.
    """
    edges = np.linspace(-1.0, 1.0, num=nbins)
    return np.digitize(value, bins=edges)

class SteeringDataset(torch.utils.data.Dataset):
    """Steering-regression dataset backed by the ``*.jpg`` files of one directory.

    Each item is an ``(image, steering)`` pair:

    * ``image`` -- the file loaded as RGB, colour-jittered, resized to 224x224,
      converted to a tensor and normalised with the per-channel mean/std given
      in ``__getitem__`` (the values conventionally used with torchvision's
      ImageNet-pretrained models).
    * ``steering`` -- float tensor of shape ``(1,)`` decoded from the file's
      base name by ``get_steering``.

    Args:
        directory: Folder searched (non-recursively) for ``*.jpg`` files.
        stdev: Stored on the instance; not used when building items.
        nbins: Stored on the instance; not used when building items.
    """

    def __init__(self, directory, stdev=0.3, nbins=21):
        self.nbins = nbins
        self.stdev = stdev
        self.directory = directory
        # glob yields files in arbitrary, filesystem-dependent order; sorting
        # keeps the index -> file mapping stable across runs and machines.
        self.image_paths = sorted(glob.glob(os.path.join(directory, '*.jpg')))
        self.color_jitter = transforms.ColorJitter(0.1, 0.1, 0.1, 0.1)

    def __len__(self):
        """Number of images found in ``directory``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, augment and normalise image ``idx``; return it with its target."""
        image_path = self.image_paths[idx]

        # Force three channels so that grayscale/RGBA files do not break the
        # 3-channel normalisation below; the context manager releases the
        # file handle as soon as the pixel data has been read.
        with PIL.Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        # The raw steering value is the regression target; the get_gaussian /
        # get_bin helpers offer alternative encodings but are not applied here.
        steering = get_steering(os.path.basename(image_path))

        return image, torch.tensor([steering]).float()
    
    
class MergedSteeringDataset(torch.utils.data.Dataset):
    """Concatenation of two ``*.jpg`` directories as one steering dataset.

    Indices ``0 .. len(all_image_paths) - 1`` address files from ``all_dir``;
    the remaining indices address files from ``task_dir``. Each item is an
    ``(image, steering)`` pair: the image is loaded as RGB, colour-jittered,
    resized to 224x224, converted to a tensor and normalised with the
    per-channel mean/std given in ``__getitem__``; ``steering`` is a float
    tensor of shape ``(1,)`` decoded from the file's base name by
    ``get_steering``.

    Args:
        all_dir: First folder searched (non-recursively) for ``*.jpg`` files.
        task_dir: Second folder searched (non-recursively) for ``*.jpg`` files.
    """

    def __init__(self, all_dir, task_dir):
        self.all_dir = all_dir
        self.task_dir = task_dir
        # glob yields files in arbitrary, filesystem-dependent order; sorting
        # keeps the index -> file mapping stable across runs and machines.
        self.all_image_paths = sorted(glob.glob(os.path.join(self.all_dir, '*.jpg')))
        self.task_image_paths = sorted(glob.glob(os.path.join(self.task_dir, '*.jpg')))
        self.color_jitter = transforms.ColorJitter(0.1, 0.1, 0.1, 0.1)

    def __len__(self):
        """Total number of images across both directories."""
        return len(self.all_image_paths) + len(self.task_image_paths)

    def __getitem__(self, idx):
        """Load, augment and normalise image ``idx``; return it with its target."""
        num_all = len(self.all_image_paths)
        if idx < num_all:
            image_path = self.all_image_paths[idx]
        else:
            # Indices past the first list continue into the task-specific list.
            image_path = self.task_image_paths[idx - num_all]

        # Force three channels so that grayscale/RGBA files do not break the
        # 3-channel normalisation below; the context manager releases the
        # file handle as soon as the pixel data has been read.
        with PIL.Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        # The raw steering value is the regression target.
        steering = get_steering(os.path.basename(image_path))

        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

        return image, torch.tensor([steering]).float()
    

In [None]:
# Build the merged dataset and hold out a random 10% of it for evaluation.
dataset = MergedSteeringDataset('dataset_all', 'dataset_left')
if len(dataset) == 0:
    # Fail early with a readable message instead of an obscure sampler error.
    raise RuntimeError("No '*.jpg' files found in 'dataset_all' or 'dataset_left'")

test_percent = 0.1
num_test = int(test_percent * len(dataset))
num_train = len(dataset) - num_test
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [num_train, num_test])
# NOTE(review): both splits wrap the same dataset object, so the colour jitter
# applied in its __getitem__ also affects the held-out images.

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=4
)

# Evaluation gains nothing from shuffling; a fixed order keeps batch
# composition identical from epoch to epoch.
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4
)

In [None]:
# Pretrained ResNet-18 whose final fully connected layer is replaced by a
# single-output linear layer for steering regression.
model = models.resnet18(pretrained=True)
# Read the feature width from the existing head rather than hard-coding it.
model.fc = torch.nn.Linear(model.fc.in_features, 1)
# Use the GPU when present, otherwise fall back to the CPU instead of crashing.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# train regression:

In [None]:
NUM_EPOCHS = 60
BEST_MODEL_PATH = 'best_steering_model_left_merged.pth'
best_loss = 1e9

# Alternative optimizer: optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)
optimizer = optim.Adam(model.parameters())

for epoch in range(NUM_EPOCHS):

    # ---- training pass ----
    model.train()
    train_loss = 0.0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.mse_loss(outputs, labels)
        loss.backward()
        optimizer.step()
        # .item() detaches the scalar; summing the tensor itself would keep
        # autograd history alive for every batch of the epoch.
        train_loss += loss.item()
    train_loss /= len(train_loader)

    # ---- evaluation pass ----
    model.eval()
    test_loss = 0.0
    # No gradients are needed here; without no_grad() each batch's graph and
    # activations would be retained through the accumulated loss.
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            test_loss += F.mse_loss(outputs, labels).item()
    test_loss /= len(test_loader)

    # Mean per-batch MSE: training, then held-out.
    print('%f, %f' % (train_loss, test_loss))
    if test_loss < best_loss:
        # Checkpoint whenever the held-out loss improves.
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_loss = test_loss

In [None]:
# Save the final-epoch weights next to, not on top of, the best checkpoint.
# Writing them to BEST_MODEL_PATH would overwrite the weights the training
# loop selected by lowest held-out loss.
_ckpt_root, _ckpt_ext = os.path.splitext(BEST_MODEL_PATH)
LAST_MODEL_PATH = _ckpt_root + '_last' + _ckpt_ext
torch.save(model.state_dict(), LAST_MODEL_PATH)