# Wildcat Creek 2018 Labor Day Flood Inundation Mapping

This notebook uses the 2018 Labor Day flood on Wildcat Creek (near Manhattan, KS) to demonstrate how to map a flood event using the FLDPLN tiled library.

## Import Modules

Import necessary modules.

In [1]:
import os
import sys
import time
import pandas as pd
from dask.distributed import Client, LocalCluster
from dask import visualize

### Import FLDPLN Modules

In [2]:
# Tool/script folder
fldplnToolFolder = r'E:\CUAHSI_SI\training\source' # tool development folder, has the latest version

# Add the tool/script folder to sys.path to access fldpln modules
sys.path.append(fldplnToolFolder) 
# fldpln module
from fldpln import *
from fldpln_library import *
from fldpln_gauge import *

## Setup Input Tiled Library and Output Folders

Here we set up the folder that contains the tiled libraries (each organized as a folder). We also set up the output folder (i.e., outputFolder), under which a map folder and a 'scratch' folder are created. The map folder, which is specified later, contains all inundation depth maps. The scratch folder stores temporary files.

In [3]:
# tiled library folder
libFolder =  r'E:\CUAHSI_SI\training\examples\wildcat_10m_3dep\tiled_snz_library'

# libraries to be mapped
allLibNames = ['lib_py']

# Set output folder
outputFolder = r'E:\CUAHSI_SI\training\examples\wildcat_10m_3dep\maps'

## Prepare Gauge Stage and Calculate Gauge Depth of Flow (DOF)

Here we obtain and prepare flood event stages from stream gauges. The stage at a gauge is typically measured relative to the gauge's datum, which is not necessarily the same as the stream bed elevation referenced to a given vertical datum. To use a gauge stage with a FLDPLN library, we need to make sure that the gauge stage elevation (gauge datum + stage) and the FSP's filled elevation are based on the same vertical datum. The depth of flow (DOF) at the FSP can then be calculated as the difference between the two. The Wildcat Creek DEM and FLDPLN library are based on the NAVD88 vertical datum, so gauge stage elevations also need to be based on NAVD88 to calculate the DOFs at those gauges.
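
The relationship described above can be sketched in a few lines of Python. This is only an illustration: the function name `depth_of_flow` and the numbers are hypothetical and are not part of the FLDPLN modules. The sketch assumes that the gauge datum, the stage, and the FSP filled elevation are already in the same vertical datum and unit.

```python
# Hypothetical helper for illustration only (not part of the FLDPLN modules)
def depth_of_flow(gauge_datum, stage, fsp_filled_elev):
    """Return the depth of flow (DOF) at an FSP.

    All three inputs must use the same vertical datum and unit.
    """
    # stage elevation = gauge datum elevation + stage reading
    stage_elevation = gauge_datum + stage
    # DOF = water surface elevation minus the FSP's filled (stream bed) elevation
    return stage_elevation - fsp_filled_elev


# Made-up values in meters
dof = depth_of_flow(gauge_datum=300.0, stage=8.0, fsp_filled_elev=303.5)
print(f'DOF: {dof} m')
```

Keeping the datum and unit handling in one place like this makes it harder to mix feet and meters by accident.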

### Gauge Stage from AHPS and USGS

Both USGS and NWS AHPS maintain stream gauges that record past flood stages. Three AHPS/USGS gauges ([WKCK1](https://water.weather.gov/ahps2/hydrograph.php?wfo=top&gage=wkck1), [MWCK1](https://water.weather.gov/ahps2/hydrograph.php?wfo=top&gage=MWCK1), [MSTK1](https://water.weather.gov/ahps2/hydrograph.php?wfo=top&gage=MSTK1)) on Wildcat Creek recorded the 2018 Labor Day flood event. Here we use the maximum stages of the event at those gauges to map its maximum inundation extent and depth.

#### Event Stage from AHPS Historic Crests

The flood stages for the 2018 Labor Day flood event are available as AHPS historic crests at those gauges: [WKCK1](https://water.noaa.gov/gauges/WKCK1), [MWCK1](https://water.weather.gov/ahps2/hydrograph.php?wfo=top&gage=MWCK1) and [MSTK1](https://water.weather.gov/ahps2/hydrograph.php?wfo=top&gage=MSTK1). The Excel file wildcat_gauges_albers_meters.xlsx has several sheets that store both gauge information (for example, gauge datum) and the event stages for different gauge combinations. The key fields needed for those gauges are: stationid, x, y, and stage_elevation.

Note that most USGS and AHPS gauge stages are measured in feet. **Make sure that gauge coordinates are in the same coordinate system as the library and that gauge stages are in the same vertical unit as the library.**

In [4]:
# # Two downstream gauges 
# gaugeStageFileName = 'wildcat_gauges.xlsx' # KS LiDAR DEM in UTM with vertical unit in feet
gaugeStageFileName = 'wildcat_gauges_albers_meters.xlsx' # 3DEP DEM in Albers with vertical unit in meters
sheetName = 'ThreeGauges' # all 3 gauges
# sheetName = 'TwoDsGauges' # 2 downstream gauges
# sheetName = 'MSTK1' # the last downstream gauge used in HEC-RAS model

# read gauge file
gaugeStages = pd.read_excel(gaugeStageFileName, sheet_name=sheetName) 
# print(gaugeStages)

# Need to calculate gauge stage elevation if necessary!

# keep only necessary fields from gauges
keptFields = ['stationid','x','y','stage_elevation']
gaugeWithStageElevations = gaugeStages[keptFields]
print(gaugeWithStageElevations)

        stationid           x             y  stage_elevation
0  06879805,WKCK1 -60735.0580  1.799635e+06       343.911936
1  06879810,MWCK1 -54988.6141  1.796210e+06       325.907400
2  06879815,MSTK1 -52277.2352  1.795783e+06       317.851536


#### Event Stage from USGS NWIS

We can also get the maximum event stage directly from USGS NWIS to check the historic crests from AHPS. Note that the stages are in feet, and we need to convert them to stage elevations before using them in flood inundation mapping.

In [4]:
# Wildcat Creek 3 USGS gauges (in the order from upstream to downstream)
usgsIds = ['06879805','06879810','06879815'] 
ahpsIds = ['WKCK1','MWCK1','MSTK1']

# A period between two dates: Wildcat Creek Sep.3 2018 flood event
instStages = GetUsgsGaugeStageFromWebService(usgsIds,startDate='2018-09-02',endDate='2018-09-04')
print(instStages)

# find the max stage within the time period
maxStages = instStages.groupby(['stationid'],as_index=False).agg({'stage_ft':'max'})
# find the most recent time with the max stage
tdf = pd.merge(instStages, maxStages, how='inner', on=['stationid','stage_ft'])
gaugeStagesFromNwis = tdf.groupby(['stationid'], as_index=False).agg({'stationid':'first','stage_ft':'first','stage_time':'max'})
print(gaugeStagesFromNwis)



    stationid  stage_ft                     stage_time
0    06879805      6.87  2018-09-02T00:00:00.000-05:00
1    06879805      6.87  2018-09-02T00:15:00.000-05:00
2    06879805      6.87  2018-09-02T00:30:00.000-05:00
3    06879805      6.87  2018-09-02T00:45:00.000-05:00
4    06879805      6.87  2018-09-02T01:00:00.000-05:00
..        ...       ...                            ...
812  06879815      5.73  2018-09-04T22:45:00.000-05:00
813  06879815      5.73  2018-09-04T23:00:00.000-05:00
814  06879815      5.72  2018-09-04T23:15:00.000-05:00
815  06879815      5.72  2018-09-04T23:30:00.000-05:00
816  06879815      5.71  2018-09-04T23:45:00.000-05:00

[817 rows x 3 columns]
  stationid  stage_ft                     stage_time
0  06879805     25.97  2018-09-03T04:45:00.000-05:00
1  06879810     28.29  2018-09-03T07:00:00.000-05:00
2  06879815     25.18  2018-09-03T08:30:00.000-05:00
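
As a rough cross-check of the two data sources, the NWIS maximum stages printed above can be compared with the stage elevations read from the 'ThreeGauges' sheet earlier. The sketch below is an illustration and not part of the FLDPLN modules. It converts each NWIS stage from feet to meters and subtracts it from the corresponding stage elevation. If the Excel stage elevations were derived from the same crests, the difference is the gauge datum implied by the two tables. It assumes the stage elevations are in meters and uses 1 ft = 0.3048 m; the variable names are hypothetical.

```python
FT_TO_M = 0.3048  # meters per foot

# Stage elevations (m) as printed from the 'ThreeGauges' sheet
stage_elevation_m = {
    '06879805': 343.911936,
    '06879810': 325.907400,
    '06879815': 317.851536,
}
# Maximum event stages (ft) as printed from USGS NWIS
max_stage_ft = {'06879805': 25.97, '06879810': 28.29, '06879815': 25.18}

# Gauge datum implied by the two tables, converted back to feet
implied_datum_ft = {}
for station, stage_ft in max_stage_ft.items():
    datum_m = stage_elevation_m[station] - stage_ft * FT_TO_M
    implied_datum_ft[station] = datum_m / FT_TO_M

for station, datum_ft in implied_datum_ft.items():
    print(f'{station}: implied gauge datum = {datum_ft:.2f} ft')
```

The implied values can then be compared against the gauge datums published for each station.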


### Synthetic Gauge Stage from the National Water Model and HAND

HAND FIM converts NWM discharge into stage. Here we use the HAND reach stage to run FLDPLN for the event. Conceptually, we turn each reach stage into a synthetic gauge located at either the mid-point or the outlet of the reach. The HAND reaches and synthetic gauge locations for the Wildcat Creek example were selected manually by graduate student David Weiss. Those synthetic gauges can be treated like USGS/AHPS gauges. The key fields needed are: stationid, x, y, and stage_elevation. Note that we assume the HAND reach stage elevation is consistent with (i.e., based on the same elevations as) the FLDPLN library DEM.

In [6]:
# Synthetic FSP gauges from NWC reach stage
# gaugeStageFileName = 'wildcat_gauges.xlsx'
# sheetName = 'ReachStageAsDof' 
gaugeStageFileName = 'wildcat_gauges_albers_meters.xlsx'
# sheetName = 'ReachMedianStage' # HAND reach median stage as DOF
sheetName = 'ReachOutletStage' # HAND reach outlet stage as DOF

# read gauge file
gaugeStages = pd.read_excel(gaugeStageFileName, sheet_name=sheetName) # synthetic gauges
print(gaugeStages)

# Need to calculate gauge stage elevation if necessary!

# keep only necessary fields from gauges
keptFields = ['stationid','x','y','stage_elevation']
gaugeWithStageElevations = gaugeStages[keptFields]
print(gaugeWithStageElevations)

    stationid             x             y  FilledElev_FLDPLN   HydroID  \
0         1.0 -61538.205794  1.799968e+06        1123.727413  22160126   
1         2.0 -60868.205794  1.799988e+06        1111.158236  22160123   
2         3.0 -60798.205794  1.799258e+06        1107.293577  22160124   
3         4.0 -60088.205794  1.799138e+06        1101.593758  22160125   
4         5.0 -58898.205794  1.798198e+06        1093.053342  22160110   
5         6.0 -58148.205794  1.797518e+06        1079.412746  22160111   
6         7.0 -57358.205794  1.797018e+06        1074.712158  22160104   
7         8.0 -56828.205794  1.796868e+06        1067.207817  22160101   
8         9.0 -56308.205794  1.796368e+06        1066.029766  22160100   
9        10.0 -56068.205794  1.796758e+06        1059.294473  22160093   
10       11.0 -55898.205794  1.796548e+06        1053.092636  22160094   
11       12.0 -55208.205794  1.796178e+06        1048.627037  22160095   
12       13.0 -54528.205794  1.795848e

### Snap Gauges to FSPs and Calculate Gauge DOF

Here we snap gauges (with their stage elevations) to FLDPLN flood source pixels (FSPs), which are the stream pixels. Each snapped gauge FSP has a stream bed elevation, which is used to calculate the depth of flow/flood (DOF) at that FSP.

This process also identifies the FLDPLN libraries that the gauges belong to. Note that the same gauge can be snapped to more than one library, as FLDPLN libraries may overlap and the overlapping FSPs may have different coordinates!
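
The snapping and DOF steps can be illustrated with a minimal, pure-Python sketch. It is not the implementation of SnapGauges2Fsps: the function `snap_gauge_to_fsp`, the brute-force nearest-neighbor search, and the sample coordinates are assumptions made only for illustration. The field names FspX, FspY, and FilledElev follow the ones used in this notebook.

```python
import math


# Hypothetical helper for illustration only (not the FLDPLN implementation)
def snap_gauge_to_fsp(gauge_x, gauge_y, fsps, snap_dist):
    """Return the nearest FSP within snap_dist of the gauge, or None."""
    nearest, nearest_dist = None, float('inf')
    for fsp in fsps:
        dist = math.hypot(fsp['FspX'] - gauge_x, fsp['FspY'] - gauge_y)
        if dist <= snap_dist and dist < nearest_dist:
            nearest, nearest_dist = fsp, dist
    return nearest


# Made-up FSPs and gauge (coordinates and elevations in meters)
fsps = [
    {'FspX': 30.0, 'FspY': 40.0, 'FilledElev': 100.0},    # 50 m from the gauge
    {'FspX': 300.0, 'FspY': 400.0, 'FilledElev': 98.0},   # 500 m from the gauge
]
gauge = {'x': 0.0, 'y': 0.0, 'stage_elevation': 103.5}

snapped = snap_gauge_to_fsp(gauge['x'], gauge['y'], fsps, snap_dist=350)
if snapped is not None:
    # DOF at the snapped FSP = gauge stage elevation - FSP filled elevation
    dof = gauge['stage_elevation'] - snapped['FilledElev']
    print(f'Snapped to FSP at ({snapped["FspX"]}, {snapped["FspY"]}), DOF = {dof}')
```

A gauge farther than the snap distance from every FSP is simply not snapped, which is why the number of snapped gauge FSPs can differ from the number of gauges.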

In [7]:
# snap gauges to FSPs on-the-fly
print('Snap gauges to FSPs ...')
print(f'Number of gauges: {len(gaugeWithStageElevations.index)}')

# FLDPLN libraries to whose FSPs the gauges are snapped. All the libraries by default, but can be a subset
libs2Map = ['lib_py']

# snap the gauges to FSPs. 
# Fields 'StrOrd','DsDist','SegId','FilledElev' are used for interpolating other FSP DOF
# Note that 'lib_name','FspX', 'FspY' together uniquely identify a FSP (as there are overlapping FSPs between libraries)!
gaugeFspDf = SnapGauges2Fsps(libFolder,libs2Map,gaugeWithStageElevations,snapDist=350,gaugeXField='x',gaugeYField='y',fspColumns=['FspX','FspY','StrOrd','DsDist','SegId','FilledElev']) 
print(gaugeFspDf)

# calculate gauge FSP's DOF
gaugeFspDf['Dof'] = gaugeFspDf['stage_elevation'] - gaugeFspDf['FilledElev']

# keep only necessary columns for gauge FSPs
gaugeFspDf = gaugeFspDf[['lib_name','FspX','FspY','StrOrd','DsDist','SegId','FilledElev','Dof']] # Note that 'lib_name','FspX', 'FspY' together uniquely identify a FSP!!!

# show info
print(f'Number of snapped gauge FSPs: {len(gaugeFspDf)}')
# Find libs where the gauges are snapped to, and they are the actual libs to map
libs2Map = gaugeFspDf['lib_name'].drop_duplicates().tolist()
print(f'Libraries gauges snapped to: {libs2Map}')
print(gaugeFspDf)

#
# save snapped gauges to CSV file for checking
# gaugeFspDf.to_csv(os.path.join(outputFolder, 'SnappedGauges.csv'), index=False)

Snap gauges to FSPs ...
Number of gauges: 24
    index  stationid             x             y  stage_elevation  \
0       0        1.0 -61538.205794  1.799968e+06       346.630015   
1       1        2.0 -60868.205794  1.799988e+06       342.802630   
2       2        3.0 -60798.205794  1.799258e+06       342.836882   
3       3        4.0 -60088.205794  1.799138e+06       341.025877   
4       4        5.0 -58898.205794  1.798198e+06       338.710258   
5       5        6.0 -58148.205794  1.797518e+06       334.307905   
6       6        7.0 -57358.205794  1.797018e+06       333.577365   
7       7        8.0 -56828.205794  1.796868e+06       330.471942   
8       8        9.0 -56308.205794  1.796368e+06       331.243373   
9       9       10.0 -56068.205794  1.796758e+06       328.668555   
10     10       11.0 -55898.205794  1.796548e+06       326.599535   
11     11       12.0 -55208.205794  1.796178e+06       326.155221   
12     12       13.0 -54528.205794  1.795848e+06       323



## Interpolate FSP's DOF

Here we interpolate the DOF for all the FSPs between the gauge FSPs, using the DOF calculated in the previous step. The interpolation is based on stream orders and proceeds from low stream orders (i.e., main streams) to high stream orders (i.e., tributaries). Either horizontal or vertical (the default) interpolation can be used.
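
To give a feel for what interpolating DOF between two gauge FSPs means, here is a minimal sketch of linear interpolation along a stream by downstream distance. It is a generic illustration under stated assumptions and makes no claim about how InterpolateFspDofFromGauge implements its 'H' and 'V' options. The function name `interpolate_dof` and the sample values are hypothetical, and DsDist is assumed to decrease in the downstream direction.

```python
# Hypothetical helper for illustration only (not the FLDPLN implementation)
def interpolate_dof(ds_dist, upstream, downstream):
    """Linearly interpolate DOF at an FSP located between two gauge FSPs.

    upstream, downstream: (DsDist, Dof) pairs of the bounding gauge FSPs.
    ds_dist: DsDist of the FSP to interpolate, between the two gauges.
    """
    up_dist, up_dof = upstream
    down_dist, down_dof = downstream
    if up_dist == down_dist:
        return up_dof
    # weight is 0 at the upstream gauge and 1 at the downstream gauge
    weight = (up_dist - ds_dist) / (up_dist - down_dist)
    return up_dof + weight * (down_dof - up_dof)


# Made-up gauges: DOF of 4.0 m at DsDist 1000 m and 2.0 m at DsDist 0 m
dof_mid = interpolate_dof(500.0, upstream=(1000.0, 4.0), downstream=(0.0, 2.0))
print(f'Interpolated DOF: {dof_mid}')
```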

In [8]:
# Find libs with snapped gauges. They are the actual libs to map
libs2Map = gaugeFspDf['lib_name'].drop_duplicates().tolist()

# prepare the DF for storing interpolated FSP DOF
fspDof = pd.DataFrame(columns=['LibName','FspId','Dof'])

# prepare DFs for saving interpolated FSPs and their segment IDs
fspCols = fspInfoColumnNames + ['Dof']
segIdCols = ['SegId','LibName']
fsps = pd.DataFrame(columns=fspCols)
segIds =pd.DataFrame(columns=segIdCols)

# map each library
for libName in libs2Map:
    # interpolate DOF for the gauges
    # print('Interpolate FSP DOF using gauge DOF ...')
    # fspIdDof = InterpolateFspDofFromGauge(libFolder,libName,gaugeFspDf) # 'V' by default
    fspIdDof = InterpolateFspDofFromGauge(libFolder,libName,gaugeFspDf,weightingType='H') # horizontal interpolation
    fspIdDof['LibName'] = libName
    # fspDof = fspDof.append(fspIdDof[['LibName','FspId','Dof']], ignore_index=True)
    fspDof = pd.concat([fspDof,fspIdDof[['LibName','FspId','Dof']]], ignore_index=True)

    # Keep interpolated FSP DOF for saving later
    fspFile = os.path.join(libFolder, libName, fspInfoFileName)
    fspDf = pd.read_csv(fspFile) 
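    # merge only this library's DOF, as FspId is only unique within a library
    fspDf = pd.merge(fspDf,fspDof[fspDof['LibName']==libName],how='inner',on=['FspId'])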
    # fsps = fsps.append(fspDf, ignore_index=True)
    fsps = pd.concat([fsps,fspDf], ignore_index=True)
    
    # Keep FSP segment IDs for saving later
    t =  pd.DataFrame(fspDf['SegId'].drop_duplicates().sort_values())
    t['LibName'] = libName
    # segIds = segIds.append(t, ignore_index=True)
    segIds = pd.concat([segIds,t], ignore_index=True)

# show interpolated FSPs with Dof
print(fspDof)

#
# save interpolated FSP DOF and their segments for checking. This block of code should be commented out if no-checking needed
#
# Save DOF and segment IDs to CSV files
FspDofFile = os.path.join(outputFolder, 'Interpolated_FSP_DOF.csv')
SegIdFile = os.path.join(outputFolder, 'Interpolated_SegIds.csv')
fsps.to_csv(FspDofFile, index=False)
segIds.to_csv(SegIdFile, index=False)

# # turn interpolated segments into a shapefile
# for libName in libs2Map:
#     segShp = os.path.join(libFolder, libName, 'stream_orders.shp')
#     segs = gpd.read_file(segShp)
#     segs['LibName'] = libName
#     # print(segs)
#     # join by two fields: SegId and LibName
#     segDf = pd.merge(segs,segIds,how='inner',on=['SegId','LibName'])
#     # print(segDf)
#     # write segments as a shapefile
#     segDf.to_file(os.path.join(outputFolder, 'Interpolated_Segements.shp'))

     LibName FspId       Dof
0     lib_py     1  4.073070
1     lib_py     2  4.076093
2     lib_py     3  4.078231
3     lib_py     4  4.081254
4     lib_py     5  4.084277
...      ...   ...       ...
2329  lib_py  2330  3.263017
2330  lib_py  2331  2.651393
2331  lib_py  2332  2.039768
2332  lib_py  2333  1.607284
2333  lib_py  2334  1.174800

[2334 rows x 3 columns]




## Map Flood Inundation Depth


### Set Mapping Parameters

Set up the map folder (i.e., outMapFolderName), which is created under the output folder and contains all inundation depth maps. Additional settings include whether to mosaic the tiles into a single COG file and whether to use a Dask local cluster to speed up the mapping.

In [8]:
# set up map folder
outMapFolderName = 'labor_day_2018_AHPS_reach_outlet'

# Create folders for storing temp and output map files
outMapFolder,scratchFolder = CreateFolders(outputFolder,'scratch',outMapFolderName)

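# whether to mosaic tiles as a single COG
mosaicTiles = True # True or False

# whether to use a Dask LocalCluster to speed up the mapping (disabled here)
useLocalCluster = False # This doesn't work on my office desktop though it works fine on KBS server
# number of workers: 80% of the CPU cores as a starting point, overridden here with a fixed number
numOfWorkers = round(0.8*os.cpu_count())
numOfWorkers = 6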
print(f'Number of workers: {numOfWorkers}')

Number of workers: 6


### Map Inundation Depth

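The inundation depth maps are generated here. For each library, the tiles that have FSPs with a DOF greater than the tile's minimum DTF are selected, each selected tile is mapped as a GeoTIFF (COG) file, and the tile maps are optionally mosaicked into a single file.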
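
The core calculation in the mapping functions of this section can be summarized as follows: for each floodplain pixel (FPP), the depth from each related FSP is its DOF minus the DTF of the FSP-FPP relation, only positive depths are kept, the maximum over all FSPs is taken, and the depth of any filled depression is added. The pure-Python sketch below mirrors that logic with made-up numbers. The function name `flood_depths` and the tuple layout are hypothetical; the notebook's functions use pandas DataFrames instead.

```python
# Hypothetical helper for illustration only (the notebook code uses pandas)
def flood_depths(relations, fsp_dof):
    """Compute the flood depth at each FPP.

    relations: iterable of (FspId, FppCol, FppRow, Dtf, FilledDepth) tuples
    fsp_dof:   dict mapping FspId to its depth of flow (DOF)
    Returns a dict mapping (FppCol, FppRow) to the flood depth.
    """
    best = {}
    for fsp_id, col, row, dtf, filled_depth in relations:
        if fsp_id not in fsp_dof:
            continue  # this FSP has no DOF
        depth = fsp_dof[fsp_id] - dtf
        if depth <= 0:
            continue  # this FSP does not flood the FPP
        key = (col, row)
        # an FPP can be flooded by several FSPs; keep the maximum depth
        if key not in best or depth > best[key][0]:
            # FilledDepth is assumed to be a property of the FPP
            best[key] = (depth, filled_depth)
    # add the depth of the filled depression
    return {key: depth + filled for key, (depth, filled) in best.items()}


# Made-up relations and DOFs
relations = [
    (1, 0, 0, 1.0, 0.0),  # depth 2.0 from FSP 1
    (2, 0, 0, 0.5, 0.0),  # depth 1.5 from FSP 2, same FPP
    (2, 1, 0, 3.0, 0.5),  # not flooded (DOF < DTF)
    (1, 2, 0, 2.0, 0.5),  # depth 1.0 plus filled depth 0.5
]
fsp_dof = {1: 3.0, 2: 2.0}
depths = flood_depths(relations, fsp_dof)
print(depths)
```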

In [11]:
print(libs2Map)

['lib_py']


In [None]:
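# Explicit imports for the names used below (they may already be available through the fldpln star imports)
import json
import numpy as np
import rasterio
from rasterio.io import MemoryFile
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles

def MapFloodDepthWithTiles_v1(libFolder,libName,fileFormat,outMapFolder,fspDof='MinDtf',aoiExtent=None):
# fileFormat--'snappy' or 'mat'. The 'snappy' format requires the 'fastparquet' python package. Only 'snappy' is handled in this simplified version
# fspDof--Can be 'MinDtf', 'NumOfFsps', 'Depression', a float number, or a DF of FSPs with DOF.
#         This simplified version only supports a DF, which must have the columns ['FspId','Dof'] storing FSP IDs and their DOF
# aoiExtent--rectangular area of interest, which further limits the tiles decided by the FSPs' DOF. Can be None or a list of [minX,maxX,minY,maxY], default is None.
#            Note that it is not used in this simplified version
#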
    # create the folder for generating tile maps
    os.makedirs(outMapFolder,exist_ok=True)
    # Read lib meta data file
    metaDataFile = os.path.join(libFolder, libName, metaDataFileName)
    with open(metaDataFile,'r') as jf:
        md = json.load(jf)
    cellSize = md['CellSize']
    srText = md['SpatialReference']
    libSr = rasterio.crs.CRS.from_wkt(srText)

    # decide the tiles to map
    tileIds,fppExtents = Tiles2Map_v1(libFolder,libName,fspDof,aoiExtent=None)  #FUNCTION 1 - Tiles2Map
    print('Tiles need to be mapped:',tileIds)

    # map the selected tile
    if tileIds is None:
        tileTifs = None
    else:
        tileTifs = []
        for tid,fppExtent in zip(tileIds,fppExtents):    #FUNCTION 2 - MapOneTile
            tif=MapOneTile_v1(libFolder,libName,tid,fppExtent,cellSize,libSr,fileFormat,outMapFolder,fspDof,aoiExtent)
            if not(tif is None):
                tileTifs.append(tif)
        if not tileTifs: # empty list
            tileTifs = None
    
    return tileTifs

######################################################################################################################
def Tiles2Map_v1(libFolder,libName,fspDof='MinDtf',aoiExtent=None):
# Find the tiles need to be mapped for the library
    # Read in the fsp-tile index and tile index files for selecting tiles for mapping
    # read in fsp-tile index file to select the tiles for mapping
    fspIdxFile = os.path.join(libFolder, libName, fspTileIndexFileName)
    fspIdxDf = pd.read_csv(fspIdxFile)
    # print(fspIdxDf)

    # tile index file stores tile and FPP extents for the tile
    tileIdxFile = os.path.join(libFolder, libName, tileIndexFileName)
    tileIdxDf = pd.read_csv(tileIdxFile)
    # print(tileIdxDf)
    
    # Select the tiles for mapping based on the FSPs and fsp-tile index
    # for a dataframe of FSPs
    if isinstance(fspDof, pd.DataFrame):
        # find which tiles need to be mapped
        fspTiles = pd.merge(fspIdxDf, fspDof, how='inner', on=['FspId'])
        # print(fspTiles)
        # select those where DOF > minDtf
        fspTiles = fspTiles[fspTiles['Dof']>fspTiles['MinDtf']]
        # print(fspTiles)
        # find the tiles need to be mapped
        # fspTiles = fspTiles['TileId'].unique()
        fspTiles = fspTiles['TileId'].drop_duplicates().sort_values().tolist()
    else:
        print(f'Unsupported fspDof type {fspDof}!')
        return None,None # the caller unpacks two values

    # further limit the tiles to those that intersect with the AOI extent
    # NOTE: AOI filtering is not implemented in this simplified version, so all the selected tiles are kept
    tiles = fspTiles
    
    # tiles selected
    if len(tiles) == 0:
        # print('No tile needs to be mapped!')
        return None,None
    else:
        # get each tile's fppExtent
        fppExtents=[]
        for tid in tiles:
            fppExtent = tileIdxDf[tileIdxDf['TileId']==tid].reset_index().loc[0,['FppMinX','FppMaxX','FppMinY','FppMaxY']].values.tolist()
            fppExtents.append(fppExtent)
        return tiles,fppExtents

######################################################################################################################
def MapOneTile_v1(libFolder,libName,tid,fppExtent,cellSize,libSr,fileFormat,outMapFolder,fspDof='MinDtf',aoiExtent=None):
# Map one tile as a GeoTif file   
    if fileFormat == 'snappy':
        # tileName = os.path.join(libFolder, libName, tileFileMainName+'_'+str(tid)+'.gzip') # for gzip
        tileName = os.path.join(libFolder, libName, tileFileMainName+'_'+str(tid)+'.snz') # for snappy
        tdf = pd.read_parquet(tileName) # the original column datatypes are kept when read into a DF!
    else:
        # other formats (e.g., 'mat') are not handled in this simplified version
        print(f'Unsupported fileFormat {fileFormat}!')
        return None

    # Turn FSP-FPP relations to a 2D array
    dtfArray, noData, mapMinX, mapMaxY = TileFspFppRelations2Array_v1(tdf, fppExtent, cellSize, fspDof, aoiExtent)
    
    # map the tile
    if not (dtfArray is None): # needs to be mapped
        # Create and save map as a GeoTif file
        # print('Saving map as a TIF raster ...')
        
        # output file name
        rasterName = os.path.join(outMapFolder,libName+'_tile_'+str(tid)+'.tif')
        
        # create GeoTIFF profile
        # create an Affine transformation from upper left corner coordinates and pixel sizes
        transform = rasterio.transform.from_origin(mapMinX, mapMaxY, cellSize, cellSize)
        profile = dict(
            driver="GTiff",
            height = dtfArray.shape[0], 
            width = dtfArray.shape[1],
            count=1,
            dtype=str(dtfArray.dtype),
            crs=libSr,
            transform=transform,
            nodata=noData
        )
        
        # write to COG file
        with MemoryFile() as memfile:
            # write the array to a memory file
            with memfile.open(**profile) as mem:
                # Populate the input file with numpy array
                mem.write(dtfArray,1)
            # open the memory file for reading
            with memfile.open(mode='r') as mem:
                dst_profile = cog_profiles.get("deflate")
                cog_translate(
                    mem,
                    rasterName,
                    dst_profile,
                    in_memory=True,
                    quiet=True,
                )
        return rasterName
    
        # # code to save tile as regular GeoTIFF file
        # with rasterio.open(rasterName, 'w', **profile) as tifRaster:
        #     tifRaster.write(dtfArray, 1)
        # return rasterName
    else:
        return None

######################################################################################################################
def TileFspFppRelations2Array_v1(fspFppRels, fppExtent, cellSize, fspDof='MinDtf', aoiExtent=None, noData=-9999):
# The minimum bounding extent of the FPPs in the relations is always used when create the map for the tile!
#
# fspFppRels--a dataframe of FSP-FPP relations from a tile, which has the columns ["FspId", "FppCol", "FppRow", "Dtf", "FilledDepth"]
# fppExtent--a list of [minX, maxX, minY, maxY], FPP's external extent of the tile, which is also used to locate FPP's columns and rows in map coordinates
# fspDof--Can be 'MinDtf', 'NumOfFsps', 'Depression', a float number, or a DF of FSP depth of flow, i.e., FSP stage.
#         This simplified version only supports a DF, which must have the columns ['FspId','Dof'] storing the IDs and "Depth of Flow" of the FSPs
# aoiExtent--None or a rectangular extent of [minX, maxX, minY, maxY] that INTERSECTs with the fppExtent
# noData--value for the pixels that are not flooded, default is -9999

# Returns: dtfArray, noData, mapMinX, mapMaxY

    tdf = fspFppRels
    # print('Number of FSP-FPP relations:', len(tdf))
    if len(tdf)==0:
        # no FPP needs to be mapped
        return None, None, None, None
    
    # Calculate pixel values at each FPP based on the types of fspDof: 'MinDtf', 'NumOfFsps', 'Depression', a constant DOF, and a list of DOF
    # Pixel value at each FPP is saved in the 'Dtf' column.
    # Map the tile with a FSP DOF df
    if isinstance(fspDof, pd.DataFrame):
        # print('Map with a list of FSPs with DOFs ...')

        # Only keep those relations whose DTF is less than the max interpolated DOF, as only they can have a positive depth.
        # This significantly saves memory and time when merging the relations with the DOFs!
        maxDof = fspDof['Dof'].max()
        tdf = tdf[tdf['Dtf'] < maxDof]
        # print('Number of relations to be mapped: ',len(tdf))
        
        # set FSP DOF column data types to speed merge
        fspDof = fspDof.astype(dtype={"FspId":np.int32,"Dof":np.float32},copy=False)

        # create index to speed up merge
        # tdf.astype(np.float32,copy=False).set_index(keys=['FspX','FspY'],inplace=True)
        # fspDof.astype(np.float32,copy=False).set_index(keys=['FspX','FspY'],inplace=True)
        # tdf = pd.merge(tdf, fspDof, how='inner', left_index=True,right_index=True)
        
        # join each FSP-FPP relation with the DOF of its FSP
        tdf = pd.merge(tdf, fspDof, how='inner', on=['FspId']) #.astype(np.float32,copy=False)
        
        # calculate the flood depth at each FPP from each of its FSPs (DOF - DTF)
        tdf['Dtf_Final'] = tdf['Dof'] - tdf['Dtf']
        # keep only the flooded relations, i.e., those with a positive depth
        tdf = tdf[tdf['Dtf_Final'] > 0]
                
        # an FPP can be flooded by several FSPs, keep the maximum depth
        tdf = tdf.groupby(['FppCol', 'FppRow'],as_index=False).agg({'Dtf_Final':'max','FilledDepth':'first'})
        # print(tdf)
        # add the depth of filled depression
        tdf['Dtf_Final'] = tdf['Dtf_Final'] + tdf['FilledDepth']
    else:
        print(f'Unsupported fspDof type {fspDof}!')
        return None, None, None, None
    #
    # Turn relations into 2D array
    #
    if len(tdf)==0:
        # no FPP needs to be mapped
        return None, None, None, None

    # drop off not-used columns in the DF
    tdf = tdf[['FppCol','FppRow','Dtf_Final']]
    # tdf.drop(columns=['FilledDepth'],axis=1,inplace=True)

    # Determine the minimum map extent to speed up the mapping
    # original map extent is the FPP's extent
    mapMinX,mapMaxX,mapMinY,mapMaxY = fppExtent

    # further reduce map extent if FPP extent is reduced
    if (not (aoiExtent is None)) or isinstance(fspDof,(int, float)) or isinstance(fspDof, pd.DataFrame):
        # further reduce the map extent with the FPPs 
        mapMinCol,mapMaxCol = tdf['FppCol'].min(),tdf['FppCol'].max()
        mapMinRow,mapMaxRow = tdf['FppRow'].min(),tdf['FppRow'].max()
        # shift FPP's cols and rows
        tdf['FppCol'] = tdf['FppCol']-mapMinCol
        tdf['FppRow'] = tdf['FppRow']-mapMinRow
        # calculate map's new extent
        mapMaxX = mapMinX + (mapMaxCol+1)*cellSize # this line MUST come before the next line as the next line changes mapMinX!
        mapMinX = mapMinX + mapMinCol*cellSize
        mapMinY = mapMaxY - (mapMaxRow+1)*cellSize # this line MUST come before the next line as the next line changes mapMaxY!
        mapMaxY = mapMaxY - mapMinRow*cellSize
    
    # print('Map extent (minX, maxX, minY, maxY) :',(mapMinX, mapMaxX, mapMinY, mapMaxY))
    # Calculate map rows and columns
    tCols = int(round((mapMaxX-mapMinX)/cellSize))
    tRows = int(round((mapMaxY-mapMinY)/cellSize))
    # print(f'Turn FSP-FPP relations to a 2D array of {tRows, tCols} ...')
    
    # Initialize the array for saving as a raster
    dtfArray =  np.full(shape=(tRows,tCols),fill_value=noData,dtype=np.float32)
    
    # # update the array with FPP's DTF
    for (idx,idy,dtf) in tdf.itertuples(index=False): # itertuples() is the fastest way of iterating a df
        # idx,idy,dtf = (getattr(row,'FppCol'),getattr(row,'FppRow'),getattr(row,'Dtf')) 
        dtfArray[idy,idx] = dtf
 
    return dtfArray, noData, mapMinX, mapMaxY

In [10]:
# show mapping info
print(f'Tiled FLDPLN library folder: {libFolder}')
print(f'Map folder: {outMapFolder}')
# Find libs needs mapping
libs2Map = fspDof['LibName'].drop_duplicates().tolist()
print(f'Libraries to map: {libs2Map}')

# check running time
startTimeAllLibs = time.time()

# create a local cluster to speed up the mapping. Must be run inside "if __name__ == '__main__'"!!!
if useLocalCluster:
    # cluster = LocalCluster(n_workers=4,processes=False)
    try:
        print('Start a LocalCluster ...')
        # NOTE: set worker space (i.e., local_dir) to a folder that the LocalCluster can access. When run the script through a scheduled task, 
        # the system uses C:\Windows\system32 by default, which a typical user doesn't have the access!
        # cluster = LocalCluster(n_workers=numOfWorkers,memory_limit='32GB',local_dir="D:/projects_new/fldpln/tools") # for KARS production server (192G RAM & 8 cores)
        # cluster = LocalCluster(n_workers=numOfWorkers,processes=False) # for KARS production server (192G RAM & 8 cores)
        # use a raw string for the Windows path, otherwise "\t" is interpreted as a tab character
        cluster = LocalCluster(n_workers=numOfWorkers,memory_limit='8GB',local_dir=r"E:\temp") # for office desktop (64G RAM & 8 cores)
        # print('Watch workers at: ',cluster.dashboard_link)
        print(f'Number of workers: {numOfWorkers}')
        client = Client(cluster)
        # print scheduler info
        # print(client.scheduler_info())
    except Exception as e:
        print(f'Cannot create a LocalCluster: {e}')
        useLocalCluster = False

# dict to store lib processing time
libTime={}

# map each library
for libName in libs2Map:
    # check running time
    startTime = time.time()
    
    # select the FSPs within the lib
    fspIdDof = fspDof[fspDof['LibName']==libName][['FspId','Dof']]
    # mapping flood depth
    if useLocalCluster:
        
        print(f'Map [{libName}] using LocalCluster ...')
        # generate a DAG
        dag,dagRoot=MapFloodDepthWithTilesAsDag(libFolder,libName,'snappy',outMapFolder,fspIdDof,aoiExtent=None)
        if dag is None:
            tileTifs = None
        else:
            # visualize DAG
            # visualize(dag)
            # Compute DAG
            tileTifs = client.get(dag, dagRoot)
            if not tileTifs: # list is empty
                tileTifs =  None
    else:
        print(f'Map {libName} ...')
        tileTifs = MapFloodDepthWithTiles_v1(libFolder,libName,'snappy',outMapFolder,fspIdDof,aoiExtent=None)
    print(f'Actual mapped tiles: {tileTifs}')

    # Mosaic all the tiles from a library into one tif file
    if mosaicTiles and not(tileTifs is None):
        print('Mosaic tile maps ...')
        mosaicTifName = libName+'_'+outMapFolderName+'.tif'
        # Simplest implementation, may crash with very large raster
        MosaicGtifs(outMapFolder,tileTifs,mosaicTifName,keepTifs=False)
    
    # check time
    endTime = time.time()
    usedTime = round((endTime-startTime)/60,3)
    libTime[libName] = usedTime
    # print(f'{libName} processing time (minutes):', usedTime)

# Show processing time
# Individual lib processing time
print('Individual library mapping time:', libTime)
# total time
endTimeAllLibs = time.time()
print('Total processing time (minutes):', round((endTimeAllLibs-startTimeAllLibs)/60,3))

#
# Shutdown the local cluster
#
if useLocalCluster:
    print('Shutdown LocalCluster ...')
    # close the client first and then the cluster it is connected to
    client.close()
    cluster.close()
    useLocalCluster = False

Tiled FLDPLN library folder: E:\CUAHSI_SI\training\examples\wildcat_10m_3dep\tiled_snz_library
Map folder: E:\CUAHSI_SI\training\examples\wildcat_10m_3dep\maps\labor_day_2018_AHPS_reach_outlet
Libraries to map: ['lib_py']
Map lib_py ...
Tiles need to be mapped: [4, 5, 9, 10, 13, 14, 19, 20, 25, 26, 31, 32]
10
Actual mapped tiles: ['E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\lib_py_tile_4.tif', 'E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\lib_py_tile_5.tif', 'E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\lib_py_tile_9.tif', 'E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\lib_py_tile_10.tif', 'E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\lib_py_tile_13.tif', 'E:\\CUAHSI_SI\\training\\examples\\wildcat_10m_3dep\\maps\\labor_day_2018_AHPS_reach_outlet\\l