In [1]:
import copy
import fnmatch
import json
import getpass
import os
import pathlib
import datetime
                    
from dask.distributed import Client, SSHCluster
from laserfarm import Retiler, DataProcessing, GeotiffWriter, MacroPipeline
from laserfarm.remote_utils import get_wdclient, get_info_remote, list_remote

# Macro-Pipeline AHN2 Workflow - GeoTIFF Export (All Points)

## Set Run-Specific Input

Choose whether to process all input files or only the input files listed in `filename`.

In [3]:
# root directory of the project data
path_root = pathlib.Path('/data/local/home/eecolidar_webdav/01_Escience/')

# input: directory holding the target files of the rasterized features
path_input = path_root.joinpath('ALS/Netherlands/ahn2/targets_all')

# output: directory where the GeoTIFF files are written (sibling of the input directory)
path_output = path_input.with_name('geotiff_all')

# run mode:
#   'all'       -> process every feature found in path_input
#   'from_file' -> process only the features listed in the JSON file `filename`
run = 'from_file'
filename = 'geotiff_export_all_failed.json'
assert run in ('all', 'from_file')

In [4]:
# candidate features: every entry of the input directory that does not match 'tile_*_*.log'
features = [el for el in path_input.iterdir() if not el.match('tile_*_*.log')]
print('Found: {} features'.format(len(features)))
if run == 'from_file':
    # restrict the run to the feature names listed in the JSON file
    with open(filename, 'r') as f:
        feature_names = json.load(f)
    features_read = [path_input / name for name in feature_names]
    # check whether all files are available on dCache; a set gives constant-time lookups
    # and collecting the missing names makes the failure message actionable
    available = set(features)
    missing = [feat.name for feat in features_read if feat not in available]
    assert not missing, f'Some of the features in {filename} are not in input dir: {missing}'
    features = features_read
print('Extract geotiffs for: {} features'.format(len(features)))

Found: 2 features
Extract geotiffs for: 1 features


## Setup Cluster

Set up the Dask cluster used for all the macro-pipeline calculations.

In [5]:
# scratch directory used by the Dask workers
local_tmp = pathlib.Path('/data/local/tmp')

nprocs_per_node = 1

# start the cluster
# SSHCluster uses the first host for the scheduler and the remaining hosts for workers,
# so 'node1' appears twice on purpose: it runs the scheduler and one of the ten workers.
scheduler_node = 'node1'
hosts = [f'node{i}' for i in range(1, 11)]
cluster = SSHCluster(hosts=[scheduler_node] + hosts,
                     # NOTE(review): known_hosts=None is understood to disable SSH host-key
                     # verification - acceptable only on a trusted network, confirm
                     connect_options={'known_hosts': None,
                                      'username': 'ubuntu',
                                      'client_keys': '/home/ubuntu/.ssh/id_rsa'},
                     worker_options={'nthreads': 1,
                                     'nprocs': nprocs_per_node,
                                     # 0 presumably disables the per-worker memory limit - confirm
                                     'memory_limit': 0,
                                     # pass a plain string, consistent with the other paths handed
                                     # to external components in this notebook; these options are
                                     # shipped to the remote hosts and newer distributed releases
                                     # JSON-serialize them, which a Path object does not support
                                     'local_directory': (local_tmp / 'dask-worker-space').as_posix()},
                     scheduler_options={'dashboard_address': '8787'})
cluster

distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at: tcp://145.100.59.123:8786
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://145.100.59.48:40379'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://145.100.59.27:45575'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://145.100.59.197:41683'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://145.100.59.182:38653'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://145.100.59.123:38193'
distributed.deploy.ssh - INFO - dis

## GeoTIFF Export

Export the rasterized features from the target grid to GeoTIFF files.

In [6]:
# handle used to name the output: AHN2 dataset, features, 10m target grid spacing,
# 1m normalization grid spacing, all points
output_handle = 'ahn2_feat_10m_1m_all'

# configuration shared by all the geotiff export pipelines
geotiff_export_input_all = {
    'parse_point_cloud': {},
    'data_split': {'xSub': 1, 'ySub': 1},
    'create_subregion_geotiffs': {'output_handle': output_handle},
}

# keep a copy of the configuration on disk, serialized as JSON
with open('geotiff_export_input_all.json', 'w') as f:
    f.write(json.dumps(geotiff_export_input_all))

In [None]:
# one GeoTIFF-export pipeline per feature, all collected in a single macro-pipeline
macro = MacroPipeline()

for feature in features:
    # per-feature input: the shared settings plus the feature-specific folders
    writer_input = copy.deepcopy(geotiff_export_input_all)
    writer_input['setup_local_fs'] = {
        'input_folder': feature.as_posix(),
        'output_folder': path_output.as_posix()
    }
    # the feature name is used both as the band to export and as the pipeline label
    writer = GeotiffWriter(bands=feature.name, label=feature.name)
    writer.config(writer_input)
    macro.add_task(writer)

# attach the Dask cluster and execute all the pipelines
macro.setup_cluster(cluster=cluster)
macro.run()

# write the outcome of every pipeline to file
macro.print_outcome(to_file='geotiff_export_all.out')

# store the labels of the failed pipelines, then fail loudly so the problem is not overlooked
failed = macro.get_failed_pipelines()
if failed:
    with open('geotiff_export_all_failed.json', 'w') as f:
        failed_labels = [pipeline.label for pipeline in failed]
        json.dump(failed_labels, f)
    raise RuntimeError('Some of the pipelines have failed')

## Terminate cluster

In [10]:
# shut down the macro-pipeline; presumably this also stops the Dask cluster attached via
# macro.setup_cluster(cluster=cluster) - confirm against the laserfarm documentation
macro.shutdown()

tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://145.100.59.123:8786' processes=10 threads=10>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1157, in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File "/usr/local/lib/python3.7/site-packages/distributed/batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://145.100.59.123:8786' processes=10 threads=10>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1157, in _heartbeat
   

## Troubleshooting 

### Cancel all jobs and restart the notebook

Copy and paste these lines into a separate Python shell. If the Dask dashboard shows that some tasks are still queued, run the lines again - this should clear the scheduler and return control to the current notebook. Then proceed as usual: terminate the cluster and restart the notebook.

In [None]:
from dask.distributed import Client, Future

# connect to the running scheduler (use the address printed in the cluster start-up log)
client = Client('tcp://145.100.59.123:8786')

# wrap every key reported by the scheduler in a Future and cancel them all at once
pending = [Future(key) for key in client.who_has()]
client.cancel(pending)