In [None]:
import os
import rosbag
from sensor_msgs import point_cloud2
import csv
import sys

from enum import Enum

class OutputFormat(Enum):
    PCD = 1  # ASCII PCD
    CSV = 2  # Comma Separated Values

In [None]:
####################################################
########## Modify the Configuration  Here ##########
####################################################

# Replace with your .bag file path
bag_file = "./2023-11-03-11-07-11/radars.bag"  

# Replace with your PointCloud2 Topics you want to extract
topics = [
    "/smartmicro/radar/front_left",  
    "/smartmicro/radar/front_right"
]

# Replace with your output folder path.
# The script will create subfolder for every topic
out_folder = r"./"

# Choose file format for every point cloud file
OUT_CHOICE = OutputFormat.PCD

## For Saving as PCD

In [None]:
def create_folder(folder_name):
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

ROS_TO_PCD_TYPE_MAP = {
    1: ('I', 1),  # INT8
    2: ('U', 1),  # UINT8
    3: ('I', 2),  # INT16
    4: ('U', 2),  # UINT16
    5: ('I', 4),  # INT32
    6: ('U', 4),  # UINT32
    7: ('F', 4),  # FLOAT32
    8: ('F', 8),  # FLOAT64
}
def get_pcd_type_and_size(field_type):
    if field_type in ROS_TO_PCD_TYPE_MAP:
        return ROS_TO_PCD_TYPE_MAP[field_type]


def write_pcd_header(file, fields, size, dtype, count, width, height, points):
    file.write("# .PCD v0.7 - Point Cloud Data file format\n")
    file.write("VERSION 0.7\n")
    file.write(f"FIELDS {' '.join(fields)}\n")
    file.write(f"SIZE {' '.join(map(str, size))}\n")
    file.write(f"TYPE {' '.join(dtype)}\n")
    file.write(f"COUNT {' '.join(map(str, count))}\n")
    file.write(f"WIDTH {width}\n")
    file.write(f"HEIGHT {height}\n")
    file.write("VIEWPOINT 0 0 0 1 0 0 0\n")
    file.write(f"POINTS {points}\n")
    file.write("DATA ascii\n")


def save_pointcloud_as_pcd(msg, pcd_file_path):
    os.makedirs(os.path.dirname(pcd_file_path), exist_ok=True)

    with open(pcd_file_path, 'w') as pcd_file:
        # Extract points
        points = list(point_cloud2.read_points(msg, skip_nans=True))
        num_points = len(points)  # The actual number of points
        
        # Set height to 1 and width to the actual number of points
        width = num_points
        height = 1

        # Write header
        fields_names = [field.name for field in msg.fields]
        count = [1] * len(fields_names)
        # size and type
        size = []
        dtype = []
        for field in msg.fields:
            field_type, field_size = get_pcd_type_and_size(field.datatype)
            dtype.append(field_type)
            size.append(field_size)

        
        write_pcd_header(pcd_file, fields_names, size,dtype, count, width, height, num_points)

        # Write points in ASCII format
        for point in points:
            pcd_file.write(' '.join(map(str, point)) + '\n')

## For saving as CSV

In [None]:
def save_pointcloud_as_csv(msg, csv_file_path):
    create_folder(os.path.dirname(csv_file_path))

    with open(csv_file_path, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        
        # Extract points
        points = list(point_cloud2.read_points(msg, skip_nans=True))
        
        # Write header
        fields_names = [field.name for field in msg.fields]
        writer.writerow(fields_names)
        
        # Write data
        for point in points:
            writer.writerow(point)

In [None]:
def extract_pointcloud_data(bag_file, topics,out_folder):
    if not os.path.isfile(bag_file):
        print(f"Error: Bag file '{bag_file}' does not exist.")
        return

    
    print("opening :",bag_file)
    bag = rosbag.Bag(bag_file, 'r')

    # Check if the topics exist in the bag file
    available_topics = bag.get_type_and_topic_info()[1].keys()
    missing_topics = [topic for topic in topics if topic not in available_topics]
    if missing_topics:
        print(f"Error: The following topics are not available in the bag file: {', '.join(missing_topics)}")
        bag.close()
        return

    #bag timing info
    start_time = bag.get_start_time()
    end_time = bag.get_end_time()
    total_duration = end_time - start_time

    csv_writers = {}
    open_csv_files = {}

    for topic in topics:
        folder_name = topic.replace('/', '_') 
        folder_name = folder_name.lstrip('_')

        if OUT_CHOICE == OutputFormat.PCD:
            pcd_folder = os.path.join(out_folder, folder_name, 'pcd_pointclouds')
            csv_file_path = os.path.join(out_folder,folder_name, "pcd_metadata.csv")
        elif OUT_CHOICE == OutputFormat.CSV:
            pcd_folder = os.path.join(out_folder, folder_name, 'csv_pointclouds')
            csv_file_path = os.path.join(out_folder,folder_name, "csv_metadata.csv")
        create_folder(pcd_folder)
        
        
        
        csvfile = open(csv_file_path, 'w', newline='')
        open_csv_files[topic] = csvfile
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['time','header.seq', 'header.stamp.secs', 'header.stamp.nsecs', 'header.frame_id', 'filename'])
        csv_writers[topic] = csvwriter

    last_time = start_time
    for topic, msg, t in bag.read_messages(topics=topics):
        folder_name = topic.replace('/', '_') 
        folder_name = folder_name.lstrip('_')

        if OUT_CHOICE == OutputFormat.PCD:
            filename = f"{t.to_nsec()}.pcd"
            file_path = os.path.join(out_folder,folder_name, 'pcd_pointclouds', filename)        
            save_pointcloud_as_pcd(msg, file_path)
        elif OUT_CHOICE == OutputFormat.CSV:
            filename = f"{t.to_nsec()}.csv"
            file_path = os.path.join(out_folder,folder_name, 'csv_pointclouds', filename)        
            save_pointcloud_as_csv(msg, file_path)

        
        csv_writers[topic].writerow([t.to_sec(), msg.header.seq, msg.header.stamp.secs, msg.header.stamp.nsecs,msg.header.frame_id, filename])

        # Calculate progress
        current_time = t.to_sec()
        elapsed_time = current_time - start_time
        progress = (elapsed_time / total_duration) * 100

        # Print progress Every second
        if current_time - last_time >= 1: 
            sys.stdout.write(f"\rProcessing bag: {progress:.2f}% complete")
            sys.stdout.flush()
            last_time = current_time

    

    bag.close()

    sys.stdout.write(f"\rProcessing bag: 100.0% complete")
    sys.stdout.flush()

    for csvfile in open_csv_files.values():
        csvfile.close()

    print("\n---------------------------------------------")
    print(f"Data from topics {', '.join(topics)} saved to '{out_folder}'")

In [None]:
  
extract_pointcloud_data(bag_file,topics,out_folder)
