In this notebook, different statistics of the detected ramp planes will be analyzed and visualized.<br>
Also a score will be calculated to determine how well the algorithm works.<br>
This will be done for multiple rosbags to get an average score.

In [1]:
from tf.transformations import euler_from_quaternion, euler_matrix
import ros_numpy
import pcl
import numpy as np
import math
import rosbag
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.offline import plot
from plotly.subplots import make_subplots
import plotly.io as pio
pio.templates.default = 'plotly_dark'
from IPython import display

Class of the actual ramp detection algorithm

In [2]:
# My ROS Node
class VisualDetection():

    def __init__(self):
        self.calibrated = False
        self.rp = [0, 0]
        self.ramp_data = [[] for i in range(5)]
        self.publish_clouds = False
        self.arr = np.zeros((1,3))
        # x coordinates where ramp starts
        self.ramp_start = x_range[0]

    def spin(self, cloud, pose):
        self.cloud = cloud
        self.pose = pose
        # Robosense Lidar has a rate of 10 Hz
        # Convert PointCloud2 msg to numpy array
        pc_array = ros_numpy.point_cloud2.pointcloud2_to_xyz_array(self.cloud, remove_nans=True)

        # Apply lidar to car frame transformation
        pc_array_tf = self.transform_pc(pc_array, roll=0, pitch=19.174, yaw=0, transl_x=-1.944)

        # Filter unwanted points (to reduce point cloud size) with passthrough filter
        # TODO: Check if Points further than 30m have intensity less than 10%
        # * Max Range of lidar is 100m (30m @ 10% NIST)
        pc_array_cut = self.reduce_pc(pc_array_tf, 0, 30, -3.5, 3.5, -1, 1.5)
            
        # Convert numpy array to pcl object
        pc_cut = pcl.PointCloud()
        pc_cut.from_array(pc_array_cut.astype('float32'))

        # Downsample point cloud using voxel filter to further decrease size
        pc_small = self.voxel_filter(pc_cut, 0.1)
        if self.publish_clouds: self.publish_pc(pc_small.to_list(), 'pc_small')

        # Perform RANSAC until no new planes are being detected
        plane_coor, data = self.plane_detection(pc_small, 100, 4)
        return plane_coor, data, self.relative_to_absolute(pc_array_cut)
        # self.ramp_detection_confidence(10, 5, data)

    def transform_pc(self, pc, roll=0, pitch=0, yaw=0, transl_x=1.753, transl_y=0, transl_z=1.156):
        """Transformation from Lidar frame to car frame. Rotation in rad and translation in m."""
        # Rotation matrix
        rot = euler_matrix(roll, pitch, yaw, 'sxyz')[:3, :3]
        # Apply rotation
        pc_tf = np.inner(pc, rot)

        # Translation
        translation = [transl_x, transl_y, transl_z]
        # Combine rotation and translation
        pc_tf += translation
        return pc_tf

    def reduce_pc(self, pc, x_lower, x_upper, y_lower, y_upper, z_lower, z_upper):
        """Removes points outside of box"""
        pc_cut = pc[(pc[:, 0] > x_lower) & (pc[:, 0] < x_upper) & (pc[:, 1] > y_lower) & (
            pc[:, 1] < y_upper) & (pc[:, 2] > z_lower) & (pc[:, 2] < z_upper)]
        return pc_cut

    def voxel_filter(self, pc, leaf_size):
        """Downsample point cloud using voxel filter"""
        vg = pc.make_voxel_grid_filter()
        # Leaf_size is the length of the side of the voxel cube in m
        vg.set_leaf_size(leaf_size, leaf_size, leaf_size)
        pc_filtered = vg.filter()
        # print('Reduced size from {} to {}'.format(pc.size, pc_filtered.size))
        return pc_filtered

    def plane_detection(self, pc, min_points, max_planes):
        """Detects all planes in point cloud"""
        # Ground vector
        g_vec = None
        counter = 0
        while pc.size > min_points and counter < max_planes:
            # Detect most dominate plane and get inliers and normal vector
            indices, coefficients = self.ransac(pc)
            n_vec = coefficients[:-1]

            # Split pointcloud in inliers and outliers of plane
            pc, plane, pc_points = self.split_pc(pc, indices)

            # Exit if plane is empty
            if not plane:
                # print('ERROR: No plane could be detected')
                return [], []

            # Ignore walls to the side or in front
            if self.is_plane_near_ground(n_vec):
                # First ground like detection is most probably the ground
                if g_vec is None:
                    g_vec = n_vec
                    if self.publish_clouds: self.publish_pc(plane, 'pc_ground')
                # Either ground is detected again or potential ramp
                else:
                    # self.pub_angle.publish(angle)
                    is_ramp, data = self.ramp_detection(plane, g_vec, n_vec, 4, 7, 2, 4)
                    if is_ramp:
                        # Transform plane from local rslidar coordinates to global map coordinates
                        plane_global = self.relative_to_absolute(plane)
                    
                        if self.publish_clouds: self.publish_pc(plane, 'pc_ramp')
                        return plane_global, data
                    else:  
                        continue
            counter += 1
        return [], []

    def relative_to_absolute(self, pc):
        """Transforms relative lidar data to absolute by adding translation rotating"""
        # pc_arr = pc.to_array()
        pc_arr = np.array(pc)
    
        # Odometer
        pos = self.pose.position
        translation = [pos.x, pos.y, pos.z]
        ori = self.pose.orientation
        quat = [ori.x, ori.y, ori.z, ori.w]
        roll, pitch, yaw = euler_from_quaternion(quat)

        # Rotation matrix
        rot = euler_matrix(roll, pitch, yaw, 'sxyz')[:3, :3]
        # Apply rotation
        pc_tf = np.inner(pc_arr, rot)

        # Combine rotation and translation
        pc_tf += translation
        return pc_tf

    def ramp_detection(
            self, plane, g_vec, n_vec, min_angle, max_angle, 
            min_width, max_width, logging=False):
        """Checks if conditions to be considered a ramp are fullfilled."""
        # Convert pcl plane to numpy array
        plane_array = np.array(plane)

        # Calculate angle [deg] between new and previously recorded normal vector of ground
        angle = self.angle_calc(g_vec, n_vec)
        # Assert ramp angle threshold
        if min_angle <= angle <= max_angle:
            if logging: print('ANGLE PASSED')
            pass
        else:
            if logging: print('Angle wrong with {}'.format(angle))
            return False, []

        # Get ramp width (Difference between y-values)
        width = max(plane_array[:, 1]) - min(plane_array[:, 1])
        # Assert ramp width threshold
        if min_width <= width <= max_width:
            if logging: print('WIDTH PASSED')
            pass
        else:
            if logging: print('Width wrong with {}'.format(width))
            return False, []

        # Ramp distance (x-value of nearest point of the plane)
        dist = min(plane_array[:,0])

        self.angle = angle
        self.d = dist
        if logging:
            print('Possible ramp in {:05.2f}m with angle {:05.2f}deg and width {:05.2f}m'.format(
            dist, angle, width))        
        true_dist = self.ramp_start - self.pose.position.x
        return True, [angle, width, dist, true_dist]

    def ransac(self, pc):
        """Finds inliers and normal vector of dominant plane"""
        # 50?
        seg = pc.make_segmenter_normals(50)
        # Doubles the speed if True
        seg.set_optimize_coefficients(True)
        seg.set_model_type(pcl.SACMODEL_NORMAL_PLANE)
        seg.set_method_type(pcl.SAC_RANSAC)
        # How close a point must be to model to be considered inlier
        seg.set_distance_threshold(0.01)
        # normal_distance_weight?
        seg.set_normal_distance_weight(0.01)
        # How many tries
        seg.set_max_iterations(100)
        indices, coefficients = seg.segment()
        return indices, coefficients

    def split_pc(self, pc, inliers):
        """Extract detected plane from point cloud and split into two pcs"""
        # Get point cooridnates of plane
        detected_plane = [pc[i] for i in inliers]
        # Point cloud of detected plane (inliers)
        pc_inliers = pc.extract(inliers)

        # Point cloud of outliers
        outlier_indices = list(set(np.arange(pc.size)).symmetric_difference(inliers))
        pc_outliers = pc.extract(outlier_indices)

        return pc_outliers, detected_plane, pc_inliers

    def is_plane_near_ground(self, v, threshold=0.8):
        """Returns True if plane is on the ground (and false if e.g. side wall)"""
        return abs(v[2]) > threshold   

    def angle_calc(self, v1, v2, degrees=True):
        """Calculate angle between two vectors (planes)"""
        # Assuming both vectors can be rotated alongside one axis to be aligned
        dot = np.dot(v1, v2)
        if dot <= 1:
            angle = np.arccos(dot)
        else:
            # print('ERROR: dot product > 1')
            # print('v1 is {} abs({}) and v2 is {} abs ({})'.format(v1, np.linalg.norm(v1), v2, np.linalg.norm(v2)))
            angle = 0

        if degrees is True:
            return np.degrees(angle)
        else:
            return angle

Load data (pointcloud and odom) from rosbag and make sure both topics are synchronized

In [3]:
class GetScore():
    def __init__(self, bag_name, x_range, y_range):
        self.bag_name = bag_name
        self.x_range = x_range
        self.y_range = y_range

    def extract_data(self):
        bag_path = '/home/user/rosbags/big/evaluation/' + self.bag_name
        # Load rosbag
        bag = rosbag.Bag(bag_path)

        # Synch help
        msg_pose_t = []
        msg_lidar_t = []

        # Extract data from hdl bag
        # Contains raw lidar data and the position/orientation of car
        pose = []
        for topic, msg, t in bag.read_messages(topics='/odom'):
            msg_pose_t.append(t.to_time())
            pose.append(msg.pose.pose)
        lidar = []
        for topic, msg, t in bag.read_messages(topics='/right/rslidar_points'):
            msg_lidar_t.append(t.to_time())
            lidar.append(msg)

        # Difference between length of the two topics
        len_diff = len(msg_lidar_t) - len(msg_pose_t)
        # Time difference at start and end of recording
        diff1 = msg_pose_t[0] - msg_lidar_t[0]
        diff2 = msg_pose_t[-1] - msg_lidar_t[-1]
        if  diff1 > 0:
            lidar = lidar[len_diff:]
        else:
            print(msg_pose_t[abs(len_diff)] - msg_lidar_t[0])
        
        return lidar, pose

Run algorithm with rosbag data.<br>
Store many different calculated metrics generated by the algorithm in a pandas data frame (to allow a faster plotting).

In [4]:
class GetScore(GetScore):
    def run_the_algorithm(self, lidar, pose):
        # Create instance of class (using standard parameters):
        vd = VisualDetection()
        # Lists to fill, will contain entry for each plane
        planes = []
        ramp_stats = []
        true_dist = []
        all_points = []
        for i in range(len(lidar)):
            plane_points, data, pc_whole = vd.spin(lidar[i], pose[i])
            planes.append(plane_points)
            ramp_stats.append(data)
            all_points.append(pc_whole)
            # True distance to start of ramp
            true_dist.append(self.x_range[0] - pose[i].position.x)
        true_dist = np.array(true_dist)
        return planes, ramp_stats, true_dist

    def convert_to_df(self, planes, ramp_stats):
        # Remove empty lists (when no ramp has been detected)
        # because information such as angle, width of ramp etc 
        # are only stored when a ramp is detected, 0 otherwise
        ramp_arrays = [x for x in planes if not isinstance(x, list)]
        ramp_stats = [x for x in ramp_stats if x != []]
        # Get indices where ramp has been detected
        ramp_indices = [i for i,v in enumerate(planes) if not isinstance(v, list)]

        # Convert list to dictionary
        dic = []
        for i, arr in enumerate(ramp_arrays):
            for j, point in enumerate(arr):
                dic.append(
                    {
                        'sampleIdx': i,
                        'pointIdx': j,
                        'x': point[0],
                        'y': point[1],
                        'z': point[2]
                    }
                )
        # And finally to pandas data frame
        df = pd.DataFrame(dic)
        # Reorder columns
        df = df[['sampleIdx', 'pointIdx', 'x', 'y', 'z']]
        return df, ramp_stats

    def ground_truth_check(self, df, ramp_stats):
        # Check if a point lies within ramp region
        lies_inside = []
        for i,x in enumerate(df['x']):
            if self.x_range[0] < x < self.x_range[1]:
                if self.y_range[0] < df['y'][i] < self.y_range[1]:
                    # True if x and y coordinate inside region
                    lies_inside.append(True)
                else:
                    lies_inside.append(False)
            else:
                lies_inside.append(False)
        # Add column (bool: if point lies in region) to data frame
        df['inlier'] = lies_inside

        # Calculate how many points of each sample lie in ramp region
        true_inliers = []
        samples_num = df.sampleIdx.max() + 1
        for i in range(samples_num):
            # Bool list of inliers and outliers of sample
            bool_lst = df[df['sampleIdx'] == i]['inlier']
            # Percentage of inliers of sample
            true_inliers.append(sum(bool_lst) / float(len(bool_lst)))

        # New dataframe with stats for each frame
        # Structure reminder of ramp_stats: [angle, width, dist, true_dist]
        dic = []
        for i in range(samples_num):
            dic.append(
                {
                    'sampleIdx': i,
                    'TrueInliers': true_inliers[i],
                    'Angle': ramp_stats[i][0],
                    'Width': ramp_stats[i][1],
                    'Dist': ramp_stats[i][2],
                    'TrueDist': ramp_stats[i][3],
                }
            )
        # Convert dictionary to dataframe
        df_stats = pd.DataFrame(dic)
        # Reorder columns
        df_stats = df_stats[['sampleIdx', 'TrueInliers', 'Angle', 'Width', 'Dist', 'TrueDist']]
        return df_stats

    def some_plot(self, true_dist, df_stats):
        expected_detections = []
        actual_detections = []
        # Split dataset in 1m intervals
        for i in range(15, -5, -1):
            # Filter distance to ramp (from odom data) in 1m intervals
            expected_detections.append(len(true_dist[(i < true_dist) & (true_dist < i + 1)]))
            # Get corresponding distance estimations
            actual_detections.append(len(df_stats['TrueDist'][(i < df_stats['TrueDist']) & (df_stats['TrueDist'] < i + 1)]))
        # Create pandas data frame
        eval_df = pd.DataFrame()
        # Distance to ramp e.g. a value of 15 means interval from 16m to 15m 
        eval_df['distToRamp'] = range(15, -5, -1)
        eval_df['expectedDetections'] = expected_detections
        eval_df['actualDetections'] = actual_detections

        # Plot
        fig = go.Figure()
        fig.add_trace(go.Bar(
            y = eval_df['expectedDetections'], x=eval_df['distToRamp'],
            name="False negatives (Not detected)",
            offsetgroup=0
        ))
        fig.add_trace(go.Bar(
            y = eval_df['actualDetections'], x=eval_df['distToRamp'],
            name="True positives (Detected)",
            offsetgroup=0
        ))
        fig.update_xaxes(title_text="Distance to ramp [m]", autorange='reversed')
        fig.update_yaxes(title_text="Number of ramps detected")
        fig.update_layout(title_text="How many samples were collected in 1m intervals and how many of them have been identified as ramp")
        fig.show()
        return fig

    def calc_score(self, df_stats, true_dist, min_dist, max_dist):
        # Sensitivity
        # How many sample were recorded in given range
        expected_detections = len(true_dist[(min_dist < true_dist) & (true_dist < max_dist)])
        # How many samples were identified as ramp in given range
        actual_detections = len(df_stats['TrueDist'][(min_dist < df_stats['TrueDist']) & (df_stats['TrueDist'] < max_dist)])
        # Calculate ratio
        sensitivity = float(actual_detections) / expected_detections
        print(self.bag_name)
        print('Out of {} recorded samples {} were detected in the range of {} to {}m before the ramp.'.format(
            expected_detections, actual_detections, min_dist, max_dist))
        print('Resulting in a score of {:.2f}%\n'.format(sensitivity*100))

        # Inlier score 
        f = df_stats['TrueInliers'][(min_dist < df_stats['TrueDist']) & (df_stats['TrueDist'] < max_dist)]
        score2 = sum(f) / len(f)
        print('Of the {} detected planes {:.2f}% inlier points were actually inside the ramp region.\n'.format(
            actual_detections, score2*100))

        # False positives?  
        f = df_stats['TrueInliers'][(min_dist < df_stats['TrueDist']) & (df_stats['TrueDist'] < max_dist)]
        thresh = 0.5
        true_detections = sum(f > thresh)
        score3 = float(true_detections) / len(f)
        print('Of the {} detected planes {} had at least {}% of points inside the ramp region.'.format(
            actual_detections, true_detections, int(thresh*100)))
        print('Resulting in a score of {:.2f}%\n\n'.format(score3*100))
        
        scores = [sensitivity, score2, score3]
        return scores

    # Run everything
    def boss_method(self, min_dist=4, max_dist=10, show_plot=True):
        lidar, pose = self.extract_data()
        planes, ramp_stats, true_dist = self.run_the_algorithm(lidar, pose)
        df, ramp_stats = self.convert_to_df(planes, ramp_stats)
        df_stats = self.ground_truth_check(df, ramp_stats)
        if show_plot: self.some_plot(true_dist, df_stats)
        scores = self.calc_score(df_stats, true_dist, min_dist, max_dist)
        return scores


- Straight onto the curved ramped

<img src="Images/2021-10-19-15-30-50_map_odom.png" alt="Drawing" style="width: 1200px;"/>

- Straight onto the straight ramp

<img src="Images/2021-10-19-15-43-09_map_odom.png" alt="Drawing" style="width: 1200px;"/>

- Driving at a slight angle onto the ramp in -1th floor

<img src="Images/2021-10-19-15-52-21_map_odom.png" alt="Drawing" style="width: 1200px;"/>



In [5]:
# Rosbag names to evaluate
bag_names = [
    '2021-10-19-15-30-50_hdl.bag', 
    '2021-10-19-15-43-09_hdl.bag', 
    '2021-10-19-15-52-21_hdl.bag']
    
# Ground truth ramp region for each bag
xy_range = [[[24, 36], [-2.6, 1.5]],
            [[20, 32], [-1.4, 2.5]], 
            [[14.5, 24], [1.5, 5.8]]]

scores = []
for i,bag_name in enumerate(bag_names):
    # Extract ramp region data
    x_range, y_range = xy_range[i]
    # Create instance of class
    gs = GetScore(bag_name, x_range, y_range)
    score = gs.boss_method()
    scores.append(score)

2021-10-19-15-30-50_hdl.bag
Out of 54 recorded samples 46 were detected in the range of 4 to 10m before the ramp.
Resulting in a score of 85.19%

Of the 46 detected planes 90.37% inlier points were actually inside the ramp region.

Of the 46 detected planes 42 had at least 50% of points inside the ramp region.
Resulting in a score of 91.30%




2021-10-19-15-43-09_hdl.bag
Out of 28 recorded samples 17 were detected in the range of 4 to 10m before the ramp.
Resulting in a score of 60.71%

Of the 17 detected planes 61.64% inlier points were actually inside the ramp region.

Of the 17 detected planes 11 had at least 50% of points inside the ramp region.
Resulting in a score of 64.71%




2021-10-19-15-52-21_hdl.bag
Out of 31 recorded samples 12 were detected in the range of 4 to 10m before the ramp.
Resulting in a score of 38.71%

Of the 12 detected planes 65.87% inlier points were actually inside the ramp region.

Of the 12 detected planes 9 had at least 50% of points inside the ramp region.
Resulting in a score of 75.00%


