# Laserfarm: LiDAR point cloud analysis for macro-ecology

## Configuration

### User parameters

Defines the parameters that can be set by users when executing the workflow.

In [None]:
# (DO NOT containerize this cell)

# Data handling parameters
# MinIO endpoint (host:port) and the names of the buckets used by the workflow.
param_minio_endpoint = 'scruffy.lab.uvalight.net:9000'
param_minio_public_bucket = 'naa-vre-public'
param_minio_virtual_lab_bucket = 'naa-vre-laserfarm'

# Laserfarm parameters
# Most numeric values are given as strings and converted with float()/int() where they are used.
# Feature to extract; also used as the object-name prefix for the input list and the processed-files log.
param_feature_name = 'perc_95_normalized_height'
# Passed as 'validate_precision' to the 'generate_targets' step.
param_validate_precision = '0.001'
# Used both as 'tile_mesh_size' (target generation) and as 'volume_size' (feature extraction).
param_tile_mesh_size = '10.'
# Settings for the 'apply_filter' step: filter type, attribute to filter on, and the value to keep (see param_apply_filter_value).
param_filter_type = 'select_equal'
param_attribute = 'raw_classification'
# Extent of the retiling/feature grid.
param_min_x = '-113107.81'  # EPSG:28992
param_max_x = '398892.19'  # EPSG:28992
param_min_y = '214783.87'  # EPSG:28992
param_max_y = '726783.87'  # EPSG:28992
# Passed as 'n_tiles_side' to the grid definition.
param_n_tiles_side = '512'
param_apply_filter_value = '1'
# Object names (under the param_feature_name prefix) of the processed-URL log and of the list of URLs to process.
param_processed_files_record_file = 'processed_files_log.txt'
param_files_to_process_file = 'AHN6_url_test.txt'
# Upper bound on the number of unprocessed files selected in one run.
param_max_tiles = 2
# The selected files are split into at most this many batches.
param_max_nodes = 2

In [None]:
# Secrets (DO NOT containerize this cell)
from SecretsProvider import SecretsProvider
from getpass import getpass

# Secrets are requested through getpass (passed as input_func).
# NOTE(review): presumably so the values are typed without being echoed — confirm against SecretsProvider.
secrets_provider = SecretsProvider(input_func=getpass)
# MinIO credentials used to build the MinIO client in the workflow cells.
secret_minio_access_key = secrets_provider.get_secret('secret_minio_access_key')
secret_minio_secret_key = secrets_provider.get_secret('secret_minio_secret_key')

### Dependencies

The following cells install extra dependencies that are not included in the Laserfarm flavor by default, and import the libraries used in the notebook.

In [None]:
# (DO NOT containerize this cell)

import json
import os
import laspy
import io
import random
import math

from laserfarm import DataProcessing, GeotiffWriter, Retiler
from laserfarm.remote_utils import get_wdclient, list_remote
from minio import Minio
from minio.error import S3Error
from typing import List
from urllib import request

### Global configuration

The following variables are used throughout the code. They are intended to be edited by developers who work on the notebook.

In [None]:
# (DO NOT containerize this cell)

# Root of the local scratch space; the sub-folders below hold one pipeline stage each.
conf_local_tmp = '/tmp/data'
# Downloaded input files.
conf_local_path_raw = os.path.join(conf_local_tmp, 'raw')
# Output folder of the retiling step and input folder of the feature extraction.
conf_local_path_retiled = os.path.join(conf_local_tmp, 'retiled')
# Output folder of the feature extraction; files found here are uploaded to MinIO.
conf_local_path_targets = os.path.join(conf_local_tmp, 'targets')

## Workflow steps

In [None]:
# datafiles processing manager
def get_filename_batches_to_process() -> List[List[str]]:
    """Pick a random subset of the unprocessed input files and batch them.

    Reads the list of files to process from MinIO, removes the entries that
    are already recorded as processed, shuffles the remainder, keeps at most
    ``param_max_tiles`` of them and splits that selection into batches.

    Returns:
        The selected entries grouped into batches.

    Raises:
        ValueError: If no unprocessed entry is left.
    """
    listed = get_minio_file_as_set(f"{param_feature_name}/{param_files_to_process_file}")
    pending = keep_unprocessed_filenames(listed)
    if not pending:
        raise ValueError("The list of unprocessed_filenames is empty. Cannot proceed with processing.")

    # Shuffle in place so that the slice below takes a random sample.
    random.shuffle(pending)
    chosen = pending[:param_max_tiles]
    batches = split_into_batches(chosen)

    print(f"Files to process: {len(listed)}, of which {len(pending)} are unprocessed. Preparing {len(chosen)} files in {len(batches)} batches.")
    return batches

def get_minio_file_as_set(filename: str) -> set[str]:
    """Read a text object from the virtual-lab bucket as a set of lines.

    Args:
        filename: Object name inside ``param_minio_virtual_lab_bucket``.

    Returns:
        The distinct non-blank lines of the object, stripped of surrounding
        whitespace. An empty set if the object does not exist.

    Raises:
        S3Error: For any MinIO error other than ``NoSuchKey``.
    """
    response = None
    try:
        response = minio_client.get_object(param_minio_virtual_lab_bucket, filename)
        stripped_lines = (raw.strip() for raw in response.data.decode("utf-8").splitlines())
        return {entry for entry in stripped_lines if entry}
    except S3Error as err:
        # A missing object just means that nothing has been written yet.
        if err.code != "NoSuchKey":
            raise err
        return set()
    finally:
        # Always hand the connection back, whether or not the read succeeded.
        if response:
            response.close()
            response.release_conn()

# MinIO client used by the helper functions in this cell.
# It is created at cell level, so it exists before the functions are first called at the bottom of the cell.
# secure=True requests a TLS (HTTPS) connection to the endpoint.
minio_client = Minio(
    param_minio_endpoint, 
    access_key=secret_minio_access_key,
    secret_key=secret_minio_secret_key,
    secure=True
)
    
def keep_unprocessed_filenames(filenames : List[str]) -> List[str]:
    """Drop the entries that are already recorded in the processed-files log.

    Args:
        filenames: Candidate entries; duplicates are collapsed.

    Returns:
        The candidates that do not appear in the log, in no particular order.
    """
    already_done : set[str] = get_minio_file_as_set(f"{param_feature_name}/{param_processed_files_record_file}")
    return list(set(filenames).difference(already_done))

def split_into_batches(filenames : List[str], max_nodes : "int | None" = None) -> List[List[str]]:
    """Split ``filenames`` into at most ``max_nodes`` consecutive batches.

    All batches have ``ceil(len(filenames) / max_nodes)`` entries, except the
    last one, which may be shorter.

    Args:
        filenames: Entries to distribute; their order is kept.
        max_nodes: Maximum number of batches. Defaults to ``param_max_nodes``.

    Returns:
        The list of batches; an empty list when ``filenames`` is empty.

    Raises:
        ValueError: If the number of batches is smaller than 1.
    """
    if max_nodes is None:
        max_nodes = param_max_nodes
    if max_nodes < 1:
        raise ValueError(f"The number of batches must be at least 1, got {max_nodes}.")
    if not filenames:
        # Without this guard the batch size would be 0 and range() would raise.
        return []
    batch_size = math.ceil(len(filenames) / max_nodes)
    return [filenames[i:i + batch_size] for i in range(0, len(filenames), batch_size)]

def prepare_directories():
    """Create the local raw and retiled working folders if they are missing."""
    for folder in (conf_local_path_raw, conf_local_path_retiled):
        os.makedirs(folder, exist_ok=True)

# Select the files for this run and group them into batches.
# get_filename_batches_to_process() raises ValueError when nothing is left to process.
las_data_filename_batches: List[List[str]] = get_filename_batches_to_process()
# Make sure the local raw and retiled folders exist.
prepare_directories()

In [None]:
# Laserfarm runner
def process(batch : List[str]):
    """Run the full pipeline on each URL of ``batch``, one after the other.

    Args:
        batch: URLs of the input files to process.
    """
    for file_url in batch:
        process_data(file_url)

def process_data(url : str):
    """Download one input file, retile it, extract features and upload them.

    The URL is added to the processed-files log last, so it is only recorded
    when every earlier step finished without raising.

    Args:
        url: Location of the input file to download.
    """
    raw_file = fetch_file(url, output_folder=conf_local_path_raw)
    tiles = retile_laz_file(
        raw_laz_filepath=raw_file,
        input_folder=conf_local_path_raw,
        output_folder=conf_local_path_retiled,
    )
    target_files = extract_features(
        tiles,
        input_folder=conf_local_path_retiled,
        output_folder=conf_local_path_targets,
    )
    for target_file in target_files:
        copy_to_minio(target_file, input_folder=conf_local_path_targets)
    add_to_processed_log(url)

def fetch_file(url : str, output_folder : str) -> str:
    """Download ``url`` into ``output_folder``.

    The local file is named after the part of the URL that follows its last
    ``/``.

    Args:
        url: Location of the file to download.
        output_folder: Existing folder that receives the download.

    Returns:
        The path of the downloaded file.
    """
    local_name = url.rsplit('/', 1)[-1]
    destination = f"{output_folder}/{local_name}"
    request.urlretrieve(url, filename=destination)
    return destination

def retile_laz_file(raw_laz_filepath : str, input_folder : str, output_folder : str) -> List[str]:
    """Retile one point-cloud file onto the configured grid.

    Retiling is skipped when ``<output_folder>/<base name>_retile_record.js``
    already exists. In both cases that record is then read back.
    NOTE(review): the record is presumably written by ``Retiler.run()`` —
    confirm against the Laserfarm documentation.

    Args:
        raw_laz_filepath: Path of the file to retile.
        input_folder: Input folder passed to the retiler.
        output_folder: Output folder passed to the retiler; also the location
            of the retile record.

    Returns:
        The entries listed under ``redistributed_to`` in the retile record.
    """
    retiling_input = {
        'setup_local_fs': {
            'input_folder': input_folder,
            'output_folder': output_folder,
        },
        'set_grid': {
            'min_x': float(param_min_x),
            'max_x': float(param_max_x),
            'min_y': float(param_min_y),
            'max_y': float(param_max_y),
            'n_tiles_side': int(param_n_tiles_side),
        },
        'split_and_redistribute': {},
        'validate': {},
    }

    # The record file is named after the input file without its extension.
    stem, _extension = os.path.splitext(os.path.basename(raw_laz_filepath))
    retile_record_filename = os.path.join(output_folder, f'{stem}_retile_record.js')

    if os.path.isfile(retile_record_filename):
        print(
            f'Skipping retiling of {raw_laz_filepath} because {retile_record_filename} already exists'
        )
    else:
        print(f'Retiling {raw_laz_filepath}')
        Retiler(raw_laz_filepath, label=raw_laz_filepath).config(retiling_input).run()

    # Load the tile names from the retile record.
    with open(retile_record_filename, 'r') as record_file:
        record = json.load(record_file)

    return list(record['redistributed_to'])

def extract_features(retiled_laz_filepaths : List[str], input_folder : str, output_folder : str):
    """Run the feature-extraction pipeline on every tile.

    For each tile the expected output is
    ``<output_folder>/<param_feature_name>/<tile>.ply``. A tile whose output
    file already exists is not processed again.

    Args:
        retiled_laz_filepaths: Tile names; the parts after the first ``_`` of
            each name are passed to the pipeline as the tile index.
        input_folder: Input folder passed to the pipeline.
        output_folder: Output folder passed to the pipeline.

    Returns:
        The expected output path of every tile, in input order.
    """
    produced = []
    for tile in retiled_laz_filepaths:
        grid_feature = {
            'min_x': float(param_min_x),
            'max_x': float(param_max_x),
            'min_y': float(param_min_y),
            'max_y': float(param_max_y),
            'n_tiles_side': int(param_n_tiles_side),
        }
        mesh_size = float(param_tile_mesh_size)

        pipeline_config = {
            'setup_local_fs': {
                'input_folder': input_folder,
                'output_folder': output_folder,
            },
            'load': {'attributes': [param_attribute]},
            'normalize': 1,
            'apply_filter': {
                'filter_type': param_filter_type,
                'attribute': param_attribute,
                # ground surface (2), water (9), buildings (6), artificial objects (26), vegetation (?), and unclassified (1)
                'value': [int(param_apply_filter_value)],
            },
            'generate_targets': {
                'tile_mesh_size': mesh_size,
                'validate': True,
                'validate_precision': float(param_validate_precision),
                **grid_feature,
            },
            'extract_features': {
                'feature_names': [param_feature_name],
                'volume_type': 'cell',
                'volume_size': mesh_size,
            },
            'export_targets': {
                'attributes': [param_feature_name],
                'multi_band_files': False,
            },
        }

        # Everything after the first '_'-separated part of the tile name.
        tile_index = tile.split('_')[1:]
        target_file = os.path.join(output_folder, param_feature_name, tile + '.ply')
        print(target_file)

        if os.path.isfile(target_file):
            print(
                f'Skipping features extraction for {tile} because {target_file} already exists'
            )
        else:
            DataProcessing(tile, tile_index=tile_index, label=tile).config(pipeline_config).run()

        produced.append(target_file)

    return produced
 
def get_processed_file_log() -> set[str]:
    """Read the processed-files log from the virtual-lab bucket.

    Returns:
        The distinct non-blank lines of the log, stripped of surrounding
        whitespace. An empty set if the log object does not exist.

    Raises:
        S3Error: For any MinIO error other than ``NoSuchKey``.
    """
    log_object = f"{param_feature_name}/{param_processed_files_record_file}"
    response = None
    try:
        response = minio_client.get_object(bucket_name=param_minio_virtual_lab_bucket, object_name=log_object)
        stripped_lines = (raw.strip() for raw in response.data.decode("utf-8").splitlines())
        return {entry for entry in stripped_lines if entry}
    except S3Error as err:
        # No log object yet: nothing has been processed so far.
        if err.code != "NoSuchKey":
            raise err
        return set()
    finally:
        # Always hand the connection back, whether or not the read succeeded.
        if response:
            response.close()
            response.release_conn()

def add_to_processed_log(url : str):
    """Record ``url`` in the processed-files log stored in MinIO.

    The current log is downloaded, ``url`` is added, and the whole log is
    uploaded again with one entry per line.

    NOTE(review): this is a read-modify-write of the entire object. If several
    workers ever run it at the same time, entries could be lost — confirm how
    batches are executed.

    Args:
        url: Entry to add to the log.
    """
    entries : set = get_processed_file_log()
    entries.add(url)
    payload = "\n".join(entries).encode("utf-8")
    minio_client.put_object(
        bucket_name=param_minio_virtual_lab_bucket,
        object_name=f"{param_feature_name}/{param_processed_files_record_file}",
        data=io.BytesIO(payload),
        length=len(payload),
    )

def copy_to_minio(filepath : str, input_folder : str):
    """Upload a local file to the virtual-lab bucket.

    The object name is ``filepath`` with every occurrence of ``input_folder``
    removed. The path separator that followed ``input_folder`` stays in the
    object name.

    Args:
        filepath: Path of the local file to upload.
        input_folder: Folder prefix to remove from ``filepath``.
    """
    object_name = filepath.replace(input_folder, '')
    minio_client.fput_object(
        bucket_name=param_minio_virtual_lab_bucket,
        file_path=filepath,
        object_name=object_name,
    )

# MinIO client used by the helper functions in this cell; built with the same settings as in the previous cell.
# NOTE(review): presumably re-created here so that this cell can run on its own — confirm.
# secure=True requests a TLS (HTTPS) connection to the endpoint.
minio_client = Minio(
    param_minio_endpoint, 
    access_key=secret_minio_access_key,
    secret_key=secret_minio_secret_key,
    secure=True
)
    
# Process the batches one after another; an exception in any file stops the loop.
for batch in las_data_filename_batches:
    process(batch)