## DATA CAPTURE FROM CAMERA

• CREATE CAMERA CLASS.   
• INITIALISE SETUP.    
• CREATE CAPTURE FUNCTION.    
• SAVE VIDEO CAPTURED.   

In [2]:
import os
from pathlib import Path

# --- FIND GIT REPOSITORY ROOT ---
def find_repo_root():
    path = Path.cwd().resolve()
    
    for parent in [path] + list(path.parents):
        if (parent / ".git").exists():
            return parent
    
    raise RuntimeError("Git repository root not found")


REPO_ROOT = find_repo_root()
print(f"Repository root found at: {REPO_ROOT}")

# Optional: list files in repo root
print(os.listdir(REPO_ROOT))

Repository root found at: /Users/f.kissi/Documents/github_projects/cw
['25COC001-Coursework.docx', '.DS_Store', 'requirements.txt', 'motors.py', '__pycache__', 'README.md', '.gitignore', '.venv', 'depth_files', '.git', 'colour_files', 'robotics_cw.ipynb']


In [1]:
import ipywidgets.widgets as widgets
import motors

robot = motors.MotorsYukon(mecanum=False)
print("Robot is ready:)")

Robot is ready:)


In [8]:
import traitlets
import cv2
import numpy as np
import pyzed.sl as sl
import math
import numpy as np
import sys
import math
import threading
from traitlets.config.configurable import SingletonConfigurable
import time

timestamp = time.strftime('%Y%m%d_%H%M%S')
# Define a Camera class that inherits from SingletonConfigurable
class Camera(SingletonConfigurable):
    color_value = traitlets.Any() # monitor the color_value variable
    def __init__(self):
        super(Camera, self).__init__()

        self.zed = sl.Camera()
        # Create a InitParameters object and set configuration parameters
        init_params = sl.InitParameters()
        init_params.camera_resolution = sl.RESOLUTION.VGA #VGA(672*376), HD720(1280*720), HD1080 (1920*1080) or ...
        init_params.depth_mode = sl.DEPTH_MODE.ULTRA  # Use ULTRA depth mode
        init_params.coordinate_units = sl.UNIT.MILLIMETER  # Use meter units (for depth measurements)

        # Open the camera
        status = self.zed.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS: #Ensure the camera has opened succesfully
            print("Camera Open : "+repr(status)+". Exit program.")
            self.zed.close()
            exit(1)

         # Create and set RuntimeParameters after opening the camera
        self.runtime = sl.RuntimeParameters()

        #flag to control the thread
        self.thread_runnning_flag = False

        # Get the height and width
        camera_info = self.zed.get_camera_information()
        self.width = camera_info.camera_configuration.resolution.width
        self.height = camera_info.camera_configuration.resolution.height
        self.image = sl.Mat(self.width,self.height,sl.MAT_TYPE.U8_C4, sl.MEM.CPU)
        self.depth = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C1, sl.MEM.CPU)
        self.point_cloud = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C4, sl.MEM.CPU)

        #setup output files
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        self.color_writer = cv2.VideoWriter(f'color_video{timestamp}.avi', fourcc, 30, (672, 376))
        self.depth_file   = open(f'depth_video_{timestamp}.bin', 'wb') 

    def _capture_frames(self): #For data capturing only

        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            if self.zed.grab(self.runtime) == sl.ERROR_CODE.SUCCESS:
                
                # Retrieve Left image
                self.zed.retrieve_image(self.image, sl.VIEW.LEFT)
                # Retrieve depth map. Depth is aligned on the left image
                self.zed.retrieve_measure(self.depth, sl.MEASURE.DEPTH)
    
                self.color_value = self.image.get_data()
                self.color_value= cv2.cvtColor(self.color_value, cv2.COLOR_BGRA2BGR)
                self.depth_image = np.asanyarray(self.depth.get_data())

                 # Save to files
                try:
                    self.color_writer.write(self.color_value)       # write BGR frame to AVI
                    self.depth_file.write(self.depth_image.tobytes()) # write raw depth to BIN
                except:
                    print("Error writing files")
                    
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.color_writer.release()
            self.depth_file.close()
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])
    
#create a camera object
camera = Camera()
camera.start() # start capturing the data

ModuleNotFoundError: No module named 'pyzed'

In [3]:
text_input = widgets.Text(value='', placeholder='w/a/s/d', description='Drive:')

def on_text_change(change):
    cmd = change['new'][-1] if change['new'] else ''
    if   cmd == 'w': robot.forward(0.4)
    elif cmd == 's': robot.backward(0.4)
    elif cmd == 'a': robot.right(0.4)
    elif cmd == 'd': robot.left(0.4)
    else:            
        robot.stop()

text_input.observe(on_text_change, names='value')
display(text_input)

Text(value='', description='Drive:', placeholder='w/a/s/d')

In [4]:
camera.stop()

In [3]:
import cv2
import numpy as np

def read_image_files(colour_file, depth_file):
    cap = cv2.VideoCapture(colour_file)
    depth_file = open(depth_file, 'rb')
    colour_frames = []
    depth_frames = []
    while True:
        ret, colour_frame = cap.read()
        depth_data = depth_file.read(672 * 376 * 4)  # F32_C1 means 4 bytes per pixel
        if not ret:
            break

        depth_frame = np.frombuffer(depth_data, dtype=np.float32).reshape((672, 376))

        #process the frames as needed
        colour_frames.append(colour_frame)
        depth_frames.append(depth_frame)

    cap.release()
    print(f"Read {len(colour_frames)} color frames from {colour_file}")
    print(f"Read {len(depth_frames)} depth frames from {depth_file.name}")
    return colour_frames, depth_frames

In [11]:
colour_dir = REPO_ROOT / 'colour_files'
depth_dir  = REPO_ROOT / 'depth_files'

colour_files = sorted([f for f in os.listdir(colour_dir) if f.endswith('.avi')])
depth_files  = sorted([f for f in os.listdir(depth_dir)  if f.endswith('.bin')])

for colour_file, depth_file in zip(colour_files, depth_files):
    colour_path = colour_dir / colour_file
    depth_path  = depth_dir  / depth_file
    
    colour_frames, depth_frames = read_image_files(str(colour_path), str(depth_path))

Read 2170 color frames from /Users/f.kissi/Documents/github_projects/cw/colour_files/color_video20260226_125100.avi
Read 2170 depth frames from /Users/f.kissi/Documents/github_projects/cw/depth_files/depth_video_20260226_125100.bin
Read 108 color frames from /Users/f.kissi/Documents/github_projects/cw/colour_files/color_video20260226_125325.avi
Read 108 depth frames from /Users/f.kissi/Documents/github_projects/cw/depth_files/depth_video_20260226_125325.bin
Read 2071 color frames from /Users/f.kissi/Documents/github_projects/cw/colour_files/color_video20260226_125346.avi
Read 2071 depth frames from /Users/f.kissi/Documents/github_projects/cw/depth_files/depth_video_20260226_125346.bin


In [13]:
colour_frames[0]


array([[[ 26,  18,  74],
        [ 18,  10,  66],
        [ 13,   6,  59],
        ...,
        [138, 139, 137],
        [140, 141, 139],
        [143, 144, 141]],

       [[ 28,  20,  76],
        [ 19,  11,  67],
        [ 13,   6,  59],
        ...,
        [140, 141, 139],
        [143, 144, 141],
        [145, 146, 144]],

       [[ 33,  25,  81],
        [ 23,  15,  70],
        [ 16,   8,  63],
        ...,
        [144, 145, 143],
        [146, 147, 145],
        [147, 148, 146]],

       ...,

       [[ 82,  84,  83],
        [ 82,  84,  83],
        [ 82,  84,  83],
        ...,
        [ 77,  81,  77],
        [ 77,  81,  77],
        [ 77,  81,  77]],

       [[ 81,  83,  82],
        [ 81,  83,  82],
        [ 81,  83,  82],
        ...,
        [ 77,  81,  77],
        [ 77,  81,  77],
        [ 77,  81,  77]],

       [[ 80,  82,  81],
        [ 80,  82,  81],
        [ 80,  82,  81],
        ...,
        [ 77,  81,  77],
        [ 77,  81,  77],
        [ 77,  81,  77]]

In [14]:
depth_frames[0]

array([[       nan,        nan,        nan, ...,  8116.3022,  7966.288 ,
         7944.157 ],
       [ 7811.368 ,  7815.382 ,  7895.6675, ...,  5263.9355,  5161.666 ,
         5231.499 ],
       [ 5068.5645,  5073.4336,  5211.1025, ...,        nan,        nan,
               nan],
       ...,
       [ 5387.046 ,  5392.939 ,  5356.8726, ..., 19860.441 , 19860.441 ,
        15524.028 ],
       [19035.754 , 19862.121 , 19862.17  , ...,        nan, 11916.133 ,
        11916.132 ],
       [11916.133 ,  8997.882 ,  8511.523 , ..., 19861.879 , 19861.879 ,
        19861.879 ]], shape=(672, 376), dtype=float32)

In [16]:
print(depth_frames[0].min(), depth_frames[0].max())

nan nan
