In [None]:
import fnmatch
import json
import getpass
import os
import pathlib
import datetime
                    
from dask.distributed import LocalCluster, SSHCluster 
from laserfarm import Retiler, DataProcessing, GeotiffWriter, MacroPipeline
from laserfarm.remote_utils import get_wdclient, get_info_remote, list_remote

def last_modified(opts, remote_path):
    """
    Return the last-modification time of a path on the remote storage.

    :param opts: options used to create the WebDAV client (handed over to
        `get_wdclient`)
    :param remote_path: remote path to query; it must provide `as_posix()`
        (e.g. a `pathlib.Path` object)
    :return: naive `datetime.datetime` object parsed from the 'modified'
        field of the remote info (a timestamp ending with the literal 'GMT')
    """
    client = get_wdclient(opts)
    remote_info = get_info_remote(client, remote_path.as_posix())
    return datetime.datetime.strptime(remote_info['modified'],
                                      '%a, %d %b %Y %H:%M:%S GMT')

# Macro-Pipeline Workflow

## Set Run-Specific Input

Fill in the username and password for the SURF dCache and the date of the last workflow run. LAZ files updated since that date will be re-run through the full pipeline.

In [None]:
# credentials for the WebDAV access to the SURF dCache (the password is read
# with getpass, so it is not shown while typing)
webdav_login = input('WebDAV username: ')
webdav_password = getpass.getpass('WebDAV password: ')

# date of the previous workflow run: only remote files modified after it are
# (re-)processed. Surrounding whitespace is stripped, so that a stray space in
# the typed date does not make the parsing fail with a ValueError.
last_run = datetime.datetime.strptime(input('Date last run (YYYY-MM-DD): ').strip(), '%Y-%m-%d')

## Check Connection to Remote Storage

In [None]:
# root directory of the project on the remote storage
remote_path_root = pathlib.Path(
    '/pnfs/grid.sara.nl/data/projects.nl/eecolidar/01_Escience/'
)

# options used to create the WebDAV clients, here and in all the pipelines
wd_opts = {
    'webdav_hostname': 'https://webdav.grid.surfsara.nl:2880',
    'webdav_login': webdav_login,
    'webdav_password': webdav_password,
}

# stop here if the check of the remote root directory does not succeed
assert get_wdclient(wd_opts).check(remote_path_root.as_posix())

## Setup Cluster

Set up the Dask cluster used for all the macro-pipeline calculations.

In [None]:
# local scratch directory: it hosts the Dask worker space and it is also the
# temporary folder of the pipelines
local_tmp = pathlib.Path('/tmp')

# default: cluster on the local machine, with 2 single-threaded worker processes
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=True,
    local_directory=local_tmp / 'dask-worker-space',
)

# alternative: cluster distributed over multiple nodes via SSH. To use it,
# comment out the LocalCluster above and uncomment the following lines
# nprocs_per_node = 2
# cluster = SSHCluster(hosts=['172.17.0.2', 
#                             '172.17.0.2', 
#                             '172.17.0.3'], 
#                      connect_options={'known_hosts': None, 
#                                       'username': 'ubuntu', 
#                                       'client_keys': '/home/ubuntu/.ssh/id_rsa'},
#                      worker_options={'nthreads': 1, 
#                                      'nprocs': nprocs_per_node,
#                                      'local_directory': local_tmp/'dask-worker-space'}, 
#                      scheduler_options={'dashboard_address': '8787'})

# show the cluster as the output of the cell
cluster

## Retiling

The raw point-cloud files are downloaded and retiled to a regular grid.

In [None]:
# dCache directory with the raw LAZ files
remote_path_ahn = remote_path_root / 'test_pipeline/test_full/raw'

# dCache directory where the retiled PLY files are copied
remote_path_retiled = remote_path_ahn.parent / 'retiled'

# retiling schema: extent of the grid and number of tiles along each side
grid = {
    'min_x': -113107.81,
    'max_x': 398892.19,
    'min_y': 214783.87,
    'max_y': 726783.87,
    'n_tiles_side': 512,
}

# select the LAZ files modified since the last run (the extension check comes
# first, so that the modification time is only queried for LAZ files)
laz_files = [
    name
    for name in list_remote(get_wdclient(wd_opts), remote_path_ahn.as_posix())
    if name.lower().endswith('.laz')
    and last_modified(wd_opts, remote_path_ahn / name) > last_run
]
print('Retrieve and retile: {} LAZ files'.format(len(laz_files)))

In [None]:
# configuration of the retiling pipeline: files are pulled from the raw-data
# directory and the results are pushed to the directory of the retiled files
retiling_input = {
    'setup_local_fs': {'tmp_folder': local_tmp.as_posix()},
    'pullremote': remote_path_ahn.as_posix(),
    'set_grid': grid,
    'split_and_redistribute': {},
    'validate': {},
    'pushremote': remote_path_retiled.as_posix(),
    'cleanlocalfs': {},
}

# keep a copy of the configuration as a JSON file
with open('retiling.json', 'w') as json_file:
    json.dump(retiling_input, json_file)

In [None]:
macro = MacroPipeline()

# one retiling pipeline per LAZ file, labelled with the file name stripped of
# its extension
macro.tasks = [
    Retiler(laz_file).config(retiling_input).setup_webdav_client(wd_opts)
    for laz_file in laz_files
]
macro.set_labels([root for root, _ext in map(os.path.splitext, laz_files)])

# run all the pipelines on the cluster
macro.setup_cluster(cluster=cluster)
macro.run()

# write the outcome of the run to file and stop here if any pipeline failed
macro.print_outcome(to_file='retiling.out')
assert not macro.get_failed_pipelines()

## Feature Extraction

Features computed for the retiled point-cloud data are assigned to a regular 'target' grid. 

In [None]:
# size of the target mesh and names of the features to compute
tile_mesh_size = 10.
features = [
    'perc_95_normalized_height',
    'pulse_penetration_ratio',
    'entropy_normalized_height',
    'point_density',
]

# dCache directory where the feature-enriched target data are copied
remote_path_targets = remote_path_ahn.parent / 'targets'

# select the tile directories modified since the last run (the name check comes
# first, so that the modification time is only queried for tile directories)
tiles = [
    entry.strip('/')
    for entry in list_remote(get_wdclient(wd_opts), remote_path_retiled.as_posix())
    if fnmatch.fnmatch(entry, 'tile_*_*/')
    and last_modified(wd_opts, remote_path_retiled / entry) > last_run
]

# the integer indices of a tile are the underscore-separated fields that follow
# the 'tile' prefix in its name
tile_indices = [list(map(int, tile.split('_')[1:])) for tile in tiles]
print('Retrieve and process: {} tiles'.format(len(tile_indices)))

In [None]:
# configuration of the feature extraction pipeline: retiled files are pulled
# from the remote storage and the target data are pushed back to it
feature_extraction_input = {
    'setup_local_fs': {'tmp_folder': local_tmp.as_posix()},
    'pullremote': remote_path_retiled.as_posix(),
    'load': {'attributes': ['raw_classification']},
    'normalize': 1,
    'apply_filter': {
        'filter_type': 'select_equal',
        'attribute': 'raw_classification',
        # classification codes handed to the filter: unclassified (1) and
        # buildings (6). Other codes: ground surface (2), water (9),
        # artificial objects (26), vegetation (?)
        'value': [1, 6],
    },
    'generate_targets': {
        'tile_mesh_size': tile_mesh_size,
        'validate': True,
        **grid,
    },
    'extract_features': {
        'feature_names': features,
        'volume_type': 'cell',
        'volume_size': tile_mesh_size,
    },
    'export_targets': {
        'attributes': features,
        'multi_band_files': False,
    },
    'pushremote': remote_path_targets.as_posix(),
    # NOTE(review): the 'cleanlocalfs' entry is disabled here, while the
    # retiling and GeoTIFF export configurations include it - confirm that
    # this is intended
    # 'cleanlocalfs': {}
}

# keep a copy of the configuration as a JSON file
with open('feature_extraction.json', 'w') as json_file:
    json.dump(feature_extraction_input, json_file)

In [None]:
macro = MacroPipeline()

# one feature extraction pipeline per tile, labelled with the tile name
macro.tasks = [
    DataProcessing(tile, tile_index=index)
    .config(feature_extraction_input)
    .setup_webdav_client(wd_opts)
    for tile, index in zip(tiles, tile_indices)
]
macro.set_labels(tiles)

# run all the pipelines on the cluster
macro.setup_cluster(cluster=cluster)
macro.run()

# write the outcome of the run to file and stop here if any pipeline failed
macro.print_outcome(to_file='feature_extraction.out')
assert not macro.get_failed_pipelines()

## GeoTIFF Export

Export the rasterized features from the target grid to GeoTIFF files.

In [None]:
# dCache directory where the GeoTIFF files are copied (next to the directory
# of the raw LAZ files)
remote_path_geotiffs = remote_path_ahn.parent.joinpath('geotiffs')

In [None]:
# configuration of the GeoTIFF export pipeline: target data are pulled from the
# remote storage and the GeoTIFF files are pushed back to it
geotiff_export_input = {
    'setup_local_fs': {'tmp_folder': local_tmp.as_posix()},
    'pullremote': remote_path_targets.as_posix(),
    'parse_point_cloud': {},
    'data_split': {'xSub': 1, 'ySub': 1},
    'create_subregion_geotiffs': {'output_handle': 'geotiff'},
    'pushremote': remote_path_geotiffs.as_posix(),
    'cleanlocalfs': {},
}

# keep a copy of the configuration as a JSON file
with open('geotiff_export.json', 'w') as json_file:
    json.dump(geotiff_export_input, json_file)

In [None]:
macro = MacroPipeline()

# one GeoTIFF export pipeline per feature: the feature name is used as input
# directory, as band and as label of the pipeline
macro.tasks = [
    GeotiffWriter(input_dir=name, bands=name)
    .config(geotiff_export_input)
    .setup_webdav_client(wd_opts)
    for name in features
]
macro.set_labels(features)

# run all the pipelines on the cluster
macro.setup_cluster(cluster=cluster)
macro.run()

# write the outcome of the run to file and stop here if any pipeline failed
macro.print_outcome(to_file='geotiff_export.out')
assert not macro.get_failed_pipelines()

## Terminate cluster

In [None]:
# close the Dask cluster used by the macro-pipelines above
cluster.close()