# Earth Observation data handling using Python

## 00 - Generics

### Add project directory to Python path

This code defines two functions and retrieves the project directory path. It's useful when we want to define some generic functions that can be imported. If you retrieve the project directoy path like this, it both works in an Ipython and Python environment. 

- `is_interactive()`: Checks if the code is running in an interactive environment.
- `get_proj_dir()`: Determines the project directory path based on the execution context. If running interactively, it infers the project directory from the Jupyter kernel. Otherwise, it infers it from the Python file. The function returns the project directory as a `pathlib.Path` object.

In [None]:
import os
import pathlib
import sys


def is_interactive() -> bool:
    """
    Check if the Python code is running in an interactive environment.
    """
    import __main__ as main

    return not hasattr(main, "__file__")


def get_proj_dir() -> pathlib.Path:
    """
    Get the project directory path.

    Returns:
        A `pathlib.Path` object representing the project directory path.
    """
    if is_interactive():
        print("Inferring project directory from the Jupyter kernel.")
        cwd = pathlib.Path().resolve()
        proj_dir = cwd.parent
    else:
        print("Inferring project directory from the Python file.")
        cwd = pathlib.Path(__file__)
        proj_dir = cwd.parent.parent

    return proj_dir


proj_dir: pathlib.Path = get_proj_dir()
sys.path.append(str(proj_dir / "src"))

In [None]:
proj_dir

### Very extensive list of libraries - I'll cleanup later

In [None]:
import time
import warnings
from copy import deepcopy
from typing import Any, Dict, List, Union

import cartopy.crs as crs

# import adlfs
# import azure.storage.blob
import colorcet as cc
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

# import dask_gateway
import dask_geopandas
import geopandas as gpd

# import geoviews.tile_sources as gvts
import holoviews as hv
import hvplot.pandas  # noqa
import hvplot.xarray  # noqa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import panel as pn
import planetary_computer
import pooch
import pyproj
import pystac
import pystac_client
import rasterio
import rioxarray
import rioxarray as rio
import shapely
import stackstac
import tqdm
import xarray as xr

# from azure.storage.blob import BlobServiceClient
from dask.distributed import Client
from geopandas.array import GeometryDtype
from ipyleaflet import Map, basemaps
from matplotlib.colors import ListedColormap
from odc.stac import configure_rio, stac_load
from xrspatial.multispectral import true_color

## 01 - Data Access

In [None]:
m = Map(basemap=basemaps.Esri.WorldImagery, scroll_wheel_zoom=True)
m.center = 52.058, 4.192
m.zoom = 14
m.layout.height = "800px"
m

### EO team discussion point 

Do we show how to import generic functions from a project directory? Otherwise I'll just redefine the functions here in the notebook

### Extract the coords from the interactive map -- IMPORTANT: wait 2 seconds until map is rendered, othewise you cannot extract the coords

In [None]:
from coastmonitor.geo.geometries import bbox_to_geometry, geo_bbox, geometry_to_bbox

bbox = [m.west, m.south, m.east, m.north]
bbox_geom = bbox_to_geometry(bbox)
geo_bbox(*bbox, src_crs=4326, dst_crs=4326).explore()

### Connect to the planetary STAC catalog

In [None]:
catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

### EO Discussion
Floris: cell below breaks on my machine with APIError. 

In [None]:
# list(
#     catalog.get_children()
# )  # list all the STAC Collections (i.e. datasets hosted by planetary computer)

### Browse the STAC catalog 
Search the Sentinel-2 SR catalog

In [None]:
BAND = {
    "B02": "blue",
    "B03": "green",
    "B04": "red",
    "B08": "nir",
    "B11": "swir1",
    "SCL": "SCL",
}

search = catalog.search(
    collections=[
        "sentinel-2-l2a"
    ],  # atmospherically corrected Surface Reflectances (SR)
    intersects=bbox_geom,
    datetime="2022-12-31/2023-05-01",  # "2020-01-01/2020-01-31",
    query={"eo:cloud_cover": {"lt": 50}},
)

items = search.item_collection()
print(f"{len(items)} items found in catalog search.")

### Planetary computer STAC catalog

Microsoft Planetary Computer much more data; see [the catalog in the radiant earth STAC viewer](https://radiantearth.github.io/stac-browser/#/external/planetarycomputer.microsoft.com/api/stac/v1?.language=en). Other catalogs are the [C-Scale metadata federation](https://radiantearth.github.io/stac-browser/#/external/mqs.eodc.eu/stac/v1), [Digital Earth Africa (DEA)](https://radiantearth.github.io/stac-browser/#/external/explorer.digitalearth.africa/stac?.language=en) or [the Deltares CoCliCo](https://radiantearth.github.io/stac-browser/#/external/storage.googleapis.com/dgds-data-public/coclico/coclico-stac/catalog.json)


### Explain a STAC item here

- you can filter the STAC collection on all metadata that is shown here
- the asset contains several keys which refer to where the data is stored for the bands

In [None]:
items[0]

In [None]:
items[0].assets.keys()

In [None]:
href = items[0].assets["rendered_preview"].href
href

### Show the preview in your python notebook. 

In [None]:
import io
import urllib.request

import matplotlib.pyplot as plt

# Download the image from the URL
image = plt.imread(io.BytesIO(urllib.request.urlopen(href).read()))

# Plot the image
plt.imshow(image)
plt.axis("off")
plt.show()

### EO team TODO: 

It's better to read the data with Xarray (then we actually have the data). Plot below can be improved.. not sure why we don't get a nice rgb. Also, why is it turned (https://hvplot.holoviz.org/reference/xarray/rgb.html (flip_yaxis should fix it but it doesnt)? Besides, would be nice to show it in correct coordinates (impossible due to rendered preview).

In [None]:
items[0].assets["rendered_preview"].href

In [None]:
xr.open_dataset(items[0].assets["rendered_preview"].href, engine="rasterio").isel(
    {"band": slice(0, 3)}
)["band_data"].hvplot.rgb(band="band", data_aspect=1)

### Before I've made RGB plots like this

But maybe there is a better way without that xrspatial library

In [None]:
ds = xr.open_dataset(items[0].assets["rendered_preview"].href, engine="rasterio").isel(
    {"band": slice(0, 3)}
)["band_data"]

rgb = true_color(
    ds.isel(band=0),
    ds.isel(band=1),
    ds.isel(band=2),
)

rgb.isel({"band": slice(0, 3)}).hvplot.rgb(bands="band", y="y", x="x", data_aspect=1)

### How much data do our STAC items refer to?
 
The xarray.DataArray object has a total size of 80.84 GB (gigabytes), which is a very large file size - even though we only have data for a tiny piece of The Netherlands in a 4 month time period. 

The file size can make it challenging to work with the data on systems with limited storage capacity, but the use of chunking and parallel processing can help to mitigate this issue. The file is chunked into 8 MB (megabyte) blocks, with 10890 chunks in 3 graph layers. This chunking approach is used to enable parallel processing and optimize memory usage when working with large arrays. 

In [None]:
stackstac.stack(items, resolution=10, assets=list(BAND.keys()))

In [None]:
# stackstac contains many more arguments to filter the data (on the bounding box, a certain number of bands and by sorting the dates for instance)
stack = stackstac.stack(
    items,
    # epsg=3857,  # if you want to cast the data to a common crs
    assets=list(BAND.keys()),
    bounds_latlon=bbox,
    sortby_date="desc",  # maybe you want to sort the data by date?
)
stack

### EO team discussion point

We should discuss how much preprocessing we will include. For example, we could add a few lines to cast the stack from a Xarray DataArray to a Xarray DataSet, which makes it easier to compute spectral indices. However, adding code will make this tutorial notebook longer and more complex - see below

In [None]:
# # overwrite common names from Planetary Computer STAC because they are confusing
# stack = stack.assign_coords(common_name=("band", np.array(list(BAND.values()))))

# # rename to common names when common names are available
# common_names = stack.common_name.where(~stack.common_name.isnull(), stack.band)
# stack = stack.where(lambda x: x > 0, other=np.nan).assign_coords(
#     band=lambda x: common_names.rename("band"),  # use common names
# )

## 02 - Data processing

### Local Dask cluster

Here we launch a local Dask cluster, a Python-based multiprocessing library, which will speed up the computation. The cluster we make here is local, when you want to upscale your computations you should use a Dask gateway, hosted on a remote server, close to the data.

In [None]:
# when running locally (parallel)
client = Client(
    threads_per_worker=1,
    processes=True,
    local_directory="/tmp",
)
client

# asking for plots (.plot()) or numerical values (.compute()) will trigger the computation, which you can see in the dask dashboard

### Example: NDWI
Nothing is computed yet. Look at ndwi datarray, they are all dask arrays (lazy) 

In [None]:
green = stack.sel({"band": "B03"})
nir = stack.sel({"band": "B08"})
ndwi = (green - nir) / (green + nir)  # this is still a lazy Dask computation
ndwi

By making a plot or computing the values, all scheduled tasks are triggered and the data is brought into memory. This therefore takes somewhat longer..

In [None]:
# make plot
ndwi.isel({"time": 0}).hvplot(x="x", y="y", cmap=cc.CET_D9[::-1], data_aspect=1)

In [None]:
# compute values
ndwi_val = ndwi.isel({"time": 0}).compute()
ndwi_val.shape, ndwi_val

### Example: Compositing (RGB)

In [None]:
median = stack.median("time")

median_rgb = true_color(
    median.isel(band=0),
    median.isel(band=1),
    median.isel(band=2),
)
median_rgb.isel({"band": slice(0, 3)}).hvplot.rgb(
    bands="band", y="y", x="x", data_aspect=1
)

### Example: Scene classification

In [None]:
scl_colormap = np.array(
    [
        [255, 0, 255, 255],  # 0  - NODATA
        [255, 0, 4, 255],  # 1  - Saturated or Defective
        [0, 0, 0, 255],  # 2  - Dark Areas
        [97, 97, 97, 255],  # 3  - Cloud Shadow
        [3, 139, 80, 255],  # 4  - Vegetation
        [192, 132, 12, 255],  # 5  - Bare Ground
        [21, 103, 141, 255],  # 6  - Water
        [117, 0, 27, 255],  # 7  - Unclassified
        [208, 208, 208, 255],  # 8  - Cloud
        [244, 244, 244, 255],  # 9  - Definitely Cloud
        [195, 231, 240, 255],  # 10 - Thin Cloud
        [222, 157, 204, 255],  # 11 - Snow or Ice
    ],
    dtype="uint8",
)

scl_items = np.array(
    [
        "NO DATA",
        "Saturated or Defective",
        "Dark areas",
        "Cloud shadow",
        "Vegetation",
        "Bare Ground",
        "Water",
        "Unclassified",
        "Cloud",
        "Definitely cloud",
        "Thin cloud",
        "Snow or ice",
    ]
)

cmap = ListedColormap(scl_colormap / 255)
colors = cmap(np.arange(cmap.N))


def colorize(ds, colormap):
    return xr.DataArray(colormap[ds.data], coords=ds.coords, dims=(*ds.dims, "channel"))

In [None]:
fig, ax = plt.subplots(
    1,
    figsize=(6, 1),
    subplot_kw=dict(xticks=np.arange(0.5, len(scl_colormap), 1), yticks=[]),
)
ax.imshow([colors], extent=[0, 12, 0, 1])
ax.set_xticklabels(scl_items, rotation=30, ha="right");

In [None]:
SCL = stack.isel({"time": 0}).sel({"band": "SCL"})
scl_rgba = colorize(SCL.compute().astype("int"), scl_colormap)  # does the computation
scl_rgba.hvplot.rgb(x="x", y="y", bands="channel", data_aspect=1)  # does the plotting

### More extensive example with other STAC: NDVI

In [None]:
# water hyacinth in Winam Gulf for the month

m = Map(basemap=basemaps.Esri.WorldImagery, scroll_wheel_zoom=True)
m.center = -0.14, 34.68
m.zoom = 13
m.layout.height = "700px"
m

In [None]:
# Open the stac catalogue
catalog2 = pystac_client.Client.open("https://explorer.digitalearth.africa/stac")

# get the bounding box
bbox = [m.west, m.south, m.east, m.north]

# change the rasterio configuration
configure_rio(
    cloud_defaults=True,
    aws={"aws_unsigned": True},
    AWS_S3_ENDPOINT="s3.af-south-1.amazonaws.com",
)

# Set a start and end date
start_date = "2020-01-01"
end_date = "2020-01-31"

# Set the STAC collections
collections = ["s2_l2a"]

# Build a query with the set parameters
query = catalog2.search(
    bbox=bbox, collections=collections, datetime=f"{start_date}/{end_date}"
)

# Search the STAC catalog for all items matching the query
items = list(query.get_items())
print(f"Found: {len(items):d} items in the catalog")

# filter out items with cloud cover
items = [i for i in items if i.properties["eo:cloud_cover"] < 30]

## TODO: Explain DEA and their STAC plus loader

In [None]:
# set config file for DEA STAC loader
config = {
    "s2_l2a": {
        "assets": {
            "*": {
                "data_type": "uint16",
                "nodata": 0,
                "unit": "1",
            },
            "SCL": {
                "data_type": "uint8",
                "nodata": 0,
                "unit": "1",
            },
        },
        "aliases": {
            "costal_aerosol": "B01",
            "blue": "B02",
            "green": "B03",
            "red": "B04",
            "red_edge_1": "B05",
            "red_edge_2": "B06",
            "red_edge_3": "B07",
            "nir": "B08",
            "nir_narrow": "B08A",
            "water_vapour": "B09",
            "swir_1": "B11",
            "swir_2": "B12",
            "mask": "SCL",
            "aerosol_optical_thickness": "AOT",
            "scene_average_water_vapour": "WVP",
        },
    }
}

crs = "EPSG:6933"
resolution = 10

ds = stac_load(
    items,
    bands=("red", "green", "blue", "nir"),
    crs=crs,
    resolution=resolution,
    chunks={},
    groupby="solar_day",
    stac_cfg=config,
    bbox=bbox,
)

# View the Xarray Dataset
ds

In [None]:
# DEA does no normalization of the sentinel bands
red = ds["red"] / 1000
nir = ds["nir"] / 1000

In [None]:
# compute NDVI
ndvi = (nir - red) / (nir + red)  # this is still a lazy Dask computation

In [None]:
ndvi

In [None]:
ndvi_da = ndvi.compute()

In [None]:
# make plot, this might take a while
# compare with: https://www.facebook.com/LocateitLimited/videos/water-hyacinth-earth-observatory-lake-victoria/2757490344316232/
ndvi_da.plot(col="time", col_wrap=4, vmin=-0.6, vmax=1)

### EO team TODO:
Make plot & slider for 4 images above..

Why does the water have so high NDVI values; not for same area with MPC S2 dataset... (seems to be an error in the code somewhere) -> (Jaap: most datasources normalize Sentinel-2 to be similar to Landsat by dividing values by 1000)

Mask lake area with geojson / shp file (findable on internet?), i.e. getting rid of land area

create the `pn.widgets.DiscreteSlider` with the time options in the `DataArray`
Bonus: make sure the time shows up formatted in the slider

In [None]:
time_list = list(map(lambda t: str(t), ndvi_da.time.values))  # format time to str
t_slider = pn.widgets.DiscreteSlider(options=time_list, value=time_list[0])

then create the callback to visualize one timeslice of the DataFrame

In [None]:
def show_time_slice(time_str):
    return ndvi_da \
        .sel(time=np.datetime64(time_str)) \
        .hvplot(x="x", y="y", cmap=cc.CET_D9[::-1], clim=(-0.6, 1))

Create a panel `Column`, binding the slider to the callback using `pn.bind`. Make sure to select a good colum scheme with limits

In [None]:
col = pn.Column(t_slider, pn.bind(show_time_slice, t_slider))

In [None]:
col

In [None]:
# make the plot; water hyacinth gets less over time (i.e. open water area grows), meaning mean ndvi goes up (this is other way around for S2 MPC data)
df = ndvi.mean(dim=["x", "y"]).to_dataframe(name="ndvi").reset_index()  # makes the compute
#df.hvplot.line(x="time", y="ndvi")

In [None]:
df.plot.line(x="time", y="ndvi")

### Cloud (Dask cluster)

Some computations require more power than available on a local (parallelized) cluster. All code shown above is scalable and works in the cloud or at Deltares' own HPC as well when a dask gateway is set up (i.s.o local cluster). This requires access to these remote desktops in advance.

### More advanced

Almost all of xarrays built-in operations work on Dask arrays. Yet, if you want to use a function that isn't wrapped by xarray, one can use `apply_(g)ufunc` or `map_blocks` where you can automate embarrasingly parallel "map" type operations from NumPy arrays to Xarray objects with Dask arrays or across all blocks of a Dask array.

When running computations on a Dask cluster (local or cloud), the code needs to be executed on the worker nodes in the cluster. Uploading a Python file to the Dask workers ensures that the required functions and modules are available to the workers when executing the code. This can be achieved by running `client.upload_file("xx.py")`

## 03 - Data Visualization

### Holoviz Panel

In [None]:
ds = stack.copy().compute()
ds.coords["time"] = pd.DatetimeIndex(ds["time"]).strftime("%Y-%m-%dT%H:%M:%S")
time_options = ds.coords["time"].values.tolist()
time_slider = pn.widgets.DiscreteSlider(name="Time", options=time_options)


@pn.depends(time_slider.param.value)
def plot_rgb(time, **kwargs):
    rgb = (
        true_color(*ds.sel({"time": time}).isel({"band": slice(0, 3)}))
        .isel({"band": slice(0, 3)})
        .hvplot.rgb(bands="band", y="y", x="x")
    )
    return rgb.opts(title=f"RGB of {time}")

In [None]:
pn.extension()
title_bar = pn.Row(
    pn.pane.Markdown(
        "## Interactive Earth Observation dashboard",
        styles={"color": "black"},
        width=800,
        sizing_mode="fixed",
        margin=(10, 5, 10, 15),
    ),
    pn.Spacer(),
)
eo_panel = pn.Column(title_bar, pn.Row(time_slider), pn.Row(plot_rgb))

In [None]:
eo_panel

### QGIS

In [None]:
# for QGIS; export the TIFF to be visualized (CoG)
outdir = proj_dir / "tmp"
if not outdir.exists():
    outdir.mkdir(exist_ok=True, parents=True)

to_save = stack.isel({"time": 0})
fname = f"IMG-{to_save.time.dt.strftime('%Y%m%d').item()}.tif"
outpath = outdir / fname
to_save.rio.to_raster(raster_path=outpath, driver="COG")
print(f"Saving to: '{outpath}'")

In [None]:
# close down local cluster
client.close()