In [None]:
from pathlib import Path
import os

from plyfile import PlyData

import numpy as np

import plotly.graph_objs as go
from plotly.subplots import make_subplots

from sklearn.cluster import DBSCAN
from sklearn.neighbors import KernelDensity

from scipy.ndimage import gaussian_filter1d
from scipy.signal import find_peaks
from scipy.spatial.transform import Rotation
from scipy.spatial import ConvexHull

import open3d as o3d

import cv2

## Read Point Cloud

In [7]:
def read_ply(file_path):
    plydata = PlyData.read(file_path)
    x = plydata['vertex']['x']
    y = plydata['vertex']['y']
    z = plydata['vertex']['z']
    if 'red' in plydata['vertex']:
        r = plydata['vertex']['red']
        g = plydata['vertex']['green']
        b = plydata['vertex']['blue']
        colors = np.array([r, g, b]).T
    else:
        colors = None
    if 'nx' in plydata['vertex']:
        nx = plydata['vertex']['nx']
        ny = plydata['vertex']['ny']
        nz = plydata['vertex']['nz']
        normals = np.array([nx, ny, nz]).T
    else:
        normals = None
    points = np.array([x, y, z]).T
    print(f"Number of points: {points.shape[0]}")
    return points, colors, normals

## Plot Original Point Cloud

In [8]:
def plot_point_cloud(points, colors=None):
    fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'scatter3d'}]])
    fig.add_trace(go.Scatter3d(x=points[:, 0], y=points[:, 1], z=points[:, 2], mode='markers', marker=dict(size=1, color=colors)), row=1, col=1)
    fig.update_layout(width=1000, height=1000)
    fig.show()

## Density

In [30]:
def dbscan(points, eps=0.5, min_samples=200):
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    labels = dbscan.fit_predict(points)
    return labels

def get_peaks(density, min_peak_points, sigma, plot=False):
    density = np.sort(density)
    density = density[int(0.1 * len(density)):]
    density_values, bin_edges = np.histogram(density, bins=100) 
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

    smoothed_density = gaussian_filter1d(density_values, sigma=sigma)

    peaks, _ = find_peaks(smoothed_density, height=min_peak_points)  # Adjust height to filter out small peaks

    peak_boundaries = []
    for peak in peaks:
        start, end = peak, peak

        while start > 0 and smoothed_density[start - 1] < smoothed_density[start]:
            start -= 1
        
        while end < len(smoothed_density) - 1 and smoothed_density[end + 1] < smoothed_density[end]:
            end += 1

        peak_boundaries.append((start, end))

    if plot:
        fig = go.Figure()

        fig.add_trace(go.Scatter(
            x=bin_centers,
            y=density_values,
            mode='lines',
            name="Original Histogram"
        ))

        fig.add_trace(go.Scatter(
            x=bin_centers,
            y=smoothed_density,
            mode='lines',
            line=dict(color='orange'),
            name="Smoothed Histogram"
        ))

        fig.add_trace(go.Scatter(
            x=bin_centers[peaks],
            y=smoothed_density[peaks],
            mode='markers',
            marker=dict(color='red', size=10, symbol='x'),
            name="Detected Peaks"
        ))

        for idx, (start, end) in enumerate(peak_boundaries):
            fig.add_trace(go.Scatter(
                x=[bin_centers[start], bin_centers[start]],
                y=[0, max(density_values)],
                mode='lines',
                line=dict(color='green', dash='dash'),
                showlegend=idx == 0,
                name="Peak Start" if idx == 0 else None
            ))
            fig.add_trace(go.Scatter(
                x=[bin_centers[end], bin_centers[end]],
                y=[0, max(density_values)],
                mode='lines',
                line=dict(color='purple', dash='dash'),
                showlegend=idx == 0,
                name="Peak End" if idx == 0 else None
            ))

        fig.update_layout(
            title="Density Histogram with Peak Detection",
            xaxis_title="Density",
            yaxis_title="Number of Points",
            legend=dict(
                x=1.0,
                y=1.0,
                xanchor='right',
                yanchor='top',
                bgcolor='rgba(255, 255, 255, 0.5)',
                bordercolor='Black',
                borderwidth=1
            )
        )

        fig.show()
    return peak_boundaries, bin_centers


def monte_carlo_kde(points, bandwidth: float, sample_size: int = 500, num_samples: int = 10):
    densities = []
    for _ in range(num_samples):
        sample_indices = np.random.choice(len(points), sample_size, replace=False)
        sample_points = points[sample_indices]

        kde = KernelDensity(bandwidth=bandwidth, kernel='gaussian')
        kde.fit(sample_points)
        
        log_density = kde.score_samples(points)
        densities.append(np.exp(log_density))

    # Aggregate results (e.g., average density estimates)
    density = np.mean(densities, axis=0)
    return density

def get_densest_cluster(points, min_peak_points, kde_samples=1000, sigma=2, colors=None, plot=False):
    # density = monte_carlo_kde(points, bandwidth=1, sample_size=max(len(points) // 100, 3000))  
    density = monte_carlo_kde(points, bandwidth=1, sample_size=kde_samples)  
    peak_boundaries, bin_centers = get_peaks(density, min_peak_points, sigma=sigma, plot=True)
    first_peak_end_index = peak_boundaries[0][0]
    first_peak_end = bin_centers[first_peak_end_index]
    print(f"First peak ends at density {first_peak_end}")
    points = points[density > first_peak_end]
    density = density[density > first_peak_end]
    fig = go.Figure()

    if plot:
        fig.add_trace(go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                # color uses r, g, and b
                color=density,
                colorscale='Viridis',
                colorbar=dict(title='Density'),
            )
        ))

        fig.update_layout(
            scene=dict(
                xaxis_title='X',
                yaxis_title='Y',
                zaxis_title='Z'
            ),
            title="3D Point Cloud with Density Color-Coding"
        )

        fig.show()
    return points, density


## Floor Separation

In [10]:
def find_floor_plane(points, distance_threshold=0.02, min_floor_points=100):
    """Find the floor plane in a point cloud."""
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    plane_model, inliers = pcd.segment_plane(
        distance_threshold=distance_threshold,
        ransac_n=3,
        num_iterations=1000
    )
    

    inlier_mask = np.zeros(len(points), dtype=bool)
    inlier_mask[inliers] = True
    
    print(f"Number of inlier indices: {len(inliers)}")
    print(f"Number of True values in inliers: {np.sum(inliers)}")
    
    floor_points = points[inlier_mask]
    non_floor_points = points[~inlier_mask]
    
    print(f"Number of floor points: {len(floor_points)}")
    print(f"Number of non-floor points: {len(non_floor_points)}")
    
    if len(floor_points) < min_floor_points:
        print(f"Warning: Found only {len(floor_points)} floor points. Might be unreliable.")
    
    return floor_points, non_floor_points, plane_model


def find_optimal_threshold(points, 
                           initial_threshold=0.01, 
                           max_threshold=0.2, 
                           iterations=50):
    """
    Automatically find optimal distance threshold for floor detection without predefined floor ratio bounds.
    The function iteratively adjusts the threshold and monitors the change in floor_ratio to determine when to stop.
    
    Parameters:
    -----------
    points : np.ndarray
        Input point cloud as a NumPy array of shape (N, 3).
    initial_threshold : float, default=0.02
        Starting distance threshold value.
    max_threshold : float, default=0.1
        Maximum allowed threshold.
    iterations : int, default=10
        Maximum number of iterations for searching the optimal threshold.
    
    Returns:
    --------
    best_threshold : float
        The optimal distance threshold found.
    best_ratio : float
        The ratio of floor points corresponding to the optimal threshold.
    stats : dict
        Dictionary containing statistics about the threshold search process.
    """
    total_points = len(points)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    # Calculate point cloud statistics
    distances = np.asarray(pcd.compute_nearest_neighbor_distance())
    median_distance = np.median(distances)
    
    # Initialize threshold search
    threshold = initial_threshold
    best_threshold = threshold
    best_ratio = 0
    
    stats = {
        'iterations': [],
        'thresholds': [],
        'floor_ratios': [],
        'improvements': [],
        'median_distance': median_distance
    }
    
    for iteration in range(iterations):
        # Segment plane with current threshold
        plane_model, inliers = pcd.segment_plane(
            distance_threshold=threshold,
            ransac_n=3,
            num_iterations=1000
        )
        
        floor_ratio = len(inliers) / total_points
        
        # Store statistics
        stats['iterations'].append(iteration)
        stats['thresholds'].append(threshold)
        stats['floor_ratios'].append(floor_ratio)        

        if threshold >= max_threshold:
            print(f"Stopping search: Reached maximum threshold {max_threshold}")
            break

        threshold += (max_threshold - initial_threshold) / iterations
        threshold = min(threshold, max_threshold)
    
    smoothed_ratios = gaussian_filter1d(stats['floor_ratios'], sigma=3)
    second_derivative = np.gradient(np.gradient(smoothed_ratios))
    # best threshold is the one with the lowest second derivative
    best_threshold = stats['thresholds'][np.argmin(second_derivative)]
    best_ratio = stats['floor_ratios'][np.argmin(second_derivative)]
    

    # Collect final statistics
    stats['optimal_threshold'] = best_threshold
    stats['final_floor_ratio'] = best_ratio
    stats['median_point_distance'] = median_distance
    
    return best_threshold, best_ratio, stats

def find_floor_plane_auto(points, min_floor_points=100, visualize_threshold_search=False):
    """
    Enhanced floor detection with automatic threshold selection.
    """
    optimal_threshold, floor_ratio, stats = find_optimal_threshold(points)
    
    if visualize_threshold_search:
        # Create visualization of threshold search
        fig = go.Figure()
        
        # Plot threshold evolution
        fig.add_trace(go.Scatter(
            x=stats['iterations'],
            y=stats['thresholds'],
            name='Threshold',
            mode='lines+markers',
            line=dict(color='red'),
        ))
        
        # Plot floor ratio evolution
        fig.add_trace(go.Scatter(
            x=stats['iterations'],
            y=stats['floor_ratios'],
            name='Floor Ratio',
            mode='lines+markers',
            line=dict(color='cyan'),
            yaxis='y2'
        ))

        smoothed_floor_ratio = gaussian_filter1d(stats['floor_ratios'], sigma=3)
        fig.add_trace(go.Scatter(
            x=stats['iterations'],
            y=smoothed_floor_ratio,
            name='Smoothed Floor Ratio',
            mode='lines',
            line=dict(color='blue'),
            yaxis='y2'
        ))


        # plot 2nd derivative of floor ratio
        fig.add_trace(go.Scatter(
            x=stats['iterations'],
            y=np.gradient(np.gradient(smoothed_floor_ratio)),
            name='2nd Derivative of Floor Ratio',
            mode='lines+markers',
            line=dict(color='blue'),
        ))
        
        # Plot floor ratio evolution
        fig.add_trace(go.Scatter(
            x=stats['iterations'],
            y=stats['improvements'],
            name='Improvements',
            mode='lines+markers',
            line=dict(color='blue'),
            yaxis='y2'
        ))
        
        fig.update_layout(
            title='Threshold Search Evolution',
            xaxis_title='Iteration',
            yaxis_title='Threshold',
            yaxis2=dict(
                title='Floor Ratio',
                overlaying='y',
                side='right'
            )
        )
        fig.show()
    
    print(f"Found optimal threshold: {optimal_threshold:.4f}")
    print(f"Floor ratio: {floor_ratio:.2%}")
    print(f"Median point distance: {stats['median_point_distance']:.4f}")
    
    return find_floor_plane(points, distance_threshold=optimal_threshold, 
                          min_floor_points=min_floor_points)


def determine_model_orientation(points, plane_model):
    """Determine if the model is upside down relative to the floor plane."""
    a, b, c, d = plane_model
    normal_vector = np.array([a, b, c])
    
    signed_distances = (points @ normal_vector + d)
    
    points_above = points[signed_distances > 0]
    points_below = points[signed_distances < 0]
    total_points = len(points)
    
    is_upside_down = (len(points_below) / total_points) > 0.2
    
    orientation_info = {
        "points_above_floor": points_above,
        "points_below_floor": points_below,
        "ratio_above": points_above / total_points,
        "ratio_below": points_below / total_points,
        "is_upside_down": is_upside_down,
        "floor_normal": normal_vector
    }
    
    return orientation_info

def align_to_xy_plane(points, plane_model, orientation_info):
    """Align the point cloud so the floor is parallel to the XY plane and positioned at z=0."""
    # Extract plane parameters
    a, b, c, d = plane_model
    floor_normal = np.array([a, b, c])
    
    # If the model is upside down, flip the normal
    if orientation_info["is_upside_down"]:
        floor_normal = -floor_normal
    
    # Define the target normal (Z-axis)
    z_axis = np.array([0, 0, 1])
    
    # Calculate rotation required to align floor_normal with Z-axis
    rotation_axis = np.cross(floor_normal, z_axis)
    norm_rotation_axis = np.linalg.norm(rotation_axis)
    
    if norm_rotation_axis < 1e-6:
        # The normals are already aligned or opposite
        if np.dot(floor_normal, z_axis) < 0:
            rotation_matrix = -np.eye(3)
        else:
            rotation_matrix = np.eye(3)
    else:
        rotation_axis /= norm_rotation_axis
        rotation_angle = np.arccos(np.clip(np.dot(floor_normal, z_axis), -1.0, 1.0))
        rotation = Rotation.from_rotvec(rotation_angle * rotation_axis)
        rotation_matrix = rotation.as_matrix()
    
    # Rotate all points
    rotated_points = (rotation_matrix @ points.T).T
    
    # Find a point on the original plane
    plane_norm_sq = a**2 + b**2 + c**2
    if plane_norm_sq == 0:
        raise ValueError("Invalid plane model with zero normal vector.")
    p0 = np.array([-a * d / plane_norm_sq,
                   -b * d / plane_norm_sq,
                   -c * d / plane_norm_sq])
    
    # Rotate the point on the plane
    p0_rotated = rotation_matrix @ p0
    
    # Calculate translation to bring the rotated plane to z=0
    translation_z = -p0_rotated[2]
    translation = np.array([0, 0, translation_z])
    
    # Apply translation
    aligned_points = rotated_points + translation
    
    return aligned_points, rotation_matrix, translation

def remove_points_below_floor(points, plane_model):
    """Remove points below the floor."""
    above_floor_mask = points[:, 2] >= 0
    return points[above_floor_mask]

def denoise_point_cloud(points, neighbors=20, std_ratio=0.1):
    """Denoise the point cloud using statistical outlier removal."""
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    cl, ind = pcd.remove_statistical_outlier(nb_neighbors=neighbors, std_ratio=std_ratio)
    return np.asarray(pcd.points)[ind]

def process_point_cloud(points, min_floor_points=500, distance_threshold=None):
    """Complete pipeline to process the point cloud."""
    result = {}
    # 1. Find floor
    
    if distance_threshold is None:
        floor_points, non_floor_points, plane_model = find_floor_plane_auto(
            points, 
            min_floor_points=min_floor_points,
            visualize_threshold_search=True
        )
    else:
        # floor_points, non_floor_points, plane_model = find_largest_surface_floor(points, distance_threshold=distance_threshold, 
        #                   min_floor_points=min_floor_points)
        floor_points, non_floor_points, plane_model = find_floor_plane(points, distance_threshold=distance_threshold, 
                          min_floor_points=min_floor_points)
    result['floor_points'] = floor_points
    result['non_floor_points'] = non_floor_points
    result['plane_model'] = plane_model
    
    # 2. Determine model orientation
    orientation_info = determine_model_orientation(
        non_floor_points, 
        plane_model
    )
    
    # 3. Align to XY plane
    aligned_points, rotation_matrix, translation = align_to_xy_plane(
        non_floor_points, 
        plane_model, 
        orientation_info
    )
    result['aligned_points'] = aligned_points
    result['orientation_info'] = orientation_info
    result['transformation'] = {
        'rotation': rotation_matrix,
        'translation': translation
    }

    final_points = orientation_info["points_below_floor"] if orientation_info["is_upside_down"] else orientation_info["points_above_floor"]

    
    # 4. Remove points below floor
    # final_points = remove_points_below_floor(final_points, plane_model)
    result['final_points'] = final_points

    # 5. Denoise point cloud
    denoised_points = denoise_point_cloud(final_points)
    result['denoised_points'] = denoised_points

    # 6. Denoise point cloud DBSCAN
    dbscan_labels = dbscan(final_points, eps=0.9)
    result['dbscan_labels'] = dbscan_labels
    
    return result


## Visualize Floor Separation

In [11]:
def plot_points_3d(points, color='blue', size=2, opacity=0.6):
    """Create a basic 3D scatter plot for points."""
    return go.Scatter3d(
        x=points[:, 0], y=points[:, 1], z=points[:, 2],
        mode='markers',
        marker=dict(
            size=size,
            color=color,
            opacity=opacity
        ),
        name=f'Points ({len(points)} pts)'
    )

def plot_plane(plane_model, points, grid_size=20):
    """Create a surface plot for a plane within the points bounds."""
    a, b, c, d = plane_model
    
    # Get bounds from points
    x_min, x_max = points[:, 0].min(), points[:, 0].max()
    y_min, y_max = points[:, 1].min(), points[:, 1].max()
    
    # Create grid
    x = np.linspace(x_min, x_max, grid_size)
    y = np.linspace(y_min, y_max, grid_size)
    X, Y = np.meshgrid(x, y)
    
    # Calculate Z values for the plane
    if abs(c) > 1e-6:  # Ensure c is not close to zero
        Z = (-a * X - b * Y - d) / c
        Z = np.clip(Z, -3, 3)  # Limit Z values for plotting
    else:
        Z = np.zeros_like(X)  # Default to zero plane if c is very small

    
    return go.Surface(
        x=X, y=Y, z=Z,
        opacity=0.3,
        showscale=False,
        name='Floor plane'
    )

def visualize_floor_detection(points, floor_points, non_floor_points, plane_model):
    """Visualize the floor detection step."""
    fig = go.Figure()
    
    # Add original points with low opacity
    # fig.add_trace(plot_points_3d(points, color='gray', opacity=0.2))
    
    # Add floor points
    fig.add_trace(plot_points_3d(floor_points, color='green', opacity=0.8))
    
    # Add non-floor points
    fig.add_trace(plot_points_3d(non_floor_points, color='red', opacity=0.8))
    
    # Add floor plane
    fig.add_trace(plot_plane(plane_model, points))
    
    fig.update_layout(
        title='Floor Detection Results',
        scene=dict(
            aspectmode='data'
        ),
        showlegend=True
    )
    
    return fig

def visualize_orientation(points, plane_model, orientation_info):
    """Visualize the model orientation relative to the floor."""
    # Split points based on their position relative to the floor
    a, b, c, d = plane_model
    signed_distances = (points @ np.array([a, b, c]) + d)
    
    points_above = points[signed_distances > 0]
    points_below = points[signed_distances < 0]
    
    fig = go.Figure()
    
    # Add points above floor
    if len(points_above) > 0:
        fig.add_trace(plot_points_3d(points_above, color='blue', opacity=0.8))
    
    # Add points below floor
    if len(points_below) > 0:
        fig.add_trace(plot_points_3d(points_below, color='red', opacity=0.8))
    
    # Add floor plane
    fig.add_trace(plot_plane(plane_model, points))
    
    # Add floor normal vector at center of points
    center = points.mean(axis=0)
    normal = orientation_info['floor_normal'] * (points.max() - points.min()).mean() * 0.2
    
    fig.add_trace(go.Scatter3d(
        x=[center[0], center[0] + normal[0]],
        y=[center[1], center[1] + normal[1]],
        z=[center[2], center[2] + normal[2]],
        mode='lines+markers',
        line=dict(color='black', width=5),
        name='Floor normal'
    ))
    
    fig.update_layout(
        title=f"Model Orientation (Upside down: {orientation_info['is_upside_down']})",
        scene=dict(aspectmode='data'),
        showlegend=True
    )
    
    return fig

def visualize_alignment(original_points, aligned_points):
    """Visualize the alignment transformation."""
    fig = make_subplots(
        rows=1, cols=2,
        specs=[[{'type': 'scene'}, {'type': 'scene'}]],
        subplot_titles=('Original Points', 'Aligned Points')
    )
    
    # Original points
    fig.add_trace(
        plot_points_3d(original_points, color='blue'),
        row=1, col=1
    )
    
    # Aligned points
    fig.add_trace(
        plot_points_3d(aligned_points, color='green'),
        row=1, col=2
    )
    
    # Update layout
    fig.update_layout(
        title='Point Cloud Alignment Results',
        scene=dict(aspectmode='data'),
        scene2=dict(aspectmode='data'),
        showlegend=True
    )
    
    return fig

def visualize_final_result(original_points, final_points):
    """Visualize the original vs final processed point cloud."""
    fig = make_subplots(
        rows=1, cols=2,
        specs=[[{'type': 'scene'}, {'type': 'scene'}]],
        subplot_titles=('Original Point Cloud', 'Processed Point Cloud')
    )
    
    # Original points
    fig.add_trace(
        plot_points_3d(original_points, color='blue'),
        row=1, col=1
    )
    
    # Final points
    fig.add_trace(
        plot_points_3d(final_points, color='green'),
        row=1, col=2
    )
    
    fig.update_layout(
        title='Final Processing Results',
        scene=dict(aspectmode='data'),
        scene2=dict(aspectmode='data'),
        showlegend=True
    )
    
    return fig

def visualize_complete_pipeline(result_dict):
    """Visualize all steps of the pipeline in a single figure."""
    fig = make_subplots(
        rows=3, cols=2,
        specs=[[{'type': 'scene'}, {'type': 'scene'}],
               [{'type': 'scene'}, {'type': 'scene'}],
               [{'type': 'scene'}, {'type': 'scene'}]],
        subplot_titles=(
            'Floor Detection',
            'Orientation Analysis',
            'Alignment Result',
            'Final Result',
            'Denoised Result Statistical Outlier',
            'Denoised Result DBSCAN'
        )
    )
    
    # 1. Floor Detection
    floor_trace = plot_points_3d(result_dict['floor_points'], color='green')
    fig.add_trace(floor_trace, row=1, col=1)
    
    # 2. Orientation
    # above_below_trace = plot_points_3d(result_dict['aligned_points'], color='blue')
    # fig.add_trace(above_below_trace, row=1, col=2)
    
    # 3. Alignment
    aligned_trace = plot_points_3d(result_dict['aligned_points'], color='orange')
    fig.add_trace(aligned_trace, row=2, col=1)
    
    # 4. Final Result
    final_trace = plot_points_3d(result_dict['final_points'], color='red')
    fig.add_trace(final_trace, row=2, col=2)

    # 5. Denoised Result Statistical Outlier
    denoised_trace = plot_points_3d(result_dict['denoised_points'], color='purple')
    fig.add_trace(denoised_trace, row=3, col=1)

    # 6. Denoised Result DBSCAN
    # INSERT CODE HERE
    dbscan_labels = result_dict["dbscan_labels"]
    unique_labels = np.unique(dbscan_labels)
    
    for label in unique_labels:
        color = 'gray' if label == -1 else f"rgba({np.random.randint(0,255)},{np.random.randint(0,255)},{np.random.randint(0,255)},0.6)"
        label_points = result_dict['final_points'][dbscan_labels == label]
        fig.add_trace(
            plot_points_3d(label_points, color=color),
            row=3, col=2
        )
        
    fig.update_layout(
        title='Complete Pipeline Visualization',
        height=1000,
        showlegend=True
    )
    
    return fig

## Camera

In [12]:
#
# Copyright (C) 2023, Inria
# GRAPHDECO research group, https://team.inria.fr/graphdeco
# All rights reserved.
#
# This software is free for non-commercial, research and evaluation use 
# under the terms of the LICENSE.md file.
#
# For inquiries contact  george.drettakis@inria.fr
#

import numpy as np
import collections
import struct

CameraModel = collections.namedtuple(
    "CameraModel", ["model_id", "model_name", "num_params"])
Camera = collections.namedtuple(
    "Camera", ["id", "model", "width", "height", "params"])
BaseImage = collections.namedtuple(
    "Image", ["id", "qvec", "tvec", "camera_id", "name", "xys", "point3D_ids"])
Point3D = collections.namedtuple(
    "Point3D", ["id", "xyz", "rgb", "error", "image_ids", "point2D_idxs"])
CAMERA_MODELS = {
    CameraModel(model_id=0, model_name="SIMPLE_PINHOLE", num_params=3),
    CameraModel(model_id=1, model_name="PINHOLE", num_params=4),
    CameraModel(model_id=2, model_name="SIMPLE_RADIAL", num_params=4),
    CameraModel(model_id=3, model_name="RADIAL", num_params=5),
    CameraModel(model_id=4, model_name="OPENCV", num_params=8),
    CameraModel(model_id=5, model_name="OPENCV_FISHEYE", num_params=8),
    CameraModel(model_id=6, model_name="FULL_OPENCV", num_params=12),
    CameraModel(model_id=7, model_name="FOV", num_params=5),
    CameraModel(model_id=8, model_name="SIMPLE_RADIAL_FISHEYE", num_params=4),
    CameraModel(model_id=9, model_name="RADIAL_FISHEYE", num_params=5),
    CameraModel(model_id=10, model_name="THIN_PRISM_FISHEYE", num_params=12)
}
CAMERA_MODEL_IDS = dict([(camera_model.model_id, camera_model)
                         for camera_model in CAMERA_MODELS])
CAMERA_MODEL_NAMES = dict([(camera_model.model_name, camera_model)
                           for camera_model in CAMERA_MODELS])


def qvec2rotmat(qvec):
    return np.array([
        [1 - 2 * qvec[2]**2 - 2 * qvec[3]**2,
         2 * qvec[1] * qvec[2] - 2 * qvec[0] * qvec[3],
         2 * qvec[3] * qvec[1] + 2 * qvec[0] * qvec[2]],
        [2 * qvec[1] * qvec[2] + 2 * qvec[0] * qvec[3],
         1 - 2 * qvec[1]**2 - 2 * qvec[3]**2,
         2 * qvec[2] * qvec[3] - 2 * qvec[0] * qvec[1]],
        [2 * qvec[3] * qvec[1] - 2 * qvec[0] * qvec[2],
         2 * qvec[2] * qvec[3] + 2 * qvec[0] * qvec[1],
         1 - 2 * qvec[1]**2 - 2 * qvec[2]**2]])

def rotmat2qvec(R):
    Rxx, Ryx, Rzx, Rxy, Ryy, Rzy, Rxz, Ryz, Rzz = R.flat
    K = np.array([
        [Rxx - Ryy - Rzz, 0, 0, 0],
        [Ryx + Rxy, Ryy - Rxx - Rzz, 0, 0],
        [Rzx + Rxz, Rzy + Ryz, Rzz - Rxx - Ryy, 0],
        [Ryz - Rzy, Rzx - Rxz, Rxy - Ryx, Rxx + Ryy + Rzz]]) / 3.0
    eigvals, eigvecs = np.linalg.eigh(K)
    qvec = eigvecs[[3, 0, 1, 2], np.argmax(eigvals)]
    if qvec[0] < 0:
        qvec *= -1
    return qvec

class Image(BaseImage):
    def qvec2rotmat(self):
        return qvec2rotmat(self.qvec)

def read_next_bytes(fid, num_bytes, format_char_sequence, endian_character="<"):
    """Read and unpack the next bytes from a binary file.
    :param fid:
    :param num_bytes: Sum of combination of {2, 4, 8}, e.g. 2, 6, 16, 30, etc.
    :param format_char_sequence: List of {c, e, f, d, h, H, i, I, l, L, q, Q}.
    :param endian_character: Any of {@, =, <, >, !}
    :return: Tuple of read and unpacked values.
    """
    data = fid.read(num_bytes)
    return struct.unpack(endian_character + format_char_sequence, data)

def read_points3D_text(path):
    """
    see: src/base/reconstruction.cc
        void Reconstruction::ReadPoints3DText(const std::string& path)
        void Reconstruction::WritePoints3DText(const std::string& path)
    """
    xyzs = None
    rgbs = None
    errors = None
    num_points = 0
    with open(path, "r") as fid:
        while True:
            line = fid.readline()
            if not line:
                break
            line = line.strip()
            if len(line) > 0 and line[0] != "#":
                num_points += 1


    xyzs = np.empty((num_points, 3))
    rgbs = np.empty((num_points, 3))
    errors = np.empty((num_points, 1))
    count = 0
    with open(path, "r") as fid:
        while True:
            line = fid.readline()
            if not line:
                break
            line = line.strip()
            if len(line) > 0 and line[0] != "#":
                elems = line.split()
                xyz = np.array(tuple(map(float, elems[1:4])))
                rgb = np.array(tuple(map(int, elems[4:7])))
                error = np.array(float(elems[7]))
                xyzs[count] = xyz
                rgbs[count] = rgb
                errors[count] = error
                count += 1

    return xyzs, rgbs, errors

def read_points3D_binary(path_to_model_file):
    """
    see: src/base/reconstruction.cc
        void Reconstruction::ReadPoints3DBinary(const std::string& path)
        void Reconstruction::WritePoints3DBinary(const std::string& path)
    """


    with open(path_to_model_file, "rb") as fid:
        num_points = read_next_bytes(fid, 8, "Q")[0]

        xyzs = np.empty((num_points, 3))
        rgbs = np.empty((num_points, 3))
        errors = np.empty((num_points, 1))

        for p_id in range(num_points):
            binary_point_line_properties = read_next_bytes(
                fid, num_bytes=43, format_char_sequence="QdddBBBd")
            xyz = np.array(binary_point_line_properties[1:4])
            rgb = np.array(binary_point_line_properties[4:7])
            error = np.array(binary_point_line_properties[7])
            track_length = read_next_bytes(
                fid, num_bytes=8, format_char_sequence="Q")[0]
            track_elems = read_next_bytes(
                fid, num_bytes=8*track_length,
                format_char_sequence="ii"*track_length)
            xyzs[p_id] = xyz
            rgbs[p_id] = rgb
            errors[p_id] = error
    return xyzs, rgbs, errors

def read_intrinsics_text(path):
    """
    Taken from https://github.com/colmap/colmap/blob/dev/scripts/python/read_write_model.py
    """
    cameras = {}
    with open(path, "r") as fid:
        while True:
            line = fid.readline()
            if not line:
                break
            line = line.strip()
            if len(line) > 0 and line[0] != "#":
                elems = line.split()
                camera_id = int(elems[0])
                model = elems[1]
                assert model == "PINHOLE", "While the loader support other types, the rest of the code assumes PINHOLE"
                width = int(elems[2])
                height = int(elems[3])
                params = np.array(tuple(map(float, elems[4:])))
                cameras[camera_id] = Camera(id=camera_id, model=model,
                                            width=width, height=height,
                                            params=params)
    return cameras

def read_extrinsics_binary(path_to_model_file):
    """
    see: src/base/reconstruction.cc
        void Reconstruction::ReadImagesBinary(const std::string& path)
        void Reconstruction::WriteImagesBinary(const std::string& path)
    """
    images = {}
    with open(path_to_model_file, "rb") as fid:
        num_reg_images = read_next_bytes(fid, 8, "Q")[0]
        for _ in range(num_reg_images):
            binary_image_properties = read_next_bytes(
                fid, num_bytes=64, format_char_sequence="idddddddi")
            image_id = binary_image_properties[0]
            qvec = np.array(binary_image_properties[1:5])
            tvec = np.array(binary_image_properties[5:8])
            camera_id = binary_image_properties[8]
            image_name = ""
            current_char = read_next_bytes(fid, 1, "c")[0]
            while current_char != b"\x00":   # look for the ASCII 0 entry
                image_name += current_char.decode("utf-8")
                current_char = read_next_bytes(fid, 1, "c")[0]
            num_points2D = read_next_bytes(fid, num_bytes=8,
                                           format_char_sequence="Q")[0]
            x_y_id_s = read_next_bytes(fid, num_bytes=24*num_points2D,
                                       format_char_sequence="ddq"*num_points2D)
            xys = np.column_stack([tuple(map(float, x_y_id_s[0::3])),
                                   tuple(map(float, x_y_id_s[1::3]))])
            point3D_ids = np.array(tuple(map(int, x_y_id_s[2::3])))
            images[image_id] = Image(
                id=image_id, qvec=qvec, tvec=tvec,
                camera_id=camera_id, name=image_name,
                xys=xys, point3D_ids=point3D_ids)
    return images


def read_intrinsics_binary(path_to_model_file):
    """
    see: src/base/reconstruction.cc
        void Reconstruction::WriteCamerasBinary(const std::string& path)
        void Reconstruction::ReadCamerasBinary(const std::string& path)
    """
    cameras = {}
    with open(path_to_model_file, "rb") as fid:
        num_cameras = read_next_bytes(fid, 8, "Q")[0]
        for _ in range(num_cameras):
            camera_properties = read_next_bytes(
                fid, num_bytes=24, format_char_sequence="iiQQ")
            camera_id = camera_properties[0]
            model_id = camera_properties[1]
            model_name = CAMERA_MODEL_IDS[camera_properties[1]].model_name
            width = camera_properties[2]
            height = camera_properties[3]
            num_params = CAMERA_MODEL_IDS[model_id].num_params
            params = read_next_bytes(fid, num_bytes=8*num_params,
                                     format_char_sequence="d"*num_params)
            cameras[camera_id] = Camera(id=camera_id,
                                        model=model_name,
                                        width=width,
                                        height=height,
                                        params=np.array(params))
        assert len(cameras) == num_cameras
    return cameras


def read_extrinsics_text(path):
    """
    Taken from https://github.com/colmap/colmap/blob/dev/scripts/python/read_write_model.py
    """
    images = {}
    with open(path, "r") as fid:
        while True:
            line = fid.readline()
            if not line:
                break
            line = line.strip()
            if len(line) > 0 and line[0] != "#":
                elems = line.split()
                image_id = int(elems[0])
                qvec = np.array(tuple(map(float, elems[1:5])))
                tvec = np.array(tuple(map(float, elems[5:8])))
                camera_id = int(elems[8])
                image_name = elems[9]
                elems = fid.readline().split()
                xys = np.column_stack([tuple(map(float, elems[0::3])),
                                       tuple(map(float, elems[1::3]))])
                point3D_ids = np.array(tuple(map(int, elems[2::3])))
                images[image_id] = Image(
                    id=image_id, qvec=qvec, tvec=tvec,
                    camera_id=camera_id, name=image_name,
                    xys=xys, point3D_ids=point3D_ids)
    return images


def read_colmap_bin_array(path):
    """
    Taken from https://github.com/colmap/colmap/blob/dev/scripts/python/read_dense.py

    :param path: path to the colmap binary file.
    :return: nd array with the floating point values in the value
    """
    with open(path, "rb") as fid:
        width, height, channels = np.genfromtxt(fid, delimiter="&", max_rows=1,
                                                usecols=(0, 1, 2), dtype=int)
        fid.seek(0)
        num_delimiter = 0
        byte = fid.read(1)
        while True:
            if byte == b"&":
                num_delimiter += 1
                if num_delimiter >= 3:
                    break
            byte = fid.read(1)
        array = np.fromfile(fid, np.float32)
    array = array.reshape((width, height, channels), order="F")
    return np.transpose(array, (1, 0, 2)).squeeze()

## Run Floor Separation

In [13]:
def run_floor_separation(points, min_floor_points=500, distance_threshold=None):
    result = process_point_cloud(points, min_floor_points, distance_threshold)
    print(f"Floor points: {len(result['floor_points'])}, Non-floor points: {len(result['non_floor_points'])}")
    # Visualize individual steps
    fig_floor = visualize_floor_detection(
        points, 
        result['floor_points'], 
        result['non_floor_points'], 
        result['plane_model']
    )
    fig_floor.show()

    fig_orientation = visualize_orientation(
        points, 
        result['plane_model'], 
        result['orientation_info']
    )
    fig_orientation.show()

    fig_alignment = visualize_alignment(
        points, 
        result['aligned_points']
    )
    fig_alignment.show()

    fig_final = visualize_final_result(
        points, 
        result['final_points']
    )
    fig_final.show()

    # # Or visualize everything at once
    fig_complete = visualize_complete_pipeline(result)
    fig_complete.show()
    return result

## Project Points

In [14]:
def project_points(points_3d, camera_intrinsics, extrinsics):
    """
    Project 3D points onto a 2D image plane using camera intrinsics and extrinsics.
    
    Parameters:
    - points_3d: np.ndarray - Array of 3D points (N, 3).
    - camera_intrinsics: Camera - The camera intrinsics (e.g., focal length, principal point).
    - extrinsics: np.ndarray - 4x4 extrinsic matrix (rotation and translation).

    Returns:
    - projected_points: np.ndarray - 2D projected points in image space.
    - mask: np.ndarray - Boolean array indicating which points are in front of the camera.
    """
    # Convert points to homogeneous coordinates
    points_homogeneous = np.hstack((points_3d, np.ones((points_3d.shape[0], 1))))
    
    # Transform 3D points to the camera coordinate system
    points_camera = (extrinsics @ points_homogeneous.T).T
    points_camera = points_camera[:, :3]

    # Filter points in front of the camera
    mask = points_camera[:, 2] > 0  # Keep points with positive Z
    points_camera = points_camera[mask]

    # Create intrinsic matrix
    fx, fy, cx, cy = camera_intrinsics.params  # Assuming Pinhole model (fx, fy, cx, cy)
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    # Project points onto the image plane
    projected_points = (K @ points_camera.T).T
    projected_points = projected_points[:, :2] / projected_points[:, 2].reshape(-1, 1)


    # Return only the points within the image boundaries
    return projected_points.astype(int), mask

def get_extrinsic_matrix(qvec, tvec):
    """
    Create a 4x4 extrinsic matrix from quaternion and translation vector.
    
    Parameters:
    - qvec: np.ndarray - Quaternion (w, x, y, z) representing rotation.
    - tvec: np.ndarray - Translation vector (x, y, z).
    
    Returns:
    - extrinsic_matrix: np.ndarray - 4x4 extrinsic matrix.
    """
    # Convert quaternion to rotation matrix
    rotation_matrix = qvec2rotmat(qvec)
    
    # Create 4x4 extrinsic matrix
    extrinsic_matrix = np.eye(4)
    extrinsic_matrix[:3, :3] = rotation_matrix
    extrinsic_matrix[:3, 3] = tvec
    
    return extrinsic_matrix



def render_composite_image(points_3d, camera, extrinsics, image_path, output_path, mask_path, image_size=None, point_size=25, alpha=0.5):
    """
    Render a composite image with the projected point cloud, superimposed image, and original image,
    and create a binary mask with dilation and closing operations applied.
    
    Parameters:
    - points_3d: np.ndarray - Array of 3D points (N, 3).
    - camera: Camera - Camera object containing intrinsics (fx, fy, cx, cy).
    - extrinsics: np.ndarray - 4x4 matrix of extrinsics for the camera.
    - image_path: str - Path to the original image in images_8.
    - output_path: str - Path where the output composite image will be saved.
    - mask_path: str - Path where the masked image will be saved.
    - image_size: tuple - Size of the image (height, width).
    - point_size: int - Size of points to draw on the image.
    - alpha: float - Transparency factor for the overlay (0 = fully transparent, 1 = fully opaque).
    
    Returns:
    - composite_image: np.ndarray - Composite image with the three views side-by-side.
    """
    if image_size is None:
        image_size = (camera.height, camera.width)
    # 1. Project the 3D points onto a blank binary mask
    projected_points, mask = project_points(points_3d, camera, extrinsics)
    binary_mask = np.zeros((image_size[0], image_size[1]), dtype=np.uint8)
    
    # Draw the projected points in white on the binary mask
    for x, y in projected_points:
        if 0 <= x < image_size[1] and 0 <= y < image_size[0]:
            cv2.circle(binary_mask, (x, y), point_size, 255, -1)  # White circles

    # 2. Apply dilation and closing
    dilation_kernel = np.ones((10, 10), np.uint8)
    closing_kernel = np.ones((100, 100), np.uint8)
    
    dilated_mask = cv2.dilate(binary_mask, dilation_kernel, iterations=1)
    closed_mask = cv2.morphologyEx(dilated_mask, cv2.MORPH_CLOSE, closing_kernel)

    # 3. Load the actual image and apply the mask
    actual_image = cv2.imread(image_path)
    if actual_image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")

    # Resize actual image if needed to match image size
    if actual_image.shape[:2] != image_size:
        actual_image = cv2.resize(actual_image, (image_size[1], image_size[0]))

    # Apply the closed mask to the actual image
    masked_image = cv2.bitwise_and(actual_image, actual_image, mask=closed_mask)
    cv2.imwrite(mask_path, masked_image)  # Save the masked image

    # 4. Create an overlay image for the composite
    overlay_layer = actual_image.copy()
    for x, y in projected_points:
        if 0 <= x < image_size[1] and 0 <= y < image_size[0]:
            cv2.circle(overlay_layer, (x, y), point_size, (0, 255, 255), -1)  # Yellow circles

    overlay_image = cv2.addWeighted(overlay_layer, alpha, actual_image, 1 - alpha, 0)

    # 5. Concatenate the original and masked images for comparison
    composite_image = np.hstack((cv2.cvtColor(binary_mask, cv2.COLOR_GRAY2BGR), overlay_image, actual_image, masked_image))

    # Resize the composite image if it's too large
    composite_image = cv2.resize(composite_image, (composite_image.shape[1] // 4, composite_image.shape[0] // 4))
    cv2.imwrite(output_path, composite_image)
    
    return composite_image

def project_convex_hull(hull, camera_intrinsics, extrinsics):
    """Project 3D convex hull vertices onto a 2D image plane."""
    vertices_homogeneous = np.hstack((hull.points, np.ones((hull.points.shape[0], 1))))
    
    if extrinsics.shape != (4, 4):
        raise ValueError("Extrinsics must be a 4x4 matrix.")
    
    vertices_camera = (extrinsics @ vertices_homogeneous.T).T
    vertices_camera = vertices_camera[:, :3]

    mask = vertices_camera[:, 2] > 0  # Keep vertices with positive Z
    vertices_camera = vertices_camera[mask]

    fx, fy, cx, cy = camera_intrinsics.params
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    projected_vertices = (K @ vertices_camera.T).T
    projected_vertices = projected_vertices[:, :2] / projected_vertices[:, 2].reshape(-1, 1)

    return projected_vertices.astype(int), mask

def render_convex_hull_image(hull, camera, extrinsics, image_path, output_path, mask_path, image_size=None, line_color=(0, 255, 0), alpha=0.5):
    """Render a composite image with the projected convex hull and a mask."""
    if image_size is None:
        image_size = (camera.height, camera.width)
    projected_vertices, mask = project_convex_hull(hull, camera, extrinsics)
    hull_image = np.zeros((image_size[0], image_size[1], 3), dtype=np.uint8)
    mask_image = np.zeros((image_size[0], image_size[1]), dtype=np.uint8)  # Mask for the convex hull

    if projected_vertices.shape[0] > 3:
        for simplex in hull.simplices:
            if (simplex >= projected_vertices.shape[0]).any():
                continue
            face_vertices = projected_vertices[simplex]
            cv2.fillConvexPoly(mask_image, face_vertices, 255)  # Fill the face on the mask
            cv2.fillConvexPoly(hull_image, face_vertices, line_color)  # Optionally fill for visualization

    cv2.imwrite(mask_path, mask_image)	
    # 2. Load the actual image and create an overlay (middle side)
    actual_image = cv2.imread(image_path)
    if actual_image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")

    if actual_image.shape[:2] != image_size:
        actual_image = cv2.resize(actual_image, (image_size[1], image_size[0]))

    overlay_image = cv2.addWeighted(hull_image, alpha, actual_image, 1 - alpha, 0)


    # 3. Create the masked original image using the convex hull mask
    masked_image = cv2.bitwise_and(actual_image, actual_image, mask=mask_image)

    # 4. Concatenate the four images side-by-side, including the masked original as the fourth
    composite_image = np.hstack((hull_image, overlay_image, actual_image, masked_image))


    # Resize the composite image if it's too large
    composite_image = cv2.resize(composite_image, (composite_image.shape[1]//4, composite_image.shape[0]//4))
    cv2.imwrite(output_path, composite_image)
    
    return composite_image




    


## Project Mesh

In [15]:
def project_mesh(mesh, camera_intrinsics, extrinsics):
    """Project 3D mesh vertices onto a 2D image plane."""
    vertices = np.asarray(mesh.vertices)
    vertices_homogeneous = np.hstack((vertices, np.ones((vertices.shape[0], 1))))
    
    if extrinsics.shape != (4, 4):
        raise ValueError("Extrinsics must be a 4x4 matrix.")
    
    # Transform vertices to camera space
    vertices_camera = (extrinsics @ vertices_homogeneous.T).T
    vertices_camera = vertices_camera[:, :3]

    # Apply mask to keep only vertices with positive Z
    mask = vertices_camera[:, 2] > 0
    vertices_camera = vertices_camera[mask]

    # Update faces to match the filtered vertices
    valid_indices = np.where(mask)[0]
    index_mapping = {old_idx: new_idx for new_idx, old_idx in enumerate(valid_indices)}
    faces = np.asarray(mesh.triangles)

    # Remap and filter faces
    remapped_faces = []
    for face in faces:
        if all(v in index_mapping for v in face):  # Keep faces with all vertices valid
            remapped_faces.append([index_mapping[v] for v in face])
    faces = np.array(remapped_faces)  # Convert the valid faces to a NumPy array

    # Project vertices onto the 2D plane
    fx, fy, cx, cy = camera_intrinsics.params
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    projected_vertices = (K @ vertices_camera.T).T
    projected_vertices = projected_vertices[:, :2] / projected_vertices[:, 2].reshape(-1, 1)

    return projected_vertices.astype(int), mask, faces

def render_mesh_image(mesh, camera, extrinsics, image_path, output_path, mask_path, image_size=None, fill_color=(0, 255, 0), alpha=0.5):
    """Render a composite image with the projected mesh and a masked original image."""
    if image_size is None:
        image_size = (camera.height, camera.width)
    projected_vertices, mask, faces = project_mesh(mesh, camera, extrinsics)
    mesh_image = np.zeros((image_size[0], image_size[1], 3), dtype=np.uint8)
    mask_image = np.zeros((image_size[0], image_size[1]), dtype=np.uint8)  # Mask for the mesh

    for face in faces:
        face_vertices = projected_vertices[face]
        cv2.fillConvexPoly(mask_image, face_vertices, 255)  # Fill the face on the mask
        cv2.fillConvexPoly(mesh_image, face_vertices, fill_color)  # Optionally fill for visualization

    # closing the mask
    # kernel = np.ones((50,50),np.uint8)
    # mask_image = cv2.morphologyEx(mask_image, cv2.MORPH_CLOSE, kernel)
    cv2.imwrite(mask_path, mask_image)

    # 2. Load the actual image and create an overlay (middle side)
    actual_image = cv2.imread(image_path)
    if actual_image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")

    if actual_image.shape[:2] != image_size:
        actual_image = cv2.resize(actual_image, (image_size[1], image_size[0]))

    overlay_image = cv2.addWeighted(mesh_image, alpha, actual_image, 1 - alpha, 0)

    # 3. Create the masked original image using the mesh mask
    masked_image = cv2.bitwise_and(actual_image, actual_image, mask=mask_image)

    # 4. Concatenate the four images side-by-side, including the masked original as the fourth
    composite_image = np.hstack((mesh_image, overlay_image, actual_image, masked_image))

    # Resize the composite image if it's too large
    composite_image = cv2.resize(composite_image, (composite_image.shape[1]//4, composite_image.shape[0]//4))
    cv2.imwrite(output_path, composite_image)
    
    return composite_image

## Process Objects

In [16]:
def remove_outliers(points, eps=None, min_samples=50):
    if eps is None:
        density = monte_carlo_kde(points, bandwidth=1.0)
        mean_density = np.mean(density)
        median_density = np.median(density)
        eps = (1 / median_density) ** (1 / 3)
        print(f"Mean density: {mean_density:.2f}")
        print(f"Median density: {median_density:.2f}")
        print(f"Estimated epsilon: {eps:.2f}")
        print(f"Estimated epsilon using mean density: {(1 / mean_density) ** (1 / 3):.2f}")

    dbscan_labels = dbscan(points, eps=eps, min_samples=min_samples)
    unique_labels = np.unique(dbscan_labels)
    print(f"Number of clusters: {len(unique_labels)}")
    # plot clusters
    fig = go.Figure()
    colors = ['red', 'green', 'blue', 'purple', 'orange', 'cyan', 'magenta', 'yellow', 'black']
    # for label in unique_labels:
    label = 0
    color = 'gray' if label == -1 else colors[label % len(colors)]
    # label_points = points[dbscan_labels == label]
    # fig.add_trace(
    #     plot_points_3d(label_points, color=color),
    # )
    for labels in unique_labels:
        color = 'gray' if labels == -1 else colors[labels % len(colors)]
        label_points = points[dbscan_labels == labels]
        fig.add_trace(
            plot_points_3d(label_points, color=color),
        )

    fig.update_layout(
        title='DBSCAN Clustering Results',
        scene=dict(aspectmode='data'),
        showlegend=True
    )

    fig.show()
    # return cluster with the most points. dbscan_labels has -1 for outliers
    cluster_sizes = np.bincount(dbscan_labels[dbscan_labels != -1])
    largest_cluster_label = np.argmax(cluster_sizes)
    largest_cluster_points = points[dbscan_labels == largest_cluster_label]
    return largest_cluster_points


def project_points_to_image(mesh, hull, camera_id, cam_extrinsics, cam_intrinsics, object_name, points):
    camera = cam_intrinsics[camera_id] if camera_id in cam_intrinsics else cam_intrinsics[list(cam_intrinsics.keys())[0]]
    qvec = cam_extrinsics[camera_id].qvec
    tvec = cam_extrinsics[camera_id].tvec
    extrinsics = get_extrinsic_matrix(qvec, tvec)
    name = cam_extrinsics[camera_id].name
    image_path = f"data/{object_name}/images/{name}"  # Path to the image in images_8
    output_name = Path(image_path).stem
    mask_type = "hull"
    output_dir = f"output/{object_name}/cluster/{mask_type}/mask"
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"output/{object_name}/cluster/{mask_type}/{output_name}.png"  # Output path for the rendered image
    mask_path = f"output/{object_name}/cluster/{mask_type}/mask/{output_name}.png"  # Output path for the mask image


    # Render the point cloud from the camera's viewpoint
    # render_convex_hull_image(hull, camera, extrinsics, image_path, output_path, mask_path)
    mask_type = "mesh"
    output_dir = f"output/{object_name}/cluster/{mask_type}/mask"
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"output/{object_name}/cluster/{mask_type}/{output_name}.png"  # Output path for the rendered image
    mask_path = f"output/{object_name}/cluster/{mask_type}/mask/{output_name}.png"  # Output path for the mask image
    render_mesh_image(mesh, camera, extrinsics, image_path, output_path, mask_path)
    # mask_type = "points"
    # output_dir = f"output/{object_name}/cluster/{mask_type}/mask"
    # os.makedirs(output_dir, exist_ok=True)
    # output_path = f"output/{object_name}/cluster/{mask_type}/{output_name}.png"  # Output path for the rendered image
    # mask_path = f"output/{object_name}/cluster/{mask_type}/mask/{output_name}.png"  # Output path for the mask image
    # render_composite_image(points, camera, extrinsics, image_path, output_path, mask_path)


In [29]:
def process_object(object_name, plot=False, kde_samples=1000, min_peak_points=300, project=False, outlier_removal_eps=0.4, separate_floor=True, poisson_depth=9):
    pcd = o3d.geometry.PointCloud()
    points_original, colors, normals = read_ply(f'data/points_{object_name}.ply')
    # plot_point_cloud(bonsai_points, bonsai_colors)
    object_points, density = get_densest_cluster(points_original, min_peak_points, kde_samples=kde_samples, sigma=1, plot=plot)
    pcd.points = o3d.utility.Vector3dVector(object_points)
    o3d.io.write_point_cloud("output/{object_name}/cluster/new_{object_name}_density.ply", pcd)
    if separate_floor:
        result = run_floor_separation(object_points, distance_threshold=None)
        pcd.points = o3d.utility.Vector3dVector(result["final_points"])
        o3d.io.write_point_cloud("output/{object_name}/cluster/new_{object_name}_floorseg.ply", pcd)
    else:
        result = {"final_points": object_points}
    result["densest_cluster"] = object_points
    result["density"] = density
    outliers_removed = remove_outliers(result["final_points"], eps=outlier_removal_eps, min_samples=50)
    outliers_removed = remove_outliers(outliers_removed, eps=outlier_removal_eps, min_samples=200)
    result["outliers_removed"] = outliers_removed
    if object_name[-1].isdigit():
        object_name = object_name[:object_name.rfind("_")]
    cam_extrinsics = read_extrinsics_binary(f"data/{object_name}/sparse/0/images.bin")
    cam_intrinsics = read_intrinsics_binary(f"data/{object_name}/sparse/0/cameras.bin")
    
    mesh = poisson_surface_reconstruction(outliers_removed, depth=poisson_depth)
    plot_mesh_with_plotly(mesh, outliers_removed)
    
    # Write point cloud to file
    points_path = f"output/{object_name}/cluster/new_{object_name}_final.ply"
    
    dtype = [('x', float), ('y', float), ('z', float)]
    original_structured = np.array([tuple(p) for p in points_original], dtype=dtype)
    outliers_structured = np.array([tuple(p) for p in outliers_removed], dtype=dtype)

    indices = np.nonzero(np.in1d(original_structured, outliers_structured))[0]

    pcd.points = o3d.utility.Vector3dVector(outliers_removed)
    if colors is not None:
        end_colors = colors[indices]
        pcd.colors = o3d.utility.Vector3dVector(end_colors / 255.0)
    if normals is not None:
        end_normals = normals[indices]
        pcd.normals = o3d.utility.Vector3dVector(end_normals)

    o3d.io.write_point_cloud(points_path, pcd)

    result["mesh"] = mesh
    if project:
        hull = ConvexHull(outliers_removed)
        plot_convex_hull(outliers_removed, hull)
        result["hull"] = hull
        for camera_id in cam_extrinsics:
            project_points_to_image(mesh, hull, camera_id, cam_extrinsics, cam_intrinsics, object_name, outliers_removed)
    return result
    
def write_point_cloud(points, colors, normals, indices, path):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    if colors is not None:
        end_colors = colors[indices]
        pcd.colors = o3d.utility.Vector3dVector(end_colors / 255.0)
    if normals is not None:
        end_normals = normals[indices]
        pcd.normals = o3d.utility.Vector3dVector(end_normals)

    o3d.io.write_point_cloud(path, pcd)


def poisson_surface_reconstruction(points, depth=9):
    # Preprocess to reduce outliers
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=1.0, max_nn=30))
    pcd.orient_normals_consistent_tangent_plane(k=30)
    
    # Poisson surface reconstruction
    mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd, depth=depth)
    vertices_to_remove = densities < np.quantile(densities, 0.05)
    mesh.remove_vertices_by_mask(vertices_to_remove)
    
    return mesh

def plot_mesh_with_plotly(mesh, points):
    # Extract vertices and faces from the Open3D mesh
    vertices = np.asarray(mesh.vertices)
    faces = np.asarray(mesh.triangles)
    
    # Create Plotly Mesh3d plot
    fig = go.Figure(data=[
        go.Mesh3d(
            x=vertices[:, 0],
            y=vertices[:, 1],
            z=vertices[:, 2],
            i=faces[:, 0],
            j=faces[:, 1],
            k=faces[:, 2],
            opacity=0.5,
            color='red'
        )
    ])

    # Add original points
    fig.add_trace(
        go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(color='blue', size=2, opacity=0.6)
        )
    )

    # Set the aspect ratio and scene size
    fig.update_layout(
        scene=dict(
            aspectmode='data',  # Ensures the aspect ratio is proportional to the data
            xaxis=dict(nticks=10, range=[np.min(vertices[:, 0]), np.max(vertices[:, 0])]),
            yaxis=dict(nticks=10, range=[np.min(vertices[:, 1]), np.max(vertices[:, 1])]),
            zaxis=dict(nticks=10, range=[np.min(vertices[:, 2]), np.max(vertices[:, 2])]),
        ),
        scene_camera=dict(eye=dict(x=1.5, y=1.5, z=1.5)),  # Sets an initial viewing angle
        title=dict(text='Poisson Surface Reconstruction', x=0.5),
        margin=dict(l=0, r=0, t=30, b=0)  # Minimizes white space around the plot
    )
    
    fig.show()


def compute_convex_hull(points):
    """Compute the convex hull of a 3D point cloud."""
    hull = ConvexHull(points)
    return hull

def plot_convex_hull(points, hull):
    """Plot the convex hull with Plotly."""
    # Extract vertices and faces from the hull
    vertices = points[hull.vertices]
    faces = hull.simplices  # Indices of vertices forming each face
    
    # Create the plot
    fig = go.Figure(data=[
        go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(size=2, color='blue'),
            name='Point Cloud'
        ),
        go.Mesh3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            i=faces[:, 0],
            j=faces[:, 1],
            k=faces[:, 2],
            opacity=0.5,
            color='lightgreen',
            name='Convex Hull'
        )
    ])
    
    fig.update_layout(scene=dict(aspectmode='data'), title='Convex Hull of Point Cloud')
    fig.show()

# Example usage:
# points = np.random.rand(100, 3)  # Replace with your point cloud data

### Rocket

In [None]:
rocket = process_object("rocket", plot=True, project=True, separate_floor=True, poisson_depth=12, kde_samples=5000)

### Bicycle

In [None]:
bicycle = process_object("bicycle", plot=True, project=False)

In [None]:
mesh = poisson_surface_reconstruction(bicycle["outliers_removed"], depth=9)
plot_mesh_with_plotly(mesh, bicycle["outliers_removed"])

hull = compute_convex_hull(bicycle["outliers_removed"])
plot_convex_hull(bicycle["outliers_removed"], hull)


### Bonsai

In [None]:
bonsai = process_object("bonsai", plot=True, project=False, outlier_removal_eps=0.2)

### Counter

In [None]:
counter = process_object("counter", plot=True, project=True)

### Garden

In [None]:
garden = process_object("garden", plot=True, project=False)

### Kitchen

In [None]:
kitchen = process_object("kitchen", plot=True, project=False, separate_floor=True)

### Room

In [None]:
room = process_object("room", plot=False, project=True)

### Stump

In [None]:
stump = process_object("stump", plot=True, project=True, min_peak_points=100)

### Plane

In [None]:
plane = process_object("plane", plot=True, min_peak_points=100, project=True)

## Show cameras

In [None]:
import open3d as o3d
import numpy as np
from scipy.spatial.transform import Rotation as R

def camera_position_from_extrinsics(extrinsic):
    rotation = extrinsic[:3, :3]
    translation = extrinsic[:3, 3]
    position = -rotation.T @ translation
    return position

def visualize_point_cloud_with_cameras(points, camera_extrinsics):
    point_cloud = o3d.geometry.PointCloud()
    point_cloud.points = o3d.utility.Vector3dVector(points)
    
    geometries = [point_cloud]

    for extrinsic in camera_extrinsics:
        camera_position = camera_position_from_extrinsics(extrinsic)
        
        camera_sphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.05)
        camera_sphere.translate(camera_position)
        camera_sphere.paint_uniform_color([1, 0, 0]) 
        geometries.append(camera_sphere)
        
        camera_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.2)
        camera_frame.translate(camera_position)
        geometries.append(camera_frame)

    o3d.visualization.draw_geometries(geometries)


In [None]:

camera_extrinsics = [np.eye(4), np.eye(4)]  # Replace with actual extrinsics from COLMAP
visualize_point_cloud_with_cameras(plane["final_points"], camera_extrinsics)