# JetBot Steering + Speed - JetBot Notebook

This notebook runs on your **JetBot** for data collection and inference.

## Cells
1. **Config** - All parameters (run first, always)
2. **Speed Tuning** - Find MAX_SPEED before blur (run once during setup)
3. **Data Collection** - Collect training data with joystick
4. **Inference** - Run trained model autonomously
5. **DAgger** - Collect corrections while model drives

In [None]:
# =============================================================================
# CELL 1: CONFIGURATION (Run this first!)
# =============================================================================

import torch

# -----------------------------------------------------------------------------
# PATHS
# -----------------------------------------------------------------------------
DATASET_DIR = 'dataset_steering_speed_v1'
DAGGER_DIR = 'dataset_steering_speed_dagger'
MODEL_PATH = 'steering_speed_model_v1.pth'

# -----------------------------------------------------------------------------
# CAMERA
# -----------------------------------------------------------------------------
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

# -----------------------------------------------------------------------------
# PREPROCESSING (same as steering-only project)
# -----------------------------------------------------------------------------
CROP_TOP = 0.20
CROP_BOTTOM = 0.00
CROP_LEFT = 0.08
CROP_RIGHT = 0.12
INPUT_SIZE = (224, 224)

# ImageNet normalization
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# -----------------------------------------------------------------------------
# SPEED CONTROL
# -----------------------------------------------------------------------------
MIN_SPEED = 0.12    # Speed in corners (safe, tested)
MAX_SPEED = 0.25    # Speed on straights (tuned in Cell 2)

# Model outputs speed_factor (0 to 1), we scale it:
# actual_speed = MIN_SPEED + speed_factor * (MAX_SPEED - MIN_SPEED)

# -----------------------------------------------------------------------------
# STEERING CONTROL
# -----------------------------------------------------------------------------
STEERING_GAIN = 0.08

# -----------------------------------------------------------------------------
# TIMING
# -----------------------------------------------------------------------------
LOOP_HZ = 20
LOOP_SLEEP = 1.0 / LOOP_HZ

# -----------------------------------------------------------------------------
# JOYSTICK MAPPING (Logitech controller)
# -----------------------------------------------------------------------------
AXIS_STEERING = 0   # Left stick X
RT_BUTTON = 7       # Right trigger (analog 0-1)
RB_BUTTON = 5       # Right bumper
DEADZONE = 0.05

# -----------------------------------------------------------------------------
# DEVICE
# -----------------------------------------------------------------------------
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# -----------------------------------------------------------------------------
# HELPER FUNCTIONS
# -----------------------------------------------------------------------------
def speed_factor_to_actual(speed_factor):
    """Convert model output (0-1) to actual motor speed."""
    return MIN_SPEED + speed_factor * (MAX_SPEED - MIN_SPEED)

def actual_to_speed_factor(actual_speed):
    """Convert actual motor speed to model target (0-1)."""
    return (actual_speed - MIN_SPEED) / (MAX_SPEED - MIN_SPEED)

print("="*50)
print("STEERING + SPEED CONFIG")
print("="*50)
print(f"Device: {DEVICE}")
print(f"Speed range: {MIN_SPEED} - {MAX_SPEED}")
print(f"Dataset: {DATASET_DIR}")
print(f"DAgger dir: {DAGGER_DIR}")
print(f"Model: {MODEL_PATH}")
print("="*50)

In [None]:
# =============================================================================
# CELL 2: SPEED TUNING (Run once during setup)
# =============================================================================
# Test different speeds on a straight section to find MAX_SPEED before blur.
#
# Instructions:
# 1. Place robot at start of a straight section
# 2. Press START
# 3. Gradually increase speed slider
# 4. Watch camera feed - when blur appears, press MARK BLUR POINT
# 5. Press STOP and update MAX_SPEED in Cell 1

import time
import threading
import cv2
import ipywidgets as widgets
from IPython.display import display
from jetbot import Camera, Robot, bgr8_to_jpeg

# Hardware
camera = Camera.instance(width=CAMERA_WIDTH, height=CAMERA_HEIGHT)
robot = Robot()

# Preprocessing
def preprocess(img):
    h, w = img.shape[:2]
    y0, y1 = int(h * CROP_TOP), int(h * (1 - CROP_BOTTOM))
    x0, x1 = int(w * CROP_LEFT), int(w * (1 - CROP_RIGHT))
    return cv2.resize(img[y0:y1, x0:x1], INPUT_SIZE)

def calculate_blur_metric(img):
    """Calculate Laplacian variance - lower = more blur."""
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.Laplacian(gray, cv2.CV_64F).var()

# Widgets
image_widget = widgets.Image(format='jpeg', width=224, height=224)
speed_slider = widgets.FloatSlider(
    value=MIN_SPEED, min=MIN_SPEED, max=0.35, step=0.01,
    description='Speed:', layout=widgets.Layout(width='400px')
)
info_label = widgets.Label(value=f'Speed: {MIN_SPEED:.2f} | Blur: --')
blur_indicator = widgets.HTML(value='<b style="color:green">Image Quality: Good</b>')
status_label = widgets.Label(value='Ready - Press START')

start_btn = widgets.Button(description='START', button_style='success')
stop_btn = widgets.Button(description='STOP', button_style='danger')
mark_btn = widgets.Button(description='MARK BLUR POINT', button_style='warning')

blur_point = {'speed': None}
running = False

def tuning_loop():
    global running
    blur_values = []
    
    while running:
        raw = camera.value
        processed = preprocess(raw)
        
        # Blur metric
        blur = calculate_blur_metric(processed)
        blur_values.append(blur)
        if len(blur_values) > 10:
            blur_values.pop(0)
        avg_blur = sum(blur_values) / len(blur_values)
        
        # Update blur indicator
        if avg_blur > 100:
            blur_indicator.value = f'<b style="color:green">Image Quality: Good ({avg_blur:.0f})</b>'
        elif avg_blur > 50:
            blur_indicator.value = f'<b style="color:orange">Image Quality: OK ({avg_blur:.0f})</b>'
        else:
            blur_indicator.value = f'<b style="color:red">Image Quality: BLURRY ({avg_blur:.0f})</b>'
        
        # Drive straight
        speed = speed_slider.value
        robot.left_motor.value = speed
        robot.right_motor.value = speed
        
        # UI
        image_widget.value = bgr8_to_jpeg(processed)
        info_label.value = f'Speed: {speed:.2f} | Blur: {avg_blur:.0f}'
        
        time.sleep(0.05)
    
    robot.stop()

def on_start(b):
    global running
    if not running:
        running = True
        status_label.value = 'RUNNING - Increase speed gradually'
        threading.Thread(target=tuning_loop, daemon=True).start()

def on_stop(b):
    global running
    running = False
    robot.stop()
    status_label.value = 'STOPPED'
    if blur_point['speed']:
        print(f"\nBlur detected at speed: {blur_point['speed']:.2f}")
        print(f"Recommended MAX_SPEED: {blur_point['speed'] - 0.02:.2f}")

def on_mark(b):
    blur_point['speed'] = speed_slider.value
    status_label.value = f'MARKED blur at speed {blur_point["speed"]:.2f}'

start_btn.on_click(on_start)
stop_btn.on_click(on_stop)
mark_btn.on_click(on_mark)

display(widgets.VBox([image_widget, speed_slider, blur_indicator, info_label, status_label]))
display(widgets.HBox([start_btn, mark_btn, stop_btn]))

In [None]:
# Cleanup after speed tuning
running = False
robot.stop()
camera.stop()
print("Stopped.")

In [None]:
# =============================================================================
# CELL 3: DATA COLLECTION (Steering + Speed)
# =============================================================================
# Hold RB to drive and record. Left stick = steering, Right trigger = speed.

import os
import time
import threading
import cv2
import numpy as np
import ipywidgets as widgets
from IPython.display import display
from jetbot import Camera, Robot, bgr8_to_jpeg

# Create directory
os.makedirs(DATASET_DIR, exist_ok=True)
existing = len([f for f in os.listdir(DATASET_DIR) if f.endswith('.jpg')])
print(f"Dataset: {DATASET_DIR} ({existing} existing images)")

# Hardware
camera = Camera.instance(width=CAMERA_WIDTH, height=CAMERA_HEIGHT)
robot = Robot()

# Preprocessing
def preprocess(img):
    h, w = img.shape[:2]
    y0, y1 = int(h * CROP_TOP), int(h * (1 - CROP_BOTTOM))
    x0, x1 = int(w * CROP_LEFT), int(w * (1 - CROP_RIGHT))
    return cv2.resize(img[y0:y1, x0:x1], INPUT_SIZE)

# Widgets
image_widget = widgets.Image(format='jpeg', width=224, height=224)
steering_slider = widgets.FloatSlider(value=0, min=-1, max=1, description='Steering:', disabled=True)
speed_slider = widgets.FloatSlider(value=0, min=0, max=1, description='Speed:', disabled=True)
count_widget = widgets.IntText(value=existing, description='Images:', disabled=True)
status_label = widgets.Label(value='Ready - Press START')
start_btn = widgets.Button(description='START', button_style='success')
stop_btn = widgets.Button(description='STOP', button_style='danger')
controller = widgets.Controller()

running = False
image_count = existing

def collection_loop():
    global running, image_count
    
    while running:
        t0 = time.time()
        
        raw = camera.value
        processed = preprocess(raw)
        
        # Read controller
        try:
            steering = controller.axes[AXIS_STEERING].value
            speed_raw = controller.buttons[RT_BUTTON].value  # RT is 0-1
            rb = controller.buttons[RB_BUTTON].value > 0.5
        except:
            steering, speed_raw, rb = 0.0, 0.0, False
        
        # Apply deadzone
        if abs(steering) < DEADZONE:
            steering = 0.0
        
        # Speed factor is directly from RT (0-1)
        speed_factor = max(0, min(1, speed_raw))
        actual_speed = speed_factor_to_actual(speed_factor)
        
        if rb:
            # Drive
            left = actual_speed + steering * STEERING_GAIN
            right = actual_speed - steering * STEERING_GAIN
            robot.left_motor.value = max(-1, min(1, left))
            robot.right_motor.value = max(-1, min(1, right))
            
            # Save image with steering AND speed_factor
            filename = f"{int(time.time()*1000)}_{steering:.3f}_{speed_factor:.3f}.jpg"
            cv2.imwrite(os.path.join(DATASET_DIR, filename), processed)
            image_count += 1
            status_label.value = f'RECORDING ({image_count}) | Steer: {steering:.2f} | Speed: {speed_factor:.2f}'
        else:
            robot.stop()
            status_label.value = f'PAUSED ({image_count}) - Hold RB to record'
        
        # UI
        image_widget.value = bgr8_to_jpeg(processed)
        steering_slider.value = steering
        speed_slider.value = speed_factor
        count_widget.value = image_count
        
        time.sleep(max(0, LOOP_SLEEP - (time.time() - t0)))
    
    robot.stop()
    status_label.value = f'STOPPED ({image_count} images)'

def on_start(b):
    global running
    if not running:
        running = True
        threading.Thread(target=collection_loop, daemon=True).start()

def on_stop(b):
    global running
    running = False
    robot.stop()

start_btn.on_click(on_start)
stop_btn.on_click(on_stop)

print("Connect controller. Hold RB to drive and record.")
print("Left stick = steering, Right trigger = speed")
display(controller)
display(widgets.VBox([image_widget, steering_slider, speed_slider, count_widget, status_label]))
display(widgets.HBox([start_btn, stop_btn]))

In [None]:
# Cleanup after data collection
running = False
robot.stop()
camera.stop()
print(f"Done. {image_count} images in {DATASET_DIR}")

In [None]:
# =============================================================================
# CELL 4: INFERENCE (Autonomous Driving with Speed Control)
# =============================================================================
# Model predicts both steering and speed.
# Speed factor (0-1) is converted to actual speed (MIN_SPEED to MAX_SPEED)

import os
import time
import threading
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import ipywidgets as widgets
from IPython.display import display
from jetbot import Camera, Robot, bgr8_to_jpeg

# Model
def get_model():
    model = models.resnet18(pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, 2)  # 2 outputs: steering, speed
    return model

# Preprocessing
def preprocess_for_inference(img):
    h, w = img.shape[:2]
    y0, y1 = int(h * CROP_TOP), int(h * (1 - CROP_BOTTOM))
    x0, x1 = int(w * CROP_LEFT), int(w * (1 - CROP_RIGHT))
    cropped = cv2.resize(img[y0:y1, x0:x1], INPUT_SIZE)
    
    rgb = cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    for i in range(3):
        rgb[:,:,i] = (rgb[:,:,i] - IMAGENET_MEAN[i]) / IMAGENET_STD[i]
    
    tensor = torch.from_numpy(rgb.transpose(2,0,1)).unsqueeze(0)
    return tensor, cropped

# Load model
print(f"Loading {MODEL_PATH}...")
model = get_model()
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model.to(DEVICE).eval()
print("Model loaded.")

# Hardware
camera = Camera.instance(width=CAMERA_WIDTH, height=CAMERA_HEIGHT)
robot = Robot()

# Widgets
image_widget = widgets.Image(format='jpeg', width=224, height=224)
steering_slider = widgets.FloatSlider(
    value=0, min=-1, max=1, description='Steering:', disabled=True
)
speed_slider = widgets.FloatSlider(
    value=0, min=0, max=1, description='Speed:', disabled=True
)
actual_speed_label = widgets.Label(value=f'Actual: {MIN_SPEED:.2f}')
fps_widget = widgets.FloatText(value=0, description='FPS:', disabled=True)
status_label = widgets.Label(value='Ready')

start_btn = widgets.Button(description='START', button_style='success')
stop_btn = widgets.Button(description='STOP', button_style='danger')

running = False

def inference_loop():
    global running
    frame_times = []
    
    while running:
        t0 = time.time()
        
        # Get prediction
        raw = camera.value
        tensor, display_img = preprocess_for_inference(raw)
        
        with torch.no_grad():
            outputs = model(tensor.to(DEVICE))
            steering = outputs[0, 0].item()
            speed_factor = outputs[0, 1].item()
        
        # Clamp values
        steering = max(-1, min(1, steering))
        speed_factor = max(0, min(1, speed_factor))
        
        # Convert speed factor to actual speed
        actual_speed = speed_factor_to_actual(speed_factor)
        
        # Drive
        left = actual_speed + steering * STEERING_GAIN
        right = actual_speed - steering * STEERING_GAIN
        robot.left_motor.value = max(-1, min(1, left))
        robot.right_motor.value = max(-1, min(1, right))
        
        # FPS
        frame_times.append(time.time() - t0)
        if len(frame_times) > 30:
            frame_times.pop(0)
        fps = 1.0 / (sum(frame_times) / len(frame_times))
        
        # UI
        image_widget.value = bgr8_to_jpeg(display_img)
        steering_slider.value = steering
        speed_slider.value = speed_factor
        actual_speed_label.value = f'Actual: {actual_speed:.2f}'
        fps_widget.value = round(fps, 1)
        status_label.value = f'RUNNING | Steer: {steering:.2f} | Speed: {actual_speed:.2f}'
        
        # Loop timing
        elapsed = time.time() - t0
        if LOOP_SLEEP - elapsed > 0:
            time.sleep(LOOP_SLEEP - elapsed)
    
    robot.stop()
    status_label.value = 'STOPPED'

def on_start(b):
    global running
    if not running:
        running = True
        threading.Thread(target=inference_loop, daemon=True).start()

def on_stop(b):
    global running
    running = False
    robot.stop()

start_btn.on_click(on_start)
stop_btn.on_click(on_stop)

print(f"Speed range: {MIN_SPEED} - {MAX_SPEED}")
display(widgets.VBox([image_widget, steering_slider, speed_slider, actual_speed_label, fps_widget, status_label]))
display(widgets.HBox([start_btn, stop_btn]))

In [None]:
# Cleanup after inference
running = False
robot.stop()
camera.stop()
print("Stopped.")

In [None]:
# =============================================================================
# CELL 5: DAgger (Dataset Aggregation) - Steering + Speed
# =============================================================================
# Model drives autonomously. Hold RB to take over and save corrections.
# When correcting: Left stick = steering, Right trigger = speed.
#
# The model predicts both steering and speed, but when you intervene,
# YOUR steering and speed inputs are saved as corrections.

import os
import time
import threading
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import ipywidgets as widgets
from IPython.display import display
from jetbot import Camera, Robot, bgr8_to_jpeg

# Create DAgger directory
os.makedirs(DAGGER_DIR, exist_ok=True)
existing = len([f for f in os.listdir(DAGGER_DIR) if f.endswith('.jpg')])
print(f"DAgger dir: {DAGGER_DIR} ({existing} existing corrections)")

# Model (2 outputs: steering, speed)
def get_model():
    model = models.resnet18(pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, 2)
    return model

# Preprocessing
def preprocess_for_save(img):
    h, w = img.shape[:2]
    y0, y1 = int(h * CROP_TOP), int(h * (1 - CROP_BOTTOM))
    x0, x1 = int(w * CROP_LEFT), int(w * (1 - CROP_RIGHT))
    return cv2.resize(img[y0:y1, x0:x1], INPUT_SIZE)

def preprocess_for_inference(img):
    cropped = preprocess_for_save(img)
    rgb = cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    for i in range(3):
        rgb[:,:,i] = (rgb[:,:,i] - IMAGENET_MEAN[i]) / IMAGENET_STD[i]
    tensor = torch.from_numpy(rgb.transpose(2,0,1)).unsqueeze(0)
    return tensor, cropped

# Load model
print(f"Loading {MODEL_PATH}...")
model = get_model()
model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model.to(DEVICE).eval()
print("Model loaded.")

# Hardware
camera = Camera.instance(width=CAMERA_WIDTH, height=CAMERA_HEIGHT)
robot = Robot()

# Widgets
image_widget = widgets.Image(format='jpeg', width=224, height=224)
model_steer_slider = widgets.FloatSlider(value=0, min=-1, max=1, description='Model Steer:', disabled=True)
model_speed_slider = widgets.FloatSlider(value=0, min=0, max=1, description='Model Speed:', disabled=True)
human_steer_slider = widgets.FloatSlider(value=0, min=-1, max=1, description='Human Steer:', disabled=True)
human_speed_slider = widgets.FloatSlider(value=0, min=0, max=1, description='Human Speed:', disabled=True)
count_widget = widgets.IntText(value=existing, description='Corrections:', disabled=True)
status_label = widgets.Label(value='Ready')
start_btn = widgets.Button(description='START', button_style='success')
stop_btn = widgets.Button(description='STOP', button_style='danger')
controller = widgets.Controller()

running = False
correction_count = existing

def dagger_loop():
    global running, correction_count
    
    while running:
        t0 = time.time()
        
        raw = camera.value
        tensor, display_img = preprocess_for_inference(raw)
        
        # Model prediction
        with torch.no_grad():
            outputs = model(tensor.to(DEVICE))
            model_steer = outputs[0, 0].item()
            model_speed_factor = outputs[0, 1].item()
        
        model_steer = max(-1, min(1, model_steer))
        model_speed_factor = max(0, min(1, model_speed_factor))
        
        # Human input
        try:
            human_steer = controller.axes[AXIS_STEERING].value
            human_speed_raw = controller.buttons[RT_BUTTON].value  # RT is 0-1
            rb = controller.buttons[RB_BUTTON].value > 0.5
        except:
            human_steer, human_speed_raw, rb = 0.0, 0.0, False
        
        # Apply deadzone to steering
        if abs(human_steer) < DEADZONE:
            human_steer = 0.0
        
        human_speed_factor = max(0, min(1, human_speed_raw))
        
        # Who controls?
        if rb:
            # Human takes over
            active_steer = human_steer
            active_speed_factor = human_speed_factor
            
            # Save correction with BOTH steering and speed
            processed = preprocess_for_save(raw)
            filename = f"{int(time.time()*1000)}_{human_steer:.3f}_{human_speed_factor:.3f}.jpg"
            cv2.imwrite(os.path.join(DAGGER_DIR, filename), processed)
            correction_count += 1
            status_label.value = f'CORRECTING - RB held ({correction_count})'
        else:
            # Model drives
            active_steer = model_steer
            active_speed_factor = model_speed_factor
            status_label.value = f'MODEL DRIVING ({correction_count} corrections)'
        
        # Convert speed factor to actual speed and drive
        actual_speed = speed_factor_to_actual(active_speed_factor)
        left = actual_speed + active_steer * STEERING_GAIN
        right = actual_speed - active_steer * STEERING_GAIN
        robot.left_motor.value = max(-1, min(1, left))
        robot.right_motor.value = max(-1, min(1, right))
        
        # UI
        image_widget.value = bgr8_to_jpeg(display_img)
        model_steer_slider.value = model_steer
        model_speed_slider.value = model_speed_factor
        human_steer_slider.value = human_steer
        human_speed_slider.value = human_speed_factor
        count_widget.value = correction_count
        
        time.sleep(max(0, LOOP_SLEEP - (time.time() - t0)))
    
    robot.stop()
    status_label.value = f'STOPPED ({correction_count} corrections)'

def on_start(b):
    global running
    if not running:
        running = True
        threading.Thread(target=dagger_loop, daemon=True).start()

def on_stop(b):
    global running
    running = False
    robot.stop()

start_btn.on_click(on_start)
stop_btn.on_click(on_stop)

print("Connect controller. Model drives, hold RB to correct.")
print("When correcting: Left stick = steering, Right trigger = speed")
display(controller)
display(widgets.VBox([
    image_widget,
    widgets.HTML('<b>Model:</b>'),
    model_steer_slider, model_speed_slider,
    widgets.HTML('<b>Human:</b>'),
    human_steer_slider, human_speed_slider,
    count_widget, status_label
]))
display(widgets.HBox([start_btn, stop_btn]))

In [None]:
# Cleanup after DAgger
running = False
robot.stop()
camera.stop()
print(f"Done. {correction_count} corrections in {DAGGER_DIR}")
print("Copy DAgger folder to PC, combine with original data, and retrain.")