In [1]:
import argparse
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
from std_msgs.msg import String
import rosbag2_py

from cv_bridge import CvBridge
import cv2
import os
import numpy as np
import csv

In [3]:
input_bag = 'spot-cam-lidar-table/rosbag2_2024_12_06-16_30_12_0.mcap'

reader = rosbag2_py.SequentialReader()
reader.open(
    rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
    rosbag2_py.ConverterOptions(
        input_serialization_format="cdr", output_serialization_format="cdr"
    ),
)

topic_types = reader.get_all_topics_and_types()
del reader

In [38]:
for i in topic_types:
    print(f"{i.name} ----  {i.type} ")

/vectornav/raw/imu ----  vectornav_msgs/msg/ImuGroup 
/vectornav/raw/time ----  vectornav_msgs/msg/TimeGroup 
/ouster/points ----  sensor_msgs/msg/PointCloud2 
/cam_sync/cam1/image_raw ----  sensor_msgs/msg/Image 
/cam_sync/cam0/image_raw ----  sensor_msgs/msg/Image 
/tf_static ----  tf2_msgs/msg/TFMessage 
/vectornav/imu ----  sensor_msgs/msg/Imu 
/vectornav/pose ----  geometry_msgs/msg/PoseWithCovarianceStamped 


In [5]:
bridge = CvBridge()

reader = rosbag2_py.SequentialReader()
reader.open(
    rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
    rosbag2_py.ConverterOptions(
        input_serialization_format="cdr", output_serialization_format="cdr"
    ),
)

def typename(topic_name):
    for topic_type in topic_types:
        if topic_type.name == topic_name:
            return topic_type.type
    raise ValueError(f"topic {topic_name} not in bag")
msg0, msg1 = 0,0

while reader.has_next():
    topic, data, timestamp = reader.read_next()
    msg_type = get_message(typename(topic))
    msg = deserialize_message(data, msg_type)

    if typename(topic) == 'sensor_msgs/msg/Image':
        if topic == '/cam_sync/cam1/image_raw':
            msg1 += 1
        else:
            msg0 += 1

        dirName = topic[1:].replace('/','_')
        file_path = dirName + '/' + str(timestamp) + '.png'

        if os.path.exists(file_path):
            print(f"Found duplicate for {file_path}")
        
        cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding=msg.encoding)
        cv2.imwrite(file_path, cv_image)
        if not os.path.exists(file_path):
            print(f"Unable to create {file_path}")
            break
        if (msg1 % 100 == 0) or (msg0 % 100 == 0):
            print(msg0, msg1)
del reader
print(msg0, msg1)

1 0
100 99
100 100
101 100
200 199
200 200
201 200
300 299
300 300
301 300
400 399
400 400
401 400
500 499
500 500
501 500
600 599
600 600
601 600
700 699
700 700
701 700
787 787


In [4]:
os.makedirs('cam_sync_cam1_image_raw')
os.makedirs('cam_sync_cam0_image_raw')

In [4]:
image_folder = 'spot-cam-lidar-loop/cam_sync_cam0_image_raw/'
output_video = 'video.avi'

In [5]:
images = [img for img in os.listdir(image_folder) if img.endswith(".png") or img.endswith(".jpg")]
images.sort()

frame = cv2.imread(os.path.join(image_folder, images[0]))

height, width, layers = frame.shape
frame_size = (width, height)

fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 'XVID' codec for .avi files
video_writer = cv2.VideoWriter(output_video, fourcc, 60, frame_size)  # 1 fps, adjust fps as needed

# Loop through each image and add it to the video
for image in images:
    img_path = os.path.join(image_folder, image)
    img = cv2.imread(img_path)
    
    if img is not None:
        video_writer.write(img)  # Write the frame to the video

# Release the video writer object and close any OpenCV windows
video_writer.release()
cv2.destroyAllWindows()

print(f"Video saved as {output_video}")

Video saved as video.avi
