In [None]:

%matplotlib widget

from pathlib import Path

from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore

import struct
import pandas as pd
import numpy as np
import csv

import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time
# import matplotlib.animation as animation


# gui.Application.instance.add_window(vis)
# gui.Application.instance.run()

#### Extract Sensor Data
##### '/livox/imu' publishes sensor_msgs/IMU message
###### Raw Msg Def: Header, quaterion orientation, orientation cov, angular velocity, velocity cov, linear acceleration, linear acceleration covariance
##### '/livox/lidar' publishes sensor_msgs/PointCloud message
The message is organized as follows {'name: (offset, datatype)}
###### {'x': (0, 7), 'y': (4, 7), 'z': (8, 7), 'intensity': (12, 7), 'tag': (16, 2), 'line': (17, 2), 'timestamp': (18, 8)}
###### uint8 UINT8 = 2, uint8 FLOAT32 = 7, uint8 FLOAT64 = 8


In [None]:
bagpath = Path('no_motion')

# Create a type store to use if the bag has no message definitions.
typestore = get_typestore(Stores.ROS2_HUMBLE)

imu_data = []
cloud_data = []

header_imu = ['time', 'orientation', 'angular velcoity', 'linear acceleration']
header_pc = ['count','x', 'y', 'z', 'intensity','tag','line']

# Create reader instance and open for reading.
with AnyReader([bagpath], default_typestore=typestore) as reader:
    connections = [x for x in reader.connections if x.topic == '/imu_raw/Imu']
    count = 0
    for connection, timestamp, rawdata in reader.messages(connections=connections):
        msg = reader.deserialize(rawdata, connection.msgtype)

        if connection.msgtype == 'sensor_msgs/msg/Imu':
            orientations = [msg.orientation.x, msg.orientation.y, msg.orientation.z]
            ang_velocity = [msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z]
            lin_acceleration = [msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z]

            data = [timestamp, orientations, ang_velocity, lin_acceleration]
            imu_data.append(data)

        elif connection.msgtype == 'sensor_msgs/msg/PointCloud2':
            x_all = []
            y_all = []
            z_all = []
            intensity_all = []
            tag_all = []
            line_all = []

            points = msg.data#tolist()
            N = int((points.shape[0])/26)
            points = points.reshape((N,26))
            width = msg.width
            height = msg.height
            field = msg.fields
            # print(width)
            # print(height)

            # Extract data frin field
            field_names = {field.name: (field.offset, field.datatype) for field in msg.fields}
            struct_fmt = "ffffBB"
            f = struct.calcsize('f')
            d=struct.calcsize('d')
            B = struct.calcsize('B')
            for row in points:
                x,y,z,intensity,tag,line = struct.unpack(struct_fmt, bytes(row[0:-8]))
                # timestamp = struct.unpack('d', bytes(row[-8:]))
                # x_all.append(x)
                # y_all.append(y)
                # z_all.append(z)    
                # intensity_all.append(intensity)
                # tag_all.append(tag)
                # line_all.append(line)
                cloud_data.append([count, x,y,z,intensity, tag, line])        #  print(msg.header.frame_id)
            count+=1

In [None]:
headers = ['time', 'orientation', 'angular velcoity', 'linear acceleration']
df_imu = pd.DataFrame(imu_data, columns=headers)
df_imu.to_csv('imu_nm.csv')

In [None]:
headers = ['count','x', 'y', 'z', 'intensity','tag','line']
df_pc = pd.DataFrame(cloud_data, columns=headers)
df_pc.to_csv('point_cloud_nm.csv')

Plotting

In [None]:
pc = pd.read_csv('point_cloud_nm.csv')
# Get unique counts (assuming the 'count' column exists)
unique_counts = pc['count'].unique()
unique_counts.sort()  # Ensure correct order
# print(max(pc["count"]))

In [None]:
fig=plt.figure()
ax = fig.add_subplot(projection='3d')
sc = ax.scatter([], [], [], c=[], cmap="plasma", marker="o")

# Axis labels
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Point Cloud Animation")


for counts in unique_counts:
    frame = pc[pc['count']==counts]
    points = np.array(frame[['x', 'y', 'z']])
    # Clear the previous plot
    ax.clear()

    # Plot the points for the current count
    r = np.sqrt(frame['x']**2+frame['y']**2+frame['z']**2)
    ax.scatter(frame['x'], frame['y'], frame['z'], c=r, s=0.1,cmap='inferno')

    # Set labels and title
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.set_title(f"Point Cloud - Count {counts}")
    # Set the viewpoint: We're in the middle, looking outward
    ax.view_init(elev=90, azim=90)  # You can change the angles to adjust the view

    # Redraw the plot
    display(fig)
    
    # Wait before displaying the next plot (create animation effect)
    time.sleep(0.1)  # Adjust the sleep time for speed of animation
    
    # Clear the output to update the plot
    clear_output(wait=True)

# Final plot display
plt.show()


In [None]:
# # Filter rows where count == 0
# filtered_df = pc[pc["count"] == 0]
# filtered_df2 = pc[pc["count"]==1]
# fig=plt.figure()
# ax = fig.add_subplot(projection='3d')
# r = np.sqrt(filtered_df['x']**2+filtered_df['y']**2+filtered_df['z']**2)
# ax.scatter(filtered_df['x'], filtered_df['y'], filtered_df['z'], c=r, s=0.1,cmap='inferno')

# plt.show()

In [None]:

frame = pc[pc['count']==0]
points = np.array(frame[['x', 'y', 'z']])
# Clear the previous plot
ax.clear()

# frame = frame[frame['line']==3]
frame = frame[(frame['x']**2+frame['y']**2+frame['z']**2)<=6]
# frame = frame[frame['z']<1.25]
# Plot the points for the current count
r = np.sqrt(frame['x']**2+frame['y']**2+frame['z']**2)
print(r)
# Filter out points where the radius is less than 2
# frame = frame[r <= 3]
# Set up the figure and 3D axis
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

# Scatter plot the points
sc=ax.scatter(frame['x'], frame['y'], frame['z'], c=r, s=0.25,cmap='plasma')

# Set axis labels (optional, to help with orientation)
ax.set_xlabel('X[m]')
ax.set_ylabel('Y[m]')
ax.set_zlabel('Z[m]')

# Set the viewpoint: We're in the middle, looking outward
ax.view_init(elev=90, azim=90)  # You can change the angles to adjust the view

cbar = plt.colorbar(sc)
cbar.set_label('Radius (m)')  # Label for the color bar



# Show the plot
plt.show()

In [None]:
unique_counts = pc['line'].unique()
unique_counts


Exploring SLAM

Trial 1: No Odometry

In [None]:
# Input data
# pick random ramples
# Fit model
# compute cost
# Repeat