# Cloudbutton Geospatial: Water Consumption Workflow

---

In [1]:
import sys
sys.path.append('../')

In [2]:
from collections import defaultdict
from cloudbutton_geospatial.io_utils.plot import plot_results
from cloudbutton_geospatial.utils.notebook import date_picker
from rasterio.windows import Window
from scipy.spatial import distance_matrix
from shapely.geometry import Point, MultiPoint, box
from pprint import pprint
import functools
import collections
import datetime
import os
import shutil
import math
import numpy as np
import pandas as pd
import lithops
import requests
import rasterio
import fiona
import json
import random
import re
import tempfile
import concurrent.futures
from IPython.display import Image
import matplotlib.pyplot as plt
from lithops.storage import Storage
from lithops.storage.utils import StorageNoSuchKeyError
from io import BytesIO

## Workflow parameters

Area outside the processed tile that we want to consider for taking SIAM stations into account:

In [3]:
AREA_OF_INFLUENCE = 16000

Lithops Variables:

In [4]:
DATA_BUCKET = 'cloudbutton-geospatial-wc'
COMPUTE_BACKEND = 'aws_lambda'
STORAGE_BACKEND = 'aws_s3'
STORAGE_PREFIX = 's3://'
RUNTIME = 'aitorarjona/cloudbutton-geospatial-wc:01'
RUNTIME_MEMORY = 2048

In [5]:
DTM_PREFIX = 'DTMs/'
DTM_ASC_PREFIX = 'DTMs/asc/'
DTM_GEOTIFF_PREFIX = 'DTMs/geotiff/'

Split tile into square chunks (number of tiles = SPLITS^2):

In [6]:
SPLITS = 3

Correlation coefficient between elevation and temperature:

In [7]:
r = -0.0056

Elevation to interpolate temperature:

In [8]:
zdet = 2000

Day of year to calculate solar irradiation:

In [9]:
date = date_picker(default=datetime.date(2022, 5, 15))

DatePicker(value=datetime.date(2022, 5, 15), description='Pick a Date')

In [10]:
DAY_OF_YEAR = date.value.timetuple().tm_yday
DAY_OF_YEAR

135

Initialize Lithops Storage and Function Executor:

In [11]:
storage = lithops.storage.Storage(backend=STORAGE_BACKEND)

In [12]:
fexec = lithops.FunctionExecutor(backend=COMPUTE_BACKEND, storage=STORAGE_BACKEND, runtime=RUNTIME, runtime_memory=RUNTIME_MEMORY)

2022-06-28 20:43:44,803 [INFO] config.py:139 -- Lithops v3.1.0 - Python3.10
2022-06-28 20:43:44,818 [INFO] aws_s3.py:68 -- S3 client created - Region: us-east-1
2022-06-28 20:43:44,819 [INFO] aws_lambda.py:106 -- AWS Lambda client created - Region: us-east-1


## Data preparation

### SIAM data

In [13]:
siam_data_key = 'siam_data.csv'
try:
    siam_data_head = storage.head_object(bucket=DATA_BUCKET, key=siam_data_key)
    print(f'SIAM meteo data already in storage: {siam_data_head}')
except StorageNoSuchKeyError:
    print('Uploading SIAM meteo data to Object Storage...')
    with open(siam_data_key, 'rb') as f:
        storage.put_object(bucket=DATA_BUCKET, key=siam_data_key, body=f)

SIAM meteo data already in storage: {'date': 'Tue, 28 Jun 2022 18:43:45 GMT', 'x-clv-request-id': 'c0268a02-dfae-4cf5-90d6-770e09f2b24e', 'server': 'Cleversafe', 'x-clv-s3-version': '2.5', 'accept-ranges': 'bytes', 'x-amz-request-id': 'c0268a02-dfae-4cf5-90d6-770e09f2b24e', 'etag': '"8a1fd5da76b1123e66cc0155e6c8f5f7"', 'content-type': 'text/csv', 'last-modified': 'Mon, 06 Jun 2022 08:53:34 GMT', 'content-length': '3850'}


### Shapefile

In [14]:
shapefile_key = 'shapefile_murcia.zip'
try:
    shapefile_head = storage.head_object(bucket=DATA_BUCKET, key=shapefile_key)
    print(f'Shapefile already in storage: {siam_data_head}')
except StorageNoSuchKeyError:
    print('Uploading shapefile to Object Storage...')
    with open(shapefile_key, 'rb') as f:
        storage.put_object(bucket=DATA_BUCKET, key=shapefile_key, body=f)

Shapefile already in storage: {'date': 'Tue, 28 Jun 2022 18:43:45 GMT', 'x-clv-request-id': 'c0268a02-dfae-4cf5-90d6-770e09f2b24e', 'server': 'Cleversafe', 'x-clv-s3-version': '2.5', 'accept-ranges': 'bytes', 'x-amz-request-id': 'c0268a02-dfae-4cf5-90d6-770e09f2b24e', 'etag': '"8a1fd5da76b1123e66cc0155e6c8f5f7"', 'content-type': 'text/csv', 'last-modified': 'Mon, 06 Jun 2022 08:53:34 GMT', 'content-length': '3850'}


### Digital Terrain Models

Download DTM files for free from http://centrodedescargas.cnig.es/CentroDescargas/buscadorCatalogo.do?codFamilia=MDT05# and put them in `input_DTMs` folder.

In [15]:
dtm_asc_keys = storage.list_keys(bucket=DATA_BUCKET, prefix=DTM_ASC_PREFIX)
# dtm_asc_keys

Find downloaded MDTs:

In [16]:
local_dtm_input = 'input_DTMs'
local_dtms = [os.path.join(local_dtm_input, dtm) for dtm in os.listdir(local_dtm_input) if dtm.endswith('.asc')]

def upload_file(file_path):
    key = os.path.join(DTM_ASC_PREFIX, os.path.basename(file_path))
    if key in dtm_asc_keys:
        print(f'Tile {key} already in storage')
        return key
    with open(file_path, 'rb') as f:
        print(f'Uploading {key}...')
        storage.put_object(bucket=DATA_BUCKET, key=key, body=f)
    return key

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
    result = list(pool.map(upload_file, local_dtms))
    # list(result)

Convert ASCII raster files to Cloud Optimized GeoTIFF:

In [17]:
def asc_to_geotiff(obj, storage):
    asc_file_name = os.path.basename(obj.key)
    tile_id, _ = os.path.splitext(asc_file_name)
    out_path = os.path.join(tempfile.gettempdir(), tile_id + '.tiff')
    out_key = os.path.join(DTM_GEOTIFF_PREFIX, tile_id + '.tiff')

    try:
        out_obj = storage.head_object(bucket=DATA_BUCKET, key=out_key)
    except StorageNoSuchKeyError:
        out_obj = None

    if out_obj:
        print(f'GeoTIFF {tile_id} already exists, skipping...')
        return out_key

    print(f'Converting {tile_id} to GeoTIFF...')
    with rasterio.open(obj.data_stream, 'r') as src:
        profile = src.profile
        # Cloud optimized GeoTiff parameters
        profile.update(driver='GTiff')
        profile.update(blockxsize=256)
        profile.update(blockysize=256)
        profile.update(tiled=True)
        profile.update(compress='deflate')
        profile.update(interleave='band')
        with rasterio.open(out_path, 'w', **profile) as dest:
            dest.write(src.read())

    with open(out_path, 'rb') as f:
        storage.put_object(bucket=DATA_BUCKET, key=out_key, body=f)

    return out_key


In [18]:
fs_cog = fexec.map(asc_to_geotiff, os.path.join(STORAGE_PREFIX, DATA_BUCKET, DTM_ASC_PREFIX))

2022-06-28 20:43:46,344 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M000 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:43:47,678 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M000 - Starting function invocation: asc_to_geotiff() - Total: 36 activations
2022-06-28 20:43:47,742 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M000 - View execution logs at /tmp/lithops/logs/5449fa-0-M000.log


In [19]:
_, _ = fexec.wait(fs=fs_cog)

2022-06-28 20:43:47,748 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Waiting for 100% of 36 function activations to complete


    0%|          | 0/36  

In [20]:
dtm_geotiff_keys = storage.list_keys(bucket=DATA_BUCKET, prefix=DTM_GEOTIFF_PREFIX)
# dtm_geotiff_keys

Get bounds and tile ID for every tile:

In [21]:
def get_tile_meta(obj):
    storage = Storage()
    tile_id = os.path.splitext(os.path.basename(obj.key))[0]
    with rasterio.open(BytesIO(storage.get_cloudobject(obj)), 'r') as src:
        x1, y1 = src.profile['transform'] * (0, 0)
        x2, y2 = src.profile['transform'] * (src.profile['width'], src.profile['height'])
    return tile_id, (x1, y1), (x2, y2)

In [22]:
fs_meta = fexec.map(get_tile_meta, os.path.join(STORAGE_PREFIX, DATA_BUCKET, DTM_GEOTIFF_PREFIX))

2022-06-28 20:43:57,195 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M001 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:43:57,846 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M001 - Starting function invocation: get_tile_meta() - Total: 36 activations
2022-06-28 20:43:57,847 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M001 - View execution logs at /tmp/lithops/logs/5449fa-0-M001.log


In [23]:
tiles_meta = fexec.get_result(fs=fs_meta)

2022-06-28 20:43:57,870 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 36 function activations


    0%|          | 0/36  

2022-06-28 20:44:11,032 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data


In [24]:
# tiles_meta

In [25]:
import ipyleaflet
import ipywidgets
import utm

m = ipyleaflet.Map(center=(38.082906, -1.330466), zoom=9.5)

for tile_id, bound1, bound2 in tiles_meta:
    x1, y1 = bound1
    x2, y2 = bound2
    xc, yc = (x1 + x2) / 2, y1
    
    wgs_bounds_1 = utm.to_latlon(x1, y1, 30, 'S')
    wgs_bounds_2 = utm.to_latlon(x2, y2, 30, 'S')
    # wgs_bounds_c = utm.to_latlon(xc, yc, 30, 'S')

    rectangle = ipyleaflet.Rectangle(bounds=(wgs_bounds_1, wgs_bounds_2))
    m.add_layer(rectangle)    

m.layout.height="750px"

m

Map(center=[38.082906, -1.330466], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title',…

## Raster Data Interpolation

Split data tiles in subtiles for increased parallelism:

In [26]:
def data_chunker(obj, n_splits, block_x, block_y, storage):
    tile_key = os.path.basename(obj.key)
    tile_id, _ = os.path.splitext(tile_key)

    storage = Storage()

    with rasterio.open(BytesIO(storage.get_cloudobject(obj))) as src:
        transform = src.transform

        # Compute working window
        step_w = src.width / n_splits
        step_h = src.height / n_splits

        offset_h = round(step_h * block_x)
        offset_w = round(step_w * block_y)

        profile = src.profile

        width = math.ceil(step_w * (block_y + 1) - offset_w)
        height = math.ceil(step_h * (block_x + 1) - offset_h)

        profile.update(width=width)
        profile.update(height=height)

        window = Window(offset_w, offset_h, width, height)

        chunk_file = os.path.join(tempfile.gettempdir(), tile_id + str(block_x) + '_' + str(block_y) + '.tif')
        with rasterio.open(chunk_file, 'w', **profile) as dest:
            dest.write(src.read(window=window))

    with open(chunk_file, 'rb') as f:
        co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    return (tile_key, block_x, block_y, co)

In [27]:
iterdata = [(os.path.join(STORAGE_PREFIX, DATA_BUCKET, tile), SPLITS, i, j)
            for i in range(SPLITS) for j in range(SPLITS) for tile in dtm_geotiff_keys]
print(f'Chunking {len(dtm_geotiff_keys)} tiles in {SPLITS * SPLITS} chunks each using {len(iterdata)} functions')
# iterdata

Chunking 36 tiles in 9 chunks each using 324 functions


In [28]:
chunker_fs = fexec.map(data_chunker, iterdata)

2022-06-28 20:44:11,250 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M002 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:44:11,813 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M002 - Starting function invocation: data_chunker() - Total: 324 activations
2022-06-28 20:44:12,032 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M002 - View execution logs at /tmp/lithops/logs/5449fa-0-M002.log


In [29]:
chunks = fexec.get_result(fs=chunker_fs)

2022-06-28 20:44:12,043 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 324 function activations


    0%|          | 0/324  

2022-06-28 20:44:30,167 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data


In [30]:
# chunks

Compute solar irradiation for a given day of year using GRASS libraries:

In [31]:
def compute_solar_irradiation(inputFile, outputFile, crs='32630'):
    # Define grass working set
    GRASS_GISDB = '/tmp/grassdata'
    #GRASS_GISDB = 'grassdata'
    GRASS_LOCATION = 'GEOPROCESSING'
    GRASS_MAPSET = 'PERMANENT'
    GRASS_ELEVATIONS_FILENAME = 'ELEVATIONS'

    os.environ['GRASSBIN'] = 'grass76'

    from grass_session import Session
    import grass.script as gscript
    import grass.script.setup as gsetup
    from grass.pygrass.modules.shortcuts import general
    from grass.pygrass.modules.shortcuts import raster
    
    os.environ.update(dict(GRASS_COMPRESS_NULLS='1'))

    # Clean previously processed data
    if os.path.isdir(GRASS_GISDB):
        shutil.rmtree(GRASS_GISDB)
    
    with Session(gisdb=GRASS_GISDB, location=GRASS_LOCATION, mapset=GRASS_MAPSET, create_opts='EPSG:32630') as ses:
        # Set project projection to match elevation raster projection
        general.proj(epsg=crs, flags='c') 
        # Load raster file into working directory
        raster.import_(input=inputFile, output=GRASS_ELEVATIONS_FILENAME, flags='o')    
        
        # Set project region to match raster region
        general.region(raster=GRASS_ELEVATIONS_FILENAME, flags='s')    
        # Calculate solar irradiation
        gscript.run_command('r.slope.aspect', elevation=GRASS_ELEVATIONS_FILENAME,
                            slope='slope', aspect='aspect')
        gscript.run_command('r.sun', elevation=GRASS_ELEVATIONS_FILENAME,
                            slope='slope', aspect='aspect', beam_rad='beam',
                            step=1, day=DAY_OF_YEAR)
        
        # Get extraterrestrial irradiation from history metadata
        regex = re.compile(r'\d+\.\d+')
        output = gscript.read_command("r.info", flags="h", map=["beam"])
        splits = str(output).split('\n')
        line = next(filter(lambda line: 'Extraterrestrial' in line, splits))
        extraterrestrial_irradiance = float(regex.search(line)[0])
        
        # Export generated results into a GeoTiff file
        if os.path.isfile(outputFile):
            os.remove(outputFile)

        raster.out_gdal(input='beam', output=outputFile)
        
        return extraterrestrial_irradiance

Get stations contained in the area of interest:

In [32]:
def filter_stations(bounds, stations):
    total_points = MultiPoint([Point(x, y) for x, y in stations[['X', 'Y']].to_numpy()])
    total_points_list = list(total_points.geoms)
    intersection = bounds.buffer(AREA_OF_INFLUENCE).intersection(total_points)
    filtered_stations = [point for point in total_points_list if intersection.contains(point)]

    return stations[[point in filtered_stations for point in total_points_list]]

Inverse Distance Weighting interpolation:

In [33]:
def compute_basic_interpolation(shape, stations, field_value, offset = (0,0)):
    station_pixels = [[pixel[0], pixel[1]] for pixel in stations['pixel'].to_numpy()]
    
    # Get an array where each position represents pixel coordinates
    tile_pixels = np.indices(shape).transpose(1,2,0).reshape(shape[0]*shape[1], 2) + offset
    dist = distance_matrix(station_pixels, tile_pixels)
    weights = np.where(dist == 0, np.finfo('float32').max, 1.0 / dist )
    weights /=  weights.sum(axis=0)
    
    return np.dot(weights.T, stations[field_value].to_numpy()).reshape(shape).astype('float32')

Interpolate temperatures from a subset of the tile:

In [34]:
def radiation_interpolation(tile_key, block_x, block_y, chunk_cloudobject, storage):
    tile_id, _ = os.path.splitext(tile_key)
    print(tile_id)

    # Write tile chunk to file
    chunk_file = os.path.join(tempfile.gettempdir(), tile_id + str(block_x) + '_' + str(block_y) + '.tif')
    print(chunk_file)

    with open(chunk_file, 'wb') as f:
        body = storage.get_cloudobject(chunk_cloudobject)
        f.write(body)

    with rasterio.open(chunk_file, 'r') as chunk_src:
        profile = chunk_src.profile

    extr_chunk_file = os.path.join(tempfile.gettempdir(), tile_id + '_extr_' + str(block_x) + '_' + str(block_y) + '.tif')
    rad_chunk_file = os.path.join(tempfile.gettempdir(), tile_id + '_rad_' + str(block_x) + '_' + str(block_y) + '.tif')

    # Compute solar irradiation from inputFile, creates radiation raster at outputFile
    extraterrestrial_irradiation = compute_solar_irradiation(inputFile=chunk_file, outputFile=rad_chunk_file)

    # Create and store a raster with extraterrestrial irradiation
    with rasterio.open(extr_chunk_file, 'w', **profile) as dest:
        data = np.full((profile['height'], profile['width']), extraterrestrial_irradiation, dtype='float32')
        dest.write(data, 1)

    with open(extr_chunk_file, 'rb') as f:
        extr_co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    with open(rad_chunk_file, 'rb') as f:
        rad_co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    return [(tile_key, 'extr', block_x, block_y, extr_co), (tile_key, 'rad', block_x, block_y, rad_co)]

In [35]:
def map_interpolation(tile_key, block_x, block_y, chunk_cloudobject, data_field, storage):
    tile_id, _ = os.path.splitext(tile_key)

    # Get SIAM meteo data
    siam_stream = storage.get_object(DATA_BUCKET, siam_data_key, stream=True)
    siam_data = pd.read_csv(siam_stream)

    # print(siam_data)

    chunk = storage.get_cloudobject(chunk_cloudobject)

    with rasterio.open(BytesIO(chunk)) as chunk_src:
        transform = chunk_src.transform
        profile = chunk_src.profile

        bounding_rect = box(chunk_src.bounds.left, chunk_src.bounds.top, chunk_src.bounds.right, chunk_src.bounds.bottom)
        filtered = pd.DataFrame(filter_stations(bounding_rect, siam_data))
        #print(filtered)

        if filtered.shape[0] == 0:
            return [(tile_key, data_field, block_x, block_y, None)]

        filtered['pixel'] = filtered.apply(lambda station: rasterio.transform.rowcol(transform, station['X'], station['Y']), axis=1)

        # Interpolate variables from meteo station data, generate raster with result
        dest_chunk_file = os.path.join(tempfile.gettempdir(), tile_id + '_' + data_field + '_' + str(block_x) + '_' + str(block_y) + '.tif')

        with rasterio.open(dest_chunk_file, 'w', **profile) as chunk_dest:
            if data_field == 'temp':
                elevations = chunk_src.read(1)  # Get elevations content
                print(dest_chunk_file)
                interpolation = compute_basic_interpolation(elevations.shape, filtered, 'tdet', (0, 0))
                interpolation += r * (elevations - zdet)
                chunk_dest.write(np.where(elevations == chunk_src.nodata, np.nan, interpolation), 1)
            elif data_field == 'humi':
                interpolation = compute_basic_interpolation((profile['height'], profile['width']), filtered, 'hr', (0, 0))
                chunk_dest.write(interpolation, 1)
            elif data_field == 'wind':
                interpolation = compute_basic_interpolation((profile['height'], profile['width']), filtered, 'v', (0, 0))
                chunk_dest.write(interpolation, 1)
            else:
                raise Exception(f'Unknown data field "{data_field}"')

    # Upload results to storage as Cloudobject
    with open(dest_chunk_file, 'rb') as f:
        co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    return [(tile_key, data_field, block_x, block_y, co)]

Lithops serverless computation:

In [36]:
fs_rad = fexec.map(radiation_interpolation, chunks, runtime_memory=2048)
fs_temp = fexec.map(map_interpolation, chunks, extra_args=('temp', ), runtime_memory=2048)
fs_humi = fexec.map(map_interpolation, chunks, extra_args=('humi', ), runtime_memory=2048)
fs_wind = fexec.map(map_interpolation, chunks, extra_args=('wind', ), runtime_memory=2048)

2022-06-28 20:44:30,294 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M003 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:44:30,827 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M003 - Starting function invocation: radiation_interpolation() - Total: 324 activations
2022-06-28 20:44:30,839 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M003 - View execution logs at /tmp/lithops/logs/5449fa-0-M003.log
2022-06-28 20:44:30,858 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M004 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:44:31,328 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M004 - Starting function invocation: map_interpolation() - Total: 324 activations
2022-06-28 20:44:31,461 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M004 - View execution logs at /tmp/lithops/logs/5449fa-0-M004.log
2022-06-28 20:44:31,464 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | J

In [37]:
res_rad = fexec.get_result(fs=fs_rad)
res_temp = fexec.get_result(fs=fs_temp)
res_humi = fexec.get_result(fs=fs_humi)
res_wind = fexec.get_result(fs=fs_wind)

2022-06-28 20:44:32,036 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 324 function activations


    0%|          | 0/324  

2022-06-28 20:53:26,101 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data
2022-06-28 20:53:26,148 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 324 function activations


    0%|          | 0/324  

2022-06-28 20:53:27,469 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data
2022-06-28 20:53:27,473 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 324 function activations


    0%|          | 0/324  

2022-06-28 20:53:28,759 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data
2022-06-28 20:53:28,763 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 324 function activations


    0%|          | 0/324  

2022-06-28 20:53:30,185 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data


In [38]:
res_flatten = []
for l in [res_rad, res_temp, res_humi, res_wind]:
    for elem in l:
        for sub_elem in elem:
            res_flatten.append(sub_elem)

In [39]:
# res_flatten

In [40]:
grouped_chunks = collections.defaultdict(list)

for chunk_result in res_flatten:
    tile_key, data_field, block_x, block_y, co = chunk_result
    grouped_chunks[(tile_key, data_field)].append((block_x, block_y, co))

In [41]:
# grouped_chunks

Join split subsets into a tile:

In [42]:
def merge_blocks(tile_data, chunks, storage):
    from rasterio.windows import Window

    tile_key, data_field = tile_data

    cobjs = [tup[2] for tup in chunks]
    if None in cobjs:
        return None

    # Get width and height from original tile
    source_tile_key = os.path.join(DTM_GEOTIFF_PREFIX, tile_key)
    with rasterio.open(BytesIO(storage.get_object(bucket=DATA_BUCKET, key=source_tile_key))) as source_tile:
        height = source_tile.profile['height']
        width = source_tile.profile['width']

    # Open first object to obtain profile metadata
    with rasterio.open(BytesIO(storage.get_cloudobject(chunks[0][2]))) as chunk_src:
        profile = chunk_src.profile
        profile.update(width=width)
        profile.update(height=height)

    # Iterate each object and print its block into the destination file
    merged_file = os.path.join(tempfile.gettempdir(), data_field + '_' + tile_key)
    with rasterio.open(merged_file, 'w', **profile) as dest:
        for chunk in chunks:
            j, i, co = chunk
            with rasterio.open(BytesIO(storage.get_cloudobject(co))) as src:
                step_w = math.floor(width / SPLITS)
                step_h = math.floor(height / SPLITS)
                curr_window = Window(round(step_w * i), round(step_h * j), src.width, src.height)
                content = src.read(1)
                dest.write(content, 1, window=curr_window)

    output_key = os.path.join(DTM_PREFIX, data_field, tile_key)
    with open(merged_file, 'rb') as out_file:
        storage.put_object(bucket=DATA_BUCKET, key=output_key, body=out_file)

    return output_key

Combine previous split subsets:

In [43]:
iterdata = []
for (tile_id, data_field), chunks in grouped_chunks.items():
    iterdata.append(((tile_id, data_field), chunks))

In [44]:
fs_merged = fexec.map(merge_blocks, iterdata, runtime_memory=2048)

2022-06-28 20:53:30,284 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M007 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:53:30,617 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M007 - Starting function invocation: merge_blocks() - Total: 180 activations
2022-06-28 20:53:30,619 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M007 - View execution logs at /tmp/lithops/logs/5449fa-0-M007.log


In [45]:
tiles_merged = fexec.get_result(fs=fs_merged)

2022-06-28 20:53:30,644 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 180 function activations


    0%|          | 0/180  

2022-06-28 20:54:26,437 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data


In [46]:
tile_keys_merged = set([os.path.basename(t) for t in tiles_merged])

In [47]:
# tile_keys_merged

## Computation of potential evaporation

In [48]:
def compute_crop_evapotranspiration(temperatures,
                                    humidities,
                                    wind_speeds,
                                    external_radiations,
                                    global_radiations,
                                    KCs):
    gamma = 0.665*101.3/1000
    eSat = 0.6108 * np.exp((17.27*temperatures)/(temperatures+237.3))
    delta = 4098 * eSat / np.power((temperatures + 237.3),2)
    eA = np.where(humidities < 0, 0, eSat * humidities / 100)     # Avoid sqrt of a negative number
    T4 = 4.903 * np.power((273.3 + temperatures),4)/1000000000
    rSrS0 = global_radiations/(external_radiations * 0.75)
    rN = 0.8* global_radiations-T4*(0.34-0.14*np.sqrt(eA))*((1.35*rSrS0)-0.35)
    den = delta + gamma *(1 + 0.34* wind_speeds)
    tRad = 0.408 * delta * rN / den
    tAdv = gamma * (900/(temperatures+273))*wind_speeds * (eSat - eA)/den
    return ((tRad + tAdv) * 7 * KCs).astype('float32')

In [49]:
vineyard = ['VI', 'VO', 'VF', 'FV', 'CV' ]
olive_grove = ['OV', 'VO', 'OF', 'FL', 'OC']
fruit = ['FY', 'VF', 'OF', 'FF', 'CF']
nuts = ['FS', 'FV', 'FL', 'FF', 'CS' ]
citrus = ['CI', 'CV', 'OC', 'CF', 'CS' ]

def get_kc(feature):
    # TODO: Get more precise values of Kc
    print(feature['properties'].keys())
    # sigpac_use = feature['properties']['uso_sigpac']
    sigpac_use = 'FF'
    if sigpac_use in vineyard:
        # Grapes for wine - 0.3, 0.7, 0.45
        return 0.7  
    if sigpac_use in olive_grove:
        # Olive grove - ini: 0.65, med: 0.7, end: 0.7
        return 0.7 
    if sigpac_use in fruit:
        # Apples, Cherries, Pears - 0.45, 0.95, 0.7
        return 0.95
    if sigpac_use in nuts:
        # Almonds - 0.4, 0.9, 0.65
        return 0.9
    if sigpac_use in citrus:
        # Citrus, without ground coverage - 0.7, 0.65, 0.7
        return 0.65
    
    return None

In [50]:
def get_geometry_window(src, geom_bounds):
    left, bottom, right, top = geom_bounds
    src_left, src_bottom, src_right, src_top = src.bounds
    window = src.window(max(left,src_left), max(bottom,src_bottom), min(right,src_right), min(top,src_top))
    window_floored = window.round_offsets(op='floor', pixel_precision=3)
    w = math.ceil(window.width + window.col_off - window_floored.col_off)
    h = math.ceil(window.height + window.row_off - window_floored.row_off)
    return Window(window_floored.col_off, window_floored.row_off, w, h)     

In [51]:
def compute_evapotranspiration_by_shape(tem, hum, win, rad, extrad, dst):

    import fiona
    from shapely.geometry import shape, box
    from rasterio import features

    non_arable_land = ['AG', 'CA', 'ED', 'FO', 'IM', 'PA', 'PR', 'ZU', 'ZV']

    #with fiona.open('zip://home/docker/shape.zip') as shape_src:
    with fiona.open('zip:///tmp/shape.zip') as shape_src:
        for feature in shape_src.filter(bbox=tem.bounds):
            KC = get_kc(feature)
            if KC is not None:
                geom = shape(feature['geometry'])
                window = get_geometry_window(tem, geom.bounds)
                win_transform = rasterio.windows.transform(window, tem.transform)
                # Convert shape to raster matrix
                image = features.rasterize([geom],
                                           out_shape=(window.height, window.width),
                                           transform = win_transform,
                                           fill = 0,
                                           default_value = 1).astype('bool')
                # Get values to compute evapotranspiration
                temperatures = tem.read(1, window=window)
                humidities = hum.read(1, window=window)
                wind_speeds = win.read(1, window=window)
                # Convert from W to MJ (0.0036)
                global_radiations = rad.read(1, window=window) * 0.0036
                external_radiations = extrad.read(1, window=window) * 0.0036
                KCs = np.full(temperatures.shape, KC)
                # TODO: compute external radiation
                #external_radiations = np.full(temperatures.shape, 14)
                # TODO: compute global radiation
                # global_radiations = np.full(temperatures.shape, 10)
                etc = compute_crop_evapotranspiration(
                        temperatures,
                        humidities,
                        wind_speeds,
                        external_radiations,
                        global_radiations,
                        KCs
                )
                etc[temperatures == tem.nodata] = dst.nodata
                etc[np.logical_not(image)] = dst.nodata
                dst.write(etc + dst.read(1, window=window), 1, window=window)

In [52]:
def compute_global_evapotranspiration(tem, hum, win, rad, extrad, dst):    
    for ji, window in tem.block_windows(1):
        bounds = rasterio.windows.bounds(window, tem.transform)
        temperatures = tem.read(1, window=window)
        humidities = hum.read(1, window=window)
        wind_speeds = win.read(1, window=window)
         # Convert from W to MJ (0.0036)
        global_radiations = rad.read(1, window=window) * 0.0036
        external_radiations = extrad.read(1, window=window) * 0.0036
        # TODO: compute external radiation
        #external_radiations = np.full(temperatures.shape, 14)
        # TODO: compute global radiation
        # global_radiations = np.full(temperatures.shape, 10)
        # TODO: compute KCs
        KCs = np.full(temperatures.shape, 1)
        etc = compute_crop_evapotranspiration(
                temperatures,
                humidities,
                wind_speeds,
                external_radiations,
                global_radiations,
                KCs
        )
        dst.write(np.where(temperatures == tem.nodata, dst.nodata, etc), 1, window=window)

In [53]:
def combine_calculations(tile_key, storage):
    from functools import partial
      
    # Download shapefile
    shapefile = storage.get_object(bucket=DATA_BUCKET, key='shapefile_murcia.zip', stream=True)

    with open('/tmp/shape.zip', 'wb') as shapf:
        for chunk in iter(partial(shapefile.read, 200 * 1024 * 1024), ''):
            if not chunk:
                break
            shapf.write(chunk)
    try:
        temp = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'temp', tile_key))
        humi = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'humi', tile_key))
        rad = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'rad', tile_key))
        extrad = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'extr', tile_key))
        wind = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'wind', tile_key))
    except StorageNoSuchKeyError:
        print("Storage error")
        return None
    
    output_file = os.path.join(tempfile.gettempdir(), 'eva' + '_' + tile_key)
    with rasterio.open(BytesIO(temp)) as temp_raster:
        with rasterio.open(BytesIO(humi)) as humi_raster:
            with rasterio.open(BytesIO(rad)) as rad_raster:
                with rasterio.open(BytesIO(extrad)) as extrad_raster:
                    with rasterio.open(BytesIO(wind)) as wind_raster:
                        profile = temp_raster.profile
                        profile.update(nodata=0)
                        with rasterio.open(output_file, 'w+', **profile) as dst:
#                             compute_global_evapotranspiration(temp_raster, humi_raster, wind_raster,
#                                                               rad_raster, extrad_raster, dst)
                            compute_evapotranspiration_by_shape(temp_raster, humi_raster, wind_raster,
                                                                rad_raster, extrad_raster, dst)
    
    output_key = os.path.join(DTM_PREFIX, 'eva', tile_key)
    with open(output_file, 'rb') as output_f:
        storage.put_object(bucket=DATA_BUCKET, key=output_key, body=output_f)
    return output_key

In [54]:
fs_eva = fexec.map(combine_calculations, tile_keys_merged, runtime_memory=2048)
res_eva = fexec.get_result(fs=fs_eva)

2022-06-28 20:54:26,530 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M008 - Selected Runtime: aitorarjona/cloudbutton-geospatial-wc:01 - 2048MB
2022-06-28 20:54:26,881 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M008 - Starting function invocation: combine_calculations() - Total: 36 activations
2022-06-28 20:54:26,883 [INFO] lithops.invokers -- ExecutorID 5449fa-0 | JobID M008 - View execution logs at /tmp/lithops/logs/5449fa-0-M008.log
2022-06-28 20:54:26,899 [INFO] lithops.wait -- ExecutorID 5449fa-0 - Getting results from 36 function activations


    0%|          | 0/36  

2022-06-28 20:55:09,305 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Cleaning temporary data


In [55]:
res_eva

['DTMs/eva/PNOA_MDT05_ETRS89_HU30_0976_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0891_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0931_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0975_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0932_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0870_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0935_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0911_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0934_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0955_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0978_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0977_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0997_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0956_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0910_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0997B_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0933_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0890_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0912_LID.tiff',
 'DTMs/eva/PNOA_MDT05_ETRS89_HU30_0845_LID.tiff',

---

In [56]:
fexec.plot(dst=fexec.executor_id)

2022-06-28 20:55:09,549 [INFO] lithops.executors -- ExecutorID 5449fa-0 - Creating execution plots


In [57]:
fexec.clean(clean_cloudobjects=True)

In [59]:
fexec.job_summary()

2022-06-28 21:00:39,924 [INFO] lithops.executors -- View log file logs at /tmp/lithops/logs/2022-06-28_21:00:39.csv


In [65]:
len(dtm_asc_keys)

36

In [63]:
input_sz = 0
for input_key in dtm_asc_keys:
    meta = storage.head_object(bucket=DATA_BUCKET, key=input_key)
    input_sz += int(meta['content-length'])

In [64]:
input_sz

6071113373

---

## Visualization of results

In [None]:
from matplotlib import pyplot as plt

fig, ax = plt.subplots()

with rasterio.open('PNOA_MDT05_ETRS89_HU30_0891_LID.tiff') as src:
    arr = src.read(1, out_shape=(src.height, src.width))
    ax.set_title(tile_id)
    img = ax.imshow(arr, cmap='Greens')
    fig.colorbar(img, shrink=0.5)

fig.set_size_inches(18.5, 10.5)
plt.show()

# obj.seek(0)

---