# Environment Setup

In [None]:
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d

# Python version
assert sys.version_info >= (3, 8), "Python 3.8 or newer is required"
# Open3D version: compare the numeric (major, minor) pair, because comparing
# the raw strings is lexicographic and would order "0.9.0" after "0.17.0"
_o3d_major_minor = tuple(int(part) for part in o3d.__version__.split(".")[:2])
assert _o3d_major_minor >= (0, 17), "Open3D 0.17.0 or newer is required"

In [None]:
# Point cloud file I/O
filename = "data/input/iScan-Pcd-1-1.ply"
base_name, extension = os.path.splitext(os.path.basename(filename))

# Output directory. exist_ok=True creates it only when missing, without the
# check-then-create race, and still fails if the path exists but is a file.
output_filepath = "data/output"
os.makedirs(output_filepath, exist_ok=True)

# 1. Point Cloud Filtering

## Step 1. Down-sampling

In [None]:
def down_sample(input_filename, output_filename, every_k_points=10):
    """Uniformly down-sample a point cloud file and write the result.

    Args:
        input_filename: Path of the point cloud file to read.
        output_filename: Path the down-sampled point cloud is written to.
        every_k_points: Keep one point out of every ``every_k_points`` points.
            Defaults to 10, the value that used to be hard-coded.
    """
    # Load the input point cloud file
    pcd = o3d.t.io.read_point_cloud(input_filename)

    # Uniformly down-sample the point cloud: evenly select 1 point for every k points
    down_sampled_pcd = pcd.uniform_down_sample(every_k_points=every_k_points)

    # Save the down-sampled point cloud to an output file (write_ascii=False)
    o3d.t.io.write_point_cloud(output_filename, down_sampled_pcd, write_ascii=False)

In [None]:
# Down-sample the raw scan into the output directory
down_sampled_filename = os.path.join(output_filepath, f"{base_name} - downsampled{extension}")
down_sample(input_filename=filename, output_filename=down_sampled_filename)

## Step 2. Crop the point cloud
This step is done manually in the `CloudCompare` software.

## Step 3. Outlier removal

In [None]:
# Path of the cropped point cloud that the outlier removal reads
cropped_filename = os.path.join(output_filepath, f"{base_name} - cropped{extension}")

In [None]:
def outlier_removal(input_filename, output_filename, visualize=False):
    """Remove statistical outliers from a point cloud file.

    The rejected points are painted red and written to a separate
    " - outlier" file, whose path is built from the module-level
    ``output_filepath``, ``base_name`` and ``extension``. The remaining
    points are written to ``output_filename``.

    Args:
        input_filename: Path of the point cloud file to read.
        output_filename: Path the filtered point cloud is written to.
        visualize: When true, open a viewer window showing the kept points
            in grey together with the rejected points in red.
    """
    cloud = o3d.t.io.read_point_cloud(input_filename)

    # Statistical outlier removal; the returned mask selects the points that are kept
    kept_cloud, keep_mask = cloud.remove_statistical_outliers(nb_neighbors=20, std_ratio=16.0)

    # Everything outside the mask is an outlier: paint it red and report the count
    rejected = cloud.select_by_mask(keep_mask, invert=True)
    rejected.paint_uniform_color([1.0, 0.0, 0.0])
    rejected_count = rejected.point.positions.shape[0]
    print(f"Remove {rejected_count} outliers.")

    if visualize:
        # Paint a separate selection of the kept points so the saved cloud keeps its colors
        kept_preview = cloud.select_by_mask(keep_mask)
        kept_preview.paint_uniform_color([0.8, 0.8, 0.8])  # grey
        o3d.visualization.draw_geometries(
            [kept_preview.to_legacy(), rejected.to_legacy()],
            window_name=f"Visualization — {input_filename}",
            width=1000,
            height=800,
            left=400,
            top=150,
        )

    # Save the outliers for comparison
    outlier_filename = os.path.join(output_filepath, f"{base_name} - outlier{extension}")
    o3d.t.io.write_point_cloud(outlier_filename, rejected, write_ascii=False)

    # Save the filtered point cloud
    o3d.t.io.write_point_cloud(output_filename, kept_cloud, write_ascii=False)

In [None]:
# Run the outlier removal on the cropped point cloud
filtered_filename = os.path.join(output_filepath, f"{base_name} - filtered{extension}")
outlier_removal(input_filename=cropped_filename, output_filename=filtered_filename, visualize=False)

# 2. Point Cloud Segmentation

## Step 1. Mask by intensity

In [None]:
# Path of the filtered point cloud that the intensity masking reads
filtered_filename = os.path.join(output_filepath, f"{base_name} - filtered{extension}")

In [None]:
def mask_intensity(input_filename, output_filename):
    """Flag the low-intensity points of a point cloud file.

    A ``mask`` attribute (uint8, one column) is added to the cloud: 1 where
    ``scalar_intensity`` is at most 50, 0 elsewhere. The point attributes are
    printed before and after the change, and the cloud is written to
    ``output_filename``.

    Args:
        input_filename: Path of a point cloud file with a ``scalar_intensity`` attribute.
        output_filename: Path the masked point cloud is written to.
    """
    cloud = o3d.t.io.read_point_cloud(input_filename)
    print(cloud.point, '\n')

    # Points at or below the threshold count as low intensity
    intensity_threshold = 50
    intensities = cloud.point.scalar_intensity.numpy()
    low_intensity = (intensities <= intensity_threshold).astype(np.uint8)
    cloud.point.mask = low_intensity.reshape(len(low_intensity), 1)

    print(cloud.point)

    # Save the masked point cloud
    o3d.t.io.write_point_cloud(output_filename, cloud, write_ascii=False)

In [None]:
# Run the intensity masking on the filtered point cloud
masked_filename = os.path.join(output_filepath, f"{base_name} - masked{extension}")
mask_intensity(input_filename=filtered_filename, output_filename=masked_filename)

## Step 2. DBSCAN clustering

In [None]:
# Path of the masked point cloud that the DBSCAN clustering reads
masked_filename = os.path.join(output_filepath, f"{base_name} - masked{extension}")

In [None]:
from collections import Counter


def dbscan_clustering(input_filename, output_filename):
    """Cluster the masked points with DBSCAN and rank the clusters by size.

    Points whose ``mask`` attribute is non-zero are clustered. A ``cluster``
    attribute (int32, one column) is then added to every point of the input
    cloud: 0 for points that were masked out or labeled as noise (-1) by
    DBSCAN, 1 for the cluster with the most points, 2 for the next one, and
    so on. The cloud is written to ``output_filename`` only when at least two
    clusters are found; otherwise a warning is printed and nothing is written.

    Args:
        input_filename: Path of a point cloud file carrying a ``mask`` attribute.
        output_filename: Path the clustered point cloud is written to.
    """
    # Load the input point cloud file
    pcd = o3d.t.io.read_point_cloud(input_filename)

    # Convert from uint8 to a flat boolean array
    mask = pcd.point.mask.numpy().astype(bool).flatten()

    # Indices, within the full cloud, of the points selected by the mask
    index_of_mask = np.flatnonzero(mask)

    # Extract low intensity points, then perform DBSCAN on them
    low_intensity_pcd = pcd.select_by_mask(mask)
    labels = low_intensity_pcd.cluster_dbscan(eps=0.10, min_points=5)
    labels = labels.numpy()

    # With no selected point there is no label; max() of an empty array would raise
    n_clusters = int(labels.max()) + 1 if labels.size else 0
    print(f"DBSCAN clustering return {n_clusters} clusters.\n")

    if n_clusters < 2:
        print("Warning: DBSCAN clustering return less than 2 clusters.")
        return

    # Clusters will be labeled in a way that cluster with the most points is labeled 1
    counter = Counter(labels)
    print(counter, '\n')

    # Remove the noise points, which are labeled -1
    del counter[-1]

    # Sort by count, largest first
    sorted_items = sorted(counter.items(), key=lambda x: x[1], reverse=True)
    rank = [item[0] for item in sorted_items]
    print(rank, '\n')

    # Look-up table from DBSCAN label to size rank (1 = most points), so the
    # per-point assignment is a single vectorized indexing operation
    rank_of_label = np.zeros(n_clusters, dtype=np.int32)
    rank_of_label[rank] = np.arange(1, len(rank) + 1, dtype=np.int32)

    # Add cluster attribute to the input point cloud
    clustered = labels != -1
    cluster = np.zeros(len(mask), dtype=np.int32)
    cluster[index_of_mask[clustered]] = rank_of_label[labels[clustered]]
    pcd.point.cluster = np.reshape(cluster, (len(cluster), 1))

    print(pcd.point)

    # Save the clustered point cloud to an output file
    o3d.t.io.write_point_cloud(output_filename, pcd, write_ascii=False)

In [None]:
# Run the DBSCAN clustering on the masked point cloud
clustered_filename = os.path.join(output_filepath, f"{base_name} - clustered{extension}")
dbscan_clustering(input_filename=masked_filename, output_filename=clustered_filename)

## Step 3. Curve fitting

In [None]:
# Path of the clustered point cloud that the curve fitting reads
clustered_filename = os.path.join(output_filepath, f"{base_name} - clustered{extension}")

In [None]:
from scipy import interpolate
from scipy.spatial import cKDTree


def curve_fitting(input_filename, debug=False):
    """Fit B-spline curves to the two rails and to the centre line between them.

    Points with ``cluster`` value 1 are used as the left rail and points with
    value 2 as the right rail. The three fitted curves are written to a
    " - curve" file whose path is built from the module-level
    ``output_filepath``, ``base_name`` and ``extension``.

    Args:
        input_filename: Path of a point cloud file carrying a ``cluster`` attribute.
        debug: When true, fit on a leading subset of the rail points only and
            show a 3-D plot of the input points and the fitted curves.

    Returns:
        A pair ``([tck, u], offset)``: the B-spline representation and the
        parameter values of the centre line as returned by ``splprep``, and
        the vector from the first fitted centre point to the first fitted
        left-rail point.
    """
    cloud = o3d.t.io.read_point_cloud(input_filename)

    # Step 1. Fit one curve on each rail

    # Use the cluster attribute to extract the rail points
    cluster_ids = cloud.point.cluster.numpy()

    left_rail = cloud.select_by_mask((cluster_ids == 1).flatten())
    point_left = left_rail.point.positions.numpy()
    print("Left rail points:", point_left.shape[0])

    right_rail = cloud.select_by_mask((cluster_ids == 2).flatten())
    point_right = right_rail.point.positions.numpy()
    print("Right rail points:", point_right.shape[0], '\n')

    if debug:
        # Keep only a leading subset, scaled to preserve the left/right size ratio
        num_of_sampled = 100
        scalar_factor = point_left.shape[0] / point_right.shape[0]
        print("scalar_factor:", scalar_factor, '\n')

        point_left = point_left[:int(num_of_sampled * scalar_factor)]
        point_right = point_right[:num_of_sampled]

        print("Left rail points - sampled:", len(point_left))
        print("Right rail points - sampled:", len(point_right), '\n')

    point_left = point_left.T  # (n, 3) -> (3, n)
    point_right = point_right.T

    # B-spline representation of each 3-D curve
    tck_left, u_left = interpolate.splprep(point_left)
    tck_right, u_right = interpolate.splprep(point_right)

    # Evaluate each spline at the parameter values of its own input points
    fitted_left = interpolate.splev(u_left, tck_left)
    fitted_right = interpolate.splev(u_right, tck_right)

    # Step 2. Calculate the centre line, then fit a curve on it

    # Stack the evaluated coordinates into (n, 3) arrays
    curve_point_left = np.column_stack(fitted_left)
    curve_point_right = np.column_stack(fitted_right)

    # For every right-curve point, look up the closest left-curve point
    left_tree = cKDTree(curve_point_left)
    _, nearest_index = left_tree.query(curve_point_right)
    nearest_point = curve_point_left[nearest_index]

    # The centre line runs through the midpoints of those pairs
    point_centre = ((nearest_point + curve_point_right) / 2).T  # (n, 3) -> (3, n)

    # Fit a curve on the centre line
    tck_centre, u_centre = interpolate.splprep(point_centre)
    fitted_centre = interpolate.splev(u_centre, tck_centre)

    if debug:
        # A few evenly spaced parameter values, drawn as star markers
        u_even = np.linspace(0, 1, 5)
        even_left = interpolate.splev(u_even, tck_left)
        even_right = interpolate.splev(u_even, tck_right)
        even_centre = interpolate.splev(u_even, tck_centre)

        fig = plt.figure()
        ax = fig.add_subplot(projection="3d")
        ax.set(xlabel="x", ylabel="y", zlabel="z", title="Curve fitting")

        plot_series = (
            # Input points
            (point_left, "b"),
            (point_right, "r"),
            (point_centre, "g"),
            # Fitted points
            (fitted_left, "b"),
            (fitted_right, "r"),
            (fitted_centre, "g"),
            # Evenly spaced samples
            (even_left, "c*"),
            (even_right, "m*"),
            (even_centre, "y*"),
        )
        for series, fmt in plot_series:
            ax.plot(series[0], series[1], series[2], fmt)

        fig.show()

    # Step 3. Store the three fitted curves in a separate file

    curve_point_centre = np.column_stack(fitted_centre)

    curve_clouds = []
    for curve_points, color, curve_id in (
        (curve_point_left, [0.0, 0.0, 1.0], 1),  # left: blue
        (curve_point_right, [0.0, 1.0, 0.0], 2),  # right: green
        (curve_point_centre, [1.0, 0.0, 0.0], 3),  # centre: red
    ):
        curve_pcd = o3d.t.geometry.PointCloud()
        curve_pcd.point.positions = o3d.core.Tensor(curve_points)
        curve_pcd.paint_uniform_color(color)
        # The curve attribute records which curve each point belongs to
        curve_pcd.point.curve = np.full((curve_points.shape[0], 1), curve_id).astype(np.int32)
        curve_clouds.append(curve_pcd)
    pcd_left, pcd_right, pcd_centre = curve_clouds

    # Save the fitted curves for comparison
    curve_filename = os.path.join(output_filepath, f"{base_name} - curve{extension}")
    o3d.t.io.write_point_cloud(curve_filename, pcd_left + pcd_right + pcd_centre, write_ascii=False)

    # Return the B-spline representation of the centre line
    return [tck_centre, u_centre], curve_point_left[0] - curve_point_centre[0]

In [None]:
# Centre-line spline (tck, u) and the vector pointing from the centre line to the left rail
(tck_curve, u_curve), vector_left = curve_fitting(input_filename=clustered_filename, debug=False)

# 3. Cross-Section Extraction

In [None]:
def segment_ground(input_filename):
    """Fit a plane to a point cloud file and return the plane model.

    Args:
        input_filename: Path of the point cloud file to read.

    Returns:
        The plane model produced by ``segment_plane``; the second element of
        its result is discarded.
    """
    cloud = o3d.t.io.read_point_cloud(input_filename)
    segmentation = cloud.segment_plane(distance_threshold=0.05, ransac_n=3, num_iterations=100)
    return segmentation[0]

In [None]:
# Plane coefficients of ax + by + cz + d = 0, converted to a NumPy array
plane_model = segment_ground(clustered_filename)
plane = plane_model.numpy()

## Step 1. Coordinate Transformation

In [None]:
# Report the centre curve spline, labeled t, c and k, and its parameter values
spline_t, spline_c, spline_k = tck_curve
print("Centre curve parameters:\n")
print(f"tck_curve[0] - t: {spline_t}\n")
print(f"tck_curve[1] - c: {np.array(spline_c)}\n")
print(f"tck_curve[2] - k: {spline_k}\n")
print(f"u_curve: {len(u_curve)} {u_curve}\n")

# Report the plane coefficients
print("plane parameters:", plane)

In [None]:
# Sample the centre curve at evenly spaced parameter values, as many as there
# are entries in u_curve, and stack the coordinates into an (n, 3) array
u_curve_even = np.linspace(0, 1, num=len(u_curve))
centre_curve = np.column_stack(interpolate.splev(u_curve_even, tck_curve))

# Unpack the plane coefficients of ax + by + cz + d = 0
a, b, c, d = plane

In [None]:
# Calculate coordinates in the new coordinate system:
#   x - signed lateral offset from the centre curve (positive towards the left rail)
#   y - length along the centre curve up to the nearest curve sample
#   z - signed distance to the plane ax + by + cz + d = 0
# NOTE: intended to become `coordinate_transformation(input_filename)`.

# Load the input point cloud file
pcd = o3d.t.io.read_point_cloud(clustered_filename)

# Coordinates in original coordinate system
coordinates = pcd.point.positions.numpy()

# Orient the plane so that the centre curve lies on its positive side (z > 0).
# Reversing the normal alone would describe a different plane, so the offset
# is negated together with it. `d` itself is left untouched so that this cell
# gives the same result when it is run again.
normal_z = np.array([a, b, c])
offset_z = d
if np.dot(normal_z, centre_curve[0]) + offset_z < 0:
    normal_z, offset_z = -normal_z, -offset_z
magnitude_z = np.linalg.norm(normal_z)

diff = np.diff(centre_curve, axis=0)  # Compute the differences between consecutive points
distances = np.linalg.norm(diff, axis=1)
cumulate_y = np.cumsum(np.insert(distances, 0, 0))  # arc length at every curve sample

print(f"centre_curve: {centre_curve.shape}\n{centre_curve[:3]}\n")
print(f"diff: {diff.shape}\n{diff[:3]}\n")
print(f"distances: {distances.shape}\n{distances[:3]}\n")
print(f"cumulate_y: {cumulate_y.shape}\n{cumulate_y[:3]}\n")

# Nearest centre-curve sample of every point
kdtree = cKDTree(centre_curve)
distance, nearest_index = kdtree.query(coordinates)

# PQ points from the nearest curve sample to the point
PQ = coordinates - centre_curve[nearest_index]
magnitude_PQ = distance
cross_product = np.cross(normal_z, PQ)

# sin of the angle between the plane normal and PQ. A point lying exactly on
# a curve sample has |PQ| = 0; its sine is set to 0 instead of dividing 0 by 0,
# which would put NaN into x.
denominator = magnitude_z * magnitude_PQ
sin_theta = np.divide(
    np.linalg.norm(cross_product, axis=1),
    denominator,
    out=np.zeros_like(denominator),
    where=denominator > 0,
)

# Points on the same side of the centre curve as the left rail get positive x
sign_x = np.where(np.dot(PQ, vector_left) < 0, -1, 1)

# Coordinate calculation
x = magnitude_PQ * sin_theta * sign_x
y = cumulate_y[nearest_index]
z = (np.dot(coordinates, normal_z) + offset_z) / magnitude_z

coordinates_new = np.column_stack((x, y, z))

# Only the transformed positions are stored in the new point cloud
pcd_new = o3d.t.geometry.PointCloud(coordinates_new)
output_filename = os.path.join(output_filepath, base_name + " - test" + extension)
o3d.t.io.write_point_cloud(output_filename, pcd_new, write_ascii=False)