In [1]:
import os
import shutil
import glob
import subprocess
import pandas as pd
import geopandas as gpd
import dask
from dask.distributed import Client, progress, LocalCluster
from pyFIRS.wrappers import lastools, fusion
from pyFIRS.utils import clean_dir, clip_tile_from_shp, convert_project, PipelineError

Launch a parallel computing cluster. 

In [2]:
cluster=LocalCluster(scheduler_port=7001, diagnostics_port=7002)
c = Client(cluster)
num_cores = len(c.ncores()) # identify how many workers we have

At this point, you should also be able to view an interactive dashboard on port 7002. If you're executing this on a remote server, you'll need to set up port forward so you can view the dashboard on your local machine's browser. Once you've done that, or if you're processing on your own machine, you can view the dashboard at [http://localhost:7002/status](http://localhost:7002/status).

In [3]:
las = lastools.useLAStools('/storage/lidar/LAStools/bin')
fus = fusion.useFUSION('/storage/lidar/FUSION/')

In [4]:
# where the imported lidar data is currently stored
workdir = os.path.abspath('/storage/lidar/odf_northwest_2015/clatsop/')

# define data handling directories
interim = os.path.join(workdir,'interim')
processed = os.path.join(workdir,'processed')
layers = os.path.join(interim, 'layers')

# the coordinate reference system we'll be working with
target_epsg = 26910 # utm 10 N

In [5]:
def log_error(tile_id, process, error_msg):
    logfile = os.path.join(interim, 'failed', tile_id + '.txt')
    os.makedirs(os.path.dirname(logfile), exist_ok=True)
    
    with open(logfile, '+w') as f:
        f.write('{} | {}: {}'.format(tile_id, process, error_msg))
    
    return

def has_error(tile_id):
    errors = glob.glob(os.path.join(interim, 'failed', '*.txt'))
    tiles_with_errors = [os.path.basename(error).split('.')[0] for error in errors]
    if tile_id in tiles_with_errors:
        return True
    else:
        return False

In [6]:
strata_cols_to_grid = {'Elev strata (below 0.15) return proportion':'strat0_return-proportion',
                       'Elev strata (0.15 to 1.37) return proportion':'strat1_return-proportion',
                       'Elev strata (5.00 to 10.00) return proportion':'strat2_return-proportion',
                       'Elev strata (10.00 to 20.00) return proportion':'strat3_return-proportion',
                       'Elev strata (20.00 to 30.00) return proportion':'strat4_return-proportion',
                       'Elev strata (above 30.00) return proportion':'strat5_return-proportion',
                       'Int strata (below 0.15) median':'strat0_intensity-median',
                       'Int strata (0.15 to 1.37) median':'strat1_intensity-median',
                       'Int strata (1.37 to 5.00) median':'strat2_intensity-median',
                       'Int strata (5.00 to 10.00) median':'strat3_intensity-median',
                       'Int strata (10.00 to 20.00) median':'strat4_intensity-median',
                       'Int strata (above 30.00) median':'strat5_intensity-median'
                      }

elevation_cols_to_grid = {'Elev P05':'height_05-percentile',
                          'Elev P25':'height_25-percentile',
                          'Elev P50':'height_50-percentile',
                          'Elev P75':'height_75-percentile',
                          'Elev P95':'height_95_percentile',
                          'Elev maximum':'height_max'
                         }

topo_cols_to_grid = {'Elevation':'elevation',
                     'Slope (degrees)':'slope',
                     'Aspect (degrees, azimuth)':'aspect',
                     'Profile curvature * 100':'profile_curvature',
                     'Plan curvature * 100':'plan_curvature',
                     'Solar Radiation Index':'solar_radiation_index',
                     'Overall Curvature':'overall_curvature'
                    }

In [7]:
# push our working directories and wrapper classes to the workers on the cluster as well
c.scatter([interim, processed, layers, las, fus, 
           target_epsg, num_cores, has_error, log_error,
           strata_cols_to_grid, topo_cols_to_grid, elevation_cols_to_grid], 
          broadcast=True);

## Create a Canopy GridMetrics
Calculate forest attributes using the FUSION `gridmetrics` tool.

In [8]:
@dask.delayed
def make_gridmetrics(tile_id):
    infile = os.path.join(interim, 'classified', tile_id + '.laz')
    groundfile = os.path.join(interim, 'dtm_ground_tiles', tile_id + '.dtm')
    odir = os.path.join(interim, 'gridmetrics')
    outfile = os.path.join(odir, tile_id + '.csv')
    
    # get the latitude of the tile centroid
    gdf = gpd.read_file(os.path.join(interim, 'tile_boundaries', tile_id+'.shp'))
    latlon = gdf.exterior.centroid.to_crs({'init':'EPSG:4326'})
    latitude = latlon.geometry.y.values[0]
    
    if not os.path.exists(outfile):
        if not has_error(tile_id):
            try:
                proc = fus.gridmetrics(groundfile=groundfile,
                                       heightbreak=1.37, # breast height, in meters
                                       cellsize=10, # in units of lidar data
                                       outputfile=outfile,
                                       datafiles=infile,
                                       strata=(0.15, 1.37, 5.0, 10.0, 20.0, 30.0),
                                       intstrata=(0.15, 1.37, 5.0, 10.0, 20.0, 30.0),
                                       las_class=(0,1,2,3,4,5),
                                       topo=(10,latitude),
                                       odir=odir) # will make sure output directory is created if doesn't already exist
                
            except PipelineError as e:
                        log_error(tile_id, 'make_gridmetrics', e.message)
    else: # output file already exists
        pass
                
    return tile_id

In [10]:
def csv2grid(tile_id, csvfile, col_num, col_name):
    outfile = os.path.join(interim, 'gridmetrics', 'rasters', 
                           '{}_{}.asc'.format(tile_id, col_name))
    odir = os.path.dirname(outfile)
    
    if not os.path.exists(outfile):
        if not has_error(tile_id):
            try:
                proc = fus.csv2grid(inputfile=csvfile,
                                    column=col_num,
                                    outputfile=outfile,
                                    odir=odir)
                
            except PipelineError as e:
                        log_error(tile_id, 'csv2grid', e.message)
    else: # output file already exists
        pass
                
    return tile_id, proc

In [11]:
@dask.delayed
def batch_csv2grid(tile_id):
    # read the csv containing strata data, identify the columns to extract
    strata_data = os.path.join(interim, 'gridmetrics', tile_id + '_all_returns_strata_stats.csv')
    with open(strata_data) as f:
        header = f.readline()
        cols = header.split(',')
        strata_columns = [{'col_num': cols.index(col),
                           'col_name': strata_cols_to_grid[col]}
                          for col in cols if col in strata_cols_to_grid.keys()]

    for col in strata_columns:
        strata_proc = csv2grid(tile_id, strata_data, col['col_num'], col['col_name'])

    
    # read the csv containing topo data, identify the columns to extract
    topo_data = os.path.join(interim, 'gridmetrics', tile_id + '_topo_metrics.csv')
    with open(topo_data) as f:
        header = f.readline()
        cols = header.split(',')
        topo_columns = [{'col_num': cols.index(col),
                         'col_name': topo_cols_to_grid[col]}
                        for col in cols if col in topo_cols_to_grid.keys()]

    for col in topo_columns:
        topo_proc = csv2grid(tile_id, topo_data, col['col_num'], col['col_name'])

        
    # read the csv containing elevation data, identify the columns to extract
    elevation_data = os.path.join(interim, 'gridmetrics', tile_id + '_all_returns_elevation_stats.csv')    
    with open(elevation_data) as f:
        header = f.readline()
        cols = header.split(',')
        elevation_columns = [{'col_num': cols.index(col),
                              'col_name': elevation_cols_to_grid[col]}
                             for col in cols if col in elevation_cols_to_grid.keys()]

    for col in elevation_columns:
        elev_proc = csv2grid(tile_id, elevation_data, col['col_num'], col['col_name'])
    
    return tile_id

In [12]:
@dask.delayed
def batch_asc2tif(tile_id):
    infiles = glob.glob(os.path.join(interim, 'gridmetrics', 'rasters', '{}*.asc'.format(tile_id)))
    
    for infile in infiles:
        dirname, basename = os.path.split(infile)
        outfilename = basename.split('.')[0] + '.tif'
        outfile = os.path.join(dirname, outfilename)
    
        if not os.path.exists(outfile):
            if not has_error(tile_id):
                try:
                    convert_project(infile, '.tif', 'EPSG:{}'.format(target_epsg))
                except Exception as e:
                    log_error(tile_id, 'batch_asc2tif', e.message)
        else: # output file already exists
            pass
    
    return tile_id

In [13]:
@dask.delayed
def remove_grid_metrics_buffer(tile_id, *args):
    if type(tile_id) == list:
        tile_id = tile_id[0]
    infiles = glob.glob(os.path.join(interim, 'gridmetrics', 'rasters', '{}*.tif'.format(tile_id)))    
    in_shp = os.path.join(interim, 'tile_boundaries', tile_id + '.shp')
    odir = os.path.join(processed, 'rasters', 'gridmetrics')
    
    for infile in infiles:
        basename = os.path.basename(infile)
        outfile = os.path.join(odir, basename)
    
        if not os.path.exists(outfile):
            if not has_error(tile_id):
                try:
                    clip_tile_from_shp(infile, in_shp, odir)

                except Exception as e:
                    log_error(tile_id, 'remove_grid_metrics_buffer', e.message)
        else: # output file already exists
            pass
    
    return tile_id

In [14]:
@dask.delayed
def tile_done(tile_id, *args, **kwargs):
    if type(tile_id) == list:
        tile_id = tile_id[0]
    
    return tile_id

@dask.delayed
def tiles_done(*args, **kwargs):
    return

In [15]:
tile_ids = [os.path.basename(file).split('.')[0] for file in glob.glob(os.path.join(interim, 'retiled', '*.laz'))]
print('Found {:,d} tiles to process'.format(len(tile_ids)))

Found 989 tiles to process


In [16]:
dsk = {}
for tile in tile_ids:
    dsk['batch_csv2grid-{}'.format(tile)]=(batch_csv2grid, tile)
    dsk['batch_asc2tif-{}'.format(tile)]=(batch_asc2tif, 'batch_csv2grid-{}'.format(tile))
    dsk['remove_grid_metrics_buffer-{}'.format(tile)]=(remove_grid_metrics_buffer, 'batch_asc2tif-{}'.format(tile))
    dsk['tile_done-{}'.format(tile)]=(tile_done, ['remove_grid_metrics_buffer-{}'.format(tile)])
    
dsk['tiles_done'] = (tiles_done, ['tile_done-{}'.format(tile) for tile in tile_ids])

In [17]:
tiles_graph = c.get(dsk, 'tiles_done')

In [18]:
tiles_results = c.compute(tiles_graph) # this might take a while...

In [19]:
progress(tiles_results)

VBox()

In [20]:
failed = glob.glob(os.path.join(interim, 'failed', '*.txt'))

summary = '''Processing Summary
--------------------
{:,d} tiles failed
'''.format(len(failed))

print(summary)

Processing Summary
--------------------
0 tiles failed



In [None]:
# results = c.compute([make_gridmetrics(tile_id) for tile_id in tile_ids])
# progress(results)

In [None]:
# csv2grid_results = c.compute([batch_csv2grid(tile_id) for tile_id in tile_ids])
# progress(csv2grid_results)

In [None]:
# asc2tif_results = c.compute([batch_asc2tif(tile_id) for tile_id in tile_ids])
# progress(asc2tif_results)

In [None]:
# clip_results = c.compute([remove_grid_metrics_buffer(tile_id) for tile_id in tile_ids])
# progress(clip_results)

In [None]:
%%time
# get rid of the .asc files FUSION generated
clean_dir(os.path.join(interim, 'gridmetrics', 'rasters'), ['.asc'])

In [None]:
c.close()
cluster.close()