May need to run the whole code twice as for the first run, the JetBot will power off itself

# Collision Avoidance

In [1]:
import torch
import torchvision

model = torchvision.models.resnet18(pretrained=False) # A pre-trained 18 layer CNN network that can classify images into 1000 object categories
model.fc = torch.nn.Linear(512, 4) # Change the last layer of the model, change second parameter according to how many labels there are

Load trained weights from pre-trained model

In [2]:
model.load_state_dict(torch.load('best_model_resnet18_modified.pth')) # Change according to the path of the pre-trained model
device = torch.device('cuda')
model = model.to(device)
model = model.eval().half()

Preprocessing
1. Convert from HWC layout to CHW layout
2. Normalize using same parameters as we did during training (our camera provides values in [0, 255] range and training loaded images in [0, 1] range so we need to scale by 255.0
3. Transfer the data from CPU memory to GPU memory
4. Add a batch dimension

In [3]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

normalize = torchvision.transforms.Normalize(mean, std)

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

# Robot controls

In [4]:
from jetbot import Robot

robot = Robot()

In [None]:
# Test Forward
# robot.forward(0.5)

In [None]:
# Test Turn left
# robot.set_motors(0.4,0.6)

In [None]:
# Test Turn around
# robot.right(0.5)
# time.sleep(1)
# robot.stop()

In [None]:
# Stop jetbot
# robot.stop()

# Camera display

In [5]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg

camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)


camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.VBox([widgets.HBox([image])]))

VBox(children=(HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x…

Make the JetBot move to prevent power draw issues later such as camera freeze

Ensure camera is not frozen after running the below cell

In [6]:
import time
robot.left(speed=0.3)
time.sleep(0.5)
robot.stop()

1. Preprocess camera image
2. Execute neural network
3. Control what robot should do based on neural network output

In [None]:
import torch.nn.functional as F
import time

override = False
stopthreads = False

def update(change):
    global robot, override
    x = change['new'] # Camera input
    x = preprocess(x)
    y = model(x)
    
    # we apply the `softmax` function to normalize the output vector so it sums to 1 (which makes it a probability distribution)
    y = F.softmax(y, dim=1)
    
    highestblock = torch.argmax(y).item() # Get label value with highest probability
    
    if highestblock == 0: # Front is blocked
        override = True
        print("Front blocked")
        robot.stop()
    elif highestblock == 1: # Left is blocked
        override = True
        print("Left blocked")
        robot.right(0.5)
        time.sleep(1.5)
        robot.stop()
    elif highestblock == 2: # Right is blocked
        override = True
        print("Right blocked")
        robot.left(0.5)
        time.sleep(1.5)
        robot.stop()
    elif highestblock == 3: # Free
        override = False
    
    time.sleep(0.001)

def update_loop(): # Loop update function
    print("Starting collision avoidance")
    global stopthreads
    while True:
        try: 
            update({'new': camera.value})
        except:
            print("Error occurred for collision avoidance")
            stopthreads = True
            break
        if stopthreads:
            break

Call the function once to initialize and preload

In [None]:
update({'new': camera.value})  

# Socket Server

To allow the JetBot and the Hand Gesture Recognition application to communicate

In [7]:
import socket

def socket_server():
    global stopthreads
    s = socket.socket()

    print("Socket successfully created")

    port = 12345

    s.bind(('', port))

    print("Socket binded to %s" %(port))

    s.listen(5)
    print("Socket is listening")
    c, addr = s.accept()
    print(f"Connected to {addr}")
    
    leftcount = 0
    rightcount = 0

    try:
        with c:
            while True:
                input = c.recv(1024).decode()
                if override == False:
                    if input == "Toggle Reverse":
                        robot.right(0.5)
                        time.sleep(1)
                        robot.stop()
                    elif input == "Left":
                        rightcount = 0
                        rightmotor = 0.5 + (0.025 * leftcount)
                        leftmotor = rightmotor / 2
                        if leftmotor > 0.3:
                            if rightmotor >= 0.85: 
                                robot.set_motors(0.3, 0.85)
                            else:
                                robot.set_motors(0.3, rightmotor)
                        else: 
                            robot.set_motors(leftmotor, rightmotor)
                        leftcount += 1
                    elif input == "Right":
                        leftcount = 0
                        leftmotor = 0.5 + (0.025 * rightcount)
                        rightmotor = leftmotor / 2
                        if rightmotor > 0.3:
                            if leftmotor >= 0.85:
                                robot.set_motors(0.85, 0.3)
                            else:
                                robot.set_motors(leftmotor, 0.3)
                        else:
                            robot.set_motors(leftmotor, rightmotor)
                        rightcount += 1
                    elif input == "Throttle":
                        rightcount = 0
                        leftcount = 0
                        robot.forward(0.5)
                    elif input == "Brake" or "Full Stop":
                        leftcount = 0
                        rightcount = 0
                        robot.stop()
                    if not input:
                        break
                if stopthreads:
                    s.close()
                    break
    except: 
        s.close()
        print("Error occurred when communicating with client")

    print("Connection closed")

# Threading

To run both the while True loops (update_loop and socket_server) at once

So that one loop does not have to end before running the other

Needed as if both loops are implemented together, the program will be stuck at c, addr = s.accept() as it is waiting for a socket client to connect to the server

In [8]:
from threading import Thread

In [None]:
t1 = Thread(target = update_loop) # Collision avoidance
t1.start()

In [10]:
t2 = Thread(target = socket_server) # Socket server
t2.start()

Socket successfully created
Socket binded to 12345
Socket is listening
Connected to ('192.168.50.183', 50442)
Error occurred when communicating with client
Connection closed


# Turn off

Stop both the running functions

In [None]:
stopthreads = True
t1.join()
t2.join()
print("Threads stoppped")

Stop camera input to prevent errors when re-running the notebook or when using other notebooks

In [11]:
camera.stop()

In [None]:
import time

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

robot.stop()

Force close socket if there was a issue closing it earlier 

In [None]:
s.close()

Stop the running functions individually

In [None]:
t1.join()

In [None]:
t2.join()