In [None]:
import os
import glob
import dask
from dask.distributed import Client, progress, LocalCluster
import geopandas as gpd
import pandas as pd

from pyFIRS.wrappers import lastools
from pyFIRS.wrappers import fusion
from pyFIRS.utils import validation_summary, move_invalid_tiles, fname, PipelineError, inspect_failures

In [None]:
# data handling directories
# WORKDIR is the root folder for this dataset; the 'src' and 'raw' paths used
# later in the notebook are built relative to it
WORKDIR = os.path.abspath('F:/willamette-valley_2009/')
# EPSG code that tiles are reprojected to on import; also assigned as the CRS
# of the merged tile index
TARGET_EPSG = 6339  # utm 10N, NAD83_2011
# TARGET_EPSG = 6340  # utm 11N, NAD83_2011

In [None]:
# original tiles are read from <WORKDIR>/src; processed copies go to <WORKDIR>/raw
SRC, RAW = (os.path.join(WORKDIR, sub) for sub in ('src', 'raw'))

# collect the compressed source tiles
# (for uncompressed input use the '*.las' pattern here and the '.las' INFILE in import_tile)
src_tiles = glob.glob(os.path.join(SRC, '*.laz'))

found_msg = 'Found {:,d} tiles in source directory:\n {}'
print(found_msg.format(len(src_tiles), SRC))

# Enough already, let's get to work with some lidar data
We'll define where we can find the binary executables for LAStools and FUSION command line tools.

In [None]:
# point the pyFIRS wrappers at the folders holding the LAStools and FUSION executables
# NOTE(review): these are machine-specific install locations -- adjust for your system
las = lastools.useLAStools('C:/Program Files/LAStools/bin')
fus = fusion.useFUSION('C:/Program Files/FUSION/')

In [None]:
# take a peek at info from a lidar source tile
# (uses the first tile returned by glob; echo=True presumably prints the
# lasinfo report -- confirm against the pyFIRS wrapper)
info_proc = las.lasinfo(i=src_tiles[0],
                        echo=True)

### Setting up parallel computing using `dask.distributed`
`LAStools` offers native multi-core processing as an optional argument (`cores`) supplied to its command-line tools. `FUSION` command line tools do not. To enable parallel processing of `FUSION` commands, we'll use `dask.distributed` to schedule the processing of tiles in asynchronous parallel batches. This approach also offers us the ability to track progress using a progress bar.

You'll first need to launch a parallel computing cluster. 

In [None]:
# start a local dask cluster with its default settings and connect a client to it;
# the commented-out arguments are left as an example of pinning the scheduler
# and dashboard ports
cluster=LocalCluster()#(scheduler_port=7001, dashboard_address=7002)
c = Client(cluster)

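At this point, you should also be able to view an interactive dashboard. Because `LocalCluster()` is created with its default settings above, the dashboard is normally served on port 8787; displaying the client object (`c`) shows the actual address. (If you pin the port instead, e.g. by passing a `dashboard_address` such as `':7002'` to `LocalCluster`, use that port in the link below.) If you're executing this on a remote server, you'll need to set up port forwarding so you can view the dashboard on your local machine's browser. Once you've done that, or if you're processing on your own machine, you can view the dashboard at [http://localhost:8787/status](http://localhost:8787/status).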

In [None]:
# push our working directories and wrapper classes to the workers on the cluster as well
# (the futures returned by scatter are not kept; the trailing semicolon only
# suppresses the cell output in the notebook)
c.scatter([WORKDIR, SRC, RAW, 
           las, fus, 
           TARGET_EPSG], 
          broadcast=True);

In [None]:
def log_error(tile_id, process, error_msg):
    """Record a processing failure for a tile under ``RAW/failed``.

    Each tile gets its own log file, ``<tile_id>.txt``; the ``failed`` folder
    is created if it does not exist yet. Messages are appended, one per line,
    so the message from an early pipeline step is not lost when a later step
    fails on the same tile.

    Parameters
    ----------
    tile_id : str
        Name of the tile (source file name without its extension).
    process : str
        Name of the pipeline step that failed.
    error_msg : str
        Message describing the failure.
    """
    logfile = os.path.join(RAW, 'failed', tile_id + '.txt')
    os.makedirs(os.path.dirname(logfile), exist_ok=True)

    # append rather than truncate: the first failure is usually the root cause
    with open(logfile, 'a') as f:
        f.write('{} | {}: {}\n'.format(tile_id, process, error_msg))


def has_error(tile_id):
    """Report whether a failure log exists for ``tile_id``.

    Looks at the ``*.txt`` files in ``RAW/failed`` and compares the name that
    ``fname`` derives from each one against ``tile_id``.

    Parameters
    ----------
    tile_id : str
        Name of the tile to look up.

    Returns
    -------
    bool
        True if one of the failure logs belongs to this tile, else False.
    """
    log_paths = glob.glob(os.path.join(RAW, 'failed', '*.txt'))
    logged_tiles = list(map(fname, log_paths))
    return tile_id in logged_tiles

## Get the raw data into our working directory
First, move the tiles over to our working directory.

When we define functions using the `dask.delayed` decorator, the function will have 'lazy' instead of 'eager' execution. We can map the function to a list of inputs and it will not execute for any of them until we ask for results to be computed. The jobs only start running when we hand the work to the client (which manages the scheduler that sends jobs to the workers), for example with its `compute()` or `persist()` method.

In [None]:
@dask.delayed
def import_tile(tile_id):
    """Bring one source tile into RAW as a cleaned, reprojected LAZ file.

    Runs las2las on ``SRC/<tile_id>.laz``, dropping withheld points and points
    in classes 7 and 18 (noise), and reprojecting to ``TARGET_EPSG``. The tile
    is skipped when ``RAW/<tile_id>.laz`` already exists. A ``PipelineError``
    raised by the wrapper is written to the tile's failure log instead of
    being propagated.

    Parameters
    ----------
    tile_id : str
        Name of the tile (source file name without its extension).

    Returns
    -------
    str
        ``tile_id``, returned whether or not the import succeeded, so the
        next step in the recipe can be chained on this one.
    """
    INFILE = os.path.join(SRC, tile_id + '.laz')
    # INFILE = os.path.join(SRC, tile_id + '.las')
    OUTFILE = os.path.join(RAW, tile_id + '.laz')

    if not os.path.exists(OUTFILE):
        # nothing else in the notebook creates RAW before the first tile is
        # written into it, so make sure the output folder is there
        os.makedirs(RAW, exist_ok=True)
        try:
            las.las2las(i=INFILE,
                        drop_withheld=True,
                        drop_class=(7,18),  # classified as noise
                        # epsg=32149,  # specify the source lidar projection, washington state plane south
                        # epsg=2927,  # specify the source lidar projection, washington state plane south
                        longlat=True,  # original data is in geographic coordinates
                        # elevation_surveyfeet=True,
                        # survey_feet=True,
                        # nad83_2011=True,  # original data in nad83_2011 datum
                        nad83_harn=True,  # original data in nad83_harn datum
                        target_epsg=TARGET_EPSG, # reproject
                        dont_remove_empty_files=True,
                        cpu64=True,
                        odir=RAW,
                        olaz=True)
        except PipelineError as e:
            log_error(tile_id, 'import_tile', e.message)
    return tile_id

Next, validate that the data match LAS specifications and have not been corrupted.

In [None]:
@dask.delayed
def validate(tile_id):
    """Run lasvalidate on an imported tile and store the report next to it.

    Reads ``RAW/<tile_id>.laz`` and writes ``RAW/<tile_id>.xml``. Nothing is
    done when the report already exists. A ``PipelineError`` raised by the
    wrapper is written to the tile's failure log instead of being propagated.

    Parameters
    ----------
    tile_id : str
        Name of the tile to validate.

    Returns
    -------
    str
        ``tile_id``, so the next step in the recipe can be chained on this one.
    """
    laz_path = os.path.join(RAW, tile_id + '.laz')
    report_path = os.path.join(RAW, tile_id + '.xml')

    # an existing report means this tile has already been validated
    if not os.path.exists(report_path):
        try:
            las.lasvalidate(i=laz_path, o=report_path)
        except PipelineError as err:
            log_error(tile_id, 'validate', err.message)

    return tile_id

Next, create spatial indexes for the input files to allow fast spatial queries (which are used, for example, when retiling and adding buffers).

In [None]:
@dask.delayed
def make_index(tile_id):
    """Run lasindex on an imported tile.

    The step is skipped when ``RAW/<tile_id>.lax`` already exists. A
    ``PipelineError`` raised by the wrapper is written to the tile's failure
    log instead of being propagated.

    Parameters
    ----------
    tile_id : str
        Name of the tile to index.

    Returns
    -------
    str
        ``tile_id``, so the next step in the recipe can be chained on this one.
    """
    laz_path = os.path.join(RAW, tile_id + '.laz')
    lax_path = os.path.join(RAW, tile_id + '.lax')

    # the index path is only used for this check; lasindex is given the tile alone
    if not os.path.exists(lax_path):
        try:
            las.lasindex(i=laz_path, cpu64=True)
        except PipelineError as err:
            log_error(tile_id, 'make_index', err.message)

    return tile_id

In [None]:
@dask.delayed
def make_boundary(tile_id):
    """Run lasboundary on an imported tile and save the result as a shapefile.

    Reads ``RAW/<tile_id>.laz`` and writes ``RAW/<tile_id>.shp``. The step is
    skipped when the shapefile already exists. A ``PipelineError`` raised by
    the wrapper is written to the tile's failure log, under this step's own
    name, instead of being propagated.

    Parameters
    ----------
    tile_id : str
        Name of the tile to outline.

    Returns
    -------
    str
        ``tile_id``, so the next step in the recipe can be chained on this one.
    """
    INFILE = os.path.join(RAW, tile_id + '.laz')
    OUTFILE = os.path.join(RAW, tile_id + '.shp')

    if not os.path.exists(OUTFILE):
        try:
            las.lasboundary(i=INFILE,
                            o=OUTFILE,
                            disjoint=True,
                            labels=True,
                            use_lax=True,  # presumably relies on the .lax file from make_index -- confirm
                            cpu64=True)
        except PipelineError as e:
            # log under this step's name so failures are not attributed to make_index
            log_error(tile_id, 'make_boundary', e.message)

    return tile_id

## Hand-build the computational graph
Define the recipe for computations.

In [None]:
tile_ids = [fname(tile) for tile in src_tiles]

# processing steps in execution order; for each tile, a step takes the graph key
# of that tile's previous step as its input (the first step takes the tile id)
pipeline_steps = (('import', import_tile),
                  ('validate', validate),
                  ('index', make_index),
                  ('boundary', make_boundary))

get_data = {}
for tile in tile_ids:
    upstream = tile
    for step_name, step_func in pipeline_steps:
        task_key = '{}-{}'.format(step_name, tile)
        get_data[task_key] = (step_func, upstream)
        upstream = task_key
    
# no-op task placed at the end of the recipe for computations;
# it is set up to take the last step of every tile as its input
@dask.delayed
def done_importing(*args, **kwargs):
    """Accept any arguments and do nothing; serves as the single end point of the recipe."""
    return None

# the closing task takes the boundary step of every tile as its input
final_steps = ['boundary-{}'.format(tile) for tile in tile_ids]
get_data['done_importing'] = (done_importing, final_steps)

In [None]:
# NOTE(review): every step is wrapped in dask.delayed, so running the recipe here
# appears to only assemble a lazy object; the actual processing seems to start
# with the persist call in the following cell -- confirm
get_data_graph = c.get(get_data, 'done_importing')  # build the computational graph

In [None]:
# hand the graph to the cluster; keep the returned handle so progress can be
# displayed (and the work cancelled) in the following cells
get_data_results = c.persist(get_data_graph)  # start executing it

In [None]:
# track the work that was started with persist
progress(get_data_results)  # progress bars

In [None]:
# c.cancel(get_data_results)

In [None]:
# look at the per-tile failure logs that log_error wrote under RAW/failed
# (presumably prints the logged messages -- see pyFIRS.utils.inspect_failures)
inspect_failures(os.path.join(RAW, 'failed'))

In [None]:
# RAW is where the validate step wrote its per-tile .xml reports
# (presumably this summarizes those reports -- see pyFIRS.utils.validation_summary)
validation_summary(xml_dir=RAW, verbose=False)

In [None]:
# move_invalid_tiles(xml_dir=RAW, dest_dir=os.path.join(RAW, 'invalid'))

## Merge all the individual tile boundaries into a tile index

In [None]:
# the merged tile index that this notebook saves also lives in RAW as a .shp;
# leave it out here, otherwise a re-run would merge the old index into the new
# one and every tile would show up twice
TILEINDEX_NAME = 'raw_tileindex.shp'
boundaries = [shp for shp in glob.glob(os.path.join(RAW, '*.shp'))
              if os.path.basename(shp) != TILEINDEX_NAME]
# set up a lazy read_file function so we can read in files in parallel
gdfs = [dask.delayed(gpd.read_file)(shp) for shp in boundaries]
# and a lazy concatenation
gather_tiles = dask.delayed(pd.concat)(gdfs, axis=0, ignore_index=True)
# now execute the read and concatenate with the cluster
tileindex = gather_tiles.compute()
# the per-tile boundaries come from tiles reprojected to TARGET_EPSG on import
tileindex.crs = "EPSG:{}".format(TARGET_EPSG)

In [None]:
# preview the first few rows of the merged tile index
tileindex.head()

In [None]:
# save the merged tile index in RAW, alongside the per-tile boundary shapefiles
tileindex.to_file(os.path.join(RAW, 'raw_tileindex.shp'), index=False)

In [None]:
# c.close()
# cluster.close()