# 05: Climate data aggregation
*Aggregate gridded WBGT in the shade estimates into region-averaged estimates. Do this for both the reference dataset (UHE-Daily) as well as the climate change projections developed by `02_generate.ipynb`. This code is based on the Pangeo post [Conservative Region Aggregation with Xarray, Geopandas and Sparse](https://discourse.pangeo.io/t/conservative-region-aggregation-with-xarray-geopandas-and-sparse/2715/1) by Ryan Abernathey. Much of the functionality is from the [extended example](https://discourse.pangeo.io/t/conservative-region-aggregation-with-xarray-geopandas-and-sparse/2715/16) by Rich Signell.*

In [None]:
import logging
import time

import fsspec
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
from utils import gcm_list, prep_sparse, spatial_aggregation

Set up cluster to handle multiprocessing using a Dask client.

In [None]:
client = Client(threads_per_worker=1, n_workers=80)

Define functions for the notebook.

In [None]:
def load_population(grid_name="CarbonPlan"):
    """
    Load the population data generated in `03_population.ipynb`.
    """
    population_dict = {
        "CHC": "s3://carbonplan-climate-impacts/extreme-heat/v1.0/inputs/"
        "GHS_POP_E2030_GLOBE_R2023A_4326_30ss_V1_0_resampled_to_UHE_daily.zarr",
        "CarbonPlan": "s3://carbonplan-climate-impacts/extreme-heat/v1.0/"
        "inputs/GHS_POP_E2030_GLOBE_R2023A_4326_30ss_V1_0_resampled_to_CP.zarr",
    }
    population = xr.open_zarr(population_dict[grid_name])
    population = population.rename({"x": "lon", "y": "lat"}).drop("spatial_ref")
    return population

In [None]:
def gen_wget_strings(start_year: int, end_year: int) -> list:
    """
    Access the UHE-daily gridded data product that formed the basis for
    Tuholske et al (2021).
    """
    daterange = pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")
    return [
        (
            date.strftime("%Y.%m.%d"),
            f"https://data.chc.ucsb.edu/people/cascade/UHE-daily/wbgtmax/"
            f"{date.strftime('%Y')}/wbgtmax.{date.strftime('%Y.%m.%d')}.tif",
        )
        for date in daterange
    ]

In [None]:
def add_time_dim(xda):
    """
    Extract datetime from tiff encoding and create time dimension when fed
    into mfdataset.
    """
    xda = xda.expand_dims(
        time=[np.datetime64("-".join(xda.encoding["source"].split(".")[-4:-1]))]
    )
    return xda

In [None]:
def load_uhedaily_ds(start_year, end_year):
    """
    Load in the UHE-daily dataset.
    """
    strings = gen_wget_strings(start_year, end_year)
    subset = [i[1] for i in strings]
    ds = (
        xr.open_mfdataset(
            subset,
            engine="rasterio",
            chunks={},
            parallel=True,
            concat_dim="time",
            combine="nested",
            preprocess=add_time_dim,
        )
        .squeeze(dim=["band"], drop=True)
        .drop("spatial_ref")
        .rename({"band_data": "WBGT", "x": "lon", "y": "lat"})
        .sortby("lat")
    )
    return ds

In [None]:
def load_ds(gcm, scenario, years):
    """
    Load in the gridded WBGT in the shade estimates from `02_generate.ipynb`.
    """
    with dask.config.set(**{"array.slicing.split_large_chunks": False}):
        stores = [
            f's3://carbonplan-scratch/extreme-heat/wbgt-shade-gridded/years/{gcm}/{gcm}-{scenario.split('-')[0]}-{year}.zarr'
            for year in years
        ]
        ds = xr.open_mfdataset(stores, engine="zarr", chunks={})
        ds = ds.assign_coords(lon=(((ds[lon] + 180) % 360) - 180)).sortby(lon)
    return ds

In [None]:
def load_regions():
    """
    Load in the city and regions that to use for aggregation.
    """
    path = "s3://carbonplan-climate-impacts/extreme-heat/v1.0/inputs/all_regions_and_cities.json"
    with fsspec.open(path) as file:
        regions_df = gpd.read_file(file)

    regions_df.crs = "epsg:4326"
    # use an area-preserving projection
    crs_area = "ESRI:53034"
    regions_df = regions_df.to_crs(crs_area)
    return regions_df

In [None]:
lon = "lon"
lat = "lat"
scenario_years = {
    "historical": np.arange(1985, 2015),
    "ssp245": np.arange(2015, 2061),
}

In [None]:
regions_df = load_regions()
bbox = tuple(regions_df.total_bounds)

Access the gridded UHE-Daily data from Tuholske et al (2021) and extract timeseries for the regions of interest. These will form the reference dataset for `06_bias_correction.ipynb`. Thanks to Cascade Tuholske (Montana State University) and Pete Peterson (University of California, Santa Barbara) for making the gridded dataset available. The source gridded dataset may not remain available indefinitely, but the full city- and region-aggregated version is available here alongside the other inputs for the analysis, maintaining reproducibility of the project. 

The next steps aggregate the gridded datasets to region-average estimates. The non-city regions encompass all land area and thus sometimes include significant stretches of uninhabited land with potentially erroneously high or low temperatures (e.g., deserts). Weighting the aggregation by a gridded population product helps ensure that the estimates are human-relevant.

Load the UHE-Daily dataset and calculate weights.

In [None]:
ds = load_uhedaily_ds(1983, 2016)
fp = "s3://carbonplan-climate-impacts/extreme-heat/v1.0/inputs/wbgt-UHE-daily-historical.zarr"
ds = ds.sel(lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3]))
population = load_population(grid_name="CHC")
population = population.sel(lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3]))
sparse_weights, population = prep_sparse(
    ds, population, regions_df, return_population=True, variables_to_drop=["WBGT"]
)

Use weights to aggregate gridded estimates into region-average estimates.

In [None]:
variables_to_drop = ["WBGT"]
sample_time_slice = ds.isel(time=0)["WBGT"].load()
regridded = spatial_aggregation(ds, sparse_weights, "processing_id", load=False)
regridded = regridded.chunk(chunks={"time": -1, "processing_id": 1000})
logging.info(f"{time.ctime()}: Adjusting time dtype")
regridded_dt = regridded.assign_coords(
    {"time": regridded.time.astype("datetime64[ns]")}
)
logging.info(f"{time.ctime()}: Writing Zarr store")
regridded_dt.to_zarr(fp, consolidated=True, mode="w")

Repeat the above process but for our gridded WBGT estimates developed in `02_generate.ipynb`. 

Load a sample dataset as a template to calculate weights. The same weights can be used for every projection because all GCMs are on the same 0.25 degree grid.

In [None]:
ds = load_ds("ACCESS-CM2", "historical", np.arange(1985, 1986))
ds = ds.sel(lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3]))
population = load_population(grid_name="CarbonPlan")
population = population.sel(lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3]))
sparse_weights, population = prep_sparse(
    ds, population, regions_df, return_population=True, variables_to_drop=["WBGT"]
)

Aggregate all gridded estimates into region-average estimates.

In [None]:
for gcm in gcm_list:
    for scenario in ["historical", "ssp245"]:
        logging.info(f"Starting: {time.ctime()}: {gcm}-{scenario}")
        ds = load_ds(gcm, scenario, scenario_years[scenario])
        ds = ds.sel(lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3]))
        population = population.sel(
            lon=slice(bbox[0], bbox[2]), lat=slice(bbox[1], bbox[3])
        )
        sample_time_slice = ds.isel(time=0)["WBGT"].load()

        regridded = spatial_aggregation(ds, sparse_weights, "processing_id", load=False)
        regridded = regridded.chunk(chunks={"time": -1, "processing_id": 100})
        fp = f"s3://carbonplan-scratch/extreme-heat/wbgt-shade-regions/{gcm}-{scenario}.zarr"
        logging.info(f"Writing: {time.ctime()}: {fp}")
        regridded.to_zarr(fp, consolidated=True, mode="w")