# World Water Toolbox

This is an example of the processing chain of using the World Water Toolbox with the openEO Platform. 
The Processing chain is divided to 3 main sub-flows.

![flow](images/flow.PNG)

In [6]:
# import libraries
import numpy as np
import pathlib

import openeo
from openeo.extra.spectral_indices.spectral_indices import append_index
from openeo.processes import array_element, normalized_difference

from eo_utils import *
import dask.array as da
import xarray as xr
import scipy.signal

In [7]:
#connect with openeo backend
connection = openeo.connect("openeo.cloud")
connection.authenticate_oidc()
connection.describe_account()

Authenticated using refresh token.


{'default_plan': 'early-adopter',
 'info': {'default_plan': 'early-adopter',
  'oidc_userinfo': {'acr': 'https://refeds.org/assurance/IAP/low',
   'eduperson_assurance': ['https://aai.egi.eu/LoA#Low',
    'https://refeds.org/assurance/IAP/low'],
   'eduperson_entitlement': ['urn:mace:egi.eu:group:vo.openeo.cloud:role=vm_operator#aai.egi.eu',
    'urn:mace:egi.eu:group:vo.openeo.cloud:role=member#aai.egi.eu',
    'urn:mace:egi.eu:group:vo.openeo.cloud:role=early_adopter#aai.egi.eu'],
   'email': 'sulova.andrea@gmail.com',
   'email_verified': True,
   'sub': '1edbae7adc053e5164b8ac7696e17a9ec031bf5a11fe6dce659cafe39a9366a2@egi.eu',
   'voperson_verified_email': ['sulova.andrea@gmail.com']},
  'roles': ['EarlyAdopter']},
 'name': 'sulova.andrea@gmail.com',
 'user_id': '1edbae7adc053e5164b8ac7696e17a9ec031bf5a11fe6dce659cafe39a9366a2@egi.eu'}

# Input parameters


 * Create an output Folder

In [8]:
out_dir = pathlib.Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

- Specify the Area of Interest

In [9]:
# Colombbia
center = [4.707, -73.987]
# Denmark
# center = [55.6, 12.5]
zoom = 14
eoMap = openMap(center,zoom)
eoMap.map

Map(center=[4.707, -73.987], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'zoom…

In [11]:
bbox = eoMap.getBbox()
spatial_extent  = {'west':bbox[0],'east':bbox[2],'south':bbox[1],'north':bbox[3],'crs':4326}
print('west',bbox[0],'\neast',bbox[2],'\nsouth',bbox[1],'\nnorth',bbox[3])

west -74.27856445312501 
east -73.02955627441408 
south 4.601064939225764 
north 4.874784352885875


# 1 ) *Sentinel-2 ARD Pre-Processing*


* Specify collections :
  
  * **SENTINEL2_L1C** (EODC) - Missing Zenith Azimuth, support FORCE and Fmask

  * **SENTINEL2_L1C_SENTINELHUB** (TerraScope backend) - supports SMAC and iCor atmospheric correction

  * **boa_sentinel_2** (EODC) -  (ARD)  processed with FORCE. Missing Zenith and Azimuth information, cloud mask needs to be applied


  * **SENTINEL2_L2A_SENTINELHUB** (TerraScope backend) - Having Zenith and Azimuth information,  (ARD) processed with sen2cor., cloud mask needs to be applied

  Read more about ARD https://docs.openeo.cloud/usecases/ard/msi/#reference-implementations



![S2](images/S2.png)

In [12]:
# print(connection.list_collections())
# connection.list_processes()
# connection.describe_collection(collection)

# ARD
# collection      = 'boa_sentinel_2'
collection        = 'SENTINEL2_L2A_SENTINELHUB'

# L1C
# collection      = 'SENTINEL2_L1C'    
# collection      = 'SENTINEL2_L1C_SENTINELHUB'
# connection.describe_collection(collection)

* Specify temporal extent and bands:

In [20]:
start_date      = '2022-01-07'
end_date        = '2022-01-13'
bands           = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B8A', 'B09',  'B11', 'B12', 'CLP', 'SCL' , 'sunAzimuthAngles', 'sunZenithAngles'] 

# Bogota
spatial_extent = {'west': -74.27856445312501 , 'east': 72.15713924157274, 'south': 4.40172965700321, 'north': 4.949216031685158}

# Alps
# spatial_extent = {'west': 4.3617128367019395, 'east': 10.013202094514442, 'south': 46.34565603628651, 'north': 48.133091757973475}

# spatial_extent  = {'west':bbox[0],'east':bbox[2],'south':bbox[1],'north':bbox[3]}


S2_cube = connection.load_collection(collection,
                                     spatial_extent = spatial_extent,
                                     temporal_extent = [start_date, end_date],
                                     bands = bands)

A) *Mask Clouds, Shadows and Snow*
- atmospheric_correction: https://docs.openeo.cloud/usecases/ard/msi/#reference-implementations

In [21]:
# Option A: SCL + CLP

# Scene classification data, based on Sen2Cor process
# scl == 3    Cloud Shadows 
# scl == 8    Clouds medium probability
# scl == 9    Clouds high probability
# scl == 10   Cirrus
# scl == 11   Snow / Ice

scl = S2_cube.band("SCL")
mask = (scl == 3) | (scl == 8) | (scl == 9) | (scl == 10) |(scl == 11)
S2_cube_scl = S2_cube.mask(mask)

# CLP (cloud probabilities) based on s2cloudless
clp = S2_cube.band("CLP")
clp = clp.resample_spatial(resolution=20, method = "bicubic")
mask = (clp / 255) > 0.3  # 160m resolution s2cloudless so it does not have to use
S2_cube = S2_cube_scl.mask(mask)


In [9]:
# Option B: CLP Resample

# CLP (cloud probabilities) based on s2cloudless
clp = S2_cube.band("CLP")
clp = clp.resample_spatial(resolution=20, method = "bicubic")
clp = (clp / 255) > 0.3  # 160m resolution s2cloudless so it does not have to use
S2_cube = S2_cube.mask(clp)


- Examples of Cloud Masking


![cloudMask](images/cloudMask.png)

B) NDVI Calculation 
-   NDVI (Sentinel 2) = (B8 – B4) / (B8 + B4)


In [22]:
S2_cube = append_index(S2_cube,"NDVI")

C) NDWI Calculation 

-   NDWI (Sentinel 2) = (B3 – B8) / (B3 + B8)

In [23]:
S2_cube = append_index(S2_cube,"NDWI")

D) Mask Terrain Shadow

In [25]:
udf_code = """

import dask.array as da
import numpy as np
from numba import jit

def rasterize(azimuth, transform=None):

    azimuth = np.deg2rad(azimuth)
    xdir, ydir = np.sin(azimuth), np.cos(azimuth)

    if transform is not None:
        xdir, ydir = transform * (xdir, ydir)
        xdir -= transform.xoff
        ydir -= transform.yoff

    slope = ydir / xdir
    if slope < 1. and slope > -1.:
        xdir = 1.
        ydir = slope
    else:
        xdir = 1. / slope
        ydir = 1.
    return xdir, ydir

@jit('f8(f8, f8, f8)', nopython=True, nogil=True)
def height(xbase, ybase, angle):

    rho = (xbase**2 + ybase**2)**.5
    return rho * np.tan(angle)


@jit('boolean(f8, f8, Tuple((i8, i8)))', nopython=True, nogil=True)
def within_bounds(pixel_x, pixel_y, bounds):

    return (pixel_y < bounds[0]) and (pixel_x < bounds[1]) and (pixel_y > 0) and (pixel_x > 0)


@jit('i8[:,:](f4[:,:], Tuple((f8,f8)), f8, Tuple((f8, f8)), i8, i8)', nopython=True, nogil=True)
def hillshade(elevation_model, resolution, zenith, ray, ystart, yend):

    if max(ray) != 1.:
        raise ValueError("xy-direction is not rasterized.")
    shadow = np.zeros((yend - ystart, elevation_model.shape[1]), dtype=np.int64)
    zenith = np.deg2rad(90 - zenith)
    dx, dy = ray
    xres, yres = resolution
    z_max = elevation_model.max()
    bounds = elevation_model.shape

    for pixel_y in range(ystart, yend):
        for pixel_x in range(elevation_model.shape[1]):

            pixel_z = elevation_model[pixel_y, pixel_x]
            ray_x = float(pixel_x)
            ray_y = float(pixel_y)
            intersection = None

            while within_bounds(ray_x, ray_y, bounds):
                xbase = (ray_x - pixel_x) * xres
                ybase = (ray_y - pixel_y) * yres
                ray_z = height(xbase, ybase, zenith) + pixel_z
                if ray_z > z_max:
                    break
                if ray_z < elevation_model[int(ray_y), int(ray_x)]:
                    intersection = (ray_y, ray_x)
                    break
                ray_x += dx
                ray_y += dy

            if intersection is not None:
                shadow[pixel_y - ystart, pixel_x] = 1
    return shadow


def _run_shader(sun_zenith, sun_azimuth, elevation_model, resolution_x, resolution_y, dem_transform):

    shadow_stack = np.zeros(sun_azimuth.shape)

    for step in range(sun_azimuth.shape[0]):
        azimuth = np.nanmean(sun_azimuth[step, ...])
        zenith = np.nanmean(sun_zenith[step, ...])
        if np.all(np.isnan(zenith)):
            continue
        # Convert the azimuth into its components on the XY-plane
        ray_xdir, ray_ydir = rasterize(azimuth, dem_transform)

        # Assume chunking is already done by Dask
        ystart = 0
        yend = elevation_model.shape[1]
        # Make sure inputs have the right data type
        resolution = (float(resolution_x), float(resolution_y))
        zenith = float(zenith)
        ray = (float(ray_xdir), float(ray_ydir))

        shadow = hillshade(elevation_model[0, ].astype(np.float32),
                           resolution,
                           zenith,
                           ray,
                           ystart,
                           yend)
        shadow_stack[step, ...] = shadow.astype(np.int16).reshape(elevation_model.shape)

    shadow_stack[np.isnan(sun_azimuth)] = 255
    return shadow_stack


def terrain_shadow_mask(dem, sun_zenith, sun_azimuth, dem_transform, dem_resolution):

    dem_overlapped = da.overlap.overlap(dem.data, depth={0: 0, 1: 10, 2: 10}, boundary=0)
    sun_azimuth_overlapped = da.overlap.overlap(sun_azimuth.data,
                                                depth={0: 0, 1: 10, 2: 10},
                                                boundary=0)
    sun_zenith_overlapped = da.overlap.overlap(sun_zenith.data,
                                               depth={0: 0, 1: 10, 2: 10},
                                               boundary=0)
    shadow_overlapped = da.map_blocks(_run_shader,
                                      sun_zenith_overlapped,
                                      sun_azimuth_overlapped,
                                      dem_overlapped,
                                      dem_resolution[0],
                                      dem_resolution[1],
                                      dem_transform)
    shadow_da = da.overlap.trim_internal(shadow_overlapped, {0: 0, 1: 10, 2: 10})
    shadow = sun_zenith.copy(data=shadow_da).astype(np.int16)

    shadow = shadow.rio.set_nodata(255)
    return shadow
"""

S2_cube = S2_cube.apply_dimension(code = udf_code, runtime = "Python", dimension ="t")

E) *Create Monthly Best-Pixel Mosaic*

In [26]:
S2_median_cube = S2_cube.median_time()

Check & download results: https://editor.openeo.org/

In [15]:
# Send cube to server
S2_median_cube = S2_median_cube.save_result(format='GTiff') #GTiff #netCDF
my_job  = S2_median_cube.send_job(title="S2_L2A")
results = my_job.start_and_wait().get_results()

results.download_files(out_dir/"S2_L2A")

0:00:00 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': send 'start'
0:00:16 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:00:23 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:00:32 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:00:41 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:00:52 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:01:06 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:01:23 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:01:44 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:02:09 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:02:39 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:03:18 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queued (progress N/A)
0:04:06 Job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0': queu

JobFailedException: Batch job 'vito-93994139-fb8b-4c87-afd9-7888d32070b0' didn't finish successfully. Status: error (after 2:12:17).

In [None]:
# !gdalinfo -hist output/openEO.nc
S2_L2A = xr.open_dataset(out_dir/'S2_L2A')

# 2) *Sentinel-1 ARD Pre-Processing*

Load Collection of Sentinel-1 

![Cat](images/S1.png)

In [24]:
S1_cube = connection.load_collection('SENTINEL1_GRD', 
                                     spatial_extent = spatial_extent, 
                                     temporal_extent = [start_date, end_date], 
                                     bands = ['VH','VV'])

A)  Analysis-Ready-Data 

In [25]:
S1_cube = S1_cube.ard_normalized_radar_backscatter()

*B) Mask Urban Areas*

In [26]:
worldcover_cube = connection.load_collection("ESA_WORLDCOVER_10M_2020_V1", 
                                            temporal_extent = [start_date, end_date], 
                                            spatial_extent = spatial_extent, 
                                            bands = ["MAP"])
                                            #.filter_bbox({'west':bbox[0],'east':bbox[2],'south':bbox[1],'north':bbox[3]})

builtup_mask = worldcover_cube.band("MAP") != 50
S1_cube = S1_cube.mask(builtup_mask)

*B) Mask Radar Shadows*

*C) Mask Sentinel-1 Exclusion Layer*

*D) Create Monthly Median Mosaic*

In [None]:
S1_median_cube = S1_cube.median_time()

Check & download results: https://editor.openeo.org/

In [None]:
S1_median_cube = S1_median_cube.save_result(format='GTiff') #GTiff #netCDF
my_job  = S1_median_cube.send_job(title="S1_ARD")
results = my_job.start_and_wait().get_results()

results.download_files(out_dir/'S1_ARD')

# 3) Water Masking

![Cat](images/WM.png)

- Load LUT and run logistic expressions


In [60]:
udf_code ="""

import numpy as np
import xarray as xr


## Note this will be updated. Currently demonstration only. 
## The lookup table defines bands and logistic regression expressions 
## for each ecoregion.
LOOKUPTABLE = {
    'tropical': 
    {
        'S1': {'bands': ['VV'], 
               'expression': lambda VV: 1 / (1 + np.exp(- (-7.17 + (-0.48 * VV))))},
        'S2': {'bands': ['NDVI', 'NDWI'], 
               'expression': lambda NDVI, NDWI: 1 / (1 + np.exp(- (0.845 + (2.14 * NDVI) + (-13.5 * NDWI))))},
        'S2_S1': {'bands': ['VV', 'NDWI'], 
                  'expression': lambda VV, NDWI: 1 / (1 + np.exp(- (-2.64 + (-0.23 * VV) + (-8.6 * NDWI))))},
     },
    
    'subtropical':
    {
        'S1': {'bands': ['VV', 'VH'], 
               'expression': lambda VV, VH: 1 / (1 + np.exp(- (-8.1 + (-0.13 * VV) + (-0.27 * VH))))},
        'S2': {'bands': ['NDVI', 'NDWI'], 
               'expression': lambda NDVI, NDWI: 1 / (1 + np.exp(- (1.267 + (0.316 * NDVI) + (-11 * NDWI))))},
        'S2_S1': {'bands': ['VV', 'NDWI'], 
                  'expression': lambda VV, NDWI: 1 / (1 + np.exp(- (-2.64 + (-0.23 * VV) + (-8.6 * NDWI))))},
    }
}
    
    
def _create_img(expr_dict, dataset, sensor_value):
    
    # lambda expression to create surface water probability
    exp = expr_dict['expression']
    
    # bands required as input to the lambda expression
    bands = expr_dict['bands']
    
    # filter the dictionary to required bands identified above. 
    filtered_dict = dict((k, dataset[k]) for k in bands)
    
    # Calculate the probability using the input dataset and lambda expression.
    probability = (exp(**filtered_dict)*100).astype(np.uint8)
    
    # Create the QA band with sensor value.
    sensor_rank = xr.full_like(probability, sensor_value)
    
    # create an empty dataset and add the probability and sensor_rank outputs.
    ds = xr.Dataset()
    ds['probability'] = probability
    ds['sensor_rank'] = sensor_rank

    return ds    

def _water_mask(dataset, region_parameters):
    
    composite = xr.where(
        
        # Where valid data for both S2 and S1.
        xr.ufuncs.logical_and(~dataset['S2_mask'], ~dataset['S1_mask']), 
        
        # Create water mask with sensor_value 5
        _create_img(region_parameters['S2_S1'], dataset, 5),
        
        # Else
        xr.where(
            
            # Where Sentinel-2 data is valid.
            ~dataset['S2_mask'], 
            
            # Create water mask from sentinel-2 with sensor value 4.
            _create_img(region_parameters['S2'], dataset, 4),
            
            # If no Sentinel-2 data, use Sentinel-1 on it's own. Sensor value 3.
            _create_img(region_parameters['S1'], dataset, 3), 
            )    
        )    
    
    return composite

def water_mask(dataset, ecoregion):
   
    # Access the parameters required for that ecoregion.
    region_parameters = LOOKUPTABLE[ecoregion]
    
    # Create the water mask.
    composite = _water_mask(dataset, region_parameters)

    return composite
"""

S1 = S1_median_cube.apply_dimension(code = udf_code, runtime = "Python", dimension = "t")  
S2 = S2_median_cube.apply_dimension(code = udf_code, runtime = "Python", dimension = "t") 

- Rank and produce final water map


Check & download results: https://editor.openeo.org/

In [None]:
S1_S2 = S1_S2.save_result(format='GTiff') #GTiff #netCDF
my_job  = S1_S2.send_job(title="S1_ARD")
results = my_job.start_and_wait().get_results()

results.download_files(out_dir/'S1_ARD')