# Process Lidar

This run at 10m was done with 12 r6a.4xlarge EC2 Instance.

In [1]:
import geopandas as gpd
import pandas as pd
#import geoplot as gplt
import laspy
import shapely
import os
from datetime import datetime, timedelta
import numpy as np
from itertools import product
import math
import pyproj
import re
import pyspark
import warnings
from collections import defaultdict

In [2]:
def lp(v, suppressOut=False):
    if not suppressOut:
        print(f"[{datetime.now()}] {v}")

lp("Starting...")

[2025-04-01 14:26:30.957125] Starting...


In [3]:
resolution = 10  #grid resolution in meters
outPath = f"{os.curdir}{os.sep}output{os.sep}"
lazPath = f"{os.path.realpath(os.curdir)}{os.sep}datasets{os.sep}laz{os.sep}"
lp(f"lazPath:  {lazPath}")
assert os.path.exists(lazPath), f"{lazPath} does not exists PANIC!"

outputFileName = f"{outPath}AggregateLidarData_{resolution}m.parquet"
boxesFile = f"{outPath}boxesDf_{resolution}m.parquet"

captureHoods = []


[2025-04-01 14:26:30.962779] lazPath:  /home/ec2-user/notebooks/NewOrleansElevation/datasets/laz/


In [4]:
conf = pyspark.SparkConf()\
                .set("spark.executor.memory", "110g")\
                .setAppName(outputFileName)\
                .setMaster("spark://ip-10-0-4-166.ec2.internal:7077")
lp(conf)

[2025-04-01 14:26:30.967297] <pyspark.conf.SparkConf object at 0x7f24043d4fd0>


In [5]:
if not os.path.exists(outPath):
    lp(f"Creating output path {outPath}")
    os.makedirs(outPath)

In [6]:
dataPath = f"{os.curdir}{os.sep}datasets{os.sep}"
fullDataPath = os.path.realpath(dataPath) + os.sep
lp(f"Path:  {dataPath}\t\tFullPath:{fullDataPath}")

[2025-04-01 14:26:30.976342] Path:  ./datasets/		FullPath:/home/ec2-user/notebooks/NewOrleansElevation/datasets/


In [7]:
testLasFileName = os.listdir(f"{dataPath}laz")[5]
lp(f"Opening {testLasFileName} to get crs and more")
with laspy.open(f"{dataPath}laz{os.sep}{testLasFileName}", 'r') as f: 
    testLas = f.read()

[2025-04-01 14:26:30.982135] Opening USGS_LPC_LA_2021GreaterNewOrleans_C22_w0801n3325.laz to get crs and more


In [8]:
neighborhoodDf = gpd.read_file(f"{dataPath}Neighborhoods.geojson").to_crs(testLas.vlrs[0].parse_crs())

In [9]:
sorted(neighborhoodDf['gnocdc_lab'].unique())

['ALGIERS POINT',
 'AUDUBON',
 'B. W. COOPER',
 'BAYOU ST. JOHN',
 'BEHRMAN',
 'BLACK PEARL',
 'BROADMOOR',
 'BYWATER',
 'CENTRAL BUSINESS DISTRICT',
 'CENTRAL CITY',
 'CITY PARK',
 'DESIRE AREA',
 'DILLARD',
 'DIXON',
 'EAST CARROLLTON',
 'EAST RIVERSIDE',
 'FAIRGROUNDS',
 'FILMORE',
 'FISCHER DEV',
 'FLORIDA AREA',
 'FLORIDA DEV',
 'FRENCH QUARTER',
 'FRERET',
 'GARDEN DISTRICT',
 'GENTILLY TERRACE',
 'GENTILLY WOODS',
 'GERT TOWN',
 'HOLLYGROVE',
 'HOLY CROSS',
 'IBERVILLE',
 'IRISH CHANNEL',
 'LAKE CATHERINE',
 'LAKE TERRACE & OAKS',
 'LAKESHORE - LAKE VISTA',
 'LAKEVIEW',
 'LAKEWOOD',
 'LEONIDAS',
 'LITTLE WOODS',
 'LOWER GARDEN DISTRICT',
 'LOWER NINTH WARD',
 'MARIGNY',
 'MARLYVILLE - FONTAINEBLEAU',
 'MID-CITY',
 'MILAN',
 'MILNEBURG',
 'McDONOGH',
 'NAVARRE',
 'NEW AURORA - ENGLISH TURN',
 'OLD AURORA',
 'PINES VILLAGE',
 'PLUM ORCHARD',
 'PONTCHARTRAIN PARK',
 'READ BLVD EAST',
 'READ BLVD WEST',
 'SEVENTH WARD',
 'ST.  ANTHONY',
 'ST. BERNARD AREA',
 'ST. CLAUDE',
 'ST. ROCH

In [10]:
if len(captureHoods) > 0:
    captureHoodsMask = neighborhoodDf['gnocdc_lab'].isin(captureHoods)
else:
    captureHoodsMask = np.repeat(True, neighborhoodDf.shape[0])

In [11]:
bounds = [int(b) for b in neighborhoodDf[captureHoodsMask].total_bounds]

In [12]:
xPixels = (bounds[2] - bounds[0]) / resolution
yPixels = (bounds[3] - bounds[1]) / resolution
lp(f"Resolution will be {xPixels} x {yPixels}  Runtime based on {xPixels*yPixels}")

[2025-04-01 14:26:31.311526] Resolution will be 4880.7 x 3475.8  Runtime based on 16964337.06


In [13]:
boxesDf = gpd.read_parquet(boxesFile)

In [14]:
lp("Creating spark context")
sc = pyspark.SparkContext(conf=conf)

[2025-04-01 14:26:35.392337] Creating spark context


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/01 14:26:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [15]:
lp("Broadcasting boxes")
boxesBroadcast = sc.broadcast( np.array(list(boxesDf['geometry'].bounds.itertuples(name=None) )))

[2025-04-01 14:26:38.022349] Broadcasting boxes


In [16]:
boxesBroadcast.value[0]

array([8.450000e+02, 7.760390e+05, 3.316338e+06, 7.760490e+05,
       3.316348e+06])

In [17]:
len(boxesBroadcast.value)

5394495

In [18]:
tileIndex = gpd.read_file(f"{dataPath}USGS_LA_2021GNO_1_C22_TileIndex{os.sep}USGS_LA_2021GNO_1_C22_TileIndex.shp").to_crs(neighborhoodDf.crs)
tileIndex.index = tileIndex['Name'].map(lambda f: f"{fullDataPath}laz{os.sep}USGS_LPC_LA_2021GreaterNewOrleans_C22_{f}.laz")
tileIndexBroadcast = sc.broadcast(tileIndex['geometry'].map(lambda g: g.bounds).to_dict())
tileIndexBroadcast.value[tileIndex.index[0]]
lp(f"tileIndexBroadcast length:  {len(tileIndexBroadcast.value)}")

[2025-04-01 14:26:43.012242] tileIndexBroadcast length:  7589


In [19]:
def processLas(lasFileName, suppressOut=True):
    startLazTime = datetime.now()

    lp(f"Processing {lasFileName}...",suppressOut)
    
    with laspy.open(lasFileName, 'r') as f:
        las = f.read()

    

    outDict = {}

    lazBox = shapely.box(*tileIndexBroadcast.value[lasFileName])
    boxMask = gpd.GeoSeries(map(lambda b: shapely.box(*b[1:]), boxesBroadcast.value)).intersects(lazBox)
    
    for inputTup in boxesBroadcast.value[boxMask]:
        startTime = datetime.now()
        
        altTotal = 0
        waterTotal = 0
        pointTotal = 0
        intenseTotal = 0
    
    

        BoxIdx = inputTup[0]
        boxBounds = inputTup[1:]
            
        transBoxBounds = (  int((boxBounds[0] - las.header.offsets[0]) / las.header.scales[0]),\
                            int((boxBounds[1] - las.header.offsets[1]) / las.header.scales[1]),\
                            int((boxBounds[2] - las.header.offsets[0]) / las.header.scales[0]),\
                            int((boxBounds[3] - las.header.offsets[1]) / las.header.scales[1]),)

        
        lp(f"LAZ Bounds         = ({las.header.mins[0]},{las.header.mins[1]},{las.header.maxs[0]},{las.header.maxs[1]})",suppressOut)
        lp(f"LAZ Unscale Bounds = ({las.X.min()},{las.Y.min()},{las.X.max()},{las.y.max()})",suppressOut)
        lp(f"newBox:  {transBoxBounds}", suppressOut)
        
        X = las.X
        Y = las.Y
        Z = las.Z
        cls = las.classification
        intense = las.intensity

                
        groundMask = np.isin(cls, [2,9])
        inBoundsMaskX = np.logical_and(X >= (transBoxBounds[0]), (X <= (transBoxBounds[2])))
        lp(f"InBoundsX:  {np.count_nonzero(inBoundsMaskX)}", suppressOut)
        inBoundsMaskY = np.logical_and(Y >= (transBoxBounds[1]), (Y <= (transBoxBounds[3])))
        lp(f"InBoundsY:  {np.count_nonzero(inBoundsMaskY)}", suppressOut)
        inBoundsMask = np.logical_and(inBoundsMaskX,inBoundsMaskY)
        lp(f"InBounds:  {np.count_nonzero(inBoundsMask)}", suppressOut)
        goodPointMask = np.logical_and( groundMask, inBoundsMask)
        
        lp(f"Good points / Total {np.count_nonzero(goodPointMask)}/{goodPointMask.shape[0]}", suppressOut)
 
        altTotal += int(Z[goodPointMask].sum())
        waterTotal += np.count_nonzero(cls[goodPointMask] == 9)
        intenseTotal += int(intense[goodPointMask].sum())
        pointTotal += np.count_nonzero(goodPointMask)

        outDict[BoxIdx] = (altTotal, waterTotal, intenseTotal, pointTotal, datetime.now() - startTime)
        

    lp(f"LAZ:  {lasFileName} processed in {datetime.now()-startLazTime}")

    return (lasFileName, outDict)


In [20]:
def seqOp(x1, x2):
    outDict = defaultdict(lambda: (0,0,0,0, timedelta(0)), x1)
    x2Dict = x2[1]
    
    for key, value in x2Dict.items():
        outDict[key] = tuple((outDict[key][i] + value[i] for i in range(5) ))

    return dict(outDict)

def comboOp(x1,x2):
    outDict = {}
    x1Dict =defaultdict(lambda: (0,0,0,0, timedelta(0)), x1)
    x2Dict =defaultdict(lambda: (0,0,0,0, timedelta(0)), x2)
    for boxTup in boxesBroadcast.value:
        idx = boxTup[0]
        outDict[idx] = tuple((x1Dict[idx][i] + x2Dict[idx][i] for i in range(5)))

    return outDict


In [21]:
#testing
if False:
    
    test1 = processLas(os.path.realpath("datasets/laz/USGS_LPC_LA_2021GreaterNewOrleans_C22_w0776n3315.laz"), suppressOut=False)
    test2 = processLas(os.path.realpath("datasets/laz/USGS_LPC_LA_2021GreaterNewOrleans_C22_w0780n3324.laz"), suppressOut=False)
    testSeq1 = seqOp({}, test1)   
    testSeq2 = seqOp(testSeq1, test2)
    #print(testSeq2)
                     
    out = comboOp(comboOp(testSeq1, testSeq2), testSeq1)

    print(out)

In [22]:
lazFiles = [lazPath + p for p in os.listdir(lazPath) if p[-4:]=='.laz']
lp(f"{len(lazFiles)} .laz files found in {lazPath}]")

[2025-04-01 14:26:43.038018] 617 .laz files found in /home/ec2-user/notebooks/NewOrleansElevation/datasets/laz/]


In [23]:
partitions = round(len(lazFiles) / 3)
lp(f"Partitions:  {partitions}")
boxesRdd = sc.parallelize(lazFiles, partitions)

[2025-04-01 14:26:43.041881] Partitions:  206


In [24]:
boxesProcessedRdd = boxesRdd.map(processLas)

In [25]:
lp(f"{boxesDf.shape[0]} boxes total")

[2025-04-01 14:26:43.220795] 5394495 boxes total


In [26]:
sparkTime = datetime.now()
output = boxesProcessedRdd.aggregate({}, seqOp, comboOp)
lp(f"Spark completed in {datetime.now() - sparkTime}")

                                                                                

[2025-04-02 02:31:14.694237] Spark completed in 12:04:31.468376


In [27]:
sc.stop()
sparkDoneMSG = "!!!!Done with Spark you can shut off the workers now!!!!"
lp(sparkDoneMSG)
_ = warnings.warn(sparkDoneMSG)

[2025-04-02 02:31:15.695658] !!!!Done with Spark you can shut off the workers now!!!!




In [28]:
#If those code were to fail after reteiving all of the reduced data
#we would be soooooo sad
#to wit this is a simplified output meathod by date
#which can be used to recreate the dataset if the unthinkable
failSafeOutFile = outPath + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.csv'
lp(f"Writing failsafe data to {failSafeOutFile}")
with open(failSafeOutFile, 'w') as f:
    for key in output.keys():
        f.write(str(key))
        f.write(',')
        f.write(str(output[key][:-1])[1:-1]) #drop runtime not so important and parens
        f.write("\n")
        

lp(f"Done failsafe data to {failSafeOutFile}  [{os.path.getsize(failSafeOutFile)/1E6}MB]!")

[2025-04-02 02:31:15.700292] Writing failsafe data to ./output/2025-04-02_023115.csv
[2025-04-02 02:31:22.501086] Done failsafe data to ./output/2025-04-02_023115.csv  [161.829623MB]!


In [29]:
!aws s3 cp $failSafeOutFile s3://requesterpays.garyscorner.net/datasets/NewOrleansElevation/

upload: output/2025-04-02_023115.csv to s3://requesterpays.garyscorner.net/datasets/NewOrleansElevation/2025-04-02_023115.csv


In [30]:
lp("Starting boxesDf load")
startBoxLoadTime = datetime.now()
outCols = ['AltitudeTotal','WaterTotal','Int','Total','RunTime']
boxesDf.drop(columns=outCols, errors='ignore', inplace=True)
boxesDf = boxesDf.join(pd.DataFrame.from_dict(output, orient='index',columns=outCols))
lp(f"Finished boxesDf load time:  {datetime.now()-startBoxLoadTime}")

[2025-04-02 02:31:24.554681] Starting boxesDf load
[2025-04-02 02:31:36.722228] Finished boxesDf load time:  0:00:12.167476


In [31]:
boxesDf.head()

Unnamed: 0,geometry,AltitudeTotal,WaterTotal,Int,Total,RunTime
845.0,"POLYGON ((776049 3316338, 776049 3316348, 7760...",0,0,0,0,0 days 00:00:01.203105
846.0,"POLYGON ((776049 3316348, 776049 3316358, 7760...",0,0,0,0,0 days 00:00:01.202179
847.0,"POLYGON ((776049 3316358, 776049 3316368, 7760...",0,0,0,0,0 days 00:00:01.199199
848.0,"POLYGON ((776049 3316368, 776049 3316378, 7760...",0,0,0,0,0 days 00:00:01.223570
849.0,"POLYGON ((776049 3316378, 776049 3316388, 7760...",0,0,0,0,0 days 00:00:01.194712


In [32]:
lp(f"Sending boxesDf to {outputFileName}")
boxesDf.to_parquet( outputFileName )
lp(f"Done boxesDf to {outputFileName}")

[2025-04-02 02:31:36.736768] Sending boxesDf to ./output/AggregateLidarData_10m.parquet
[2025-04-02 02:31:42.029622] Done boxesDf to ./output/AggregateLidarData_10m.parquet


In [33]:
lp(f"Mean runtime:  {boxesDf['RunTime'].mean()}")
lp(f"Max runtime:  {boxesDf['RunTime'].max()}")
lp(f"Total runtime:  {boxesDf['RunTime'].sum()}")
lp(f"Totals Min/Mean/Max\t\t{boxesDf['Total'].min()} / {boxesDf['Total'].mean()} / {boxesDf['Total'].max()}")

[2025-04-02 02:31:42.040421] Mean runtime:  0 days 00:00:00.866326127
[2025-04-02 02:31:42.045385] Max runtime:  0 days 00:00:07.243506
[2025-04-02 02:31:42.050255] Total runtime:  54 days 02:09:51.962585
[2025-04-02 02:31:42.056788] Totals Min/Mean/Max		0 / 216.38568540706777 / 2659


In [34]:
if False:
    lp("Altitude plot")
    boxesDf['AltCalc'] = boxesDf['AltitudeTotal'] / boxesDf['Total']
    boxesDf.loc[pd.isna(boxesDf['AltCalc']),'AltCalc'] = 0
    ax = neighborhoodDf.plot(color='pink')
    boxesDf.plot(column='AltCalc',ax=ax)

In [35]:
if False:
    lp("Intense plot")
    boxesDf['IntCalc'] = boxesDf['Int'] / boxesDf['Total']
    boxesDf.loc[pd.isna(boxesDf['IntCalc']),'IntCalc'] = 0
    ax = neighborhoodDf.plot(color='pink')
    boxesDf.plot(column='IntCalc',ax=ax)

In [36]:
lp("Uploading to AWS")
!aws s3 cp $outputFileName s3://requesterpays.garyscorner.net/datasets/NewOrleansElevation/

[2025-04-02 02:31:42.068168] Uploading to AWS
upload: output/AggregateLidarData_10m.parquet to s3://requesterpays.garyscorner.net/datasets/NewOrleansElevation/AggregateLidarData_10m.parquet


In [37]:
lp("Done!")

[2025-04-02 02:31:44.000579] Done!
