# Assessment 2: NVIDIA Jetbot Path Following and Pest Detection
### Intro
This notebook is the record of completion for Assessment 2: Real-World Application of Machine Learning and Computer Vision.
### Scenario
Building on the previous pest identification project, program a bot that can both follow a set course and respond when specific pests are located.

In [None]:
# Pre-setup
%pip install -Uqq ipywidgets
%pip install -Uqq torch torchvision torchaudio

## Path Following
#### Data Collection
Setting up the JetBot to follow paths was done in two stages: collecting images on the bot, then training a model on them. The following section is the code and widgets I ran on the JetBot to gather the images required for path following.

In [None]:
## RUN ON JETBOT ##
# Imports for data collection
import ipywidgets
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
from jetbot import Robot, Camera, bgr8_to_jpeg
from uuid import uuid1
import os
import json
import glob
import datetime
import numpy as np
import cv2
import time
from jupyter_clickable_image_widget import ClickableImageWidget

DATASET_DIR = 'dataset_xy'

# DATASET_DIR is a plain string, so check for the folder with os.path
if not os.path.exists(DATASET_DIR):
    os.makedirs(DATASET_DIR)

camera = Camera()

camera_widget = ClickableImageWidget(width=camera.width, height=camera.height)
snapshot_widget = ipywidgets.Image(width=camera.width, height=camera.height)
traitlets.dlink((camera, 'value'), (camera_widget, 'value'), transform=bgr8_to_jpeg)
count_widget = ipywidgets.IntText(description='count')
count_widget.value = len(glob.glob(os.path.join(DATASET_DIR, '*.jpg')))

def save_snapshot(_, content, msg):
    if content['event'] == 'click':
        data = content['eventData']
        x = data['offsetX']
        y = data['offsetY']        
        uuid = 'xy_%03d_%03d_%s' % (x, y, uuid1())
        image_path = os.path.join(DATASET_DIR, uuid + '.jpg')
        with open(image_path, 'wb') as f:
            f.write(camera_widget.value)
        
        snapshot = camera.value.copy()
        snapshot = cv2.circle(snapshot, (x, y), 8, (0, 255, 0), 3)
        snapshot_widget.value = bgr8_to_jpeg(snapshot)
        count_widget.value = len(glob.glob(os.path.join(DATASET_DIR, '*.jpg')))
        
camera_widget.on_msg(save_snapshot)

data_collection_widget = ipywidgets.VBox([
    ipywidgets.HBox([camera_widget, snapshot_widget]),
    count_widget
])

display(data_collection_widget)

The next block stops the camera once data collection is done, then saves the images to a zip file so they can be exported to a more powerful machine to train and refine the model.

In [None]:
## RUN ON JETBOT ##
camera.stop()
!zip -r -q path_following_dataset.zip {DATASET_DIR}

#### Training Path Following
The following code trains the model on the collected images so the bot learns to follow the path set out. I used ResNet-18 for the path following model due to its ubiquity and ease of use, with 15 training epochs for quick and consistent results. The model was trained on a CPU rather than a faster GPU, so keeping the epoch count low made it much easier to update the model and retrain it within a reasonable timeframe.

In [None]:
# Run first to unzip dataset
!unzip -q path_following_dataset.zip

In [None]:
# Imports for training model
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
import glob
import PIL.Image
import os
import numpy as np

def get_x(path, width):
    return (float(int(path.split("_")[1])) - width/2) / (width/2)

def get_y(path, height):
    return (float(int(path.split("_")[2])) - height/2) / (height/2)

class XYDataset(torch.utils.data.Dataset):    
    def __init__(self, directory, random_hflips=False):
        self.directory = directory
        self.random_hflips = random_hflips
        self.image_paths = glob.glob(os.path.join(self.directory, '*.jpg'))
        self.color_jitter = transforms.ColorJitter(0.3, 0.3, 0.3, 0.3)
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        image_path = self.image_paths[idx]        
        image = PIL.Image.open(image_path)
        width, height = image.size
        x = float(get_x(os.path.basename(image_path), width))
        y = float(get_y(os.path.basename(image_path), height))
      
        # apply transforms to some images to make the dataset more resilient
        if np.random.rand() > 0.5:  # flip roughly half of the images
            image = transforms.functional.hflip(image)
            x = -x  # mirror the x label to match the flipped image
        image = self.color_jitter(image)
        image = transforms.functional.resize(image, (224, 224))
        image = transforms.functional.to_tensor(image)
        image = image.numpy()[::-1].copy()
        image = torch.from_numpy(image)
        image = transforms.functional.normalize(image, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        
        return image, torch.tensor([x, y]).float()
    
dataset = XYDataset('dataset_xy') # path following training images dataset

test_percent = 0.1 # use 10% of images for verification
num_test = int(test_percent * len(dataset))
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - num_test, num_test])

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)

model = models.resnet18(pretrained=True) # using resnet18
model.fc = torch.nn.Linear(512, 2)
device = torch.device('cpu')
model = model.to(device)
NUM_EPOCHS = 15 # uses 15 epochs to refine model
BEST_MODEL_PATH = 'path_following.pth'
best_loss = 1e9
optimizer = optim.Adam(model.parameters())

for epoch in range(NUM_EPOCHS):    
    model.train()
    train_loss = 0.0
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.mse_loss(outputs, labels)
        train_loss += float(loss)
        loss.backward()
        optimizer.step()
    train_loss /= len(train_loader)    
    model.eval()
    test_loss = 0.0
    with torch.no_grad():  # gradients are not needed when evaluating
        for images, labels in iter(test_loader):
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = F.mse_loss(outputs, labels)
            test_loss += float(loss)
    test_loss /= len(test_loader)
    
    print('Training loss: %f, Testing loss: %f' % (train_loss, test_loss))
    if test_loss < best_loss:
        torch.save(model.state_dict(), BEST_MODEL_PATH) # export the best model as a .pth file
        best_loss = test_loss
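
The training labels come entirely from the filenames written during data collection: each click is saved as `xy_<x>_<y>_<uuid>.jpg`, and `get_x`/`get_y` map those pixel coordinates onto roughly [-1, 1]. A minimal sketch of that round trip is below. The 224×224 frame size, the `make_name` helper, and the `'example-id'` string standing in for the UUID are assumptions for illustration only.

```python
def make_name(x, y, uid):
    """Build a filename in the same format save_snapshot uses (hypothetical helper)."""
    return 'xy_%03d_%03d_%s.jpg' % (x, y, uid)

def get_x(name, width):
    # pixel column -> offset from the image centre, scaled by half the width
    return (float(int(name.split("_")[1])) - width / 2) / (width / 2)

def get_y(name, height):
    # pixel row -> offset from the image centre, scaled by half the height
    return (float(int(name.split("_")[2])) - height / 2) / (height / 2)

WIDTH = HEIGHT = 224  # assumed frame size

name = make_name(168, 56, 'example-id')  # 'example-id' stands in for uuid1()
x = get_x(name, WIDTH)
y = get_y(name, HEIGHT)
print(name, x, y)

# A horizontal flip mirrors the image, so the x label changes sign.
flipped_x = -x
print(flipped_x)
```

Because the label lives in the filename, renaming or re-exporting the images with a different naming scheme would silently change the training targets.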

## Adding Pest Detection
#### Combining Path Following and Bug Identification
To combine both models, I set up two folders: one labeled 'free', with images collected when training the path following, and one labeled 'blocked', with images of insects from the bug finding project. The new model combines both of these datasets so the JetBot can follow the path while checking for insects, and stops when it detects an insect matching one it was trained on. The following code imports the previously trained models to set this up.

In [None]:
# Importing models for path following and bug detection
import torch
import torchvision

path_model = torchvision.models.resnet18(pretrained=False)
path_model.fc = torch.nn.Linear(512, 2)
path_model.load_state_dict(torch.load('path_following.pth'))

bug_model = torchvision.models.alexnet(pretrained=False)
bug_model.classifier[6] = torch.nn.Linear(bug_model.classifier[6].in_features, 2)
bug_model.load_state_dict(torch.load('bug_finder.pth'))

device = torch.device('cuda')
path_model = path_model.to(device)
bug_model = bug_model.to(device)
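
When reusing trained models on live camera frames, it is worth checking that the frames are scaled the same way as the training images. In the training cell, `to_tensor` maps pixels to [0, 1] before the ImageNet mean and standard deviation are applied, whereas a raw camera array holds values in [0, 255]. The sketch below, in plain Python with hypothetical helper names, shows two equivalent ways to normalize a single RGB pixel. How `bug_finder.pth` was preprocessed during training is not shown in this notebook, so this is a check to run rather than a confirmed fix.

```python
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

def normalize_unit(pixel):
    """Normalize an RGB pixel that is already scaled to [0, 1]."""
    return [(c - m) / s for c, m, s in zip(pixel, IMAGENET_MEAN, IMAGENET_STD)]

def normalize_raw(pixel):
    """Normalize a raw 0-255 RGB pixel by scaling mean and std by 255 instead."""
    return [(c - 255.0 * m) / (255.0 * s)
            for c, m, s in zip(pixel, IMAGENET_MEAN, IMAGENET_STD)]

raw = (128, 64, 255)               # example pixel, values chosen arbitrarily
unit = [c / 255.0 for c in raw]    # same pixel scaled to [0, 1]

a = normalize_unit(unit)
b = normalize_raw(raw)
print(a)
print(b)  # matches the line above up to floating-point rounding
```

Applying the unscaled mean and standard deviation directly to 0-255 values would instead produce inputs far outside the range the network saw in training.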

The following code was run on the JetBot to tune the parameters by testing the effect of changing them in real time. The first cell sets up the widgets that control the JetBot and update its parameters on the fly, and the second attaches those controls to the JetBot itself. This allowed me to find the parameters that gave the best results from the models I had.

I found that a speed of 0.1 worked best, as the JetBot moved slowly enough to analyse its path well and react to changes in time. The insect identification did not work well: with the 'blocked' threshold set any stricter than 25%, the bot never triggered that condition and never stopped. Keeping the threshold this low did occasionally produce false positives, such as chair wheels being identified as bugs, but it also allowed the model to register actual positives. Using a lengthy stop time (>20 frames) also made it easier to tell when the insect identification model had worked.
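
The stop behaviour described above amounts to a two-state machine driven by the blocked probability, the threshold, and the stop duration in frames. Here is a minimal sketch in plain Python, with a hypothetical `StopController` class standing in for the global variables used on the bot. It is simplified: it halts on the detection frame itself, and the short stop duration is chosen only to keep the example readable.

```python
class StopController:
    """Hypothetical helper: decides per frame whether the bot should drive."""

    def __init__(self, threshold=0.25, stop_frames=40):
        self.threshold = threshold      # blocked probability that triggers a stop
        self.stop_frames = stop_frames  # number of frames to remain stopped
        self.stopped = False
        self.count = 0

    def step(self, prob_blocked):
        """Return True if the bot should drive on this frame."""
        if not self.stopped:
            if prob_blocked > self.threshold:
                # insect detected: enter the stopped state
                self.stopped = True
                self.count = 1
                return False
            return True
        # already stopped: count frames until the stop duration has elapsed
        self.count += 1
        if self.count < self.stop_frames:
            return False
        self.stopped = False
        self.count = 0
        return False  # driving resumes on the next frame

controller = StopController(threshold=0.25, stop_frames=3)
probs = [0.1, 0.6, 0.0, 0.0, 0.0]  # made-up blocked probabilities
drive = [controller.step(p) for p in probs]
print(drive)
```

A lower threshold makes the first branch fire more often, which is the trade-off between false positives and missed detections noted above.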

In [None]:
## RUN ON JETBOT ##
from IPython.display import display
import ipywidgets
import traitlets
from jetbot import Robot, Camera, bgr8_to_jpeg
import torch
import torchvision
import torchvision.transforms as transforms  # used by preprocess() below
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np
import time
import math

camera = Camera()
image_widget = ipywidgets.Image()
traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)
robot = Robot()
# path following sliders
speed_control_slider = ipywidgets.FloatSlider(min=0.0, max=0.2, step=0.05, description='Speed control')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.04, description='Steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='Steering dgain')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='Steering bias')
# bug identifying sliders
blocked_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, orientation='horizontal', description='Blocked level')
stopduration_slider= ipywidgets.IntSlider(min=20, max=1000, step=20, value=40, description='Time to stop') 
blocked_threshold= ipywidgets.FloatSlider(min=0, max=1.0, step=0.05, value=0.25, description='Blocked threshold')
display(speed_control_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)
display(image_widget)
display(ipywidgets.HBox([blocked_slider, blocked_threshold, stopduration_slider]))

In [None]:
## RUN ON JETBOT ##
angle = 0.0
angle_last = 0.0
count_stops = 0
go_on = 1
stop_time = 40 # The number of frames to remain stopped
x = 0.0
y = 0.0
speed_value = speed_control_slider.value
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()
normalize = torchvision.transforms.Normalize(mean, std)

# functions for pre-processing images to be used by bot
def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

def preprocess_col(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

def execute(change):
    global angle, angle_last, blocked_slider, robot, count_stops, stop_time, go_on, x, y, blocked_threshold
    global speed_value, steer_gain, steer_dgain, steer_bias                
    steer_gain = steering_gain_slider.value
    steer_dgain = steering_dgain_slider.value
    steer_bias = steering_bias_slider.value       
    image_preproc = preprocess(change['new']).to(device)
    image_preproc2 = preprocess_col(change['new']).to(device)
    
    prob_blocked = float(F.softmax(bug_model(image_preproc2), dim=1).flatten()[0])    
    blocked_slider.value = prob_blocked    
    stop_time=stopduration_slider.value
    
    if go_on == 1:    
        if prob_blocked > blocked_threshold.value: # if insect detected by camera
            count_stops += 1
            go_on = 2
        else: # if no insect found, follow path
            go_on = 1
            count_stops = 0
            xy = path_model(image_preproc2).detach().float().cpu().numpy().flatten()        
            x = xy[0]            
            y = (0.5 - xy[1]) / 2.0
            speed_value = speed_control_slider.value
    else:
        count_stops += 1
        if count_stops < stop_time:
            x = 0.0
            y = 0.0
            speed_value = 0 
        else:
            go_on = 1
            count_stops = 0
                
    angle = math.atan2(x, y)        
    pid = angle * steer_gain + (angle - angle_last) * steer_dgain
    steer_val = pid + steer_bias 
    angle_last = angle
    robot.left_motor.value = max(min(speed_value + steer_val, 1.0), 0.0)
    robot.right_motor.value = max(min(speed_value - steer_val, 1.0), 0.0) 
execute({'new': camera.value})
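
The steering part of `execute` can be read on its own: the predicted target is turned into an angle, passed through a proportional-derivative term, and added to or subtracted from the base speed for each motor. A minimal sketch of that calculation as a standalone function is below. The `steer` name and its signature are hypothetical; the default gain of 0.04 and speed of 0.1 are the slider default and the speed mentioned earlier.

```python
import math

def steer(x, y_raw, angle_last, speed, gain=0.04, dgain=0.0, bias=0.0):
    """Return (left, right, angle) for one frame, mirroring the maths in execute()."""
    y = (0.5 - y_raw) / 2.0
    angle = math.atan2(x, y)  # 0 when the target is dead ahead
    pid = angle * gain + (angle - angle_last) * dgain
    steer_val = pid + bias
    # clamp each motor command to the range [0, 1]
    left = max(min(speed + steer_val, 1.0), 0.0)
    right = max(min(speed - steer_val, 1.0), 0.0)
    return left, right, angle

# Target straight ahead: both motors run at the base speed.
left, right, angle = steer(0.0, 0.0, 0.0, 0.1)
print(left, right, angle)

# Target to the right of centre: the left motor speeds up and the right slows down.
left2, right2, angle2 = steer(0.25, 0.0, 0.0, 0.1)
print(left2, right2, angle2)
```

With the derivative gain at its default of 0.0, the controller is purely proportional, so the steering gain alone sets how sharply the bot turns towards the target.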

In [None]:
# Run bot using model - run when previous cells are done and you are ready to start the bot
camera.observe(execute, names='value')

In [None]:
# Stop bot - run when testing is complete
camera.unobserve(execute, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()

# Video Demonstration:

<video controls src="BugPath.mp4" />

# Critical Evaluation:
The way this model was trained, on a small number of stock images of insects sourced from the internet, very likely hindered its performance in testing. The model was quite bad at identifying insects unless they were very close to its camera (taking up most of the screen) or the bot could not see the line it was supposed to be following. The line-following portion worked very well: the bot followed the line fairly reliably and was usually able to find its way back if it drifted off-course. This may be partly because the model had many images to reference for path following, and only a fraction of that number for the type of insect we used to test its effectiveness.


The JetBot I was using had no cooling fan or other active cooling, so I was only able to test with it for short durations. Adding a cooling fan would allow more reliable testing, and would likely also improve inference speed. Replacing the JetBot with more powerful hardware would likely increase inference speed as well.


For future improvements, a greater number of insect images in a greater variety of settings and distances would improve the pest detection accuracy. Better hardware and better cooling would improve inference speed. A broader model trained not only on a handful of insect types but a range of different insects would be more generalizable and could be used in more scenarios.