In [1]:
# imports
from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from typing import List, Dict, Tuple, Union
from pathlib import Path
import xarray as xr
import hvplot.xarray

%matplotlib inline

In [2]:
# Connect to the VITO openEO backend.
# The alternative backend URLs are kept for experimentation.
vito_url: str = "https://openeo.vito.be/openeo/1.0"
vito_creo_url: str = "https://openeo.creo.vito.be"
vito_dev_url: str = "https://openeo-dev.vito.be/openeo/1.0/"
ee_url: str = "https://earthengine.openeo.org"

# NOTE: never store credentials in the notebook. To use the Earth Engine backend,
# supply them at runtime instead, e.g.
#   connect(ee_url).authenticate_basic(username=..., password=getpass.getpass())
con: Connection = connect(vito_url)
# OIDC login via the EGI identity provider (a stored refresh token is reused when available).
con.authenticate_oidc(provider_id="egi")

# All downloaded results are written to this directory.
out_dir = Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

Authenticated using refresh token.


In [5]:
# Show the backend's metadata for the collection used in the pipeline below.
# con.list_collections()
con.describe_collection("TERRASCOPE_S2_TOC_V2")

In [None]:
# con.list_processes()

# Create function for viewing results
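We want to inspect the result of every processing step, sometimes on a larger dataset. This helper runs a data cube as a batch job, downloads the results and opens the NetCDF output with xarray, which saves a lot of copy-pasting in this notebook.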

In [14]:
def get_xarray_from_dc(dc: DataCube, out_directory: Path, name: str = "aquamonitor") -> Tuple[xr.Dataset, List[Path]]:
    """Run ``dc`` as a batch job, download its results and open the NetCDF output.

    Args:
        dc: The openEO data cube (process graph) to execute.
        out_directory: Directory the result files are downloaded into.
        name: Used as both the title and the description of the batch job.

    Returns:
        A tuple ``(dataset, files)``: the opened NetCDF dataset and the list of
        all files that were downloaded.

    Raises:
        RuntimeError: If the job did not produce any downloadable file.
    """
    # execute_batch waits for the job to finish (it polls and logs the job status).
    job = dc.execute_batch(
        title=name,
        description=name,
        out_format="NetCDF",
    )
    files: List[Path] = job.get_results().download_files(out_directory)
    if not files:
        raise RuntimeError(f"Batch job '{name}' produced no result files.")

    # The download may contain more than the NetCDF asset, so prefer a file with a
    # NetCDF suffix and only fall back to the first file when none is found.
    netcdf_files = [f for f in files if Path(f).suffix.lower() in (".nc", ".nc4", ".netcdf")]
    result_file = netcdf_files[0] if netcdf_files else files[0]
    return xr.open_dataset(result_file, engine="netcdf4"), files

# Create Mosaic of Landsat imagery.

The GEE backend cannot merge multiple datasets with `DataCube.merge` (it responds with HTTP 500 errors), and according to my [issue on GitHub](https://github.com/Open-EO/openeo-earthengine-driver/issues/68) that backend is no longer supported. We therefore switch to the VITO backend, which offers only one Landsat dataset, `LANDSAT8_L2`. The cell below loads every configured collection and merges them into a single cube; currently only the Sentinel-2 collection `TERRASCOPE_S2_TOC_V2` is enabled.

In [7]:
out_dir = Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

# Common names that the bands of every collection are renamed to, so that cubes
# from different sources line up when they are merged.
band_names: List[str] = ["swir1", "nir", "green"]  # Rock on

# (collection id, band ids in the same order as `band_names`)
collections: List[Tuple[str, List[str]]] = [
    # ('LANDSAT/LT4_L1T_TOA', ['B5', 'B2']), -> GEE
    # ('LANDSAT/LT5_L1T_TOA', ['B5', 'B2']), -> GEE
    # ('LANDSAT/LE7_L1T_TOA', ['B5', 'B2']), -> GEE
    # ('LANDSAT/LC8_L1T_TOA', ['B6', 'B3']) -> GEE
    # ("LANDSAT8_L1C", ["B06", "B05", "B03"]),
    ("TERRASCOPE_S2_TOC_V2", ["B11", "B08", "B03"]),
    # ("SENTINEL2_L1C_SENTINELHUB", ["B11", "B08", "B03"]),
]

# Spatial and temporal constraints for loading.
denia_harbour_bbox: Dict[str, Union[float, str]] = {"west": 0.10594089795383788, "east": 0.12937267590793944, "south": 38.83464299556706, "north": 38.85035302841166, "crs": "EPSG:4326"}
spain_bounding_box: Dict[str, Union[float, str]] = {"west": -9.39288367353, "east": 3.03948408368, "south": 35.946850084, "north": 43.7483377142, "crs": "EPSG:4326"}
# temporal_extent: List[str] = ["2021-04-26", "2021-04-30"]  # short range for quick tests
temporal_extent: List[str] = ["2019-01-01", "2021-01-01"]

combined_tiff_filename: str = "combined.tiff"
combined_netcdf_filename: str = "combined.nc"

# Validate before looping: a check inside the loop body could never fire for an
# empty list, because the body would not run at all.
if not collections:
    raise RuntimeError("Please list collections to use.")

# Load every collection, tag it with its source, harmonise its band names and
# merge everything into a single cube.
for i, cl in enumerate(collections):
    collection_id, collection_bands = cl
    if len(collection_bands) != len(band_names):
        raise ValueError(
            f"Collection {collection_id!r} lists {len(collection_bands)} bands, "
            f"expected {len(band_names)} to match {band_names}."
        )
    dc: DataCube = (
        con.load_collection(
            collection_id=collection_id,
            spatial_extent=denia_harbour_bbox,
            temporal_extent=temporal_extent,
            bands=collection_bands,
        )
        # Record the source collection in an extra dimension of the cube.
        .add_dimension(name="source_name", label=collection_id, type="other")
        # Rename the sensor-specific band ids to the common `band_names`.
        .rename_labels(dimension="bands", source=collection_bands, target=band_names)
    )
    combined_dc = dc if i == 0 else combined_dc.merge_cubes(dc)

In [None]:
# Synchronously download the merged cube as a NetCDF file.
combined_dc.download(Path(out_dir, combined_netcdf_filename), format="netcdf")
# JSON export for local UDF testing:
# combined_dc.download(out_dir / "combined.json", format="Json")

In [None]:
# Run the merged cube as a batch job and open the NetCDF result.
data_set_raw, files = get_xarray_from_dc(
    dc=combined_dc,
    out_directory=out_dir,
    name="get_collection",
)

In [None]:
# Show the dataset summary, then scrub through the green band over time.
# A bare expression only renders when it is the last line of a cell, so the
# dataset is shown explicitly with IPython's display().
display(data_set_raw)
data_set_raw["green"].hvplot(
    groupby="t",
    cmap="turbo",
    widget_type="scrubber",
    widget_location="bottom"
)

## Divide the data into time-bands
The data is divided into time bands that each span a configurable number of years (`year_interval`). Within every time band the values are sorted and the n-th percentile (`percentile`) is taken.

In [10]:
from openeo.processes import quantiles
from pathlib import Path
from datetime import timedelta
import pandas as pd

def load_udf(udf_path: Union[str, Path]) -> str:
    """Return the source code of the UDF stored at ``udf_path``.

    The file is opened read-only; no write access is needed to load a UDF, so
    this also works for read-only files.
    """
    with open(udf_path, "r", encoding="utf-8") as f:
        return f.read()

# Length of every time band in years, and the percentile taken within each band.
year_interval: int = 1
percentile: float = 0.2  # a fraction in [0, 1]; 0.2 is the 20th percentile
bucketed_netcdf_filename: str = "bucketed.nc"

# Time-band boundaries: year starts ("YS") spaced `year_interval` years apart.
dr: pd.DatetimeIndex = pd.date_range(
    start=temporal_extent[0],
    end=temporal_extent[1],
    freq=str(year_interval) + "YS",
)
# dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], freq=f"2MS")

In [None]:
# Trial with a UDF: compute the percentile of the green band along the time dimension.
percentile_udf_path: Path = Path.cwd().parent / "udfs" / "percentile.py"
percentile_udf: str = load_udf(percentile_udf_path)

# For a local dry run of the UDF:
# from openeo.udf import execute_local_udf
# execute_local_udf(percentile_udf, out_dir / "combined.json", fmt="json")

(
    combined_dc
    .filter_bands("green")
    .apply_dimension(code=percentile_udf, runtime="Python", dimension="t")
    # Restore a "bands" dimension labelled "green" on the UDF output.
    .add_dimension(name="bands", label="green", type="bands")
    .download(out_dir / "green.nc", format="netcdf")
)

In [11]:
# Trial with aggregate_temporal: one percentile composite per time band.
# openEO expects ISO 8601 dates, whereas str(pd.Timestamp) yields
# "2019-01-01 00:00:00"; format the band boundaries as plain dates instead.
t_intervals = [
    [start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")]
    for start, end in zip(dr[:-1], dr[1:])
]
t_bucketed_dc: DataCube = combined_dc.aggregate_temporal(
    intervals=t_intervals,
    # NOTE(review): `quantiles` with a list of probabilities yields an array (here of
    # length 1), whereas a reducer should yield a single value; confirm that the backend accepts this.
    reducer=lambda data: quantiles(data, probabilities=[percentile]),
    # Label every time band with its start date.
    labels=[start for start, _ in t_intervals],
)
print(len(t_intervals))

2


In [None]:
# Trial with the built-in quantiles process: compute one percentile composite per
# time band and merge them back along a new "t" dimension.
# NOTE: this overwrites `t_bucketed_dc` from the aggregate_temporal trial above.
for i, date in enumerate(dr[:-1]):
    # [start, end] of the current time band as datetime.date objects.
    extent = [date.date(), dr[i + 1].date()]
    # Restrict the cube to the current time band.
    dc: DataCube = combined_dc.filter_temporal(extent=extent)

    # Length of the time band (a date difference is already a timedelta).
    time_band = extent[1] - extent[0]
    dt = timedelta(days=time_band.days)

    # Reduce the time dimension to the requested percentile.
    dc = dc.apply_dimension(
        dimension="t",
        target_dimension="bands",
        process=lambda data: quantiles(data, probabilities=[percentile]),
    )
    # Re-introduce a time dimension, labelled with the middle of the time band.
    t_dc = dc.add_dimension(name="t", label=str(extent[0] + dt / 2), type="temporal")

    t_bucketed_dc = t_dc if i == 0 else t_bucketed_dc.merge_cubes(t_dc)

In [None]:
# Synchronously download the time-bucketed cube as a NetCDF file.
t_bucketed_dc.download(Path(out_dir, bucketed_netcdf_filename), format="netcdf")

In [None]:
# Run the time-bucketed cube as a batch job and open the NetCDF result.
data_set_bucketed, files = get_xarray_from_dc(
    dc=t_bucketed_dc,
    out_directory=out_dir,
    name="t_bucketing_aquamonitor",
)

In [None]:
# Show a summary of the time-bucketed dataset (rendered as the cell's output).
data_set_bucketed

# Interactive time scrubber over the green band:
# data_set_bucketed["green"].hvplot(
#     groupby="t",
#     cmap="turbo",
#     widget_type="scrubber",
#     widget_location="bottom"
# )

## Create the time-bucketed NDWI cube
To detect land-to-water changes and vice versa, we rely on the Normalized Difference Water Index (NDWI): $$NDWI = \frac{\rho_{green} - \rho_{nir}}{\rho_{green} + \rho_{nir}}$$

In [12]:
from openeo.rest.job import RESTJob, JobResults

# NDWI per time band from the green and near-infrared ("nir") bands of the bucketed cube.
green: DataCube = t_bucketed_dc.band("green")
nir: DataCube = t_bucketed_dc.band("nir")
ndwi: DataCube = (green - nir) / (green + nir)

In [None]:
# data_set_ndwi, files = get_xarray_from_dc(ndwi, out_dir, "ndwi_calc_aquamonitor")
    
# Synchronously download the NDWI cube as a NetCDF file.
ndwi.download(Path(out_dir, "ndwi.nc"), format="NetCDF")
# JSON export for local testing of the linear-regression UDF:
# ndwi.download(out_dir / "ndwi.json", format="json")

In [15]:
# Run the NDWI cube as a batch job and open the NetCDF result.
data_set_ndwi, files = get_xarray_from_dc(
    dc=ndwi,
    out_directory=out_dir,
    name="ndwi_calculation",
)
# Interactive time scrubber over the NDWI (read here from the "var" variable).
data_set_ndwi["var"].hvplot(
    groupby="t",
    cmap="turbo",
    widget_type="scrubber",
    widget_location="bottom",
)

0:00:00 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': send 'start'
0:00:48 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:00:54 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:01:01 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:01:10 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:01:20 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:01:33 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:01:49 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:02:09 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:02:33 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:03:04 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:03:42 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:04:29 Job 'fece5e14-fe24-4db7-8d37-266df18b84ce': queued (progress N/A)
0:05:28 Job 'fece5e14-fe24-4db7-8d37-266df18b84

## Perform a linear regression over time
Fit a linear regression through the NDWI time series, which we use as our wetness indicator. The slope of the regression indicates water-to-land changes (or vice versa).

In [50]:
# Fit a linear regression through the NDWI time series with a UDF.
# `load_udf` is already defined in the time-band cell earlier in this notebook,
# so it is not redefined here.
linear_regression_udf_path: Path = Path.cwd().parent / "udfs" / "linear_regression.py"
linear_regression_udf: str = load_udf(linear_regression_udf_path)

from openeo.udf import execute_local_udf

# Local dry run of the UDF before submitting it to the backend.
# NOTE(review): `out_dir / "out"` must be an existing JSON sample input; confirm the intended file.
execute_local_udf(linear_regression_udf, out_dir / "out", fmt="json")

# Reduce the time dimension with the UDF.
lin_reg: DataCube = ndwi.reduce_temporal_udf(code=linear_regression_udf, runtime="Python")

In [None]:
# Synchronously download the regression result as a NetCDF file.
lin_reg.download(Path(out_dir, "lin_reg.nc"), format="netcdf")

In [51]:
# Run the regression cube as a batch job and open the NetCDF result.
data_set_lin_reg, files = get_xarray_from_dc(
    dc=lin_reg,
    out_directory=out_dir,
    name="linear_regression aquamonitor",
)

0:00:00 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': send 'start'
0:00:44 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:00:50 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:00:57 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:01:06 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:01:17 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:01:30 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:01:46 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:02:06 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:02:30 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:03:01 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:03:39 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:04:27 Job '9c2acf32-2b21-42d2-9e40-1a0b5290051a': queued (progress N/A)
0:05:26 Job '9c2acf32-2b21-42d2-9e40-1a0b529005

In [52]:
# Show a summary of the regression result (rendered as the cell's output).
data_set_lin_reg
# Interactive plot of the regression result:
# data_set_lin_reg["var"].hvplot(
#     groupby="t",
#     cmap="turbo",
#     widget_type="scrubber",
#     widget_location="bottom"
# )

## Debugging
Minimal, self-contained code snippet to share in Stories for debugging purposes.

In [None]:
from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from typing import Dict, Union, List
import hvplot.xarray
import pathlib

import xarray as xr

# Minimal reproduction: load one Landsat 8 collection, rename its bands, compute
# the NDWI with filter_bands-based band math and download the result as NetCDF.
vito_url: str = "https://openeo.vito.be/openeo/1.0"
con: Connection = connect(vito_url)
con.authenticate_oidc(provider_id="egi")

out_dir = pathlib.Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

denia_harbour_bbox: Dict[str, Union[float, str]] = {"west": 0.10594089795383788, "east": 0.12937267590793944, "south": 38.83464299556706, "north": 38.85035302841166, "crs": "EPSG:4326"}
temporal_extent: List[str] = ["2020-08-01", "2021-01-01"]

# (collection id, band ids in the same order as `band_names`)
collection = ("LANDSAT8_L1C", ["B06", "B05", "B03"])
band_names: List[str] = ["swir1", "nir", "green"]
dc: DataCube = con.load_collection(
        collection_id=collection[0],
        spatial_extent=denia_harbour_bbox,
        temporal_extent=temporal_extent,
        bands=collection[1]
    ).add_dimension(name="source_name", label=collection[0], type="other") \
    .rename_labels(dimension="bands", source=collection[1], target=band_names)

# NOTE(review): unlike the main pipeline (which uses `.band(...)`), this uses
# `filter_bands`; confirm whether that difference is what is being debugged.
green: DataCube = dc.filter_bands("green")
nir: DataCube = dc.filter_bands("nir")
ndwi: DataCube = (green - nir) / (green + nir)

ndwi.download(out_dir / "test.nc", format="netcdf")

In [None]:
# Open the downloaded reproduction result and scrub through it over time.
# NOTE(review): after the NDWI band math the file may not contain a "green"
# variable; check `dataset.data_vars` if the lookup below raises a KeyError.
dataset: xr.Dataset = xr.open_dataset(out_dir / "test.nc")

dataset["green"].hvplot(
    groupby="t",
    cmap="turbo",
    widget_type="scrubber",
    widget_location="bottom"
)