In [None]:
from jetcam.csi_camera import CSICamera
# from jetcam.usb_camera import USBCamera

camera = CSICamera(width=224, height=224)
# camera = USBCamera(width=224, height=224)

camera.running = True

import torchvision.transforms as transforms
from xy_dataset import XYDataset

TASK = 'corner_detection2'

# We'll use two categories: 'straight' for normal path and 'corner' for corners
CATEGORIES = ['straight', 'corner']

DATASETS = ['A', 'B']

TRANSFORMS = transforms.Compose([
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

datasets = {}
for name in DATASETS:
    datasets[name] = XYDataset(TASK + '_' + name, CATEGORIES, TRANSFORMS, random_hflip=True)

import cv2
import ipywidgets
import traitlets
from IPython.display import display
from jetcam.utils import bgr8_to_jpeg
from jupyter_clickable_image_widget import ClickableImageWidget

# initialize active dataset
dataset = datasets[DATASETS[0]]

# unobserve all callbacks from camera in case we are running this cell for second time
camera.unobserve_all()

# create image preview
camera_widget = ClickableImageWidget(width=camera.width, height=camera.height)
snapshot_widget = ipywidgets.Image(width=camera.width, height=camera.height)
traitlets.dlink((camera, 'value'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

# create widgets
dataset_widget = ipywidgets.Dropdown(options=DATASETS, description='dataset')
category_widget = ipywidgets.Dropdown(options=dataset.categories, description='category')
count_widget = ipywidgets.IntText(description='count')

# manually update counts at initialization
count_widget.value = dataset.get_count(category_widget.value)

# sets the active dataset
def set_dataset(change):
    global dataset
    dataset = datasets[change['new']]
    count_widget.value = dataset.get_count(category_widget.value)
dataset_widget.observe(set_dataset, names='value')

# update counts when we select a new category
def update_counts(change):
    count_widget.value = dataset.get_count(change['new'])
category_widget.observe(update_counts, names='value')

def save_snapshot(_, content, msg):
    if content['event'] == 'click':
        data = content['eventData']
        x = data['offsetX']
        y = data['offsetY']
        
        # save to disk
        dataset.save_entry(category_widget.value, camera.value, x, y)
        
        # display saved snapshot
        snapshot = camera.value.copy()
        snapshot = cv2.circle(snapshot, (x, y), 8, (0, 255, 0), 3)
        snapshot_widget.value = bgr8_to_jpeg(snapshot)
        count_widget.value = dataset.get_count(category_widget.value)
        
camera_widget.on_msg(save_snapshot)

data_collection_widget = ipywidgets.VBox([
    ipywidgets.HBox([camera_widget, snapshot_widget]),
    dataset_widget,
    category_widget,
    count_widget
])

display(data_collection_widget)

import torch
import torchvision

device = torch.device('cuda')
output_dim = 2 * len(dataset.categories)  # x, y coordinate for each category

# Using RESNET 18 for better feature extraction
model = torchvision.models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(512, output_dim)

model = model.to(device)

model_save_button = ipywidgets.Button(description='save model')
model_load_button = ipywidgets.Button(description='load model')
model_path_widget = ipywidgets.Text(description='model path', value='corner_detection2_model.pth')

def load_model(c):
    model.load_state_dict(torch.load(model_path_widget.value))
model_load_button.on_click(load_model)
    
def save_model(c):
    torch.save(model.state_dict(), model_path_widget.value)
model_save_button.on_click(save_model)

model_widget = ipywidgets.VBox([
    model_path_widget,
    ipywidgets.HBox([model_load_button, model_save_button])
])

display(model_widget)

import threading
import time
from utils import preprocess
import torch.nn.functional as F

state_widget = ipywidgets.ToggleButtons(options=['stop', 'live'], description='state', value='stop')
prediction_widget = ipywidgets.Image(format='jpeg', width=camera.width, height=camera.height)

def live(state_widget, model, camera, prediction_widget):
    global dataset
    while state_widget.value == 'live':
        image = camera.value
        preprocessed = preprocess(image)
        output = model(preprocessed).detach().cpu().numpy().flatten()
        
        # Show predictions for both categories
        prediction = image.copy()
        for i, category in enumerate(dataset.categories):
            x = output[2 * i]
            y = output[2 * i + 1]
            
            x = int(camera.width * (x / 2.0 + 0.5))
            y = int(camera.height * (y / 2.0 + 0.5))
            
            color = (255, 0, 0) if category == 'corner' else (0, 0, 255)
            prediction = cv2.circle(prediction, (x, y), 8, color, 3)
            prediction = cv2.putText(prediction, category, (x-30, y-20), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        
        prediction_widget.value = bgr8_to_jpeg(prediction)
            
def start_live(change):
    if change['new'] == 'live':
        execute_thread = threading.Thread(target=live, args=(state_widget, model, camera, prediction_widget))
        execute_thread.start()

state_widget.observe(start_live, names='value')

live_execution_widget = ipywidgets.VBox([
    prediction_widget,
    state_widget
])

display(live_execution_widget)

BATCH_SIZE = 16  # Increased batch size for better generalization

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

epochs_widget = ipywidgets.IntText(description='epochs', value=10)  # More epochs for better learning
eval_button = ipywidgets.Button(description='evaluate')
train_button = ipywidgets.Button(description='train')
loss_widget = ipywidgets.FloatText(description='loss')
progress_widget = ipywidgets.FloatProgress(min=0.0, max=1.0, description='progress')

def train_eval(is_training):
    global BATCH_SIZE, model, dataset, optimizer, eval_button, train_button, loss_widget, progress_widget, state_widget
    
    try:
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=BATCH_SIZE,
            shuffle=True
        )

        state_widget.value = 'stop'
        train_button.disabled = True
        eval_button.disabled = True
        time.sleep(1)

        if is_training:
            model = model.train()
        else:
            model = model.eval()

        while epochs_widget.value > 0:
            i = 0
            sum_loss = 0.0
            for images, category_idx, xy in iter(train_loader):
                # send data to device
                images = images.to(device)
                xy = xy.to(device)

                if is_training:
                    optimizer.zero_grad()

                outputs = model(images)

                # compute MSE loss over x, y coordinates for associated categories
                loss = 0.0
                for batch_idx, cat_idx in enumerate(list(category_idx.flatten())):
                    loss += torch.mean((outputs[batch_idx][2 * cat_idx:2 * cat_idx+2] - xy[batch_idx])**2)
                loss /= len(category_idx)

                if is_training:
                    loss.backward()
                    optimizer.step()

                # increment progress
                count = len(category_idx.flatten())
                i += count
                sum_loss += float(loss)
                progress_widget.value = i / len(dataset)
                loss_widget.value = sum_loss / i
                
            if is_training:
                epochs_widget.value = epochs_widget.value - 1
            else:
                break
    except Exception as e:
        print(f"Error: {e}")
    model = model.eval()

    train_button.disabled = False
    eval_button.disabled = False
    state_widget.value = 'live'
    
train_button.on_click(lambda c: train_eval(is_training=True))
eval_button.on_click(lambda c: train_eval(is_training=False))
    
train_eval_widget = ipywidgets.VBox([
    epochs_widget,
    progress_widget,
    loss_widget,
    ipywidgets.HBox([train_button, eval_button])
])

display(train_eval_widget)

all_widget = ipywidgets.VBox([
    ipywidgets.HBox([data_collection_widget, live_execution_widget]), 
    train_eval_widget,
    model_widget
])

display(all_widget)

In [None]:
# Cell X: Camera Cleanup Cell (run this before re-running camera initialization)
try:
    if 'camera' in globals():
        print("Stopping camera...")
        camera.running = False
        camera.cap.release()
        del camera
        print("Camera successfully stopped")
    else:
        print("No camera instance found to stop")
except Exception as e:
    print(f"Error stopping camera: {e}")
finally:
    # Additional cleanup for good measure
    import gc
    gc.collect()
    print("Cleanup complete - you can now re-run the camera initialization code")

In [None]:
# Cell 1: Import libraries and set up categories
import cv2
import torch
import torchvision
import numpy as np
import time
from torch2trt import TRTModule
from jetracer.nvidia_racecar import NvidiaRacecar
from jetcam.csi_camera import CSICamera
from utils import preprocess

# Define our two categories
CATEGORIES = ['straight', 'corner']
device = torch.device('cuda')

In [None]:
# Cell 2: Model setup - tries to load TensorRT model first, falls back to regular PyTorch
model_path = 'corner_detection2_model.pth'
optimized_path = 'corner_detection2_model_trt.pth'

# Initialize base model
model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2 * len(CATEGORIES))
model = model.to(device).eval().half()

# Try loading TensorRT optimized model
try:
    model_trt = TRTModule()
    model_trt.load_state_dict(torch.load(optimized_path))
    print("Successfully loaded TensorRT optimized model")
except Exception as e:
    print(f"Could not load TensorRT model: {e}")
    print("Loading regular PyTorch model instead")
    model.load_state_dict(torch.load(model_path))
    model_trt = model  # Fall back to regular model
    print("Loaded regular PyTorch model")

In [None]:
# Cell 3: Initialize hardware components
# Set up racecar
car = NvidiaRacecar()

# Set up camera
camera = CSICamera(width=224, height=224, capture_fps=65)

# Initial throttle setting
car.throttle = 0.0

In [None]:
# Cell 4: Configuration parameters
# Steering parameters
STEERING_GAIN = 0.75
STEERING_BIAS = 0.00

# Throttle parameters
FORWARD_THROTTLE = 0.2    # Normal forward speed
TURN_THROTTLE = 0.1       # Reduced speed during turns
TURN_DURATION = 1.0       # How long to turn (seconds)

# Detection thresholds
CORNER_CONFIDENCE_THRESHOLD = 0.7  # How confident we need to be
CORNER_POSITION_THRESHOLD = 0.5    # Where corner appears in frame

In [None]:
# Cell 5: Corner detection function
def is_corner_detected(output):
    """Determine if a corner is detected with sufficient confidence"""
    corner_x = output[2]  # x position for corner
    corner_y = output[3]  # y position for corner
    
    # Calculate confidence as magnitude of the output vector
    corner_confidence = np.sqrt(corner_x**2 + corner_y**2)
    
    # Only trigger if confidence is high and corner is in detection zone
    return (corner_confidence > CORNER_CONFIDENCE_THRESHOLD and 
            abs(corner_x) > CORNER_POSITION_THRESHOLD)

In [None]:
# Cell 6: Main control loop
try:
    while True:
        # Get camera image
        image = camera.read()
        if image is None:
            print("No image from camera")
            time.sleep(0.1)
            continue
            
        try:
            # Preprocess and run inference
            image_preprocessed = preprocess(image).half()
            output = model_trt(image_preprocessed)
            
            # Convert output based on model type
            if hasattr(output, '_trt'):  # TensorRT output
                output = output.detach().cpu().numpy().flatten()
            else:  # Regular PyTorch output
                output = output.detach().cpu().numpy().flatten()
            
            # Decision making
            if is_corner_detected(output):
                print("Corner detected! Turning left...")
                
                # Execute turn
                car.throttle = TURN_THROTTLE
                car.steering = 1.51  # Full left
                
                # Turn for specified duration
                start_time = time.time()
                while time.time() - start_time < TURN_DURATION:
                    time.sleep(0.1)
                
                # Resume straight driving
                car.steering = 0.0
                car.throttle = FORWARD_THROTTLE
            else:
                # Default straight behavior
                car.steering = 0.0
                car.throttle = FORWARD_THROTTLE
                
        except Exception as e:
            print(f"Processing error: {e}")
            car.throttle = 0.0  # Stop on error
            time.sleep(1)

except KeyboardInterrupt:
    print("\nStopping...")
    
finally:
    # Ensure car stops no matter how we exit
    car.throttle = 0.0
    car.steering = 0.0
    print("Car stopped safely")

In [None]:
  car.throttle = 0.0
    car.steering = 0.0
    print("Car stopped safely")

In [None]:
# hard coded

In [1]:
# Cell 1: Setup and Imports
import time
import ipywidgets as widgets
from IPython.display import display
from jetracer.nvidia_racecar import NvidiaRacecar

car = NvidiaRacecar()
car.throttle = 0.0
car.steering = 0.0

WARNNIG: Jetson.GPIO library has not been verified with this carrier board,


In [12]:
# Cell 2: Create Control Panel
# Sliders
turn_interval = widgets.FloatSlider(
    value=5.0, min=1.0, max=30.0, step=0.5,
    description='Turn every (sec):'
)

turn_duration = widgets.FloatSlider(
    value=1.0, min=0.1, max=3.0, step=0.1,
    description='Turn duration (sec):'
)

normal_throttle = widgets.FloatSlider(
    value=0.2, min=0.0, max=0.5, step=0.01,
    description='Normal throttle:'
)

turn_throttle = widgets.FloatSlider(
    value=0.1, min=0.0, max=0.3, step=0.01,
    description='Turn throttle:'
)

turn_sterring = widgets.FloatSlider(
    value=-3, min=-5, max=5, step=0.1,
    description='Turn throttle:'
)

# Buttons
start_button = widgets.Button(
    description='Start Auto-Turn',
    button_style='success',
    icon='play'
)

stop_button = widgets.Button(
    description='Emergency Stop',
    button_style='danger',
    icon='stop'
)

status_output = widgets.Output()

display(widgets.VBox([
    turn_interval,
    turn_duration,
    normal_throttle,
    turn_throttle,
    turn_sterring,
    widgets.HBox([start_button, stop_button]),
    status_output
]))

VBox(children=(FloatSlider(value=5.0, description='Turn every (sec):', max=30.0, min=1.0, step=0.5), FloatSlid…

In [13]:
# Cell 3: Control Logic
import threading

running = False

def auto_turn_loop():
    global running
    running = True
    last_turn_time = time.time()
    
    with status_output:
        print("Started periodic turning behavior")
    
    while running:
        try:
            # Normal straight driving
            car.steering = 0.0
            car.throttle = normal_throttle.value
            
            # Check if time for periodic turn
            current_time = time.time()
            if current_time - last_turn_time >= turn_interval.value:
                with status_output:
                    print(f"{time.ctime()}: Turning left for {turn_duration.value} sec")
                
                last_turn_time = current_time
                car.steering = turn_sterring.value
                car.throttle = turn_throttle.value
                
                # Wait for turn duration
                turn_end = current_time + turn_duration.value
                while time.time() < turn_end and running:
                    time.sleep(0.1)
                
                if not running:
                    break
                    
            time.sleep(0.1)
            
        except Exception as e:
            with status_output:
                print(f"Error: {str(e)}")
            break
    
    car.throttle = 0.0
    car.steering = 0.0
    with status_output:
        print("Stopped")

def start_auto_turn(b):
    global running
    if not running:
        thread = threading.Thread(target=auto_turn_loop)
        thread.start()

def stop_auto_turn(b):
    global running
    running = False
    with status_output:
        print("Stop requested...")

start_button.on_click(start_auto_turn)
stop_button.on_click(stop_auto_turn)

In [None]:
# Cell 4: Instructions for use
print("""
INSTRUCTIONS:
1. Adjust sliders to desired values:
   - How often to turn (seconds)
   - How long turns should last
   - Normal driving throttle
   - Turning throttle

2. Click the Emergency Stop button to START the periodic behavior
   (The button will turn green when active)

3. Click again to STOP the car completely
   (Button will turn red when stopped)

Note: The car will automatically:
- Drive straight at normal throttle
- Turn hard left at specified intervals
- Use reduced throttle during turns
""")