# Parallel ingestion

In this notebook we will take several LAS files and ingest them in parallel using the TileDB driver in PDAL

You will need the aws cli for this,a please take the time to do install this.

We will first copy a subset of the available data

In [78]:
import glob
import multiprocessing as mp
import numpy as np
import os
import shutil
import tiledb

try:
    shutil.rmtree("tmp")
except:
    pass

os.mkdir("tmp")

!aws s3 sync s3://pc-bulk/NZ20_AbelTas/ ./tmp/ --exclude "*" --include "CL2_BN24_2020_1000_45*" --endpoint-url https://opentopography.s3.sdsc.edu --no-sign-request

download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4523.lax to tmp/CL2_BN24_2020_1000_4523.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4525.lax to tmp/CL2_BN24_2020_1000_4525.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4524.lax to tmp/CL2_BN24_2020_1000_4524.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4526.lax to tmp/CL2_BN24_2020_1000_4526.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4527.lax to tmp/CL2_BN24_2020_1000_4527.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4528.lax to tmp/CL2_BN24_2020_1000_4528.lax
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4524.laz to tmp/CL2_BN24_2020_1000_4524.laz
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4525.laz to tmp/CL2_BN24_2020_1000_4525.laz
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4526.laz to tmp/CL2_BN24_2020_1000_4526.laz
download: s3://pc-bulk/NZ20_AbelTas/CL2_BN24_2020_1000_4523.laz to tmp/CL2_BN24_2020_1000_4523.laz
download: 

List all the available data to check it is there

In [79]:
!ls ./data

ls: cannot access './data': No such file or directory


In [80]:
# how many cpu do we have
mp.cpu_count()

16

In [94]:
inputs = glob.glob("tmp/CL2_BN24*.laz")
inputs

['tmp/CL2_BN24_2020_1000_4527.laz',
 'tmp/CL2_BN24_2020_1000_4525.laz',
 'tmp/CL2_BN24_2020_1000_4523.laz',
 'tmp/CL2_BN24_2020_1000_4528.laz',
 'tmp/CL2_BN24_2020_1000_4526.laz',
 'tmp/CL2_BN24_2020_1000_4524.laz']

Merge serially using PDAL

In [82]:
%%time
!pdal merge tmp/*.laz arrays/soutput_array.laz

CPU times: user 389 ms, sys: 212 ms, total: 601 ms
Wall time: 28.2 s


Inspect one of the files using PDAL

In [83]:
!pdal info tmp/CL2_BN24_2020_1000_4527.laz

{
  "file_size": 14947473,
  "filename": "tmp/CL2_BN24_2020_1000_4527.laz",
  "now": "2021-09-22T20:44:49-0500",
  "pdal_version": "2.3.0 (git-version: 0800a2)",
  "reader": "readers.las",
  "stats":
  {
    "bbox":
    {
      "EPSG:4326":
      {
        "bbox":
        {
          "maxx": 172.7268292,
          "maxy": -40.80979276,
          "maxz": 215.4225721,
          "minx": 172.7211112,
          "miny": -40.81629235,
          "minz": 67.11381431
        },
        "boundary": { "type": "Polygon", "coordinates": [ [ [ 172.721111152471678, -40.816278733288371, 67.14097855227844 ], [ 172.721138310131067, -40.809792757220528, 67.113814310393153 ], [ 172.726829184186613, -40.809806370905136, 215.397103739775559 ], [ 172.726802580730578, -40.816292350068267, 215.422572079407615 ], [ 172.721111152471678, -40.816278733288371, 67.14097855227844 ] ] ] }
      },
      "native":
      {
        "bbox":
        {
          "maxx": 1576959.99,
          "maxy": 5482319.99,
          "ma

First create the empty array schema

In [100]:
parallel_array = "arrays/parallel_lidar"

try:
    shutil.rmtree(array_name)
except:
    pass

bbox = {
    "maxx": 1577440,
    "maxy": 5482320,
    "maxz": 857,
    "minx": 1574559,
    "miny": 5481599,
    "minz": -103
}       
        
zstd_compression = tiledb.FilterList([tiledb.ZstdFilter(level=7)])
bzip2_compression = tiledb.FilterList([tiledb.Bzip2Filter(level=5)])
gzip_compression = tiledb.FilterList([tiledb.Bzip2Filter(level=9)])

dom = tiledb.Domain(
    tiledb.Dim(name="X", domain=(bbox['minx'], bbox['maxx']), dtype=np.float32, filters=zstd_compression),
    tiledb.Dim(name="Y", domain=(bbox['miny'], bbox['maxy']), dtype=np.float32, filters=zstd_compression),
    tiledb.Dim(name='Z', domain=(bbox['minz'], bbox['maxz']), dtype=np.float32, filters=zstd_compression)
)

attrs=(
        tiledb.Attr(name='Intensity', filters=bzip2_compression, dtype=np.uint16),
        tiledb.Attr(name='ReturnNumber', filters=zstd_compression, dtype=np.uint8),
        tiledb.Attr(name='NumberOfReturns', filters=zstd_compression, dtype=np.uint8),
        tiledb.Attr(name='ScanDirectionFlag', filters=bzip2_compression, dtype=np.uint8),
        tiledb.Attr(name='EdgeOfFlightLine', filters=bzip2_compression, dtype=np.uint8),
        tiledb.Attr(name='Classification', filters=zstd_compression, dtype=np.uint8),
        tiledb.Attr(name='ScanAngleRank', filters=bzip2_compression, dtype=np.float32),
        tiledb.Attr(name='UserData', filters=gzip_compression, dtype=np.uint8),
        tiledb.Attr(name='PointSourceId', filters=bzip2_compression, dtype=np.uint16),
        tiledb.Attr(name='GpsTime', filters=zstd_compression, dtype=np.float64),
        tiledb.Attr(name='ScanChannel', filters=bzip2_compression, dtype=np.uint8),
        tiledb.Attr(name='ClassFlags', filters=gzip_compression, dtype=np.uint8),
)

tiledb_schema = tiledb.ArraySchema(
    domain=dom,
    sparse=True,
    attrs=attrs,
    tile_order='row-major',
    capacity=100_000,
    allows_duplicates=True,
    coords_filters=zstd_compression,
    cell_order="hilbert"
)

tiledb.Array.create(array_name, tiledb_schema)

Group the input files in tasks of 3 files

In [101]:
def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

tasks = list(batch(inputs, 1))       
        
print(tasks)

[['tmp/CL2_BN24_2020_1000_4527.laz'], ['tmp/CL2_BN24_2020_1000_4525.laz'], ['tmp/CL2_BN24_2020_1000_4523.laz'], ['tmp/CL2_BN24_2020_1000_4528.laz'], ['tmp/CL2_BN24_2020_1000_4526.laz'], ['tmp/CL2_BN24_2020_1000_4524.laz']]


In [102]:
%%time

pipeline = """
[
  %s,
  {
    "type": "writers.tiledb",
    "array_name": "%s",
    "chunk_size": 100000000,
    "append": true
  }
]
"""

def ingest(id, files):
    import subprocess
    inputs = "\"" + "\",\"".join(files) + "\""
    task = pipeline % (inputs, parallel_array)
    with open(f"{id}.json", "w") as f:
        f.write(task)
        
    return subprocess.run(["pdal", "pipeline", "-i", f"{id}.json"])    
    
pool = mp.Pool(mp.cpu_count())
results = [pool.apply(ingest, args=(i, t,)) for i,t in enumerate(tasks)]

PDAL: [TileDB::Dimension] Error: Coordinate 0.000292969 is out of domain bounds [5.4816e+06, 5.48232e+06] on dimension 'Y'

PDAL: [TileDB::Dimension] Error: Coordinate 1.66058e+38 is out of domain bounds [-103, 857] on dimension 'Z'

PDAL: [TileDB::Dimension] Error: Coordinate -2.40324e+36 is out of domain bounds [-103, 857] on dimension 'Z'

PDAL: [TileDB::Dimension] Error: Coordinate 6.35275e-24 is out of domain bounds [5.4816e+06, 5.48232e+06] on dimension 'Y'

PDAL: [TileDB::Dimension] Error: Coordinate 1.14881e+28 is out of domain bounds [1.57456e+06, 1.57744e+06] on dimension 'X'



CPU times: user 0 ns, sys: 173 ms, total: 173 ms
Wall time: 1min 1s


PDAL: [TileDB::Dimension] Error: Coordinate -6.30823e+06 is out of domain bounds [-103, 857] on dimension 'Z'

