# Cloudbutton geospatial use case: Model calculation

In [2]:
import os
import pathlib
import json
import subprocess
import shutil

import lithops
from lithops.storage import Storage

import numpy as np
#import pdal
from osgeo import gdal
from scipy import ndimage

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


In [8]:
DATA_BUCKET = 'cb-geospatial-wildfire1'
COMPUTE_BACKEND = 'aws_lambda'
STORAGE_BACKEND = 'aws_s3'
STORAGE_PREFIX = 's3://'
INPUT_DATA_PREFIX = 'input-las-tiles/'

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


Experiment parameters


In [9]:
FCC_WINDOW = 3
FCC_BREAKPOINT = 0.01

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


---
Upload dataset
---


In [5]:
LOCAL_INPUT_DIR = 'input-las-tiles'

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


In [10]:
storage = Storage(backend=STORAGE_BACKEND)

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


In [11]:
bucket_objects = storage.list_keys(bucket=DATA_BUCKET)

for file_name in os.listdir(LOCAL_INPUT_DIR):
    if file_name not in bucket_objects:
        key = os.path.join(INPUT_DATA_PREFIX, file_name)
        with open(os.path.join(LOCAL_INPUT_DIR, file_name), 'rb') as file:
            print(f'Uploading {key}...')
            data = file.read()
            storage.put_object(bucket=DATA_BUCKET, key=key, body=data)
            print('Ok!')

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)
Uploading input-las-tiles/lasfile.las...
Ok!


---
Calculte DEM, DSM and CHM
---


In [12]:
storage.list_keys(bucket=DATA_BUCKET)

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.IPythonDisplayFormatter object at 0x7f27479d9e20>, ['input-las-tiles/lasfile.las'])
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">, <Parameter "include=None">, <Parameter "exclude=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.MimeBundleFormatter object at 0x7f27479d9cd0>, ['input-las-tiles/lasfile.las'])
&&&&&&&&&&&&&&&&&&&&&& param include=None
&&&&&&&&&&&&&&&&&&&&&& param.kind POSITIONAL_OR_KEYWORD
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.f

['input-las-tiles/lasfile.las']

In [21]:
def calculate_models(obj, storage):
    # Create temporary file paths
    import pdal
    tmp_path_prefix = '/tmp/geo/'
    if os.path.exists(tmp_path_prefix):
        shutil.rmtree(tmp_path_prefix)
    for subpath in ['dsm', 'dem', 'chm', 'aspect', 'slope', 'fcc']:
        os.makedirs(os.path.join(tmp_path_prefix, subpath), exist_ok=True)

    las_tile_filename = pathlib.Path(obj.key).name
    tile_key = pathlib.Path(obj.key).stem

    # Save obj to file
    data = obj.data_stream.read()
    input_file_path = os.path.join(tmp_path_prefix, las_tile_filename)
    with open(input_file_path, 'wb') as file:
        file.write(data)

    # DSM pipeline
    dsm_file_path = os.path.join(tmp_path_prefix, 'dsm', tile_key + '.gtiff')
    dsm_pipeline_json = {
        "pipeline": [
            {
                "type": "readers.las",
                "filename": f"{input_file_path}",
                "spatialreference": "EPSG:25830"
            },
            {
                "type": "filters.reprojection",
                "in_srs": "EPSG:25830",
                "out_srs": "EPSG:25830"
            },
            {
                "type": "filters.outlier",
                "method": "radius",
                "radius": 1.0,
                "min_k": 4
            },
            {
                "type": "filters.range",
                # Classification equals 2 (corresponding to noise points in LAS).
                "limits": "Classification![7:7]"
            },
            {
                "type": "filters.range",
                "limits": "returnnumber[1:1]"
            },
            {
                "type": "writers.gdal",
                "gdaldriver": "GTiff",
                "nodata": "-9999",
                "output_type": "max",
                "resolution": 1,
                "filename": f"{dsm_file_path}"
            }
        ]
    }
    dsm_pipeline_json_str = json.dumps(dsm_pipeline_json, indent=4)
    pipeline = pdal.Pipeline(dsm_pipeline_json_str)
    #pipeline.validate()
    #pipeline.loglevel = 8
    print('Executing DSM pipeline...')
    result = pipeline.execute()
    print(result)

    # DEM pipeline
    dem_file_path = os.path.join(tmp_path_prefix, 'dem', tile_key + '.gtiff')
    dem_pipeline_json = {
        "pipeline": [
            {
                "type": "readers.las",
                "filename": f"{input_file_path}",
                "spatialreference": "EPSG:25830"
            },
            {
                "type": "filters.reprojection",
                "in_srs": "EPSG:25830",
                "out_srs": "EPSG:25830"
            },
            {
                "type": "filters.assign",
                "assignment": "Classification[:]=0"
            },
            {
                "type": "filters.elm"
            },
            {
                "type": "filters.outlier",
                "method": "radius",
                "radius": 1.0,
                "min_k": 4
            },
            {

                "type": "filters.smrf",
                "ignore": "Classification[7:7]",
                "slope": 0.2,
                "window": 16,
                "threshold": 0.45,
                "scalar": 1.2
            },
            {
                "type": "filters.range",
                # Classification equals 2 (corresponding to ground in LAS).
                "limits": "Classification[2:2]",
            },
            {
                "type": "writers.gdal",
                "gdaldriver": "GTiff",
                "nodata": "-9999",
                "output_type": "max",
                "resolution": 1,
                "filename": f"{dem_file_path}"
            }
        ]
    }
    dem_pipeline_json_str = json.dumps(dem_pipeline_json, indent=4)
    pipeline = pdal.Pipeline(dem_pipeline_json_str)
    #pipeline.validate()  # Check if json options are good
    #pipeline.loglevel = 8
    print('Executing DEM pipeline...')
    result = pipeline.execute()
    print(result)

    # calculate CHM
    chm_file_path = os.path.join(tmp_path_prefix, 'chm', tile_key + '.tiff')
    cmd = ['gdal_calc.py', '-A', dem_file_path, '-B', dsm_file_path,
           '--calc="B-A"', '--NoDataValue=0', '--outfile', chm_file_path]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate aspect
    aspect_file_path = os.path.join(tmp_path_prefix, 'aspect', tile_key + '.tiff')
    cmd = ['gdaldem', 'aspect', dem_file_path, aspect_file_path, '-compute_edges']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate slope
    slope_file_path = os.path.join(tmp_path_prefix, 'slope', tile_key + '.tiff')
    cmd = ['gdaldem', 'slope', dem_file_path, slope_file_path, '-compute_edges']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate FCC
    in_ds = gdal.Open(dem_file_path)
    rows = in_ds.RasterYSize
    cols = in_ds.RasterXSize
    in_band = in_ds.GetRasterBand(1)
    data = in_band.ReadAsArray(0, 0, cols, rows).astype(float)
    data[data > FCC_BREAKPOINT] = 1
    data[data <= FCC_BREAKPOINT] = 0

    # Computing fraction on the whole raster through a moving window.
    def _compute_fraction(array):
        nveg = np.sum(array == 1)
        total = len(array)
        out = (nveg/total)*100
        return(out)

    TCC = ndimage.generic_filter(data, _compute_fraction, size=FCC_WINDOW)

    gtiff_driver = gdal.GetDriverByName("GTiff")
    fcc_file_path = os.path.join(tmp_path_prefix, 'fcc', tile_key + '.tiff')
    out_ds = gtiff_driver.Create(fcc_file_path, cols, rows, 1, in_band.DataType)
    out_ds.SetProjection(in_ds.GetProjection())
    out_ds.SetGeoTransform(in_ds.GetGeoTransform())

    out_band = out_ds.GetRasterBand(1)
    out_band.WriteArray(TCC)
    # out_ds.BuildOverviews("Average", [2, 4, 8, 16, 32])
    out_ds.FlushCache()
    del in_ds, out_ds

    outputs = [dsm_file_path, dem_file_path, chm_file_path,
               aspect_file_path, slope_file_path, fcc_file_path]
    for output_path in outputs:
        if os.path.exists(output_path):
            with open(output_path, 'rb') as output_file:
                data = output_file.read()
                cos_key = output_path.replace(tmp_path_prefix, '')
                storage.put_object(bucket=DATA_BUCKET, key=cos_key, body=data)
        else:
            print(f'Failed to upload {output_path}')

    out = subprocess.check_output(['find', '/tmp/geo/'])
    return out

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


In [14]:
fexec = lithops.FunctionExecutor(backend="aws_lambda", storage="aws_s3", runtime="tmc:09", runtime_memory=2048)

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)


2023-10-06 22:13:45,249 [INFO] config.py:141 -- Lithops v2.9.1.dev0 - Python3.8
2023-10-06 22:13:47,017 [INFO] aws_s3.py:68 -- S3 client created - Region: us-west-2
2023-10-06 22:13:59,621 [INFO] aws_lambda.py:94 -- AWS Lambda client created - Region: us-west-2


In [15]:
os.path.join(STORAGE_PREFIX, DATA_BUCKET, INPUT_DATA_PREFIX)

&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.IPythonDisplayFormatter object at 0x7f27479d9e20>, 's3://cb-geospatial-wildfire1/input-las-tiles/')
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">, <Parameter "include=None">, <Parameter "exclude=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.MimeBundleFormatter object at 0x7f27479d9cd0>, 's3://cb-geospatial-wildfire1/input-las-tiles/')
&&&&&&&&&&&&&&&&&&&&&& param include=None
&&&&&&&&&&&&&&&&&&&&&& param.kind POSITIONAL_OR_KEYWORD
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.PlainTextFormatter object at 0x7f27479d9b80>, 's3://cb-geospatial-wildfire1/input-las-tiles/')
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&

's3://cb-geospatial-wildfire1/input-las-tiles/'

In [22]:
fs = fexec.map(calculate_models, os.path.join(STORAGE_PREFIX, DATA_BUCKET, INPUT_DATA_PREFIX))

2023-10-06 22:23:04,735 [INFO] invokers.py:108 -- ExecutorID 082df0-0 | JobID M002 - Selected Runtime: tmc:09 - 2048MB


&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)
*******************iterdata s3://cb-geospatial-wildfire1/input-las-tiles/
*******************extra_args None
*******************data ['s3://cb-geospatial-wildfire1/input-las-tiles/']
*******************func <function calculate_models at 0x7f27513098b0>
*******************new_parameters [<Parameter "obj">]
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args ('s3://cb-geospatial-wildfire1/input-las-tiles/',)


2023-10-06 22:23:06,895 [INFO] invokers.py:172 -- ExecutorID 082df0-0 | JobID M002 - Starting function invocation: calculate_models() - Total: 1 activations
2023-10-06 22:23:06,911 [INFO] invokers.py:208 -- ExecutorID 082df0-0 | JobID M002 - View execution logs at /tmp/lithops-lithops/logs/082df0-0-M002.log


In [23]:
res = fexec.get_result(fs=fs)

2023-10-06 22:23:21,891 [INFO] wait.py:98 -- ExecutorID 082df0-0 - Getting results from 1 function activations


&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.IPythonDisplayFormatter object at 0x7f27479d9e20>,     0%|          | 0/1  )
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">, <Parameter "include=None">, <Parameter "exclude=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.MimeBundleFormatter object at 0x7f27479d9cd0>,     0%|          | 0/1  )
&&&&&&&&&&&&&&&&&&&&&& param include=None
&&&&&&&&&&&&&&&&&&&&&& param.kind POSITIONAL_OR_KEYWORD
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "obj">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.formatters.HTML

    0%|          | 0/1  

2023-10-06 22:23:22,020 [INFO] executors.py:593 -- ExecutorID 082df0-0 - Cleaning temporary data


In [24]:
for r in res:
    print(r.decode('utf-8').strip())
    print('---')

/tmp/geo/
/tmp/geo/chm
/tmp/geo/chm/lasfile.tiff
/tmp/geo/fcc
/tmp/geo/fcc/lasfile.tiff
/tmp/geo/aspect
/tmp/geo/aspect/lasfile.tiff
/tmp/geo/dem
/tmp/geo/dem/lasfile.gtiff
/tmp/geo/slope
/tmp/geo/slope/lasfile.tiff
/tmp/geo/dsm
/tmp/geo/dsm/lasfile.gtiff
/tmp/geo/lasfile.las
---
&&&&&&&&&&&&&&&&&&& self.parameters.values() odict_values([<Parameter "self">, <Parameter "conn=None">])
&&&&&&&&&&&&&&&&&&& args (<IPython.core.history.HistoryManager object at 0x7f2797cfa280>, <sqlite3.Connection object at 0x7f2797ed28a0>)
