# Cloudbutton geospatial use case: 3D fuel mapping for wildfire risk assessment


In [None]:
import pylab
# from sklearn.naive_bayes import GaussianNB
# from sklearn import model_selection
# from sklearn.linear_model import LogisticRegression
# from sklearn.preprocessing import scale, StandardScaler
# from sklearn.metrics import silhouette_score
# from sklearn.cluster import KMeans
# from shapely.geometry import mapping
from IPython.display import Image
# from IPython.core.display import HTML
# from shapely.geometry import mapping
# import rasterio as rio
# from sklearn.pipeline import Pipeline
# from rasterio.mask import mask
# from rasterio.plot import show
# from rasterio.merge import merge
# from rasterio.plot import show
# from rasterio.plot import show_hist
# from rasterio.windows import Window
# from rasterio.plot import reshape_as_raster
# from rasterio.plot import reshape_as_image
from lithops.storage import Storage
import matplotlib.pyplot as plt
# import geopandas as gpd
import pandas as pd
import numpy as np
import lithops
import pathlib
# import shapely
import shutil
# import pickle
import subprocess
# import concurrent.futures
# import joblib
# import gdal
# import earthpy as ep
# import earthpy.spatial as es
import time
# import glob
# import ogr
import os
# import io
# import pdal
# import xarray as xr
import json
# from matplotlib.colors import ListedColormap, BoundaryNorm

In [33]:
DATA_BUCKET_TEST = {'optim-size': 'laz_test_size', 'no-partitioned': 'laz_no_particionado', 'well-partitioned': '', 'partitioned':''}
DATA_BUCKET_TYPE = {'optim-size':'optim-size','TMP': 'no-partitioned-tmp', 'no-partitioned': 'no-partitioned'} #,'well-partitioned': 'well-partitioned', 'partitioned': 'partitioned'}

# LOCAL_INPUT_DIR = fr'E:\data\{DATA_BUCKET_TEST["no-partitioned"]}\20GB\10GB' # 20GB\10GB\2GB\1GB\512MB\256MB\128MB

DATA_BUCKET = f'objects-geospatial-wildfirerisk-{DATA_BUCKET_TYPE["optim-size"]}'
INPUT_DATA_PREFIX = f'data-example/' # Modified

Experiment parameters


In [34]:
FCC_WINDOW = 3
FCC_BREAKPOINT = 0.01

---
Upload dataset
---


In [35]:
storage = Storage()

2022-06-17 09:54:25,594 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de


In [36]:
'''bucket_objects = storage.list_keys(bucket=DATA_BUCKET)
for subdir, dirs, files in os.walk(LOCAL_INPUT_DIR):
    print(subdir)
    for file_name in files:
        key = os.path.join(INPUT_DATA_PREFIX, file_name)  # Added
        if key not in bucket_objects:   # Changed: if file_name not in bucket_objects:
            with open(os.path.join(subdir, file_name), 'rb') as file: #Changed
                print(f'\tUploading {key}...')
                data = file.read()
                storage.put_object(bucket=DATA_BUCKET, key=key, body=data)
                print('\tOk!')
        else:   # Added
            print(f'\tIt is already uploaded: {key}...')   # Added
print("Finished!")'''

'bucket_objects = storage.list_keys(bucket=DATA_BUCKET)\nfor subdir, dirs, files in os.walk(LOCAL_INPUT_DIR):\n    print(subdir)\n    for file_name in files:\n        key = os.path.join(INPUT_DATA_PREFIX, file_name)  # Added\n        if key not in bucket_objects:   # Changed: if file_name not in bucket_objects:\n            with open(os.path.join(subdir, file_name), \'rb\') as file: #Changed\n                print(f\'\tUploading {key}...\')\n                data = file.read()\n                storage.put_object(bucket=DATA_BUCKET, key=key, body=data)\n                print(\'\tOk!\')\n        else:   # Added\n            print(f\'\tIt is already uploaded: {key}...\')   # Added\nprint("Finished!")'

---
Calculte DEM, DSM and CHM
---


In [37]:
def calculate_models(obj, storage):
    import pdal
    from osgeo import gdal
    from scipy import ndimage
    import time

    start_t = time.time()
    # Create temporary file paths
    tmp_path_prefix = '/tmp/geo/'
    if os.path.exists(tmp_path_prefix):
        shutil.rmtree(tmp_path_prefix)
    for subpath in ['dsm', 'dem', 'chm', 'aspect', 'slope', 'fcc']:
        os.makedirs(os.path.join(tmp_path_prefix, subpath), exist_ok=True)

    las_tile_filename = pathlib.Path(obj.key).name
    tile_key = pathlib.Path(obj.key).stem

    # Save obj to file
    data = obj.data_stream.read()
    input_file_path = os.path.join(tmp_path_prefix, las_tile_filename)
    with open(input_file_path, 'wb') as file:
        file.write(data)

    # DSM pipeline
    dsm_file_path = os.path.join(tmp_path_prefix, 'dsm', tile_key + '.gtiff')
    dsm_pipeline_json = {
        "pipeline": [
            {
                "type": "readers.las",
                "filename": f"{input_file_path}",
                "spatialreference": "EPSG:25830"
            },
            {
                "type": "filters.reprojection",
                "in_srs": "EPSG:25830",
                "out_srs": "EPSG:25830"
            },
            {
                "type": "filters.outlier",
                "method": "radius",
                "radius": 1.0,
                "min_k": 4
            },
            {
                "type": "filters.range",
                # Classification equals 2 (corresponding to noise points in LAS).
                "limits": "Classification![7:7]"
            },
            {
                "type": "filters.range",
                "limits": "returnnumber[1:1]"
            },
            {
                "type": "writers.gdal",
                "gdaldriver": "GTiff",
                "nodata": "-9999",
                "output_type": "max",
                "resolution": 1,
                "filename": f"{dsm_file_path}"
            }
        ]
    }
    dsm_pipeline_json_str = json.dumps(dsm_pipeline_json, indent=4)
    pipeline = pdal.Pipeline(dsm_pipeline_json_str)
    pipeline.validate()
    pipeline.loglevel = 8
    print('Executing DSM pipeline...')
    result = pipeline.execute()
    print(result)

    # DEM pipeline
    dem_file_path = os.path.join(tmp_path_prefix, 'dem', tile_key + '.gtiff')
    dem_pipeline_json = {
        "pipeline": [
            {
                "type": "readers.las",
                "filename": f"{input_file_path}",
                "spatialreference": "EPSG:25830"
            },
            {
                "type": "filters.reprojection",
                "in_srs": "EPSG:25830",
                "out_srs": "EPSG:25830"
            },
            {
                "type": "filters.assign",
                "assignment": "Classification[:]=0"
            },
            {
                "type": "filters.elm"
            },
            {
                "type": "filters.outlier",
                "method": "radius",
                "radius": 1.0,
                "min_k": 4
            },
            {

                "type": "filters.smrf",
                "ignore": "Classification[7:7]",
                "slope": 0.2,
                "window": 16,
                "threshold": 0.45,
                "scalar": 1.2
            },
            {
                "type": "filters.range",
                # Classification equals 2 (corresponding to ground in LAS).
                "limits": "Classification[2:2]",
            },
            {
                "type": "writers.gdal",
                "gdaldriver": "GTiff",
                "nodata": "-9999",
                "output_type": "max",
                "resolution": 1,
                "filename": f"{dem_file_path}"
            }
        ]
    }
    dem_pipeline_json_str = json.dumps(dem_pipeline_json, indent=4)
    pipeline = pdal.Pipeline(dem_pipeline_json_str)
    pipeline.validate()  # Check if json options are good
    pipeline.loglevel = 8
    print('Executing DEM pipeline...')
    result = pipeline.execute()
    print(result)

    # calculate CHM
    chm_file_path = os.path.join(tmp_path_prefix, 'chm', tile_key + '.tiff')
    cmd = ['gdal_calc.py', '-A', dem_file_path, '-B', dsm_file_path,
           '--calc="B-A"', '--NoDataValue=0', '--outfile', chm_file_path]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate aspect
    aspect_file_path = os.path.join(tmp_path_prefix, 'aspect', tile_key + '.tiff')
    cmd = ['gdaldem', 'aspect', dem_file_path, aspect_file_path, '-compute_edges']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate slope
    slope_file_path = os.path.join(tmp_path_prefix, 'slope', tile_key + '.tiff')
    cmd = ['gdaldem', 'slope', dem_file_path, slope_file_path, '-compute_edges']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    stdout, stderr = p.communicate()
    print(stdout, stderr)
    # assert p.returncode == 0

    # calculate FCC
    in_ds = gdal.Open(dem_file_path)
    rows = in_ds.RasterYSize
    cols = in_ds.RasterXSize
    in_band = in_ds.GetRasterBand(1)
    data = in_band.ReadAsArray(0, 0, cols, rows).astype(np.float)
    data[data > FCC_BREAKPOINT] = 1
    data[data <= FCC_BREAKPOINT] = 0

    # Computing fraction on the whole raster through a moving window.
    def _compute_fraction(array):
        nveg = np.sum(array == 1)
        total = len(array)
        out = (nveg/total)*100
        return(out)

    TCC = ndimage.generic_filter(data, _compute_fraction, size=FCC_WINDOW)

    gtiff_driver = gdal.GetDriverByName("GTiff")
    fcc_file_path = os.path.join(tmp_path_prefix, 'fcc', tile_key + '.tiff')
    out_ds = gtiff_driver.Create(fcc_file_path, cols, rows, 1, in_band.DataType)
    out_ds.SetProjection(in_ds.GetProjection())
    out_ds.SetGeoTransform(in_ds.GetGeoTransform())

    out_band = out_ds.GetRasterBand(1)
    out_band.WriteArray(TCC)
    # out_ds.BuildOverviews("Average", [2, 4, 8, 16, 32])
    out_ds.FlushCache()
    del in_ds, out_ds

    outputs = [dsm_file_path, dem_file_path, chm_file_path,
               aspect_file_path, slope_file_path, fcc_file_path]
    for output_path in outputs:
        if os.path.exists(output_path):
            with open(output_path, 'rb') as output_file:
                data = output_file.read()
                cos_key = output_path.replace(tmp_path_prefix, '')
                storage.put_object(bucket=DATA_BUCKET, key=cos_key, body=data)
        else:
            print(f'Failed to upload {output_path}')

    # out = subprocess.check_output(['find', '/tmp/geo/'])
    return f'Time: {time.time() - start_t} seconds -> Key: {obj.key}'

In [38]:
fexec = lithops.FunctionExecutor(monitoring='rabbitmq')

2022-06-17 09:54:25,777 [INFO] lithops.config -- Lithops v2.6.0
2022-06-17 09:54:25,804 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de
2022-06-17 09:54:25,808 [INFO] lithops.serverless.backends.ibm_cf.ibm_cf -- IBM CF client created - Region: eu-de - Namespace: my-geospatial-wildfirerisk_foundry-geospatial-wildfirerisk-1


In [39]:
fexec.map(calculate_models, f'cos://{DATA_BUCKET}/{INPUT_DATA_PREFIX}')

2022-06-17 09:54:25,836 [INFO] lithops.invokers -- ExecutorID 6b255e-6 | JobID M000 - Selected Runtime: sararl27/lithops-ibm_cf-runtime-3.9:0.1 - 2048MB
2022-06-17 09:54:26,558 [INFO] lithops.invokers -- ExecutorID 6b255e-6 | JobID M000 - Starting function invocation: calculate_models() - Total: 49 activations
2022-06-17 09:54:26,692 [INFO] lithops.invokers -- ExecutorID 6b255e-6 | JobID M000 - View execution logs at C:\Users\saral\AppData\Local\Temp\lithops\logs\6b255e-6-M000.log


[<lithops.future.ResponseFuture at 0x289c9bace80>,
 <lithops.future.ResponseFuture at 0x289c9bac3d0>,
 <lithops.future.ResponseFuture at 0x289c9c121f0>,
 <lithops.future.ResponseFuture at 0x289c9c20970>,
 <lithops.future.ResponseFuture at 0x289c9c20880>,
 <lithops.future.ResponseFuture at 0x289c9c20910>,
 <lithops.future.ResponseFuture at 0x289c9c20940>,
 <lithops.future.ResponseFuture at 0x289c9c20a90>,
 <lithops.future.ResponseFuture at 0x289c9c20ac0>,
 <lithops.future.ResponseFuture at 0x289c9c20af0>,
 <lithops.future.ResponseFuture at 0x289c9c20b20>,
 <lithops.future.ResponseFuture at 0x289c9c20b50>,
 <lithops.future.ResponseFuture at 0x289c9c20b80>,
 <lithops.future.ResponseFuture at 0x289c9c20bb0>,
 <lithops.future.ResponseFuture at 0x289c9c20be0>,
 <lithops.future.ResponseFuture at 0x289c9c20c10>,
 <lithops.future.ResponseFuture at 0x289c9c20c40>,
 <lithops.future.ResponseFuture at 0x289c9c20c70>,
 <lithops.future.ResponseFuture at 0x289c9c20ca0>,
 <lithops.future.ResponseFuture

In [40]:
res = fexec.get_result()

2022-06-17 09:54:27,314 [INFO] lithops.wait -- ExecutorID 6b255e-6 - Getting results from 49 function activations


    0%|          | 0/49  

2022-06-17 10:01:24,047 [INFO] lithops.executors -- ExecutorID 6b255e-6 - Cleaning temporary data


In [41]:
max_t = [0,'']
for r in res:
    tmp = float(r.split(' ')[1])
    if tmp > max_t[0]:
        max_t[0] = tmp
        max_t[1] = r
    print(r)
    print('---')

Time: 149.46400380134583 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (10) - copia.laz
---
Time: 122.35160899162292 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (10).laz
---
Time: 151.81607031822205 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (11) - copia.laz
---
Time: 186.48723888397217 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (11).laz
---
Time: 112.28131794929504 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (12) - copia.laz
---
Time: 159.0072054862976 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (12).laz
---
Time: 93.48393249511719 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (13) - copia.laz
---
Time: 156.4616961479187 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (13).laz
---
Time: 182.0946855545044 seconds -> Key: data-example/PNOA_2

In [42]:
print(f'Max = {max_t[1]}')

Max = Time: 408.1469249725342 seconds -> Key: data-example/PNOA_2017_CLM-CAS_278-4470_ORT-CLA-RGB - copia (7).laz


---
Scalability
---

In [43]:
def get_process_cost(fexec):
    import pandas as pd
    df = pd.read_csv(fexec.log_path)
    cost = float(df[df["Job_ID"] == "Summary"]["Cost"])
    return cost

#### Time “calculate_models”

In [44]:
# Statistics:
endTime = set()
startTime = set()

for future in fexec.futures:
    for key in future.stats.keys():
        if key.endswith("worker_func_start_tstamp"):
            startTime.add(future.stats[key])
        if key.endswith("worker_func_end_tstamp"):
            endTime.add(future.stats[key])

timeExec = max(endTime) - min(startTime)

In [45]:
print(f"Time: {timeExec} seconds")

Time: 414.03885197639465 seconds


In [None]:
fexec.plot(dst="plots/scalability_wildFireRisk")

In [None]:
Image(filename="plots/scalability_wildFireRisk_histogram.png")

In [None]:
Image(filename="plots/scalability_wildFireRisk_timeline.png")

#### Cost "calculate_models"

In [None]:
fexec.job_summary()

In [None]:
pd.read_csv(fexec.log_path)

In [None]:
cost_interpolation = get_process_cost(fexec)
print(f"The experiment cost of 'calculate_models' ${cost_interpolation:.4f}.")

---
Throughput
---

In [None]:
fexec = lithops.FunctionExecutor()
storage = Storage()

### Read

In [None]:
def storage_performance_read(key_name, storage):
    get_t0 = time.time()
    storage.get_object(bucket=DATA_BUCKET, key=key_name)
    get_t1 = time.time()
    get_sz = len(key_name)
    read_bandwidth_mb = get_sz / (get_t1 - get_t0) / 1e6

    stats = {'t0': get_t0, 't1': get_t1, 'bandwidth': read_bandwidth_mb, 'size': get_sz}

    return stats

In [None]:
data_file_read = [str(obj['Key']) for obj in storage.list_objects(DATA_BUCKET) if str(obj['Key']).endswith('.laz') or str(obj['Key']).endswith('.las')]

fexec = lithops.FunctionExecutor()
fs = fexec.map(storage_performance_read, data_file_read)
read_results = fexec.get_result(fs=fs)

### Write

In [None]:
def storage_performance_write(obj, size_file, storage):
    data = obj.data_stream.read()
    put_sz = size_file # Modified, get the size in bytes of the data to upload
    put_t0 = time.time()
    storage.put_object(bucket=DATA_BUCKET,  key=obj.key, body=data)
    put_t1 = time.time()
    write_bandwidth_mb = put_sz / (put_t1 - put_t0) / 1e6

    stats = {'t0': put_t0, 't1': put_t1, 'bandwidth': write_bandwidth_mb, 'size': put_sz}

    return stats

In [None]:
data_file_write = [{'obj':(f'cos://{DATA_BUCKET}/'+obj["Key"]), 'size_file': obj['Size']} for obj in storage.list_objects(DATA_BUCKET) if str(obj['Key']).endswith('.tiff') or str(obj['Key']).endswith('.gtiff')]
fexec = lithops.FunctionExecutor()
fs = fexec.map(storage_performance_write, data_file_write)
write_results = fexec.get_result(fs=fs)

### Resume

In [None]:
def create_agg_bdwth_plot(res_write, res_read, dst):
    def compute_times_rates(start_time, d):
        x = np.array(d)
        tzero = start_time
        tr_start_time = x[:, 0] - tzero
        tr_end_time = x[:, 1] - tzero
        rate = x[:, 2]

        N = len(tr_start_time)
        runtime_rate_hist = np.zeros((N, len(runtime_bins)))

        for i in range(N):
            s = tr_start_time[i]
            e = tr_end_time[i]
            a, b = np.searchsorted(runtime_bins, [s, e])
            if b-a > 0:
                runtime_rate_hist[i, a:b] = rate[i]

        return {'start_time': tr_start_time,
                'end_time': tr_end_time,
                'rate': rate,
                'runtime_rate_hist': runtime_rate_hist}

    start_time = min((min(t['t0'] for t in res_write), (min(t['t0'] for t in res_read)))) - 1

    fig = pylab.figure(figsize=(8, 6))
    ax = fig.add_subplot(1, 1, 1)
    for datum, l in [(res_write, 'Aggregate Write Bandwidth'), (res_read, 'Aggregate Read Bandwidth')]:
        mb_rates = [(res['t0'], res['t1'], res['bandwidth']) for res in datum]
        max_seconds = int(max([mr[1]-start_time for mr in mb_rates])*1.2)
        max_seconds = 8 * round(max_seconds/8)
        runtime_bins = np.linspace(0, max_seconds, max_seconds)

        mb_rates_hist = compute_times_rates(start_time, mb_rates)

        ax.plot(mb_rates_hist['runtime_rate_hist'].sum(axis=0)/1000, label=l)

    ax.set_xlabel('Execution Time (sec)')
    ax.set_ylabel("GB/sec")
    ax.set_xlim(0, )
    ax.set_ylim(0, )
    pylab.legend()
    pylab.grid(True, axis='y')

    dst = os.path.expanduser(dst) if '~' in dst else dst
    fig.tight_layout()
    fig.savefig(dst, format='png')

In [None]:
create_agg_bdwth_plot(write_results, read_results, './plots/throughput_storage_kpi.png')
Image(filename="plots/throughput_storage_kpi.png")

In [None]:
def calc_mean_throughput(dict):
    res = 0
    for val in dict:
        res += val['bandwidth']
    return (res/len(dict))

In [None]:
print(f'Mean throughput read: {calc_mean_throughput(read_results)} MB/s')
print(f'Mean throughput write: {calc_mean_throughput(write_results)} MB/s')

### Time "throughput"

In [None]:
# Statistics:
endTime = set()
startTime = set()

for future in fexec.futures:
    for key in future.stats.keys():
        if key.endswith("worker_func_start_tstamp"):
            startTime.add(future.stats[key])
        if key.endswith("worker_func_end_tstamp"):
            endTime.add(future.stats[key])

timeExec = max(endTime) - min(startTime)

In [None]:
print(f"Time: {timeExec} seconds")

In [None]:
fexec.plot(dst="plots/scalability_throughput")

In [None]:
Image(filename="plots/scalability_throughput_histogram.png")

In [None]:
Image(filename="plots/scalability_throughput_timeline.png")

#### Cost throughput

In [None]:
fexec.job_summary()

In [None]:
pd.read_csv(fexec.log_path)

In [None]:
cost_interpolation = get_process_cost(fexec)
print(f"The experiment cost of 'throughput' ${cost_interpolation:.4f}.")