In [312]:
import numpy as np
import open3d as o3d
import copy
import math
import itertools
import sys
import matplotlib.pyplot as plt

from collections import Counter

## Loading in Point cloud data

UPDATE:
- P003, P013 work successfully
- P013, P014, P015, P017 also work successfully, but require altering z value bounding box plane; otherwise doesn't work.
- P010, P005, P006 works well, but requires sacrificing small details of feet

- P006

18/09:
- P003, P013, P014 works
- try colour threshold around feet


- work on feet refinement new AUT data
- focus on setting error checking pipeline

for report:
- talk about automatic cleaning w manual cleaning - error measure
- how much error is introduced with automatic cleaning - look at benefits / cons

In [313]:
import config as cfg

path = "./data/P001.ply"

X = cfg.PreLoad()
pcd = X.load(path)

print("PLY file loaded.")
print("Shape of points", np.asarray(pcd.points).shape)
print("Shape of colors", np.asarray(pcd.colors).shape)
o3d.visualization.draw_geometries([pcd])

PLY file loaded.
Shape of points (247847, 3)
Shape of colors (247847, 3)


In [314]:
pcd_vis = copy.deepcopy(pcd) # need to do this to create a deep copy of pcd

def viewClustersViaColours(pcd: o3d.cpu.pybind.geometry.PointCloud) -> None:
    '''
        Visualising pcd coloured by cluster applying DBSCAN (Density-Based Spatial Clustering of Applications with Noise) algorithm
        Alter cluster_dbscan() params for different results
            eps: max dist between two samples for them to be considered same neighbourhood
            min_points: min number of points required to form a cluster
    '''
    
    labels = np.array(pcd.cluster_dbscan(eps=0.03, min_points=3, print_progress=True))
    max_label = labels.max()
    print(f"point cloud has {max_label + 1} clusters")
    colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1)) # avoids div error
    colors[labels < 0] = 0 # label=-1 indicates noise
    pcd.colors = o3d.utility.Vector3dVector(colors[:, :3])
    o3d.visualization.draw_geometries([pcd])

viewClustersViaColours(pcd_vis)

point cloud has 21 clusters


In [315]:
def remove_floor_by_plane_fitting(pcd: o3d.cpu.pybind.geometry.PointCloud, distance_threshold=0.01) -> o3d.cpu.pybind.geometry.PointCloud:
    # Use RANSAC plane segmentation
    plane_model, inliers = pcd.segment_plane(distance_threshold=distance_threshold,
                                             ransac_n=3,
                                             num_iterations=1000)
    [a, b, c, d] = plane_model
    print(f"Plane equation: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0")

    # Remove points that belong to the plane (i.e., the floor)
    pcd_without_floor = pcd.select_by_index(inliers, invert=True)
    
    return pcd_without_floor

# Segment and remove the floor using plane fitting
pcd_without_floor = remove_floor_by_plane_fitting(pcd)
o3d.visualization.draw_geometries([pcd_without_floor])

Plane equation: -0.00x + -0.00y + 1.00z + -0.04 = 0


In [316]:
from ipywidgets import (
    interact,
    interactive,
    Button,
    FloatSlider,
    FloatText,
)
from ipywidgets import jslink as link
from IPython.display import display
import ipywidgets as widgets

# holds final state of point cloud
final_pcd = None

pcd_tmp = np.asarray(pcd.points)

min_x, min_y, min_z = pcd_tmp.min(axis=0)
max_x, max_y, max_z = pcd_tmp.max(axis=0)


def create_grid_on_plane(thresh, axis, color):
    lines = []
    colors = []
    for i in np.linspace(-1, 1, 10):  # Adjust these values as per your requirements
        # vertical lines
        start = (
            [i, thresh, -1]
            if axis == "y"
            else ([thresh, i, -1] if axis == "x" else [-1, i, thresh])
        )
        end = (
            [i, thresh, 1]
            if axis == "y"
            else ([thresh, i, 1] if axis == "x" else [1, i, thresh])
        )
        lines.append([start, end])
        colors.append(color)
        # horizontal lines
        start = (
            [-1, thresh, i]
            if axis == "y"
            else ([thresh, -1, i] if axis == "x" else [i, -1, thresh])
        )
        end = (
            [1, thresh, i]
            if axis == "y"
            else ([thresh, 1, i] if axis == "x" else [i, 1, thresh])
        )
        lines.append([start, end])
        colors.append(color)

    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(np.array(lines).reshape(-1, 3)),
        lines=o3d.utility.Vector2iVector(
            np.array([[i, i + 1] for i in range(0, len(lines) * 2, 2)])
        ),
    )

    line_set.colors = o3d.utility.Vector3dVector(colors)
    return line_set


def view_and_adjust_threshold(
    thresh_x=min_x,
    thresh_x_dup=max_x,
    thresh_y=min_y,
    thresh_y_dup=max_y,
    thresh_z=min_z,
):
    global final_pcd

    pcd_tmp = np.asarray(pcd.points)
    pcd_tmpc = np.asarray(pcd.colors)

    indices = (
        ((pcd_tmp[:, 0] > thresh_x) & (pcd_tmp[:, 0] < thresh_x_dup))
        & ((pcd_tmp[:, 1] > thresh_y) & (pcd_tmp[:, 1] < thresh_y_dup))
        & (pcd_tmp[:, 2] > thresh_z)
    )
    pcd_tmp_tmp = pcd_tmp[indices]
    pcd_tmp_tmpc = pcd_tmpc[indices]

    filtered = o3d.geometry.PointCloud()
    filtered.points = o3d.utility.Vector3dVector(pcd_tmp_tmp)
    filtered.colors = o3d.utility.Vector3dVector(pcd_tmp_tmpc)

    final_pcd = filtered

    plane_x = create_grid_on_plane(thresh_x, "x", [1, 0, 0])  # red color for x plane
    plane_y = create_grid_on_plane(thresh_y, "y", [0, 1, 0])  # green color for y plane
    plane_z = create_grid_on_plane(thresh_z, "z", [0, 0, 1])  # blue color for z plane

    plane_x_dup = create_grid_on_plane(
        thresh_x_dup, "x", [1, 0, 0]
    )  
    plane_y_dup = create_grid_on_plane(
        thresh_y_dup, "y", [0, 1, 0]
    )  
    
    centroid = np.mean(np.asarray(pcd.points), axis=0)
    centroid_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=1, origin=centroid)

    ## visualise origin pt
    origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=1, origin=[0, 0, 0])
    o3d.visualization.draw_geometries(
        [final_pcd, origin, plane_x, plane_y, plane_z, plane_x_dup, plane_y_dup]
    )


def save_pcd(b):
    if final_pcd is not None:
        o3d.io.write_point_cloud("final.ply", final_pcd)
        print("Point cloud saved!")
    else:
        print("No point cloud to save.")


save_button = Button(description="Save point cloud")
save_button.on_click(save_pcd)

slider_x = FloatSlider(min=min_x, max=max_x, step=0.001, value=-0.8, description="x")
text_x = FloatText(value=min_x, description="x")
link((slider_x, "value"), (text_x, "value"))

slider_x_dup = FloatSlider(
    min=min_x, max=max_x, step=0.001, value=0.8, description="x_dup"
)
text_x_dup = FloatText(value=max_x, description="x_dup")
link((slider_x_dup, "value"), (text_x_dup, "value"))

slider_y = FloatSlider(min=min_y, max=max_y, step=0.001, value=-0.8, description="y")
text_y = FloatText(value=min_y, description="y")
link((slider_y, "value"), (text_y, "value"))

slider_y_dup = FloatSlider(
    min=min_y, max=max_y, step=0.001, value=0.8, description="y_dup"
)
text_y_dup = FloatText(value=max_y, description="y_dup")
link((slider_y_dup, "value"), (text_y_dup, "value"))

slider_z = FloatSlider(min=min_z, max=max_z, step=0.001, value=0.03, description="z")
text_z = FloatText(value=min_z, description="z")
link((slider_z, "value"), (text_z, "value"))

interactive(
    view_and_adjust_threshold,
    thresh_x=slider_x,
    thresh_x_dup=slider_x_dup,
    thresh_y=slider_y,
    thresh_y_dup=slider_y_dup,
    thresh_z=slider_z,
)

display(text_x, text_x_dup, text_y, text_y_dup, text_z, interact, save_button)

pcd = copy.deepcopy(final_pcd)
# o3d.visualization.draw_geometries([pcd])

FloatText(value=-1.2634514570236206, description='x')

FloatText(value=1.3225486278533936, description='x_dup')

FloatText(value=-1.425574541091919, description='y')

FloatText(value=1.4900506734848022, description='y_dup')

FloatText(value=-0.028662221506237984, description='z')

<ipywidgets.widgets.interaction._InteractFactory at 0x21b8b76ad90>

Button(description='Save point cloud', style=ButtonStyle())

In [317]:
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.5, max_nn=50))

# z values
z_values = np.asarray(pcd.points)[:, 2]

# 90th percentile of the Z values for plane segmentation
z_90th_percentile = np.percentile(z_values, 90)
z_min = z_values.min()
alpha = 0.01
distance_threshold = alpha * (z_90th_percentile - z_min)

# Apply plane segmentation to identify potential ground points
plane_model, inliers = pcd.segment_plane(distance_threshold=distance_threshold, ransac_n=3, num_iterations=1000)

# From the identified ground points, find the minimum Z value
ground_z_min = np.asarray(pcd.select_by_index(inliers).points)[:, 2].min()

# Set a height threshold above the identified floor
height_threshold = 0.02  # 2 centimeters

# Identify points that are inliers and below the height threshold
to_remove = [i for i in inliers if z_values[i] < ground_z_min + height_threshold]

# Remove the identified points to retain the details of the feet
pcd = pcd.select_by_index(to_remove, invert=True)

# Visualize the result
o3d.visualization.draw_geometries([pcd])


In [318]:
# def region_growing_segmentation(pcd, initial_inliers, height_threshold=0.02, radius=0.05):
#     """
#     Perform region growing segmentation based on initial ground points.
    
#     Parameters:
#     - pcd: The point cloud.
#     - initial_inliers: List of initial ground points (from plane segmentation).
#     - height_threshold: Maximum height difference to consider a point as ground.
#     - radius: Radius for neighbor search.
    
#     Returns:
#     - ground_points: List of all ground points after region growing.
#     """
    
#     # Convert points to numpy array
#     points = np.asarray(pcd.points)
    
#     # Initialize the KD-tree for efficient neighbor search
#     kdtree = o3d.geometry.KDTreeFlann(pcd)
    
#     # Set of all ground points
#     ground_points = set(initial_inliers)
    
#     # Points to be checked
#     to_check = set(initial_inliers)
    
#     while to_check:
#         # Take a point from the set
#         point_idx = to_check.pop()
        
#         # Find neighbors of the point within the given radius
#         _, idx_neighbors, _ = kdtree.search_radius_vector_3d(points[point_idx], radius)
        
#         for idx in idx_neighbors:
#             # If the point is close enough in height and hasn't been considered yet
#             if abs(points[idx, 2] - points[point_idx, 2]) < height_threshold and idx not in ground_points:
#                 ground_points.add(idx)
#                 to_check.add(idx)
    
#     return list(ground_points)

# # Use initial plane segmentation to get starting inliers
# initial_inliers = inliers

# # Apply region growing segmentation
# all_ground_points = region_growing_segmentation(pcd, initial_inliers)

# # Visualize the result after removing the identified ground
# pcd_without_ground = pcd.select_by_index(all_ground_points, invert=True)
# o3d.visualization.draw_geometries([pcd_without_ground])


In [319]:
# def remove_floor_using_color(pcd, floor_color_range):
#     colors = np.asarray(pcd.colors)

#     floor_indices = np.where(
#         (colors[:, 0] >= floor_color_range[0][0]) & (colors[:, 0] <= floor_color_range[1][0]) &
#         (colors[:, 1] >= floor_color_range[0][1]) & (colors[:, 1] <= floor_color_range[1][1]) &
#         (colors[:, 2] >= floor_color_range[0][2]) & (colors[:, 2] <= floor_color_range[1][2])
#     )[0]

#     pcd_without_floor = pcd.select_by_index(floor_indices, invert=True)

#     return pcd_without_floor

# floor_color_range = [(0.3, 0.3, 0.3), (0.7, 0.7, 0.7)]

# pcd_without_floor = remove_floor_using_color(pcd, floor_color_range)

# o3d.visualization.draw_geometries([pcd_without_floor])


In [320]:
# def segment_by_dominant_color(pcd, threshold=0.1):
#     colors = np.asarray(pcd.colors)
    
#     hist, bin_edges = np.histogramdd(colors, bins=(8, 8, 8))
    
#     dominant_color_index = np.unravel_index(hist.argmax(), hist.shape)
#     dominant_color = [(edge[i] + edge[i+1]) / 2 for i, edge in zip(dominant_color_index, bin_edges)]
    
#     mask = np.all(np.abs(colors - dominant_color) < threshold, axis=1)
    
#     segmented_pcd = pcd.select_by_index(np.where(mask)[0])
    
#     return segmented_pcd

# segmented_pcd = segment_by_dominant_color(pcd)

# # indices of segmented pcd
# segmented_indices = np.asarray(segmented_pcd.points).tolist()

# # find respective indices from original pcd
# original_indices = [i for i, v in enumerate(pcd.points) if tuple(v) in set(map(tuple, segmented_indices))]

# # inverse of segment 
# inverse_pcd = pcd.select_by_index(original_indices, invert=True)

# pcd = copy.deepcopy(inverse_pcd)

# o3d.visualization.draw_geometries([pcd])


KeyboardInterrupt: 

In [322]:
# def bounding_box_minor(pcd: o3d.cpu.pybind.geometry.PointCloud) -> None:
#     '''
#     Cleans the floor??
    
#     Brain no work 
    
#     '''
    
#     # Create bounding box:
#     bounds = [[-math.inf, math.inf], [-math.inf, math.inf], [0, 0.0458]]  # set the bounds
    
#     bounding_box_points = list(itertools.product(*bounds))  # create limit points
#     bounding_box = o3d.geometry.AxisAlignedBoundingBox.create_from_points(
#         o3d.utility.Vector3dVector(bounding_box_points))  # create bounding box object
    
#     _, inliers = pcd.segment_plane(distance_threshold=0.01,
#                                           ransac_n=3,
#                                           num_iterations=1000)
#     plane = pcd.select_by_index(inliers)
#     pcd = pcd.select_by_index(inliers, invert=True)
    
#     # Crop the point cloud using the bounding box:
#     pcd_croped = pcd.crop(bounding_box)
#     dists = np.asarray(pcd.compute_point_cloud_distance(pcd_croped))
#     indices = np.where(dists > 0.01)[0]
#     pcd_cropped_inv = pcd.select_by_index(indices)

#     # Display the cropped point cloud:
#     o3d.visualization.draw_geometries([pcd_cropped_inv])
        
#     return pcd_cropped_inv


def downsample_clean(pcd: o3d.cpu.pybind.geometry.PointCloud) -> None:
    '''
    Performs downsampling, outlier removal and bounding box removal
    
    Args:
        pcd (o3d.cpu.pybind.geometry.PointCloud): PointCloud object
    
    Returns:
        type: None 
    '''

    # voxel downsampling: reducing overall num of pts
    voxel_size = 0.01
    downsampled = pcd.voxel_down_sample(voxel_size) 
    
    # outlier cleaning
    _, ind = downsampled.remove_statistical_outlier(nb_neighbors=100, std_ratio=2.0)
    cleaned_pcd = downsampled.select_by_index(ind)
    
    # minimal bounding box
    # pcd = bounding_box_minor(cleaned_pcd)

    # # paint removal parts
    epsilon = 0.01
    points = np.asarray(pcd.points)
    indices = np.where(np.abs(points[:, 2] < epsilon))[0]
    
    pcd_in_color = pcd.select_by_index(indices)
    pcd_in_color.paint_uniform_color([1,1,0])
    pcd = pcd.select_by_index(indices, invert=True)
    
    o3d.visualization.draw_geometries([pcd])

    return pcd

def isolateLargestCluster(
    pcd: o3d.cpu.pybind.geometry.PointCloud, labels: np.ndarray
) -> None:
    """
    Uses DBSCAN to group and isolate the largest point cloud

    Args:
        pcd (o3d.cpu.pybind.geometry.PointCloud): PointCloud object
        labels (np.ndarray): Array of clusters identified by labels

    Returns:
        type: o3d.cpu.pybind.geometry.PointCloud
    """

    # finding the label of largest cluster + ignoring noisy points labeled -1
    counts = Counter(labels)
    largest_cluster_label = max(
        counts.items(), key=lambda x: x[1] if x[0] != -1 else -1
    )[0]

    # get all pts / colors of largest cluster
    largest_cluster_points = np.array(pcd.points)[labels == largest_cluster_label]
    largest_cluster_colors = np.array(pcd.colors)[labels == largest_cluster_label]

    # new point cloud from the largest cluster pts w/ colors
    largest_cluster_pcd = o3d.geometry.PointCloud()
    largest_cluster_pcd.points = o3d.utility.Vector3dVector(largest_cluster_points)
    largest_cluster_pcd.colors = o3d.utility.Vector3dVector(largest_cluster_colors)

    return largest_cluster_pcd

pcd = downsample_clean(pcd)
labels = np.array(pcd.cluster_dbscan(eps=0.05, min_points=3, print_progress=True))
pcd = isolateLargestCluster(pcd, labels)
o3d.visualization.draw_geometries([pcd])


### Saving Point Cloud

In [323]:
output_file_name = f"./data/lidar-cleaned/cleaned_{path.split('/')[-1][:-4]}.ply"

if pcd is not None:
    o3d.io.write_point_cloud(output_file_name, pcd)
    print("Point cloud saved!")
else:
    print("No point cloud to save.")

Point cloud saved!


In [324]:
path = f"./data/lidar-cleaned/cleaned_{path.split('/')[-1][:-4]}.ply"

pcd = X.load(path)
o3d.visualization.draw_geometries([pcd], window_name='read')