In [3]:
import mcap
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
import open3d as o3d
import open3d.pipelines.registration as treg
from sensor_msgs.msg import PointCloud2, PointField
import numpy as np
import math

import sys
from collections import namedtuple
import ctypes
import struct

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


# Range, hFOV, vFOV of 3D Lidar

### Read mcap bag using rosbag2_py

In [4]:
input_bag = "exercise_0.mcap"
reader = rosbag2_py.SequentialReader()
reader.open(
        rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

In [5]:
topic_types = reader.get_all_topics_and_types()
def typename(topic_name):
        for topic_type in topic_types:
            if topic_type.name == topic_name:
                return topic_type.type
        raise ValueError(f"topic {topic_name} not in bag")

In [6]:
_DATATYPES = {}
_DATATYPES[PointField.INT8]    = ('b', 1)
_DATATYPES[PointField.UINT8]   = ('B', 1)
_DATATYPES[PointField.INT16]   = ('h', 2)
_DATATYPES[PointField.UINT16]  = ('H', 2)
_DATATYPES[PointField.INT32]   = ('i', 4)
_DATATYPES[PointField.UINT32]  = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)

def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
    """
    Read points from a L{sensor_msgs.PointCloud2} message.
    @param cloud: The point cloud to read from.
    @type  cloud: L{sensor_msgs.PointCloud2}
    @param field_names: The names of fields to read. If None, read all fields. [default: None]
    @type  field_names: iterable
    @param skip_nans: If True, then don't return any point with a NaN value.
    @type  skip_nans: bool [default: False]
    @param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
    @type  uvs: iterable
    @return: Generator which yields a list of values for each point.
    @rtype:  generator
    """
#     assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2'
    fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
    width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan
    unpack_from = struct.Struct(fmt).unpack_from

    if skip_nans:
        if uvs:
            for u, v in uvs:
                p = unpack_from(data, (row_step * v) + (point_step * u))
                has_nan = False
                for pv in p:
                    if isnan(pv):
                        has_nan = True
                        break
                if not has_nan:
                    yield p
        else:
            for v in range(height):
                offset = row_step * v
                for u in range(width):
                    p = unpack_from(data, offset)
                    has_nan = False
                    for pv in p:
                        if isnan(pv):
                            has_nan = True
                            break
                    if not has_nan:
                        yield p
                    offset += point_step
    else:
        if uvs:
            for u, v in uvs:
                yield unpack_from(data, (row_step * v) + (point_step * u))
        else:
            for v in range(height):
                offset = row_step * v
                for u in range(width):
#                     yield unpack_from(data, offset)
                    yield unpack_from(data, offset)[:3]
                    offset += point_step

def _get_struct_fmt(is_bigendian, fields, field_names=None):
    fmt = '>' if is_bigendian else '<'

    offset = 0
    for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
        if offset < field.offset:
            fmt += 'x' * (field.offset - offset)
            offset = field.offset
        if field.datatype not in _DATATYPES:
            print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
        else:
            datatype_fmt, datatype_length = _DATATYPES[field.datatype]
            fmt    += field.count * datatype_fmt
            offset += field.count * datatype_length

    return fmt


### 1. Crop data based on topic name, 
### 2. Convert the PointCloud2 into PCD.(use Open3d to handle PCD)
### 3. Estimate hFOV, vFOV, Practical Range.

In [7]:
count=0
horizontal_fov=[]
vertical_fov=[]
p_range=[]

while reader.has_next():
    topic, data, timestamp = reader.read_next()
    if (topic == "/lidars/bpearl_back_left"):
        msg_type = get_message(typename(topic))
        msg = deserialize_message(data, msg_type)


        pcd_as_numpy_array = np.array(list(read_points(msg)))

        o3d_pcd = o3d.geometry.PointCloud()
        o3d_pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pcd_as_numpy_array))

        points = np.asarray(o3d_pcd.points)

        # Extract x, y, z coordinates
        x = points[:, 0]
        x[np.isnan(x)] = 0.0
        y = points[:, 1]
        y[np.isnan(y)] = 0.0
        z = points[:, 2]
        z[np.isnan(z)] = 0.0

        # Calculate azimuth and elevation angles for each point
        azimuth = np.degrees(np.arctan2(y, x))
        elevation = np.degrees(np.arctan2(z, np.sqrt(x**2 + y**2)))
        ranges = np.sqrt(x**2 + y**2 + z**2)
        

        # Calculate FOVs
        horizontal_fov.append(np.max(azimuth) - np.min(azimuth))
        vertical_fov.append(np.max(elevation) - np.min(elevation))
        p_range.append(np.max(ranges))

        print("Count", count, "Hfov", np.max(azimuth) - np.min(azimuth) , "Vfov", np.max(elevation) - np.min(elevation), "Range", np.max(ranges))
        print()
        count=count+1
    
    

Count 0 Hfov 359.9987546827312 Vfov 89.44014579629805 Range 53.061493140347615

Count 1 Hfov 359.99342144106697 Vfov 89.44012206828836 Range 53.10649876731061

Count 2 Hfov 359.987116493083 Vfov 89.44012206404588 Range 48.287683040835105

Count 3 Hfov 359.99959898493455 Vfov 89.44012205932995 Range 48.20768473654448

Count 4 Hfov 359.99063420621803 Vfov 89.44012205115953 Range 48.23767850218237

Count 5 Hfov 359.99730428724905 Vfov 89.44012205932995 Range 48.26268186957481

Count 6 Hfov 359.9812293984744 Vfov 89.44012203570463 Range 48.20767975971927

Count 7 Hfov 359.9849445590918 Vfov 89.44012205781623 Range 53.0414912624376

Count 8 Hfov 359.97801727361775 Vfov 89.44012204886701 Range 53.11649187416321

Count 9 Hfov 359.991689963818 Vfov 89.44012205352662 Range 53.0714936691109

Count 10 Hfov 359.99456910123695 Vfov 89.44014576703003 Range 52.89149747507602

Count 11 Hfov 359.99083745684106 Vfov 89.44012204754178 Range 52.71649594554897

Count 12 Hfov 359.997238274061 Vfov 89.440122

In [8]:
np.max(horizontal_fov)

359.99960821199613

In [9]:
np.max(vertical_fov)

89.49443204519564

In [10]:
np.max(p_range)

145.05529193348278

In [14]:
# Range, hFOV, vFOV of 2D Lidar

In [11]:
input_bag = "exercise_0.mcap"
reader = rosbag2_py.SequentialReader()
reader.open(
    
        rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

In [12]:
import numpy as np 
def polar_to_cartesian(scan):
    angles = np.arange(scan.angle_min, scan.angle_max, scan.angle_increment)
    x = scan.ranges * np.cos(angles)
    y = scan.ranges * np.sin(angles)
    return x, y


### 1. Crop data based on topic name
### 2. Convert Polar coordinates into Cartisian (Spacial coverage)
### 3. Estimate hFOV, vFOV, Range of 2D hokuyo lidar 

In [13]:
hFOV=[]
m_range=[]

while reader.has_next():
    topic, data, timestamp = reader.read_next()
    
    if (topic == "/lidars/hokuyo_front_left"):
        msg_type = get_message(typename(topic))
        msg = deserialize_message(data, msg_type)
        x, y = polar_to_cartesian(msg)
        
        hFOV.append(np.arctan2(max(x),max(y)) - np.arctan2(min(x),min(y)))
        
        filtered_range =  np.array([ 0 if i>40 else i for i in msg.ranges])
        filtered_range[np.isnan(filtered_range)] = 0.0
        m_range.append(np.max(filtered_range))

print("FOV  in radians", np.max(hFOV))
print("FOV  in Degrees", np.max(hFOV) *(180/3.14))
print()
print("Maximum Range in Degrees", np.max(m_range))

FOV  in radians 4.334182685732962
FOV  in Degrees 248.4563323031634

Maximum Range in Degrees 39.99399948120117
