In [None]:
# TODO: Change the values of these.
# Define range of red color in HSV space
min_red1 = (0, 0, 0)
max_red1 = (180, 255, 255)
min_red2 = (0, 0, 0)
max_red2 = (180, 255, 255)

# Minimum area we look at
# Optional: Change this as needed
min_area = 800

# TODO: Put in your HSV values you found in Module 6
lower_yellow = (0, 0, 0)
upper_yellow = (180, 255, 255)

bias = 0

# TODO: Kp and Kd values here
Kp = 0
Kd = 0

In [None]:
from jetbot import Robot, Camera, bgr8_to_jpeg
import cv2
import numpy as np
from IPython.display import display, Image, clear_output
import ipywidgets as widgets
import time
import threading
from scipy.signal import medfilt
from collections import deque

robot = Robot()
# Initialize our camera
camera = Camera.instance()

# Save image data (modified to be numpy array)
image = np.array(camera.value)

jpeg_image = bgr8_to_jpeg(image)

# Display image within Jupyter Notebook
display(Image(data=jpeg_image))

In [None]:
# Function of Image Processing
def stopsign_detection(frame, overlay):
    global robot_stopsign_detected
    global recent_detection
    global buffer_time
    clear_output(wait = True)
    orig_image = np.array(overlay)
    image = frame

    # Convert to HSV (Hue, Saturation, Value) color space
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Create a mask for red color
    mask1 = cv2.inRange(hsv, min_red1, max_red1)
    mask2 = cv2.inRange(hsv, min_red2, max_red2)
    mask = mask1 | mask2

    # Apply the mask to the image
    red_traffic_signs = cv2.bitwise_and(image, image, mask=mask)

    # Dilate image to get rid of white stop text
    
    dilated_image = cv2.dilate(red_traffic_signs, np.ones((3, 3), np.uint8), iterations = 3)
    eroded_image = cv2.erode(dilated_image, np.ones((5, 5), np.uint8))

    # Convert to grayscale
    gray = cv2.cvtColor(eroded_image, cv2.COLOR_BGR2GRAY)

    # Apply Gaussian blur
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Detect edges using Canny edge detector
    edges = cv2.Canny(blurred, 100, 200)

    # Find contours
    contours, hierarchy = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Loop through contours
    for contour in contours:
        contour_area = cv2.contourArea(contour)
        if contour_area > min_area:
            perimeter = cv2.arcLength(contour, True)
            circularity = 4 * np.pi * (contour_area / (perimeter * perimeter))
            # Approximate the contour to a polygon
            epsilon = 0.01 * perimeter
            approx = cv2.approxPolyDP(contour, epsilon, True)
            
            vertices_label.value = f"Number of Vertices: {len(approx)}"
            circularity_label.value = f"Circularity: {circularity:.2f}"

            # If the shape has 8 vertices, it could be a stop sign (octagon)
            if len(approx) >= 5 and len(approx) <= 13 and circularity > 0.7 and circularity < 1.3:
                cv2.drawContours(orig_image, [approx], -1, (0, 255, 0), 2)
                cv2.putText(orig_image, 'Stop Sign', tuple(approx[0][0]), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
                cv2.putText(orig_image, str(contour_area), (approx[0][0][0], approx[0][0][1] + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
                if contour_area > 1000 and not robot_stopsign_detected and (time.time() - buffer_time) > 2:
                    recent_detection = time.time()
                    robot_stopsign_detected = True
    
    return orig_image, edges

# Brighten Gamma of Image
def adjust_gamma(image, gamma=1.2):
    invGamma = 1.0 / gamma
    table = np.array([(i / 255.0) ** invGamma * 255 for i in np.arange(256)]).astype("uint8")
    return cv2.LUT(image, table)

# Filter based on inter-quartile range (remove outliers)
def iqr_filter(x_list):
    x = np.array(x_list)
    q1, q3 = np.percentile(x, [25, 75])
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    return [val for val in x if lower <= val <= upper]

# Calculate midpoints based on masked image
def lane_midpoint(masked_image):
    midline_points = []
    left_points = []
    right_points = []
    h, w = masked_image.shape[:2]

    for y in range(0, h, 4):
        x_candidates = [x for x in range(w) if masked_image[y, x] > 0]
        if len(x_candidates) >= 3:
            x_candidates = iqr_filter(x_candidates)
            left_x = min(x_candidates)
            right_x = max(x_candidates)
            center_x = int(np.mean(x_candidates))

            midline_points.append((center_x, y))
            left_points.append((left_x, y))
            right_points.append((right_x, y))

    if midline_points:
        x_vals = [x for x, _ in midline_points]
        x_smooth = medfilt(x_vals, kernel_size=5)
        midline_points = list(zip(x_smooth, [y for _, y in midline_points]))
    
    return midline_points, left_points, right_points

# Isolate Yellow Lane Marking
def get_yellow_lane(image, apply_gamma=True):

    # Brighten Images
    if apply_gamma:
        image = adjust_gamma(image, gamma=0.6)

    # Blur and Convert to HSV Color Space
    blur = cv2.bilateralFilter(image, 9, 75, 75)
    hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)

    # HSV color masking
    mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)

    # Morphology to clean specks (Dilation and Erosion)
    kernel = np.ones((5, 5), np.uint8)
    opened = cv2.morphologyEx(mask_yellow, cv2.MORPH_OPEN, kernel, iterations=1)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel, iterations=1)

    # Remove small blobs
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    clean_mask = np.zeros_like(closed)
    for cnt in contours:
        if cv2.contourArea(cnt) > 150:
            cv2.drawContours(clean_mask, [cnt], -1, 255, thickness=cv2.FILLED)
    contour_mask = clean_mask

    # Edge detection
    yellow_mask_blur = cv2.GaussianBlur(contour_mask, (5, 5), 0)
    sobelx = cv2.Sobel(yellow_mask_blur, cv2.CV_64F, 1, 0, ksize=3)
    sobelx = np.absolute(sobelx)
    if np.max(sobelx) > 0:
        sobelx = np.uint8(255 * sobelx / np.max(sobelx))
    else:
        sobelx = np.zeros_like(sobelx, dtype=np.uint8)
    _, edge_mask = cv2.threshold(sobelx, 50, 255, cv2.THRESH_BINARY)

    return mask_yellow, edge_mask

# Calculate error of slope from desired vertical line
def compute_slope_error(image, mid_pts, num_points=5):
    h, w = image.shape[:2]
    bottom_center = (w // 2, h - 1)
    if len(mid_pts) < num_points * 2:
        return None  # Not enough points

    # Takes sample of various points near top
    top_pts = mid_pts[:num_points]

    x_top_avg = np.median([pt[0] for pt in top_pts])
    y_top_avg = np.median([pt[1] for pt in top_pts])

    if bottom_center[1] == y_top_avg:
        return None  # Prevent division by zero

    # Calculate slope of line
    slope = (bottom_center[0] - x_top_avg) / (bottom_center[1] - y_top_avg)
    slope_error = 1/slope
    dx = bottom_center[0] - x_top_avg
    dy = bottom_center[1] - y_top_avg

    # Calculates angle of slope
    angle_rad = np.arctan2(dy, dx)
    angle_deg = np.degrees(angle_rad)

    # Calculating error from vertical
    slope_error_angle = angle_deg - 90

    out_img = image.copy()
    # Overlay line and text
    cv2.line(out_img, (int(x_top_avg), int(y_top_avg)), (int(bottom_center[0]), int(bottom_center[1])), (0, 255, 255), 2)
    cv2.putText(out_img, f"Angle: {slope_error_angle:.2f} deg", (30, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
    return out_img, slope_error_angle

# Lane Control
def lane_control(frame, overlay):
    try:
        # Find lane markings
        yellow_mask, edge_mask = get_yellow_lane(frame)
        # Calculate midpoint trajectory
        midline_points, left_points, right_points = lane_midpoint(edge_mask)
        # Create overlay
        for (x, y) in midline_points:
            cv2.circle(overlay, (int(x), int(y)), 2, (0, 255, 0), -1)
        for (x, y) in left_points:
            cv2.circle(overlay, (int(x), int(y)), 2, (255, 0, 0), -1)
        for (x, y) in right_points:
            cv2.circle(overlay, (int(x), int(y)), 2, (0, 0, 255), -1)
        # Calculate slope error
        result = compute_slope_error(overlay, midline_points, 1)

        if result is not None:
            out_img, slope_error_angle = result

            # Filter slopes
            filtered_slope = filter.update(slope_error_angle)
            slope_label.value = f"Slope Error: {filtered_slope:.2f}°"

            # Calculate control output
            control = pid.compute(filtered_slope)
            # Clip to max turn value
            control = max(min(control, max_turn), -max_turn)
            control_label.value = f"Control Output: {control:.4f}"
            left_speed = base_speed + bias
            right_speed = base_speed

            right_turn = right_speed - control   # turning right
            left_turn = left_speed + control     # turning left
            
            # Only changes one motor speed
            left_speed = max(left_speed, left_turn)
            right_speed = max(right_speed, right_turn)
            return left_speed, right_speed, out_img
        else:
            slope_label.value = "Slope Error: ---"
            control_label.value = "Control Output: ---"
            return 0, 0, frame
        
        

    except Exception as e:
        slope_label.value = f"Error: {str(e)}"
        control_label.value = "Control Output: ---"
        robot.stop()
        return None, None, frame

# Filters out outlier slopes
class SlopeFilter:
    def __init__(self, max_len=3, max_angle=40):
        self.buffer = deque(maxlen=max_len)
        self.max_angle = max_angle

    def update(self, new_val):
        if new_val is None:
            return self.get_median()

        if abs(new_val) <= self.max_angle:
            self.buffer.append(new_val)
        
        return self.get_median()

    def get_median(self):
        if not self.buffer:
            return 0.0  # default if empty
        return float(np.median(self.buffer))

# PD Controller Class
class PIDController:
    def __init__(self, Kp=0.4, Kd=0.2):
        self.Kp = Kp
        self.Kd = Kd
        self.last_error = 0.0
        self.last_time = time.time()

    def compute(self, error):
        current_time = time.time()
        dt = current_time - self.last_time
        de = (error - self.last_error) / dt if dt > 0 else 0.0

        output = self.Kp * error + self.Kd * de

        self.last_error = error
        self.last_time = current_time
        return output

In [None]:
# Widgets
overlay_widget = widgets.Image(format='jpeg', width=300, height=300)
edge_mask_widget = widgets.Image(format='jpeg', width=300, height=300)
slope_label = widgets.Label(value="Slope Error: ---")
control_label = widgets.Label(value="Control Output: ---")
vertices_label = widgets.Label(value = "Number of Vertices:")
circularity_label = widgets.Label(value = "Circularity:")
enabled_label = widgets.Label(value = "Enabled:")
detected_stop_label = widgets.Label(value = "Stop Sign:")
recent_detection_label = widgets.Label(value = "Last Detected:")
buffer_time_label = widgets.Label(value = "Buffer:")

# Start and stop buttons to control robot
go_button = widgets.Button(description="Go", button_style='success')
stop_button = widgets.Button(description="Stop", button_style='danger')

robot_enabled = False  # flag to control robot movement
robot_stopsign_detected = False
recent_detection = 0
buffer_time = 0

# Button Callbacks
def on_go_clicked(b):
    global robot_enabled
    robot_enabled = True

def on_stop_clicked(b):
    global robot_enabled
    robot_enabled = False
    robot.stop()

go_button.on_click(on_go_clicked)
stop_button.on_click(on_stop_clicked)

# Displaying the widgets
display(widgets.VBox([
    widgets.HBox([overlay_widget, edge_mask_widget]),
    widgets.HBox([go_button, stop_button]),
    slope_label, control_label,
    vertices_label, circularity_label,
    enabled_label, detected_stop_label,
    recent_detection_label, buffer_time_label
]))

# Class objects
filter = SlopeFilter(3, 40)
pid = PIDController(Kp=Kp, Kd=Kd)

# Driving Speeds for safe handling
base_speed = 0.12  # forward speed
max_turn = 0.08   # max allowed turn difference

stop_event = threading.Event()

go_button.on_click(on_go_clicked)
stop_button.on_click(on_stop_clicked)

def update_loop():
    global robot_enabled
    global robot_stopsign_detected
    global recent_detection
    global buffer_time
    while not stop_event.is_set():
        frame = camera.value
        overlay, edges = stopsign_detection(frame, frame)
        left_speed, right_speed, overlay = lane_control(frame, overlay)
        if robot_enabled:
            if robot_stopsign_detected:
                if (time.time() - recent_detection) > 3:
                    buffer_time = time.time()
                    robot_stopsign_detected = False
                else:
                    robot.stop()
            else:
                robot.set_motors(left_speed, right_speed)
        else:
            robot.stop()        
        overlay_widget.value = bgr8_to_jpeg(overlay)
        edge_mask_widget.value = bgr8_to_jpeg(edges)
        enabled_label.value = f"Enabled: {robot_enabled}"
        detected_stop_label.value = f"Stop Sign: {robot_stopsign_detected}"
        recent_detection_label.value = f"Last Detected: {time.time() - recent_detection:.2f}s ago"
        buffer_time_label.value = f"Buffer: {time.time() - buffer_time:.2f}s"
        

camera.start()
thread = threading.Thread(target=update_loop, daemon=True)
thread.start()

In [None]:
stop_event.set()
thread.join()
stop_event.clear()
camera.stop()
robot.stop()