# 07: Solar radiation and wind data aggregation
*Extract daily solar radiation and wind data for subsequent use in developing WBGT in the sun estimates in `08_shade_sun_adjustment.ipynb`.*

In [None]:
# use zarr v2
import coiled
import dask
import numpy as np
import pandas as pd
import xarray as xr
from dask import delayed
from dask.distributed import progress
from metsim.datetime import date_range
from metsim.disaggregate import shortwave
from metsim.physics import solar_geom
from tqdm.notebook import tqdm
from utils import (
    gcm_list,
    gcms_with_nonstandard_calendars_list,
    load_regions,
    load_virtual_nasa_nex,
    prep_sparse,
    remove_360_longitudes,
    spatial_aggregation,
)

In [None]:
cluster = coiled.Cluster(
    n_workers=5,
    name="06",
    worker_vm_types=["m7g.medium"],
    scheduler_vm_types=["c7g.8xlarge"],
    region="us-west-2",
    spot_policy="spot_with_fallback",
)

cluster.adapt(minimum=1, maximum=1000)

client = cluster.get_client()

In [None]:
@delayed
def process_gcm(gcm: tuple, sparse_weights) -> str:
    """
    Process GCM historical and ssps using the pre-calculated sparse weights.
    """
    historical = (
        load_virtual_nasa_nex(gcm=gcm, scenario="historical")[["rsds", "sfcWind"]]
        .pipe(remove_360_longitudes)
        .chunk({"time": 30, "lat": 600, "lon": 1440})
    )

    historical_agg = spatial_aggregation(historical, sparse_weights)

    # Process each SSP scenario
    for scenario in ["ssp245", "ssp370"]:
        # Load SSP data
        ssp = (
            load_virtual_nasa_nex(gcm=gcm, scenario=scenario)[["rsds", "sfcWind"]]
            .pipe(remove_360_longitudes)
            .chunk({"time": 30, "lat": 600, "lon": 1440})
        )

        ssp_agg = spatial_aggregation(ssp, sparse_weights)

        # Concatenate historical + SSP
        combined = xr.concat([historical_agg, ssp_agg], dim="time")

        if gcm in gcms_with_nonstandard_calendars_list:
            combined = combined.convert_calendar(
                "gregorian",
                dim="time",
                align_on="year",
                missing=np.nan,
                use_cftime=None,
            )
            combined = combined.interpolate_na(dim="time", method="linear")

        # Save to Zarr
        output_path = f"s3://carbonplan-scratch/extreme-heat/wind_solrad-regions/{gcm}-{scenario}-wind-solrad-regions.zarr"
        combined.to_zarr(output_path, consolidated=True, mode="w")

    return gcm

In [None]:
regions_df = load_regions(extension="central-asia")
population = (
    xr.open_zarr(
        "s3://carbonplan-climate-impacts/extreme-heat/v1.0/inputs/"
        "GHS_POP_E2030_GLOBE_R2023A_4326_30ss_V1_0_resampled_to_CP.zarr"
    )
    .rename({"x": "lon", "y": "lat"})
    .drop_vars("spatial_ref")
)

In [None]:
# Create delayed objects for each GCM
sample_ds = (
    load_virtual_nasa_nex(gcm="ACCESS-CM2", scenario="historical")[["rsds", "sfcWind"]]
    .pipe(remove_360_longitudes)
    .isel(time=slice(0, 1))
)

sparse_weights_da = prep_sparse(
    sample_ds, population, regions_df, variables_to_drop=["rsds", "sfcWind"]
)
sparse_weights_da

delayed_results = []
for gcm in gcm_list:
    result = process_gcm(gcm, sparse_weights_da)

    delayed_results.append(result)

In [None]:
results = dask.persist(delayed_results, retires=1)


In [None]:
progress(results)

Calculate representative elevation and latitude for each region, which will be used below by `metsim` for solar geometry calculations.

In [None]:
elev = xr.open_zarr(
    "s3://carbonplan-climate-impacts/extreme-heat/v1.0/inputs/elevation.zarr"
)
elev = elev.chunk({"lat": -1, "lon": -1}).compute()
elev.coords["lon"] = (elev.coords["lon"] + 180) % 360 - 180
elev = elev.sortby(elev.lon)
sparse_weights = prep_sparse(sample_ds, population, regions_df, return_population=False)
# # attach a placeholder time timension
elev = elev.expand_dims(dim="time").assign_coords(
    {"time": pd.date_range("2000-01-01", "2000-01-01")}
)
assert (population["lon"].values == elev["lon"].values).all()
assert (population["lat"].values == elev["lat"].values).all()
assert (
    population["population"].values.shape == elev["elevation"].isel(time=0).values.shape
)
elev_regions = spatial_aggregation(elev, sparse_weights).drop("time")

In [None]:
lat_ds = xr.Dataset(
    {
        "latitude": xr.DataArray(
            data=np.tile(elev.lat.values, (len(elev.lon.values), 1)).transpose(),
            coords={"lat": elev.lat.values, "lon": elev.lon.values},
        )
    }
)
# attach a placeholder time dimension
lat_ds = lat_ds.expand_dims(dim="time").assign_coords(
    {"time": pd.date_range("2000-01-01", "2000-01-01")}
)
assert (population["lon"].values == lat_ds["lon"].values).all()
assert (population["lat"].values == lat_ds["lat"].values).all()
assert (
    population["population"].values.shape
    == lat_ds["latitude"].isel(time=0).values.shape
)
lat_regions = spatial_aggregation(lat_ds, sparse_weights).drop("time")

Take advantage of utilities in the `metsim` package, developed largely by Andrew Bennett at the University of Arizona. The `solar_geom`, `shortwave`, and `date_range` functions are slightly different from their implementations in the `metsim` package for this use case which focuses solely on solar radiation.

For each region, use elevation and latitude information calculated above to derive radiation parameters like day length and subdaily maximum solar radiation. This calculation only needs to be done once because, while it varies in time throughout the year for every location, it will be the same for every projection.

In [None]:
params = {
    "time_step": 60,
    "method": "other",
    "utc_offset": False,
    "calendar": "gregorian",
    "SW_RAD_DT": 3600,
}

In [None]:
tiny_rad_fract_list, daylength_list = [], []
for processing_id in tqdm(elev_regions.processing_id.values):

    sg = solar_geom(
        elev_regions.sel(processing_id=processing_id)["elevation"].values[0],
        lat_regions.sel(processing_id=processing_id)["latitude"].values[0],
        -6.5,
        params,
    )
    tiny_rad_fract_list.append(
        xr.DataArray(data=sg[0], dims=("dayofyear", "tiny_timestep"))
    )
    daylength_list.append(xr.DataArray(data=sg[1], dims=("dayofyear")))
radiation_parameters = xr.Dataset(
    {
        "tiny_rad_fract": xr.concat(tiny_rad_fract_list, dim="processing_id"),
        "daylength": xr.concat(daylength_list, dim="processing_id"),
    }
)
radiation_parameters = radiation_parameters.assign_coords(
    {"processing_id": elev_regions.processing_id.values}
)

radiation_parameters = radiation_parameters.chunk(
    {"dayofyear": -1, "tiny_timestep": -1, "processing_id": 4000}
)
radiation_parameters.to_zarr(
    "s3://carbonplan-scratch/extreme-heat/wind_solrad-regions/radiation_parameters.zarr",
    mode="w",
    consolidated=True,
)
radiation_parameters = xr.open_zarr(
    "s3://carbonplan-scratch/extreme-heat/wind_solrad-regions/radiation_parameters.zarr"
)

radiation_parameters = radiation_parameters.chunk(
    {"processing_id": 25, "tiny_timestep": -1, "dayofyear": -1}
)

Set up a dataframe template to store the data and functions for calculating maximum daily solar radiation from the daily mean solar radiation.

In [None]:
df_daily_template = pd.DataFrame(index=pd.date_range("1985-01-01", "2100-12-31"))
stop = (
    df_daily_template.index[-1]
    + pd.Timedelta("1 days")
    - pd.Timedelta("{} minutes".format(params["time_step"]))
)
dates_disagg = date_range(
    df_daily_template.index[0],
    stop,
    freq="{}T".format(params["time_step"]),
    calendar=params["calendar"],
)
df_disagg_template = pd.DataFrame(index=dates_disagg)
yday = df_daily_template.index.dayofyear - 1

In [None]:
def shortwave_wrapper(rsds, daylengths, tiny_rad_fract):
    """
    Wrapper function for shortwave which supports vectorized computation
    via `xr.ufunc`
    """

    params = {
        "time_step": 60,
        "method": "other",
        "utc_offset": False,
        "calendar": "gregorian",
        "SW_RAD_DT": 3600,
    }
    dayofyear = pd.date_range("1985-01-01", "2100-12-31").dayofyear.values
    shortwave_out = shortwave(rsds, daylengths[yday], dayofyear, tiny_rad_fract, params)
    da = xr.DataArray(shortwave_out, dims=["hourlytime"])
    da = da.assign_coords(
        {
            "hourlytime": pd.date_range(
                "1985-01-01 00:00:00", "2100-12-31 23:00:00", freq="H"
            )
        }
    )
    output = da.resample({"hourlytime": "D"}).max().data
    return output

Calculate maximum solar radiation given daily mean solar radiation and radiation parameters (as calculated above). This approach accounts for the cooling effect of clouds but does not capture subdaily variations in cloud cover.

In [None]:
@delayed
def calc_shortwave(
    gcm_scenario_tuple: tuple, radiation_parameters: xr.Dataset
) -> tuple:
    gcm, scenario = gcm_scenario_tuple
    wind_solrad_ds = (
        xr.open_zarr(
            f"s3://carbonplan-scratch/extreme-heat/wind_solrad-regions/{gcm}-{scenario}-wind-solrad-regions.zarr"
        )
        .sel(time=slice("1985-01-01", "2100-12-31"))
        .persist()
    )
    wind_solrad_ds = wind_solrad_ds.chunk({"processing_id": -1, "time": -1})

    max_solrad = xr.apply_ufunc(
        shortwave_wrapper,
        wind_solrad_ds["rsds"],
        radiation_parameters.daylength,
        radiation_parameters.tiny_rad_fract,
        input_core_dims=[["time"], ["dayofyear"], ["dayofyear", "tiny_timestep"]],
        output_core_dims=[["time"]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[wind_solrad_ds["rsds"].dtype],
    )
    out_store = f"s3://carbonplan-scratch/extreme-heat/wind_solrad-regions/{gcm}-{scenario}-rsds-max-regions.zarr"
    max_solrad.to_zarr(out_store, mode="w", consolidated=True)
    return gcm_scenario_tuple

In [None]:
gcm_scenario_tuples = [
    (gcm, scenario) for gcm in gcm_list for scenario in ["ssp245", "ssp370"]
]

gcm_scenario_tuples = gcm_scenario_tuples[0:2]
delayed_results = []
for gcm_scenario in gcm_scenario_tuples:
    result = calc_shortwave(gcm_scenario, radiation_parameters)
    delayed_results.append(result)

In [None]:
results = dask.persist(delayed_results, retires=1)
progress(results)

In [None]:
cluster.shutdown()