### Author

- **Walid Ghariani** - [GitHub Profile](https://github.com/WalidGharianiEAGLE)

### Introduction

Water is a vital part of Earth ecosystems and life, supporting biodiversity, and sustaining human livelihoods. Monitoring surface water dynamics (occurence, frequency and change) is important for managing resources, and mitigating natural hazards such as floods and droughts. Wetlands in particular are unique ecosystems under the influence of precipitaion, hydrological processes and coastal dynamics, which contribute in shaping the habitats, and species diversity. Within this context, the [Keta](https://rsis.ramsar.org/ris/567) and [Songor](https://rsis.ramsar.org/ris/566) [Ramsar](https://www.ramsar.org/) Sites in southeastern Ghana present a dynamic coastal wetland system where water levels fluctuate due to rainfall, tidal influence, and lagoon-river interactions. 

These ramsar sites consists of different wetlands classes such as marshes, floodplains, mangroves, and seasonally inundated grasslands. These unique sites are also critical habitats for thousands of resident and migratory birds, fish, and sea turtles, while supporting local livelihoods through fishing, agriculture, and salt production. Threfore an effective monitoring of water dynamics of these ecosystems is essential for conserving biodiversity and sustaining community resources.

Sentinel-1 Synthetic Aperture Radar (SAR) can detect water under all weather conditions, overcoming limitations of optical imagery caused by cloud cover. By processing and analyzing Sentinel-1 time series and its radar backscatter (VH, VV), water occurrence, frequency and dynamics in Ramsar wetlands could be detected and monitored providing valuable insights of these complex wetlands sites.

In [None]:
from typing import Tuple, NamedTuple
import datetime as dt
from tqdm import tqdm

import fsspec
import numpy as np
import xarray as xr
import hvplot.xarray
import pandas as pd
import matplotlib.pyplot as plt
from pystac_client import Client
from odc.geo.geobox import GeoBox
from scipy.interpolate import griddata
from scipy.ndimage import uniform_filter
from skimage.filters import threshold_otsu

import session_info
import warnings

warnings.filterwarnings("ignore", category=UserWarning, module="pkg_resources")

In [None]:
session_info.show()

## Data search

- Search for Sentinel-1 GRD data using pystac-client and EODC STAC API

In [None]:
# Configs for the Area Of Interest (AOI), time range and Polarization
aoi_bounds = [0.60912065235268, 5.759873096746288, 0.714565658530316, 5.837736228130655]
date_start = dt.datetime(2024, 1, 1)
date_end = dt.datetime(2025, 1, 1)

In [None]:
catalog = Client.open("https://stac.core.eopf.eodc.eu")
search = catalog.search(
    collections=["sentinel-1-l1-grd"],
    bbox=aoi_bounds,
    datetime=f"{date_start:%Y-%m-%d}/{date_end:%Y-%m-%d}",
)
items = search.item_collection()

In [None]:
# lets inspect the first item
items[0]

<hr>

## Data exploration

Lets open the first item of Sentinel-1 Zarr product, navigates its hierarchical structure, and extracts measurement data, geolocation conditions, and calibration metadata for a specific polarization.

In [None]:
polarization = "VH"  # or "VV"

url = items[0].assets["product"].href
store = fsspec.get_mapper(url)
datatree = xr.open_datatree(store, engine="zarr", chunks={})
group = [x for x in datatree.children if f"{polarization}" in x][0]
group

In [None]:
grd = datatree[group]["measurements/grd"]
gcp = datatree[group]["conditions/gcp"].to_dataset()
calibration = datatree[group]["quality/calibration"].to_dataset()

In [None]:
grd

In [None]:
gcp

In [None]:
calibration

## Spatial subset

To focus our analysis over the chosen AOI, we can efficiently crop our dataset using a spatial subset. The following functions determine the slices in azimuth_time and ground_range that cover the AOI, and then extract and mask the corresponding portion of the GDR dataset.

In [None]:
def aoi_slices(
    gcp_ds: xr.Dataset,
    aoi_bounds: list[float] | tuple[float, float, float, float],
    offset: int = 2,
) -> dict[str, slice]:
    """
    Get the azimuth_time and ground_range slices for cropping around an AOI.

    Parameters
    ----------
    gcp_ds : xarray.Dataset
        Dataset with 'latitude' and 'longitude' variables.
    aoi_bounds : list or tuple
        [min_lon, min_lat, max_lon, max_lat]
    offset : int
        Number of GCP grid cells to include around the nearest point.

    Returns
    -------
    dict
        {"azimuth_time": az_slice, "ground_range": gr_slice}
    """
    min_lon, min_lat, max_lon, max_lat = aoi_bounds
    lat_c = (min_lat + max_lat) / 2
    lon_c = (min_lon + max_lon) / 2

    dist = (gcp_ds.latitude - lat_c) ** 2 + (gcp_ds.longitude - lon_c) ** 2

    flat_index = dist.argmin().values
    i, j = np.unravel_index(flat_index, gcp_ds.latitude.shape)

    def clamp(index, dim):
        start = max(0, index - offset)
        end = min(dim - 1, index + offset)
        return slice(start, end + 1)

    az_slice = clamp(i, gcp_ds.sizes["azimuth_time"])
    gr_slice = clamp(j, gcp_ds.sizes["ground_range"])

    return {"azimuth_time": az_slice, "ground_range": gr_slice}


def subset(
    gdr: xr.Dataset,
    gcp_ds: xr.Dataset,
    aoi_bounds: list[float] | tuple[float, float, float, float],
    offset=2,
) -> xr.Dataset:
    """
    Crop GDR to AOI and return the subset.

    Parameters
    ----------
    gdr : xarray.DataArray or Dataset
        GDR data to crop.
    gcp_ds : xarray.Dataset
        GCP dataset for slicing.
    aoi_bounds : list or tuple
        [min_lon, min_lat, max_lon, max_lat]
    offset : int
        Number of GCP grid cells to include around the nearest point.

    Returns
    -------
    xarray.DataArray or Dataset
        Cropped and masked GDR subset.
    """
    slices = aoi_slices(gcp_ds, aoi_bounds, offset)
    gcp_crop = gcp_ds.isel(**slices)

    az_min, az_max = (
        gcp_crop.azimuth_time.min().values,
        gcp_crop.azimuth_time.max().values,
    )
    gr_min, gr_max = (
        gcp_crop.ground_range.min().values,
        gcp_crop.ground_range.max().values,
    )
    gdr_crop = gdr.sel(
        azimuth_time=slice(az_min, az_max),
        ground_range=slice(gr_min, gr_max),
    )

    gcp_interp = gcp_crop.interp_like(gdr_crop)
    gdr_crop = gdr_crop.assign_coords(
        latitude=gcp_interp.latitude,
        longitude=gcp_interp.longitude,
    )

    minx, miny, maxx, maxy = aoi_bounds
    mask = (
        (gdr_crop.latitude >= miny)
        & (gdr_crop.latitude <= maxy)
        & (gdr_crop.longitude >= minx)
        & (gdr_crop.longitude <= maxx)
    )
    mask = mask.compute()

    return gdr_crop.where(mask, drop=True)

In [None]:
gdr_subset = subset(grd, gcp, aoi_bounds, offset=1)

In [None]:
gdr_subset

In [None]:
gdr_subset.plot(robust=True, cmap="cividis")
plt.show()

## Radiometric calibration

In order to get a meanigful physical properties of features in the SAR scene that could be used for quantitative analysis, we need to apply a radiometric calibration on the backscatter values. This step converts the backscatter into a calibrated normalized radar cross section, correcting for incidence angle and sensor characteristics ensuring SAR images from different dates or viewing geometries are directly comparable. 

- Reference: https://step.esa.int/docs/tutorials/S1TBX%20SAR%20Basics%20Tutorial.pdf

In [None]:
def radiometric_calibration(
    gdr: xr.DataArray,
    calibration_ds: xr.Dataset,
    calibration_type="sigma_nought",
):
    """
    Perform radiometric calibration on a grd data array.

    Parameters:
    -----------
    gdr : xarray.DataArray
        The data array to calibrate.
    calibration_ds : xarray.Dataset
        The calibration dataset.
    calibration_type : str, optional
        The name of the calibration type in the calibration dataset.
        Default is "sigma_nought".

    Returns:
    --------
    xarray.DataArray
        The calibrated GRD data.
    """
    calibration_matrix = calibration_ds.interp_like(gdr)
    return (gdr / calibration_matrix[calibration_type]) ** 2

- Check the availble data vars in the calibration dataset that could be used for the radiometric calibration

In [None]:
calibration.data_vars

In [None]:
sigma_0 = radiometric_calibration(
    gdr_subset, calibration, calibration_type="sigma_nought"
)

In [None]:
sigma_0

In [None]:
sigma_0.plot(robust=True, cmap="cividis", cbar_kwargs={"label": "Sigma Nought"})

## Speckel filtering

Raw SAR imegery are characaterized by grainy or salt and pepper effect caused by random constructive and destructive interference, known as speckle. In order to reduce this effect and noise, we apply a spatial filter called **Lee Filter** [Lee et al. (2009)](https://doi.org/10.1080/02757259409532206) that averages the pixel values while preserving edges.

In [None]:
def lee_filter(img: np.ndarray, size: int = 5) -> np.ndarray:
    """
    Numpy-based Lee filter for a single 2D array.
    (Internal helper used by lee_filter_dask)
    Adapted from reference: https://stackoverflow.com/questions/39785970/speckle-lee-filter-in-python
    """
    img = img.astype(np.float32)
    mask_valid = np.isfinite(img)
    img_filled = np.where(mask_valid, img, 0)

    img_mean = uniform_filter(img_filled, size)
    img_sqr_mean = uniform_filter(img_filled**2, size)
    img_variance = img_sqr_mean - img_mean**2
    img_variance = np.maximum(img_variance, 0)

    valid_pixels = img[mask_valid]
    overall_variance = np.var(valid_pixels) if valid_pixels.size > 0 else 0.0

    img_weights = img_variance / (img_variance + overall_variance + 1e-6)
    img_output = img_mean + img_weights * (img_filled - img_mean)
    img_output[~mask_valid] = np.nan
    return img_output


def lee_filter_dask(da: xr.DataArray, size: int = 5) -> xr.DataArray:
    """
    Apply a Lee speckle filter to an xarray.DataArray (Dask-compatible).

    Parameters
    ----------
    da : xr.DataArray
        Input DataArray (float32 preferred). May be Dask-backed.
    size : int
        Window size (odd number recommended)

    Returns
    -------
    xr.DataArray
        Filtered DataArray (Dask-backed if input was Dask-backed)
    """
    filtered = xr.apply_ufunc(
        lee_filter,
        da,
        kwargs={"size": size},
        input_core_dims=[["azimuth_time", "ground_range"]],
        output_core_dims=[["azimuth_time", "ground_range"]],
        vectorize=False,
        dask="parallelized",
        output_dtypes=[np.float32],
        dask_gufunc_kwargs={"allow_rechunk": True},  
    )
    filtered.attrs.update(da.attrs)
    filtered.attrs.update(attrs={"speckle filter method": "lee"})

    return filtered

In [None]:
sigma_0_spk = lee_filter_dask(sigma_0, size=5)

In [None]:
sigma_0_spk

In [None]:
sigma_0_spk.plot(robust=True, cmap="cividis", cbar_kwargs={"label": "Sigma Nought"})
plt.title("Sigma Nought after Lee Filter")
plt.show()

## Georefrecing and regredding

Sentinel-1 GRD ZARR comes in irregular image geometry that does not align with common geographic grids. In order to conduct spatial analysis, and scenes comparison, and mapping, we need to georefrence and resample the data onto a regular grid. We will use an [ODC GeoBox](https://odc-geo.readthedocs.io/en/latest/intro-geobox.html) along with with [scipy.griddata](https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html) to interpolate the SAR values onto a consistent latitude-longitude grid.

In [None]:
class GridInfo(NamedTuple):
    """Container for grid information."""

    gbox: GeoBox
    xs: np.ndarray  # -> 1D longitude array
    ys: np.ndarray  # -> 1D latitude array
    xg: np.ndarray  # -->2D meshgrid longitude
    yg: np.ndarray  # --> 2D meshgrid latitude


def _extract_valid_data(
    sar_da: xr.DataArray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Extracts valid longitude, latitude, and data values from a DataArray.
    """
    lon = sar_da.longitude.values.flatten()
    lat = sar_da.latitude.values.flatten()
    data = sar_da.values.flatten()
    mask = np.isfinite(lon) & np.isfinite(lat) & np.isfinite(data)
    return lon[mask], lat[mask], data[mask]


def _build_geobox(
    bounds: list[float] | Tuple[float, float, float, float],
    resolution: Tuple[float, float],
    crs: str = "EPSG:4326",
) -> GridInfo:
    """
    Creates a GeoBox and returns grid information for interpolation.
    """
    gbox = GeoBox.from_bbox(bounds, crs=crs, resolution=resolution)
    coords = gbox.coordinates
    xs = coords["longitude"].values
    ys = coords["latitude"].values
    xg, yg = np.meshgrid(xs, ys)
    return GridInfo(gbox, xs, ys, xg, yg)


def _interpolate_to_grid(
    lon: np.ndarray,
    lat: np.ndarray,
    data: np.ndarray,
    grid_info: GridInfo,
    method: str = "nearest",
) -> xr.DataArray:
    """
    Interpolates scattered data onto a regular grid and returns a DataArray with CRS and GeoBox.
    """
    data_interp = griddata(
        (lon, lat), data, (grid_info.xg, grid_info.yg), method=method
    )
    da = xr.DataArray(
        data_interp,
        dims=("y", "x"),
        coords={"x": grid_info.xs, "y": grid_info.ys},
        attrs={"interpolation": method},
    )
    da = da.rio.write_crs(grid_info.gbox.crs)
    da = da.rio.write_transform(grid_info.gbox.transform)
    # Attach the GeoBox to the DataArray
    da.attrs["odcgeobox"] = grid_info.gbox
    return da


def regrid(
    da: xr.DataArray,
    bounds: list[float] | Tuple[float, float, float, float],
    resolution: Tuple[float, float],
    crs: str = "EPSG:4326",
    method: str = "nearest",
) -> xr.DataArray:
    """
    Regrids DataArray to a regular grid using ODC GeoBox and scipy griddata.
    """
    lon, lat, data = _extract_valid_data(da)
    grid_info = _build_geobox(bounds, resolution, crs)
    da_out = _interpolate_to_grid(lon, lat, data, grid_info, method)
    return da_out

In [None]:
resolution = 10 / 111320  # approx 10 meters in degrees
crs = "epsg:4326"

sigma_0_spk_geo = regrid(
    da=sigma_0_spk,
    bounds=aoi_bounds,
    resolution=resolution,
    crs=crs,
    method="nearest",  # "linear" or "cubic"
)

In [None]:
sigma_0_spk_geo

In [None]:
sigma_0_spk_geo.odcgeobox

In [None]:
sigma_0_spk_geo.plot(robust=True, cmap="cividis", cbar_kwargs={"label": "Sigma Nought"})
plt.title("Regridded Sigma Nought after Lee Filter")
plt.show()

## Convert backscatter to dB

We convert the regridded sigma_0 backscatter intensity from Linear scale to decibels (dB) using a logarithmic transformation. This enhances contrast and simplifies statistical analysis and interpretation of the image. It is considered a standard approach for representing SAR intensity.

In [None]:
sigma_0_spk_geo_db = 10 * np.log10(sigma_0_spk_geo)

In [None]:
sigma_0_spk_geo_db

In [None]:
sigma_0_spk_geo_db.plot(
    robust=True, cmap="cividis", cbar_kwargs={"label": "Sigma Nought dB"}
)
plt.title("Calibrated Sigma Nought in dB")
plt.show()

In [None]:
sigma_0_spk_geo_db.hvplot.image(
    x="x",
    y="y",
    robust=True,
    cmap="cividis",
    title="SAR GRD",
)

## Water mask 

To separate water from non-water surfaces, we first inspect the distribution of backscatter values using a histogram. In the following histogram we could choose -19 as therhold.

In [None]:
plt.hist(sigma_0_spk_geo_db.values.ravel(), bins=50, alpha=0.7)
plt.title("Histogram of Sigma Nought (dB) Values")
plt.show()

Since SAR water thresholds vary across scenes and times, a fixed cutoff is unreliable. Threfore, we apply an adaptative thresholding method using Otsu algorithm provided by [skimage](https://scikit-image.org/docs/stable/api/skimage.filters.html#skimage.filters.threshold_otsu), which automatically determines an optimal threshold from the intensity distribution and create a water mask accordingly.

In [None]:
def xr_threshold_otsu(
    da: xr.DataArray,
    mask_nan: bool = True,
    return_threshold: bool = False,
    mask_name: str = None,
) -> xr.DataArray:
    """
    Compute Otsu's threshold and generate a binary mask from a DataArray.

    Parameters
    ----------
    da : xr.DataArray
        Input DataArray with values to threshold.
    mask_nan : bool, optional
        If True, mask NaN values before thresholding.
    return_threshold : bool, optional
        If True, return the threshold value as an attribute.
    mask_name : str, optional
        Name for the binary mask DataArray (useful for metadata).

    Returns
    -------
    xr.DataArray
        A binary mask DataArray with the same spatial metadata as the input.
        If return_threshold is True, the threshold value is stored in attrs.
    """
    values = da.values
    if mask_nan:
        values = values[~np.isnan(values)]

    threshold = threshold_otsu(values)
    mask = (da > threshold).astype(np.uint8)

    mask_da = xr.DataArray(
        mask,
        dims=da.dims,
        coords=da.coords,
        name=mask_name,
        attrs={"threshold": threshold} if return_threshold else {},
    )
    return mask_da

In [None]:
water_mask = xr_threshold_otsu(
    sigma_0_spk_geo_db, return_threshold=True, mask_name="water_mask"
)
print(f"Otsu threshold: {water_mask.attrs['threshold']}")

In [None]:
water_mask

In [None]:
(1 - water_mask).plot(cmap="Blues")
plt.title("Water Mask (1 = Water, 0 = Non-Water)")
plt.show()

In [None]:
(1 - water_mask).hvplot.image(
    x="x",
    y="y",
    cmap="Blues",
    robust=True,
    title="Water Mask",
)

# Time series analysis 

So far we have walked through each processing step separately. To automate the workflow and apply it efficiently across Sentinel-1 acquisitions, we could now wrap all these operations into a single processing function. This allows us to automatically generate water masks for every item in our STAC collection.

In [None]:
def process_item(
    item, aoi_bounds, polarization="VH", resolution=10 / 111320, crs="epsg:4326"
):
    """Process a STAC item to generate water mask and timestamp."""
    url = item.assets["product"].href
    store = fsspec.get_mapper(url)
    datatree = xr.open_datatree(store, engine="zarr", chunks={})

    group_VH = [x for x in datatree.children if f"{polarization}" in x][0]

    grd = datatree[group_VH]["measurements/grd"]
    gcp = datatree[group_VH]["conditions/gcp"].to_dataset()
    calibration = datatree[group_VH]["quality/calibration"].to_dataset()

    gdr_subset = subset(grd, gcp, aoi_bounds, offset=1)
    sigma_0 = radiometric_calibration(
        gdr_subset, calibration, calibration_type="sigma_nought"
    )
    sigma_0_spk = lee_filter_dask(sigma_0, size=5)
    sigma_0_spk_geo = regrid(
        da=sigma_0_spk,
        bounds=aoi_bounds,
        resolution=resolution,
        crs=crs,
        method="nearest",
    )
    sigma_0_spk_geo_db = 10 * np.log10(sigma_0_spk_geo)
    water_mask = xr_threshold_otsu(
        sigma_0_spk_geo_db, return_threshold=True, mask_name="water_mask"
    )

    t = np.datetime64(item.properties["datetime"][:-2], "ns")
    water_mask = water_mask.assign_coords(time=t)

    return water_mask

Now we loop through all STAC items and use the `process_item` to build the full water-mask dataset.

In [None]:
water_masks = []
thresholds = []

for item in tqdm(items):
    water_mask = process_item(item, aoi_bounds)
    water_masks.append(1 - water_mask)  # invert mask to have 1 = water
    thresholds.append(water_mask.attrs["threshold"])

water_mask_ds = xr.concat(water_masks, dim="time")
water_mask_ds = water_mask_ds.assign_coords(threshold=("time", thresholds)).sortby(
    "time"
)

In [None]:
water_mask_ds

In [None]:
# Check the water mask thresholds over time
plt.plot(water_mask_ds.time, water_mask_ds.threshold, marker="o")
plt.title("Water Mask Thresholds over Time")
plt.show()

In [None]:
dates = water_mask_ds.time
dates

Lets inspect the water mask for the first 4 dates.

In [None]:
water_mask_ds.sel(time=dates[:4]).plot(col="time", cmap="Blues", vmin=0, vmax=1)

We found out that the RAW Sentinel-1 GRD scene for 2024-06-07 has issues and artificat that will lead to incorrect water classification. Therefore, we will exclude it from our water frequency analysis.

In [None]:
bad_date = pd.to_datetime("2024-06-07").date()
filtered_water_ds = water_mask_ds.sel(
    time=water_mask_ds.time.to_index().date != bad_date
)

In [None]:
filtered_water_ds

## Monthly surface water frequency

Now that we have the water masks, we can group them by month, compute the average water presence for each pixel, and convert it to a percentage (%) to represent monthly water frequency.

In [None]:
monthly_water_frequency = filtered_water_ds.groupby("time.month").mean("time") * 100

monthly_water_frequency.name = "SWF"
month_names = pd.date_range(start="2024-01-01", periods=12, freq="ME").strftime("%B")
monthly_water_frequency.coords["month"] = ("month", month_names)
monthly_water_frequency = monthly_water_frequency.assign_attrs(
    long_name="Surface Water Frequency"
)
monthly_water_frequency = monthly_water_frequency.assign_attrs(units="%")
monthly_water_frequency

In [None]:
monthly_water_frequency.plot(col="month", col_wrap=4, cmap="Blues", robust=True)

## Annual surface water frequency

Lets calculates the average water occurence across the entire time series over the year.

In [None]:
annual_water_frequency = filtered_water_ds.mean("time") * 100
annual_water_frequency = annual_water_frequency.assign_attrs(
    long_name="Surface Water Frequency"
)
annual_water_frequency = annual_water_frequency.assign_attrs(units="%")

In [None]:
annual_water_frequency.plot(cmap="Blues")
plt.title("Annual Surface Water Frequency (%)")
plt.show()

In [None]:
annual_water_frequency.hvplot.image(
    x="x",
    y="y",
    robust=True,
    cmap="Blues",
    title="Annual Surface Water Frequency (%)",
)

## Annual water change

We could also computes the standard deviation of water presence over time. This highlights areas with high water variability (seasonal or dynamic water bodies).

In [None]:
annual_water_change = filtered_water_ds.std("time") * 100
annual_water_change = annual_water_change.assign_attrs(
    long_name="Surface Water Variation"
)
annual_water_change = annual_water_change.assign_attrs(units="%")

In [None]:
annual_water_change.plot(cmap="magma")
plt.title("Annual Surface Water Variation")
plt.show()

In [None]:
annual_water_change.hvplot.image(
    x="x",
    y="y",
    robust=True,
    cmap="magma",
    title="Annual Surface Water Variation",
)

## Conclusion

In this notebook, we demonstrated how to use Sentinel-1 data in `.zarr` format for time-series analysis of surface water dynamics in a wetland coastal and cloudy-prone area. The zarr structure is especially useful for efficient extraction of data over a specific area of interest without loading the full dataset.

We developed a streamlined time series workflow using `pystac-client` and the EODC STAC API to preprocess Sentinel-1 GRD data. This included radiometric calibration, speckle filtering, georeferencing, and regridding. We also implemented an automated process for surface water detection and derived surface water occurrence, frequency, and change.

**Note**: Although in this notebook we opted to use `VH` polarization to detect the water, users may also experiment with `VV` polarization or combine both polarizations for improved water detection, and implement their own methods for deriving surface water masks. This workflow can also be adapted for other applications, such as monitoring lake dynamic and flood events.