---
title: "ROS bag data to CSV with the bagpy package"
subtitle: This notebook shows how to convert ROS bag data into individual CSV files applied to the Underwater cave data set.
authors:
  - name: Thomas Guilment
    affiliations:
      - University of Louisiana at Lafayette
    email: thomas.guilment@gmail.com
    corresponding: true
    orcid: 0009-0003-8163-3976
  - name: Gabriele Morra
    affiliations:
      - University of Louisiana at Lafayette
format:
  html:
    page-layout: article
    toc: true
    toc-depth: 2
    number-sections: false
jupyter: python3
execute:
  eval: false
include: true
---

# ROS bag data to CSV with the bagpy package
> The Underwater cave data sets bag data are available at [https://cirs.udg.edu/caves-dataset/](https://cirs.udg.edu/caves-dataset/)

:::{important}
Data are considered to be downloaded in the path **data/bag/full_dataset.bag**.
:::

## Imports

In [None]:
#| echo: true
import os
import shutil

# Import bagpy
from bagpy import bagreader

## Read the sensor dataset

In [None]:
s_bag_data_path = os.path.join('.','data','full_dataset.bag')

In [None]:
#| echo: true
bag_data_sensors = bagreader(s_bag_data_path)

[INFO]  Successfully created the data folder ./data/bag/full_dataset.


## Display all topics in a table

In [None]:
#| echo: true
bag_data_sensors.topic_table

Unnamed: 0,Topics,Types,Message Count,Frequency
0,/depth_sensor,cirs_girona_cala_viuda/Depth,19553,9.767973
1,/dvl_linkquest,cirs_girona_cala_viuda/LinkquestDvl,5564,2.849055
2,/imu_adis,cirs_girona_cala_viuda/Imu,19965,9.878213
3,/imu_adis_ros,sensor_msgs/Imu,19965,9.878213
4,/imu_xsens_mti,cirs_girona_cala_viuda/Imu,19553,9.766972
5,/imu_xsens_mti_ros,sensor_msgs/Imu,19553,9.766972
6,/odometry,nav_msgs/Odometry,21843,10.00001
7,/sonar_micron,cirs_girona_cala_viuda/RangeImageBeam,45599,24.229546
8,/sonar_micron_ros,sensor_msgs/LaserScan,45599,24.229546
9,/sonar_seaking,cirs_girona_cala_viuda/RangeImageBeam,97501,48.79651


## Export the bag data to csv files

In [None]:
s_create_data_path = s_bag_data_path[:-4]
s_create_data_path

'./data/bag/full_dataset'

In [None]:
#| echo: true

# Check if the "full_dataset" folder exists
if os.path.exists(s_create_data_path):
    
    # Get a list of all files in the "full_dataset" folder
    l_s_files = os.listdir(s_create_data_path)
    
    # Filter the list to only include CSV files
    l_s_csv_files = [f for f in l_s_files if f.endswith(".csv")]
    
    if l_s_csv_files:
        print(f"The 'full_dataset' already contains {len(l_s_csv_files)} CSV file(s):")
        print(*l_s_csv_files,sep='\n')
    else:
        print("Let's convert the bag data to CSV for each topic")
        # Generic message retrieval
        # ROS messages of any type and topic can be decoded as follows:
        l_s_csv_files = []
        for s_topic in bag_data_sensors.topics:
            s_data = bag_data_sensors.message_by_topic(s_topic) # Create a folder and write the CSV files
            l_s_csv_files.append(s_data) # Memorize file paths if needed

        print("The following files have been created:")
        print(*l_s_csv_files,sep='\n')

Let's convert the bag data to CSV for each topic
The following files have been created:
./data/bag/full_dataset/depth_sensor.csv
./data/bag/full_dataset/dvl_linkquest.csv
./data/bag/full_dataset/imu_adis.csv
./data/bag/full_dataset/imu_adis_ros.csv
./data/bag/full_dataset/imu_xsens_mti.csv
./data/bag/full_dataset/imu_xsens_mti_ros.csv
./data/bag/full_dataset/odometry.csv
./data/bag/full_dataset/sonar_micron.csv
./data/bag/full_dataset/sonar_micron_ros.csv
./data/bag/full_dataset/sonar_seaking.csv
./data/bag/full_dataset/sonar_seaking_ros.csv
./data/bag/full_dataset/tf.csv


## If needed move the csv file to a new CSV folder

In [None]:
s_destination_path = os.path.join('.','data','csv')

# Create the destination directory if it doesn't exist
if not os.path.exists(s_destination_path):
    os.makedirs(s_destination_path)
    print(f"Created directory: {s_destination_path}")

    # Move each specified file
    for s_filename in l_s_csv_files:
        s_source_file = os.path.join(s_create_data_path, s_filename)
        s_dest_file = os.path.join(s_destination_path, s_filename)
        
        if os.path.exists(s_source_file):
            shutil.move(s_source_file, s_destination_path)
            print(f"Moved {s_filename} to {s_destination_path}")
        else:
            print(f"File not found: {s_source_file}")

else:
    print("The csv directory already exist")

The csv directory already exist


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()