In [None]:
!pip install laspy scipy matplotlib pykrige rasterio numba

In [None]:
# ---- IDW Interpolation ----
from scipy.spatial import cKDTree
from numba import jit

# --- IDW Interpolation ---
@jit(nopython=True, parallel=True)
def idw_interpolation(x, y, z, X, Y, power: int = 2, k: int = 10):
    """
    Performs Inverse Distance Weighting (IDW) interpolation using a KDTree for efficiency.
    
    Pros: Simple, fast, effective for smooth surfaces.
    Cons: May not handle sharp terrain features well.
    
    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.
    power : int, optional
        Power parameter for IDW weighting (default is 2).
    k : int, optional
        Number of nearest neighbors to consider (default is 10).

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    # Flatten grid arrays
    grid_points = np.column_stack((X.ravel(), Y.ravel()))
    
    # Build KDTree for fast nearest-neighbor lookup
    tree = cKDTree(np.column_stack((x, y)))
    
    # Query k nearest neighbors for all grid points
    distances, indices = tree.query(grid_points, k=k)
    
    # Avoid division by zero (replace zeros with a small value)
    distances = np.maximum(distances, 1e-10)
    
    # Compute IDW weights
    weights = 1.0 / (distances ** power)
    
    # Normalize weights
    weights /= np.sum(weights, axis=1, keepdims=True)
    
    # Compute weighted sum of elevations
    Z_interpolated = np.sum(weights * z[indices], axis=1)
    
    return Z_interpolated.reshape(X.shape)


In [None]:
from pykrige.ok import OrdinaryKriging
import numpy as np
from numba import jit

# ---- Kriging Interpolation ----
@jit(nopython=True, parallel=True)
def kriging_interpolation(x, y, z, X, Y, sample_size=5000):
    """
    Performs Kriging interpolation with downsampling to handle large datasets.

    Pros: Statistically optimal, handles spatial correlation	
    Cons: Computationally expensive, requires variogram tuning

    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.
    sample_size : int, optional
        Number of points to sample for Kriging (default is 5000).

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    try:
        # Downsample points if there are too many
        if len(x) > sample_size:
            indices = np.random.choice(len(x), sample_size, replace=False)
            x_sample, y_sample, z_sample = x[indices], y[indices], z[indices]
        else:
            x_sample, y_sample, z_sample = x, y, z

        # Apply Kriging only on sampled points
        kriging = OrdinaryKriging(
            x_sample, y_sample, z_sample, 
            variogram_model="spherical", verbose=False, enable_plotting=False
        )
        
        # Interpolate
        Z_pred, _ = kriging.execute("grid", X[0], Y[:, 0])

        return Z_pred

    except MemoryError:
        raise Exception("Kriging interpolation failed: Not enough memory, try a smaller sample size or a different method.")
    except Exception as e:
        raise Exception(f"Kriging interpolation failed: {e}")




In [None]:
from scipy.interpolate import griddata
from numba import jit

# ---- Natural Neighbor Interpolation ----
@jit(nopython=True, parallel=True)
def natural_neighbor_interpolation(x, y, z, X, Y):
    """
    Performs Natural Neighbor interpolation.

    Pros: Produces smooth surfaces, adapts well to irregularly spaced points.
          Works well for high-resolution LiDAR when preserving natural terrain variations.
      
    Cons: Computationally expensive for large datasets.

    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    return griddata((x, y), z, (X, Y), method="nearest")


In [None]:
from scipy.interpolate import Rbf
import numpy as np
from numba import jit

# ---- Spline Interpolation ----
@jit(nopython=True, parallel=True)
def spline_interpolation(x, y, z, X, Y, sample_size=5000, function='linear'):
    """
    Performs Spline (Radial Basis Function) interpolation with downsampling.

    Pros: Produces very smooth surfaces, ideal for continuous terrain.
          Suitable for urban areas or soft landscapes.
      
    Cons: Can introduce artifacts, not ideal for rugged terrain.

    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.
    sample_size : int, optional
        Number of points to sample for interpolation (default is 5000).
    function : str, optional
        RBF function to use ('linear', 'cubic', 'thin_plate'). Default is 'linear'.

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    try:
        # Downsample points if there are too many
        if len(x) > sample_size:
            indices = np.random.choice(len(x), sample_size, replace=False)
            x_sample, y_sample, z_sample = x[indices], y[indices], z[indices]
        else:
            x_sample, y_sample, z_sample = x, y, z

        # Perform RBF interpolation with a faster function
        spline = Rbf(x_sample, y_sample, z_sample, function=function)

        return spline(X, Y)

    except MemoryError:
        raise Exception("Spline interpolation failed: Not enough memory, try a smaller sample size or a different method.")
    except Exception as e:
        raise Exception(f"Spline interpolation failed: {e}")


In [None]:
from scipy.spatial import Delaunay
from numba import jit

# ---- TIN Interpolation ----
@jit(nopython=True, parallel=True)
def tin_interpolation(x, y, z, X, Y):
    """
    Performs TIN (Triangulated Irregular Network) interpolation.

    Pros: Uses Delaunay Triangulation, preserves terrain break lines.
          Ideal for mountainous or hilly terrain where breaklines matter.
      
    Cons: Less smooth in flat areas, slower than IDW.

    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    tri = Delaunay(np.column_stack((x, y)))
    return griddata((x, y), z, (X, Y), method="linear", fill_value=np.nan)


In [None]:
from sklearn.neighbors import KNeighborsRegressor
from numba import jit 


# ---- KNN Interpolation ----
@jit(nopython=True, parallel=True)
def knn_interpolation(x, y, z, X, Y, k: int = 5):
    """
    Performs K-Nearest Neighbors (KNN) interpolation for DTM generation.

    Pros: Very fast, handles large datasets efficiently.
          Best for categorical terrain classifications or fast previewing.

    Cons: Creates step-like artifacts, not smooth.

    Parameters
    ----------
    x, y, z : np.ndarray
        Known data points (coordinates and elevation).
    X, Y : np.ndarray
        Grid points where interpolation is applied.
    k : int, optional
        Number of nearest neighbors to consider (default is 5).

    Returns
    -------
    np.ndarray
        Interpolated elevation values on the grid.
    """
    # Reshape grid points
    grid_points = np.column_stack((X.ravel(), Y.ravel()))
    
    # Train KNN model
    knn = KNeighborsRegressor(n_neighbors=k, weights="distance")
    knn.fit(np.column_stack((x, y)), z)
    
    # Predict elevation values
    Z_pred = knn.predict(grid_points)
    return Z_pred.reshape(X.shape)


In [None]:
import matplotlib.pyplot as plt

# Plot a DTM

def plot_dtm(Z: np.ndarray, x_range: tuple, y_range: tuple, title: str = "Digital Terrain Model (DTM)") -> None:
    """
    Plots the Digital Terrain Model (DTM) as a heatmap.

    Parameters
    ----------
    Z : np.ndarray
        The DTM grid (2D array) containing elevation values.
    x_range : tuple
        The (min_x, max_x) range for the x-axis.
    y_range : tuple
        The (min_y, max_y) range for the y-axis.
    title : str, optional
        The title of the plot (default is "Digital Terrain Model (DTM)").

    Returns
    -------
    None
        Displays the DTM plot.
    """
    try:
        if Z is None or Z.size == 0:
            raise ValueError("DTM data is empty or not generated.")

        plt.figure(figsize=(10, 7))
        plt.imshow(Z, extent=[x_range[0], x_range[1], y_range[0], y_range[1]], 
                   origin="lower", cmap="terrain")
        plt.colorbar(label="Elevation (m)")
        plt.title(title)
        plt.xlabel("X Coordinate")
        plt.ylabel("Y Coordinate")
        plt.show()

    except Exception as e:
        raise Exception(f"Error in plotting DTM: {e}")

In [None]:
import laspy
import numpy as np


# Generate DTM
def generate_dtm(input_laz: str, output_resolution: float, method: str = "IDW") -> np.ndarray:
    """
    Generates a Digital Terrain Model (DTM) using ground points (class label = 2) from a .laz file.

    Parameters
    ----------
    input_laz : str
        Path to the input .laz file.
    output_resolution : float
        Grid resolution for the DTM (e.g., 1.0 for 1m resolution).
    method : str, optional
        Interpolation method, either "IDW" or "Kriging". Default is "IDW".

    Returns
    -------
    np.ndarray
        The interpolated DTM grid.

    Raises
    ------
    ValueError
        If an invalid interpolation method is given.
    FileNotFoundError
        If the input file does not exist.
    """
    try:
        if not input_laz.endswith(".laz") and not input_laz.endswith(".las"):
            raise ValueError("Invalid file format. Must be .laz or .las")

        # Read .laz file
        with laspy.open(input_laz) as las_file:
            las = las_file.read()
        print(f"The LAZ file has been loaded.")

        #Extract ground points (classification label 2)
        mask = las.classification == 2
        x, y, z = las.x[mask], las.y[mask], las.z[mask]

        if len(x) == 0:
            raise ValueError("No ground points (class 2) found in the dataset.")
        print(f"Ground points obtained: {len(x)} points")

        # Define grid extent
        min_x, max_x = np.min(x), np.max(x)
        min_y, max_y = np.min(y), np.max(y)

        # Create grid (ensure correct Y-ordering)
        x_grid = np.arange(min_x, max_x + output_resolution, output_resolution)
        y_grid = np.arange(min_y, max_y + output_resolution, output_resolution)[::-1]  # Flip Y-order

        X, Y = np.meshgrid(x_grid, y_grid)
        Z = np.full(X.shape, np.nan)  # Initialize with NaNs

        # Choose interpolation method
        if method.upper() == "IDW":
            Z = idw_interpolation(x, y, z, X, Y)
        elif method.upper() == "KRIGING":
            Z = kriging_interpolation(x, y, z, X, Y)
        elif method.upper() == "NNI":
            Z = natural_neighbor_interpolation(x, y, z, X, Y)
        elif method.upper() == "SPLINE":
            Z = spline_interpolation(x, y, z, X, Y)
        elif method.upper() == "TIN":
            Z = tin_interpolation(x, y, z, X, Y)
        elif method.upper() == "KNN":
            Z = knn_interpolation(x, y, z, X, Y, k=5)  # Using 5 nearest neighbors
        else:
            raise ValueError("Invalid method. Choose from 'IDW', 'Kriging', 'NNI', 'Spline', 'TIN', 'KNN'.")

        print("DTM grid has been generated.")
        return Z

    except FileNotFoundError as e:
        raise FileNotFoundError(str(e))
    except ValueError as e:
        raise ValueError(str(e))
    except Exception as e:
        raise Exception(f"Error generating DTM: {e}")


In [None]:
import rasterio
from rasterio.transform import from_origin

# Save DTM as GeoTIFF
def save_dtm_as_geotiff(Z: np.ndarray, x_range: tuple, y_range: tuple, resolution: float, output_file: str, crs: str = "EPSG:28992") -> None:
    """
    Saves a Digital Terrain Model (DTM) grid as a GeoTIFF file with a specified CRS.

    Parameters
    ----------
    Z : np.ndarray
        The DTM grid (2D array) containing elevation values.
    x_range : tuple
        The (min_x, max_x) range for the x-axis.
    y_range : tuple
        The (min_y, max_y) range for the y-axis.
    resolution : float
        Grid resolution (e.g., 1.0 for 1m resolution).
    output_file : str
        Path to save the GeoTIFF file.
    crs : str, optional
        Coordinate Reference System (CRS) in EPSG format (default is "EPSG:28992").

    Returns
    -------
    None
        Saves the DTM as a GeoTIFF file.
    """
    try:
        # Define transform (top-left corner as origin)
        transform = from_origin(x_range[0], y_range[1], resolution, resolution)

        # Save as GeoTIFF using rasterio
        with rasterio.open(
            output_file,
            "w",
            driver="GTiff",
            height=Z.shape[0],
            width=Z.shape[1],
            count=1,
            dtype=Z.dtype,
            crs=crs,  # Use provided CRS
            transform=transform
        ) as dst:
            dst.write(Z, 1)  # Write the DTM to band 1

        print(f"DTM saved as GeoTIFF: {output_file} with CRS {crs}")

    except Exception as e:
        raise Exception(f"Error saving GeoTIFF: {e}")



In [None]:
import laspy
import numpy as np

# Save DTM as LAZ
def save_dtm_as_laz(dtm_grid: np.ndarray, transform, output_laz: str):
    """
    Saves a Digital Terrain Model (DTM) as a .laz file.

    Parameters
    ----------
    dtm_grid : np.ndarray
        The DTM elevation grid.
    transform : rasterio.transform.Affine
        Transform containing the geospatial reference information.
    output_laz : str
        Output path for the .laz file.

    Returns
    -------
    None
        Saves a LiDAR file with ground points representing the DTM.
    """
    try:
        # Get grid dimensions
        rows, cols = dtm_grid.shape
        x_min, y_max = transform.c, transform.f  # Top-left corner
        resolution = transform.a  # Grid resolution

        # Generate X, Y coordinates for each grid cell
        x_coords = np.tile(np.arange(cols) * resolution + x_min, rows)
        y_coords = np.repeat(y_max - np.arange(rows) * resolution, cols)

        # Flatten DTM values
        z_coords = dtm_grid.flatten()

        # Remove NaN values (if any)
        valid_mask = ~np.isnan(z_coords)
        x_coords, y_coords, z_coords = x_coords[valid_mask], y_coords[valid_mask], z_coords[valid_mask]

        # Create LAS header
        header = laspy.LasHeader(point_format=1, version="1.2")
        header.scales = (0.01, 0.01, 0.01)  # Set scale factors
        header.offsets = (np.min(x_coords), np.min(y_coords), np.min(z_coords))

        # Create LAS data
        las = laspy.LasData(header)
        las.x = x_coords
        las.y = y_coords
        las.z = z_coords
        las.classification = np.full_like(z_coords, 2, dtype=np.uint8)  # Classify as ground (2)

        # Save to LAZ file
        with laspy.open(output_laz, mode="w", header=header) as las_writer:
            las_writer.write_points(las.points)

        print(f"DTM saved as LAZ: {output_laz}")

    except Exception as e:
        raise Exception(f"Error saving DTM as LAZ: {e}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np


# Plot a DTM

def plot_dtm(Z: np.ndarray, x_range: tuple, y_range: tuple, title: str = "Digital Terrain Model (DTM)") -> None:
    """
    Plots the Digital Terrain Model (DTM) as a heatmap.

    Parameters
    ----------
    Z : np.ndarray
        The DTM grid (2D array) containing elevation values.
    x_range : tuple
        The (min_x, max_x) range for the x-axis.
    y_range : tuple
        The (min_y, max_y) range for the y-axis.
    title : str, optional
        The title of the plot (default is "Digital Terrain Model (DTM)").

    Returns
    -------
    None
        Displays the DTM plot.
    """
    try:
        if Z is None or Z.size == 0:
            raise ValueError("DTM data is empty or not generated.")

        # Flip the DTM grid to match GeoTIFF orientation
        #Z_flipped = np.flipud(Z)

        plt.figure(figsize=(10, 7))
        plt.imshow(Z, extent=[x_range[0], x_range[1], y_range[0], y_range[1]], 
                   origin="upper", cmap="terrain")  # Use origin="upper" for correct alignment
        plt.colorbar(label="Elevation (m)")
        plt.title(title)
        plt.xlabel("X Coordinate")
        plt.ylabel("Y Coordinate")
        plt.show()

    except Exception as e:
        raise Exception(f"Error in plotting DTM: {e}")


Next cells are the example of usage.

In [None]:
import os 

# Define paths for input and output
input_folder = "/path/to/your/laz/folder"  # Change to your actual input folder
output_folder = "/path/to/output/folder"  # Change to your actual output folder
os.makedirs(output_folder, exist_ok=True)


In [None]:
# The workflow for one single .laz file
import laspy
import numpy as np

def process_laz_file(file_name):
    """
        Reads a .laz file, generates a DTM using `generate_dtm()`, and saves it in XYZ and GeoTIFF formats.
        
        Caution: 
            1. The resolution of the DTM has to be confirmed.
            2. The interpolation method can be changed as needed.
            3. The las/laz files should have classifications for ground points.
            4. The output folder should exist.
            5. The input folder should contain the .laz file.
            6. The output folder should be different from the input folder.
            7. The output folder should have write permissions.
            8. The CRS for the GeoTIFF file can be changed as needed.
        
        Parameters
        ----------
        file_name : str
            Name of the input .laz file to process.
            
        Returns
        -------
        None
            Saves the DTM as an XYZ and GeoTIFF file.
            
        Raises
        ------
        Exception
            If an error occurs during processing
    """
    input_file_path = os.path.join(input_folder, file_name)
    
    if os.path.isfile(input_file_path) and file_name.endswith((".laz", ".las")):
        try:
            # Set DTM parameters
            grid_resolution = 1.0  # Adjust as needed
            interpolation_method = "TIN"  # Change to "IDW", "Kriging", etc.

            # Generate DTM using the provided function
            Z = generate_dtm(input_file_path, grid_resolution, method=interpolation_method)

            # Read ground points to construct X, Y grid
            with laspy.open(input_file_path) as las_file:
                las = las_file.read()
            mask = las.classification == 2
            x, y = las.x[mask], las.y[mask]

            # Create X, Y grid
            min_x, max_x = np.min(x), np.max(x)
            min_y, max_y = np.min(y), np.max(y)
            X, Y = np.meshgrid(
                np.arange(min_x, max_x + grid_resolution, grid_resolution),
                np.arange(min_y, max_y + grid_resolution, grid_resolution)[::-1]
            )

            # Save as .xyz file
            #dtm_points = np.column_stack((X.ravel(), Y.ravel(), Z.ravel()))
            #output_xyz_file = os.path.splitext(file_name)[0] + "_dtm.xyz"
            #output_xyz_path = os.path.join(output_folder, output_xyz_file)
            #np.savetxt(output_xyz_path, dtm_points, fmt="%.6f", delimiter=" ")

            # Save as GeoTIFF
            output_tif_file = os.path.splitext(file_name)[0] + "_dtm.tif"
            output_tif_path = os.path.join(output_folder, output_tif_file)
            transform = from_origin(min_x, max_y, grid_resolution, grid_resolution)
            save_dtm_as_geotiff(Z, (min_x, max_x), (min_y, max_y), grid_resolution, output_tif_path, crs="EPSG:28992")

            print(f"Processed {file_name} -> {output_xyz_file}, {output_tif_file}")

        except Exception as e:
            print(f"Error processing {file_name}: {e}")


In [None]:
from multiprocessing import Pool, cpu_count


if __name__ == "__main__":
    file_list = [f for f in os.listdir(input_folder) if f.endswith((".laz", ".las"))]
    num_workers = min( 0.5*cpu_count(), len(file_list))  # Use a proportion of the available CPU cores

    with Pool(processes=num_workers) as pool:
        pool.map(process_laz_file, file_list)

    print("All .laz files processed in parallel.")

