In [16]:
import threading
import time
import cv2
import numpy
import ipywidgets.widgets as widgets
from IPython.display import display, HTML
import sys
import time
import motors
import zed
#define the thread function
def thread_function():
    #Let computer relax for 1ms
    time.sleep(0.001)


#link the thread with the target function
x = threading.Thread(target = thread_function)
x.start()

In [17]:
from zed import Camera 
import traitlets
from traitlets.config.configurable import SingletonConfigurable

import threading

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()
camera.start() # start capturing the data


In [21]:
import cv2
import numpy as np
import time
import ipywidgets.widgets as widgets
from IPython.display import display
from zed import Camera            # Import the ZED camera interface
import traitlets

# --- Utility Function ---
def bgr8_to_jpeg(image):
    """Convert a BGR image (numpy array) to JPEG-encoded bytes."""
    return bytes(cv2.imencode('.jpg', image)[1])

# --- Custom PID Controller ---
class PID:
    def __init__(self, Kp, Ki, Kd, setpoint=0, sample_time=0.01, output_limits=(-100, 100)):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.setpoint = setpoint
        self.sample_time = sample_time
        self.output_limits = output_limits
        self._last_time = time.time()
        self._last_error = 0.0
        self._integral = 0.0

    def update(self, measurement):
        now = time.time()
        dt = now - self._last_time
        if dt < self.sample_time:
            return None  # Not enough time has elapsed for an update.
        error = self.setpoint - measurement
        self._integral += error * dt
        derivative = (error - self._last_error) / dt if dt > 0 else 0.0
        output = (self.Kp * error) + (self.Ki * self._integral) + (self.Kd * derivative)
        lower, upper = self.output_limits
        if lower is not None:
            output = max(lower, output)
        if upper is not None:
            output = min(upper, output)
        self._last_error = error
        self._last_time = now
        return output

# --- Global Objects & Parameters ---
robot = motors.MotorsYukon(mecanum=True)  # Instantiate your motor controller.
pid_controller = PID(Kp=0.1, Ki=0.01, Kd=0.05, setpoint=0, output_limits=(-100, 100))
base_speed = 0.3          # Base forward speed.
error_threshold = 100      # Pixel error threshold for going straight.

# --- Set Up the ZED Camera and Display Widgets ---
camera = Camera.instance()  # Create a singleton camera instance.
camera.start()              # Start capturing images.

# Create ipywidgets for displaying the color and depth images.
display_color = widgets.Image(format='jpeg', width='100%')
display_depth = widgets.Image(format='jpeg', width='100%')
display_mask  = widgets.Image(format='jpeg', width='320px')
layout = widgets.Layout(width='100%')
sidebyside = widgets.HBox([display_color, display_depth, display_mask], layout=layout)
display(sidebyside)

# --- Process Callback Function ---
def process(change):
    """
    This callback is invoked whenever a new color frame is available.
    It processes the frame to detect a yellow rope and computes motor actions.
    """
    frame = change['new']
    if frame is None:
        return

    # Crop the image to focus on the ground (bottom 50% of the frame)
    height, width = frame.shape[:2]
    crop_ratio = 2 # Adjust this ratio to keep more or less of the bottom portion.
    roi = frame[int(height * crop_ratio):, :] 

    
    # For visualization, you can also draw the ROI rectangle on the original frame.
    cv2.rectangle(frame, (0, int(height * crop_ratio)), (width, height), (0, 255, 0), 2)

    # Copy the frame for processing.
    proc_frame = roi.copy()

    # Convert BGR to HSV and create a mask for the yellow rope.
    hsv = cv2.cvtColor(proc_frame, cv2.COLOR_BGR2HSV)
    lower_yellow = np.array([10, 80, 80])
    upper_yellow = np.array([40, 255, 255])
    mask = cv2.inRange(hsv, lower_yellow, upper_yellow)

    # Clean up the mask using morphological operations.
    kernel = np.ones((5, 5), np.uint8)
    mask = cv2.erode(mask, kernel, iterations=1)
    mask = cv2.dilate(mask, kernel, iterations=2)

    # Compute the moments of the mask to locate the rope's centroid.
    M = cv2.moments(mask)
    if M['m00'] > 0:
        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        # Visualize the centroid.
        cv2.circle(proc_frame, (cx, cy), 5, (255, 0, 0), -1)

        # Calculate horizontal error (difference between image center and centroid).
        frame_center = proc_frame.shape[1] // 2
        error = frame_center - cx

        # Update the PID controller.
        control_output = pid_controller.update(error)
        if control_output is not None:
            # If error is small, move forward.
            if abs(error) < error_threshold:
                robot.forward(speed=base_speed)
                print("Moving Forward")
            elif error > 0:
                # Rope is left of center; turn left.
                turn_speed = min(0.6, base_speed + (abs(control_output) / 100.0) * 0.3)
                robot.right(speed=turn_speed)
                print(f"Turning Left | Error: {error}, Turn Speed: {turn_speed}")
            else:
                # Rope is right of center; turn right.
                turn_speed = min(0.6, base_speed + (abs(control_output) / 100.0) * 0.3)
                robot.left(speed=turn_speed)
                print(f"Turning Right | Error: {error}, Turn Speed: {turn_speed}")
    else:
        # If the rope is not detected, stop the robot.
        robot.stop()
        print("Rope not detected. Stopping.")

    # Update the widget displays with resized images.
    display_color.value = bgr8_to_jpeg(cv2.resize(proc_frame, (320, 240)))
    display_depth.value = bgr8_to_jpeg(cv2.resize(camera.depth_value, (320, 240)))
    # Convert the mask (grayscale) to BGR for proper JPEG encoding and display.
    mask_bgr = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
    display_mask.value = bgr8_to_jpeg(cv2.resize(mask_bgr, (320, 240)))

# --- Start Observing the Camera's Color Frame ---
camera.observe(process, names='color_value')

HBox(children=(Image(value=b'', format='jpeg', width='100%'), Image(value=b'', format='jpeg', width='100%'), I…