# Road Follower - Train Model

In this notebook we will train a neural network to take an input image, and output a set of x, y values corresponding to a target.

We will use the PyTorch deep learning framework to fine-tune a pre-trained convolutional neural network for the road follower application. The original example is built around ResNet-18; the cells below load VGG16 instead.

In [1]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
print(torchvision.__version__)
import torchvision.datasets as datasets
import torchvision.models as models
import torch.nn as nn
import torchvision.transforms as transforms
import glob
import PIL.Image
import os
import numpy as np

0.8.0a0+45f960c


### Download and extract data

Before you start, you should upload the ``road_following_<Date&Time>.zip`` file that you created in the ``data_collection.ipynb`` notebook on the robot. 

> If you're training on the JetBot you collected data on, you can skip this!

You should then extract this dataset by calling the command below:

In [2]:
!unzip -q road_following_dataset_xy_2024-04-05_22-42-38.zip
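
If the shell command is not available on your machine, the archive can also be extracted from Python. The following is a minimal sketch using the standard-library `zipfile` module; the helper name `extract_dataset` is hypothetical and not part of the original notebook.

```python
import os
import zipfile

def extract_dataset(zip_path, dest_dir='.'):
    """Extract every member of zip_path into dest_dir and return the member names."""
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()

# Hypothetical usage; substitute the name of the archive you uploaded.
ZIP_PATH = 'road_following_dataset_xy_2024-04-05_22-42-38.zip'
if os.path.exists(ZIP_PATH):
    names = extract_dataset(ZIP_PATH)
    print(f'Extracted {len(names)} files')
```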

### Create Dataset Instance

Here we create a custom ``torch.utils.data.Dataset`` implementation, which implements the ``__len__`` and ``__getitem__`` functions.  This class
is responsible for loading images and parsing the x, y values from the image filenames.  Because we implement the ``torch.utils.data.Dataset`` class,
we can use all of the torch data utilities :)

We hard coded some transformations (like color jitter) into our dataset.  We made random horizontal flips optional (in case you want to follow a non-symmetric path, like a road
where we need to 'stay right').  If it doesn't matter whether your robot follows some convention, you could enable flips to augment the dataset.

In [3]:
def get_x(path, width):
    """Gets the x value from the image filename"""
    return (float(int(path.split("_")[1])) - width/2) / (width/2)

def get_y(path, height):
    """Gets the y value from the image filename"""
    return (float(int(path.split("_")[2])) - height/2) / (height/2)

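# Crops the dataset images by removing the top rows so the JetBot does not see the background.
# Note: with the default suffix, the cropped copies are saved next to the originals,
# so XYDataset (which globs '*.jpg') will load both versions.
def crop_images_in_directory(directory, output_suffix='_cropped'):
    # Crop box (left, upper, right, lower) for 224x224 images: drops the top 81 rows
    box = (0, int(224 // 2.75), 224, 224)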
    
    # Iterate over all jpg files in the specified directory
    for filename in os.listdir(directory):
        if filename.endswith('.jpg'):
            # Construct the full file path
            file_path = os.path.join(directory, filename)
            # Open the image file
            img = PIL.Image.open(file_path)
            # Crop the image
            cropped_img = img.crop(box)
            # Define the output file path
            base, ext = os.path.splitext(filename)
            output_filename = f'{base}{output_suffix}{ext}'
            output_path = os.path.join(directory, output_filename)
            # Save the cropped image
            cropped_img.save(output_path)
            print(f'Cropped image saved to {output_path}')

class XYDataset(torch.utils.data.Dataset):
    
    def __init__(self, directory, random_hflips=False):
        self.directory = directory
        self.random_hflips = random_hflips
        self.image_paths = glob.glob(os.path.join(self.directory, '*.jpg'))
        self.color_jitter = transforms.ColorJitter(0.3, 0.3, 0.3, 0.3)
    
    def __len__(self):
        return len(self.image_paths)

    
    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        
        image = PIL.Image.open(image_path)
        width, height = image.size
        x = float(get_x(os.path.basename(image_path), width))
        y = float(get_y(os.path.basename(image_path), height))
      
        # Flip only when the caller enabled random_hflips
        if self.random_hflips and np.random.rand() > 0.5:
            image = transforms.functional.hflip(image)
            x = -x
        
        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        # Reverse the channel axis (RGB -> BGR); copy() gives torch a contiguous array
        image = image.numpy()[::-1].copy()
        image = torch.from_numpy(image)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        
        return image, torch.tensor([x, y]).float()
    
dataset = XYDataset('dataset_xy', random_hflips=False)
# crop all images in dataset before training to remove background city
# crop_images_in_directory('dataset_xy', output_suffix='')
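
As a worked example of the label parsing, the sketch below repeats `get_x` and `get_y` and applies them to a file name that appears later in this notebook. It assumes the image is 224 x 224 pixels, which the file name itself does not guarantee.

```python
def get_x(path, width):
    """Same parsing as the cell above: second underscore-separated field."""
    return (float(int(path.split("_")[1])) - width / 2) / (width / 2)

def get_y(path, height):
    """Same parsing as the cell above: third underscore-separated field."""
    return (float(int(path.split("_")[2])) - height / 2) / (height / 2)

name = 'xy_097_110_8d3e36cc-f915-11ee-8503-7c76354dde24.jpg'

# Assumption: the image is 224 x 224 pixels.
x = get_x(name, 224)   # (97 - 112) / 112
y = get_y(name, 224)   # (110 - 112) / 112
print(round(x, 4), round(y, 4))  # -0.1339 -0.0179
```

Pixel coordinates are mapped so that the image center becomes (0, 0) and the image edges become -1 and 1.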

In [4]:
# path = 'dataset_xy/xy_097_110_8d3e36cc-f915-11ee-8503-7c76354dde24.jpg'
# img = PIL.Image.open(path)
# box = (0, 224 // 2.75, 224, 224 // 2 + 112) # crop height by 63.8%
# img = img.crop(box)
# print(img)
# img.save('dataset_xy/img4.jpg')


### Split dataset into train and test sets
Once the dataset is loaded, we split it into training and test sets. In this example we use a 90%/10% split. The test set is used to evaluate the model we train on data it has not seen.

In [5]:
test_percent = 0.1
num_test = int(test_percent * len(dataset))
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - num_test, num_test])
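
`random_split` draws a different split on every run. If a repeatable split is wanted, one option is to pass a seeded generator, as in the sketch below. The stand-in dataset and the seed value are assumptions made for illustration; the notebook itself splits `XYDataset` without a seed.

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Stand-in dataset of 100 dummy samples (hypothetical; the notebook uses XYDataset).
dummy = TensorDataset(torch.arange(100))

test_percent = 0.1
num_test = int(test_percent * len(dummy))      # 10
lengths = [len(dummy) - num_test, num_test]    # [90, 10]

# Assumption: a fixed seed is wanted so the same samples land in the test set each run.
generator = torch.Generator().manual_seed(0)
train_part, test_part = random_split(dummy, lengths, generator=generator)
print(len(train_part), len(test_part))  # 90 10
```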

### Create data loaders to load data in batches

We use the ``DataLoader`` class to load data in batches, shuffle the data, and optionally load it with multiple worker subprocesses. In this example we use a batch size of 8 and ``num_workers=0``, which loads data in the main process. The batch size is limited by the memory available on your GPU, and it can affect the accuracy of the model.

In [6]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)
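
To see what a loader configured this way yields, the sketch below runs one on random stand-in tensors (hypothetical data, not the road dataset) and inspects the number of batches and the shape of one batch.

```python
import math
import torch
from torch.utils.data import DataLoader, TensorDataset

# Hypothetical stand-in data: 20 random "images" with matching (x, y) targets.
images = torch.rand(20, 3, 224, 224)
targets = torch.rand(20, 2)
loader = DataLoader(TensorDataset(images, targets), batch_size=8, shuffle=True, num_workers=0)

# With the default drop_last=False the final, smaller batch is kept.
print(len(loader), math.ceil(20 / 8))  # 3 3

batch_images, batch_targets = next(iter(loader))
print(batch_images.shape)   # torch.Size([8, 3, 224, 224])
print(batch_targets.shape)  # torch.Size([8, 2])
```

By the same arithmetic, the value 46 printed by the training cell later in this notebook corresponds to a training set of between 361 and 368 images.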

### Define Neural Network Model 

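The original example uses the ResNet-18 model available in TorchVision. The cell below loads a pre-trained VGG16 instead and leaves several alternative architectures commented out.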

In a process called transfer learning, we can repurpose a pre-trained model (trained on millions of images) for a new task that has possibly much less data available.


More details on ResNet-18 : https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py

More Details on Transfer Learning: https://www.youtube.com/watch?v=yofjFQddwHE 

In [7]:
### MobileNet ###
# Load the MobileNet model pre-trained on ImageNet
# model = models.mobilenet_v2(pretrained=True)
# model.classifier[1] = nn.Linear(1280, 2)

### AlexNet ###
# model = models.alexnet(pretrained=True)
# model.classifier[6] = nn.Linear(4096, 2)

### Squeeze Net ###
#model = models.squeezenet1_0(pretrained=True)


### VGG ###
model = models.vgg16(pretrained=True)
model.classifier[6] = nn.Linear(4096, 2)

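The final fully connected layer of VGG16, ``classifier[6]``, has 4096 ``in_features``. Because we are training a regression on x and y, the cell above replaces it with a layer that has 2 ``out_features``. In ResNet-18 the corresponding layer is ``fc``, with 512 ``in_features``.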

Finally, we move the model to the GPU for execution.

In [8]:
print(model)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [9]:
# Alternative heads for other architectures (leave commented out when using VGG16):
# model = torchvision.models.resnet18(pretrained=True)
# model.fc = torch.nn.Linear(512, 2)
# model.classifier[1] = nn.Linear(1280, 2)  # MobileNetV2
device = torch.device('cuda')
model = model.to(device)
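
The cell above assumes a CUDA device is present. On a machine without one, a common pattern is to fall back to the CPU, as in this sketch. The fallback is an assumption added for illustration, and the small `layer` module is a hypothetical stand-in for the full network.

```python
import torch
import torch.nn as nn

# Assumption: fall back to the CPU when no CUDA device is visible.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Tiny stand-in module (hypothetical); the notebook moves its full network instead.
layer = nn.Linear(4, 2).to(device)
batch = torch.rand(3, 4).to(device)

# Inputs and parameters must live on the same device before the forward pass.
print(layer(batch).shape)  # torch.Size([3, 2])
```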

### Train Regression:

We train for 7 epochs and save the model whenever the test loss improves on the best value seen so far.
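
The loss used for this regression is the mean squared error between predicted and labeled (x, y) values. The sketch below works through it on a small batch of made-up numbers.

```python
import torch
import torch.nn.functional as F

# Hypothetical predictions and targets for a batch of two images, as (x, y) pairs.
outputs = torch.tensor([[0.5, 0.0], [0.0, -0.5]])
labels = torch.tensor([[0.0, 0.0], [0.0, 0.5]])

# Default reduction='mean': average of squared errors over all 4 elements.
loss = F.mse_loss(outputs, labels)
manual = ((outputs - labels) ** 2).mean()
print(float(loss))  # 0.3125
```

The squared errors here are 0.25, 0, 0 and 1.0, so the mean is 1.25 / 4 = 0.3125.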

In [None]:
NUM_EPOCHS = 7
BEST_MODEL_PATH = 'vgg_steering_model_xy.pth'
best_loss = 1e9

optimizer = optim.Adam(model.parameters())
print(len(train_loader))
print(train_loader)
for epoch in range(NUM_EPOCHS):
    model.train()
    train_loss = 0.0
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.mse_loss(outputs, labels)
        train_loss += float(loss)
        loss.backward()
        optimizer.step()
    train_loss /= len(train_loader)

    model.eval()
    test_loss = 0.0
    # Gradients are not needed for evaluation; no_grad saves memory and time
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = F.mse_loss(outputs, labels)
            test_loss += float(loss)
    test_loss /= len(test_loader)

    print(f'Epoch {epoch}: Train Loss: {train_loss}, Test Loss: {test_loss}')
    if test_loss < best_loss:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_loss = test_loss


46
<torch.utils.data.dataloader.DataLoader object at 0x7f474b75c0>


Once the model is trained, the best weights are saved to ``vgg_steering_model_xy.pth`` (the value of ``BEST_MODEL_PATH``), which you can use for inference in the live demo notebook.

If you trained on a machine other than the JetBot, you'll need to upload this file to the ``road_following`` example folder on the JetBot.
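
A saved ``state_dict`` holds only the weights, so it has to be loaded into a network with the same architecture. The sketch below shows the round trip on a tiny stand-in layer; the file name and the layer are hypothetical, and the live demo notebook may load the model differently.

```python
import os
import tempfile
import torch
import torch.nn as nn

# Tiny stand-in network (hypothetical); the notebook saves its full model the same way.
net = nn.Linear(4, 2)
path = os.path.join(tempfile.mkdtemp(), 'example_weights.pth')
torch.save(net.state_dict(), path)

# To load, first build a network with the same architecture, then copy the weights in.
restored = nn.Linear(4, 2)
restored.load_state_dict(torch.load(path, map_location='cpu'))
restored.eval()  # switch to inference mode before making predictions

print(torch.equal(net.weight, restored.weight))  # True
```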