In [27]:
"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API."""
import os
import subprocess

def source_setup_bash(setup_bash_path):
    command = f'source {setup_bash_path} && env'
    proc = subprocess.Popen(['/bin/bash', '-c', command], stdout=subprocess.PIPE)
    for line in proc.stdout:
        (key, _, value) = line.decode().partition("=")
        os.environ[key] = value.rstrip()
    proc.communicate()

source_setup_bash('~/race_common/install/setup.bash')  # Replace with your actual path


In [28]:
# ! source ~/race_common/install/setup.bash
import argparse
import os
import shutil
import rosbag2_py
import yaml
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
from sensor_msgs.msg import PointCloud2
from race_msgs.msg import VehicleTelemetry
from ros2_ars540_msgs.msg import DetectionSegmentArray, DetectionSegment

INPUT_PATH = "/media/chris/IACSSD4TBT2/dec_13_23_run_2/rosbag2_2023_12_13-15_48_09/"

OUTPUT_PATH = "/media/chris/IACSSD4TBT2/dec_13_23_run_2/radar_ptc/rosbag2_2023_12_13-15_48_09/"

ModuleNotFoundError: No module named 'ros2_ars540_msgs'

In [18]:
# If the output folder already exists, ask the user if they want to delete it
if os.path.exists(OUTPUT_PATH):
    print ("OUTPUT_PATH: ", OUTPUT_PATH, "\n")
    answer = input("The OUTPUT_PATH already exists. Do you want to delete it? (y/n) ")
    if answer.lower() == "y":
        shutil.rmtree(OUTPUT_PATH)
        print("Folder deleted.")
    else:
        print("Deletion cancelled. Re-run the script with a different OUTPUT_PATH.")
        exit()
        
# Check if the extension is a db3 or MCAP
files = os.listdir(INPUT_PATH)
for file in files:
    if file.endswith(".db3"):
        store_type = "sqlite3"
        print("Detected Input bag is a db3 file.")
    elif file.endswith(".mcap"):
        store_type = "mcap"
        print("Detected Input bag is a mcap file.")
if not store_type:
    print("Error: Input bag is not a db3 or mcap file.")
    exit()

reader = rosbag2_py.SequentialReader()
writer = rosbag2_py.SequentialWriter()

# Opens the bag files and sets the converter options
try:
    reader.open(
        rosbag2_py.StorageOptions(uri=INPUT_PATH, storage_id=store_type),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

    writer.open(
        rosbag2_py.StorageOptions(uri=OUTPUT_PATH, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

except Exception as e:
    print(e)

OUTPUT_PATH:  /media/chris/IACSSD4TBT2/dec_13_23_run_2/radar_ptc/rosbag2_2023_12_13-15_48_09/ 

Folder deleted.
Detected Input bag is a mcap file.


In [19]:
# Get all topics and types
TOPIC_TYPES = reader.get_all_topics_and_types()
TYPE_MAP = {TOPIC_TYPES[i].name: TOPIC_TYPES[i].type for i in range(len(TOPIC_TYPES))}

print ("\n\n All topics and types in the input bag: \n")
for i in TYPE_MAP:
    print(i, "  |  ",  TYPE_MAP[i])

TOPICS_TO_EXCLUDE = {
    # Camera Topics
    "/vimba_front_left_center/image",
    "/vimba_front_left_center/image/compressed",
    "/vimba_front_right_center/image",
    "/vimba_front_right_center/image/compressed",
    "/vimba_front_left/image",
    "/vimba_front_left/image/compressed",
    "/vimba_front_right/image",
    "/vimba_front_right/image/compressed",
}

radar_topicname="/ars548_process/detections"
radar_ptc_topicname="/ars548_process/detections/pointcloud"



 All topics and types in the input bag: 

/gps_top/orientation   |   geometry_msgs/msg/PoseWithCovarianceStamped
/raptor_dbw_interface/steering_report_extd   |   raptor_dbw_msgs/msg/SteeringReportExtd
/ars548_process/sensor_status   |   ros2_ars540_msgs/msg/SensorStatus
/novatel_top/oem7raw   |   novatel_oem7_msgs/msg/Oem7RawMsg
/vectornav/raw/attitude   |   vectornav_msgs/msg/AttitudeGroup
/raptor_dbw_interface/tire_pressure_fl   |   raptor_dbw_msgs/msg/TirePressureFl
/luminar_front/status   |   luminar_iris_msgs/msg/SensorHealth
/luminar_rear/status   |   luminar_iris_msgs/msg/SensorHealth
/raptor_dbw_interface/tire_pressure_fr   |   raptor_dbw_msgs/msg/TirePressureFr
/raptor_dbw_interface/tire_temp_fl_2   |   raptor_dbw_msgs/msg/TireTempFl2
/raptor_dbw_interface/tire_temp_rl_1   |   raptor_dbw_msgs/msg/TireTempRl1
/novatel_top/heading2   |   novatel_oem7_msgs/msg/HEADING2
/gps_top/pose/transformed   |   geometry_msgs/msg/PoseWithCovarianceStamped
/luminar_right/status   |   lumina

In [20]:
print(TYPE_MAP["/luminar_left/points"])

sensor_msgs/msg/PointCloud2


In [21]:
def process_radar(msg):
    output = {
    'header': {
        'stamp': {
            'sec': msg.header.stamp.sec,
            'nanosec': msg.header.stamp.nanosec,
            },
        'frame_id': msg.header.frame_id,
    },
    'height': msg.height,
    'width': msg.width,
    'is_bigendian': msg.is_bigendian,
    'point_step': msg.point_step,
    'row_step': msg.row_step,
    'is_dense': msg.is_dense,
    }

In [22]:
import sys
print(sys.path)

['/home/chris/IAC_db3_to_mcap_converter_with_topic_extractor/mcap_topics_tools', '', '/opt/ros/iron/lib/python3.10/site-packages', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '/home/chris/.local/lib/python3.10/site-packages', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages']


In [24]:
'''
This while loop does the following:
1. Read the next message
2. Check if the topic is in the set of topics to extract
'''
counter = 0
timestamp_buffer = 0
print ("\n***MODE: Filtering the whole bag***")
print ("\nIterating through the input bag...")

radar_msg_buf = None

while reader.has_next():
    
    # Read the next message
    topic_name, data, timestamp = reader.read_next()

    # Keep a map of the topics that are in the output bag
    out_bag_topics = set()

    # If the topics are in TOPICS_TO_EXCLUDE
    if topic_name not in TOPICS_TO_EXCLUDE:

        # Create the topic if it doesn't exist
        if topic_name not in out_bag_topics:
            topic = rosbag2_py.TopicMetadata(name=topic_name, type=TYPE_MAP[topic_name], \
                    serialization_format='cdr')
            writer.create_topic(topic)
            out_bag_topics.add(topic_name)
        
        # If is a radar topic, we process into pointcloud
        if topic_name == radar_topicname:

            # Add New radar pointcloud topic
            if radar_ptc_topicname not in out_bag_topics:
                topic = rosbag2_py.TopicMetadata(name=topic_name, type='sensor_msgs/msg/PointCloud2', \
                        serialization_format='cdr')
                writer.create_topic(topic)
                out_bag_topics.add(topic_name)

            # Process the incoming data
            msg_type = get_message(TYPE_MAP[topic_name])
            msg = deserialize_message(data, msg_type)
            radar_msg_buf = msg
            break
            # ptc_radar = process_radar(msg)

            # # Write the radar pointcloud to the output bag
            #   writer.write(radar_ptc_topicname, ptc_radar, timestamp)
            
            
        # Write the message to the output bag
        writer.write(topic_name, data, timestamp)

    # Print the current timestamp
    if counter % 50000 == 0:
        print("Currently: ", timestamp/1e9)

    counter += 1




***MODE: Filtering the whole bag***

Iterating through the input bag...


IndexError: package 'race_msgs' not found, searching: [/opt/ros/iron]

In [None]:
print(radar_msg_buf)

In [None]:
# Close the bag file
del reader
del writer