# Test Otsu thresholding on water image

The global water watch algorithm uses [Otsu thresholding](http://www.labbookpages.co.uk/software/imgProc/otsuThreshold.html) to determine the MNDWI threshold between water and land. We can implement this using local xarray implementation, which can then be passed to chunks as a [UDF](https://open-eo.github.io/openeo-python-client/udf.html) in the openeo backend.

In [None]:
# imports
from typing import List, Dict, Tuple, Union
from pathlib import Path

import geojson
from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from pyproj import CRS, Proj, Transformer
from pyproj.aoi import AreaOfInterest
from shapely.geometry import MultiPolygon, Polygon
from shapely.ops import transform

from utils import Reservoir

In [None]:
# Connect to backend:
openeo_platform_url: str = "openeo.cloud"
vito_url: str = "https://openeo.vito.be/openeo/1.1"
vito_dev_url: str = "openeo-dev.vito.be"

backend_url = vito_url

con: Connection = connect(backend_url)
con.authenticate_oidc(provider_id="egi")

debug = True

out_dir: Path = Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Find Level 1C product of Sentinel 2 mission
collections = con.list_collections()
if backend_url == vito_url or vito_dev_url:
    collection_id = "SENTINEL2_L1C_SENTINELHUB"
elif backend_url == openeo_platform_url:
    collection_id = "SENTINEL2_L1C"
con.describe_collection(collection_id)

In [None]:
# Get reservoirs from database
reservoir_dir: Path = out_dir / "reservoirs"

reservoirs: List[Reservoir] = Reservoir.from_gcp(reservoir_dir)

## Setup AoI and parameters
Eventually we will run the algorithm based on a certain spatial and temporal extent. There are more parameters used in the algorithm that can be finetuned later on. We therefore collect all relevant parameters in the beginning of the notebook.
We want to load the data from the backend. For visualization options, we want to load RGB. We load swir16 for the NDWI product as well, as well as nir for some NDVI filters that are applied later on.

In case of debug, we just take the bounding box of one of the reservoirs in Chzechia that show seasonal variation and extend it so that the reservoirs fit.
Otherwise the entirety of Chzechia is used.

In [None]:
import math

def get_utm_zone(lon: float) -> int:
    return math.ceil((180 + lon) / 6)

In [None]:
if debug:
    geojson_str = "{\"type\":\"Polygon\",\"coordinates\":[[[16.258372886421807,49.561646293673824],[16.314909857006697,49.561646293673824],[16.314909857006697,49.58980547068479],[16.258372886421807,49.58980547068479],[16.258372886421807,49.561646293673824]]],\"geodesic\":false}"
    gjson: geojson.Polygon = geojson.loads(geojson_str)
    bbox = Polygon(gjson.coordinates[0])
else:
    # entire chzechia
    bbox = Polygon([[12.09,51.06],[12.09, 48.55], [18.87,48.55], [18.87, 51.06], [12.09,51.06]])

# convert bbox polygon to utm zone
wgs84: CRS = CRS('EPSG:4326')
utm_zone: int = get_utm_zone(min(bbox.exterior.xy[0]))
utm: CRS = CRS(proj='utm', zone=utm_zone)
project_to_utm: Transformer = Transformer.from_crs(wgs84, utm, always_xy=True)
project_to_latlon: Transformer = Transformer.from_crs(utm, wgs84, always_xy=True)

bbox_utm = transform(project_to_utm.transform, bbox)
if debug:
    # transform and buffer 1km so all imagery plus buffers is loaded.
    bbox_utm = bbox_utm.buffer(1000.)
    bbox = transform(project_to_latlon.transform, bbox_utm)

band_names = ["green", "nir", "swir", "cloudmask", "cloudp"]
band_codes = ["B03", "B08", "B11", "CLM", "CLP"]

# after crs transform, we get a distorted box, take extremities as bbox
xys = bbox_utm.exterior.coords.xy
bbox_openeo = {
    "west": min(xys[0]),
    "east": max(xys[0]),
    "south": min(xys[1]),
    "north": max(xys[1]),
    "crs": ":".join(utm.to_authority())
}

print(f"openeo spatial extent: {bbox_openeo}")
print(f"UTM zone: {utm_zone}")
if debug:
    start = "2021-05-01"
    stop = "2021-08-01"
else:
    start = "2017-04-01"
    stop = "2021-01-01"

## Buffer reservoirs using 300m buffer
In order to pickup on flooding / high water levels, we buffer the reservoirs using a 300m buffer. As the AoI needs to be given to the `chunk_polygon` method, we this this locally and not on the cluster.

In [None]:
# Select reservoirs within bbox and buffer 300m
from copy import copy

def buffer_in_utm(reservoir, buffer_m):
    try:
        new_res = copy(reservoir)
        bounds = new_res.geometry.bounds
        min_lon = bounds[0]
        _utm_zone: int = get_utm_zone(min_lon)
        if abs(_utm_zone - utm_zone) > 1:
            # If not close to utm zone, then not in AoI
            return None
        buffered_geom = transform(project_to_utm.transform, new_res.geometry).buffer(buffer_m, 1)
        latlon_geom = transform(project_to_latlon.transform, buffered_geom)
        new_res.geometry = latlon_geom
    except ValueError as e:
        print(reservoir.geometry.wkt)
    return new_res
    

selected = list(
    filter(lambda r: bbox.covers(r.geometry),
    filter(lambda r: r is not None,
    map(lambda r: buffer_in_utm(r, 300.),
        reservoirs
    )))
)
selected_mp = MultiPolygon(list(map(lambda s: s.geometry, selected)))
selected[0].geometry

## Load optical data
Load optical data using parameters declared above. Altough the Waterwatch algorithm uses Landsat 7 & 8 missions as well as Sentinel 2, we just use Sentinel-2 here for simplicity.

In [None]:
dc_optical: DataCube = con.load_collection(
        collection_id=collection_id,
        spatial_extent=bbox_openeo,
        temporal_extent=(start, stop),
        bands=band_codes
    ).rename_labels(dimension="bands", source=band_codes, target=band_names)

## Filter optical data
Filtering happens in two steps:
1. Filter based on the cloud coverage percentage band (CLP) in the Sentinel-2 dataset. Calculate the percentile cloud chance in the AoI per image, and filter the top x percentile based on the percentile cloud expected in that area. For Chzechia we take a 35% percentile based on the MODIS cloud occurrence dataset.
2. In the AoI, calculate the data coverage per image, then filter images with too little coverage.

### filter on cloud percentages

In [None]:
def load_udf(path: Path):
    with open(path, 'r+') as f:
        return f.read()

udf_path: Path = Path.cwd().parent / "udfs" / "filter_mostly_clean_images.py"
quality_score_udf = load_udf(udf_path)

In [None]:
from shapely.geometry.base import BaseGeometry

def filter_mostly_clean_images(
    dc: DataCube,
    geometry: BaseGeometry,
    quality_score_udf: str,
    cutoff_percentile: int = 35,
    score_percentile: int = 75,
    quality_band: str = 'cloudp',
    
) -> DataCube:
    """
    filters images based on cloud coverage percentile
    """
    process = lambda data: data.run_udf(udf=quality_score_udf, runtime="Python")
    return dc.chunk_polygon(chunks=geometry, process=process, context={
        "cutoff_percentile": cutoff_percentile,
        "quality_band": quality_band,
        "score_percentile": score_percentile
    })

filtered_dc: DataCube = filter_mostly_clean_images(dc_optical, selected_mp, quality_score_udf)

### Filter on area coverage

In [None]:
def load_udf(path: Path):
    with open(path, 'r+') as f:
        return f.read()

udf_path: Path = Path.cwd().parent / "udfs" / "preprocess_polygons.py"
preprocess_polygons_udf = load_udf(udf_path)

In [None]:
def preprocess_polygons(
    dc: DataCube,
    geometry: BaseGeometry,
    minimum_filled_fraction: int = 0.35,
    quality_check_bands: List[str] = ["green", "nir", "swir"]
    
) -> DataCube:
    """
    
    """
    process = lambda data: data.run_udf(udf=preprocess_polygons_udf, runtime="Python")
    return dc.chunk_polygon(chunks=geometry, process=process, context={
        "minimum_filled_fraction": minimum_filled_fraction,
        "quality_check_bands": quality_check_bands
    })

preprocessed_dc: DataCube = preprocess_polygons(filtered_dc, selected_mp, quality_score_udf)

## Load water occurrence data

In [None]:
con.describe_collection("GLOBAL_SURFACE_WATER")

In [None]:
dc_wo: DataCube = con.load_collection(
    collection_id="GLOBAL_SURFACE_WATER",
    spatial_extent=bbox_openeo,
    bands=["occurrence"]
)

As the temporal extent works in a weird way with the water occurrence data, either from 1984 until 2019, or until 2020, we have to filter after loading in both date ranges. After of filtering, we want to drop the t-axis. This is because this does not correlate with time the same way as the optical datacube.

In [None]:
dc_wo_latest: DataCube = dc_wo.filter_temporal(extent=("2019-12-31", "2020-01-02")).drop_dimension("t")

Now we resample spatially onto the optical datacube

In [None]:
dc_wo_resampled: DataCube = dc_wo_latest.resample_cube_spatial(preprocessed_dc, method="nearest")

## Calculate MNDWI

Next step is to calculate the MNDWI of the datacube and merge this cube with the JRC datacube.

In [None]:
green: DataCube = preprocessed_dc.band("green")
swir: DataCube = preprocessed_dc.band("swir")
mndwi: DataCube = (green - swir) / (green + swir)

## Merge Water Occurrence and MNDWI
To merging two cubes where one cube has no t dimension is not supported yet: https://discuss.eodc.eu/t/merging-datacubes/310/2?u=jaapel
What we do is resample the Water Occurrence dataset on every t that is also in the mndwi datset.
For this to work, we unfortunately need to download the mndwi cube, and check the timesteps that it is in. We can then use these timesteps as an input to the `aggregate_temporal` step to "aggregate" the water occurrence dataset.
Finally we can merge the two DataCubes: first we need to add a dimension that differs between both cubes if we want to keep both values.

In [None]:
from openeo import processes

mndwi_mergeable = mndwi.add_dimension(name="bands", label="MNDWI", type="bands")
# Workaround for https://discuss.eodc.eu/t/merging-datacubes/310/5?u=jaapel
# mndwi_mergeable = mndwi_mergeable.aggregate_temporal(daterange, reducer=processes.max)
mndwi_mergeable.metadata.dimension_names()

Multiply the datacube by 1.0 otherwise we try to merge cubes with different data types (int16 vs float32)

In [None]:
dc_wo_m: DataCube = dc_wo_resampled.drop_dimension("bands") * 1.0
dc_wo_m = dc_wo_m.add_dimension(name="bands", label="wo", type="bands")
dc_wo_m.metadata.dimension_names()

## Merge DataCube
Now merge the aggregated Water Occurrence cube on the MNDWI cube

In [None]:
from openeo import processes

dc_merged: DataCube = mndwi_mergeable.merge_cubes(dc_wo_m)

## Download and inspect result

In [None]:
from openeo.rest.job import RESTJob
job: RESTJob = dc_merged.create_job("netcdf", title="merging_wo", description="merging water occurrence.")
job = job.start_and_wait()

In [None]:
merged_path = out_dir / "merged.nc"
job.get_results().get_assets()[0].download(merged_path)

In [None]:
import rioxarray
import xarray as xr

merged_path = out_dir / "merged.nc"
fixed_merged_path: Path = out_dir / "merged_fixed.nc"
ds_merged: xr.Dataset = rioxarray.open_rasterio(merged_path)
ds_merged = ds_merged.drop("crs")
ds_merged.to_netcdf(fixed_merged_path)
ds_merged

In [None]:
import cartopy.crs as ccrs

import geoviews as gv
import holoviews as hv
import numpy as np

from holoviews import opts, streams
from holoviews.element.tiles import OSM

gv.extension("bokeh","matplotlib")

In [None]:
kdims = ["x", "y", "t"]
vdims = ["wo", "MNDWI"]

hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d-%H:%M'  # readable time format
gv_merged = gv.Dataset(ds_merged, kdims=kdims, vdims=vdims, crs=ccrs.UTM(utm_zone)).redim(x="lon", y="lat")
# gv_merged = gv.Dataset(da_merged, kdims=kdims, vdims=vdims).redim(x="lon", y="lat")
print(repr(gv_merged))

In [None]:
dmap = gv_merged.to(gv.Image, ["lon", "lat"], "MNDWI", group="raw_data", label="raw", datatype=["xarray"], dynamic=True)
overlay = OSM() * dmap
overlay.opts(
    # opts.Image(cmap="turbo", colorbar=True, clim=(0, 100), alpha=0.8, height=500, width=500, tools=["hover"]),
    opts.Image(cmap="turbo", colorbar=True, clim=(-1, 1), alpha=0.8, height=500, width=500, tools=["hover"]),
    opts.Tiles(height=500, width=500))

overlay

## Local dataset
First we download the MNDWI datacube and experiment with it locally

In [None]:
job: RESTJob = mndwi.send_job("netcdf", "mndwi", description="mndwi")
job = job.start_and_wait()

In [None]:
mndwi_path: Path = out_dir / "mndwi.nc"
job.get_results().get_assets()[0].download(mndwi_path)

In [None]:
import rioxarray
import xarray as xr


fixed_mndwi_path: Path = out_dir / "mndwi_fixed.nc"
ds_mndwi: xr.Dataset = rioxarray.open_rasterio(mndwi_path)
da_mndwi = ds_mndwi.drop("crs")
da_mndwi.to_netcdf(fixed_mndwi_path)

In [None]:
da_mndwi

In [None]:
mndwi_da = ds_merged["MNDWI"]

In [None]:
import cartopy.crs as ccrs

import geoviews as gv
import holoviews as hv
import numpy as np

from holoviews import opts, streams
from holoviews.element.tiles import OSM

gv.extension("bokeh","matplotlib")

In [None]:
kdims = ["x", "y", "t"]
vdims = ["var"]

hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d-%H:%M'  # readable time format
gv_mndwi = gv.Dataset(da_mndwi, kdims=kdims, vdims=vdims, crs=ccrs.UTM(utm_zone)).redim(x="lon", y="lat")

print(repr(gv_mndwi))

In [None]:
dmap_mndwi = gv_mndwi.to(gv.Image, ["lon", "lat"], "var", group="raw_data", label="raw", datatype=["xarray"], dynamic=True)
overlay_mndwi = OSM() * dmap_mndwi
overlay_mndwi.opts(
    opts.Image(cmap="turbo", colorbar=True, clim=(-1, 1), alpha=0.8, height=500, width=500, tools=["hover"]),
    opts.Tiles(height=500, width=500))

overlay_mndwi

## Get otsu around water edges

The algorithm uses Canny edge detection to detect changes around the water edges in the reservoir. Then we use otsu thresholding to define a correct mndwi threshold for filtering non-water/water pixels around these edges.

In [None]:
%matplotlib inline

In [None]:
mndwi_da.dims

In [None]:
from skimage.filters import threshold_otsu
from skimage.feature import canny

import matplotlib.pyplot as plt
import matplotlib as mpl

# non_nan = da_mndwi.ffill("x").bfill("x").ffill("y").bfill("y")
# non_nan = ds_merged["MNDWI"].ffill("x").bfill("x").ffill("y").bfill("y")
t = 3
mndwi_da = ds_merged["MNDWI"].isel(t=t)
isnan = np.isnan(mndwi_da)
masked_mndwi = np.ma.array(mndwi_da.values, mask=isnan, fill_value=np.NaN)
can = canny(masked_mndwi, 0.7, 0.5, 1)

flat = mndwi_da.values.ravel()
flat = flat[~np.isnan(flat)]
thresh = threshold_otsu(flat, nbins=100)
print(f"otsu: {thresh}")
plt.imshow(isnan)
plt.colorbar()

In [None]:
cmap = mpl.cm.turbo
norm = mpl.colors.Normalize(vmin=-1, vmax=1)
plt.imshow(mndwi_da, cmap=cmap, norm=norm)
plt.colorbar()

In [None]:
mndwi_da.values

In [None]:
plt.imshow(can)
plt.colorbar()

In [None]:
import skimage.morphology

dilated = skimage.morphology.dilation(can, footprint=np.ones([3, 3]))
dilated = np.ma.array(dilated, mask=isnan, fill_value=np.NaN)
cmap = mpl.cm.turbo
norm = mpl.colors.Normalize(vmin=-2, vmax=2)
plt.imshow(dilated, cmap=cmap, norm=norm)
plt.colorbar()

In [None]:
plt.imshow(isnan, cmap=cmap, norm=norm)
plt.colorbar()

In [None]:
mndwi_edge = np.ma.array(masked_mndwi, mask=np.logical_or(isnan, ~dilated), fill_value=np.NaN)
plt.imshow(mndwi_edge)

Zoom as I cannot see these tiny specks

In [None]:
cmap = mpl.cm.turbo
norm = mpl.colors.Normalize(vmin=-1, vmax=1)
plt.imshow(mndwi_edge[270:319, 130:199], cmap=cmap, norm=norm)
plt.colorbar()

In [None]:
flat = mndwi_edge[~mndwi_edge.mask]
flat = flat[~np.isnan(flat)]
flat

In [None]:
th = threshold_otsu(flat, nbins=100)
th

In [None]:
water = mndwi_da > th
water = np.ma.array(water, mask=isnan, fill_value=np.NaN)
plt.imshow(water)
plt.colorbar()

In [None]:
area = water.sum() * 10e2 #m2
area  # 21618000.0

## Download Global Water Occurrence dataset

In [None]:
# con.describe_collection("GLOBAL_SURFACE_WATER")

In [None]:
water_occurrence: DataCube = con.load_collection(
    collection_id="GLOBAL_SURFACE_WATER",
    spatial_extent=bbox_openeo,
    bands=["occurrence"]
)

In [None]:
job: RESTJob = water_occurrence.send_job("netcdf", "wo", description="wo")
job = job.start_and_wait()

In [None]:
wo_path = out_dir / "wo.nc"
job.get_results().get_assets()[0].download(wo_path)

In [None]:
import rioxarray
import xarray as xr

wo_path = out_dir / "wo.nc"
fixed_wo_path: Path = out_dir / "wo_fixed.nc"
ds_wo: xr.Dataset = rioxarray.open_rasterio(wo_path)
da_wo = ds_wo.drop("crs")
da_wo.to_netcdf(fixed_wo_path)
da_wo

In [None]:
ds_merged["wo"]

In [None]:
plt.imshow(ds_merged["wo"].isel(t=t).values)

In [None]:
# reproject onto utm
import rioxarray

da_wo = da_wo.rio.write_crs(4326)
da_wo

In [None]:
da_wo_utm = da_wo.rio.reproject(da_wo.rio.estimate_utm_crs())
print(f"new crs: {da_wo_utm.rio.crs}")
da_wo_utm
plt.imshow(da_wo_utm.isel(t=1))

In [None]:
ds_merged["MNDWI"].coords.get("x")

In [None]:
# Then start resample on mdnwi datacube
da_wo_interp = da_wo_utm.interp(coords={"x": ds_merged["MNDWI"].coords.get("x"), "y": ds_merged["MNDWI"].coords.get("y")}, assume_sorted=False)
da_wo_interp

In [None]:
plt.imshow(da_wo_interp.isel(t=1))

## Use merged cube

In [None]:
wo = ds_merged["wo"].isel(t=t)

In [None]:
wo_edge = np.ma.array(wo, mask=np.logical_or(isnan, ~dilated), fill_value=np.NaN)
# wo_edge = wo.where(dilated)
wo_flat = wo_edge[~wo_edge.mask]
wo_flat = wo_flat[~np.isnan(wo_flat)]
p = np.median(wo_flat)
p

In [None]:
plt.imshow(wo[270:319, 130:199])

In [None]:
plt.imshow(wo_edge[270:319, 130:199])

In [None]:
plt.imshow(isnan)
plt.colorbar()

In [None]:
# get correct "water height" and exclude low values of mndwi (we're sure it's land)
water_fill_JRC = wo.values > p
water_fill_JRC = np.ma.array(water_fill_JRC, mask=isnan, fill_value=np.NaN)
nonwater = mndwi_da.values < -0.15
plt.imshow(water_fill_JRC)
plt.colorbar()

In [None]:
plt.imshow(nonwater)
plt.colorbar()

In [None]:
# Now filter where we already have a value for "water"
# water_fill_JRC.mask = np.logical_or(water, water_fill_JRC.mask)
# water_fill = water_fill_JRC.where(~water)
water_fill = np.logical_and(nonwater, water_fill_JRC)
plt.imshow(water_fill)
plt.colorbar()

In [None]:
# area_filled = water_fill.where(~np.isnan(water_fill)).sum() * 10e2  # m2
area_filled = water_fill.sum() * 10e2
# print(f"area filled: {area_filled.values[()]}")
print(f"area filled: {area_filled}")
filled_fraction = area_filled / area
# print(f"filled fraction: {filled_fraction.values[()]}")
print(f"filled fraction: {filled_fraction}")

In [None]:
total_water = water_fill + water
assert total_water.sum() * 10e2 == area_filled + area

plt.imshow(total_water)
plt.colorbar()

## Make udf-ready
A UDF uses xr.DataArray as input and output. Also, this functionality needs to work for all t.

In [None]:
print(mndwi_da.attrs)

In [None]:
from skimage.filters import threshold_otsu
from skimage.feature import canny
import skimage.morphology

import matplotlib.pyplot as plt
import matplotlib as mpl

t = 3

mndwi_da = ds_merged["MNDWI"]
print(mndwi_da.dims)
wo = ds_merged["wo"]
print(wo.dims)

isnan = np.isnan(mndwi_da).values
masked_mndwi = np.ma.array(mndwi_da.values, mask=isnan, fill_value=np.NaN)
masked_wo = np.ma.array(wo.values, mask=isnan, fill_value=np.NaN)
for i in range(masked_mndwi.shape[0]):
    mndwi_slice = masked_mndwi[i, :, :]
    wo_slice = masked_wo[i, :, :]
    nanmask = isnan[i, :, :]
    
    edge_image = canny(mndwi_slice, sigma=0.7, low_threshold=0.5, high_threshold=1)
    dilated = skimage.morphology.dilation(edge_image, footprint=np.ones([3, 3]))
    dilated = np.ma.array(dilated, mask=nanmask, fill_value=np.NaN)
    mndwi_edge = np.ma.array(mndwi_slice, mask=np.logical_or(nanmask, ~dilated))
    flat = mndwi_edge[~mndwi_edge.mask]
    flat = flat[~np.isnan(flat)]
    th = threshold_otsu(flat, nbins=100)
    print(f"otsu threshold: {th}")
    water = mndwi_slice > th
    water = np.ma.array(water, mask=nanmask, fill_value=np.NaN)
    area = water.sum() * 10e2 #m2
    print(f"area: {area}")
    
    wo_edge = np.ma.array(wo_slice, mask=np.logical_or(nanmask, ~dilated), fill_value=np.NaN)
    wo_flat = wo_edge[~wo_edge.mask]
    wo_flat = wo_flat[~np.isnan(wo_flat)]
    p = np.median(wo_flat)
    
    water_fill_JRC = wo_slice > p
    water_fill_JRC = np.ma.array(water_fill_JRC, mask=nanmask, fill_value=np.NaN)
    nonwater = mndwi_slice < -0.15
    water_fill = np.logical_and(nonwater, water_fill_JRC)
    area_filled = water_fill.sum() * 10e2
    print(f"area filled: {area_filled}")
    filled_fraction = area_filled / area
    print(f"filled fraction: {filled_fraction}")
    total_water = water_fill + water
    print(f"total_water_area: {total_water.sum() * 10e2}")
    print(area_filled + area)
    assert total_water.sum() * 10e2 == area_filled + area
    
    total_water = np.expand_dims(total_water, axis=0)
    water_fill = np.expand_dims(water_fill, axis=0)
    water = np.expand_dims(water, axis=0)
    
    if i == 0:
        w = water
        wf = water_fill
        tw = total_water
    else:
        w = np.append(w, water, axis=0)
        wf = np.append(wf, water_fill, axis=0)
        tw = np.append(tw, total_water, axis=0)

da_w = xr.DataArray(data=w, coords=mndwi_da.coords, dims=mndwi_da.dims)
da_wf = xr.DataArray(data=wf, coords=mndwi_da.coords, dims=mndwi_da.dims)
da_tw = xr.DataArray(data=tw, coords=mndwi_da.coords, dims=mndwi_da.dims)

## Test UDF
The previous functionality is inserted into a udf, that we will now use for a mapping operation using `chunk_polygon`.

In [None]:
def load_udf(path: Path):
    with open(path, 'r+') as f:
        return f.read()

udf_path: Path = Path.cwd().parent / "udfs" / "gww.py"
gww_udf = load_udf(udf_path)

### Execute locally
We cannot add bands that were not there before, so first we add and copy some random bands (in this case from the MNDWI band label).
First download cube with added bands

In [None]:
water = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels(dimension="bands", target=["water"], source=["MNDWI"])
water_fill = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels(dimension="bands", target=["water_fill"], source=["MNDWI"])
total_water = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels(dimension="bands", target=["total_water"], source=["MNDWI"])
udf_dc = dc_merged.merge_cubes(water).merge_cubes(water_fill).merge_cubes(total_water)

Try out water dc

In [None]:
job = water.create_job("netcdf", title="water", description="water")
job = job.start_and_wait()

In [None]:
water_path = out_dir / "water.nc"
job.get_results().get_assets()[0].download(water_path)

In [None]:
import rioxarray
import xarray as xr

water_path = out_dir / "water.nc"
ds_water: xr.Dataset = rioxarray.open_rasterio(water_path)

In [None]:
ds_water

In [None]:
job.get_results().get_assets()

Now try entire set of extra bands

In [None]:
job = udf_dc.create_job("netcdf", title="udf_dc", description="download dc for udf testing locally")
job = job.start_and_wait()

In [None]:
udf_dc_path = out_dir / "udf_dc.nc"
job.get_results().get_assets()[0].download(udf_dc_path)

In [None]:
import rioxarray
import xarray as xr

udf_dc_path = out_dir / "udf_dc.nc"
fixed_udf_dc_path: Path = out_dir / "udf_dc_fixed.nc"
ds_udf: xr.Dataset = rioxarray.open_rasterio(udf_dc_path)
ds_udf = ds_udf.drop("crs")
ds_udf.to_netcdf(fixed_udf_dc_path)

In [None]:
ds_udf

In [None]:
from openeo.udf import execute_local_udf
from openeo.udf.udf_data import UdfData
result: UdfData = execute_local_udf(gww_udf, udf_dc_path, fmt='netcdf')

In [None]:
result_da = result.datacube_list[0].get_array()

In [None]:
result_da.coords

In [None]:
result_da.drop_sel({"bands": ["water_fill", "water", "total_water"]})

In [None]:
plt.imshow(result_da.sel(bands="water_fill").isel(t=3))

In [None]:
from openeo import processes

def run_gww_algorithm(
    dc: DataCube,
    geometry: BaseGeometry
) -> DataCube:
    # We need these bands to be available in the cube
    water = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels("bands", target=["water"], source=["MNDWI"])
    water_fill = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels("bands", target=["water_fill"], source=["MNDWI"])
    total_water = dc_merged.filter_bands(["MNDWI"]).apply(lambda _: processes.int(1)).rename_labels("bands", target=["total_water"], source=["MNDWI"])
    dc = dc.merge_cubes(water).merge_cubes(water_fill).merge_cubes(total_water)
    process = lambda data: data.run_udf(udf=gww_udf, runtime="Python")
    return dc.chunk_polygon(chunks=geometry, process=process, context={
        "mndwi_band": "MNDWI",
        "wo_band": "wo"
    })

gww_dc = run_gww_algorithm(dc_merged, selected_mp)

In [None]:
job = gww_dc.create_job("netcdf", title="gww_udf", description="gww_udf")
job = job.start_and_wait()

In [None]:
gww_path = out_dir / "gww.nc"
job.get_results().get_assets()[0].download(gww_path)

In [None]:
import rioxarray
import xarray as xr

gww_path = out_dir / "gww.nc"
fixed_gww_path: Path = out_dir / "gww_fixed.nc"
ds_gww: xr.Dataset = rioxarray.open_rasterio(gww_path)
ds_gww = ds_gww.drop("crs")
ds_gww.to_netcdf(fixed_gww_path)

In [None]:
ds_gww

In [None]:
import cartopy.crs as ccrs

import geoviews as gv
import holoviews as hv
import numpy as np

from holoviews import opts, streams
from holoviews.element.tiles import OSM

gv.extension("bokeh","matplotlib")

In [None]:
kdims = ["x", "y", "t"]
vdims = ["MNDWI", "wo", "water", "water_fill", "total_water"]

hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d-%H:%M'  # readable time format
gv_gww = gv.Dataset(ds_gww, kdims=kdims, vdims=vdims, crs=ccrs.UTM(utm_zone)).redim(x="lon", y="lat")

print(repr(gv_gww))

In [None]:
dmap_mndwi = gv_gww.to(gv.Image, ["lon", "lat"], "total_water", group="raw_data", label="raw", datatype=["xarray"], dynamic=True)
overlay_mndwi = OSM() * dmap_mndwi
overlay_mndwi.opts(
    opts.Image(cmap="turbo", colorbar=True, clim=(0, 1), alpha=0.8, height=500, width=500, tools=["hover"]),
    opts.Tiles(height=500, width=500))

overlay_mndwi