In [18]:
from google.colab import output
output.disable_custom_widget_manager()

In [19]:
!pip install numpy
!pip install laspy
!pip install scikit-learn
!pip install scipy
!pip install open3d
!pip install ipywidgets
!pip install matplotlib




Support for third party widgets will remain active for the duration of the session. To disable support:

In [20]:
from google.colab import output
output.enable_custom_widget_manager()

In [21]:
import numpy as np
import laspy
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from scipy.spatial import cKDTree
import open3d as o3d
from sklearn.ensemble import RandomForestClassifier
from mpl_toolkits.mplot3d import Axes3D
import ipywidgets as widgets
from IPython.display import display

In [22]:
class LidarProcessor:
    UNCLASSIFIED = 0
    GROUND = 1
    BUILDING = 2
    VEGETATION = 3

    def __init__(self):
        """Initialize the processor"""
        self.data = None
        self.features = None
        self.output_widget = widgets.Output()

    def load_file(self, file_path):
        """Load LAS file, return points and associated data"""
        print(f"Loading LAS file from: {file_path}")
        try:
            las = laspy.read(file_path)
            #Extract basic point data
            points = np.vstack((las.x, las.y, las.z)).transpose()

            #Extract additional attributes
            intensity = getattr(las, 'intensity', np.zeros(len(points)))
            return_number = getattr(las, 'return_number', np.ones(len(points)))
            number_of_returns = getattr(las, 'number_of_returns', np.ones(len(points)))

            #Storing everything
            self.data = {
                'points': points,
                'intensity': intensity,
                'return_number': return_number,
                'number_of_returns': number_of_returns,
                'las_header': las.header,
                'classification': np.zeros(len(points), dtype=np.uint8)  #Initialize as unclassified
            }

            print(f"Successfully loaded {len(points)} points")
            return True
        except Exception as e:
            print(f"Error loading LAS file: {e}")
            return False

    def downsample(self, voxel_size=0.2):
        """Downsample the point cloud for easier computation"""
        if self.data is None:
            print("No data loaded")
            return False

        print(f"Downsampling point cloud with voxel size: {voxel_size}...")

        #Create open3d point cloud and downsample
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(self.data['points'])
        downsampled_pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
        downsampled_points = np.asarray(downsampled_pcd.points)

        #Create KD-tree to find corresponding original points
        kdtree = cKDTree(self.data['points'])
        _, indices = kdtree.query(downsampled_points)

        #Create downsampled data dictionary
        original_data = self.data
        self.data = {
            'points': downsampled_points,
            'intensity': original_data['intensity'][indices],
            'return_number': original_data['return_number'][indices],
            'number_of_returns': original_data['number_of_returns'][indices],
            'las_header': original_data['las_header'],
            'classification': np.zeros(len(downsampled_points), dtype=np.uint8),
            'original_indices': indices  #Store indices of original points
        }

        print(f"Downsampled from {len(original_data['points'])} to {len(downsampled_points)} points")
        return True

    def extract_ground(self, grid_size=1.0, height_threshold=0.2, slope_threshold=20.0):
        """Extract ground points using a grid based thingy"""
        if self.data is None:
            print("No data loaded")
            return False

        print("Extracting ground points...")
        points = self.data['points']

        #Create a 2D grid
        x_min, y_min = np.min(points[:, 0]), np.min(points[:, 1])
        x_max, y_max = np.max(points[:, 0]), np.max(points[:, 1])

        #Calculate grid dimensions
        grid_x = int((x_max - x_min) / grid_size) + 1
        grid_y = int((y_max - y_min) / grid_size) + 1

        #Initialize grid with high values
        grid = np.ones((grid_x, grid_y)) * float('inf')
        grid_points = {}

        #Find lowest point in each grid cell
        for i, (x, y, z) in enumerate(points):
            gx = min(grid_x - 1, int((x - x_min) / grid_size))
            gy = min(grid_y - 1, int((y - y_min) / grid_size))

            if z < grid[gx, gy]:
                grid[gx, gy] = z
                grid_points[(gx, gy)] = i

        #Identify ground cells based on slope
        ground_indices = []
        for (gx, gy), idx in grid_points.items():
            is_ground = True

            #Check neighboring cells
            for dx in [-1, 0, 1]:
                for dy in [-1, 0, 1]:
                    nx, ny = gx + dx, gy + dy
                    if (nx >= 0 and nx < grid_x and ny >= 0 and ny < grid_y and
                        grid[nx, ny] != float('inf')):
                        #Calculate slope between cells
                        distance = np.sqrt(dx**2 + dy**2) * grid_size
                        if distance > 0:  #Avoid checking the cell itself
                            height_diff = abs(grid[gx, gy] - grid[nx, ny])
                            slope = np.degrees(np.arctan(height_diff / distance))
                            if slope > slope_threshold:
                                is_ground = False
                                break
                if not is_ground:
                    break

            if is_ground:
                ground_indices.append(idx)

        #Mark initial ground points
        classifications = self.data['classification']
        for idx in ground_indices:
            classifications[idx] = self.GROUND

        #Refine ground classification using height threshold
        if ground_indices:
            kdtree = cKDTree(points[ground_indices][:, 0:2])  #2D KD-tree of ground points

            for i, (x, y, z) in enumerate(points):
                if classifications[i] != self.GROUND:  #f not already classified as ground
                    #Find closest ground point
                    distance, idx = kdtree.query([x, y])
                    if distance < grid_size * 2:  #Only compare with nearby ground points
                        ground_z = points[ground_indices[idx]][2]
                        if z - ground_z < height_threshold:
                            classifications[i] = self.GROUND  #Classify as ground

        print(f"Ground extraction complete. {np.sum(classifications == self.GROUND)} points classified as ground.")
        return True

    def segmentation(self):
        """Compute features for classification"""
        if self.data is None:
            print("No data loaded")
            return False

        print("Computing features for classification...")

        points = self.data['points']
        intensity = self.data['intensity']
        return_number = self.data['return_number']
        number_of_returns = self.data['number_of_returns']
        classifications = self.data['classification']

        #Create KD-tree for nearest neighbor search
        kdtree = cKDTree(points)

        #Features: height, height range, density, linearity, planarity, sphericity, return number, number of returns, return ratio, intensity
        self.features = np.zeros((len(points), 10))

        #Calculate height above ground for all points
        ground_indices = np.where(classifications == self.GROUND)[0]
        if len(ground_indices) > 0:
            ground_points = points[ground_indices]
            ground_kdtree = cKDTree(ground_points[:, 0:2])

            #For each point, find closest ground point
            distances, indices = ground_kdtree.query(points[:, 0:2])
            ground_z = ground_points[indices, 2]

            #Height above ground
            self.features[:, 0] = points[:, 2] - ground_z

        #Process points in batches as in segmentation
        batch_size = 1000
        for start_idx in range(0, len(points), batch_size):
            end_idx = min(start_idx + batch_size, len(points))
            batch_points = points[start_idx:end_idx]

            #Find points within search radius
            distances, neighbors_indices = kdtree.query(batch_points, k=30)

            for i in range(end_idx - start_idx):
                #Get indices of neighboring points within 2m
                valid_neighbors = neighbors_indices[i][distances[i] < 2.0]

                if len(valid_neighbors) < 3:
                    continue

                #Extract neighborhood points
                neighbors = points[valid_neighbors]

                #Local height range
                self.features[start_idx + i, 1] = np.max(neighbors[:, 2]) - np.min(neighbors[:, 2])

                #Local point density
                self.features[start_idx + i, 2] = len(valid_neighbors) / 33.5  #Approx volume of 2m radius sphere

                #Eigenvalue-based features
                if len(valid_neighbors) >= 3:
                    #Compute covariance matrix and eigenvalues
                    centered_points = neighbors - np.mean(neighbors, axis=0)
                    cov = np.cov(centered_points.T)
                    eigenvalues = np.sort(np.linalg.eigvals(cov))[::-1]  #Sort in descending order

                    if np.sum(eigenvalues) > 0:
                        #Normalize eigenvalues
                        eigenvalues = eigenvalues / np.sum(eigenvalues)

                        #Geometry features
                        self.features[start_idx + i, 3] = (eigenvalues[0] - eigenvalues[1]) / eigenvalues[0]  #Linearity
                        self.features[start_idx + i, 4] = (eigenvalues[1] - eigenvalues[2]) / eigenvalues[0]  #Planarity
                        self.features[start_idx + i, 5] = eigenvalues[2] / eigenvalues[0]  #Sphericity

        #Return information and intensity (already available per point)
        self.features[:, 6] = return_number
        self.features[:, 7] = number_of_returns
        self.features[:, 8] = return_number / np.maximum(number_of_returns, 1)  #Avoid division by zero
        self.features[:, 9] = intensity

        print("\n")
        print("For point number 1: \n Height abouve ground:", self.features[1, 0], "\n Local height range:", self.features[1, 1], "\n Local point density:", self.features[1, 2],
             "\n Liniarity:", self.features[1, 3], "\n Planarity:", self.features[1, 4], "\n Sphericity:", self.features[1, 5],
             "\n Return number:", self.features[1, 6], "\n Number of returns:", self.features[1, 7], "\n Return ratio:", self.features[1, 8],
             "\n Intensity:", self.features[1, 9])
        print("\n")
        print("For point number 101: \n Height abouve ground:", self.features[100, 0], "\n Local height range:", self.features[100, 1], "\n Local point density:", self.features[100, 2],
             "\n Liniarity:", self.features[100, 3], "\n Planarity:", self.features[100, 4], "\n Sphericity:", self.features[100, 5],
             "\n Return number:", self.features[100, 6], "\n Number of returns:", self.features[100, 7], "\n Return ratio:", self.features[100, 8],
             "\n Intensity:", self.features[100, 9])
        print("\n")




        print("Segmentation complete.")
        return True

    def classify_points(self):
        """Classify points based on computed segmentation"""
        if self.data is None or self.features is None:
            print("Data or features missing")
            return False

        print("Classifying points...")

        classifications = self.data['classification']

        #Define thresholds
        height_threshold_building = 1.0  #Minimum height for buildings
        height_threshold_vegetation = 1.0  #Minimum height for vegetation
        planarity_threshold = 0.6  #Higher values indicate planar structures (buildings)
        sphericity_threshold = 0.2  #Higher values indicate more spherical structures (vegetation)

        #Get non-ground points
        non_ground_indices = np.where(classifications != self.GROUND)[0]

        #Classify non-ground points based on features
        for i in non_ground_indices:
            height = self.features[i, 0]  #Height above ground
            planarity = self.features[i, 4]  #Planarity
            sphericity = self.features[i, 5]  #Sphericity

            #Classification rules
            if height < height_threshold_vegetation:
                continue

            if planarity > planarity_threshold and sphericity < sphericity_threshold and height > height_threshold_building:
                classifications[i] = self.BUILDING  #Building
            elif sphericity > sphericity_threshold or (height > height_threshold_vegetation and planarity < 0.4):
                classifications[i] = self.VEGETATION  #Vegetation and trees

        #Count classified points
        class_counts = {
            "Ground": np.sum(classifications == self.GROUND),
            "Building": np.sum(classifications == self.BUILDING),
            "Vegetation": np.sum(classifications == self.VEGETATION),
            "Unclassified": np.sum(classifications == self.UNCLASSIFIED)
        }

        print(f"Classification complete:")
        for cls, count in class_counts.items():
            print(f"  {cls} points: {count}")

        return True

    def refine_with_ml(self, train_percentage=0.3):
        """Refine classification using Random Forest"""
        if self.data is None or self.features is None:
            print("Data or features missing")
            return False

        print("Refining classification with Random Forest...")

        classifications = self.data['classification']

        #Get indices of classified points for training
        ground_indices = np.where(classifications == self.GROUND)[0]
        building_indices = np.where(classifications == self.BUILDING)[0]
        tree_indices = np.where(classifications == self.VEGETATION)[0]

        #Sample points for training
        def sample(indices, percentage=train_percentage):
            if len(indices) == 0:
                return np.array([])
            sample_size = max(int(len(indices) * percentage), 1)
            return np.random.choice(indices, sample_size, replace=False)

        #Create training dataset
        train_indices = np.concatenate([
            sample(ground_indices),
            sample(building_indices),
            sample(tree_indices)
        ])

        if len(train_indices) < 10:
            print("Not enough classified points for ML training. Skipping refinement.")
            return False

        #Train Random Forest classifier
        X_train = self.features[train_indices]
        y_train = classifications[train_indices]

        clf = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42, n_jobs=-1)
        clf.fit(X_train, y_train)

        #Only predict for unclassified points
        unclassified_indices = np.where(classifications == self.UNCLASSIFIED)[0]

        if len(unclassified_indices) > 0:
            X_unclassified = self.features[unclassified_indices]
            y_pred = clf.predict(X_unclassified)
            classifications[unclassified_indices] = y_pred

        #Update counts
        class_counts = {
            "Ground": np.sum(classifications == self.GROUND),
            "Building": np.sum(classifications == self.BUILDING),
            "Vegetation": np.sum(classifications == self.VEGETATION),
            "Unclassified": np.sum(classifications == self.UNCLASSIFIED)
        }

        print(f"Classification refinement complete:")
        for cls, count in class_counts.items():
            print(f"  {cls} points: {count}")

        return True

    def visualize(self, max_points=50000):
        """Visualize the classified point cloud using matplotlib"""
        if self.data is None:
            print("No data loaded")
            return None

        print("Visualizing classified point cloud...")

        points = self.data['points']
        classifications = self.data['classification']

        #Define colors for different classes
        colors = np.zeros((len(points), 3))
        colors[classifications == self.UNCLASSIFIED] = [0.7, 0.7, 0.7]  #Unclassified - gray
        colors[classifications == self.GROUND] = [0.8, 0.5, 0.2]  #Ground - brown
        colors[classifications == self.BUILDING] = [1.0, 0.0, 0.0]  #Buildings - red
        colors[classifications == self.VEGETATION] = [0.0, 0.8, 0.0]  #Trees and vegetation - green

        #Downsample points for visualization
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points_to_plot = points[indices]
            colors_to_plot = colors[indices]
        else:
            points_to_plot = points
            colors_to_plot = colors

        #Create 3d figure
        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(111, projection='3d')

        #Plot points
        ax.scatter(
            points_to_plot[:, 0],
            points_to_plot[:, 1],
            points_to_plot[:, 2],
            c=colors_to_plot,
            s=1,
            marker='.'
        )

        #Set labels and title
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        ax.set_title('Classified Point Cloud')

        #Add legend
        legend_elements = [
            plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=[0.7, 0.7, 0.7], markersize=10, label='Unclassified'),
            plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=[0.8, 0.5, 0.2], markersize=10, label='Ground'),
            plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=[1.0, 0.0, 0.0], markersize=10, label='Buildings'),
            plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=[0.0, 0.8, 0.0], markersize=10, label='Trees/Vegetation')
        ]
        ax.legend(handles=legend_elements, loc='upper right')

        #Set equal aspect ratio
        max_range = np.array([
            points_to_plot[:, 0].max() - points_to_plot[:, 0].min(),
            points_to_plot[:, 1].max() - points_to_plot[:, 1].min(),
            points_to_plot[:, 2].max() - points_to_plot[:, 2].min()
        ]).max() / 2.0

        mid_x = (points_to_plot[:, 0].max() + points_to_plot[:, 0].min()) * 0.5
        mid_y = (points_to_plot[:, 1].max() + points_to_plot[:, 1].min()) * 0.5
        mid_z = (points_to_plot[:, 2].max() + points_to_plot[:, 2].min()) * 0.5

        ax.set_xlim(mid_x - max_range, mid_x + max_range)
        ax.set_ylim(mid_y - max_range, mid_y + max_range)
        ax.set_zlim(mid_z - max_range, mid_z + max_range)

        plt.tight_layout()
        plt.show()

        return fig

    def visualize_with_open3d(self, point_size=1.0, max_points=500000):
        """Visualize the classified point cloud using Open3D"""
        if self.data is None:
            print("No data loaded")
            return False

        print("Creating Open3D visualization...")

        points = self.data['points']
        classifications = self.data['classification']

        #Downsample for better performance if needed
        if len(points) > max_points:
            print(f"Point cloud is large ({len(points)} points). Downsampling for visualization...")
            indices = np.random.choice(len(points), max_points, replace=False)
            vis_points = points[indices]
            vis_classifications = classifications[indices]
        else:
            vis_points = points
            vis_classifications = classifications

        #Create Open3D point cloud
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(vis_points)

        #Define colors for different classes
        colors = np.zeros((len(vis_points), 3))
        colors[vis_classifications == self.UNCLASSIFIED] = [0.7, 0.7, 0.7]  #Unclassified - gray
        colors[vis_classifications == self.GROUND] = [0.8, 0.5, 0.2]  #Ground - brown
        colors[vis_classifications == self.BUILDING] = [1.0, 0.0, 0.0]  #Buildings - red
        colors[vis_classifications == self.VEGETATION] = [0.0, 0.8, 0.0]  #Trees/Vegetation - green

        pcd.colors = o3d.utility.Vector3dVector(colors)

        #Initialize visualizer
        vis = o3d.visualization.Visualizer()
        vis.create_window(window_name="Classified Point Cloud", width=1024, height=768)
        vis.add_geometry(pcd)

        #Set rendering options
        opt = vis.get_render_option()
        opt.background_color = np.array([0.1, 0.1, 0.1])
        opt.point_size = point_size

        #Start visualization
        print("Open3D visualization ready. Close the window to continue.")
        vis.run()
        vis.destroy_window()

        return True

    def export_result(self, output_path):
        """Export the classified point cloud as LAS file"""
        if self.data is None:
            print("No data loaded")
            return False

        print(f"Exporting classified point cloud to: {output_path}")

        try:
            #Create new LAS file with the same header as input
            header = self.data['las_header']
            new_las = laspy.create(point_format=header.point_format, file_version=header.version)

            #Copy header info
            new_las.header.offsets = header.offsets
            new_las.header.scales = header.scales

            #Set coordinates
            points = self.data['points']
            new_las.x = points[:, 0]
            new_las.y = points[:, 1]
            new_las.z = points[:, 2]

            #Set other attributes
            new_las.intensity = self.data['intensity']
            new_las.return_number = self.data['return_number']
            new_las.number_of_returns = self.data['number_of_returns']

            #Set classification
            new_las.classification = self.data['classification']

            #Write to file
            new_las.write(output_path)
            print(f"Export successful: {output_path}")
            return True
        except Exception as e:
            print(f"Error exporting classified point cloud: {e}")
            return False


In [23]:
def run_lidar_processor():
    """interactive UI"""
    processor = LidarProcessor()

    #Create widgets
    file_path_input = widgets.Text(
        value='',
        placeholder='Enter the path to your LAS file',
        description='File path:',
        disabled=False,
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='50%')
    )

    voxel_size_input = widgets.FloatSlider(
        value=0.2,
        min=0.05,
        max=1.0,
        step=0.05,
        description='Voxel Size:',
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='.2f',
        tooltip='Higher values mean more aggressive downsampling (faster but less accurate)'
    )

    max_points_input = widgets.IntSlider(
        value=50000,
        min=10000,
        max=500000,
        step=10000,
        description='Viz Points:',
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d',
        tooltip='Maximum number of points to show in visualization'
    )

    submit_button = widgets.Button(
        description='Process Point Cloud',
        disabled=False,
        button_style='info',
        tooltip='Click to process the LAS file'
    )

    output_area = widgets.Output()
    processor.output_widget = output_area

    #Display input widgets
    display(widgets.VBox([
        widgets.HBox([file_path_input, submit_button]),
        widgets.HBox([voxel_size_input, max_points_input]),
        output_area
    ]))

    #Processing function
    def process_file(b):
        with output_area:
            output_area.clear_output()

            file_path = file_path_input.value
            voxel_size = voxel_size_input.value
            max_points = max_points_input.value

            if not file_path:
                print("Please enter a valid file path")
                return

            print(f"Processing file: {file_path}")

            #Create progress indicator
            progress = widgets.IntProgress(
                value=0,
                min=0,
                max=6,
                description='Processing:',
                bar_style='info',
                style={'bar_color': '#389BDC'},
                orientation='horizontal'
            )
            display(progress)

            #Process steps
            if not processor.load_file(file_path):
                print("Failed to load file. Please check the path and try again.")
                return
            progress.value += 1

            processor.downsample(voxel_size=voxel_size)
            progress.value += 1

            processor.visualize(max_points=max_points)
            processor.visualize_with_open3d()
            progress.value += 1

            processor.extract_ground()
            progress.value += 1

            processor.visualize(max_points=max_points)
            processor.visualize_with_open3d()
            progress.value += 1

            processor.segmentation()
            progress.value += 1

            processor.classify_points()
            progress.value += 1

            processor.visualize(max_points=max_points)
            processor.visualize_with_open3d()
            progress.value += 1

            processor.refine_with_ml()
            progress.value += 1

            processor.visualize(max_points=max_points)

            #Add visualization and export options
            add_additional_options(processor, file_path)

    def add_additional_options(processor, file_path):
        """Add additional visualization and export options"""

        #open3d visualization button
        open3d_button = widgets.Button(
            description='Interactive 3D View',
            disabled=False,
            button_style='success',
            tooltip='Open interactive 3D visualization'
        )

        #Point size slider
        point_size_slider = widgets.FloatSlider(
            value=1.0,
            min=0.5,
            max=5.0,
            step=0.5,
            description='Point Size:',
            disabled=False,
            continuous_update=False,
            orientation='horizontal',
            readout=True,
            readout_format='.1f',
        )

        #Export options
        export_path = widgets.Text(
            value=file_path.replace('.las', '_classified.las'),
            placeholder='Enter output file path',
            description='Output path:',
            disabled=False,
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='70%')
        )

        export_button = widgets.Button(
            description='Export Result',
            disabled=False,
            button_style='warning',
            tooltip='Export classified point cloud'
        )

        display(widgets.VBox([
            widgets.Label("Additional Options:"),
            widgets.HBox([open3d_button, point_size_slider]),
            widgets.HBox([export_path, export_button])
        ]))

        #Button handlers
        def on_visualize_click(b):
            with output_area:
                processor.visualize_with_open3d(
                    point_size=point_size_slider.value,
                    max_points=max_points_input.value
                )

        def on_export_click(b):
            with output_area:
                output_path = export_path.value
                if output_path:
                    processor.export_result(output_path)
                else:
                    print("Please enter a valid output file path")

        open3d_button.on_click(on_visualize_click)
        export_button.on_click(on_export_click)

    submit_button.on_click(process_file)

In [24]:
run_lidar_processor()

VBox(children=(HBox(children=(Text(value='', description='File path:', layout=Layout(width='50%'), placeholder…