In [None]:
!rs-enumerate-devices
# Intel RealSense D435 serial number: 818312071101
# Intel RealSense D415 serial number: 822512060233
# Intel RealSense D415 serial number: 802212061325

In [1]:

# Import the necessary libraries
import cv2
import numpy as np
import pyrealsense2 as rs

# Live preview of three RealSense cameras; only the first camera's frames are
# written to the output video. Press 'q' in a preview window to stop.

# (serial number, preview window title) for each camera, in display order.
# The first entry is the camera whose frames are recorded.
cameras = [
    ('818312071101', 'RealSense 1'),
    ('822512060233', 'RealSense 2'),
    ('802212061325', 'RealSense 3'),
]

# Define the output video file name
output_file = 'output.mp4'

pipelines = []
out = None
try:
    # Create and start one color-only pipeline per camera. Everything from
    # here on runs inside try/finally so that a failure part-way through
    # (e.g. one camera unplugged) or a kernel interrupt still stops the
    # pipelines that were already started.
    for serial, _ in cameras:
        pipe = rs.pipeline()
        cfg = rs.config()
        cfg.enable_device(serial)
        cfg.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        pipe.start(cfg)
        pipelines.append(pipe)

    # Define the codec and create a VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_file, fourcc, 30.0, (640, 480))

    while True:
        for index, pipe in enumerate(pipelines):
            # Wait for the next frameset from this camera
            frames = pipe.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                # No color frame in this frameset; skip this camera for now.
                continue
            color_image = np.asanyarray(color_frame.get_data())
            cv2.imshow(cameras[index][1], color_image)

            # Only the first camera is written to the output video file
            if index == 0:
                out.write(color_image)

        if cv2.waitKey(1) == ord('q'):
            break
finally:
    # Release the VideoWriter, stop the started pipelines and destroy all windows
    if out is not None:
        out.release()
    for pipe in pipelines:
        pipe.stop()
    cv2.destroyAllWindows()


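Version that doesn't block the main thread while capturing: each camera records in its own worker thread for a fixed duration (the cell still waits for both threads with `join()` before stopping the pipelines)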

In [None]:
import cv2
import numpy as np
import pyrealsense2 as rs
import threading
import time

def record_video(pipeline, output_filename, record_time, fps=30.0, frame_size=(640, 480)):
    """Record color frames from ``pipeline`` into a video file for a fixed time.

    Frames are pulled with ``pipeline.wait_for_frames()`` and every frameset
    that contains a color frame is appended to an 'mp4v'-encoded file. This
    function neither starts nor stops the pipeline.

    Args:
        pipeline: Object exposing ``wait_for_frames()`` whose result exposes
            ``get_color_frame()`` (a started ``rs.pipeline`` in this notebook).
        output_filename: Path of the video file to create.
        record_time: Recording duration in seconds, measured with
            ``time.time()`` from just after the writer is created.
        fps: Frame rate passed to ``cv2.VideoWriter``. Defaults to 30.0.
        frame_size: ``(width, height)`` passed to ``cv2.VideoWriter``.
            Defaults to ``(640, 480)``; it should match the resolution the
            pipeline was configured with.

    Raises:
        Any exception raised while creating the writer or while reading or
        writing frames is propagated unchanged.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Create the writer *before* entering the try block. If construction
    # fails there is nothing to release, and the original error must not be
    # masked by an UnboundLocalError from the finally clause.
    out = cv2.VideoWriter(output_filename, fourcc, fps, frame_size)
    try:
        start_time = time.time()
        while time.time() - start_time < record_time:
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                # Frameset without a color frame: wait for the next one.
                continue
            color_image = np.asanyarray(color_frame.get_data())
            out.write(color_image)
    finally:
        # Always close the file, even if reading or writing a frame failed.
        out.release()

# Record two cameras in parallel for a fixed duration, one worker thread each.

record_time = 10  # recording duration in seconds, passed to record_video

# (serial number, output file) for each camera to record.
camera_outputs = [
    ('818312071101', 'output1.mp4'),
    ('822512060233', 'output2.mp4'),
]

pipelines = []
threads = []
try:
    # Initialize and configure pipelines. This happens inside try/finally so
    # that a pipeline that did start is still stopped when a later one fails
    # to start or the cell is interrupted while waiting on the threads.
    for serial, output_filename in camera_outputs:
        pipe = rs.pipeline()
        cfg = rs.config()
        cfg.enable_device(serial)
        cfg.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        pipe.start(cfg)
        pipelines.append(pipe)
        threads.append(
            threading.Thread(target=record_video, args=(pipe, output_filename, record_time))
        )

    # Start all recorders only once every pipeline is streaming.
    for thread in threads:
        thread.start()

    # Block until every recorder has run for record_time seconds.
    for thread in threads:
        thread.join()
finally:
    for pipe in pipelines:
        pipe.stop()

print("Recording completed.")


Stopping triggered by main thread

In [None]:
import cv2
import numpy as np
import pyrealsense2 as rs
import threading

def record_video(pipeline, output_filename, stop_event, fps=30.0, frame_size=(640, 480)):
    """Record color frames from ``pipeline`` into a video file until told to stop.

    Frames are pulled with ``pipeline.wait_for_frames()`` and every frameset
    that contains a color frame is appended to an 'mp4v'-encoded file. The
    loop re-checks ``stop_event`` before each read, so it exits after the
    frame that is in flight when the event is set. This function neither
    starts nor stops the pipeline.

    Args:
        pipeline: Object exposing ``wait_for_frames()`` whose result exposes
            ``get_color_frame()`` (a started ``rs.pipeline`` in this notebook).
        output_filename: Path of the video file to create.
        stop_event: Object exposing ``is_set()`` (a ``threading.Event`` in
            this notebook); recording ends once it returns true.
        fps: Frame rate passed to ``cv2.VideoWriter``. Defaults to 30.0.
        frame_size: ``(width, height)`` passed to ``cv2.VideoWriter``.
            Defaults to ``(640, 480)``; it should match the resolution the
            pipeline was configured with.

    Raises:
        Any exception raised while creating the writer or while reading or
        writing frames is propagated unchanged.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Create the writer *before* entering the try block. If construction
    # fails there is nothing to release, and the original error must not be
    # masked by an UnboundLocalError from the finally clause.
    out = cv2.VideoWriter(output_filename, fourcc, fps, frame_size)
    try:
        while not stop_event.is_set():
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                # Frameset without a color frame: wait for the next one.
                continue
            color_image = np.asanyarray(color_frame.get_data())
            out.write(color_image)
    finally:
        # Always close the file, even if reading or writing a frame failed.
        out.release()

# Record two cameras in parallel until the user presses Enter.

# (serial number, output file) for each camera to record.
camera_outputs = [
    ('818312071101', 'output1.mp4'),
    ('822512060233', 'output2.mp4'),
]

# Shared event to signal threads to stop
stop_event = threading.Event()

pipelines = []
threads = []
try:
    # Initialize and configure pipelines. Setup, recording and the prompt all
    # run inside try/finally: without it, an interrupted input() or a camera
    # that fails to start would leave the recorder threads looping forever
    # (stop_event never set) and the already-started pipelines open.
    for serial, output_filename in camera_outputs:
        pipe = rs.pipeline()
        cfg = rs.config()
        cfg.enable_device(serial)
        cfg.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        pipe.start(cfg)
        pipelines.append(pipe)
        threads.append(
            threading.Thread(target=record_video, args=(pipe, output_filename, stop_event))
        )

    # Start all recorders only once every pipeline is streaming.
    for thread in threads:
        thread.start()

    # Here you can wait for a user input or another condition to stop recording
    input("Press Enter to stop recording...")
finally:
    # Signal the recording threads to stop
    stop_event.set()

    # Wait for the threads to finish. is_alive() is False for a thread that
    # was never started, which join() would reject with a RuntimeError.
    for thread in threads:
        if thread.is_alive():
            thread.join()

    # Stop the pipelines only after the recorders are done reading from them
    for pipe in pipelines:
        pipe.stop()

print("Recording completed.")


Test using the realsense module

In [2]:
from planning_through_contact.simulation.sensors.realsense import (RealsenseCamera, RealsenseCameraConfig)

# Create a configuration shared by all three cameras
config = RealsenseCameraConfig(
    width=640,
    height=480,
    fps=30,
    output_dir='.',
)

# Create the cameras (name, device serial number, shared config)
top_camera = RealsenseCamera("top_camera", "822512060233", config)
front_camera = RealsenseCamera("front_camera", "818312071101", config)
side_camera = RealsenseCamera("side_camera", "802212061325", config)

# Cameras whose start_recording() call returned; only these get stopped.
# NOTE(review): whether stop_recording() is safe on a camera that never
# started is not visible from this notebook, so it is not called on those.
recording_cameras = []
try:
    for camera in (top_camera, front_camera, side_camera):
        camera.start_recording()
        recording_cameras.append(camera)

    # Here you can wait for a user input or another condition to stop recording
    input("Press Enter to stop recording...")
finally:
    # Runs even if a later camera failed to start or input() was interrupted,
    # so a camera that did start is not left recording.
    for camera in recording_cameras:
        camera.stop_recording()