# Collision Avoidance - Live Demo

In this notebook we'll use the model we trained to detect whether the robot is ``free`` or ``blocked`` to enable a collision avoidance behavior on the robot.  


#### [realImg ~> unet ~> robot control]

## Load the trained model

We'll assumed that you've already downloaded the ``best_model.pth`` to your workstation as instructed in the training notebook.  Now, you should upload this model into this notebook's
directory by using the Jupyter Lab upload tool.  Once that's finished there should be a file named ``best_model.pth`` in this notebook's directory.  

> Please make sure the file has uploaded fully before calling the next cell

Execute the code below to initialize the PyTorch model.  This should look very familiar from the training notebook.

In [1]:
# import torch
# import torchvision

# model = torchvision.models.alexnet(pretrained=False)
# model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)

Next, load the trained weights from the ``best_model.pth`` file that you uploaded

In [13]:
# model.load_state_dict(torch.load('best_model.pth'))

<All keys matched successfully>

Currently, the model weights are located on the CPU memory execute the code below to transfer to the GPU device.

In [14]:
# device = torch.device('cuda')
# model = model.to(device)

### Create the preprocessing function

We have now loaded our model, but there's a slight issue.  The format that we trained our model doesnt *exactly* match the format of the camera.  To do that, 
we need to do some *preprocessing*.  This involves the following steps

1. Convert from BGR to RGB
2. Convert from HWC layout to CHW layout
3. Normalize using same parameters as we did during training (our camera provides values in [0, 255] range and training loaded images in [0, 1] range so we need to scale by 255.0
4. Transfer the data from CPU memory to GPU memory
5. Add a batch dimension

In [2]:
import os
import argparse
import sys
import time
import signal
import cv2
import numpy as np

import torch.nn.functional as F
import torchvision.transforms.functional as TF
import torchvision
import torch
from torchvision import transforms
from PIL import Image

from unet.unet import UNet
# from jetbotSim import Robot, Env
# from rl import dqn, models


In [3]:
def preprocess(state, img_size, th=0.4):
    # x = state
    # state = cv2.cvtColor(state, cv2.COLOR_BGR2RGB)
    state = np.array(Image.fromarray(state).resize(img_size, Image.BILINEAR))
    state = state.astype(float) / 255.
    state = state.transpose(2,0,1)
    return state

def step(action):
    global robot, dist
    if action == 0:
        robot.set_motors(0.03, 0.03)
    elif action == 1:
        robot.set_motors(0.2, 0.)
    elif action == 2:
        robot.set_motors(0., 0.2)
    elif action == 3:
        robot.stop()
    elif action == 4:
        robot.reset()
    elif action == 99:
        Kp = 0.1
        left_wheel = 0.18
        right_wheel = 0.17
        forward_Kp = 0.7
        range_ = 5
        if dist > 100:
            dist = 80
        elif abs(dist) < 10:
            dist *= 1.1

#         print (dist)
        if dist < -range_:  # right
            # robot.set_motor(0.2, 0.2 - (blocked_right/100 + dist/1000))
#             robot.right(Kp * abs(dist))
            robot.set_motors(left_wheel*(Kp * abs(dist)), 0.05*(Kp * abs(dist)))
            print("right")
        elif dist > range_: # left
            # robot.set_motor(0.2 - 0.2*(blocked_right/100 + dist/1000), 0.2)
#             robot.left(Kp * abs(dist))
            robot.set_motors(0.05*(Kp * abs(dist)), right_wheel*(Kp * abs(dist)))
            print("left")
        else:
            robot.set_motors(left_wheel*forward_Kp, right_wheel*forward_Kp)
            print("straight")

In [4]:


def predict_img(full_img, out_threshold=0.5):
    global transform, device, net
    r, g, b = full_img.split()
    img = Image.merge("RGB", (b, g, r))
    img = transform(img)
    img = img.unsqueeze(0).to(device=device)
    with torch.no_grad():
        output = net(img).cpu()
        output = F.interpolate(output, (full_img.size[1], full_img.size[0]), mode='bilinear')
        if net.n_classes > 1:
            mask = output.argmax(dim=1)
        else:
            mask = torch.sigmoid(output) > out_threshold

    return mask[0].long().squeeze().numpy()

def process_unet_img(image_bgr):
    global dist, img_size, ISPATH
    # Convert BGR to RGB
    image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
    # Create a PIL Image object from the RGB image
    ori_img = Image.fromarray(image_rgb)

    mask = predict_img(ori_img)
    # classes = mask.max() + 1

    path_mask = np.array(mask==1)

    # Initialize the mask array with all white pixels and set black pixels where values are True
    path_mask = np.where(path_mask[:, :, None], [0, 0, 0], [255, 255, 255]).astype(np.uint8)

    img = cv2.resize(path_mask, img_size)
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

#     print(img.shape)

    lower_black = np.array([0, 0, 0])
    upper_black = np.array([10, 10, 10])

    mask_road = cv2.inRange(hsv, lower_black, upper_black)
    fin_img = cv2.bitwise_and(img, img, mask=mask_road)

    # calculate next position
    h_Kp1 = 0.65
    h_Kp2 = 0.7
    v_Kp = 0.1
#     coord1 = cv2.findNonZero(mask_road[int(img_size[0]*h_Kp1):int(img_size[0]*h_Kp1) + 1, int(img_size[1]*v_Kp):int(img_size[1]-img_size[1]*v_Kp)])
#     coord2 = cv2.findNonZero(mask_road[int(img_size[0]*h_Kp2):int(img_size[0]*h_Kp2) + 1, int(img_size[1]*v_Kp):int(img_size[1]-img_size[1]*v_Kp)])
    coord1 = cv2.findNonZero(mask_road[int(img_size[0]*h_Kp1):int(img_size[0]*h_Kp1) + 1])
    coord2 = cv2.findNonZero(mask_road[int(img_size[0]*h_Kp2):int(img_size[0]*h_Kp2) + 1])

    if coord1 is not None and coord2 is not None:
        left1 = np.min(coord1, axis=0)
        right1 = np.max(coord1, axis=0)
        left2 = np.min(coord2, axis=0)
        right2 = np.max(coord2, axis=0)
        # print(coord.shape)
        # print(coord[:5, :])
        line_mean1 = img_size[0] / 2
        line_mean1 = int(np.mean([left1[0][0], right1[0][0]]))
        line_mean2 = img_size[0] / 2
        line_mean2 = int(np.mean([left2[0][0], right2[0][0]]))

        line_mean3 = int(np.mean([line_mean1, line_mean2]))
        
        # print(left1, right1)
        # print(left2, right2)
        dist = img_size[0] / 2 - line_mean3
        ISPATH = True
    else:
        dist = img_size[0] / 2
        ISPATH = False
    
    print("coords", dist)

    return dist

In [5]:
# import cv2
# import numpy as np

# mean = 255.0 * np.array([0.485, 0.456, 0.406])
# stdev = 255.0 * np.array([0.229, 0.224, 0.225])

# normalize = torchvision.transforms.Normalize(mean, stdev)

# def preprocess(camera_value):
#     global device, normalize
#     x = camera_value
#     x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
#     x = x.transpose((2, 0, 1))
#     x = torch.from_numpy(x).float()
#     x = normalize(x)
#     x = x.to(device)
#     x = x[None, ...]
#     return x

In [6]:
mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])
normalize = torchvision.transforms.Normalize(mean, stdev)

qnet_model_dir = "./rl_checkpoint/"
qnet_model_name = "qnet_200000_jetbot.pt"
# model_path = "./unet/checkpoints/checkpoint_epoch4_jetbot.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

unet_model_path = "./unet/checkpoints2/checkpoint_epoch5_64_jetbot.pth"


frames = 0
total_reward = 0
stack_frames = 3
img_size = (64, 64)

recent_distances = []  # List to store recent distance measurements
MAX_HISTORY = 4
dist = 0
ISPATH = False

transform = transforms.Compose([
                transforms.Resize(img_size),
                transforms.ToTensor()
            ])

In [7]:
def initialize_all(model_dir, model_name, unet_model_path):
  global total_reward, stack_frames, img_size, device
  print(f"Device: {device}")

  # Load UNET
  net = UNet(n_channels=3, n_classes=3, bilinear=False)
  net.to(device=device)
  state_dict = torch.load(unet_model_path, map_location=device)
  mask_values = state_dict.pop('mask_values', [0, 1, 2])
  net.load_state_dict(state_dict)
  net.eval()

  # Load QNET
  # agent = dqn.DeepQNetwork(
  #     n_actions = 3,
  #     input_shape = [(stack_frames)*3, *img_size],
  #     qnet = models.QNet,
  #     device = device,
  #     learning_rate = 2e-4, 
  #     reward_decay = 0.98,
  #     replace_target_iter = 1000, 
  #     memory_size = 10000,
  #     batch_size = 32,
  # )
  # agent.save_load_model("load", path=model_dir, fname=model_name)
  
  # return net, agent
  return net


In [8]:

net = None

net = initialize_all(qnet_model_dir, qnet_model_name, unet_model_path)

Device: cuda


Next, we'll create a function that will get called whenever the camera's value changes.  This function will do the following steps

1. Pre-process the camera image
2. Execute the neural network
3. While the neural network output indicates we're blocked, we'll turn left, otherwise we go forward.

Great! We've now defined our pre-processing function which can convert images from the camera format to the neural network input format.

Now, let's start and display our camera.  You should be pretty familiar with this by now.  We'll also create a slider that will display the
probability that the robot is blocked.

In [9]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg

camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.HBox([image, blocked_slider]))

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

We'll also create our robot instance which we'll need to drive the motors.

In [10]:
from jetbot import Robot

robot = Robot()

In [11]:
update({'new': camera.value})  # we call the function once to intialize


NameError: name 'update' is not defined

Cool! We've created our neural network execution function, but now we need to attach it to the camera for processing. 

We accomplish that with the ``observe`` function.

> WARNING: This code will move the robot!! Please make sure your robot has clearance.  The collision avoidance should work, but the neural
> network is only as good as the data it's trained on!

In [13]:
camera.observe(update, names='value')  # this attaches the 'update' function to the 'value' traitlet of our camera

  "See the documentation of nn.Upsample for details.".format(mode))


coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords -1.0
straight
coords -1.0
straight
coords -12.0
right
coords -19.0
right
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 1.0
straight
coords 2.0
straight
coords 2.0
straight
coords 6.0
left
coords 6.0
left
coords 4.0
straight
coords 4.0
straight
coords 6.0
left
coords 6.0
left
coords 3.0
straight
coords 3.0
straight
coords 3.0
straight
coords 3.0
straight
coords 2.0
straight
coords 2.0
straight
coords 3.0
straight
coords 3.0
straight
coords 4.0
straight
coords 4.0
straight
coords 5.0
left
coords 5.0
left
coords 3.0
straight
coords 3.0
straight
coords 4.0
straight
coords 4.0
straight
coords 5.0
left
coords 5

Awesome! If your robot is plugged in it should now be generating new commands with each new camera frame.  Perhaps start by placing your robot on the ground and seeing what it does when it reaches an obstacle.

If you want to stop this behavior, you can unattach this callback by executing the code below.

In [13]:
import time

camera.unobserve(update, names='value')

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

robot.stop()

In [14]:
robot.stop()

Perhaps you want the robot to run without streaming video to the browser.  You can unlink the camera as below.

In [15]:
camera_link.unlink()  # don't stream to browser (will still run camera)

To continue streaming call the following.

In [None]:
camera_link.link()  # stream to browser (wont run camera)

### Conclusion

That's it for this live demo!  Hopefully you had some fun and your robot avoided collisions intelligently! 

If your robot wasn't avoiding collisions very well, try to spot where it fails.  The beauty is that we can collect more data for these failure scenarios
and the robot should get even better :)