### **Cloud probability per LSOA**

This notebook explores ways to translate per-pixel, raster-based cloud probability into polygon-based cloud probability. A polygon-level representation is easier to understand and to use in regional and local policy-making.

In this workflow we use the LSOA geographies from [here](https://github.com/Imago-SDRUK/geographies?tab=readme-ov-file#list-of-data) (n=46844). The previous workflow produced xarray Datasets, which were exported as tiled GeoTIFFs (n=858).

In [190]:
import numpy as np
import pandas as pd


import xarray as xr

# Define a replacement, minimal AlignmentError
class AlignmentError(ValueError):
    """Compatibility shim for xarray.AlignmentError removed in xarray >= 2025."""
    pass

# Patch it into xarray so xvec/xproj can import it
xr.AlignmentError = AlignmentError


from pystac_client import Client
import geopandas as gpd
import rioxarray
from rioxarray import merge

import odc.stac

import dask.array as da
import dask.dataframe as dd

import yaml
import warnings

from rasterstats import zonal_stats
import xvec
from exactextract import exact_extract

import tempfile

import rasterio

import sys

from dask.distributed import Client as DaskClient, LocalCluster
from dask import delayed, compute

sys.path.append('/src/external_libs')
#os.environ["PROJ_LIB"] = pyproj.datadir.get_data_dir()

lsoa="data/uk_datazones_within_newcastle.gpkg"  # "data/uk_datazones.gpkg" if you want all LSOAs
tiles="data/uk_20km_grid_Newcastle_subset.gpkg" # "data/uk_20km_grid.gpkg" for all tiles
lsoa_id_col='LSOA21CD' # column name with unique LSOA IDs, as in `geographies`

# output stats
lsoa_stats_exactextract="data/lsoa_stats_exactextract.gpkg"
lsoa_stats_rasterstats="data/lsoa_stats_rasterstats.gpkg"
lsoa_stats_xvec="data/lsoa_stats_xvec.gpkg"

#lsoa_stats_tile_path="data/lsoa_tile.gpkg"
# soa_intersected_path="data/lsoa_tile_intersected.gpkg"

lsoa_gdf=gpd.read_file(lsoa)
tiles=gpd.read_file(tiles)
tile=tiles.iloc[[3]].copy()

#reload cloud probability product
"""cloud_mean_bbox=rasterio.open("Newcastle_NZ26_2024.tif") # TODO - open through load_tile()
cloud_mean_bbox_xr = rioxarray.open_rasterio(cloud_mean_bbox, mask_and_scale=False)
cloud_mean = rioxarray.open_rasterio("data/Newcastle_NZ26_2024.tif")"""



In this code we run the computations for a **data subset**:
- six cloud probability datasets (Newcastle tiles)
- all LSOAs entirely covered by these tiles (`within`). We exclude any LSOA that covers pixels from other tiles. In this way we **replicate the real data relations** of the UK scale at the local scale: every LSOA in the UK is covered by the cloud probability datasets (none lies outside).

For the sake of clarity, we open the previously exported GeoTIFF cloud probability rasters as xarray Datasets. In the final version of the workflow this step can be omitted, as **we don't need any intermediate exports**, which consume considerable resources.

In [183]:
import xarray as xr
import rioxarray  # registers the `.rio` accessor used below
from pathlib import Path

def open_cloud_datasets(*paths: str | Path) -> list[xr.Dataset]:
    """Open each GeoTIFF as an xarray Dataset, assigning a CRS if none is stored."""
    datasets = []
    for p in paths:
        ds = xr.open_dataset(p, engine="rasterio")
        if ds.rio.crs is None:
            # Fall back to the British National Grid when the file carries no CRS
            ds = ds.rio.write_crs("EPSG:27700")
        datasets.append(ds)

    print(f"Successfully opened {len(datasets)} xarray Datasets")

    return datasets

In [184]:
rasters = [
    "data/Newcastle_NZ26_2024.tif",
    "data/Newcastle_NZ04_2024.tif",
    "data/Newcastle_NZ24_2024.tif",
    "data/Newcastle_NZ06_2024.tif",
    "data/Newcastle_NZ44_2024.tif",
    "data/Newcastle_NZ46_2024.tif",
]

datasets = open_cloud_datasets(*rasters)

print(datasets)

Successfully opened 6 xarray Datasets
[<xarray.Dataset> Size: 8MB
Dimensions:      (band: 2, x: 1000, y: 1000)
Coordinates:
  * band         (band) int64 16B 1 2
  * x            (x) float64 8kB 4.2e+05 4.2e+05 4.2e+05 ... 4.4e+05 4.4e+05
  * y            (y) float64 8kB 5.8e+05 5.8e+05 5.8e+05 ... 5.6e+05 5.6e+05
    spatial_ref  int64 8B ...
Data variables:
    band_data    (band, y, x) float32 8MB ..., <xarray.Dataset> Size: 8MB
Dimensions:      (band: 2, x: 1000, y: 1000)
Coordinates:
  * band         (band) int64 16B 1 2
  * x            (x) float64 8kB 4e+05 4e+05 4e+05 ... 4.2e+05 4.2e+05 4.2e+05
  * y            (y) float64 8kB 5.6e+05 5.6e+05 5.6e+05 ... 5.4e+05 5.4e+05
    spatial_ref  int64 8B ...
Data variables:
    band_data    (band, y, x) float32 8MB ..., <xarray.Dataset> Size: 8MB
Dimensions:      (band: 2, x: 1000, y: 1000)
Coordinates:
  * band         (band) int64 16B 1 2
  * x            (x) float64 8kB 4.2e+05 4.2e+05 4.2e+05 ... 4.4e+05 4.4e+05
  * y            (y

#### 1. `exactextract`

**Pros:**
* Written in C++, fast
* Partial cell-weighting.

**Cons:**
* `exactextract` doesn't accept a list of xarray Datasets as input, only a single one, so we have to loop over them
* Raster I/O is not chunk-aware: parallelism is possible, but it will read through all chunks...?
* Few custom statistics.
* Requires installing C++ extensions and might not be available through `pip`. It has issues with the Python version of the current image (probably requires <=3.11)

The following worked:
- `conda install -c conda-forge cmake compilers`
- `conda install -c conda-forge exactextract`

As we can't feed in the list of datasets, we would need to loop over datasets and then combine and aggregate results by the LSOA ID.
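The combination step can be illustrated with a minimal sketch. The numbers below are made up for illustration, and `weighted_mean` is a hypothetical helper, not part of `exactextract`. It shows why per-tile means should be weighted by the pixel count of each tile rather than averaged directly:

```python
# Per-tile results for one LSOA that straddles two tiles (illustrative numbers).
# Each record is (mean cloud probability within the tile, covered pixel count).
per_tile = [
    (60.0, 900.0),  # most of the LSOA lies in the first tile
    (40.0, 100.0),  # a small remainder lies in the second tile
]


def weighted_mean(records):
    """Combine per-tile means, weighting each by its pixel count."""
    total = sum(count for _, count in records)
    if total == 0:
        return None  # the LSOA is not covered by any tile
    return sum(mean * count for mean, count in records) / total


# An unweighted average treats both tiles as equally important.
simple_mean = sum(mean for mean, _ in per_tile) / len(per_tile)

print(simple_mean)              # 50.0
print(weighted_mean(per_tile))  # 58.0
```

The weighted result stays close to the value of the tile that holds most of the LSOA, which is the behaviour we want when a polygon only clips the corner of a neighbouring tile.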

In [216]:
import re

def calc_lsoa_stats_exactextract(datasets, lsoa_gdf, out_file, lsoa_id_col='LSOA21CD'):
    """
    Calculate zonal statistics for multiple xarray datasets over LSOA polygons.

    Per-tile results are combined per LSOA, with the mean weighted by the
    number of pixels of the LSOA covered by each tile.

    Parameters
    ----------
    datasets : list of xarray.DataArray or xarray.Dataset
        The raster datasets to extract from.
    lsoa_gdf : geopandas.GeoDataFrame
        Polygons over which to calculate statistics.
    out_file : str
        Path of the GeoPackage to write the aggregated statistics to.
    lsoa_id_col : str
        Name of the column in lsoa_gdf with unique polygon IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        Aggregated statistics (mean, min, max, count) per LSOA ID.
    """
    results = []
    stats = ['mean','min','max','count']

    # extract stats for each dataset
    for ds in datasets:
        stat_df = exact_extract(
            rast=ds,
            vec=lsoa_gdf,
            ops=stats,
            include_cols=[lsoa_id_col],
            include_geom=True,
            output='pandas'
            #strategy="raster-sequential" # TODO - to test this instead of default "feature-sequential"
            #weights=... # this shouldn't be used in this case unless we want to weight the cloud probability by the pixel count from the processed dataset (band 2)
        )
        results.append(stat_df)

        # NOTE: this will create a dataframe with columns of the following pattern: "band_data_band_<num>_<stat>", eg "band_data_band_1_mean"
        #print(stat_df.columns)
        #stat_df.head()

    combined = pd.concat(results)

    # Extract unique band prefixes (everything before _mean/_min/_max/_count)
    suffixes = ['_mean', '_min', '_max', '_count']
    pattern = r"(.*)_(mean|min|max|count)"

    stat_cols = [c for c in combined.columns if any(c.endswith(s) for s in suffixes)] # we need to extract names of columns with stats
    band_prefixes = sorted({re.match(pattern, c).group(1) for c in stat_cols})

    grouped = combined.groupby(lsoa_id_col) # group rows by LSOA ID
    output_rows = []

    # aggregate per LSOA polygon
    for lsoa, group in grouped:
        # start row with geometry from the first occurrence
        row = {lsoa_id_col: lsoa, 'geometry': group.iloc[0].geometry} #take the geometry from the first row - they are the same

        for band in band_prefixes:
            mean_col = f"{band}_mean"
            min_col  = f"{band}_min"
            max_col  = f"{band}_max"
            count_col = f"{band}_count"

            if mean_col not in group:
                continue

            # WEIGHTING
            num = (group[mean_col] * group[count_col]).sum() #multiply mean value for that band in ONE raster by number of pixels contributing to that mean and then SUM
            total = group[count_col].sum() #SUM of pixel numbers contributing to the mean
            wmean = num / total if total > 0 else None # final weighted mean across all rasters

            # add to row
            row[f"{band}_mean"] = wmean 
            row[f"{band}_min"] = group[min_col].min()
            row[f"{band}_max"] = group[max_col].max()
            row[f"{band}_count"] = total

        output_rows.append(row)

    # convert the list of dictionaries into a GeoDataFrame: one row per LSOA polygon with aggregated, weighted statistics
    out_gdf = gpd.GeoDataFrame(output_rows, geometry='geometry', crs=lsoa_gdf.crs)

    """#DEPRECATED - merge with original polygons
    #lsoa_with_stats = lsoa_gdf.merge(out_df, on=lsoa_id_col, how='left')"""
    out_gdf.to_file(out_file, driver="GPKG")

    print(f"Saved weighted aggregated LSOA statistics to {out_file}")

    return out_gdf


In [217]:
lsoa_with_stats=calc_lsoa_stats_exactextract(
    datasets=datasets,
    lsoa_gdf=lsoa_gdf,
    lsoa_id_col=lsoa_id_col,
    out_file=lsoa_stats_exactextract
)

print(lsoa_with_stats)

Saved weighted aggregated LSOA statistics to data/lsoa_stats_exactextract.gpkg
      LSOA21CD                                           geometry  \
0    E01008162  MULTIPOLYGON (((426698.655 562983.803, 426704....   
1    E01008163  MULTIPOLYGON (((425968.068 562574.153, 425971....   
2    E01008164  MULTIPOLYGON (((426690.327 562513.51, 426597 5...   
3    E01008165  MULTIPOLYGON (((426095 562463, 426145 562292, ...   
4    E01008166  MULTIPOLYGON (((427011.334 563237.222, 426841....   
..         ...                                                ...   
941  E01035625  MULTIPOLYGON (((429975.663 573394.466, 430543....   
942  E01035626  MULTIPOLYGON (((431383.313 573499.812, 431486....   
943  E01035627  MULTIPOLYGON (((430305.059 570535.324, 430373....   
944  E01035628  MULTIPOLYGON (((430565.719 570620.231, 430815....   
945  E01035636  MULTIPOLYGON (((409032.084 562824.327, 409105....   

     band_data_band_1_mean  band_data_band_1_min  band_data_band_1_max  \
0                6

**Conclusion:**
- quick, but not Dask-aware: does `concat` compute everything?
- handles LSOAs covered by more than one tile (shown by `band_data_band_2_mean`, which is not always an integer)
- There are two ways to handle geometries with `exact_extract`. They give identical outputs but differ in the resources involved:
    1. [**recommended, implemented**] set `include_geom=True` so that `exact_extract` returns a GeoDataFrame. Then, for each LSOA group, take the geometry from the first row (it is the same for every raster).
    2. leave `include_geom` unset so that `exact_extract` returns a DataFrame. Then merge the original GeoDataFrame with the stats on the LSOA ID column (left join).

#### 2. Zonal statistics
Here, we utilise `rasterstats.zonal_stats` to calculate statistics of cloud probability per LSOA:

**Pros**:
* Classic tool, widely supported
* Allows custom statistics.

**Cons**:
* Could be wrapped in `dask.delayed`, but the raster is read through a non-parallelised rasterio driver
* Cannot weight the raster cells that intersect polygon boundaries by their partial overlap.
* Slower (Python + numpy)
* Could be memory-heavy
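
To make the partial-overlap point concrete, here is a minimal sketch in plain Python. It does not call `rasterstats` or `exactextract`; the pixel values and coverage fractions are invented, and both helper functions are hypothetical. It contrasts counting every touched pixel fully (the effect of `all_touched=True`) with weighting each pixel by the fraction of its area inside the polygon:

```python
# One row of four pixels; `coverage` is the fraction of each pixel's area
# that lies inside the polygon (illustrative values).
values = [10.0, 20.0, 30.0, 40.0]
coverage = [0.5, 1.0, 1.0, 0.0]


def all_touched_mean(values, coverage):
    """Every pixel touched by the polygon counts fully."""
    touched = [v for v, c in zip(values, coverage) if c > 0]
    return sum(touched) / len(touched)


def coverage_weighted_mean(values, coverage):
    """Each pixel contributes in proportion to its covered area."""
    return sum(v * c for v, c in zip(values, coverage)) / sum(coverage)


print(all_touched_mean(values, coverage))        # 20.0
print(coverage_weighted_mean(values, coverage))  # 22.0
```

The half-covered boundary pixel pulls the unweighted mean down as much as a fully covered one would. For small LSOAs, where boundary pixels make up a large share of the total, this difference can matter.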

In [None]:
from collections import defaultdict

def calc_lsoa_stats_rasterstats(rasters, lsoa_gdf, out_file, lsoa_id_col='LSOA21CD'):
    """
    Calculate zonal statistics for multiple raster datasets over LSOA polygons using rasterstats.zonal_stats.

    The statistics calculated for each raster are mean, median, min, max, std and count.

    Parameters
    ----------
    rasters : list of str
        Paths to the raster files (the file name is used as a column prefix).
    lsoa_gdf : geopandas.GeoDataFrame
        Polygons over which to calculate statistics.
    out_file : str
        Path to save GeoPackage with merged stats.
    lsoa_id_col : str
        Column name of unique polygon IDs.

    Returns
    -------
    tuple of (pandas.DataFrame, geopandas.GeoDataFrame)
        Statistics per LSOA ID (one set of columns per raster), and the same
        statistics merged onto the original polygons.
    """
    all_stats = []

    for raster in rasters:
        # Compute stats directly
        zs = zonal_stats(
            lsoa_gdf,
            raster,
            stats=['mean','median','min','max','std','count'],
            all_touched=True,  # include all pixels touched by polygon
            nodata=None,       # let rasterstats handle nodata properly
            geojson_out=False
        )

        # Convert to DataFrame
        df = pd.DataFrame(zs)
        df[lsoa_id_col] = lsoa_gdf[lsoa_id_col].values  # add LSOA IDs

        # Optional: prefix columns by raster filename to avoid collisions
        raster_name = raster.split('/')[-1].split('.')[0]
        df = df.rename(columns={stat: f"{raster_name}_{stat}" for stat in ['mean','median','min','max','std','count']})

        all_stats.append(df)

    # Merge stats from all rasters
    df_stats = all_stats[0]
    for df in all_stats[1:]:
        df_stats = df_stats.merge(df, on=lsoa_id_col, how='left')

    # Merge with original GeoDataFrame
    gdf_with_stats = lsoa_gdf.merge(df_stats, on=lsoa_id_col, how='left')

    # Save to GeoPackage
    gdf_with_stats.to_file(out_file, driver="GPKG")
    print(f"Saved final LSOA stats to {out_file}")
    
    return df_stats, gdf_with_stats

In [None]:
lsoa_stats, lsoa_gdf_with_stats = calc_lsoa_stats_rasterstats(
    rasters=rasters,
    lsoa_gdf=lsoa_gdf,
    out_file=lsoa_stats_rasterstats,
    lsoa_id_col=lsoa_id_col
)

print(lsoa_stats)

**Conclusion:**
- can't accept an xarray Dataset/DataArray: it either opens a GeoTIFF or takes `rasterio` objects - **not recommended to use at all**
- extracts stats from all rasters even if they don't intersect the polygon (replacing with 0), which requires additional processing and weighting by tile for each LSOA
- slower and not Dask-aware: everything is computed eagerly

#### 3.1. Xvec zonal stats

The `xvec` library can be used to create a 'vector data cube' and to calculate zonal statistics.
This method is similar to classic zonal statistics: it keeps the structure of the original cube with its attributes, but indexes it by polygon geometry.
The main question: how much better does it perform than the other methods when using Dask and HPC?

Below is an implementation based on `xvec.zonal_stats`. Another option would be `xvec.to_geodataframe` with pixel value extraction, followed by a spatial join between the cloud GeoDataFrame and the LSOA GeoDataFrame.

See the documentation [here](https://xvec.readthedocs.io/en/stable/zonal_stats.html) for further detail.

In [178]:
def calc_lsoa_stats_xvec(datasets, lsoa_gdf, out_file, lsoa_id_col="LSOA21CD", band_index=0):
    """
    Compute zonal statistics using xvec.zonal_stats for speed.
    """
    results = []
    stats_list=["mean", "sum", "min", "max"]

    for i, ds in enumerate(datasets):

        #print(lsoa_gdf.head())
        #print(ds)

        da = ds["band_data"].isel(band=band_index)
        stat_ds = da.xvec.zonal_stats(
            geometry=lsoa_gdf["geometry"],
            x_coords="x",
            y_coords="y",
            stats=stats_list
            #method="exactextract" DEFAULT
        )

        
        # NOTE: this creates a dataarray with two dimensions:
        # `geometry` - basically LSOA
        # `zonal_statistics` - headers with stat names
        # coordinates transformed to `GeometryIndex`

        # Add LSOA IDs as a coordinate
        stat_ds = stat_ds.assign_coords({lsoa_id_col: ("geometry", lsoa_gdf[lsoa_id_col].values)})
        # print(stat_ds.head(30))

        # Convert to GeoDataFrame
        df= stat_ds.xvec.to_geodataframe().reset_index()
        #print(df.columns)
        stat_df = df.pivot(index="geometry", columns="zonal_statistics", values="band_data").reset_index()
        # TODO - the LSOA ID column is lost when pivoting because it is not part of the pivot index;
        # this causes the KeyError in the groupby below

        # add raster source
        stat_df["source_raster"] = getattr(ds, "name", f"raster_{i}")

        results.append(stat_df)

        print(stat_df.head())

    
    # combine all datasets
    combined = pd.concat(results, ignore_index=True)
    print(combined.columns)

    # aggregate per LSOA
    lsoa_stats = combined.groupby(lsoa_id_col)[stats_list].agg("mean").reset_index()

    # merge back with original polygons
    lsoa_gdf_with_stats = lsoa_gdf.merge(lsoa_stats, on=lsoa_id_col, how="left")

    lsoa_gdf_with_stats.to_file(out_file, driver="GPKG", layer="lsoa_stats")
    print(f"Saved LSOA statistics to {out_file}")

    return lsoa_gdf_with_stats


In [179]:
lsoa_stats = calc_lsoa_stats_xvec(
    datasets=datasets,
    lsoa_gdf=lsoa_gdf,
    out_file=lsoa_stats_xvec,
    lsoa_id_col=lsoa_id_col,
    band_index=0  # pick first band
)

zonal_statistics                                           geometry  max  \
0                 MULTIPOLYGON (((408824.904 550047.415, 408890 ...  NaN   
1                 MULTIPOLYGON (((410011.459 550191.508, 410063....  NaN   
2                 MULTIPOLYGON (((412368.453 549984.768, 412208 ...  NaN   
3                 MULTIPOLYGON (((412648.063 549799.562, 412885....  NaN   
4                 MULTIPOLYGON (((415704.375 550617.312, 415795....  NaN   

zonal_statistics  mean  min  sum source_raster  
0                  NaN  NaN  NaN      raster_0  
1                  NaN  NaN  NaN      raster_0  
2                  NaN  NaN  NaN      raster_0  
3                  NaN  NaN  NaN      raster_0  
4                  NaN  NaN  NaN      raster_0  
zonal_statistics                                           geometry  \
0                 MULTIPOLYGON (((408824.904 550047.415, 408890 ...   
1                 MULTIPOLYGON (((410011.459 550191.508, 410063....   
2                 MULTIPOLYGON (((41

KeyError: 'LSOA21CD'
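
The `KeyError` comes from the pivot step: `DataFrame.pivot` keeps only the columns named in `index`, `columns` and `values`, so the LSOA ID is dropped. A possible fix, sketched here on a toy pandas DataFrame rather than on the real `xvec` output, is to put the ID into the pivot index alongside the geometry. The statistic values and the geometry strings are placeholders, and the long-format layout is an assumption about what `to_geodataframe().reset_index()` returns:

```python
import pandas as pd

# Toy long-format table standing in for the xvec output (placeholder values).
long_df = pd.DataFrame({
    "LSOA21CD": ["E01008162", "E01008162", "E01008163", "E01008163"],
    "geometry": ["POLYGON A", "POLYGON A", "POLYGON B", "POLYGON B"],
    "zonal_statistics": ["mean", "max", "mean", "max"],
    "band_data": [61.5, 70.0, 58.0, 66.0],
})

# Pivoting on geometry alone drops the ID column, as in the function above.
lost = long_df.pivot(
    index="geometry", columns="zonal_statistics", values="band_data"
).reset_index()

# Including the ID in the pivot index carries it through to the wide table.
wide = long_df.pivot(
    index=["LSOA21CD", "geometry"], columns="zonal_statistics", values="band_data"
).reset_index()
wide.columns.name = None  # drop the leftover "zonal_statistics" axis label

print("LSOA21CD" in lost.columns)  # False
print("LSOA21CD" in wide.columns)  # True
```

With the ID preserved, the later `groupby(lsoa_id_col)` has a column to group on. Note that the plain `mean` aggregation in the function is unweighted across tiles, unlike the `exactextract` version.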

In [124]:
# TEST
"""
# extract dataset
ds = datasets[0]
print(ds.data_vars)
da = ds["band_data"].isel(band=0)
#convert to gdf
gdf_grid = da.xvec.to_geodataframe()

print(gdf_grid.columns)

gdf_grid.head()"""

Data variables:
    band_data  (band, y, x) float32 8MB ...
Index(['band', 'spatial_ref', 'band_data'], dtype='object')


  gdf_grid = da.xvec.to_geodataframe()


                    band  spatial_ref  band_data
y        x
579990.0 420010.0      1            0  56.213795
         420030.0      1            0  56.655174
         420050.0      1            0  56.413792
         420070.0      1            0  56.668964
         420090.0      1            0  57.275864

**Conclusion:**
- newer versions work directly with xarray Datasets (not just DataArrays), which is good
- might conflict with the current container. Shouldn't be installed through `pip` (this worked: `conda install xvec -c conda-forge`).
- `xvec` conflicts with `xarray>=2025.1.0`:
> cannot import name 'AlignmentError' from 'xarray' (/opt/conda/lib/python3.12/site-packages/xarray/__init__.py)

Temporarily fixed with a monkey patch in the imports.

- `xvec.zonal_stats` uses `exactextract` under the hood, but it cannot aggregate zonal stats for each LSOA across multiple rasters: you need to do that yourself
- this method involves too many conversions (xarray/GeoDataFrame)


#### LSOA in tiles Reference index (tile -> dataset)

The issue is that our xarray Datasets are tiled, while many LSOA boundaries cross tile boundaries. To tackle this, we can:
- iterate over each tile, intersect it with the LSOAs and ...
- create a mapping between tiles and the LSOAs they contain (reference table). Each pixel can have more than one LSOA reference, and each LSOA can have more than one tile reference.
- create a mosaic first and run the LSOA calculation for the entire UK. This will probably be very resource-intensive for Dask.

The goal is to avoid repeated searches for the LSOAs contained within each tile.
To do so, we create a dictionary in which each tile lists all LSOAs that intersect it, or we index each value by a combined index built from the dataset and the tile ID.
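
A minimal sketch of such a dictionary follows. To keep it self-contained it uses bounding boxes only, so it over-selects compared with a true polygon intersection. In the real workflow a spatial join such as `geopandas.sjoin(..., predicate="intersects")` would be the natural tool. The LSOA IDs and all coordinates are hypothetical, and `build_tile_index` is an illustrative helper:

```python
from collections import defaultdict


def bboxes_intersect(a, b):
    """True if two (minx, miny, maxx, maxy) boxes overlap."""
    return a[0] < b[2] and b[0] < a[2] and a[1] < b[3] and b[1] < a[3]


def build_tile_index(tile_bounds, lsoa_bounds):
    """Map each tile ID to the list of LSOA IDs whose bounding box overlaps it."""
    tile_to_lsoas = defaultdict(list)
    for lsoa_id, lsoa_box in lsoa_bounds.items():
        for tile_id, tile_box in tile_bounds.items():
            if bboxes_intersect(tile_box, lsoa_box):
                tile_to_lsoas[tile_id].append(lsoa_id)
    return dict(tile_to_lsoas)


# Two 20 km tiles stacked north-south (illustrative British National Grid bounds).
tile_bounds = {
    "NZ26": (420000, 560000, 440000, 580000),
    "NZ24": (420000, 540000, 440000, 560000),
}
# Hypothetical LSOA bounding boxes.
lsoa_bounds = {
    "LSOA_A": (425000, 565000, 426000, 566000),  # inside NZ26 only
    "LSOA_B": (430000, 559000, 431000, 561000),  # straddles the shared edge
}

tile_index = build_tile_index(tile_bounds, lsoa_bounds)
print(tile_index)
# {'NZ26': ['LSOA_A', 'LSOA_B'], 'NZ24': ['LSOA_B']}
```

Built once, the index lets each tile be processed with only its own LSOAs, and the LSOAs that appear under several tiles are exactly those that need the cross-tile weighting.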

Two ways of working with the dictionary (mapping):
- if working with Dataset: rasterise tile polygons
- if working with Dataframe:

What we can do:
- translate text tile index into integer id
- iterate over tiles (`for _, row in tiles.bounds.iterrows()`)
- create a unique index for each pixel with its own index + tile id
- for each LSOA find pixels which contain its tile id
- calculate `exact_extract` for each LSOA




#### 4. Parquet - statistics
- export as parquet
- transform xarray Datasets to dataframes
- read parquet
- perform a spatial join between parquet points (pixel values) and LSOA features, using `dask_geopandas`
- calculate the mean value per LSOA feature, **optionally** weighting by:
    - number of pixel values in LSOA
    - another dimension ('pixel_count') - how many acquisitions were gathered from satellite collections

TODO:
- Convert many tiles to parquet 
- Create unique index with x,y of each pixel (now - record)
- Spatial join between LSOA id and unique ID of each pixel in tile -> New unique IDx
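
One way to build such a unique pixel index is sketched below, assuming every tile is a regular 1000 x 1000 grid as in the datasets opened earlier. The function names and the encoding scheme are illustrative assumptions, not an existing API. Text tile names are first translated to integers, then combined with the pixel row and column into a single integer that can be decoded again:

```python
TILE_SIZE = 1000  # pixels per side, assuming regular 1000 x 1000 tiles


def make_tile_lookup(tile_names):
    """Map text tile names to consecutive integer IDs (sorted for stability)."""
    return {name: i for i, name in enumerate(sorted(tile_names))}


def pixel_id(tile_int, row, col, tile_size=TILE_SIZE):
    """Encode (tile, row, col) as one integer that is unique across all tiles."""
    return tile_int * tile_size * tile_size + row * tile_size + col


def decode_pixel_id(pid, tile_size=TILE_SIZE):
    """Recover (tile, row, col) from an encoded pixel ID."""
    tile_int, offset = divmod(pid, tile_size * tile_size)
    row, col = divmod(offset, tile_size)
    return tile_int, row, col


lookup = make_tile_lookup(["NZ26", "NZ04", "NZ24"])
pid = pixel_id(lookup["NZ24"], row=12, col=345)

print(lookup)                # {'NZ04': 0, 'NZ24': 1, 'NZ26': 2}
print(pid)                   # 1012345
print(decode_pixel_id(pid))  # (1, 12, 345)
```

An integer key of this kind is compact to store in Parquet and cheap to join on, while still carrying the tile reference.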

Dask - geospatial library?
`dask-geopandas`