In [38]:
import traitlets
import cv2
import numpy as np
import pyzed.sl as sl
import math
import numpy as np
import sys
import math
import threading
import motors
import ipywidgets.widgets as widgets
from IPython.display import display
from traitlets.config.configurable import HasTraits

In [36]:
LIGHT_YELLOW = np.array([20,100, 100])
DARK_YELLOW = np.array([30,255,255])


class Camera(HasTraits):
    color_value = traitlets.Any() # monitor the color_value variable
    cx, cy = traitlets.Any(), traitlets.Any()
    def __init__(self):
        super(Camera, self).__init__()

        self.zed = sl.Camera()
        # Create a InitParameters object and set configuration parameters
        init_params = sl.InitParameters()
        init_params.camera_resolution = sl.RESOLUTION.HD1080 #VGA(672*376), HD720(1280*720), HD1080 (1920*1080) or ...
        init_params.depth_mode = sl.DEPTH_MODE.ULTRA  # Use ULTRA depth mode
        init_params.coordinate_units = sl.UNIT.MILLIMETER  # Use meter units (for depth measurements)

        # Open the camera
        status = self.zed.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS: #Ensure the camera has opened succesfully
            print("Camera Open : "+repr(status)+". Exit program.")
            self.zed.close()
            exit(1)

         # Create and set RuntimeParameters after opening the camera
        self.runtime = sl.RuntimeParameters()

        #flag to control the thread
        self.thread_runnning_flag = False

        # Get the height and width
        camera_info = self.zed.get_camera_information()
        self.width = camera_info.camera_configuration.resolution.width
        self.height = camera_info.camera_configuration.resolution.height
        self.image = sl.Mat(self.width,self.height,sl.MAT_TYPE.U8_C4, sl.MEM.CPU)
        self.depth = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C1, sl.MEM.CPU)
        self.point_cloud = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C4, sl.MEM.CPU) 

    def _capture_frames(self): #For data capturing only

        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
           
            if self.zed.grab(self.runtime) == sl.ERROR_CODE.SUCCESS:
                
                # Retrieve Left image
                self.zed.retrieve_image(self.image, sl.VIEW.LEFT)
                # Retrieve depth map. Depth is aligned on the left image
                self.zed.retrieve_measure(self.depth, sl.MEASURE.DEPTH)

                frame = self.image.get_data()
                frame[:720,:] = 0
                scale = 0.2
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
                

                
                hsv = cv2.cvtColor(frame,cv2.COLOR_BGR2HSV)
                mask = cv2.inRange(hsv, LIGHT_YELLOW, DARK_YELLOW)
                kernel = np.ones((5,5), np.uint8)
                mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
                contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                
                if contours:
                    c = max(contours, key=cv2.contourArea)
                    M = cv2.moments(c)
                    if M["m00"] > 0:
                        self.cx = int(M["m10"] / M["m00"])
                        self.cy = int(M["m01"] / M["m00"])

                        cv2.circle(frame, (self.cx, self.cy), 5, (0, 0, 255), -1)

                        frame_center = frame.shape[1] // 2
                        error = self.cx - frame_center
                        correction = 0.1 * error
                        #print(f"error:  {error} correction: {correction}")

                #cv2.imshow("Original", frame)
                #cv2.imshow("Mask", mask)
                self.color_value = self.image.get_data()
                #When displaying the images, the robot sends data to the computers. If the images are in very high resolution, it may cause network issues. Since the images are for display purposes only, please use a lower resolution if possible. Thank you.
                scale = 0.1
                resized_image = cv2.resize(self.color_value, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
                cv2.circle(resized_image, (int(self.width*scale//2),int(self.height*scale//2)), 1, (0, 255, 0))
                display_color.value = bgr8_to_jpeg(frame)
                
                # self.depth_image = np.asanyarray(self.depth.get_data())

                # depth_image_test = self.depth_image.copy()  #copy the depth image
                # depth_image_test = np.nan_to_num(depth_image_test, nan=0.0).astype(np.float32)  # Change NaN value to 0
                # depth_image_test[282:,:]=0
                # depth_image_test[:,:168]=0
                # depth_image_test[:,504:]=0

                # depth_image_test[depth_image_test<100]=0
                # depth_image_test[depth_image_test>1000]=0
                # depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image_test, alpha=0.03), cv2.COLORMAP_JET)
                #if(depth_image_test.max() == 0):
                #    robot.left(1) #turn left at full speed
                #    cv2.putText(depth_colormap, 'warning!!!', (336,188), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                #else:
                #    distance = depth_image_test[depth_image_test!=0].min()
                #    
                #    if(distance < 400):
                #        robot.left(1) #turn left at full speed
                #        cv2.putText(depth_colormap, 'warning!!!', (336,188), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                #    else:
                #        robot.forward(0.5) #move forward at half speed
                    
                # resized_depth_colormap=cv2.resize(depth_colormap, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
                display_depth.value = bgr8_to_jpeg(mask)
                
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread
            self.zed.close()

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

camera = Camera()
camera.start()

[2025-03-05 10:03:33 UTC][ZED][INFO] Logging level INFO
[2025-03-05 10:03:33 UTC][ZED][INFO] Logging level INFO
[2025-03-05 10:03:33 UTC][ZED][INFO] Logging level INFO
[2025-03-05 10:03:34 UTC][ZED][INFO] [Init]  Depth mode: ULTRA
[2025-03-05 10:03:35 UTC][ZED][INFO] [Init]  Camera successfully opened.
[2025-03-05 10:03:35 UTC][ZED][INFO] [Init]  Camera FW version: 1523
[2025-03-05 10:03:35 UTC][ZED][INFO] [Init]  Video mode: HD1080@30
[2025-03-05 10:03:35 UTC][ZED][INFO] [Init]  Serial Number: S/N 35129214
242

Exception in thread Exception in threading.excepthook:
Exception ignored in thread started by: <bound method Thread._bootstrap of <Thread(Thread-17 (_capture_frames), stopped 281471730118944)>>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.10/threading.py", line 1018, in _bootstrap_inner
    self._invoke_excepthook(self)
  File "/usr/lib/python3.10/threading.py", line 1336, in invoke_excepthook
    local_print("Exception in threading.excepthook:",
  File "/usr/local/lib/python3.10/dist-packages/ipykernel/iostream.py", line 604, in flush
    self.pub_thread.schedule(self._flush)
  File "/usr/local/lib/python3.10/dist-packages/ipykernel/iostream.py", line 267, in schedule
    self._event_pipe.send(b"")
  File "/usr/local/lib/python3.10/dist-packages/zmq/sugar/socket.py", line 707, in send
    return super().send(data, flags=flags, copy=copy, track=track)
  File "_zmq.py", line 109

242

In [4]:
display_color = widgets.Image(format='jpeg', width='45%') 
display_depth = widgets.Image(format='jpeg', width='45%')  
layout=widgets.Layout(width='100%')

sidebyside = widgets.HBox([display_color, display_depth],layout=layout) #horizontal 
display(sidebyside) #display the widget

def func(change):
    frame = change['new']
    #display the first 100 images
    scale = 0.1
    resized_image = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
    display_color.value = bgr8_to_jpeg(resized_image)
    
    depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(camera.depth_image , alpha=0.03), cv2.COLORMAP_JET)
    resized_depth_colormap=cv2.resize(depth_colormap, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
    display_depth.value = bgr8_to_jpeg(resized_depth_colormap)

camera.observe(func, names=['color_value'])

HBox(children=(Image(value=b'', format='jpeg', width='45%'), Image(value=b'', format='jpeg', width='45%')), la…

In [37]:

robot = motors.MotorsYukon(mecanum=False)

def handleMotion(change):
    temp = change['new']
    OFFSET = 10
    while True:
        if temp > 540 + OFFSET:
            robot.right(0.1)
        elif temp < 540 - OFFSET:
            robot.left(0.1)
        else:
            robot.forward(0.1)
        print(temp, end='\r')
camera.observe(handleMotion, names=['cx'])

In [40]:
camera.stop()
robot.stop()