# Ground Plane Fitting and Scan Line Run for 3D LiDAR

Ground Plane Fitting (GPF) and Naive Baseline for 3D LiDAR Segmentation

This notebook implements ground segmentation using the Ground Plane Fitting (GPF) algorithm 
proposed in:

"Fast Segmentation of 3D Point Clouds: A Paradigm on LiDAR Data for Autonomous Vehicle Applications"
by D. Zermas, I. Izzat, and N. Papanikolopoulos, 2017.

The implementation also includes a naive baseline method for comparison, as well as 
basic clustering and visualization tools.

# Imports

In [145]:
import numpy as np
from pypcd import pypcd  # Use local pypcd version (Python 3 compatible, no lzf support)
from sklearn.decomposition import PCA  # Used for plane fitting
from sklearn.neighbors import NearestNeighbors  # Used for spatial clustering
import pptk  # To install: pip install pptk
import os
# Note: Ubuntu users may need to fix libz.so.1 symlink issue (see: https://github.com/heremaps/pptk/issues/3)

# Hyperparameters

In [146]:
# Number of lowest points to use for initial ground seed estimation (LPR)
N_LPR = 1000

# Number of iterations for ground plane refinement
N_ITERATIONS = 3

# Height threshold above the LPR to select initial seed points
TH_SEEDS = 0.4

# Distance threshold to classify a point as ground
TH_DISTANCE_FROM_PLANE = 0.2

# Naive Baseline Method

In [147]:
def naive_ground_extractor(point_cloud, num_lowest_points):
    """
    Naive ground extraction method (baseline).
    
    This simple method selects the points with the lowest Z values 
    and assumes they belong to the ground surface. It does not model 
    the ground plane and is used as a baseline for comparison against 
    more robust algorithms like Ground Plane Fitting (GPF).
    
    Args:
        point_cloud (np.ndarray): N x 3 or N x D array of point cloud data.
        num_lowest_points (int): Number of points with lowest Z values to classify as ground.
    
    Returns:
        ground_indices (np.ndarray): Indices of the selected ground points.
    """

    # Step 1: Sort points by Z axis (height)
    sorted_indices = np.argsort(point_cloud[:, 2])

    # Step 2: Take the first N indices as ground
    ground_indices = sorted_indices[:num_lowest_points]

    return ground_indices

# Ground Plane Fitting (GPF)

In [148]:
def gpf_extract_initial_seeds_ids(point_cloud, number_of_lowest_points, thresh_seeds=0.4):
    """
    Extract initial seed points for ground plane estimation (GPF).
    
    Args:
        point_cloud (np.ndarray): N x 3 or N x 4 array of points (x, y, z[, class]).
        number_of_lowest_points (int): number of lowest Z points to average as LPR.
        thresh_seeds (float): threshold to select seeds close to LPR height.
    
    Returns:
        seeds_ids (np.ndarray): indices of points selected as initial seeds.
    """

    # Step 1: Sort the point cloud by Z axis (height)
    sorted_indices = np.argsort(point_cloud[:, 2])          # Get indices sorted by height
    sorted_points = point_cloud[sorted_indices]             # Apply sorting

    # Step 2: Compute LPR (Lowest Point Representative)
    LPR_height = np.mean(sorted_points[:number_of_lowest_points, 2])

    # Step 3: Select points that are within threshold distance from LPR
    seed_mask = sorted_points[:, 2] < (LPR_height + thresh_seeds)
    seeds_ids = sorted_indices[seed_mask]

    return seeds_ids

In [149]:
def gpf_estimate_plane(seed_points):
    """
    Estimate ground plane from seed points using PCA.
    
    Args:
        seed_points (np.ndarray): N x 3 or N x D array of selected ground seed points.
    
    Returns:
        normal_vector (np.ndarray): Unit normal vector [a, b, c] of the estimated plane.
        d (float): Plane offset in the equation ax + by + cz + d = 0
    """

    # Step 1: Compute the mean position of the seed points
    mean_point = np.mean(seed_points, axis=0)

    # Step 2: Center the point cloud around the origin
    centered = seed_points - mean_point

    # Step 3: Use PCA to estimate the plane's normal vector
    pca = PCA(n_components=3)
    pca.fit(centered)
    normal_vector = pca.components_[-1]  # Last component corresponds to the direction with least variance

    # Step 4: Compute the 'd' value using plane equation: n·x + d = 0 → d = -n·x̄
    d = -np.dot(normal_vector[:3], mean_point[:3])

    return normal_vector[:3], d

In [150]:
def gpf_refinement(point_cloud, initial_seed_indices, distance_threshold=0.2, num_iterations=5):
    """
    Iteratively refine the ground plane estimation using seed points and distance threshold.
    
    Args:
        point_cloud (np.ndarray): N x 3 or N x D array of input point cloud.
        initial_seed_indices (np.ndarray): Indices of initial ground seed points.
        distance_threshold (float): Max allowed point-to-plane distance to be considered ground.
        num_iterations (int): Number of iterations for refinement loop.
    
    Returns:
        ground_indices (np.ndarray): Indices of points classified as ground.
        non_ground_indices (np.ndarray): Indices of points classified as non-ground.
    """

    seed_indices = initial_seed_indices

    for _ in range(num_iterations):
        # Step 1: Estimate plane from current seed points
        normal_vector, d = gpf_estimate_plane(point_cloud[seed_indices])

        # Step 2: Compute point-to-plane distances
        numerator = np.abs(np.dot(point_cloud[:, :3], normal_vector) + d)
        denominator = np.linalg.norm(normal_vector)
        distances = numerator / denominator

        # Step 3: Classify points as ground or non-ground
        is_ground = distances < distance_threshold
        is_not_ground = ~is_ground

        # Step 4: Update seed indices with the new ground points
        seed_indices = np.where(is_ground)[0]

    # Final classification after last iteration
    ground_indices = np.where(is_ground)[0]
    non_ground_indices = np.where(is_not_ground)[0]

    return ground_indices, non_ground_indices

# Scan Line Run (SLR)

In [151]:
def cluster_connected_components(points, eps=0.5, min_samples=1):
    """
    Cluster points into connected components using radius-based neighborhood and Union-Find.

    This method groups spatially connected points into clusters. It is a simplified version 
    of DBSCAN without density-based core points — it only relies on spatial proximity and 
    minimum cluster size filtering.

    Args:
        points (np.ndarray): N x 3 (or N x D) array of point coordinates.
        eps (float): Maximum distance between neighbors (radius in meters).
        min_samples (int): Minimum number of points to form a valid cluster.

    Returns:
        final_labels (np.ndarray): Cluster labels for each point (-1 for outliers).
    """
    # Step 1: Build neighborhood graph using radius search
    neighbors = NearestNeighbors(radius=eps).fit(points)
    adjacency = neighbors.radius_neighbors_graph(points, mode='connectivity').tocoo()

    # Step 2: Union-Find (Disjoint Set) initialization
    parent = np.arange(len(points))

    def find(i):
        # Path compression
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    def union(i, j):
        # Union by parent
        root_i, root_j = find(i), find(j)
        if root_i != root_j:
            parent[root_i] = root_j

    # Step 3: Connect all neighbors
    for i, j in zip(adjacency.row, adjacency.col):
        if i != j:
            union(i, j)

    # Step 4: Assign cluster labels
    labels = np.array([find(i) for i in range(len(points))])

    # Step 5: Filter small clusters (outliers)
    unique_labels, counts = np.unique(labels, return_counts=True)
    valid_labels = unique_labels[counts >= min_samples]
    mask = np.isin(labels, valid_labels)

    # Step 6: Remap valid clusters to consecutive indices, mark outliers as -1
    final_labels = -np.ones_like(labels)
    final_labels[mask] = np.searchsorted(valid_labels, labels[mask])
    return final_labels

In [152]:
def process_non_ground_clusters(point_cloud, non_ground_indices, eps=0.5, min_pts=1):
    """
    Apply spatial clustering to points not classified as ground.

    This function filters out ground points (using provided indices),
    and applies connected components clustering to the remaining non-ground points.

    Args:
        point_cloud (np.ndarray): N x 4 array with [x, y, z, class] per point.
        ground_indices (np.ndarray): Indices of points classified as ground.

    Returns:
        cluster_labels (np.ndarray): Cluster label for each non-ground point (-1 for noise).
        non_ground_indices (np.ndarray): Indices of the non-ground points in the original array.
    """

    # Step 1: Identify non-ground point indices
    # total_points = point_cloud.shape[0]
    # all_indices = np.arange(total_points)
    # non_ground_indices = np.setdiff1d(all_indices, ground_indices)

    # Step 2: Extract non-ground points for clustering
    non_ground_points = point_cloud[non_ground_indices, :3]  # Use only XYZ for clustering

    # Step 3: Apply spatial clustering to non-ground points
    cluster_labels = cluster_connected_components(non_ground_points, eps=eps, min_samples=min_pts)

    return cluster_labels, non_ground_indices


# Visualizer for Point Clouds

In [153]:
def visualize_clusters(
    point_cloud,
    non_ground_indices=None,
    cluster_labels=None,
    point_size=0.03,
    show_ground=True,
    show_clusters=True,
    show_unlabeled=True
):
    """
    Visualize a point cloud with consistent class colors using pptk.

    Args:
        point_cloud (np.ndarray): N x 4 array with [x, y, z, class] per point.
        non_ground_indices (np.ndarray): Indices of non-ground points.
        cluster_labels (np.ndarray): Cluster labels for non-ground points (-1 = noise).
        point_size (float): Size of points in viewer.
        cluster_offset (int): Class offset where clusters begin (default: 2).
        show_ground (bool): Show ground points (class == 1).
        show_clusters (bool): Show clustered non-ground points.
        show_unlabeled (bool): Show points with class == 0.

    Returns:
        None
    """
    # Copy the point cloud to avoid modifying original
    pc = np.copy(point_cloud)

    # Assign cluster labels if provided
    if cluster_labels is not None and non_ground_indices is not None and show_clusters:
        for cluster_id in np.unique(cluster_labels):
            if cluster_id == -1:
                continue  # skip noise
            cluster_point_ids = non_ground_indices[cluster_labels == cluster_id]
            pc[cluster_point_ids, 3] = cluster_id

    # Create mask for points to visualize
    show_mask = np.full(pc.shape[0], False)

    if show_unlabeled:
        show_mask |= (pc[:, 3] == 0)

    if show_ground:
        show_mask |= (pc[:, 3] == 1)

    if show_clusters and cluster_labels is not None:
        show_mask |= (pc[:, 3] > 1)

    # Extract visible points and labels
    xyz = pc[show_mask, :3]
    class_labels = pc[show_mask, 3].astype(int)

    # Fixed color palette per class
    fixed_colors = {
        0: [1.0, 1.0, 1.0],  # unlabeled - white
        1: [1.0, 0.0, 1.0],  # ground    - violet
    }

    # Re-map visible class labels to continuous local indices for pptk
    unique_labels = np.unique(class_labels)
    label_to_index = {label: idx for idx, label in enumerate(unique_labels)}
    mapped_labels = np.array([label_to_index[label] for label in class_labels])

    # Build color map in the local label space
    color_map = []
    for label in unique_labels:
        if label in fixed_colors:
            color_map.append(fixed_colors[label])
        else:
            np.random.seed(label)  # keep color stable per class
            color_map.append(np.random.rand(3))  # fallback

    # Launch viewer
    viewer = pptk.viewer(xyz, mapped_labels)
    viewer.set(point_size=point_size)
    viewer.color_map(color_map)

# Main

In [154]:
def main(pcd_file):
    """
    Process a single point cloud file using GPF + optional clustering.
    Visualizes result using pptk.
    """

    # Load PCD file
    point_cloud_from_path = pypcd.PointCloud.from_path(pcd_file)

    # Create Nx4 array: x, y, z, class
    point_cloud = np.stack((
        point_cloud_from_path.pc_data['x'],
        point_cloud_from_path.pc_data['y'],
        point_cloud_from_path.pc_data['z'],
        np.zeros((point_cloud_from_path.pc_data.shape[0]))  # default class 0
    ), axis=1)

    # Step 1: Extract ground seeds using GPF
    seeds_ids = gpf_extract_initial_seeds_ids(point_cloud, N_LPR, TH_SEEDS)

    # Step 2: Refine ground segmentation
    ground_ids, non_ground_ids = gpf_refinement(point_cloud, seeds_ids, TH_DISTANCE_FROM_PLANE, N_ITERATIONS)

    # Mark ground points with class = 1
    point_cloud[ground_ids, 3] = 1

    # Step 3: Cluster non-ground points (SLR-style)
    cluster_labels, non_ground_ids = process_non_ground_clusters(point_cloud, non_ground_ids, eps=0.5, min_pts=1)

    # Step 4: Visualize result
    visualize_clusters(point_cloud, non_ground_ids, cluster_labels)    

# Run Example

In [155]:
pcd_file = './pointclouds/1504941060.199916000.pcd'

point_cloud_from_path = pypcd.PointCloud.from_path(pcd_file)

point_cloud = np.stack((point_cloud_from_path.pc_data['x'], 
                        point_cloud_from_path.pc_data['y'], 
                        point_cloud_from_path.pc_data['z'], 
                        np.zeros((point_cloud_from_path.pc_data.shape[0]))), 
                        axis=1)


seeds_ids = gpf_extract_initial_seeds_ids(point_cloud, N_LPR, TH_SEEDS)
ground_ids, non_ground_ids = gpf_refinement(point_cloud, seeds_ids, TH_DISTANCE_FROM_PLANE, N_ITERATIONS)
point_cloud[ground_ids, 3] = 1 # 1 for ground points
cluster_labels, non_ground_ids = process_non_ground_clusters(point_cloud, non_ground_ids, eps=0.5, min_pts=1)

visualize_clusters(point_cloud, non_ground_ids, cluster_labels)

In [156]:
main('./pointclouds/1504941055.292141000.pcd')
main('./pointclouds/1504941060.199916000.pcd')

In [157]:
def run_all_pointclouds(folder_path='./pointclouds'):
    """
    Find and process all .pcd files in the specified folder using the main() function.
    """
    pcd_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.pcd')])

    for filename in pcd_files:
        file_path = os.path.join(folder_path, filename)
        print(f"\nProcessing: {filename}")
        try:
            main(file_path)
        except Exception as e:
            print(f"Error processing {filename}: {e}")

# Run the batch
# run_all_pointclouds('./pointclouds')