In [1]:
import numpy as np
import random
import open3d as o3d
from scipy.spatial import cKDTree
from scipy.optimize import minimize
from sklearn.metrics import pairwise_distances

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [37]:
pcd = o3d.io.read_point_cloud("filtered.pcd")
downpcd = pcd.voxel_down_sample(voxel_size=0.005)
o3d.visualization.draw_geometries([downpcd])
points = np.asarray(pcd.points)
pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30))
normals = np.asarray(pcd.normals)

In [44]:
class Cylinder:
    def __init__(self, axis_point, axis_direction, radius):
        self.axis_point = axis_point
        self.axis_direction = axis_direction / np.linalg.norm(axis_direction)
        self.radius = radius

    def score(self, point_cloud, epsilon, alpha):
        axis_direction = self.axis_direction
        axis_point = self.axis_point

        # Project points onto the cylinder axis
        v = point_cloud - axis_point
        projections = axis_point + np.dot(v, axis_direction)[:, np.newaxis] * axis_direction

        # Calculate distances from the projections to the points
        dist_to_axis = np.linalg.norm(point_cloud - projections, axis=1)

        # Calculate inliers
        inlier_mask = (np.abs(dist_to_axis - self.radius) < epsilon)

        # Calculate the connected component of inliers
        inliers = point_cloud[inlier_mask]
        if len(inliers) > 0:
            tree = cKDTree(inliers)
            connected_component = self.largest_connected_component(inliers, tree, epsilon)
            return len(connected_component)
        else:
            return 0

    @staticmethod
    def largest_connected_component(points, tree, epsilon):
        clusters = []
        visited = set()
        for i, point in enumerate(points):
            if i not in visited:
                indices = tree.query_ball_point(point, epsilon)
                cluster = set(indices)
                visited.update(cluster)
                clusters.append(cluster)
        largest_cluster = max(clusters, key=len)
        return points[list(largest_cluster)]

    @staticmethod
    def estimate_cylinder(points, normals):
        if len(points) < 2:
            raise ValueError("At least two points are required to estimate a cylinder")

        p1, p2 = points[:2]
        n1, n2 = normals[:2]

        # Axis direction of the cylinder
        axis_direction = np.cross(n1, n2)
        axis_direction /= np.linalg.norm(axis_direction)

        # Calculate cylinder center and radius
        mid_point = (p1 + p2) / 2
        radius = np.linalg.norm(np.cross(mid_point - p1, axis_direction))

        return Cylinder(mid_point, axis_direction, radius)


class RANSAC:
    def __init__(self, point_cloud, normals, epsilon=0.01, alpha=np.deg2rad(5), max_iterations=1000, pt=0.9, min_size_ratio=0.001):
        self.point_cloud = point_cloud
        self.normals = normals
        self.epsilon = epsilon
        self.alpha = alpha
        self.max_iterations = max_iterations
        self.pt = pt
        self.min_size_ratio = min_size_ratio
        self.octree = cKDTree(point_cloud)

   
    def run(self):
        best_cylinder = None
        best_score = 0
        best_inliers = np.array([])  # Initialize to an empty array
        n_points = self.point_cloud.shape[0]
    
        for _ in range(self.max_iterations):
            # Localized sampling using octree
            sample_points, sample_normals = self.localized_sampling()
    
            try:
                # Estimate cylinder from sampled points
                candidate_cylinder = Cylinder.estimate_cylinder(sample_points, sample_normals)
    
                # Evaluate the score of the estimated cylinder and get the inliers
                score, inliers = candidate_cylinder.score(self.point_cloud, self.epsilon, self.alpha)
    
                if score > best_score:
                    best_score = score
                    best_cylinder = candidate_cylinder
                    best_inliers = inliers  # Store the inliers of the best cylinder
    
            except Exception as e:
                continue
    
            # Probability-based stopping criterion
            min_points_required = self.min_size_ratio * n_points
            P_detection = self.probability_of_detection(best_score, min_points_required, n_points)
            if P_detection >= self.pt:
                break
    
        # Refitting step
        if best_cylinder:
            best_cylinder = self.refit(best_cylinder, best_inliers)
    
        return best_cylinder, best_score, best_inliers  # Return the inliers as well

    def localized_sampling(self):
        random_point_idx = random.randint(0, len(self.point_cloud) - 1)
        random_point = self.point_cloud[random_point_idx]

        # Localized sampling around the random point
        radius = self.epsilon * 10  # Arbitrary radius choice for sampling
        indices = self.octree.query_ball_point(random_point, radius)

        if len(indices) < 2:
            indices = random.sample(range(len(self.point_cloud)), 2)

        sampled_points = self.point_cloud[indices[:2]]
        sampled_normals = self.normals[indices[:2]]

        return sampled_points, sampled_normals

    def probability_of_detection(self, score, min_points_required, n_points):
        k = 2  # Minimal set size for cylinders
        P_n = (score / n_points) ** k
        T = (1 - self.pt) / P_n
        P_detection = 1 - (1 - P_n) ** T
        return P_detection

    def refit(self, cylinder):
        def error_function(params, point_cloud):
            axis_point = params[:3]
            axis_direction = params[3:6] / np.linalg.norm(params[3:6])
            radius = params[6]

            v = point_cloud - axis_point
            projections = axis_point + np.dot(v, axis_direction)[:, np.newaxis] * axis_direction
            dist_to_axis = np.linalg.norm(point_cloud - projections, axis=1)

            return np.sum((dist_to_axis - radius) ** 2)

        initial_guess = np.hstack((cylinder.axis_point, cylinder.axis_direction, cylinder.radius))
        result = minimize(error_function, initial_guess, args=(self.point_cloud,), method='BFGS')

        axis_point = result.x[:3]
        axis_direction = result.x[3:6]
        radius = result.x[6]

        return Cylinder(axis_point, axis_direction, radius)


# Example usage
if __name__ == "__main__":

    ransac = RANSAC(points, normals, max_iterations=1000, epsilon=0.01, alpha=np.deg2rad(5))
    best_cylinder, score, inliers = ransac.run()

    if best_cylinder:
        print(f"Best cylinder found with score: {score}")
        print(f"Axis Point: {best_cylinder.axis_point}, Axis Direction: {best_cylinder.axis_direction}, Radius: {best_cylinder.radius}")
    else:
        print("No cylinder was detected.")

No cylinder was detected.


In [21]:
# Visualization function
def visualize(point_cloud, cylinder_inliers):
    downpcd = o3d.geometry.PointCloud()
    downpcd.points = o3d.utility.Vector3dVector(point_cloud)
    downpcd.paint_uniform_color([0, 0, 0])

    cylinder_cloud = o3d.geometry.PointCloud()
    cylinder_cloud.points = o3d.utility.Vector3dVector(cylinder_inliers)
    cylinder_cloud.paint_uniform_color([0, 1, 0])

    o3d.visualization.draw_geometries([downpcd, cylinder_cloud])

In [15]:
downpcd.paint_uniform_color([0, 0, 0])

cylinder_cloud = o3d.geometry.PointCloud()
cylinder_cloud.points = o3d.utility.Vector3dVector(np.array(best_cylinder))
cylinder_cloud.paint_uniform_color([0, 1, 0])


o3d.visualization.draw_geometries([downpcd, cylinder_cloud])

TypeError: __init__(): incompatible constructor arguments. The following argument types are supported:
    1. open3d.cpu.pybind.utility.Vector3dVector()
    2. open3d.cpu.pybind.utility.Vector3dVector(arg0: numpy.ndarray[numpy.float64])
    3. open3d.cpu.pybind.utility.Vector3dVector(arg0: open3d.cpu.pybind.utility.Vector3dVector)
    4. open3d.cpu.pybind.utility.Vector3dVector(arg0: Iterable)

Invoked with: array(<__main__.Cylinder object at 0x7ad32c106e90>, dtype=object)