# MCAP file reader example
This script reads an MCAP file into a pandas DataFrame, displays the resulting DataFrame, and visualizes the location, image, and point cloud data it contains.

In [None]:
# pip install pandas 
import pandas as pd

# pip install mcap-protobuf-support
from mcap_protobuf.reader import read_protobuf_messages

# pip install pydeck
import pydeck as pdk

# pip install opencv-python-headless
import cv2

# pip install matplotlib
import matplotlib.pyplot as plt

# pip install plotly>=5.20
import plotly.express as px   

In [None]:
MCAP_FILE_TO_READ = "example.mcap"  # Replace with your actual MCAP file path

# Collect one dict per message and build the DataFrame once at the end.
# Growing a DataFrame with pd.concat inside the loop copies every existing
# row on each iteration, which is quadratic in the number of messages.
rows = [
    {
        'topic': msg.topic,
        'timestamp': msg.log_time,
        'message': msg.proto_msg,
    }
    for msg in read_protobuf_messages(MCAP_FILE_TO_READ)
]

# Naming the columns explicitly keeps df['topic'] etc. available even when
# the file contains no messages.
df = pd.DataFrame(rows, columns=['topic', 'timestamp', 'message'])

df  # Display the DataFrame as the cell output

In [None]:
# List the distinct topic names found in the DataFrame; the cells that follow
# select messages by matching substrings of these names.
df['topic'].unique()

## Example of plotting LocationFix / Position

In [None]:
from IPython.display import display

# Keep only the rows whose topic name contains 'location_fix'; copy them so
# that columns can be added without touching the full DataFrame
is_location_fix = df['topic'].str.contains('location_fix')
location_df = df.loc[is_location_fix].copy()


# Read latitude and longitude from a message object's attributes
def extract_lat_lon(message):
    """Return the ``(latitude, longitude)`` tuple read from *message*."""
    return message.latitude, message.longitude

# Pull the (latitude, longitude) pair out of every location message
coords = location_df['message'].apply(extract_lat_lon)
location_df['latitude'] = [lat for lat, _ in coords]
location_df['longitude'] = [lon for _, lon in coords]

# Discard rows lacking a coordinate, then drop the raw message column so only
# plain values remain in the frame
location_df = location_df.dropna(subset=['latitude', 'longitude']).drop(columns=['message'])

# Centre the initial camera on the mean recorded position
view_state = pdk.ViewState(
    latitude=location_df['latitude'].mean(),
    longitude=location_df['longitude'].mean(),
    zoom=15,
    pitch=0,
)

# One dot per location row, drawn with a deck.gl ScatterplotLayer
scatter_options = {
    'get_position': '[longitude, latitude]',
    'get_color': '[200, 30, 0, 160]',
    'get_radius': 2,
    'radius_scale': 1,
    'radius_min_pixels': 2,
    'radius_max_pixels': 10,
    'pickable': True,
}
layer = pdk.Layer('ScatterplotLayer', location_df, **scatter_options)

# Assemble the deck and render it as the cell output
deck = pdk.Deck(layers=[layer], initial_view_state=view_state)
display(deck)

## Example of visualizing topic image_compressed / Image

In [None]:
import numpy as np
import base64

# Select the rows whose topic name contains 'image_compressed'
is_image = df['topic'].str.contains('image_compressed')
image_df = df.loc[is_image].copy()

# Take the first matching message and copy its fields into a plain dict
first_image_msg = image_df.iloc[0]['message']

# The raw image bytes are base64-encoded so they can be shown as text
encoded_image = base64.b64encode(first_image_msg.data).decode('utf-8')

first_image_json = {
    'timestamp': first_image_msg.timestamp,
    'frame_id': first_image_msg.frame_id,
    'format': first_image_msg.format,
    'data': encoded_image,
}

# Display the dict as the cell output
first_image_json

In [None]:
# Recover the encoded image bytes from base64 and let OpenCV decode them
img_bytes = base64.b64decode(first_image_json['data'])
img_array = np.frombuffer(img_bytes, dtype=np.uint8)
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

if img is None:
    # Decoding produced no image; report it instead of plotting
    print("Failed to decode image data")
else:
    # Reorder the channels from BGR to RGB before handing the image to matplotlib
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(12, 8))
    plt.imshow(img_rgb)
    plt.title(f"First image_compressed message (frame_id: {first_image_json['frame_id']})")
    plt.axis('off')
    plt.show()

## Example of visualizing point cloud 

In [None]:
# Keep only the rows whose topic name contains 'point_cloud'
point_cloud_df = df[df['topic'].str.contains('point_cloud')].copy()

# NOTE(review): iloc[1] selects the SECOND matching row even though the
# variable is named "first", and it raises IndexError when fewer than two
# point-cloud messages exist. Confirm whether iloc[0] was intended.
first_pc_msg = point_cloud_df.iloc[1]['message']

# Display the selected message as the cell output
first_pc_msg

In [None]:
# Print the Python type of the message's raw `data` payload before decoding it
print(type(first_pc_msg.data))

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Layout of one packed point inside the binary payload.
# NOTE(review): the layout is hard-coded here — confirm that it matches the
# messages being decoded.
fields = [
    {"name": "x", "offset": 0, "type": "FLOAT64"},
    {"name": "y", "offset": 8, "type": "FLOAT64"},
    {"name": "z", "offset": 16, "type": "FLOAT64"},
    {"name": "signal", "offset": 24, "type": "FLOAT64"},
    {"name": "reflectivity", "offset": 32, "type": "FLOAT64"},
    {"name": "near_ir", "offset": 40, "type": "FLOAT64"},
]
point_stride = 48  # bytes per point

# NumPy scalar type for each field type name used in `fields`
numpy_types = {
    "UINT8": np.uint8,
    "INT8": np.int8,
    "UINT16": np.uint16,
    "INT16": np.int16,
    "UINT32": np.uint32,
    "INT32": np.int32,
    "FLOAT32": np.float32,
    "FLOAT64": np.float64,
}

# Use the binary data directly
data_bytes = first_pc_msg.data

# Number of whole points in the payload; any trailing partial point is ignored
num_points = len(data_bytes) // point_stride

# Build the structured dtype from the declared offsets, types and stride so
# that the decoded layout always follows the `fields` table instead of
# relying on the fields happening to be contiguous float64 values.
dtype = np.dtype(
    {
        "names": [field["name"] for field in fields],
        "formats": [numpy_types[field["type"]] for field in fields],
        "offsets": [field["offset"] for field in fields],
        "itemsize": point_stride,
    }
)

# Interpret the payload as a structured array of `num_points` points
points = np.ndarray(
    shape=(num_points,),
    dtype=dtype,
    buffer=data_bytes
)

# Convert to DataFrame (one column per field)
df_points = pd.DataFrame(points)

# 3D scatter of the points, coloured by reflectivity
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(df_points['x'], df_points['y'], df_points['z'], c=df_points['reflectivity'], cmap='viridis', s=2)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.show()

In [None]:
import numpy as np
import pandas as pd

def show_point_cloud(msg, max_points=200_000):
    """
    Show an interactive 3-D Plotly scatter of one point-cloud message.

    The payload in ``msg.data`` is decoded as packed points of 48 bytes each,
    holding six float64 fields (x, y, z, signal, reflectivity, near_ir) at
    byte offsets 0, 8, 16, 24, 32 and 40. Points are coloured by reflectivity,
    with the colour range clipped to the 2nd-98th percentile.

    Parameters
    ----------
    msg : object
        Message exposing the packed point bytes as ``msg.data``.
        NOTE(review): the point layout is hard-coded in this function —
        confirm that it matches the messages passed in.
    max_points : int, optional
        Randomly subsample to this many points (keeps UI responsive).

    Returns
    -------
    None
        The figure is rendered with ``fig.show()``.
    """
    # 1. Structured dtype describing one packed point
    fields = [
        {"name": "x",            "offset": 0},
        {"name": "y",            "offset": 8},
        {"name": "z",            "offset": 16},
        {"name": "signal",       "offset": 24},
        {"name": "reflectivity", "offset": 32},
        {"name": "near_ir",      "offset": 40},
    ]
    point_stride = 48                       # bytes per point

    dtype = np.dtype(
        {
            "names":  [f["name"] for f in fields],
            "formats": [np.float64] * len(fields),
            "offsets": [f["offset"] for f in fields],
            "itemsize": point_stride,
        }
    )

    # 2. Decode the payload into a structured array.
    # np.frombuffer raises ValueError when the buffer length is not a multiple
    # of the item size, so read only the whole points and ignore trailing bytes.
    raw = memoryview(msg.data)
    num_points = raw.nbytes // point_stride
    pts = np.frombuffer(raw, dtype=dtype, count=num_points)
    points_df = pd.DataFrame(pts)

    # Optional down-sample for very dense clouds (fixed seed => repeatable plot)
    if len(points_df) > max_points:
        points_df = points_df.sample(max_points, random_state=0)

    # 3. Interactive Plotly scatter.
    # Colour limits taken from the 2 % and 98 % quantiles so extreme outliers
    # do not wash out the colour scale.
    cmin, cmax = points_df['reflectivity'].quantile([0.02, 0.98])

    fig = px.scatter_3d(
        points_df, x='x', y='y', z='z',
        color='reflectivity',
        color_continuous_scale='viridis',
        range_color=(cmin, cmax),
        opacity=0.8,
    )

    fig.update_layout(
        scene_aspectmode='data',           # equal aspect ratio
        margin=dict(l=0, r=0, b=0, t=40),
        title='LiDAR Point Cloud – coloured by Reflectivity',
    )
    # Fixed marker size for every point
    fig.update_traces(marker_size=2)

    fig.show()

# ----------------------------------------------------------------------
# Example usage
# ----------------------------------------------------------------------
# Render the point-cloud message selected in the earlier cell; any message
# exposing the packed point bytes as `.data` can be passed instead.
show_point_cloud(first_pc_msg)