In [1]:
import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

In [2]:
import pandas as pd
import json
import sbg_driver
import yaml
import collections
from tqdm import tqdm
import os

In [3]:
# Field type names that msg2json stores directly as leaf values; every other
# field type is treated as a nested message (or list of messages) and recursed into.
SIMPLE_TYPES = [
    'uint32',
    'boolean',
    'int',
    'int32',
    'string',
    'uint8',
    'sequence<uint8>',
    'float',
    'double',
    'uint16',
]

def make_dict(lst):
    """Fold a flat path into a nested dict.

    The last element of ``lst`` is the innermost value; every preceding element
    becomes one level of nesting, outermost first. Empty-string elements are
    skipped, so ``["", "a", "b", 1]`` yields ``{"a": {"b": 1}}``. A single-element
    list returns that element unchanged.
    """
    backwards = reversed(lst)
    nested = next(backwards)  # innermost value
    for key in backwards:
        if key != "":
            nested = {key: nested}
    return nested
    
def msg2json(msg, res=None, root_name=""):
    """Flatten a message object into a list of single-entry ``{path: value}`` dicts.

    :param msg: an object exposing ``get_fields_and_field_types()`` (a mapping of
        field name to field type name), or a ``list`` of such objects.
    :param res: list to append the results to; a new list is created when omitted.
    :param root_name: path prefix accumulated during recursion.
    :return: ``res`` (the same list object that was passed in, if any).

    Path construction: each field appends ``"/" + field_name``; each list element
    appends ``"_id<index>"`` to the path of the field holding the list. Fields whose
    type is in ``SIMPLE_TYPES`` become leaves; ``sequence<uint8>`` values are
    converted with ``.tolist()``. Any other field is recursed into.
    """
    # A fresh list per top-level call: a mutable default argument would be shared
    # between calls and silently accumulate results from earlier messages.
    if res is None:
        res = []

    if isinstance(msg, list):
        for msg_id, msg_sub in enumerate(msg):
            msg2json(msg_sub, res, root_name + f"_id{msg_id}")
        return res

    for field_name, field_type in msg.get_fields_and_field_types().items():
        path = root_name + "/" + field_name
        value = getattr(msg, field_name)
        if field_type not in SIMPLE_TYPES:
            # Nested message or list of messages.
            msg2json(value, res, path)
        elif field_type == 'sequence<uint8>':
            res.append({path: value.tolist()})
        else:
            res.append({path: value})
    return res

def dict_merge(dct, merge_dct):
    """Merge ``merge_dct`` into ``dct`` in place, recursing through nested dicts.

    Unlike :meth:`dict.update`, a key whose value is a dict on both sides is
    merged level by level rather than replaced. In every other case the value
    from ``merge_dct`` overwrites (or adds) the entry in ``dct``.

    :param dct: dict that receives the merge (mutated)
    :param merge_dct: dict whose entries are merged into ``dct``
    :return: None
    """
    for key, incoming in merge_dct.items():
        both_nested = (
            key in dct
            and isinstance(dct[key], dict)
            and isinstance(incoming, dict)
        )
        if not both_nested:
            dct[key] = incoming
            continue
        dict_merge(dct[key], incoming)

def parse_list(list_jsons):
    """Combine flat ``{"/a/b": value}`` entries into a single nested dict.

    Only the first key/value pair of each entry is used. The key is split on
    ``"/"`` into path segments, the value is placed at the end of that path via
    ``make_dict``, and the result is merged into the accumulator with
    ``dict_merge``.
    """
    merged = {}
    for entry in list_jsons:
        path, value = tuple(entry.items())[0]
        segments = path.split("/")
        dict_merge(merged, make_dict(segments + [value]))
    return merged

In [4]:
# Bag database to inspect; swap in one of the commented alternatives to switch recordings.
bag_file = "./train_dataset/outside/rosbag2_2023_08_29-10_08_36_0.db3"
# bag_file = "./train_dataset/slam_and_nav2/rosbag2_2023_08_29-13_03_38_0.db3"
# bag_file = "./train_dataset/clear_data/rosbag2_2023_09_04-11_56_58_0.db3"

# Directory that contains the bag file.
bag_dir = os.path.split(bag_file)[0]
bag_dir

'./train_dataset/slam_and_nav2'

In [5]:
# sqlite3.connect() silently creates a new, empty database when the path does not
# exist, which would only surface later as a confusing "no such table" error.
# Fail fast with a clear message instead.
if not os.path.isfile(bag_file):
    raise FileNotFoundError(f"bag database not found: {bag_file}")
conn = sqlite3.connect(bag_file)
cursor = conn.cursor()

In [6]:
# Read the bag's `topics` table once and index it three ways:
# name -> type string, name -> id, and id -> name.
topics_data = cursor.execute("SELECT id, name, type FROM topics").fetchall()
topic_type = {name_of: type_of for id_of, name_of, type_of in topics_data}
topic_ids = {name_of: id_of for id_of, name_of, type_of in topics_data}
topic_names = {id_of: name_of for id_of, name_of, type_of in topics_data}

# name -> result of get_message(type), later passed to deserialize_message as the
# message type. Built in a loop rather than a dict comprehension so that one type
# that cannot be resolved is reported and skipped instead of aborting the whole
# mapping. `except Exception` (not a bare `except`) keeps KeyboardInterrupt and
# SystemExit propagating.
topic_msg_message = {}
for id_of, name_of, type_of in topics_data:
    try:
        topic_msg_message[name_of] = get_message(type_of)
    except Exception:
        print("error topic", name_of)

error topic /can/chassis_front_tx
error topic /can/wheels_feedback_front
error topic /can/votol_voltage
error topic /can/uds_C
error topic /can/uds_A
error topic /can/command_transit
error topic /can/votol_back
error topic /can/chassis_feedback_back
error topic /can/battery_main
error topic /can/tank_fullness
error topic /can/ble_dist
error topic /can/attachment_transmit_tx
error topic /can/votol_front
error topic /can/wheels_feedback_back


In [7]:
# Display the topic id -> topic name mapping built from the bag's `topics` table.
topic_names

{1: '/rosout',
 2: '/parameter_events',
 3: '/joint_states',
 4: '/goal_pose',
 5: '/events/write_split',
 6: '/clicked_point',
 7: '/initialpose',
 8: '/chassis_back',
 9: '/lio_sam/mapping/trajectory',
 10: '/lio_sam/feature/cloud_surface',
 11: '/lio_sam/mapping/map_global',
 12: '/imu_sensor/imu/pos_ecef',
 13: '/tank_fullness',
 14: '/ouster_driver/transition_event',
 15: '/lio_sam/mapping/cloud_registered',
 16: '/lio_sam/deskew/cloud_deskewed',
 17: '/imu_sensor/imu/velocity',
 18: '/imu_sensor/imu/nav_sat_fix',
 19: '/tf',
 20: '/lio_sam/mapping/cloud_registered_raw',
 21: '/odometry/imu_incremental',
 22: '/lio_sam/mapping/icp_loop_closure_history_cloud',
 23: '/odometry/imu',
 24: '/odom',
 25: '/points',
 26: '/lio_sam/mapping/map_local',
 27: '/imu_sensor/imu/utc_ref',
 28: '/tf_static',
 29: '/robot_description',
 30: '/imu_sensor/imu/data',
 31: '/cmd_vel_correct',
 32: '/imu_sensor/imu/odometry',
 33: '/imu_sensor/imu/temp',
 34: '/lio_sam/mapping/path',
 35: '/can/chass

In [18]:
# Topic whose messages are fetched; uncomment one of the alternatives to switch.
# topic_name = "/points"
topic_name = "/imu_sensor/sbg/gps_pos"
# topic_name = '/lio_sam/mapping/map_local'
# topic_name = '/lio_sam/mapping/path'
topic_id = topic_ids[topic_name]
# Bind the id as a query parameter instead of formatting it into the SQL text.
rows = cursor.execute("SELECT * FROM messages WHERE topic_id = ?", (topic_id,)).fetchall()

In [19]:
# Number of message rows fetched for the selected topic.
len(rows)

943

In [20]:
import os
import sys
import numpy as np

In [25]:
# Export the fetched rows to disk. For each row two files are written under
# <bag_dir>/<topic name with "/" replaced by "__">/ :
#   <...>_rowid_<id>.bin   - the message's `data` field as raw bytes
#   <...>_rowid_<id>.json  - row metadata plus the decoded message, with the
#                            `data` field replaced by the path of the .bin file
# The context manager closes the progress bar even if a row raises part-way.
with tqdm(total=len(rows)) as pbar:
    for row in rows:
        row_id, topic_id, timestamp, data = row
        topic_name = topic_names[topic_id]

        # Filesystem-friendly topic name, computed once per row.
        topic_slug = topic_name.replace("/", "__")
        out_stem = f"{bag_dir}/{topic_slug}/{topic_slug}_rowid_{row_id}"
        json_file = f"{out_stem}.json"
        bin_file = f"{out_stem}.bin"
        os.makedirs(os.path.dirname(json_file), exist_ok=True)

        # Decode the serialized payload and rebuild it as a nested dict.
        # res=[] is passed explicitly so every row starts from an empty result list.
        data_deserialize = deserialize_message(data, topic_msg_message[topic_name])
        data_deserialize = msg2json(data_deserialize, res=[])
        data_deserialize = parse_list(data_deserialize)

        # NOTE(review): assumes the decoded message has a top-level `data` field that
        # bytes() accepts (e.g. a list of ints); a message without it raises KeyError
        # here - confirm for the selected topic.
        data_bytes = bytes(data_deserialize["data"])
        data_deserialize["data"] = bin_file

        df_row = {
            "row_id": row_id,
            "topic_id": topic_id,
            "topic_name": topic_name,
            "timestamp": timestamp,
            "data": data_deserialize,
        }

        with open(bin_file, "wb") as binary_file:
            binary_file.write(data_bytes)

        with open(json_file, "w") as outfile:
            json.dump(df_row, outfile, indent=4)
        pbar.update(1)

        # NOTE(review): only the first row is exported because of this break; it
        # looks like a debugging leftover - remove it to export every row.
        break

  0%|▏                                                                                                                                        | 1/943 [00:00<00:07, 131.40it/s]


In [81]:
# with open(bin_file, mode="rb") as binary_file:
#      contents = binary_file.read()
# # list(contents)