# Road Following - Live demo

In this notebook, we will use model we trained to move jetBot smoothly on track. 

### Load Trained Model

We will assume that you have already downloaded ``best_steering_model_xy.pth`` to work station as instructed in "train_model.ipynb" notebook. Now, you should upload model file to JetBot in to this notebook's directory. Once that's finished there should be a file named ``best_steering_model_xy.pth`` in this notebook's directory.

> Please make sure the file has uploaded fully before calling the next cell

Execute the code below to initialize the PyTorch model. This should look very familiar from the training notebook.

In [121]:
def mode(array):
    vals,counts = np.unique(array, return_counts=True)
    index = np.argmax(counts)
    return vals[index]

In [1]:
import torchvision
import torch

model_steer = torchvision.models.resnet18(pretrained=False)
model_steer.fc = torch.nn.Linear(512, 2)
model_cls = torchvision.models.resnet18(pretrained=False)
model_cls.fc = torch.nn.Sequential(torch.nn.Linear(512, 4))

Next, load the trained weights from the ``best_steering_model_xy.pth`` file that you uploaded.

In [2]:
# model_steer.load_state_dict(torch.load('best_steering_model_xy.pth'))
# model_turn.load_state_dict(torch.load('best_turning_model_xy.pth'))
model_steer.load_state_dict(torch.load('best_combined_final_model_xy.pth'))
model_cls.load_state_dict(torch.load('best_classify_model_xy.pth'))

<All keys matched successfully>

Currently, the model weights are located on the CPU memory execute the code below to transfer to the GPU device.

In [3]:
device = torch.device('cuda')
model_steer = model_steer.to(device)
model_steer = model_steer.eval().half()
model_cls = model_cls.to(device)
model_cls = model_cls.eval().half()

In [4]:
model_steer

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

### Creating the Pre-Processing Function

We have now loaded our model, but there's a slight issue. The format that we trained our model doesn't exactly match the format of the camera. To do that, we need to do some preprocessing. This involves the following steps:

1. Convert from HWC layout to CHW layout
2. Normalize using same parameters as we did during training (our camera provides values in [0, 255] range and training loaded images in [0, 1] range so we need to scale by 255.0
3. Transfer the data from CPU memory to GPU memory
4. Add a batch dimension

In [5]:
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

In [6]:
std

tensor([0.2290, 0.2240, 0.2250], device='cuda:0', dtype=torch.float16)

In [7]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

Awesome! We've now defined our pre-processing function which can convert images from the camera format to the neural network input format.

Now, let's start and display our camera. You should be pretty familiar with this by now. 

In [8]:
from IPython.display import display
import ipywidgets
import traitlets
from jetbot import Camera, bgr8_to_jpeg


camera = Camera()

In [9]:




image_widget = ipywidgets.Image()

traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

display(image_widget)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [9]:
xy = model_steer(preprocess(camera.value)).detach().float().cpu().numpy().flatten()

In [10]:
xy

array([ 0.00466537, -0.33398438], dtype=float32)

In [11]:
np.sqrt(4)

2.0

In [12]:
theta = np.arccos(xy[0]/np.sqrt(xy[0]*xy[0] + (xy[1]+1)*(xy[1]+1)))

In [165]:
theta

1.540352181300057

In [167]:
pi

3.141592653589793

In [12]:
xy = model_steer(preprocess(camera.value)).detach().float().cpu().numpy().flatten()
theta = np.arccos(xy[0]/np.sqrt(xy[0]*xy[0] + ((-xy[1]+1)*(-xy[1]+1))))
print(xy, -(theta - (pi/2)))

[-0.06445312  0.18835449] -0.07924414518767997


In [103]:
np.arccos(xy[0]/np.sqrt(xy[0]*xy[0] + ((xy[1]+1)*(xy[1]+1))))\

1.4286007774658978

In [99]:
pi

3.141592653589793

In [191]:
np.cos(pi/2)

6.123233995736766e-17

In [231]:
(xy[1]+1)/xy[0]

18.97139303482587

In [28]:
xy = model_steer(preprocess(camera.value)).detach().float().cpu().numpy().flatten()
print(1/((xy[1]+1)/xy[0]))

-0.11754587155963302


In [244]:
pi/2

1.5707963267948966

In [73]:
cl = model_turn(preprocess(camera.value)).detach().float().cpu().numpy().flatten()

In [74]:
cl.argmax()

2

In [9]:
from jetbot import Robot
from math import pi
import time

In [10]:
robot = Robot()

In [11]:
robot.left_motor.value = 0.2
robot.right_motor.value = 0.2

In [12]:
robot.left_motor.value = 0.08
robot.right_motor.value = 0.08
time.sleep(1000/1000)
robot.stop()

In [13]:
robot.stop()

In [43]:
controller.stop()

In [13]:
class JetbotPIDControl:
    def __init__(self):
        # Initialize robot
        self.robot = Robot()
        
        # PID coefficients for steering (tune these values)
        self.kp = 0.15  # Proportional gain
        self.ki = 0.005  # Integral gain
        self.kd = 0.005  # Derivative gain
#         self.y_coeff = 0.2
        self.steering_threshold = 0.05
        
        # Speed settings
        self.base_speed = 0.08
        self.min_speed = 0.08
        self.max_speed = 0.09      
        # Error tracking for PID
        self.last_error = 0
        self.integral = 0
        self.last_time = time.time()
    
#     def jitter(self, l, r):
#         self.robot.stop()
#         time.sleep(400/1000)
#         self.robot.left_motor.value = 0.2*l
#         self.robot.right_motor.value = 0.2*r
#         time.sleep(40/1000)
#         self.robot.stop()
#         time.sleep(400/1000)
    
#     def right(self):
#         self.jitter(4,4)
#         self.robot.stop()
#         time.sleep(400/1000)
#         self.robot.left_motor.value = 0.2
#         self.robot.right_motor.value = -0.2
#         time.sleep(300/1000)
#         self.robot.stop()
#         time.sleep(400/1000)
    
    def left(self):
#         self.jitter(4,4)
        self.robot.stop()
        time.sleep(400/1000)
        self.robot.left_motor.value = -0.2
        self.robot.right_motor.value = 0.2
        time.sleep(500/1000)
        self.robot.stop()
        time.sleep(400/1000)
    
    def update_motors(self, x, y, cl, fast_mode):
        """
        Update motor speeds based on target point (x,y)
        x: [-1, 1] where negative is left, positive is right
        y: [-1, 1] where higher values mean faster speed
        """
        if (cl == 1) or (cl == 2):
            self.temp_stop()
            return None, None
        
        elif cl == 3:
            self.left()
            self.left()
            return None, None
        else:
            current_time = time.time()
            dt = current_time - self.last_time

            # Prevent division by zero or very small dt
            if dt < 0.001:
                dt = 0.001

            # Calculate steering with PID
            theta = np.arccos(x/np.sqrt(x*x + (-y+1)*(-y+1)))
            error = -(theta - (pi/2))
    #         if y < -0.5:
    #             error = x  # x is already the error from center
    #         else:
    #             if x > 0:
    #                 error = x + abs(y)*self.y_coeff
    #             else:
    #                 error = x + y*self.y_coeff

            # PID terms
            p_term = self.kp * error
            self.integral += error * dt
            i_term = self.ki * self.integral
            derivative = (error - self.last_error) / dt
            d_term = self.kd * derivative

            # Calculate steering value
            steering = p_term + i_term + d_term

            # Clamp steering to [-1, 1]
            steering = max(min(steering, 1.0), -1.0)

            # Calculate speed based on y value
            # Map y from [-1, 1] to [min_speed, max_speed]
            speed = self.min_speed + (self.max_speed - self.min_speed) * (-y + 1) / 2
    #         if y < 0.2:
    #             speed = self.min_speed + (self.max_speed - self.min_speed) * (y + 1) / 2
    #         else:
    #             speed = self.max_speed * 1.2

            # Calculate left and right motor speeds
            if abs(steering) < self.steering_threshold:
                left_speed = speed + steering
                right_speed = speed - steering
            else:
                left_speed = speed*1.5*steering/abs(steering)
                right_speed = -speed*1.5*steering/abs(steering)
    #         if left_speed < 0.06:
    #             left_speed = -(speed - steering)
    #         if right_speed < 0.06:
    #             right_speed = -(speed + steering)


            # Normalize speeds if they exceed [-1, 1]
            max_speed = max(abs(left_speed), abs(right_speed))
            if max_speed > 1.0:
                left_speed /= max_speed
                right_speed /= max_speed

            if fast_mode:
                self.robot.left_motor.value = float(left_speed*1.8)
                self.robot.right_motor.value = float(right_speed*1.8)
            # Update motors
            else:
                self.robot.left_motor.value = float(left_speed)
                self.robot.right_motor.value = float(right_speed)

            # Update error tracking
            self.last_error = error
            self.last_time = current_time
            return steering, speed
        
    def stop(self):
        """Stop the robot"""
        self.robot.stop()
        # Reset PID variables
        self.last_error = 0
        self.integral = 0
    def temp_stop(self):
        self.robot.stop()

In [14]:
model_cls(preprocess(camera.value)).detach().float().cpu().numpy().flatten().argmax()


0

In [15]:
from IPython.display import display, clear_output

In [16]:
sum(buffer)/n

NameError: name 'buffer' is not defined

In [17]:
controller = JetbotPIDControl()
from collections import deque
n = 20
buffer = deque([0.1]*n, maxlen = n)

while True: 
    fast_mode = False
    xy = model_steer(preprocess(camera.value)).detach().float().cpu().numpy().flatten()
    x = xy[0]
    y = xy[1]
    
    cl = model_cls(preprocess(camera.value)).detach().float().cpu().numpy().flatten()
    cl = cl.argmax()
    if cl == 3:
        cl = 0
    if abs(x) < 0.15 and y > 0.35 and abs(sum(buffer))/n < 0.015:
        controller.left()
        controller.left()
        continue
    
#     cum_cl = [cl]
#     if (cl != 0):
#         controller.jitter(1,1);
#         cum_cl += [cl]
#         controller.jitter(-1,1);
#         cum_cl += [cl]
#         controller.jitter(1,-1);
#         cum_cl += [cl]
#         controller.jitter(-1,-1);
#         cum_cl += [cl]
#     cl = mode(cum_cl)
#     controller.temp_stop()

    if abs(sum(buffer))/n < 0.0035 and y < -0.4:
        fast_mode = True
    else:
        fast_mode = False
        
    steering, speed = controller.update_motors(x, y, cl, fast_mode)
   
    clear_output(wait=True)
    if steering != None and speed != None:
        buffer.append(abs(steering))
        display(f'''steering: {steering:.2f}
                    Speed: {speed:.2f} 
                    x: {x:.2f} 
                    y: {y:.2f}
                    buffer: {sum(buffer)/n:.4f}
                    cl: {cl}
                    ''')#     break
                # Small delay to prevent CPU overload
    time.sleep(1/1000)

'steering: -0.12\n                    Speed: 0.08 \n                    x: -0.17 \n                    y: 0.03\n                    buffer: 0.1422\n                    cl: 0\n                    '

KeyboardInterrupt: 

In [336]:
controller.stop()

In [137]:
camera.stop()

In [None]:
preprocess(image_widget.value)

We'll also create our robot instance which we'll need to drive the motors.

In [None]:
from jetbot import Robot

robot = Robot()

Now, we will define sliders to control JetBot
> Note: We have initialize the slider values for best known configurations, however these might not work for your dataset, therefore please increase or decrease the sliders according to your setup and environment

1. Speed Control (speed_gain_slider): To start your JetBot increase ``speed_gain_slider`` 
2. Steering Gain Control (steering_gain_slider): If you see JetBot is wobbling, you need to reduce ``steering_gain_slider`` till it is smooth
3. Steering Bias control (steering_bias_slider): If you see JetBot is biased towards extreme right or extreme left side of the track, you should control this slider till JetBot start following line or track in the center.  This accounts for motor biases as well as camera offsets

> Note: You should play around above mentioned sliders with lower speed to get smooth JetBot road following behavior.

In [None]:
speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.2, description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)

Next, let's display some sliders that will let us see what JetBot is thinking.  The x and y sliders will display the predicted x, y values.

The steering slider will display our estimated steering value.  Please remember, this value isn't the actual angle of the target, but simply a value that is
nearly proportional.  When the actual angle is ``0``, this will be zero, and it will increase / decrease with the actual angle.  

In [None]:
x_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='x')
y_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='y')
steering_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='steering')
speed_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='speed')

display(ipywidgets.HBox([y_slider, speed_slider]))
display(x_slider, steering_slider)

Next, we'll create a function that will get called whenever the camera's value changes. This function will do the following steps

1. Pre-process the camera image
2. Execute the neural network
3. Compute the approximate steering value
4. Control the motors using proportional / derivative control (PD)

In [None]:
angle = 0.0
angle_last = 0.0

def execute(change):
    global angle, angle_last
    image = change['new']
    xy = model(preprocess(image)).detach().float().cpu().numpy().flatten()
    x = xy[0]
    y = (0.5 - xy[1]) / 2.0
    
    x_slider.value = x
    y_slider.value = y
    
    speed_slider.value = speed_gain_slider.value
    
    angle = np.arctan2(x, y)
    pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    angle_last = angle
    
    steering_slider.value = pid + steering_bias_slider.value
    
    robot.left_motor.value = max(min(speed_slider.value + steering_slider.value, 1.0), 0.0)
    robot.right_motor.value = max(min(speed_slider.value - steering_slider.value, 1.0), 0.0)
    
execute({'new': camera.value})

Cool! We've created our neural network execution function, but now we need to attach it to the camera for processing.

We accomplish that with the observe function.

>WARNING: This code will move the robot!! Please make sure your robot has clearance and it is on Lego or Track you have collected data on. The road follower should work, but the neural network is only as good as the data it's trained on!

In [None]:
camera.observe(execute, names='value')

Awesome! If your robot is plugged in it should now be generating new commands with each new camera frame. 

You can now place JetBot on  Lego or Track you have collected data on and see whether it can follow track.

If you want to stop this behavior, you can unattach this callback by executing the code below.

In [None]:
import time

camera.unobserve(execute, names='value')

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

robot.stop()

Again, let's close the camera conneciton properly so that we can use the camera in other notebooks.

In [None]:
camera.stop()

### Conclusion
That's it for this live demo! Hopefully you had some fun seeing your JetBot moving smoothly on track following the road!!!

If your JetBot wasn't following road very well, try to spot where it fails. The beauty is that we can collect more data for these failure scenarios and the JetBot should get even better :)