In [12]:
from pyforestscan.calculate import calculate_chm, calculate_fhd, assign_voxels
from pyforestscan.handlers import read_lidar, create_geotiff
import numpy as np
import glob, os

---
### 1. Pipeline PDAL

In [None]:
import pdal, json

pipe = pdal.Pipeline(
    json.dumps(
        {
            "pipeline": [
                {
                    "type": "readers.las",
                    "filename": "/home/mgallet/Téléchargements/uncompress_LHD_FXX_0945_6473_PTS_O_LAMB93_IGN69.las",
                    "spatialreference": "EPSG:2154",
                    "start": 0,
                    "count": 50000,
                },
                {
                    "type": "filters.voxeldownsize",
                    "cell": 0.1,
                },
                {
                    "type": "filters.outlier",
                    "method": "statistical",
                    "mean_k": 20,
                    "multiplier": 2,
                },
                {"type": "filters.elm", "cell": 2, "class": 2, "threshold": 0.5},
                # {
                #   "type": "filters.assign",
                #   "assignment": "Classification[2:2]=0"
                # },
                {
                    "type": "filters.csf",
                    "resolution": 0.375,
                    "rigidness": 3,
                    "iterations": 500,
                    "threshold": 0.15,
                    "step": 0.75,
                    "ignore": "Classification[7:7]",
                },
                # {
                #     "type": "filters.smrf",
                #     "slope": 0.25,
                #     "window": 20.0,
                #     "threshold": 0.2,
                #     "scalar": 1.25,
                #     "cell": 5.0,
                # },
                # {
                #     "type": "filters.csf",
                #     "resolution": 0.5,
                #     "rigidness": 3,
                #     "iterations": 250,
                #     "threshold": 0.2,
                # },
                # {"type": "filters.range", "limits": "Classification![2:2]"},
                # {
                #     "type": "filters.csf",
                #     "resolution": 0.25,
                #     "rigidness": 3,
                #     "iterations": 500,
                #     "threshold": 0.15,
                # },
                {"type": "filters.hag_nn", "count": 100},
                {
                    "type": "filters.range",
                    "limits": "Classification![7:7],HeightAboveGround[0:]",
                },
            ]
        }
    )
)
pipe.execute()
arrays = pipe.arrays[0]
print(f"Shape of arrays: {arrays.shape}")
print(
    f"Unique classification values: {np.unique(arrays['Classification'], return_counts=True)}"
)
print(f"Max height above ground: {arrays['HeightAboveGround'].max()}")
print(f"Min height above ground: {arrays['HeightAboveGround'].min()}")
print(f"Height above ground values: {arrays['HeightAboveGround']}")

In [None]:
voxel_resolution = (2, 2, 0.1)

voxels, extent = assign_voxels(arrays, voxel_resolution)
chm = calculate_chm(arrays, voxel_resolution, interpolation=None)
create_geotiff(chm[0], "/home/mgallet/Téléchargements/chm.tif", "EPSG:2154", extent)

---
### 2. Analyse CHM /FHD

In [None]:
import numpy as np
import glob, copy

path = (
    "/home/mgallet/Documents/DATA/LIDAR_TEST/processed/lidar_data/Belledonne/**/*.npy"
)
path2 = "/home/mgallet/Téléchargements/**/*.npy"
data = np.load(glob.glob(path2, recursive=True)[0])

In [None]:
plot_voxels = False

def test(newdata):
    voxel_resolution = (20, 20, 0.1)
    voxels, extent = assign_voxels(newdata, voxel_resolution)
    chm = calculate_chm(newdata, voxel_resolution, interpolation=None)
    create_geotiff(chm[0], "chm.tif", "EPSG:2154", extent)
if plot_voxels:
    test(data)

In [94]:
import numpy as np
import rasterio
from rasterio.transform import from_origin

from scipy.stats import entropy

def compute_fhd(points, resolution=1.0, quantile=100, output_tif=None):
    # Extraire les colonnes
    x = points["X"]
    y = points["Y"]
    z = points["HeightAboveGround"]

    # Définir l’étendue
    xmin, xmax = np.min(x), np.max(x)
    ymin, ymax = np.min(y), np.max(y)

    width = int(np.ceil((xmax - xmin) / resolution))
    height = int(np.ceil((ymax - ymin) / resolution))

    # Créer la grille CHM avec les dimensions correctes
    chm = np.full((height, width), np.nan, dtype=np.float32)

    # Convertir coordonnées XY en indices de grille
    col = np.clip(((x - xmin) / resolution).astype(int), 0, width - 1)
    row = np.clip(((ymax - y) / resolution).astype(int), 0, height - 1)

    from collections import defaultdict

    grid_dict = defaultdict(list)
    for r, c, val in zip(row, col, z):
        grid_dict[(r, c)].append(val)

    # Fonction pour calculer le quantile sur une cellule
    def process_cell(cell):
        r, c = cell
        values = grid_dict[(r, c)]
        bins = np.arange(0, 5, 0.1)
        hist, _ = np.histogram(values, bins=bins)
        hist = hist / hist.sum()
        percent_max = entropy(hist)
        # unique_values = np.unique(values, return_counts=True)
        # try:
        #     percent_max = unique_values[1][unique_values[0] == 2][0]/unique_values[1].sum()
        # except IndexError:
        #     percent_max = np.nan
        return (r, c, percent_max)
    
    cells = list(grid_dict.keys())
    results = [process_cell(cell) for cell in cells]
    for r, c, val in results:
        chm[r, c] = val

    if output_tif:
        transform = from_origin(xmin, ymax, resolution, resolution)
        with rasterio.open(
            output_tif,
            "w",
            driver="GTiff",
            height=chm.shape[0]-1,
            width=chm.shape[1]-1,
            count=1,
            dtype=chm.dtype,
            crs="EPSG:2154",
            transform=transform,
            nodata=np.nan,
        ) as dst:
            dst.write(chm[:-1, :-1], 1)

    return chm


# Exemple d'utilisation
# chm = compute_chm(mon_array, resolution=1.0, quantile=95, output_tif="chm_95.tif")

In [95]:
_ = compute_fhd(data, resolution=10, quantile=25, output_tif='fhd_10.tif')

In [62]:
a = np.unique(data['Classification'], return_counts=True)

In [92]:
a,_ = np.histogram(np.random.normal(0, 1, 1000))
a, a.sum()

(array([ 13,  37, 103, 169, 230, 222, 133,  68,  18,   7]), np.int64(1000))

In [89]:
a,_ = np.histogram(np.random.normal(0, 1, 1000), bins=10)
a= a/a.sum()
a

array([0.004, 0.028, 0.071, 0.144, 0.274, 0.242, 0.157, 0.055, 0.022,
       0.003])

In [66]:
a,a[1][a[0] == 2][0]

((array([ 1,  2,  3,  4,  5,  6,  7, 65], dtype=uint8),
  array([ 69291369, 110669800,    156552,   1375575,  11737070,       920,
           1853898,       158])),
 np.int64(110669800))

In [None]:
import numpy as np
import rasterio
from rasterio.transform import from_origin
from joblib import Parallel, delayed
from tqdm import tqdm


def compute_fhd(points, resolution=1.0, alti_resolution=1.0, output_tif=None):
    x = points["X"]
    y = points["Y"]
    z = points["HeightAboveGround"]

    # Étendue spatiale
    xmin, xmax = np.min(x), np.max(x)
    ymin, ymax = np.min(y), np.max(y)
    width = int(np.ceil((xmax - xmin) / resolution))
    height = int(np.ceil((ymax - ymin) / resolution))

    # Indices grille
    col = ((x - xmin) / resolution).astype(int)
    row = ((ymax - y) / resolution).astype(int)

    # Groupement par cellule
    from collections import defaultdict

    grid_dict = defaultdict(list)
    for r, c, h in zip(row, col, z):
        grid_dict[(r, c)].append(h)

    # Fonction pour calculer la FHD sur une cellule
    def compute_cell_fhd(cell):
        r, c = cell
        heights = grid_dict[(r, c)]
        if len(heights) == 0:
            return (r, c, np.nan)
        # Binning
        bins = np.arange(0, max(heights) + alti_resolution, alti_resolution)
        hist, _ = np.histogram(heights, bins=bins)
        probs = hist / np.sum(hist)
        probs = probs[probs > 0]
        fhd = -np.sum(probs * np.log(probs))
        return (r, c, fhd)

    # Parallélisation
    cells = list(grid_dict.keys())
    results = Parallel(n_jobs=-1)(
        delayed(compute_cell_fhd)(cell) for cell in tqdm(cells)
    )

    # Création raster FHD
    fhd_raster = np.full((height, width), np.nan, dtype=np.float32)
    for r, c, val in results:
        fhd_raster[r, c] = val

    # Export GeoTIFF
    if output_tif:
        transform = from_origin(xmin, ymax, resolution, resolution)
        with rasterio.open(
            output_tif,
            "w",
            driver="GTiff",
            height=fhd_raster.shape[0],
            width=fhd_raster.shape[1],
            count=1,
            dtype=fhd_raster.dtype,
            crs="EPSG:2154",  # ajustez selon votre système de coordonnées
            transform=transform,
            nodata=np.nan,
        ) as dst:
            dst.write(fhd_raster, 1)

    return fhd_raster


# Exemple d'utilisation
# fhd = compute_fhd(mon_array, resolution=1.0, classification_filter=[5], bin_size=1.0, output_tif="fhd.tif")

In [78]:
from scipy.stats import entropy

entropy([0.5, 0.2,0.1,0,0.05,0.05,0.1,0])

np.float64(1.428551418721001)

In [None]:
import numpy as np
from joblib import Parallel, delayed
from collections import defaultdict
from tqdm import tqdm


def compute_chm_fhd_canal(
    points,
    resolution=1.0,
    classification_filter=[5],
    seuils_chm=[0.2, 2.0],
    bin_size=1.0,
):
    x = points["X"]
    y = points["Y"]
    z = points["HeightAboveGround"]
    classification = points["Classification"]

    # Filtrage
    mask = np.isin(classification, classification_filter)
    x, y, z = x[mask], y[mask], z[mask]

    # Dimensions grille
    xmin, xmax = np.min(x), np.max(x)
    ymin, ymax = np.min(y), np.max(y)
    width = int(np.ceil((xmax - xmin) / resolution))
    height = int(np.ceil((ymax - ymin) / resolution))

    # Index grille
    col = ((x - xmin) / resolution).astype(int)
    row = ((ymax - y) / resolution).astype(int)

    # Groupement par cellule
    grid_dict = defaultdict(list)
    for r, c, h in zip(row, col, z):
        grid_dict[(r, c)].append(h)

    # Calcul CHM et FHD pour chaque cellule
    def compute_cell_values(cell):
        r, c = cell
        heights = grid_dict[(r, c)]
        if not heights:
            return (r, c, np.nan, np.nan)

        heights = np.array(heights)
        chm = np.nanquantile(heights, 1.0)  # quantile 100%
        bins = np.arange(0, heights.max() + bin_size, bin_size)
        hist, _ = np.histogram(heights, bins=bins)
        probs = hist / np.sum(hist)
        probs = probs[probs > 0]
        fhd = -np.sum(probs * np.log(probs)) if len(probs) > 0 else 0.0

        return (r, c, chm, fhd)

    cells = list(grid_dict.keys())
    results = Parallel(n_jobs=-1)(
        delayed(compute_cell_values)(cell) for cell in tqdm(cells)
    )

    # Matrices
    chm = np.full((height, width), np.nan, dtype=np.float32)
    fhd = np.full_like(chm, np.nan)
    canal = np.full_like(chm, 127, dtype=np.uint8)  # par défaut: nan

    # Remplissage
    seuil_bas, seuil_haut = seuils_chm
    for r, c, chm_val, fhd_val in results:
        chm[r, c] = chm_val
        fhd[r, c] = fhd_val

        if np.isnan(chm_val) or np.isnan(fhd_val):
            canal[r, c] = 127  # NaN
        elif chm_val == 0 and fhd_val == 0:
            canal[r, c] = 0
        elif 0 < chm_val <= seuil_bas:
            canal[r, c] = 1
        elif chm_val > seuil_haut:
            canal[r, c] = 2
        else:
            canal[r, c] = 0  # autres cas

    return canal

In [None]:
voxels, extent = assign_voxels(data, voxel_resolution)
fhd = calculate_fhd(data, voxel_resolution)
create_geotiff(fhd, "fhd.tif", "EPSG:2154", extent)
del voxels

chm = calculate_chm(data, voxel_resolution, interpolation=None)
create_geotiff(chm, "chm.tif", "EPSG:2154", extent)

In [None]:
import numpy as np
import rasterio
from rasterio.transform import from_origin
from joblib import Parallel, delayed
from collections import defaultdict
from tqdm import tqdm


def compute_chm_fhd_and_canal(
    points,
    resolution=1.0,
    classification_filter=[5],
    quantile=100,
    bin_size=1.0,
    seuils_chm=[0.2, 2.0],
):
    x = points["X"]
    y = points["Y"]
    z = points["HeightAboveGround"]
    classification = points["Classification"]

    # Filtrage
    mask = np.isin(classification, classification_filter)
    x, y, z = x[mask], y[mask], z[mask]

    xmin, xmax = np.min(x), np.max(x)
    ymin, ymax = np.min(y), np.max(y)
    width = int(np.ceil((xmax - xmin) / resolution))
    height = int(np.ceil((ymax - ymin) / resolution))

    col = ((x - xmin) / resolution).astype(int)
    row = ((ymax - y) / resolution).astype(int)

    grid_dict = defaultdict(list)
    for r, c, h in zip(row, col, z):
        grid_dict[(r, c)].append(h)

    def process_cell(cell):
        r, c = cell
        heights = np.array(grid_dict[(r, c)])
        if len(heights) == 0:
            return (r, c, np.nan, np.nan)

        chm = np.nanquantile(heights, quantile / 100.0)
        bins = np.arange(0, heights.max() + bin_size, bin_size)
        hist, _ = np.histogram(heights, bins=bins)
        probs = hist / np.sum(hist)
        probs = probs[probs > 0]
        fhd = -np.sum(probs * np.log(probs)) if len(probs) > 0 else 0.0

        # Canal logique
        if np.isnan(chm) or np.isnan(fhd):
            canal = 127
        elif chm == 0 and fhd == 0:
            canal = 0
        elif 0 < chm <= seuils_chm[0]:
            canal = 1
        elif chm > seuils_chm[1]:
            canal = 2
        else:
            canal = 0

        return r, c, chm, fhd, canal

    cells = list(grid_dict.keys())
    results = Parallel(n_jobs=-1)(delayed(process_cell)(cell) for cell in tqdm(cells))

    # Matrices de sortie
    chm_img = np.full((height, width), np.nan, dtype=np.float32)
    fhd_img = np.full((height, width), np.nan, dtype=np.float32)
    canal_img = np.full((height, width), 127, dtype=np.uint8)

    for r, c, chm_val, fhd_val, canal_val in results:
        chm_img[r, c] = chm_val
        fhd_img[r, c] = fhd_val
        canal_img[r, c] = canal_val

    transform = from_origin(xmin, ymax, resolution, resolution)
    return chm_img, fhd_img, canal_img, transform, height, width


def export_3band_geotiff(filename, chm, fhd, canal, transform, crs="EPSG:2154"):
    with rasterio.open(
        filename,
        "w",
        driver="GTiff",
        height=chm.shape[0],
        width=chm.shape[1],
        count=3,
        dtype=rasterio.float32,
        crs=crs,
        transform=transform,
        nodata=np.nan,
    ) as dst:
        dst.write(chm.astype(np.float32), 1)
        dst.write(fhd.astype(np.float32), 2)
        dst.write(
            canal.astype(np.float32), 3
        )  # exporté en float32 pour cohérence multi-bandes


def compute_and_export_3band_image(
    points,
    output_tif,
    resolution=1.0,
    classification_filter=[5],
    quantile=100,
    bin_size=1.0,
    seuils_chm=[0.2, 2.0],
):
    chm, fhd, canal, transform, height, width = compute_chm_fhd_and_canal(
        points=points,
        resolution=resolution,
        classification_filter=classification_filter,
        quantile=quantile,
        bin_size=bin_size,
        seuils_chm=seuils_chm,
    )
    export_3band_geotiff(output_tif, chm, fhd, canal, transform)
    return chm, fhd, canal


chm, fhd, canal = compute_and_export_3band_image(
    points=mon_array,
    output_tif="output_3band_image.tif",
    resolution=1.0,
    classification_filter=[5],
    quantile=100,
    bin_size=1.0,
    seuils_chm=[0.2, 2.0],
)