In [None]:
from pathlib import Path
from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore
import numpy as np
import pandas as pd
from tqdm import tqdm
import os

In [None]:
is_imu_topic = '/inertial_sense/imu_raw'
lidar_imu_topic = '/ouster_top/imu'

def read_bag(path: Path):
    typestore = get_typestore(Stores.ROS2_HUMBLE)
    
    with AnyReader([path], default_typestore=typestore) as reader:
        connections = [x for x in reader.connections if x.topic == is_imu_topic or x.topic == lidar_imu_topic]
        i = 0
        is_imu_data = []
        lidar_imu_data = []
        for connection, timestamp, rawdata in tqdm(reader.messages(connections=connections)):
            msg = reader.deserialize(rawdata, connection.msgtype)
            time = msg.header.stamp.sec + msg.header.stamp.nanosec / 1e9
            imu_gyro = [msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z]
            imu_accel = [msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z]
            msg_info = [connection.topic, time, *imu_gyro, *imu_accel]
            if connection.topic == is_imu_topic:
                is_imu_data.append(msg_info)
            elif connection.topic == lidar_imu_topic:
                lidar_imu_data.append(msg_info)

            if i == 0:
                print(time, msg.header.frame_id, connection.msgtype, connection.topic)
            i += 1
            # if i > 1000:
            #     break
        
        return is_imu_data, lidar_imu_data

In [None]:
bag_path = Path('/home/benb/data_byu/bags/imu_calibration/ramp')

is_imu_data, lidar_imu_data = read_bag(bag_path)

25281it [00:00, 127312.05it/s]

1763742611.4852154 os_imu_top sensor_msgs/msg/Imu /ouster_top/imu


64451it [00:00, 109348.63it/s]


In [None]:
print(np.shape(is_imu_data))
print(np.shape(lidar_imu_data))

(53708, 8)
(10743, 8)


In [None]:

tmp = is_imu_data[0]

for t in tmp:
    print(type(t), t)
    if type(t) == list:
        for i in t:
            print("\t", type(i), t)

<class 'str'> /inertial_sense/imu_raw
<class 'float'> 1763742611.5179825
<class 'float'> -0.0016485381638631225
<class 'float'> 0.0016140618827193975
<class 'float'> -0.0017665538471192122
<class 'float'> -0.1242554783821106
<class 'float'> -0.42609450221061707
<class 'float'> -9.758075714111328


In [None]:
def convert_imu_to_df(imu_list: list, save_name=None) -> pd.DataFrame:
    df = pd.DataFrame(imu_list, columns=["topic", "time", "gyro.x", "gyro.y", "gyro.z", "acceleration.x", "acceleration.y", "acceleration.z"])
    if type(save_name) is str:
        file_path = "../data"
        if not os.path.exists(file_path):
            os.makedirs(file_path)
        file_path = os.path.join(file_path, save_name+".csv")
        df.to_csv(file_path, index=False)
    return df

df_is = convert_imu_to_df(is_imu_data, "is_imu_data")
df_lidar = convert_imu_to_df(lidar_imu_data, "lidar_imu_data")