# Analyze raw data
This notebook analyzes the ROS 2 bags (MCAP files) from which the given database was created.

In [1]:
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import TypedDict

import pandas as pd
from mcap.reader import make_reader
from mcap.summary import Summary
from mcap_ros2.decoder import DecoderFactory

from ddlitlab2024 import DB_PATH

In [2]:
# Root directory that is searched recursively for the MCAP files referenced by the database
BASE_DIR = Path("/srv/rosbags")  # Same as https://data.bit-bots.de/ROSbags/

In [3]:
# NOTE: This shadows the DB_PATH imported from ddlitlab2024 with a hard-coded, machine-specific path
DB_PATH = Path("/srv/ssd_nvm/dataset/ddlitlab2024/db/robocup_2024_german_open_2025.sqlite3")  # TODO Remove me

In [4]:
# Open the SQLite database via a URI so that it can be flagged as read-only (mode=ro)
conn = sqlite3.connect(
    database=f"file:{DB_PATH}?mode=ro",
    uri=True,
)

In [5]:
# Get mcaps used by the database
query = """
SELECT original_file FROM Recording;
"""
# read_sql_query returns a DataFrame. Rename its single result column explicitly instead of
# assigning the whole frame to one column, which only works because pandas special-cases
# single-column frames and hides what is actually stored.
files = pd.read_sql_query(query, conn).rename(columns={"original_file": "mcap_file_name"})

In [6]:
# Find the full file paths recursively contained in the following directory
def find_file(file_name):
    candidates = []
    for path in BASE_DIR.rglob(file_name):
        candidates.append(path)
    match len(candidates):
        case 0:
            print(f"File {file_name} not found")
            return None
        case 1:
            return candidates[0]
        case 2:
            # Use the "untrimmed" version of the file if it exists
            for candidate in candidates:
                if "untrimmed" in str(candidate):
                    return candidate
            print(f"File {file_name} found multiple times: {candidates}")
            return None
        case _:
            print(f"File {file_name} found multiple times: {candidates}")
            return None


# Resolve every recorded file name to its location on disk and make sure none is missing
files["mcap_file_path"] = files["mcap_file_name"].apply(find_file)
assert not files["mcap_file_path"].isnull().any(), "Some mcap files were not found"

## Gather metrics:
- File size [B]
- Duration [s]
- Image messages
- IMU messages
- Joint state messages
- Joint command messages
- Game state messages
- Total number of messages (sum of the message counts above)

In [7]:
# Preparations


class Metric(TypedDict):
    """Metrics gathered for a single MCAP file."""

    # Name of the MCAP file without its directory
    mcap_file_name: str
    # Full path of the MCAP file
    mcap_file_path: Path
    # Size of the MCAP file on disk in bytes
    mcap_file_size_B: int
    # Time between the first and the last message in seconds
    duration_s: float
    # Number of counted camera image messages
    num_images: int
    # Number of IMU samples (IMU messages, or matching /tf transforms if the file has no IMU topic)
    num_imu: int
    # Number of joint state messages
    num_joint_states: int
    # Number of joint command messages
    num_joint_commands: int
    # Number of game state messages
    num_game_states: int
    # Sum of all the message counts above
    num_messages: int


@contextmanager
def _mcap_reader(mcap_file_path: Path):
    """Yield an MCAP reader for the given file.

    The reader is created with a ``DecoderFactory`` so that messages can be decoded.
    The underlying file handle is closed when the context is left.
    """
    with open(mcap_file_path, "rb") as mcap_stream:
        decoding_reader = make_reader(mcap_stream, decoder_factories=[DecoderFactory()])
        yield decoding_reader


def duration(summary: "Summary") -> timedelta:
    """Return the time span covered by the messages of an MCAP file.

    The span reaches from the earliest ``message_start_time`` to the latest
    ``message_end_time`` over all chunk indexes of the summary. Both values are
    treated as nanosecond timestamps.

    Args:
        summary: Summary of the MCAP file; only ``chunk_indexes`` is read.

    Returns:
        The covered time span (``timedelta`` resolution, i.e. microseconds).

    Raises:
        AssertionError: If the summary does not contain any chunk index.
    """
    chunk_indexes = list(summary.chunk_indexes)

    first_msg_start_time = min((chunk.message_start_time for chunk in chunk_indexes), default=None)
    last_msg_end_time = max((chunk.message_end_time for chunk in chunk_indexes), default=None)

    assert first_msg_start_time is not None, "No start time found in the MCAP file"
    assert last_msg_end_time is not None, "No end time found in the MCAP file"

    # Subtract the integer nanosecond timestamps directly. Converting each one to a naive
    # local datetime first is off by the UTC-offset change whenever a recording spans a
    # DST transition, and pushing ~1e18 ns through float seconds costs precision.
    return timedelta(microseconds=(last_msg_end_time - first_msg_start_time) / 1_000)


# Topics that are requested from the MCAP reader when gathering the metrics.
# Messages on all other topics are never read.
USED_TOPICS = [
    "/DynamixelController/command",
    "/camera/image_proc",
    "/camera/image_to_record",
    "/gamestate",
    "/imu/data",
    "/joint_states",
    "/tf",
]

In [None]:
def get_metrics(file_path: Path) -> Metric:
    with _mcap_reader(file_path) as reader:
        summary = reader.get_summary()
        assert summary is not None, "Summary is None"

        has_imu_data = any(channel.topic == "/imu/data" for channel in summary.channels.values())

        metric: Metric = {
            "mcap_file_name": file_path.name,
            "mcap_file_path": file_path,
            "mcap_file_size_B": file_path.stat().st_size,
            "duration_s": duration(summary).total_seconds(),
            "num_images": 0,
            "num_imu": 0,
            "num_joint_states": 0,
            "num_joint_commands": 0,
            "num_game_states": 0,
            "num_messages": 0,
        }

        for _, channel, _, ros_msg in reader.iter_decoded_messages(topics=USED_TOPICS):
            match channel.topic:
                case "/camera/image_proc":
                    metric["num_images"] += 1
                case "/imu/data" if has_imu_data:
                    metric["num_imu"] += 1
                case "/joint_states":
                    metric["num_joint_states"] += 1
                case "/DynamixelController/command":
                    metric["num_joint_commands"] += 1
                case "/gamestate":
                    metric["num_game_states"] += 1
                case "/tf" if not has_imu_data:
                    for tf_msg in ros_msg.transforms:
                        if tf_msg.child_frame_id == "base_footprint" and tf_msg.header.frame_id == "base_link":
                            metric["num_imu"] += 1
                case _:
                    pass

        metric["num_messages"] = (
            metric["num_images"]
            + metric["num_imu"]
            + metric["num_joint_states"]
            + metric["num_joint_commands"]
            + metric["num_game_states"]
        )

        return metric


# Get the metrics for each mcap file in parallel
def get_metrics_for_all_files():
    """Compute the metrics of every file listed in ``files`` using a thread pool.

    Returns:
        A DataFrame with one row of metrics per MCAP file, in the order of ``files``.
    """

    def _log_and_measure(mcap_path):
        # Announce the file first so that progress is visible while the pool is busy
        print(f"Processing {mcap_path}")
        return get_metrics(mcap_path)

    mcap_paths = files["mcap_file_path"].tolist()

    with ThreadPoolExecutor() as pool:
        # map() keeps the input order, regardless of which worker finishes first
        collected = list(pool.map(_log_and_measure, mcap_paths))

    return pd.DataFrame(collected)


# Gather the metrics of all recordings (reads and decodes the selected topics of every file)
df = get_metrics_for_all_files()

Processing /srv/rosbags/robocup_2024/ID_donna_2024-07-19T16:30:37/ID_donna_2024-07-19T16:30:37_0.mcap
Processing /srv/rosbags/robocup_2024/ID_jack_2024-07-19T11:31:28/ID_jack_2024-07-19T11:31:28_0.mcap
Processing /srv/rosbags/robocup_2024/ID_jack_2024-07-17T15:38:04/ID_jack_2024-07-17T15:38:04_0.mcap
Processing /srv/rosbags/robocup_2024/ID_rory_2024-07-18T13:29:30/ID_rory_2024-07-18T13:29:30_0.mcap
Processing /srv/rosbags/robocup_2024/ID_donna_2024-07-19T11:48:16/ID_donna_2024-07-19T11:48:16_0.mcap
Processing /srv/rosbags/robocup_2024/ID_rory_2024-07-18T13:54:46/ID_rory_2024-07-18T13:54:46_0.mcap
Processing /srv/rosbags/robocup_2024/ID_jack_2024-07-20T11:08:32/ID_jack_2024-07-20T11:08:32_0.mcap
Processing /srv/rosbags/robocup_2024/ID_jack_2024-07-19T09:50:25/ID_jack_2024-07-19T09:50:25_0.mcap
Processing /srv/rosbags/robocup_2024/ID_jack_2024-07-19T17:27:57/ID_jack_2024-07-19T17:27:57_0.mcap
Processing /srv/rosbags/robocup_2024/ID_donna_2024-07-18T18:29:22/ID_donna_2024-07-18T18:29:22_0

In [None]:
# Show the gathered per-file metrics
df