In [1]:
import copy
import fnmatch
import json
import getpass
import os
import pathlib
import datetime

from os import listdir
from os.path import isfile, join
from distutils import dir_util             
from dask.distributed import LocalCluster, SSHCluster, Client, as_completed  
from laserfarm import Retiler, DataProcessing, GeotiffWriter, MacroPipeline
from laserfarm.remote_utils import get_wdclient, get_info_remote, list_remote

def last_modified(opts, remote_path):
    """Return the last-modification time of a remote path.

    :param opts: options passed to ``get_wdclient`` to build the client
    :param remote_path: path object; its ``as_posix()`` form is queried
    :return: naive ``datetime.datetime`` parsed from the ``'modified'`` entry
        of the remote info record
    :raises ValueError: if the ``'modified'`` string does not match the
        expected ``'%a, %d %b %Y %H:%M:%S GMT'`` layout
    """
    wdclient = get_wdclient(opts)
    remote_info = get_info_remote(wdclient, remote_path.as_posix())
    timestamp = remote_info['modified']
    # The trailing "GMT" is matched literally, so the result carries no tzinfo.
    return datetime.datetime.strptime(timestamp, '%a, %d %b %Y %H:%M:%S GMT')

# Macro-Pipeline AHN4 Workflow - Retiling

## Set Run-Specific Input

Fill in the username/password for the SURF dCache. Choose whether you want to i) run all input files, ii) run only the input files listed in `filename`, or iii) run only the input files that were updated since the last workflow run.

In [2]:
# Root folder of the data set (alternative location kept for reference)
#remote_path_root = pathlib.Path('/data/local/home/eecolidar_webdav/02_UvA')
remote_path_root = pathlib.Path('/project/lidarac/Software/Yifang/JupyterDaskOnSLURM/AHN4_test')

# Folder holding the raw LAZ files
remote_path_input = remote_path_root.joinpath('rawlas')

# Folder receiving the retiled LAZ files (a sibling of the input folder)
remote_path_output = remote_path_input.with_name('Retiled')

# Run mode: one of 'all', 'updated', 'from_file'
run = 'all'
#filename = 'retile_failed.json'  # if run is 'from_file', set name of file with input file names
assert run in ('all', 'updated', 'from_file')

## Check Connection to Remote Storage

In [3]:
# Collect the names of the LAZ files in the input folder. Listing the folder
# also acts as the connectivity check: it raises if the folder is unreachable.
# Directory listings come back in arbitrary order, so the names are sorted to
# keep task order and labels stable between runs. The cheap extension test
# runs before the filesystem stat.
laz_files = sorted(
    entry.name
    for entry in remote_path_input.iterdir()
    if entry.name.lower().endswith('.laz') and entry.is_file()
)

In [4]:
# Display the LAZ file names found in the input folder as a sanity check.
laz_files

['C_44BZ2_clip.laz']

## Setup Cluster

Set up the Dask cluster used for the macro-pipeline calculation.

In [None]:
# Scratch folder used for the Dask worker space (alternative kept for reference)
#local_tmp = pathlib.Path('/pnfs/grid.sara.nl/data/projects.nl/eecolidar/02_UvA/YShi/temp')
local_tmp = pathlib.Path('/data/local/tmp')
nprocs_per_node = 1

# start the cluster
# SSHCluster uses the first host for the scheduler and the remaining hosts for
# workers, so 'node1' is listed twice on purpose: it runs both.
scheduler_node = 'node1'
hosts = [f'node{i}' for i in range(1, 11)]
# hosts = [host for host in hosts if host not in ['node7', 'node9', 'node10']]
#hosts = ['node1']
cluster = SSHCluster(hosts=[scheduler_node] + hosts,
                     # NOTE(review): 'known_hosts': None turns off SSH host-key
                     # verification - acceptable only on a trusted network.
                     connect_options={'known_hosts': None,
                                      'username': 'ubuntu',
                                      'client_keys': '/home/ubuntu/.ssh/id_rsa'},
                     worker_options={'nthreads': 1,
                                     'nprocs': nprocs_per_node,
                                     'memory_limit': 'auto',
                                     # Passed as a plain string: the worker options are
                                     # serialised into the remote start-up command, and a
                                     # pathlib.Path object cannot be serialised that way.
                                     'local_directory': (local_tmp / 'dask-worker-space').as_posix()},
                     scheduler_options={'dashboard_address': '8787'})
cluster

In [5]:
# Connect to the already-running Dask scheduler. `Client` is imported in the
# first cell of the notebook, so it is not imported again here.
# NOTE(review): the address presumably belongs to one scheduler session; keep
# it in sync with the address passed to `macro.setup_cluster` further down.
client = Client("tcp://10.0.2.186:41037")
client

0,1
Connection method: Direct,
Dashboard: /proxy/8787/status,

0,1
Comm: tcp://10.0.2.186:41037,Workers: 4
Dashboard: /proxy/8787/status,Total threads: 16
Started: 2 minutes ago,Total memory: 120.00 GiB

0,1
Comm: tcp://10.0.1.193:46397,Total threads: 4
Dashboard: /proxy/8787/status,Memory: 30.00 GiB
Nanny: tcp://10.0.1.193:35897,
Local directory: /tmp/dask-worker-space/worker-2pkr5bdn,Local directory: /tmp/dask-worker-space/worker-2pkr5bdn
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 96.72 MiB,Spilled bytes: 0 B
Read bytes: 0.92 kiB,Write bytes: 2.10 kiB

0,1
Comm: tcp://10.0.1.193:41063,Total threads: 4
Dashboard: /proxy/8787/status,Memory: 30.00 GiB
Nanny: tcp://10.0.1.193:35255,
Local directory: /tmp/dask-worker-space/worker-sgizers3,Local directory: /tmp/dask-worker-space/worker-sgizers3
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 96.12 MiB,Spilled bytes: 0 B
Read bytes: 0.92 kiB,Write bytes: 2.91 kiB

0,1
Comm: tcp://10.0.2.186:39667,Total threads: 4
Dashboard: /proxy/8787/status,Memory: 30.00 GiB
Nanny: tcp://10.0.2.186:44713,
Local directory: /tmp/dask-worker-space/worker-m3frnl3y,Local directory: /tmp/dask-worker-space/worker-m3frnl3y
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 85.73 MiB,Spilled bytes: 0 B
Read bytes: 32.33 kiB,Write bytes: 354.81 kiB

0,1
Comm: tcp://10.0.1.193:39809,Total threads: 4
Dashboard: /proxy/8787/status,Memory: 30.00 GiB
Nanny: tcp://10.0.1.193:34089,
Local directory: /tmp/dask-worker-space/worker-oqn4h_49,Local directory: /tmp/dask-worker-space/worker-oqn4h_49
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 4.0%,Last seen: Just now
Memory usage: 96.61 MiB,Spilled bytes: 0 B
Read bytes: 0.92 kiB,Write bytes: 2.90 kiB


## Retiling

The raw point-cloud files are downloaded and retiled to a regular grid.

In [6]:
# Retiling schema: a square bounding box and the number of tiles along each side.
# Both extents are 512000 coordinate units, so 512 tiles per side gives tiles
# 1000 units wide (presumably metres - confirm against the data's CRS).
grid = dict(
    min_x=-113107.81,
    max_x=398892.19,
    min_y=214783.87,
    max_y=726783.87,
    n_tiles_side=512,
)

In [7]:
# Configuration handed to every Retiler pipeline.
# NOTE(review): each top-level key presumably names a pipeline step and its
# value holds that step's keyword arguments - confirm against laserfarm.
# (earlier scratch location, kept for reference)
#local_tmp = pathlib.Path('/project/lidarac/Data/temp')

retiling_input = dict(
    # input and output folders, given as POSIX-style strings
    setup_local_fs=dict(
        input_folder=remote_path_input.as_posix(),
        output_folder=remote_path_output.as_posix(),
    ),
    set_grid=grid,
    split_and_redistribute=dict(),
    validate=dict(),
)


In [8]:
macro = MacroPipeline()

# One Retiler pipeline per input file, labelled with the file name minus its
# extension. The label -> file name mapping lets failed pipelines be reported
# with their exact original name (the input filter is case-insensitive, so the
# extension is not necessarily '.laz').
labels = [os.path.splitext(file)[0] for file in laz_files]
file_by_label = dict(zip(labels, laz_files))

# add pipeline list to macro-pipeline object and set the corresponding labels
macro.tasks = [Retiler(file).config(retiling_input) for file in laz_files]
macro.set_labels(labels)

# Attach to the running scheduler; the address must match the one the notebook
# client is connected to.
macro.setup_cluster(cluster="tcp://10.0.2.186:41037")

# run!
macro.run()

# save outcome results and check that no error occurred before continuing
macro.print_outcome(to_file='retile.out')

failed = macro.get_failed_pipelines()
if failed:
    # Store the failed input file names so they can be re-run later.
    with open('retile_failed.json', 'w') as f:
        json.dump([file_by_label.get(pip.label, pip.label + '.laz') for pip in failed], f)
    raise RuntimeError('Some of the pipelines have failed')

## Terminate cluster

In [None]:
# Terminate the cluster used by the macro-pipeline once the outcome is saved.
# NOTE(review): presumably this stops the scheduler and workers for every
# connected client - confirm against laserfarm's MacroPipeline.shutdown.
macro.shutdown()

## Troubleshooting 

### Cancel all jobs and restart the notebook

Copy and paste these lines into a separate Python shell. If the Dask dashboard shows that some tasks are still queued for processing, run the lines again; this should clear the scheduler and return control to the current notebook. Then proceed as usual: terminate the cluster and restart the notebook.

In [None]:
# Meant to be pasted into a separate Python shell, hence the local import.
from dask.distributed import Client, Future

# Connect to the same scheduler the notebook uses; update the address if the
# scheduler has been restarted on a different host or port.
client = Client('tcp://10.0.2.186:41037')
# Build a handle for every key the scheduler still tracks, bound explicitly to
# this client, and cancel them all.
futures = [Future(key, client=client) for key in client.who_has().keys()]
client.cancel(futures)

In [None]:
# Display the client summary to check the state of the connection.
client

In [None]:
# Per the Dask documentation, this shuts down the connected scheduler and its
# workers, not just this client - run it only when the cluster is no longer needed.
client.shutdown()
