Import Statements for the Code

In [1]:
#Base Libraries

import numpy as np

#3D Libraries 
import open3d as o3d
# import laspy as lp
# from sklearn.linear_model import RANSACRegressor
from sklearn.cluster import DBSCAN
import time

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


Environment Setup and Downsampled the point clouds

In [45]:
pcd = o3d.io.read_point_cloud("Data\scans_1_77g.pcd")
# Convert to NumPy array for filtering
points = np.asarray(pcd.points)

# Create a mask for points where y, and z are all > 0
# mask = (points[:, 1] > 0) & (points[:, 2] > 0)
mask = (points[:, 0] > -30) & (points[:, 0] < 18) & \
       (points[:, 1] > 0) & (points[:, 1] < 3) & \
       (points[:, 2] > 0) & (points[:, 2] < 5)

# Filter the points
filtered_points = points[mask]

# Create a new point cloud with the filtered points
filtered_pcd = o3d.geometry.PointCloud()
filtered_pcd.points = o3d.utility.Vector3dVector(filtered_points)

downpcd = filtered_pcd.voxel_down_sample(voxel_size=0.4)

Visualize the Downsampled point cloud

In [46]:
o3d.visualization.draw_geometries([downpcd],
                                  zoom=0.3412,
                                  front=[0.4257, 0.2125, 0.8795],
                                  lookat=[-2.6172, 2.0475, 1.532],
                                  up=[0.0694, 0.9768, -0.2024])

Cluster the points to detect the Horizontal Truss

In [None]:
points_down = np.asarray(downpcd.points)

t0 = time.time()
# 3) Cluster in (x, y)
xy = points_down[:, :2]
dbscan = DBSCAN(eps=0.1, min_samples=10)
labels = dbscan.fit_predict(xy)

# 4) Prepare a color array (gray) for all points in downpcd
colors = np.full((len(points_down), 3), 0.7)  # shape = (N,3), light gray

unique_labels = np.unique(labels[labels >= 0])  # skip noise label -1
lines_1 = []
# 5) For each cluster, check if it qualifies as a line and then color it
for label in unique_labels:
    idx = np.where(labels == label)[0]
    cluster_points = points_down[idx]   
    
    # Check z-range
    z_min, z_max = cluster_points[:, 2].min(), cluster_points[:, 2].max()
    z_range = z_max - z_min
    # x_range = np.median(cluster_points[:, 0])
    # y_range = np.max(cluster_points[:, 1])
    # print(x_range, y_range)

    if z_range >= 0.5 and label != 3:

        # Assign a random color to this line cluster
        color = np.random.rand(3)
        colors[idx] = color  # apply to all points in this cluster
        lines_1.append(cluster_points)
        # print(label)
# 6) Assign the colors back to downpcd
downpcd.colors = o3d.utility.Vector3dVector(colors)

t1 = time.time()
print(f"DBScan_1 Computation in {t1-t0} seconds")
# 7) Visualize the entire downsampled cloud 
#    (with line clusters in random colors, and other points in gray)
o3d.visualization.draw_geometries([downpcd])
# print(unique_labels)
# print(lines)


DBScan_1 Computation in 0.0035545825958251953 seconds


Detect the inliners and filter out the outliners in the cluster and get the line Equations

In [5]:
def distance_point_to_line(point, line_point, line_dir):
    """
    Compute the perpendicular distance from `point` to a line defined by:
        r(t) = line_point + t * line_dir
    where line_dir is assumed to be a *normalized* direction vector.
    """
    diff = point - line_point
    cross_prod = np.cross(diff, line_dir)
    dist = np.linalg.norm(cross_prod)
    return dist

def ransac_line_fitting_3d(points, num_iterations=1000, distance_threshold=0.1):
    """
    Fit a line to 3D points using a simple RANSAC approach.

    Parameters
    ----------
    points : (N, 3) np.ndarray
        The 3D points to fit.
    num_iterations : int
        Number of RANSAC iterations.
    distance_threshold : float
        Distance (in 3D) to consider a point as an inlier to the candidate line.

    Returns
    -------
    centroid : (3,) np.ndarray
        A point on the final best-fit line (computed from inliers).
    direction_refined : (3,) np.ndarray
        The *normalized* direction vector of the best-fit line (via SVD).
    final_inliers : (M, 3) np.ndarray
        The subset of points that are considered inliers to the final line.
    """
    if len(points) < 2:
        raise ValueError("Need at least 2 points to define a line.")

    best_inlier_count = 0
    best_line_point = None
    best_line_dir = None

    # --- RANSAC main loop ---
    for _ in range(num_iterations):
        # 1) Randomly sample two distinct points to define a line
        idx_sample = np.random.choice(len(points), 2, replace=False)
        p1, p2 = points[idx_sample[0]], points[idx_sample[1]]
        
        # 2) Compute direction of this candidate line
        line_dir = p2 - p1
        norm_dir = np.linalg.norm(line_dir)
        if norm_dir < 1e-8:
            # If points are extremely close, skip
            continue
        line_dir /= norm_dir  # normalize

        # 3) Count how many points lie close (inliers)
        inliers = []
        for pt in points:
            dist = distance_point_to_line(pt, p1, line_dir)
            if dist < distance_threshold:
                inliers.append(pt)
        
        inlier_count = len(inliers)
        # 4) Update the best line if we found a better model
        if inlier_count > best_inlier_count:
            best_inlier_count = inlier_count
            best_line_point = p1
            best_line_dir = line_dir

    # If we never found a line
    if best_line_point is None or best_line_dir is None:
        raise RuntimeError("RANSAC failed to find a valid line. "
                           "Try adjusting thresholds or iterations.")
    
    # --- Refine using inliers of the best model ---
    final_inliers = []
    for pt in points:
        dist = distance_point_to_line(pt, best_line_point, best_line_dir)
        if dist < distance_threshold:
            final_inliers.append(pt)
    final_inliers = np.array(final_inliers)
    
    # Use PCA/SVD to refine the line from these inliers
    centroid = np.mean(final_inliers, axis=0)
    centered = final_inliers - centroid
    _, _, vh = np.linalg.svd(centered, full_matrices=False)
    direction_refined = vh[0]
    direction_refined /= np.linalg.norm(direction_refined)
    
    return centroid, direction_refined, final_inliers

Getting the Line Equations and Printing them

In [None]:
line_equations_1 = []

# Loop over each cluster and fit a line
for i, cluster_pts in enumerate(lines_1):
    # If there are fewer than 2 points, skip
    if len(cluster_pts) < 2:
        print(f"Cluster {i}: Not enough points to fit a line.")
        continue

    try:
        p0, direction, inliers = ransac_line_fitting_3d(
            cluster_pts,
            distance_threshold=0.10,  # adjust threshold as needed
            num_iterations=1000       # increase if needed
        )
    except RuntimeError as e:
        print(f"Cluster {i}: {str(e)}")
        continue

    # Store the final line parameters
    line_dict = {
        "p0": p0.tolist(),
        "direction": direction.tolist(),
        "num_inliers": len(inliers)
    }
    line_equations_1.append(line_dict)

    # Print the result in parametric form
    # Parametric: r(t) = p0 + t * direction
    print(f"Cluster {i}:")
    print(f"  Best-fit line => p0 = {p0},  dir = {direction}")
    print(f"  Number of inliers = {len(inliers)}")
    print("-"*60)

# ---------------------------------------------------------------------------
# All the line equations stored in `line_equations`.
# Print all the lines we got
print("\nFinal stored line equations:")
for i, eq in enumerate(line_equations_1):
    print(f"Line {i+1}: p0 = {eq['p0']}, direction = {eq['direction']}, inliers = {eq['num_inliers']}")

Cluster 0:
  Best-fit line => p0 = [-7.9313945   0.16277739  1.57016714],  dir = [ 0.00748647 -0.03745683  0.9992702 ]
  Number of inliers = 29
------------------------------------------------------------
Cluster 1:
  Best-fit line => p0 = [6.46278904 0.07307629 1.51946137],  dir = [ 0.00536218 -0.01958457  0.99979382]
  Number of inliers = 17
------------------------------------------------------------
Cluster 2:
  Best-fit line => p0 = [-21.5556972    0.60511547   2.34915296],  dir = [ 0.02662532 -0.04373143  0.99868847]
  Number of inliers = 21
------------------------------------------------------------
Cluster 3:
  Best-fit line => p0 = [-28.35765788   0.96637793   2.23603387],  dir = [-0.01258866  0.09115064 -0.99575754]
  Number of inliers = 15
------------------------------------------------------------
Cluster 4:
  Best-fit line => p0 = [13.28256256  0.19918157  1.23594838],  dir = [ 0.01521549 -0.02796935  0.99949297]
  Number of inliers = 26
---------------------------------

Display the detected lines in the original Point Cloud

In [None]:
def detect_and_display_lines_in_downsampled_cloud(
    downpcd,
    line_equations,
    distance_threshold=0.1,
    line_length=5.0,
    random_seed=None
):
    """
    Given a point cloud (downpcd) and multiple line_equations,
    color the inlier points for each line, draw a short line segment,
    and visualize in Open3D.
    """
    if random_seed is not None:
        np.random.seed(random_seed)

    points_np = np.asarray(downpcd.points).copy()
    n_points  = len(points_np)
    if n_points == 0:
        print("No points in downpcd; nothing to visualize.")
        return

    # Default color for every point: gray
    colors_np = np.full((n_points, 3), 0.5)

    line_sets = []

    for i, line_info in enumerate(line_equations):
        p0 = np.array(line_info["p0"], dtype=float)
        direction = np.array(line_info["direction"], dtype=float)
        norm_dir = np.linalg.norm(direction)
        if norm_dir < 1e-12:
            continue
        direction /= norm_dir  # ensure normalized

        # Random line color
        color_line = np.random.rand(3)

        # Create a short segment for visualization: p0 +/- (line_length/2)*direction
        t = line_length / 2.0
        p_start = p0 - t * direction
        p_end   = p0 + t * direction

        line_pts = np.vstack([p_start, p_end])
        line_indices = [[0, 1]]

        seg = o3d.geometry.LineSet()
        seg.points = o3d.utility.Vector3dVector(line_pts)
        seg.lines  = o3d.utility.Vector2iVector(line_indices)
        seg.colors = o3d.utility.Vector3dVector([color_line])
        line_sets.append(seg)

        # Color inlier points
        dists = np.empty(n_points, dtype=float)
        for idx_pt, pt in enumerate(points_np):
            diff = pt - p0
            cross_prod = np.cross(diff, direction)
            dists[idx_pt] = np.linalg.norm(cross_prod)

        inlier_mask = dists < distance_threshold
        colors_np[inlier_mask] = color_line

    # Build a new colored point cloud
    colored_pcd = o3d.geometry.PointCloud()
    colored_pcd.points = o3d.utility.Vector3dVector(points_np)
    colored_pcd.colors = o3d.utility.Vector3dVector(colors_np)

    # Show everything
    o3d.visualization.draw_geometries([colored_pcd, *line_sets])

# Calling the Functions
# detect_and_display_lines_in_downsampled_cloud(
#     downpcd=downpcd,
#     line_equations=line_equations_1,
#     distance_threshold=0.1,
#     line_length=5.0
# )

# print("\nDone! Check the visualization window for two fitted lines per cluster (where applicable).")

Now detect the Arms of the V structures : Filtering down the existing Pcd to Region of Interests and clustering the V Slanting trusses

In [37]:
points_down = np.asarray(downpcd.points)
z_mask = (points_down[:, 2] < 1.5) & \
         (points_down[:, 1] > 0.225) & \
         (points_down[:, 0] > -30) & (points_down[:, 0] < 15)

points_down = points_down[z_mask]

# Create a new point cloud of the filtered subset
downpcd1= o3d.geometry.PointCloud()
downpcd1.points = o3d.utility.Vector3dVector(points_down)

# Visualize
o3d.visualization.draw_geometries([downpcd1])

Now clustering the V structures based on the range, Height and lenght

In [43]:
# Project onto XY by taking the first two columns
points_xy = points_down[:, :2]  # shape (N, 2)
t3 = time.time()
# 3) Cluster in (x, y)
xy = points_down[:, :2]
dbscan = DBSCAN(eps = 0.75, min_samples = 10, algorithm = "ball_tree")
labels = dbscan.fit_predict(xy)

# 4) Prepare a color array (e.g., gray) for all points in downpcd
colors = np.full((len(points_down), 3), 0.7)  # shape = (N,3), light gray

unique_labels = np.unique(labels[labels >= 0])  # skip noise label -1
lines_2 = []
# 5) For each cluster, check if it qualifies as a line and then color it
for label in unique_labels:
    idx = np.where(labels == label)[0]
    cluster_points = points_down[idx]   
    
    # Check z-range
    z_min, z_max = cluster_points[:, 2].min(), cluster_points[:, 2].max()
    z_range = z_max - z_min
    x_min, x_max = cluster_points[:, 0].min(), cluster_points[:, 0].max()
    x_range = x_max - x_min
    y_min, y_max = cluster_points[:, 1].min(), cluster_points[:, 1].max()
    y_range = y_max - y_min

    if z_range >= 1 and x_range >= 1  and y_range >= 1:

    # Assign a random color to this line cluster
        color = np.random.rand(3)
        colors[idx] = color  # apply to all points in this cluster
        lines_2.append(cluster_points)
        # print(label)
# 6) Assign the colors back to downpcd
downpcd1.colors = o3d.utility.Vector3dVector(colors)

t4 = time.time()
print(f"DBScan_2 Computation in {t4-t3} seconds")
# 7) Visualize the entire downsampled cloud 
#    (with line clusters in random colors, and other points in gray)
o3d.visualization.draw_geometries([downpcd1])
# print(unique_labels)
# print(lines_2)

DBScan_2 Computation in 0.013300180435180664 seconds


Detecting the 2 Arms of the V structures Using the Clustered points 

In [None]:
line_equations_2 = []  # this will hold ALL lines from ALL clusters

for cluster_idx, cluster_pts in enumerate(lines_2):
    if len(cluster_pts) < 2:
        print(f"[Cluster {cluster_idx}] Not enough points to fit even one line.")
        continue

    # ----------------------------------------
    # (A) Fit the first line with RANSAC
    # ----------------------------------------
    try:
        p0_1, dir_1, inliers_1 = ransac_line_fitting_3d(
            cluster_pts,
            distance_threshold=0.10,  # adjust as needed
            num_iterations=1000
        )
    except RuntimeError as e:
        print(f"[Cluster {cluster_idx}] Could not fit the first line: {str(e)}")
        continue

    # Store the first line’s info
    line1_dict = {
        "p0": p0_1.tolist(),
        "direction": dir_1.tolist(),
        "num_inliers": len(inliers_1),
        "cluster_index": cluster_idx,
    }
    line_equations_2.append(line1_dict)

    # Removing the first line’s inliers from the cluster so we can detect the second line
    #   Converting cluster_pts to a list and filtering out points in inliers_1, then back to np.array.
    inliers_set = set(map(tuple, inliers_1))  # for fast membership checking
    remaining_pts = [tuple(pt) for pt in cluster_pts if tuple(pt) not in inliers_set]
    remaining_pts = np.array(remaining_pts)

    print(f"[Cluster {cluster_idx}] First line found, #inliers = {len(inliers_1)}")
    print(f"  -> Points left for second line: {len(remaining_pts)}")

    # If not enough points remain for a second line, skip
    if len(remaining_pts) < 2:
        print(f"[Cluster {cluster_idx}] Not enough points left to fit a second line.")
        continue

    # ----------------------------------------
    # (B) Fit the second line with RANSAC
    # ----------------------------------------
    try:
        p0_2, dir_2, inliers_2 = ransac_line_fitting_3d(
            remaining_pts,
            distance_threshold=0.10,
            num_iterations=1000
        )
    except RuntimeError as e:
        print(f"[Cluster {cluster_idx}] Could not fit the second line: {str(e)}")
        continue

    line2_dict = {
        "p0": p0_2.tolist(),
        "direction": dir_2.tolist(),
        "num_inliers": len(inliers_2),
        "cluster_index": cluster_idx,
    }
    line_equations_2.append(line2_dict)

    print(f"[Cluster {cluster_idx}] Second line found, #inliers = {len(inliers_2)}")
    print("-" * 60)


[Cluster 0] First line found, #inliers = 17
  -> Points left for second line: 25
[Cluster 0] Second line found, #inliers = 14
------------------------------------------------------------
[Cluster 1] First line found, #inliers = 12
  -> Points left for second line: 25
[Cluster 1] Second line found, #inliers = 10
------------------------------------------------------------
[Cluster 2] First line found, #inliers = 15
  -> Points left for second line: 10
[Cluster 2] Second line found, #inliers = 9
------------------------------------------------------------
[Cluster 3] First line found, #inliers = 15
  -> Points left for second line: 9
[Cluster 3] Second line found, #inliers = 6
------------------------------------------------------------
[Cluster 4] First line found, #inliers = 17
  -> Points left for second line: 12
[Cluster 4] Second line found, #inliers = 9
------------------------------------------------------------


Combine all the line_equation into 1 list and display the detected lines in the original Pointcloud

In [44]:
lines = []
for i in line_equations_1:
    lines.append(i)

for j in line_equations_2:
    lines.append(j)

detect_and_display_lines_in_downsampled_cloud(
    downpcd=downpcd,
    line_equations=lines,
    distance_threshold=0.1,
    line_length=5.0
)

print("\nDone! Check the visualization window for two fitted lines per cluster (where applicable).")


Done! Check the visualization window for two fitted lines per cluster (where applicable).
