In [1]:
import copy
import fnmatch
import json
import getpass
import os
import pathlib
import datetime
                    
from dask.distributed import Client, SSHCluster
from laserchicken.io.load import load
from laserfarm import Retiler, DataProcessing, GeotiffWriter, MacroPipeline
from laserfarm.remote_utils import get_wdclient, get_info_remote, list_remote

# Macro-Pipeline AHN3 Workflow - Feature Extraction (All Points)

## Set Run-Specific Input

Choose whether to process all input files (`run = 'all'`) or only the input files listed in the JSON file `filename` (`run = 'from_file'`).

In [77]:
path_root = pathlib.Path('/project/lidarac/Data')

# path to the retiled input tiles
path_input = path_root / 'AHN3_retiled'

# path to the output targets
path_output = path_input.parent / 'AHN3_targets_pulse'

# 'all':       process every tile found in `path_input`
# 'from_file': process only the tile names listed in the JSON file `filename`
run = 'from_file'
assert run in ['all', 'from_file'], f"run must be 'all' or 'from_file', got {run!r}"

# JSON list of tile names; only read when run == 'from_file'
filename = 'ESSD_feature_extraction_all_failed.json'

In [None]:
# list all retiled LAZ files in the input directory (sorted, so the task order is reproducible)
tiles = sorted(el for el in path_input.iterdir() if el.match('tile_*_*.laz'))
print('Found: {} tiles'.format(len(tiles)))
if run == 'from_file':
    with open(filename, 'r') as f:
        tiles_read = [path_input / name for name in json.load(f)]
    # check whether all requested tiles are available in the input directory;
    # a set keeps the membership test O(1) per tile instead of scanning ~10^4 paths
    available = set(tiles)
    missing = [t.name for t in tiles_read if t not in available]
    assert not missing, (f'Some of the tiles in {filename} are not in input dir '
                         f'({len(missing)} missing, e.g. {missing[:5]})')
    tiles = tiles_read
print('Retrieve and extract features for: {} tiles'.format(len(tiles)))

In [100]:
# list the retiled tile directories (sorted, so the task order is reproducible);
# PurePath.match ignores a trailing '/', so directories are selected explicitly with is_dir()
tiles = sorted(el for el in path_input.iterdir() if el.is_dir() and el.match('tile_*_*'))
print('Found: {} tiles'.format(len(tiles)))
if run == 'from_file':
    with open(filename, 'r') as f:
        tiles_read = [path_input / name for name in json.load(f)]
    # check whether all requested tiles are available in the input directory;
    # a set keeps the membership test O(1) per tile instead of scanning ~10^4 paths
    available = set(tiles)
    missing = [t.name for t in tiles_read if t not in available]
    assert not missing, (f'Some of the tiles in {filename} are not in input dir '
                         f'({len(missing)} missing, e.g. {missing[:5]})')
    tiles = tiles_read
print('Retrieve and extract features for: {} tiles'.format(len(tiles)))

Found: 37457 tiles
Normalize: 1 tiles


## Set Up Cluster

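Set up the Dask cluster used for all the macro-pipeline calculations. Either connect to an already-running scheduler (first cell) or start a new cluster over SSH (second cell).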

In [4]:
# Connect to an already-running Dask scheduler (`Client` is imported in the first cell).
# NOTE(review): hard-coded address; it must match the scheduler that the macro-pipeline
# cell connects to, and must be updated whenever the cluster is restarted.
client = Client("tcp://10.0.3.10:42385")
client

0,1
Connection method: Direct,
Dashboard: /proxy/8787/status,

0,1
Comm: tcp://10.0.3.10:42385,Workers: 13
Dashboard: /proxy/8787/status,Total threads: 78
Started: 1 minute ago,Total memory: 624.00 GiB

0,1
Comm: tcp://10.0.2.119:39253,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.2.119:33739,
Local directory: /tmp/dask-scratch-space/worker-qofhbkat,Local directory: /tmp/dask-scratch-space/worker-qofhbkat
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 148.91 MiB,Spilled bytes: 0 B
Read bytes: 1.34 kiB,Write bytes: 4.04 kiB

0,1
Comm: tcp://10.0.0.41:37017,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.0.41:40961,
Local directory: /tmp/dask-scratch-space/worker-3x4hp41_,Local directory: /tmp/dask-scratch-space/worker-3x4hp41_
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 147.57 MiB,Spilled bytes: 0 B
Read bytes: 1.18 kiB,Write bytes: 1.31 kiB

0,1
Comm: tcp://10.0.0.41:40011,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.0.41:38529,
Local directory: /tmp/dask-scratch-space/worker-yooet2vu,Local directory: /tmp/dask-scratch-space/worker-yooet2vu
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 148.59 MiB,Spilled bytes: 0 B
Read bytes: 1.74 kiB,Write bytes: 6.20 kiB

0,1
Comm: tcp://10.0.3.231:37311,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.231:41129,
Local directory: /tmp/dask-scratch-space/worker-x2vfyj7g,Local directory: /tmp/dask-scratch-space/worker-x2vfyj7g
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 150.53 MiB,Spilled bytes: 0 B
Read bytes: 234.17 MiB,Write bytes: 186.35 kiB

0,1
Comm: tcp://10.0.1.35:33975,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.1.35:40389,
Local directory: /tmp/dask-scratch-space/worker-fnelsq2e,Local directory: /tmp/dask-scratch-space/worker-fnelsq2e
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 151.35 MiB,Spilled bytes: 0 B
Read bytes: 270.48690617969515 B,Write bytes: 132.2380430211843 B

0,1
Comm: tcp://10.0.3.231:45081,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.231:32847,
Local directory: /tmp/dask-scratch-space/worker-2x3n5lsx,Local directory: /tmp/dask-scratch-space/worker-2x3n5lsx
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 151.35 MiB,Spilled bytes: 0 B
Read bytes: 240.64 MiB,Write bytes: 190.64 kiB

0,1
Comm: tcp://10.0.3.10:40011,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.10:45221,
Local directory: /tmp/dask-scratch-space/worker-geg_8vpo,Local directory: /tmp/dask-scratch-space/worker-geg_8vpo
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 149.56 MiB,Spilled bytes: 0 B
Read bytes: 12.21 kiB,Write bytes: 8.01 kiB

0,1
Comm: tcp://10.0.3.231:38163,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.231:40485,
Local directory: /tmp/dask-scratch-space/worker-zg1depwz,Local directory: /tmp/dask-scratch-space/worker-zg1depwz
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 148.14 MiB,Spilled bytes: 0 B
Read bytes: 256.80 MiB,Write bytes: 199.66 kiB

0,1
Comm: tcp://10.0.0.213:41051,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.0.213:40497,
Local directory: /tmp/dask-scratch-space/worker-s56aguob,Local directory: /tmp/dask-scratch-space/worker-s56aguob
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 146.37 MiB,Spilled bytes: 0 B
Read bytes: 12.49 kiB,Write bytes: 7.96 MiB

0,1
Comm: tcp://10.0.3.11:41161,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.11:44633,
Local directory: /tmp/dask-scratch-space/worker-hh5rg2lh,Local directory: /tmp/dask-scratch-space/worker-hh5rg2lh
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 148.74 MiB,Spilled bytes: 0 B
Read bytes: 1.31 kiB,Write bytes: 1.31 kiB

0,1
Comm: tcp://10.0.2.119:45543,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.2.119:42599,
Local directory: /tmp/dask-scratch-space/worker-eks52jes,Local directory: /tmp/dask-scratch-space/worker-eks52jes
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 150.53 MiB,Spilled bytes: 0 B
Read bytes: 1.60 kiB,Write bytes: 5.49 kiB

0,1
Comm: tcp://10.0.0.45:40201,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.0.45:32869,
Local directory: /tmp/dask-scratch-space/worker-8evzy0ra,Local directory: /tmp/dask-scratch-space/worker-8evzy0ra
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 4.0%,Last seen: Just now
Memory usage: 146.52 MiB,Spilled bytes: 0 B
Read bytes: 710.8379457920629 B,Write bytes: 2.28 kiB

0,1
Comm: tcp://10.0.3.231:35197,Total threads: 6
Dashboard: /proxy/8787/status,Memory: 48.00 GiB
Nanny: tcp://10.0.3.231:45217,
Local directory: /tmp/dask-scratch-space/worker-og2x_g2d,Local directory: /tmp/dask-scratch-space/worker-og2x_g2d
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 148.54 MiB,Spilled bytes: 0 B
Read bytes: 258.09 MiB,Write bytes: 199.21 kiB


In [None]:
# local scratch directory on each worker node
local_tmp = pathlib.Path('/data/local/tmp')

# number of single-threaded worker processes started on each worker host
nprocs_per_node = 2

# start the cluster: SSHCluster runs the scheduler on the first host and workers on
# the remaining hosts, so node1 hosts both the scheduler and a set of workers
scheduler_node = 'node1'
hosts = [f'node{i}' for i in range(1, 11)]
cluster = SSHCluster(
    hosts=[scheduler_node] + hosts,
    connect_options={
        'known_hosts': None,  # disables SSH host-key verification: only acceptable on a trusted network
        'username': 'ubuntu',
        'client_keys': '/home/ubuntu/.ssh/id_rsa',
    },
    worker_options={
        'nthreads': 1,
        'n_workers': nprocs_per_node,  # `nprocs` is the deprecated name of this option
        'memory_limit': 0,  # 0 disables the per-worker memory limit
        # worker options are JSON-serialized into the remote command line,
        # so pass a plain string rather than a pathlib.Path
        'local_directory': (local_tmp / 'dask-worker-space').as_posix(),
    },
    scheduler_options={'dashboard_address': ':8787'},
)
cluster

## Feature Extraction

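We extract features for all available points. Two configurations follow. The first computes pulse penetration ratio and point density on all points. The second computes pulse density as the point density of first returns only. Both write to the same variable and JSON file, so the configuration cell executed last is the one that runs.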

In [10]:
# Retiling grid used for Spain: a square extent split into 1220 tiles per side
# (1000 coordinate units per tile side — presumably metres).
# NOTE: this rebinds `grid`; the AHN3 configuration cells below define their own `grid`,
# so this cell is kept for reference only.
grid = dict(
    min_x=-78665,
    max_x=1141335,
    min_y=3789191.74,
    max_y=5009191.74,
    n_tiles_side=1220,
)

In [4]:
# Retiling schema of the AHN3 tiles: bounding box and number of tiles per side.
grid = dict(
    min_x=-113107.81,
    max_x=398892.19,
    min_y=214783.87,
    max_y=726783.87,
    n_tiles_side=512,
)

# Target mesh size (same value is used as the feature cell size) and the features to compute.
tile_mesh_size = 10
features = ['pulse_penetration_ratio', 'point_density']

# Configuration of the DataProcessing pipeline; each key is one pipeline step,
# executed with the given keyword arguments.
feature_extraction_input_all = dict(
    setup_local_fs=dict(
        input_folder=path_input.as_posix(),
        output_folder=path_output.as_posix(),
    ),
    load=dict(attributes=['raw_classification']),
    generate_targets=dict(
        tile_mesh_size=tile_mesh_size,
        validate=True,
        validate_precision=0.001,  # solves numerical issues for 6 tiles which have points on the edge
        **grid,
    ),
    extract_features=dict(
        feature_names=features,
        volume_type='cell',
        volume_size=tile_mesh_size,
    ),
    export_targets=dict(
        attributes=features,
        multi_band_files=False,
        overwrite=True,
    ),
    clear_cache={},
)

# Persist the configuration next to the notebook for the record.
with open('feature_extraction_all.json', 'w') as f:
    json.dump(feature_extraction_input_all, f)

In [101]:
# To calculate pulse density: keep only first returns (return_number == 1) and compute
# their point density per target cell.

# details of the retiling schema (same grid as in the all-points configuration)
grid = {
    'min_x': -113107.81,
    'max_x': 398892.19,
    'min_y': 214783.87,
    'max_y': 726783.87,
    'n_tiles_side': 512
}

# target mesh size (also used as the feature cell size) & list of features
tile_mesh_size = 10
features = ['point_density']

# setup input dictionary to configure the feature extraction pipeline
feature_extraction_input_all = {
    'setup_local_fs': {'input_folder': path_input.as_posix(),
                       'output_folder': path_output.as_posix()},
    # NOTE(review): 'number_of_returns' is loaded but not used by the filter or the
    # features below; confirm whether it is still needed
    'load': {'attributes': ['return_number', 'number_of_returns']},
    'apply_filter': {
        # keep only the points whose return_number equals 1 (first returns)
        'filter_type': 'select_equal',
        'attribute': 'return_number',
        'value': 1
    },
    'generate_targets': {
        'tile_mesh_size' : tile_mesh_size,
        'validate' : True,
        'validate_precision': 0.001,  # solves numerical issues for 6 tiles which have points on the edge
        **grid
    },
    'extract_features': {
        'feature_names': features,
        'volume_type': 'cell',
        'volume_size': tile_mesh_size
    },
    'export_targets': {
        'attributes': features,
        'multi_band_files': False,
        'overwrite': True
    },
    'clear_cache' : {},
}

# write input dictionary to JSON file
# (same file name as the all-points configuration: the last executed cell wins)
with open('feature_extraction_all.json', 'w') as f:
    json.dump(feature_extraction_input_all, f)

In [102]:
macro = MacroPipeline()

# extract the tile indices from the tile names ('tile_<x>_<y>[.laz]' -> [x, y])
tile_indices = [[int(el) for el in tile.name.split('.')[0].split('_')[1:]] for tile in tiles]

# pipeline labels: tile names without extension (works for both LAZ files and tile directories)
labels = [os.path.splitext(tile.name)[0] for tile in tiles]

# add pipeline list to macro-pipeline object and set the corresponding labels
macro.tasks = [DataProcessing(t.name, tile_index=idx).config(feature_extraction_input_all)
               for t, idx in zip(tiles, tile_indices)]
macro.set_labels(labels)

# NOTE(review): hard-coded scheduler address; it must point to the currently running
# scheduler (the one the `client` cell connects to), otherwise the connection is refused
macro.setup_cluster(cluster="tcp://10.0.3.10:42385")

# run!
macro.run()

# save outcome results and write name of failed pipelines to file
macro.print_outcome(to_file='feature_extraction_all.out')
failed = macro.get_failed_pipelines()
if failed:
    # map labels back to the original tile names, so that the file can be fed back with
    # run = 'from_file' whether the tiles are '.laz' files or tile directories
    label_to_name = dict(zip(labels, (t.name for t in tiles)))
    with open('ESSD_feature_extraction_all_failed.json', 'w') as f:
        json.dump([label_to_name.get(pip.label, pip.label) for pip in failed], f)
    raise RuntimeError('Some of the pipelines have failed')

2025-04-21 21:42:20,132 - distributed.client - ERROR - 
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/lidarac-yshi/miniconda3/envs/jupyter_dask/lib/python3.11/site-packages/distributed/comm/core.py", line 355, in connect
    comm = await wait_for(
           ^^^^^^^^^^^^^^^
  File "/home/lidarac-yshi/miniconda3/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py", line 1922, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/lidarac-yshi/miniconda3/envs/jupyter_dask/lib/python3.11/site-packages/distributed/comm/tcp.py", line 504, in connect
    convert_stream_closed_error(self, e)
  File "/home/lidarac-yshi/miniconda3/envs/jupyter_dask/lib/python3.11/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.

In [None]:
# save outcome results and write the names of the failed tiles to file
macro.print_outcome(to_file='feature_extraction_all.out')
failed = macro.get_failed_pipelines()
if failed:
    # labels are the tile names without extension; map them back to the original names so
    # the file can be fed back with run = 'from_file' (works for LAZ files and tile directories)
    label_to_name = {os.path.splitext(t.name)[0]: t.name for t in tiles}
    with open('ESSD_feature_extraction_all_failed.json', 'w') as f:
        json.dump([label_to_name.get(pip.label, pip.label) for pip in failed], f)
    raise RuntimeError('Some of the pipelines have failed')

In [None]:
# write the names of the failed tiles to file, without re-printing the outcome
failed = macro.get_failed_pipelines()
if failed:
    # labels are the tile names without extension; map them back to the original names so
    # the file can be fed back with run = 'from_file' (works for LAZ files and tile directories)
    label_to_name = {os.path.splitext(t.name)[0]: t.name for t in tiles}
    with open('ESSD_feature_extraction_all_failed.json', 'w') as f:
        json.dump([label_to_name.get(pip.label, pip.label) for pip in failed], f)
    raise RuntimeError('Some of the pipelines have failed')

## Terminate cluster

In [None]:
# Terminate the macro-pipeline's Dask connection.
# NOTE(review): `macro` was attached to an externally started scheduler via its address;
# confirm whether laserfarm's shutdown() also stops that scheduler and its workers.
macro.shutdown()

## Troubleshooting 

### Cancel all jobs and restart the notebook

Copy and paste these lines into a separate Python shell, after setting the scheduler address to the one used by this notebook. If the Dask dashboard shows that some tasks are still queued, run the lines again. This should clear the scheduler and give control back to the current notebook. Then terminate the cluster and restart the notebook as usual.

In [None]:
from dask.distributed import Client, Future

# Address of the scheduler running this notebook's jobs; it must match the address used in
# the cluster-setup cell (update it whenever the cluster is restarted).
scheduler_address = 'tcp://10.0.3.10:42385'
client = Client(scheduler_address)

# wrap every key reported by the scheduler in a Future bound to this client and cancel them all
futures = [Future(key, client=client) for key in client.who_has().keys()]
client.cancel(futures)