### Step 1: Optimize road_following model
> Skip this step if you already have a TensorRT-optimized model (`road_following_model_trt.pth`).

##### First, create the model. This must match the model used in the interactive training notebook.

In [None]:
import cv2
import torch
import torchvision
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
import numpy as np
import torch.nn as nn

# Label list; it is assigned here but not read anywhere in the visible cells.
CATEGORIES = ['apex']
# Specify the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Backbone is created without pretrained weights; trained weights are loaded from
# 'resnet18_road_follow.pth' once the head has been replaced.
resnet18 = models.resnet18(pretrained=False)

# Modify the fully connected layers
class ModifiedResNet18(nn.Module):
    """Backbone of ``original_model`` followed by a small three-layer head.

    All child modules of ``original_model`` except the last one are reused as
    the feature extractor. The final layer is replaced by a stack of linear
    layers ``in_features -> 128 -> 32 -> 2`` with a ReLU after the first two.
    """

    def __init__(self, original_model):
        super().__init__()
        backbone_layers = list(original_model.children())
        # Keep everything but the last child (the original FC layer).
        self.features = nn.Sequential(*backbone_layers[:-1])
        head_in = original_model.fc.in_features
        self.fc1 = nn.Linear(head_in, 128)
        self.relu1 = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(128, 32)
        self.relu2 = nn.ReLU(inplace=True)
        self.fc3 = nn.Linear(32, 2)

    def forward(self, x):
        """Return the two-value head output for a batch ``x``."""
        feats = self.features(x)
        # Collapse everything after the batch dimension into one axis.
        flat = feats.view(feats.size(0), -1)
        hidden = self.relu1(self.fc1(flat))
        hidden = self.relu2(self.fc2(hidden))
        return self.fc3(hidden)

# Instantiate the modified model
model = ModifiedResNet18(resnet18)
# The checkpoint must have been saved from this same architecture.
model.load_state_dict(torch.load('resnet18_road_follow.pth'))

# Move the model to the GPU
model = model.to(device)
# NOTE(review): .cuda() makes the preceding .to(device) redundant and means this
# cell needs a CUDA device regardless of the `device` fallback.
# eval() + half() switch to inference mode with FP16 weights.
model = model.cuda().eval().half()

##### Convert and optimize the model using ``torch2trt`` for faster inference with TensorRT.  Please see the [torch2trt](https://github.com/NVIDIA-AI-IOT/torch2trt) readme for more details.

> This optimization process can take a couple minutes to complete. 

In [None]:
from torch2trt import torch2trt

# Example input handed to torch2trt: one 3-channel 224x224 image, FP16, on the GPU.
data = torch.zeros((1, 3, 224, 224)).cuda().half()
# Convert the model with FP16 mode enabled.
model_trt = torch2trt(model, [data], fp16_mode=True)

##### Save the optimized model using the cell below

In [None]:
# Persist the converted module's state so it can be loaded without repeating the conversion.
torch.save(model_trt.state_dict(), 'road_following_model_trt.pth')

### Step 2: Load the optimized model by executing the cell below

In [1]:
import cv2
import torch
import torchvision
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
import numpy as np
import torch.nn as nn
from torch2trt import TRTModule

CATEGORIES = ['apex']
# Specify the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Restore the TensorRT module from the state saved as 'road_following_model_trt.pth'.
road_model_trt = TRTModule()
road_model_trt.load_state_dict(torch.load('road_following_model_trt.pth'))

def detect_road(img_tensor):
    """Run ``road_model_trt`` on ``img_tensor`` and return the output as a flat NumPy array."""
    prediction = road_model_trt(img_tensor)
    host_array = prediction.detach().cpu().numpy()
    return host_array.flatten()

##### Access YOLO via API

In [27]:
import requests

def upload_image(api_url, img_bytes, throttle, steering, road_x, road_y, timeout=5.0):
    """POST a JPEG frame plus the current driving state to the detection API.

    Args:
        api_url: URL of the detection endpoint.
        img_bytes: JPEG-encoded image, sent as the multipart file field ``image``.
        throttle: sent as the form field ``throttle``.
        steering: sent as the form field ``steering``.
        road_x: sent as the form field ``road_x``.
        road_y: sent as the form field ``road_y``.
        timeout: seconds to wait for the server. Without a timeout a stalled
            server would block the caller indefinitely; this function is called
            from the driving loop, so the car would keep its last throttle.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.Timeout: the server did not answer within ``timeout``.
        requests.exceptions.HTTPError: the server answered with an error status.
    """
    files = {'image': ('image.jpg', img_bytes, 'image/jpeg')}
    data = {
        'throttle': throttle,
        'steering': steering,
        'road_x': road_x,
        'road_y': road_y
    }
    response = requests.post(api_url, files=files, data=data, timeout=timeout)
    response.raise_for_status()
    return response.json()

API_URL = 'http://192.168.45.247:8485/detect'  # Replace with your API URL

def detect_traffic_sign(byte_jpeg, throttle, steering, road_x, road_y):
    """Send one JPEG frame and the driving state to ``API_URL`` and return the reply."""
    return upload_image(
        api_url=API_URL,
        img_bytes=byte_jpeg,
        throttle=throttle,
        steering=steering,
        road_x=road_x,
        road_y=road_y,
    )

### Step 3: Image helper functions

In [3]:
import torchvision.transforms as transforms
import cv2
import PIL.Image
import numpy as np
import requests


# Per-channel statistics used by bgr8_to_tensor() to normalize images; the values
# match the ImageNet mean/std commonly used with torchvision models.
# NOTE(review): both tensors are created on the GPU unconditionally, so this cell needs CUDA.
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()

def bgr8_to_jpeg(value, quality=75):
    """Encode an image array as JPEG and return the encoded bytes.

    Args:
        value: image array accepted by ``cv2.imencode`` (presumably BGR uint8,
            going by the function name).
        quality: JPEG quality forwarded to OpenCV via ``IMWRITE_JPEG_QUALITY``.
            This argument used to be ignored, so every call was encoded with
            OpenCV's own default quality instead of the value given here.

    Returns:
        The JPEG data as ``bytes``.

    Raises:
        ValueError: OpenCV reported that encoding failed.
    """
    params = [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)]
    ok, encoded = cv2.imencode('.jpg', value, params)
    if not ok:
        raise ValueError('cv2.imencode failed to encode the image as JPEG')
    return bytes(encoded)

def bgr8_to_tensor(image):
    """Turn an image array into a normalized tensor with a leading batch dimension.

    The array is wrapped in a PIL image, converted with torchvision's
    ``to_tensor``, moved to CUDA when available (CPU otherwise), normalized in
    place with the module-level ``mean`` / ``std`` and given a leading axis.

    NOTE(review): no channel reordering happens here, so the channel order of
    ``image`` is passed through unchanged.
    """
    target = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    pil_image = PIL.Image.fromarray(image)
    tensor = transforms.functional.to_tensor(pil_image).to(target)
    # Per-channel normalization; [:, None, None] broadcasts over the spatial axes.
    tensor.sub_(mean[:, None, None])
    tensor.div_(std[:, None, None])
    return tensor.unsqueeze(0)


### Step 4: Create the racecar class

In [4]:
from jetracer.nvidia_racecar import NvidiaRacecar

# Shared car handle; the control functions set car.throttle and car.steering on it.
car = NvidiaRacecar()

In [26]:
from collections import deque

# Constants for steering adjustments
# "HIGH" values are applied by set_speed_high(), "LOW" values by set_speed_low().
# NOTE(review): throttle values are negative; park_car() labels the opposite sign
# as "backward", so negative presumably drives forward on this car — confirm.
STEERING_GAIN_HIGH = -1.2
THROTTLE_HIGH = -0.28
STEERING_GAIN_LOW = -1.0
THROTTLE_LOW = -0.20
# Constant offset added to every steering command in take_action().
STEERING_BIAS = -0.05
# Maximum number of detection results kept in Observation.traffic_sign_queue.
FRAME_QUEUE_SIZE = 16
# A sign class must appear in more than half of a full queue to raise its flag.
COUNT_NEED = int(FRAME_QUEUE_SIZE / 2) + 1

# Define and initialize car_status and observation objects
class CarStatus:
    """Driving state shared by the control helpers: run flag, cruise speed, steering gain."""

    def __init__(self):
        self.is_car_running = False
        # Start from the "high speed" profile.
        self.speed, self.steering_gain = THROTTLE_HIGH, STEERING_GAIN_HIGH

class Observation:
    """Sensing state for one run: frames, model output, sign queue, timers and flags."""

    def __init__(self):
        self.current_time = 1
        # Image slots start empty and are filled in by other functions.
        self.bgr8 = self.bgr8fixed = self.jpeg = None
        self.road_output = None
        # Bounded history of sign-detection results; old entries drop off automatically.
        self.traffic_sign_queue = deque(maxlen=FRAME_QUEUE_SIZE)
        self.frame_index = 0
        self.timer, self.flag = Timer(), Flag()

class Timer:
    """Per-sign time thresholds; every entry starts at 1."""

    def __init__(self):
        for sign in ('park', 's100', 's50', 'stop', 'traffic'):
            setattr(self, sign, 1)

class Flag:
    """Boolean reaction flags, one per sign class plus ``resume``; all start False."""

    def __init__(self):
        for name in ('park', 's100', 's50', 'stop', 'traffic', 'resume'):
            setattr(self, name, False)

# Module-level state objects that the camera callback and control functions read and write.
car_status = CarStatus()
observation = Observation()

### Step 5: Create the camera class.

In [6]:
from jetcam.csi_camera import CSICamera

# 224x224 frames match the input size the road model was converted with.
camera = CSICamera(width=224, height=224, capture_fps=40)

In [7]:
import ipywidgets

# Preview widget; its value is set to JPEG bytes by later cells.
image_widget = ipywidgets.Image(format='jpeg')

In [8]:
from IPython.display import display
import cv2

# Read a single frame, print its shape as a sanity check, and prime the preview widget.
observation.bgr8 = camera.read()
print(observation.bgr8.shape)
observation.jpeg = bgr8_to_jpeg(observation.bgr8)
image_widget.value = observation.jpeg
# display(image_widget)

(224, 224, 3)


### Step 6: Start camera

In [9]:
import numpy as np
from IPython.display import clear_output
from collections import deque

# Start continuous capture; frames are then delivered through the camera's 'value' trait.
camera.running = True

def update_image(change):
    """Camera observer: store the newest frame on the shared ``observation``."""
    latest_frame = change['new']
    observation.bgr8 = latest_frame

# Attach the update_image function to the camera's value observer
camera.observe(update_image, names='value')

In [10]:
display(image_widget)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

### Step 7: Run the car

In [34]:
import time
from collections import deque, Counter
import copy

def set_speed_high():
    """Apply the high-speed profile: THROTTLE_HIGH with STEERING_GAIN_HIGH."""
    car.throttle = THROTTLE_HIGH
    # Remember the throttle as read back from the car, together with the matching gain.
    car_status.speed, car_status.steering_gain = car.throttle, STEERING_GAIN_HIGH

def set_speed_low():
    """Apply the low-speed profile: THROTTLE_LOW with STEERING_GAIN_LOW."""
    car.throttle = THROTTLE_LOW
    # Remember the throttle as read back from the car, together with the matching gain.
    car_status.speed, car_status.steering_gain = car.throttle, STEERING_GAIN_LOW
    
def set_speed_zero():
    """Set the throttle to zero; the speed stored in car_status is left untouched."""
    car.throttle = 0.0
    
def resume_speed():
    """Set the throttle back to the speed stored in car_status.speed."""
    car.throttle = car_status.speed

def hit_brake(seconds=1):
    """Zero the throttle, block for ``seconds``, then return to the stored cruise speed."""
    car.throttle = 0.0
    time.sleep(seconds)
    # Same assignment as before (car.throttle = car_status.speed), via the shared helper.
    resume_speed()

def park_car():
    """Run the fixed, timed parking sequence, then stop and mark the car as not running.

    The maneuver is open loop: each step sets steering and throttle and then
    holds them for a fixed time. The first step changes only the throttle.
    """
    # (steering, or None to leave it unchanged; throttle; hold time in seconds)
    maneuver = (
        (None, -THROTTLE_HIGH, 1),
        (0.8, THROTTLE_HIGH, 1),
        (-0.8, -THROTTLE_HIGH, 1),
        (0.8, THROTTLE_HIGH, 2.5),
    )
    for steering, throttle, hold in maneuver:
        if steering is not None:
            car.steering = steering
        car.throttle = throttle
        time.sleep(hold)

    # Stop the car, centre the steering and end the driving loop.
    car.throttle = 0.0
    car.steering = 0.0
    car_status.is_car_running = False


def evaluate_observation():
    """Process the latest camera frame and refresh detections, flags and the preview.

    Steps, in order:
      1. Snapshot ``observation.bgr8`` so a newer frame arriving from the camera
         callback cannot change the image mid-evaluation.
      2. Run the road model on the snapshot and store ``observation.road_output``.
      3. On every second frame, upload the clean JPEG to the sign-detection API
         and append the reply to ``observation.traffic_sign_queue``.
      4. Recount the queued detections and update ``observation.flag``.
      5. Draw the predicted road point on the snapshot and show it in ``image_widget``.

    ``observation.jpeg`` always holds the clean frame without the overlay; it is
    the image that gets uploaded.
    """
    # Make a deep copy of the bgr8 image
    observation.bgr8fixed = copy.deepcopy(observation.bgr8)

    img_tensor = bgr8_to_tensor(observation.bgr8fixed).half()
    observation.road_output = detect_road(img_tensor)

    # Encode before drawing anything: this JPEG is the one sent to the detector.
    observation.jpeg = bgr8_to_jpeg(observation.bgr8fixed)
    if (observation.frame_index % 2) == 1:
        sign_output = detect_traffic_sign(observation.jpeg, car.throttle, car.steering, observation.road_output[0], observation.road_output[1])
        clear_output(wait=True)  # Clear the output
        print(f'speed {car.throttle}')
        print(sign_output)
        # Add the detection to the queue
        observation.traffic_sign_queue.append(sign_output)
    observation.frame_index += 1

    # Use Counter to count occurrences of each traffic sign
    observation.counter = Counter(detection['cls'] for detection in observation.traffic_sign_queue if detection)

    # Update flags based on detection counts
    observation.flag.park = observation.counter['park'] >= COUNT_NEED and observation.current_time >= observation.timer.park
    observation.flag.s100 = observation.counter['s100'] >= COUNT_NEED and observation.current_time >= observation.timer.s100
    observation.flag.s50 = observation.counter['s50'] >= COUNT_NEED and observation.current_time >= observation.timer.s50
    if observation.counter['stop'] >= COUNT_NEED:
        observation.flag.stop = True
    elif observation.flag.stop:
        # The stop sign was being obeyed and is no longer seen: ask for a resume.
        observation.flag.stop = False
        observation.flag.resume = True
    observation.flag.traffic = observation.counter['traffic'] >= COUNT_NEED and observation.current_time >= observation.timer.traffic

    # Get the current time in seconds since the Unix epoch
    observation.current_time = time.time()

    # cv2.circle needs integer pixel coordinates; the road model returns floats.
    center = (int(observation.road_output[0]), int(observation.road_output[1]))
    cv2.circle(observation.bgr8fixed, center, 5, (0, 0, 255), -1)  # Red dot
    # The JPEG encoded earlier is an independent bytes object, so drawing on the
    # array does not change it. Re-encode the annotated frame for the preview.
    image_widget.value = bgr8_to_jpeg(observation.bgr8fixed)

def _latest_xmin(sign_cls):
    """Return ``xmin`` of the newest queued detection of class ``sign_cls``, or None."""
    for detection in reversed(observation.traffic_sign_queue):
        if detection and detection.get('cls') == sign_cls:
            return detection.get('xmin')
    return None


def _sign_reached(sign_cls, near_xmin=15):
    """Slow down while approaching a sign; return True once it has been reached.

    While the sign's ``xmin`` is greater than ``near_xmin`` the throttle is set
    to THROTTLE_LOW and False is returned. When no detection of ``sign_cls`` is
    in the queue nothing is changed and False is returned.
    """
    xmin = _latest_xmin(sign_cls)
    if xmin is None:
        return False
    if xmin > near_xmin:
        car.throttle = THROTTLE_LOW
        return False
    return True


def take_action():
    """Steer toward the predicted road point and react to the active sign flag.

    Steering is updated only while ``car_status.is_car_running`` is true. At
    most one flag is handled per call, in the priority order park, s100, s50,
    stop, traffic, resume.

    The bounding box used for the approach check is the newest queued detection
    of the *flagged* class. The newest queue entry overall may be empty or
    belong to a different sign, so it is not used directly.
    """
    x = float(observation.road_output[0])
    # 112 is half of the 224-pixel frame width: map x to roughly [-1, 1] around the centre.
    x_percent = (x - 112) / 112
    if car_status.is_car_running:
        car.steering = x_percent * car_status.steering_gain + STEERING_BIAS

    # React to traffic sign detections based on flags
    if observation.flag.park:
        if _sign_reached('park'):
            park_car()
            # Report the stopped state (zero throttle/steering, centred road point) to the API.
            detect_traffic_sign(observation.jpeg, 0.0, 0.0, 112, 112)
            observation.flag = Flag()  # Reset every flag to False
    elif observation.flag.s100:
        set_speed_high()
        observation.timer.s100 = observation.current_time + 10
    elif observation.flag.s50:
        set_speed_low()
        observation.timer.s50 = observation.current_time + 10
    elif observation.flag.stop:
        if _sign_reached('stop'):
            car.throttle = 0
    elif observation.flag.traffic:
        if _sign_reached('traffic'):
            detect_traffic_sign(observation.jpeg, 0.0, 0.0, 112, 112)
            hit_brake(seconds=10)  # Brake then wait 10s then continue to run
            observation.timer.traffic = time.time() + 10
    elif observation.flag.resume:  # No longer detect "STOP" sign
        print("resume")
        resume_speed()
        observation.flag.resume = False

In [35]:

try:
    car_status = CarStatus()
    observation = Observation()
    # A fresh Observation has bgr8=None until the camera callback fires. Seed it
    # with the camera's current frame so the first evaluate_observation() call
    # never runs on a missing image.
    observation.bgr8 = camera.value
    car_status.is_car_running = True
    set_speed_high()
    # Sense -> act loop; park_car() ends it by clearing is_car_running.
    while car_status.is_car_running:
        evaluate_observation()
        take_action()
finally:
    # Always leave the car stopped and centred, including on errors or a kernel interrupt.
    car_status.is_car_running = False
    car.throttle = 0
    car.steering = 0


speed -0.2
{'cls': 'park', 'confident': '0.9697', 'xmax': 21, 'xmin': 2, 'ymax': 85, 'ymin': 39}


In [None]:
# Run the parking maneuver on its own; the finally block leaves the car stopped
# and centred even if the maneuver is interrupted.
try:
    park_car()
finally:
    car_status.is_car_running = False
    car.throttle = 0
    car.steering = 0

##### Stop camera

In [None]:
# Detach the frame callback first, then stop the capture loop.
camera.unobserve(update_image, names='value')
camera.running = False

In [None]:
def stop_camera(camera):
    """Stop capturing and release the underlying capture handle when there is one.

    Args:
        camera: object with a ``running`` attribute. If it also exposes a
            non-None ``cap`` attribute, ``cap.release()`` is called.
    """
    camera.running = False
    cap = getattr(camera, 'cap', None)
    if cap is not None:
        cap.release()  # Release the camera resource if applicable

stop_camera(camera)