# 1.0.0: Featurize EO and trait data

## Imports and config

In [1]:
from pathlib import Path
from dask import compute, delayed
from dask.distributed import Client, LocalCluster
from dask import config
import pandas as pd
import rioxarray as riox
from tqdm.notebook import tqdm
import xarray as xr

from src.conf.conf import get_config
from src.conf.environment import log


%load_ext autoreload
%autoreload 2

cfg = get_config()

In [3]:
cluster = LocalCluster(dashboard_address=":39143", n_workers=30, memory_limit="150GB")
client = Client(cluster)

## Combine EO data and one trait as a DataFrame

In [4]:
trait_map_fns = []

for dataset in cfg.datasets.Y.use:
    trait_maps_dir = (
        Path(cfg.interim_dir)
        / cfg[dataset].interim.dir
        / cfg[dataset].interim.traits
        / cfg.PFT
        / cfg.model_res
    )
    trait_map_fns += list(trait_maps_dir.glob("*.tif"))


# Sort trait_map_fns by number in file name (eg. X1, X2, X3)
trait_map_fns = sorted(trait_map_fns, key=lambda x: int(x.stem.split("X")[-1]))

Great, now let's get the EO data filenames as well, and then add our trait map filename to the list so that we can load everything simultaneously.

In [5]:
eo_data_dir = Path(cfg.interim_dir) / cfg.eo_data.interim.dir / cfg.model_res
eo_fns = sorted(list(eo_data_dir.glob("*/*.tif")))

Since our preprocessing steps ensured that all of our rasters have the same extent and resolution, we can now load them all into a chunked `xarray.Dataset`, and then convert directly to a `dask.dataframe`. This means we can load 151 global, ~1km resolution rasters (150 EO variables + our single trait map) into a single dataframe in a matter of seconds.

In [6]:
from typing import Tuple


NCHUNKS = 9  # npartitions will be NCHUNKS**2


@delayed
def get_dtype_map(fn) -> Tuple[str, str]:
    band = 1
    da = riox.open_rasterio(
        fn,
        chunks={"x": 36000 // NCHUNKS, "y": 18000 // NCHUNKS},
        mask_and_scale=False,
    )
    long_name = da.attrs["long_name"]

    if fn.stem[0] == "X":
        band = cfg.datasets.Y.trait_stat
        long_name = f"{fn.stem}_{long_name[band - 1]}"
        da.attrs["long_name"] = long_name

    dtype = da.sel(band=band).dtype
    da.close()

    return long_name, str(dtype)


@delayed
def load_X_raster(fn) -> Tuple[xr.DataArray, str]:
    band = 1
    da = riox.open_rasterio(
        fn,
        chunks={"x": 36000 // NCHUNKS, "y": 18000 // NCHUNKS},
        mask_and_scale=True,
    )
    long_name = da.attrs["long_name"]

    return da.sel(band=band), long_name


@delayed
def load_Y_raster(trait_stem: str) -> Tuple[xr.DataArray, str]:
    # find all matching files in fns
    trait_fns = [fn for fn in trait_map_fns if fn.stem == trait_stem]

    if len(trait_fns) == 0:
        raise ValueError(f"No files found for trait {trait_stem}")

    das = []
    for raster_file in trait_fns:
        da = riox.open_rasterio(
            raster_file,
            chunks={"x": 36000 // NCHUNKS, "y": 18000 // NCHUNKS},
            mask_and_scale=True,
        )

        band = cfg.datasets.Y.trait_stat
        long_name = da.attrs["long_name"]
        long_name = f"{raster_file.stem}_{long_name[band - 1]}"
        da.attrs["long_name"] = long_name

        das.append(da.sel(band=band))

    if len(das) == 1:
        return das[0], long_name

    else:
        # Find the array position of the fn in trait_fns that contains "gbif"
        gbif_idx = [i for i, fn in enumerate(trait_fns) if "gbif" in str(fn)][0]
        splot_idx = 1 - gbif_idx

        merged = xr.where(
            das[splot_idx].notnull(), das[splot_idx], das[gbif_idx], keep_attrs=True
        )

        for da in das:
            da.close()

        return merged, long_name

Even when we only load a single trait and handle everything with dask it's still a lot of data to load into memory. To make things more manageable, we are loading the datasets using `dask.delayed`, and then we're computing each partition of the resulting `dask.dataframe` independently before finally concatenating the results together.

In [7]:
dtypes = set(compute(*[get_dtype_map(fn) for fn in eo_fns + trait_map_fns]))
X_arrays = list(compute(*[load_X_raster(fn) for fn in eo_fns]))
unique_traits = set([fn.stem for fn in trait_map_fns])
Y_arrays = list(compute(*[load_Y_raster(trait) for trait in unique_traits]))

all_arrays = X_arrays + Y_arrays
array_dict = {long_name: da for da, long_name in all_arrays}

# Create dtype dictionary for type casting after dropping nans
dtypes = {long_name: dtype for long_name, dtype in dtypes}

# Create DataArray dict containing the name of each DA and the DA itself.
# This will be used 
combined_ds = xr.Dataset(array_dict)

Next we will collect the names of the EO data arrays (X) and the trait arrays (Y). These will also be the column names in the final dataframe, and we can then drop all NA rows where any X value is missing and/or all Y values are missing. This requires two `.dropna()` calls, as we can't simply drop rows where some, but not all Y values are missing because that would discard valuable training data. On the flipside, we do want to drop rows where we don't have a complete suite of predictors because otherwise calculating the Area of Applicability will fail due to missing values.

**Note:* If we choose to use XGBoost as the model, it *can* handle missing values. It may be that we can find a suitable method for handling missing predictor data when calculating the AoA (e.g. with imputation), but for now we are choosing to simply drop them.

In [8]:
# Get the names of the predictor and trait arrays
all_names = list(dtypes.keys())
X_names = all_names[:len(eo_fns)]
Y_names = all_names[len(eo_fns):]

with config.set(**{"array.slicing.split_large_chunks": False}):
    # Convert to Dask DataFrame and drop missing values
    ddf = (
        combined_ds.to_dask_dataframe()
        .drop(columns=["band", "spatial_ref"])
        .dropna(how="all", subset=Y_names)
        .dropna(how="any", subset=X_names)
        .astype(dtypes)
    )

With the `dask` task graph now generated, we can begin computing the dataframe by iterating over each partition. If we don't do this, the task graph will eventually overflow our available memory. This is just one of many ways to take advantage of the efficiency and parallelization of `dask`. Once the partitions have been computed, we can simply concatenate the `dfs` with regular `pandas`.

This is possible because, since we first loaded all the rasters into the same `xarray.Dataset`, we know that each partition contains entirely unique XY coordinates, and therefore no coordinates could be duplicated across partitions.

In [9]:
# Compute in smaller chunks
npartitions = ddf.npartitions
dfs = [
    ddf.get_partition(i).compute()
    for i in tqdm(range(npartitions), total=npartitions, desc="Computing partitions")
]

# Concatenate the chunks
df = pd.concat(dfs).reset_index(drop=True)

Computing partitions:   0%|          | 0/81 [00:00<?, ?it/s]

2024-05-16 13:22:42 UTC - asyncio - ERROR - Task exception was never retrieved
future: <Task finished name='Task-259849' coro=<Client._gather.<locals>.wait() done, defined at /home/dl1070/micromamba/envs/traits/lib/python3.12/site-packages/distributed/client.py:2197> exception=AllExit()>
Traceback (most recent call last):
  File "/home/dl1070/micromamba/envs/traits/lib/python3.12/site-packages/distributed/client.py", line 2206, in wait
    raise AllExit()
distributed.client.AllExit


KeyboardInterrupt: 

In [14]:
df.head()

Unnamed: 0,x,y,ETH_GlobalCanopyHeightSD_2020_v1,ETH_GlobalCanopyHeight_2020_v1,sur_refl_b01_2001-2024_m10_mean,sur_refl_b01_2001-2024_m11_mean,sur_refl_b01_2001-2024_m12_mean,sur_refl_b01_2001-2024_m1_mean,sur_refl_b01_2001-2024_m2_mean,sur_refl_b01_2001-2024_m3_mean,...,vodca_x-band_mean,vodca_x-band_p5,vodca_x-band_p95,wc2.1_30s_bio_1,wc2.1_30s_bio_12,wc2.1_30s_bio_13-14,wc2.1_30s_bio_15,wc2.1_30s_bio_4,wc2.1_30s_bio_7,X50_mean
0,-179.895,68.395,8,4,10000,1675,41,259,7960,10000,...,12914,12956,12755,-12.245833,365.0,42.0,44.364677,1201.524536,37.800003,1.447954
1,-179.895,68.255,4,3,6161,1449,44,197,6213,7977,...,13841,13844,13457,-11.654166,340.0,42.0,51.789902,1237.107056,39.200001,1.063078
2,-179.885,67.365,1,0,4586,1753,68,485,5751,7228,...,15345,14444,15253,-11.316667,384.0,46.0,47.370106,1263.077637,40.199997,1.369326
3,-179.875,68.705,2,1,4641,2243,30,138,6857,8364,...,9674,9398,10504,-11.075,291.0,36.0,48.830692,1281.555298,40.299999,1.630068
4,-179.855,68.735,1,0,4848,2138,27,124,6260,7932,...,9609,9321,10456,-11.366667,300.0,35.0,46.124699,1257.740234,39.5,2.958496


In [15]:
client.close()
cluster.close()

## Examine features

As a sanity check, let's check to see how the size of our final features compares to the original size of our GBIF trait maps. 

In [12]:
gbif_trait_map_fns = (
    Path(cfg.interim_dir)
    / cfg.gbif.interim.dir
    / cfg.gbif.interim.traits
    / cfg.PFT
    / cfg.model_res
).glob("*.tif")

feats = pd.read_parquet(
    Path(cfg.train.dir)
    / cfg.PFT
    / cfg.model_res
    / "_".join(cfg.datasets.Y.use)
    / cfg.train.features
)

for fn in list(gbif_trait_map_fns)[:1]:
    trait_count = riox.open_rasterio(fn).sel(band=1).notnull().sum().values.item()
    feat_count = feats[[f"{fn.stem}_mean"]].dropna().shape[0]
    print(
        f"{fn.stem} has {trait_count:,} trait values and {feat_count:,} feature values ({feat_count / trait_count:%}%)"
    )

X95 has 5,944,102 trait values and 4,827,760 feature values (81.219333%%)


We were able to match about 81% of our original GBIF trait values with EO predictors! This may seem surprising, since we would expect our predictors to have at least as broad of coverage as our GBIF observations, but in fact many GBIF observations have locations that are in water and built-up areas, both of which were masked when harmonizing the EO data.