In [1]:
import os
import zlib
import json
import bisect
from pathlib import Path
from collections import defaultdict

import numpy as np
import cv2
from tqdm.notebook import tqdm
from scipy.spatial.transform import Slerp, Rotation as R

import rclpy
from rclpy.serialization import deserialize_message
from rosbag2_py import SequentialReader, StorageOptions, ConverterOptions

from sensor_msgs.msg import Imu
from rosgraph_msgs.msg import Clock
from sensor_msgs.msg import CompressedImage
from px4_msgs.msg import VehicleOdometry

In [2]:
bag_dir = "/home/oleksii/bags/flight_20250603_025520/flight_20250603_025520_0.mcap"
output_dir = Path("./output")

IMU_DOWNSAMPLE = 1
IMU_WINDOW = 50
MAX_ODOM_GAP_NS = int(0.2 * 1e9)  # 200ms max gap for interpolation

IMG_WIDTH = 640
IMG_HEIGHT = 480

RGB_TOPIC = "/camera/image/cpsd"
IMU_TOPIC = "/imu"
ODOM_TOPIC = "/fmu/out/vehicle_odometry"
CLOCK_TOPIC = "/clock"

In [3]:
bag_dir = "/home/oleksii/bags/flight_20250604_211016/flight_20250604_211016_0.mcap"
output_dir = Path("./output")

In [4]:
image_dir = output_dir / "images"
imu_dir = output_dir / "imu"
clock_log = output_dir / "clock_log.json"
label_dir = output_dir / "labels"
timestamps_file = output_dir / "image_timestamps.npy"

for folder in (image_dir, imu_dir, label_dir):
    folder.mkdir(parents=True, exist_ok=True)

print(f"Folders created:\n  {image_dir}\n  {imu_dir}\n  {label_dir}")

Folders created:
  output/images
  output/imu
  output/labels


In [5]:
rclpy.init()

In [6]:
storage_opts = StorageOptions(uri=bag_dir, storage_id="mcap")
conv_opts = ConverterOptions("cdr", "cdr")
reader = SequentialReader()
reader.open(storage_opts, conv_opts)

In [7]:
topic_types = {}
for topic_info in reader.get_all_topics_and_types():
    topic_types[topic_info.name] = topic_info.type

print("Topics and types in bag:")
for name, ttype in topic_types.items():
    print(f"{name:<35s} -> {ttype}")

Topics and types in bag:
/clock                              -> rosgraph_msgs/msg/Clock
/fmu/out/vehicle_odometry           -> px4_msgs/msg/VehicleOdometry
/imu                                -> sensor_msgs/msg/Imu
/camera/image/cpsd                  -> sensor_msgs/msg/CompressedImage


In [8]:
# Get topic types
reader = SequentialReader()
reader.open(storage_opts, conv_opts)
topic_types = {info.name: info.type for info in reader.get_all_topics_and_types()}
reader.close()

print(f"Found {len(topic_types)} topics")
print("Collecting timestamps for synchronization...")

# First pass: collect sync data - optimized
clock_times = []  # (bag_timestamp, sim_clock_time)
px4_times = []  # (bag_timestamp, px4_time)

reader = SequentialReader()
reader.open(storage_opts, conv_opts)

while reader.has_next():
    topic, data_bytes, bag_timestamp_ns = reader.read_next()

    if topic == CLOCK_TOPIC:
        clock_msg = deserialize_message(data_bytes, Clock)
        sim_clock_ns = clock_msg.clock.sec * 10 ** 9 + clock_msg.clock.nanosec
        clock_times.append((bag_timestamp_ns, sim_clock_ns))

    elif topic == ODOM_TOPIC:
        vo = deserialize_message(data_bytes, VehicleOdometry)
        px4_ts = getattr(vo, 'timestamp_sample', getattr(vo, 'timestamp', 0))
        if px4_ts > 0:
            px4_times.append((bag_timestamp_ns, px4_ts * 1000))

reader.close()

# Pre-compute synchronization lookup
print(f"Building sync lookup from {len(clock_times)} clock and {len(px4_times)} odom messages")

if clock_times and px4_times:
    # Create arrays for fast lookup
    clock_bag_times = np.array([bt for bt, _ in clock_times])
    clock_sim_times = np.array([st for _, st in clock_times])
    px4_bag_times = np.array([bt for bt, _ in px4_times])
    px4_timestamps = np.array([pt for _, pt in px4_times])

    # Calculate median offset for synchronization
    mid_idx = len(px4_times) // 2
    ref_bag_time = px4_bag_times[mid_idx]
    ref_px4_time = px4_timestamps[mid_idx]

    # Find closest clock time
    closest_clock_idx = np.argmin(np.abs(clock_bag_times - ref_bag_time))
    ref_sim_time = clock_sim_times[closest_clock_idx]

    # Global offset: sim_time = px4_time - offset
    px4_to_sim_offset = ref_px4_time - ref_sim_time
    print(f"Sync offset: {px4_to_sim_offset / 1e6:.1f}ms")


    def convert_px4_to_sim(px4_time):
        return px4_time - px4_to_sim_offset
else:
    def convert_px4_to_sim(px4_time):
        return px4_time

print("Processing data...")

# Pre-allocate arrays for better performance
imu_buffer = []
odom_buffer = []
clock_list = []
image_timestamps = []

# Pre-allocate common objects to avoid repeated instantiation
empty_imu = Imu()
imu_pad_entry = (0, empty_imu)

img_count = 0
imu_count = 0

reader = SequentialReader()
reader.open(storage_opts, conv_opts)

pbar = tqdm(desc="Processing...", unit="msg")

while reader.has_next():
    topic, data_bytes, bag_timestamp_ns = reader.read_next()

    if topic == RGB_TOPIC:
        ci = deserialize_message(data_bytes, CompressedImage)

        # Get timestamp
        if ci.header.stamp.sec > 0 or ci.header.stamp.nanosec > 0:
            timestamp_ns = ci.header.stamp.sec * 10 ** 9 + ci.header.stamp.nanosec
        else:
            timestamp_ns = bag_timestamp_ns

        # Process image
        try:
            raw = zlib.decompress(ci.data)
            arr = np.frombuffer(raw, dtype=np.uint8).reshape((IMG_HEIGHT, IMG_WIDTH, 3))
            bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
            cv2.imwrite(str(image_dir / f"img_{img_count:06d}.png"), bgr)
        except:
            continue

        # Process IMU window - optimized
        if imu_buffer:
            imu_timestamps = [t for t, _ in imu_buffer]
            idx = bisect.bisect_right(imu_timestamps, timestamp_ns) - 1

            if idx >= 0:
                start_idx = max(0, idx - IMU_WINDOW + 1)
                imu_window = imu_buffer[start_idx:idx + 1]
            else:
                imu_window = []
        else:
            imu_window = []

        # Pad and save IMU
        pad_count = max(0, IMU_WINDOW - len(imu_window))
        padded_window = [imu_pad_entry] * pad_count + imu_window

        imu_data = np.zeros((IMU_WINDOW, 6), dtype=np.float32)
        for i, (_, imu_msg) in enumerate(padded_window):
            if imu_msg.header.stamp.sec > 0:  # Valid IMU data
                imu_data[i] = [
                    imu_msg.linear_acceleration.x, imu_msg.linear_acceleration.y, imu_msg.linear_acceleration.z,
                    imu_msg.angular_velocity.x, imu_msg.angular_velocity.y, imu_msg.angular_velocity.z
                ]

        np.save(imu_dir / f"imu_{img_count:06d}.npy", imu_data)

        # Process pose - optimized
        label_arr = np.zeros(7, dtype=np.float32)

        if odom_buffer:
            odom_timestamps = [t for t, _ in odom_buffer]
            pos = bisect.bisect_left(odom_timestamps, timestamp_ns)

            # Exact match
            if pos < len(odom_buffer) and odom_buffer[pos][0] == timestamp_ns:
                pose = odom_buffer[pos][1]
                label_arr[:3] = [pose.position.x, pose.position.y, pose.position.z]
                label_arr[3:] = [pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]

            # Interpolate
            elif 0 < pos < len(odom_buffer):
                t_prev, pose_prev = odom_buffer[pos - 1]
                t_next, pose_next = odom_buffer[pos]

                if t_next - t_prev < MAX_ODOM_GAP_NS:
                    ratio = (timestamp_ns - t_prev) / (t_next - t_prev)

                    # Position interpolation
                    pos_prev = np.array([pose_prev.position.x, pose_prev.position.y, pose_prev.position.z])
                    pos_next = np.array([pose_next.position.x, pose_next.position.y, pose_next.position.z])
                    interp_pos = (1.0 - ratio) * pos_prev + ratio * pos_next

                    # Quaternion slerp
                    q_prev = [pose_prev.orientation.x, pose_prev.orientation.y, pose_prev.orientation.z,
                              pose_prev.orientation.w]
                    q_next = [pose_next.orientation.x, pose_next.orientation.y, pose_next.orientation.z,
                              pose_next.orientation.w]

                    rots = R.from_quat([q_prev, q_next])
                    slerp = Slerp([0.0, 1.0], rots)
                    interp_quat = slerp([ratio])[0].as_quat()

                    label_arr[:3] = interp_pos
                    label_arr[3:] = interp_quat
                else:
                    # Use nearest
                    nearest_idx = min(range(len(odom_buffer)), key=lambda i: abs(odom_timestamps[i] - timestamp_ns))
                    pose = odom_buffer[nearest_idx][1]
                    label_arr[:3] = [pose.position.x, pose.position.y, pose.position.z]
                    label_arr[3:] = [pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]
            else:
                # Use nearest available
                if odom_buffer:
                    nearest_idx = min(range(len(odom_buffer)), key=lambda i: abs(odom_timestamps[i] - timestamp_ns))
                    pose = odom_buffer[nearest_idx][1]
                    label_arr[:3] = [pose.position.x, pose.position.y, pose.position.z]
                    label_arr[3:] = [pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]

        np.save(label_dir / f"label_{img_count:06d}.npy", label_arr)
        image_timestamps.append(timestamp_ns)
        img_count += 1

    elif topic == IMU_TOPIC:
        imu_msg = deserialize_message(data_bytes, Imu)

        # Get timestamp
        if imu_msg.header.stamp.sec > 0 or imu_msg.header.stamp.nanosec > 0:
            timestamp_ns = imu_msg.header.stamp.sec * 10 ** 9 + imu_msg.header.stamp.nanosec
        else:
            timestamp_ns = bag_timestamp_ns

        imu_count += 1
        if imu_count % IMU_DOWNSAMPLE == 0:
            imu_buffer.append((timestamp_ns, imu_msg))

            # Maintain buffer size
            if len(imu_buffer) > IMU_WINDOW * 3:
                imu_buffer.pop(0)

    elif topic == ODOM_TOPIC:
        vo = deserialize_message(data_bytes, VehicleOdometry)

        # Get and convert timestamp
        px4_ts = getattr(vo, 'timestamp_sample', getattr(vo, 'timestamp', 0))
        if px4_ts > 0:
            timestamp_ns = convert_px4_to_sim(px4_ts * 1000)
        else:
            timestamp_ns = bag_timestamp_ns


        # Create pose object - simplified
        class FakePose:
            def __init__(self, x, y, z, qx, qy, qz, qw):
                class P: pass

                self.position = P()
                self.position.x, self.position.y, self.position.z = x, y, z
                self.orientation = P()
                self.orientation.x, self.orientation.y, self.orientation.z, self.orientation.w = qx, qy, qz, qw


        x, y, z = vo.position
        qx, qy, qz, qw = vo.q
        odom_buffer.append((timestamp_ns, FakePose(x, y, z, qx, qy, qz, qw)))

    elif topic == CLOCK_TOPIC:
        clock_msg = deserialize_message(data_bytes, Clock)
        clock_ns = clock_msg.clock.sec * 10 ** 9 + clock_msg.clock.nanosec
        clock_list.append(clock_ns)

    pbar.update(1)

pbar.close()
reader.close()

Found 4 topics
Collecting timestamps for synchronization...
Building sync lookup from 745791 clock and 183563 odom messages
Sync offset: 1749060585609.8ms
Processing data...


Processing...: 0msg [00:00, ?msg/s]

In [9]:
with open(clock_log, "w") as f:
    json.dump(clock_list, f, indent=2)

np.save(timestamps_file, np.array(image_timestamps))

print("Processing completed.")
print(f"  Saved images: {img_count}")
print(f"  IMU messages: {imu_count} (downsampled: {len(imu_buffer)})")
print(f"  Odometry messages: {len(odom_buffer)}")
print(f"  Clock stamps: {len(clock_list)}")
print(f"  Data saved to: {output_dir}")

rclpy.shutdown()

Processing completed.
  Saved images: 36885
  IMU messages: 374012 (downsampled: 150)
  Odometry messages: 183563
  Clock stamps: 745791
  Data saved to: output
