In [None]:
import subprocess
import os
import signal

# Start the usb_cam node as a background process
# This provides real-time image data for inference
usb_cam_process = subprocess.Popen(
    ['ros2', 'run', 'usb_cam', 'usb_cam_node_exe'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    preexec_fn=os.setsid  # Group the process for easier termination
)

print("usb_cam node started.")

In [None]:
import torch
device = torch.device('cuda')

# Load TensorRT optimized model
from torch2trt import TRTModule

model_trt = TRTModule()
model_trt.load_state_dict(torch.load('best_steering_model_xy_trt.pth'))

print("Loaded optimized trained weights")

# Define image preprocessing steps to match training format

# We have now loaded our model, but there's a slight issue. The format that we trained our model doesn't exactly match the format of the camera. To do that, we need to do some preprocessing. This involves the following steps:

# 1. Convert from HWC layout to CHW layout
# 2. Normalize using same parameters as we did during training (our camera provides values in [0, 255] range and training loaded images in [0, 1] range so we need to scale by 255.0
# 3. Transfer the data from CPU memory to GPU memory
# 4. Add a batch dimension

import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image).resize((224, 224))
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

print("Created the Pre-Processing Function")


In [None]:
from IPython.display import display
import ipywidgets as widgets

# PID and inference sliders for control and feedback
speed_gain_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.2, description='steering gain')
steering_dgain_slider = widgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = widgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)

# Sliders for real-time predictions
x_slider = widgets.FloatSlider(min=-1.0, max=1.0, description='x')
y_slider = widgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='y')
steering_slider = widgets.FloatSlider(min=-1.0, max=1.0, description='steering')

display(x_slider, y_slider, steering_slider)

# Image display widget
image_widget = widgets.Image(format='jpeg')
display(image_widget)

In [None]:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image as ROSImage
from cv_bridge import CvBridge
import cv2
from IPython.display import display
import ipywidgets as widgets
from threading import Thread
import numpy as np
import time
from geometry_msgs.msg import Twist

def bgr8_to_jpeg(value):
    return cv2.imencode('.jpg', value)[1].tobytes()
    
angle = 0.0
angle_last = 0.0
bridge = CvBridge()

# This node listens to the camera feed, runs inference,
# calculates steering angle using a basic PID controller,
# and publishes the result as a Twist message.
class ImageSubscriber(Node):
    def __init__(self):
        super().__init__('image_subscriber')
        self.subscription = self.create_subscription(
            ROSImage,
            '/image_raw',
            self.listener_callback,
            10)
        
        self.twist_publisher = self.create_publisher(Twist, 'JetBot1/target', 10)
    
    def listener_callback(self, msg):
        global angle, angle_last
        frame = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        
        # Display input frame
        image_widget.value = bgr8_to_jpeg(frame)
        
        # Run inference
        xy = model_trt(preprocess(frame)).detach().float().cpu().numpy().flatten()
        x = xy[0]
        y = (0.5 - xy[1]) / 2.0
        
        # Update sliders with predictions
        x_slider.value = x
        y_slider.value = y

        # Compute steering using a proportional-derivative controller
        angle = np.arctan2(x, y)
        pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
        angle_last = angle
        steering_slider.value = pid + steering_bias_slider.value
        
        # Publish Twist message
        twist_msg = Twist()
        twist_msg.linear.x = speed_gain_slider.value     
        twist_msg.angular.z = steering_slider.value
        self.twist_publisher.publish(twist_msg)

# Run the ROS 2 node in a background thread
def start_ros_node():
    if not rclpy.ok():
        rclpy.init()
    node = ImageSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

ros_thread = Thread(target=start_ros_node, daemon=True)
ros_thread.start()


In [None]:
# Stop the camera node process when done
os.killpg(os.getpgid(usb_cam_process.pid), signal.SIGINT)