# Run pyCIAM on 3 different adaptation scenarios

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from time import sleep

import dask.config
import numpy as np
import pandas as pd
import xarray as xr
from distributed import as_completed
from pyCIAM.constants import CASE_DICT
from pyCIAM.io import check_finished_zarr_workflow
from pyCIAM.run import (
    _aggregate_results,
    _check_completed_fut_batch,
    _get_selectors,
    execute_pyciam,
)
from shared import (
    AUTHOR,
    BIN_MAP_PATH,
    CONTACT,
    DIR_RAW,
    DIR_SCRATCH,
    HISTORY,
    PATH_OUTPUT,
    PATH_OUTPUT_FIXEDADAPT,
    PATH_OUTPUT_GLOBALADAPT,
    PATH_PARAMS,
    PATH_REFA,
    PATH_REFA_GLOBALADAPT,
    PATH_SLIIDERS,
    PATH_SLIIDERS_SEG,
    PATH_SLIIDERS_SEG_TMP,
    PATH_SLR_INT,
    PATHS_SURGE_LOOKUP,
    SEG_CHUNKSIZE,
    TMPPATH,
    TMPPATH_FIXEDADAPT,
    TMPPATH_GLOBALADAPT,
    start_dask_cluster,
)

In [3]:
DESCRIPTION = "pyCIAM outputs using impact-region SLIIDERS database from DSCIM-EPA, modified slightly to update VSL numbers and remove pre-matched SLR site id, which was matched to LocalizeSL SLR data that is no longer used. Outputs are run against a selection of FACTS simulated SLR scenarios associated with AR6. Specific workflows, SSP scenarios, and monte carlo samples are chosen to give a wide range of samples for global SLR in each year up to 2100, covering as much as possible the support of the FaIR+SESL GMSL projections used to generate a coastal SCC as part of DSCIM."

PATH_SLIIDERS_TMP = DIR_SCRATCH / "sliiders-tmp.zarr"

PARAMS = pd.read_json(PATH_PARAMS)["values"]

In [4]:
from dask_gateway import Gateway

gateway = Gateway()
gateway.list_clusters()

[]

In [5]:
client, cluster = start_dask_cluster(
    env_items={
        "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "10",
    }
)
cluster.scale(250)

# cluster = gateway.connect(gateway.list_clusters()[0].name)
# client = cluster.get_client()

# from distributed import Client

# client = Client(n_workers=7)
# cluster = client.cluster

cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    â€¦

## Get filter of scenarios to run (comment out if we are not running full set of scenarios)

In [8]:
scen_mc_filter = (
    xr.open_zarr(str(BIN_MAP_PATH), chunks=None)[["scenario", "sample"]]
    .to_dataframe()
    .set_index(["scenario", "sample"])
    .index.unique()
)
scen_mc_filter = scen_mc_filter.union(
    pd.MultiIndex.from_product(
        (["ncc_ar6"], scen_mc_filter.get_level_values("sample").unique()),
        names=["scenario", "sample"],
    )
)

## Run pyCIAM

In [9]:
def postprocess_func(ds):
    return (
        ds.drop_vars("npv").sel(case=["noAdaptation", "optimalfixed"]).sum("costtype")
    )


common_kwargs = dict(
    pyciam_seg_chunksize=SEG_CHUNKSIZE,
    surge_input_paths=PATHS_SURGE_LOOKUP,
    dask_client_func=lambda: client,
    remove_tmpfile=False,
    seg_var="seg_ir",
    adm_var="impact_region",
    mc_dim="sample",
    quantiles=None,
    no_surge_check=True,
    other_chunksizes={"ssp": 1, "iam": 1, "scen_mc": 1000},
    extra_attrs={
        "author": AUTHOR,
        "contact": CONTACT,
        "description": DESCRIPTION,
        "history": HISTORY,
    },
    scen_mc_filter=scen_mc_filter,
    postprocess_func=postprocess_func,
)

### Run normal pyCIAM (full-adapt)

In [10]:
execute_pyciam(
    PATH_PARAMS,
    PATH_SLIIDERS,
    PATH_SLR_INT,
    "ar6",
    PATH_REFA,
    econ_input_path_seg=PATH_SLIIDERS_SEG,
    output_path=PATH_OUTPUT,
    tmp_output_path=TMPPATH,
    **common_kwargs,
)

Creating ESL impacts lookup table if it does not exist...
Baseline adaptation RPs already calculated...
Queueing jobs to calculate costs for all adaptation scenarios...


  _, chunked_data = chunkmanager.unify_chunks(*unify_chunks_args)


### Run fixed adaptation

In this run, we hold rho (resilience to storm impacts) fixed at 2000-2014 mean levels. We do **NOT** regenerate "refA", which defines present-day adaptation levels by running pyCIAM under a no-climate-change scenario and picking the optimal adaptation height. In other words, we are using the time-varying rho to define refA but then fixing rho at 2000 levels for the actual model run.

In [8]:
dr = PARAMS.loc["dr"]
npv_start = PARAMS.loc["npv_start"]

Create template zarr for new output

In [9]:
# template = (
#     xr.open_zarr(str(TMPPATH))
#     .sel(case="noAdaptation", costtype="stormPopulation", drop=True)
#     .drop_encoding()
# )
# template[["costs", "npv"]] = template[["costs", "npv"]].expand_dims(
#     case=["noAdaptation", "optimalfixed"]
# )

# for dv in template.data_vars:
#     template[dv] = create_template_dataarray(
#         template[dv].dims,
#         dict(template[dv].coords),
#         {k: v[0] for k, v in template[dv].chunksizes.items()},
#         dtype="float32" if dv != "optimal_case" else "uint8",
#         name=dv,
#     )

# template.to_zarr(str(TMPPATH_FIXEDADAPT), mode="w", compute=False)

Run batched jobs to take old output, adjust storm damages with new rho, recalculate NPV, recalculate optimal case, and save to new store.

In [10]:
def optimize_case_new_rho(selectors, eps=1):
    # use last fpath to check if this task has already been run
    if check_finished_zarr_workflow(
        finalstore=TMPPATH_FIXEDADAPT,
        varname="costs",
        final_selector=selectors,
    ):
        return None

    with xr.open_zarr(str(PATH_SLIIDERS), chunks=None) as ds:
        all_segs = ds.seg.load()
        all_seg_vars = ds["seg_ir"].load()

        segadm_seg_map = all_segs.sel(seg_ir=selectors["seg_ir"])
        all_seg_adms = all_seg_vars.isel(
            seg_ir=all_segs.isin(np.unique(segadm_seg_map))
        )

        rho = (
            ds.rho.sel(
                country=ds.seg_country,
                **{
                    k: v for k, v in selectors.items() if k not in ["seg_ir", "scen_mc"]
                },
            )
            .sel(seg_ir=all_seg_adms.values)
            .load()
            .drop_vars("country")
        )

    rho_ratio = (1 - rho.sel(year=slice(2000, 2014)).mean("year")) / (1 - rho)
    dfact = (1 / (1 + dr)) ** (xr.open_zarr(str(TMPPATH), chunks=None).year - npv_start)

    # use single threaded dask over numpy b/c it keeps memory footprint down for irs
    # that require loading a lot of seg-irs
    npv_delta = (
        (
            (
                xr.open_zarr(str(TMPPATH))
                .costs.sel(
                    seg_ir=all_seg_adms.values,
                    costtype=["stormPopulation", "stormCapital"],
                    **{k: v for k, v in selectors.items() if k != "seg_ir"},
                )
                .drop_sel(case="optimalfixed")
                .sum("costtype")
            )
            * (rho_ratio - 1)
        )
        * dfact
    ).sum("year")
    with dask.config.set(scheduler="single-threaded"):
        npv_delta = npv_delta.load()

    opt_case = (
        (
            (
                xr.open_zarr(str(TMPPATH), chunks=None)
                .npv.sel(
                    seg_ir=all_seg_adms.values,
                    **{k: v for k, v in selectors.items() if k != "seg_ir"},
                )
                .drop_sel(case="optimalfixed")
            )
            + npv_delta
        )
        .groupby(all_seg_adms.seg)
        .sum()
    )

    # in case of a tie, we don't want floating point precision noise to determine the
    # choice so we artificially shave 1 dollar off of the noAdaptation npv
    opt_case = opt_case.where(opt_case.case != "noAdaptation", opt_case - eps).idxmin(
        "case"
    )

    opt_case_ser = opt_case.to_series()
    opt_val = (
        pd.Series(
            pd.Series(CASE_DICT).loc[opt_case_ser].values,
            index=opt_case_ser.index,
        )
        .to_xarray()
        .astype("uint8")
        .sel(seg=segadm_seg_map)
    )

    # get opt_case back into seg-region dimension
    opt_case = opt_case.sel(seg=segadm_seg_map)

    out = (
        xr.open_zarr(str(TMPPATH), chunks=None)[["costs", "npv"]]
        .sel(selectors)
        .drop_sel(case="optimalfixed")
    )
    out["costs"] = out.costs.where(
        ~out.costtype.isin(["stormPopulation", "stormCapital"]),
        out.costs * rho_ratio.sel(seg_ir=selectors["seg_ir"]),
    ).sum("costtype")
    out["npv"] += npv_delta.sel(seg_ir=selectors["seg_ir"])

    out = out.drop_vars("costtype")

    noadapt = out.sel(case=["noAdaptation"])
    out = out.sel(case=opt_case).drop_vars("case").expand_dims(case=["optimalfixed"])
    out = xr.concat((noadapt, out), dim="case")
    out["optimal_case"] = opt_val
    out.reset_coords(drop=True).to_zarr(str(TMPPATH_FIXEDADAPT), region="auto")

In [11]:
selector_groups, ix_groups = _get_selectors(
    xr.open_zarr(str(PATH_SLIIDERS), chunks=None),
    "seg_ir",
    SEG_CHUNKSIZE,
    common_kwargs["other_chunksizes"],
    xr.open_zarr(str(TMPPATH_FIXEDADAPT), chunks=None),
    10000,
)

In [12]:
ciam_futs = None
n_selector_groups = len(selector_groups)
for sx, s in enumerate(selector_groups):
    final_batch = (sx + 1) == n_selector_groups
    print(f"computing group {sx + 1} / {n_selector_groups}")

    this_futs = pd.Series(
        s,
        index=client.map(
            optimize_case_new_rho,
            s,
        ),
    )
    if ciam_futs is None:
        ciam_futs = this_futs
    else:
        ciam_futs = pd.concat((ciam_futs, this_futs))
    del this_futs

    for batch in as_completed(ciam_futs.index.tolist()).batches():
        ciam_futs = _check_completed_fut_batch(
            batch, ciam_futs, client, optimize_case_new_rho
        )
        del batch
        if ((not final_batch) and len(ciam_futs) < (1000)) or (
            final_batch and (not len(ciam_futs))
        ):
            break
        sleep(10)

computing group 1 / 178
computing group 2 / 178
computing group 3 / 178
computing group 4 / 178
computing group 5 / 178
computing group 6 / 178
computing group 7 / 178
computing group 8 / 178
computing group 9 / 178
computing group 10 / 178
computing group 11 / 178
computing group 12 / 178
computing group 13 / 178
computing group 14 / 178
computing group 15 / 178
computing group 16 / 178
computing group 17 / 178
computing group 18 / 178
computing group 19 / 178
computing group 20 / 178
computing group 21 / 178
computing group 22 / 178
computing group 23 / 178
computing group 24 / 178
computing group 25 / 178
computing group 26 / 178
computing group 27 / 178
computing group 28 / 178
computing group 29 / 178
computing group 30 / 178
computing group 31 / 178
computing group 32 / 178
computing group 33 / 178
computing group 34 / 178
computing group 35 / 178
computing group 36 / 178
computing group 37 / 178
computing group 38 / 178
computing group 39 / 178
computing group 40 / 178
computing

In [27]:
def _drop_npv(ds):
    return ds.drop_vars("npv")


_aggregate_results(
    TMPPATH_FIXEDADAPT,
    PATH_OUTPUT_FIXEDADAPT,
    PATH_SLIIDERS,
    "seg_ir",
    "impact_region",
    "sample",
    client,
    scen_mc_filter=scen_mc_filter,
    postprocess_func=_drop_npv,
    overwrite=False,
)

In [28]:
assert xr.open_zarr(str(PATH_OUTPUT_FIXEDADAPT)).costs.notnull().all()

### Run with global average rho

First we create a new version of SLIIDERS that contains the population-weighted global average resilience factor ($\rho$), by country. For the population weighting, we use the 2010 population estimates from the SSPs, previously downscaled to impact regions and now aggregated up to countries.

In [78]:
pop = (
    xr.open_zarr(str(DIR_RAW / "integration-econ-bc39.zarr"), chunks=None)
    .sel(ssp="SSP2", model="IIASA GDP", year=2010, drop=True)["pop"]
    .load()
)
country_pops = (
    pop.groupby(pop.region.astype(str).str.split("tmp", sep=".").isel(tmp=0))
    .sum()
    .rename(region="country")
)

# fix typo in some versions of GADM
country_pops["country"] = country_pops.country.str.replace("SMX", "SXM")

sliiders = xr.open_zarr(str(PATH_SLIIDERS))
sliiders["rho"] = (
    sliiders.rho.sel(year=slice(2000, 2014))
    .mean("year")
    .weighted(country_pops.sel(country=sliiders.country.values))
    .mean("country")
    .expand_dims(country=sliiders.country, year=sliiders.year)
)

sliiders.to_zarr(str(PATH_SLIIDERS_TMP), mode="w")

Now, we rerun pyCIAM

In [None]:
execute_pyciam(
    PATH_PARAMS,
    PATH_SLIIDERS_TMP,
    PATH_SLR_INT,
    "ar6",
    PATH_REFA_GLOBALADAPT,
    econ_input_path_seg=PATH_SLIIDERS_SEG_TMP,
    output_path=PATH_OUTPUT_GLOBALADAPT,
    tmp_output_path=TMPPATH_GLOBALADAPT,
    **common_kwargs,
)

Creating ESL impacts lookup table if it does not exist...
Baseline adaptation RPs already calculated...




In [None]:
cluster.close(), client.close()