## Mortality: Compute discounted time series and SCC with climate-only and full uncertainty

This notebook is meant to be the most up-to-date and operational workflow for running mortality partial-SCC. It was created for `v2.5` but should be backwards compatible for v2.4+

This notebook is developed from `mortality_ensemble_climate_full_montecarlo_modularize_SimonDianaTesting.ipynb` which was a combination of `mortality_ensemble_climate_only_modularize.ipynb` and `mortality_ensemble_montecarlo.ipynb`.

In [1]:
HISTORY = """
version 2.5.1: Same as v2.5 but without adaptation costs. Mortality resubmission Nov 2021.
version 2.5: new damage function specification. Now running for 19 quantiles. GMST anomaly update: 
             now using centered 20-year moving average of GMST anomalies for each model, and also
             now extrapolating over the period 2092-2100 for those GCMs that don't have data 
             beyond 2100 (using quadratic extrapolation). GMST smoothing computed 
             with this notebook: https://gitlab.com/ClimateImpactLab/Climate/climate_data_notebooks/-/blob/master/smooth_GMST_anom_damagefunc.ipynb. 
version 2.4: same as v2.3 but with "Algeria Bug Fix". Produced in: 
             mortality_climate_full_uncertainty_objectoriented_computed_v2p4_here.ipynb
version 2.3: new damage function specification. No lower bound on 
             adaptation. Same time as changing pulse year to 2020, and 2019USD.
version 2.2: new damage function specificiation. Back to quadratic 
             and a time interaction for post-2100 extrapolation.
version 2.1: (v1.6-2.0 skipped) new temperature simulations and filters,
             including wider ECS, diagnosed emissions for each param set
             in IPT and TCRE simulations. Updated damage function (cubic).
             Removed adjustment to preindustrial temperatures. Everything
             rebased to 2001-2010.
version 1.5: temperature anomalies updated to reflect regression
             temperatures (subtract 1986-2005 average, + 0.6)
version 1.4: temperature anomalies updated to reflect regression
             temperatures (subtract 1986-2006 average, + 0.8)
version 1.3: quadratic damage functions replaced with linear
             functions when not significant
version 1.2: quantile regression used in damage functions
version 1.1: truncated noramal distribution used in tau3
version 1.0: IPT filter switched to first occurrence of dT<0
"""

# Set up workspace

In [2]:
pip install fair==1.3.2

Note: you may need to restart the kernel to use updated packages.


#### Make sure we are using FaIR v1.3.2 on workers and in notebook

In [3]:
import fair

assert fair.__version__ == "1.3.2"

In [None]:
import dask
import dask.array as dda
import dask.dataframe as ddf
import dask.distributed as dd
#import dask_kubernetes as dk
import rhg_compute_tools.utils as rhgu
import rhg_compute_tools.kubernetes as rhgk

import os

### set up the cluster spec with the correct docker image and packages
For running the "big set of jobs" over all climate simulations (100,000), ~100 standard workers is pretty good.

For post-processing and diagnostics, 10-30 workers is enough.


In [None]:
client, cluster = rhgk.get_big_cluster(extra_pip_packages="fair==1.3.2")
cluster.scale(30) #(100) # use 100 for computing and saving all damages. 30 for post-processing
cluster

### Set up the local workspace

import other data analysis packages

In [None]:
import xarray as xr
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from pandas import IndexSlice as idx

import scipy as sp
import scipy.stats
from scipy.stats import lognorm, norm
import seaborn as sns

import tqdm

from datetime import datetime
import sys
sys.path.append('..')

import load_climate_parameters as lcp

%matplotlib inline

In [None]:
import datashader
import datashader.transfer_functions as dstf


# Global settings
Settings that define central or quantile-reg damage function, which version of damage func and climate will be run, and other global metadata

This is ugly but important for the rest of the notebook

TO-DO switch from a bunch of boolean flags (`MORTALITY*`) to one setting for climate and one for damages (e.g. `MORTALITY_DAMAGE_VERS = 'v2p5'` or something)

### CLIMATE-ONLY OR FULL UNCERTAINTY FLAG 
- Running climate-only uncertainty uses only the central damage function. (`DO_QUANTILE_REG = False`)
- Running full uncertainty (or, climate+statistical uncertainty) uses quantile regression damage functions. (`DO_QUANTILE_REG = True`)

In [None]:
DO_QUANTILE_REG = True  # Run the code for central damage func or quantile reg damage funcs

In [None]:
SECTOR = "mortality"
DAMAGE_VARNAME = "mortrate" # variable name in the outputs


# these are the index columns in the damage function specification csv's
CENTRAL_INDEX_COLS = [0, 1, 2, 3]
QR_INDEX_COLS = [0, 1, 2, 3, 4]

MORTALITY_DAMAGE_V1 = False
MORTALITY_DAMAGE_V2p1 = False
MORTALITY_DAMAGE_V2p2 = False
MORTALITY_DAMAGE_V2p3 = False
MORTALITY_DAMAGE_V2p4 = False
MORTALITY_DAMAGE_V2p5 = False
MORTALITY_DAMAGE_V2p5p1 = True 

MORTALITY_CLIMATE_V1 = (
    False  # refers to the parameter set used. This is summer 2018 paper version
)
MORTALITY_CLIMATE_V2 = False  # This is 2019 paper version w/ pulse year = 2015
MORTALITY_CLIMATE_V2p1 = True  # this is 2019-2020 paper version w/ pulse year = 2020

# only one Damage version should be set to True
assert (
    np.sum(
        [
            MORTALITY_DAMAGE_V2p1,
            MORTALITY_DAMAGE_V2p2,
            MORTALITY_DAMAGE_V2p3,
            MORTALITY_DAMAGE_V2p4,
            MORTALITY_DAMAGE_V2p5,
            MORTALITY_DAMAGE_V2p5p1,
        ]
    )
    == 1
)

# only one Climate version should be set to True
assert np.sum([MORTALITY_CLIMATE_V1, MORTALITY_CLIMATE_V2, MORTALITY_CLIMATE_V2p1]) == 1

CLIM_VERS = "2.1"
DAMAGE_VERS = "2.5.1"
# TODO: switch to using these version strings instead of all the boolean flags

# for saving outputs: (User should modify to desired save directory)
BASEDIR_FP = "/gcs/impactlab-data/gcp/outputs/{sector}/fair_global_scc_montecarlo/"


## Set these globals to specify metadata for file output
Others generating new data should modify these!

In [None]:
AUTHOR_NAME = "Stefan Klos" #"Kelly McCusker" #"Diana Gergel"
AUTHOR_EMAIL = "stefan_klos@ucsb.edu" #"kmccusker@rhg.com" #"dgergel@rhg.com"
DATE_NOW = "{:%m-%d-%Y %H:%M:%S}".format(datetime.now())

#### Set up valuation metadata (turn mortality response into $)

In [None]:
REFERENCE_YEAR = 1765

# year in which damages start being calculated (determined by first year in damages coefs file)
START_YEAR = 2015

# year in which pulse will be emitted. Update the PULSE_DEFLATOR accordingly
if MORTALITY_CLIMATE_V1 or MORTALITY_CLIMATE_V2:
    PULSE_YEAR = 2015
elif MORTALITY_CLIMATE_V2p1:
    PULSE_YEAR = 2020
else:
    raise NotImplementedError

# in Gt C = 1e9 ton C
PULSE_AMT = 1.0

# CONVERSION is in units of
# [pulse/tCO2] = [1 pulse/PULSE_AMT GtC * 1 GtC/1e9 tC * 12tC/44tCO2]
# This is used to convert costs ($Bn_2005 / pulse) to SCC ($_2015/ton
# CO2), though the $Bn_2005 to $_2015 is handled later.
# Therefore, it should be the inverse of any changes to PULSE_AMT
PULSE_CONVERSION = 1.0 / PULSE_AMT / 1e9 * 12.011 / 44.0098

MAGNITUDE_OF_DAMAGES = 1e9  # magnitude of damage function values

if MORTALITY_DAMAGE_V2p1 or MORTALITY_DAMAGE_V2p2:
    # if MORTALITY_DAMAGE_V2:
    # Damages based year is 2005, with this World Bank US GDP deflator
    DAMAGES_DEFLATOR = 90.8776
    # else:
    #     DAMAGES_DEFLATOR = 74.415 # the value from 1995 (https://data.worldbank.org/indicator/NY.GDP.DEFL.ZS?locations=US)

    # Update for the pulse base year (currently 2015) -- use the World
    # Bank US GDP deflator for that year.
    PULSE_DEFLATOR = 108.6850

    # Converts from Damages_year $ to Pulse_year $
    BASE_YEAR_CONVERSION = PULSE_DEFLATOR / DAMAGES_DEFLATOR

elif (
    MORTALITY_DAMAGE_V2p3
    or MORTALITY_DAMAGE_V2p4
    or MORTALITY_DAMAGE_V2p5
    or MORTALITY_DAMAGE_V2p5p1
):
    # Converts from Damages_year $ to Pulse_year $: here we are doing 2019 dollars so no conversion
    BASE_YEAR_CONVERSION = 1
    DAMAGES_DEFLATOR = 1
    PULSE_DEFLATOR = 1

    # PULSE_DEFLATOR = 'No deflator was used. Data reported in units of the damage function (2019 international dollars (PPP))'
    # DAMAGES_DEFLATOR = 'No deflator was used. Data reported in units of the damage function (2019 international dollars (PPP))'

else:
    raise NotImplementedError

# Read in damage function parameters
These next couple functions set up strings used to create/set up filepaths for different versions

In [None]:
def get_damages_string_version():

    if MORTALITY_DAMAGE_V1:
        version = "v1"
    elif MORTALITY_DAMAGE_V2p1:
        version = "v2.1"
    elif MORTALITY_DAMAGE_V2p2:
        version = "v2.2"
    elif MORTALITY_DAMAGE_V2p3:
        version = "v2.3"
    elif MORTALITY_DAMAGE_V2p4:
        version = "v2.4"
    elif MORTALITY_DAMAGE_V2p5:
        version = "v2.5"
    elif MORTALITY_DAMAGE_V2p5p1:
        version = "v2.5.1"
    else:
        raise NotImplementedError

    return version


def get_climate_string_version():

    if MORTALITY_CLIMATE_V1:
        raise NotImplementedError  # shouldn't need to re-save or re-read these ever
    elif MORTALITY_CLIMATE_V2:
        version = "v2"
    elif MORTALITY_CLIMATE_V2p1:
        version = "v2.1"
    else:
        raise NotImplementedError

    return version


def get_damage_coefs_date():

    if MORTALITY_DAMAGE_V1:
        damage_coefs_date = "2018-06-28"
    elif MORTALITY_DAMAGE_V2p1:
        damage_coefs_date = "2019-02-28"
    elif MORTALITY_DAMAGE_V2p2:
        damage_coefs_date = "2019-04-01"
    elif MORTALITY_DAMAGE_V2p3:
        damage_coefs_date = "2019-06-21"
    elif MORTALITY_DAMAGE_V2p4:
        damage_coefs_date = "2019-10-16"
    elif MORTALITY_DAMAGE_V2p5:
        damage_coefs_date = "2020-05-11"
    elif MORTALITY_DAMAGE_V2p5p1:
        damage_coefs_date = "2020-05-11"
    else:
        raise NotImplementedError

    return damage_coefs_date


def get_damage_name_slug():

    if MORTALITY_DAMAGE_V2p1:
        damage_slug = "cubic_nonpar_MC_global_poly4_uclip_sharecombo"
    elif MORTALITY_DAMAGE_V2p2:
        damage_slug = "quadratic_MC_global_poly4_uclip_sharecombo"
    elif MORTALITY_DAMAGE_V2p3 or MORTALITY_DAMAGE_V2p4 or MORTALITY_DAMAGE_V2p5:
        damage_slug = "quadratic_IGIA_MC_global_poly4_uclip_sharecombo"
    elif MORTALITY_DAMAGE_V2p5p1:
        damage_slug = "quadratic_IGIA_MC_global_poly4_uclip_sharecombo_wo_costs"
    elif MORTALITY_DAMAGE_V1:
        damage_slug = "global_poly4"
    else:
        raise NotImplementedError

    return damage_slug


def get_climate_output_date():

    if MORTALITY_CLIMATE_V1:
        raise NotImplementedError  # shouldn't need to re-save or re-read these ever
    elif MORTALITY_CLIMATE_V2:
        climate_date = "2019-02-28"  # pulse year 2015
    elif MORTALITY_CLIMATE_V2p1:
        climate_date = "2019-06-21"  # pulse year 2020
    else:
        raise NotImplementedError

    return climate_date


def get_central_damage_parameters():

    # set the path for damage function coefs

    damage_path = (
        "../damage_function_specifications/"
        "mortality_damage_coefficients_global_poly4_{date}/"
        "mortality_damage_coefficients_{slug}_{ssp}.csv"
    ).format(date=get_damage_coefs_date(), slug=get_damage_name_slug(), ssp="{ssp}")

    # I think the only difference b/w these version and v1 below is the number of SSPs?
    if (
        MORTALITY_DAMAGE_V2p1
        or MORTALITY_DAMAGE_V2p2
        or MORTALITY_DAMAGE_V2p3
        or MORTALITY_DAMAGE_V2p4
        or MORTALITY_DAMAGE_V2p5
        or MORTALITY_DAMAGE_V2p5p1
    ):

        SSPs = ["SSP3"]

        # load up damage function coefs. Similar to what Dylan just sent
        #  For each simulation, you need one constant and all of the betas for each year
        #  This has multiple valuation scenarios (SSP, vsl, agegroup or something...)
        #  For next round, probably vsl_value column won't be there (not necessary)

        damage_params = pd.concat(
            {
                ssp: pd.read_csv(
                    damage_path.format(ssp=ssp), index_col=CENTRAL_INDEX_COLS
                )
                for ssp in SSPs
            },
            axis=0,
            names=["SSP"],
        )

        damage_params.columns.names = ["coefficient"]

        by_scenario = damage_params.unstack("year")
        by_scenario.head()  # only one scenario this time around
        return by_scenario
    elif MORTALITY_DAMAGE_V1:  # mortality damage v1

        SSPs = ["SSP2", "SSP3", "SSP4"]

        # load up damage function coefs. Similar to what Dylan just sent
        #  For each simulation, you need one constant and all of the betas for each year
        #  This has multiple valuation scenarios (SSP, vsl, agegroup or something...)
        #  For next round, probably vsl_value column won't be there (not necessary)

        damage_params = pd.concat(
            {
                ssp: pd.read_csv(
                    damage_path.format(ssp=ssp), index_col=CENTRAL_INDEX_COLS
                )
                for ssp in SSPs
            },
            axis=0,
            names=["SSP"],
        )

        damage_params.columns.names = ["coefficient"]

        # the final format
        # Each row is for a given damage function specification: set of constant, betas for each year in one row
        #   For each climate parameter set, the SCC is computed once for each row
        by_scenario = damage_params.unstack("year")
        by_scenario.head()
        return by_scenario
    else:
        raise NotImplementedError


def get_quantilereg_damage_parameters():

    damage_path = (
        "../damage_function_specifications/"
        "mortality_damage_coefficients_global_poly4_{date}/"
        "mortality_damage_coefficients_{slug}_"
        "quantilereg_{ssp}.csv"
    ).format(date=get_damage_coefs_date(), slug=get_damage_name_slug(), ssp="{ssp}")

    if MORTALITY_DAMAGE_V1:

        damage_params = pd.read_csv(
            "../damage_function_specifications/"
            "mortality_damage_coefficients_global_poly4_2018-06-28/"
            "mortality_damage_coefficients_poly4_SSP3_montecarlo_quantileregression.csv",
            index_col=QR_INDEX_COLS,
        )

    elif (
        MORTALITY_DAMAGE_V2p2
        or MORTALITY_DAMAGE_V2p3
        or MORTALITY_DAMAGE_V2p4
        or MORTALITY_DAMAGE_V2p5
        or MORTALITY_DAMAGE_V2p5p1
    ):

        damage_params = pd.read_csv(
            damage_path.format(ssp="SSP3"), index_col=QR_INDEX_COLS
        )
    else:
        raise NotImplementedError

    damage_params.columns.names = ["coefficient"]

    by_scenario = damage_params.unstack("year")
    return by_scenario

In [None]:
# plotting utility function
def plot_array_columns_as_lines(array_2d):
    new_array = np.hstack([array_2d, np.empty_like(array_2d[:, [0]]) * np.nan])

    index = np.ones_like(new_array, dtype=int) * np.hstack(
        [np.arange(array_2d.shape[1]), np.array([np.nan])]
    )

    new_len = array_2d.shape[0] * (array_2d.shape[1] + 1)

    df = pd.DataFrame(
        np.hstack([index.reshape((new_len, 1)), new_array.reshape((new_len, 1))]),
        columns=["x", "y"],
    )

    return df

# read in climate parameters & filters

In [None]:
original_climate_params = lcp.get_parameters(version=get_climate_string_version(),filtered=False)
original_climate_params.shape

In [None]:
climate_mask = lcp.get_filter_mask(version=get_climate_string_version())

### Read in damage func params (central = for climate-only uncertainty; quantilereg = for full uncertainty)

In [None]:
central_damage_params_by_scenario = get_central_damage_parameters()
central_damage_params_by_scenario.head()  # 6 rows x 1430 columns

In [None]:
quantilereg_damage_parameters_by_scenario = get_quantilereg_damage_parameters()
quantilereg_damage_parameters_by_scenario.head()  # 30 rows x 1430 columns

# Calculate damages from climate parameter samples


### Functions to convert parameters to results (ie. compute damages)

In [None]:
def compute_damages(temperatures_by_scenario, damage_spec):
    """
    Estimate time series mortality damages
    
    Accepts temperature changes from FAIR for pulse run and control run,
    and a damage specification for mortality
    
    Parameters
    ----------
    temperatures_by_scenario: array
        (scenario x pulse x year) array of temperature change from preindustrial by year
    Returns
    -------
    damages : array
        (scenario x rcp x year) array of marginal damages due to pulse by year
    
    """

    damageyears = np.arange(START_YEAR, 2301)

    if MORTALITY_DAMAGE_V2p1:
        # Very important that this base period matches what was used in damage function specification
        BASEPERIOD = [2001, 2010] 

        base_period_temp = (
            temperatures_by_scenario[
                :,
                :,
                BASEPERIOD[0] - REFERENCE_YEAR : BASEPERIOD[1] + 1 - REFERENCE_YEAR,
            ]
            .mean(axis=2)
            .reshape(2, 2, 1)
        )

        temperature_anomalies = (temperatures_by_scenario - base_period_temp)[
            :, :, START_YEAR - REFERENCE_YEAR : 2301 - REFERENCE_YEAR
        ]

        cons = damage_spec.xs("cons", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta1 = damage_spec.xs("beta1", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta2 = damage_spec.xs("beta2", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        # New for v2.1
        beta3 = damage_spec.xs("beta3", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )

        damages = (
            PULSE_CONVERSION
            * MAGNITUDE_OF_DAMAGES
            * BASE_YEAR_CONVERSION
            * (
                cons
                + temperature_anomalies * beta1
                + temperature_anomalies ** 2 * beta2
                + temperature_anomalies ** 3 * beta3
            )
        )
    elif (
        MORTALITY_DAMAGE_V2p2
        or MORTALITY_DAMAGE_V2p3
        or MORTALITY_DAMAGE_V2p4
        or MORTALITY_DAMAGE_V2p5
        or MORTALITY_DAMAGE_V2p5p1
    ):

        BASEPERIOD = [2001, 2010]

        base_period_temp = (
            temperatures_by_scenario[
                :,
                :,
                BASEPERIOD[0] - REFERENCE_YEAR : BASEPERIOD[1] + 1 - REFERENCE_YEAR,
            ]
            .mean(axis=2)
            .reshape(2, 2, 1)
        )

        temperature_anomalies = (temperatures_by_scenario - base_period_temp)[
            :, :, START_YEAR - REFERENCE_YEAR : 2301 - REFERENCE_YEAR
        ]

        cons = damage_spec.xs("cons", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta1 = damage_spec.xs("beta1", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta2 = damage_spec.xs("beta2", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )

        damages = (
            PULSE_CONVERSION
            * MAGNITUDE_OF_DAMAGES
            * BASE_YEAR_CONVERSION
            * (
                cons
                + temperature_anomalies * beta1
                + temperature_anomalies ** 2 * beta2
            )
        )

    elif MORTALITY_DAMAGE_V1:

        BASEPERIOD = [1986, 2005]

        base_period_temp = (
            temperatures_by_scenario[
                :,
                :,
                BASEPERIOD[0] - REFERENCE_YEAR : BASEPERIOD[1] + 1 - REFERENCE_YEAR,
            ]
            .mean(axis=2)
            .reshape(2, 2, 1)
        )

        temperature_anomalies = (temperatures_by_scenario - base_period_temp)[
            :, :, START_YEAR - REFERENCE_YEAR : 2301 - REFERENCE_YEAR
        ]
        # + 0.6) #<-- Leaving this out b/c apparently the damage func did NOT have the 0.6C added, even tho it should have.

        cons = damage_spec.xs("cons", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta1 = damage_spec.xs("beta1", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )
        beta2 = damage_spec.xs("beta2", axis=1, level="coefficient").values.reshape(
            (len(damage_spec), 1, 1, len(damageyears))
        )

        damages = (
            PULSE_CONVERSION
            * MAGNITUDE_OF_DAMAGES
            * BASE_YEAR_CONVERSION
            * (
                cons
                + temperature_anomalies * beta1
                + temperature_anomalies ** 2 * beta2
            )
        )

    else:
        raise NotImplementedError

    return damages


def produce_mortality_estimate(parameters, damage_spec, return_model_vars=False):
    """
    Estimate time series mortality damages, or return climate response (conc,forcing,temp)
    
    Accepts a row of a numpy array, which fully
    parameterizes a damage simulation in FAIR
    
    Parameters
    ----------
    parameters: list-like
        0: tcr, the Transient Climate Respons
        1: ecs, the Equilibrium Climate Sensitivity
        2: d2
        3: tau4
        4: aeroscale, the aerosol forcing scaling parameter, optional (default 1)

    damage_spec: pandas.core.frame.DataFrame
        specifies the coefficients and components of damage function
        
    return_model_vars: bool
        True: returns climate model variables 
        False: returns damages
    
    Returns
    -------
    if return_model_vars is False
    damages : array
        (scenario x rcp x year) array of marginal damages due to pulse by year
    
    if return_model_vars is True
    climate_variables_by_scenario : array
        (variable x rcp x year) array of climate vars due to pulse by year (pulse-control)
        variable[0] = CO2 con change
        variable[1] = CO2 forcing change
        variable[2] = temperature change
    
    """

    import fair
    import fair.RCPs.rcp45, fair.RCPs.rcp85
    import warnings

    paramlist = list(parameters)

    # extract pamaters 0-4 for climate uncertainty
    tcr, ecs, d2, tau4 = paramlist[:4]
    if len(paramlist) > 4:
        aeroscale = paramlist[4]
    else:
        aeroscale = 1.0

    # build array of damage years
    damageyears = np.arange(START_YEAR, 2301)
    ndy = len(damageyears)

    temperatures_by_rcp = []
    by_rcp = []

    # for each RCP, run FAIR once and then run again with a pulse
    for scen in [fair.RCPs.rcp45, fair.RCPs.rcp85]:
        Cc, Fc, Tc = fair.forward.fair_scm(
            emissions=scen.Emissions.emissions,
            tcrecs=np.array([tcr, ecs]),
            tau=np.array([1000000, 394.4, 36.54, tau4]),
            d=np.array([239.0, d2]),
            scale=np.array([1.0] * 8 + [aeroscale] + [1.0] * 4),
        )

        pulse = scen.Emissions.emissions.copy()
        pulse[PULSE_YEAR - REFERENCE_YEAR, 1] += PULSE_AMT

        Cp, Fp, Tp = fair.forward.fair_scm(
            emissions=pulse,
            tcrecs=np.array([tcr, ecs]),
            tau=np.array([1000000, 394.4, 36.54, tau4]),
            d=np.array([239.0, d2]),
            scale=np.array([1.0] * 8 + [aeroscale] + [1.0] * 4),
        )

        temperatures_by_rcp.append(np.vstack([Tc, Tp]))
        # print(np.vstack([Tc,Tp]).shape)

        if return_model_vars:
            # send back Tp-Tc, Cp-Cc, Fp-Fc too
            # note, don't need to rebase b/c sending back differences, which will be 0 before PULSE_YEAR
            by_rcp.append(
                np.vstack(
                    [
                        Cp[2000 - REFERENCE_YEAR :, 0] - Cc[2000 - REFERENCE_YEAR :, 0],
                        (Fp - Fc).sum(axis=1)[2000 - REFERENCE_YEAR :],
                        Tp[2000 - REFERENCE_YEAR :] - Tc[2000 - REFERENCE_YEAR :],
                    ]
                )
            )

    if return_model_vars:  # return concentration, forcing, temperature response
        vars_by_scenario = np.stack(by_rcp, axis=0)
        vars_by_scenario = vars_by_scenario.transpose(1, 0, 2)
        return vars_by_scenario
    else:  # return time series of damages
        # form (scenario, pulse, time) array of temperatures
        temperatures_by_scenario = np.stack(temperatures_by_rcp, axis=0)

        # move to func so can have different specs
        damages = compute_damages(temperatures_by_scenario, damage_spec)

        # return scenario * time array of current-year marginal damages
        time_series_of_marginal_damages = damages[..., 1, :] - damages[..., 0, :]

        return time_series_of_marginal_damages


def produce_discounted_mortality_estimate(parameters, damage_spec, varname="mortrate"):
    """ Computes time series of mortality damages for the specified climate parameters,
        and returns the discounted time series.
    
        returns time series of discounted mortality damages (present value)
        by scenario and for a set of discount rates 
    """

    time_series_of_marginal_damages = produce_mortality_estimate(
        parameters, damage_spec, return_model_vars=False
    )

    time_series_da = xr.DataArray(
        time_series_of_marginal_damages,
        name=varname,
        dims=["scenario", "rcp", "year"],
        coords=[damage_spec.index, ["rcp45", "rcp85"], np.arange(START_YEAR, 2301)],
    )

    disc_rates = [0.01, 0.025, 0.03, 0.05]

    discounted_time_series = xr.concat(
        [
            (time_series_da / (1 + r) ** (time_series_da.year - PULSE_YEAR))
            for r in disc_rates
        ],
        dim=pd.Index(disc_rates, name="discrate"),
    )

    return discounted_time_series.to_dataset(name="discounted_mortrate").unstack(
        "scenario"
    )


def produce_scc_pulse_climate_response(parameters):
    """ 
        For given set of (climate) parameters, run FAIR and return 
        the climate response to a pulse.
        
        returns concentration, total_forcing, temperature """

    import fair
    import fair.RCPs.rcp45, fair.RCPs.rcp85
    import warnings

    paramlist = list(parameters)

    # extract pamaters 0-4 for climate uncertainty
    tcr, ecs, d2, tau4 = paramlist[:4]
    if len(paramlist) > 4:
        aeroscale = paramlist[4]
    else:
        aeroscale = 1.0

    temperatures_by_rcp = []
    by_rcp = []

    # for each RCP, run FAIR once and then run again with a pulse
    for scen in [fair.RCPs.rcp45, fair.RCPs.rcp85]:

        Cc, Fc, Tc = fair.forward.fair_scm(
            emissions=scen.Emissions.emissions,
            tcrecs=np.array([tcr, ecs]),
            tau=np.array([1000000, 394.4, 36.54, tau4]),
            d=np.array([239.0, d2]),
            scale=np.array([1.0] * 8 + [aeroscale] + [1.0] * 4),
        )

        pulse = scen.Emissions.emissions.copy()
        pulse[PULSE_YEAR - REFERENCE_YEAR, 1] += PULSE_AMT

        Cp, Fp, Tp = fair.forward.fair_scm(
            emissions=pulse,
            tcrecs=np.array([tcr, ecs]),
            tau=np.array([1000000, 394.4, 36.54, tau4]),
            d=np.array([239.0, d2]),
            scale=np.array([1.0] * 8 + [aeroscale] + [1.0] * 4),
        )

        temperatures_by_rcp.append(np.vstack([Tc, Tp]))
        # print(np.vstack([Tc,Tp]).shape)

        # send back Tp-Tc, Cp-Cc, Fp-Fc for years 2000 to end
        by_rcp.append(
            np.vstack(
                [
                    Cp[2000 - REFERENCE_YEAR :, 0] - Cc[2000 - REFERENCE_YEAR :, 0],
                    (Fp - Fc).sum(axis=1)[2000 - REFERENCE_YEAR :],
                    Tp[2000 - REFERENCE_YEAR :] - Tc[2000 - REFERENCE_YEAR :],
                ]
            )
        )

    vars_by_scenario = np.stack(by_rcp, axis=0)
    vars_by_scenario = vars_by_scenario.transpose(1, 0, 2)

    model_response_da = xr.DataArray(
        vars_by_scenario,
        name="output",
        dims=["variable", "rcp", "year"],
        coords=[
            ["concentration", "total_forcing", "temperature"],
            ["rcp45", "rcp85"],
            np.arange(2000, 2501),
        ],
    )

    return model_response_da.to_dataset(name="output")


def produce_mortality_estimate_from_parameter_block(
    parameters, damage_spec, return_model_vars=False
):
    """
    Estimate damages for each of an arbitrary number of parameters
    """

    return np.apply_along_axis(
        produce_mortality_estimate,
        1,
        parameters,
        damage_spec=damage_spec,
        return_model_vars=return_model_vars,
    )

# Sample climate and outcome uncertainty

### Test `produce_mortality_estimate` function on one row of climate params
#### climate model outputs

dims = `[outputs, rcp, projectionyears]` where `outputs` are `dConcentration, dTotalRadiativeForcing, dTemperature`

In [None]:
# these are for saving intermediate results (damages for all 100,000 samples)
if DO_QUANTILE_REG:
    damage_specification = quantilereg_damage_parameters_by_scenario
    FP_PREFIX = "quantilereg"
    FP_UNCERTAINTY = "fulluncertainty"
    FP_SUBDIR = "intermediate_{}_results".format(FP_UNCERTAINTY)
else:
    damage_specification = central_damage_params_by_scenario
    FP_PREFIX = "climateonly"
    FP_UNCERTAINTY = "climateuncertainty"
    FP_SUBDIR = "intermediate_{}_results".format(FP_UNCERTAINTY)

In [None]:
# # test return FAIR model outputs
TESTPARAMS = 2000  # choose a random parameter set

# sample_modelout = produce_mortality_estimate(
#     original_climate_params[TESTPARAMS, :],
#     damage_spec=damage_specification,
#     return_model_vars=True,
# )

# sample_modelout.shape  # (oututs(dC,dtotalF,dT) x rcp x projyears)

#### damages (mortality response) outputs

dims = `[pricing scenario, rcp, damageyears]` where `pricing scenario` is specified by "age_adjustment", "heterogeneity", "vsl_value". 

There are 6 pricing scenarios. If we are running quantile regression damage functions, then there are 6 pricing scenarios for each quantile regression.

In [None]:
sample_estimate = produce_mortality_estimate(
    original_climate_params[TESTPARAMS, :],
    damage_spec=damage_specification,
    return_model_vars=False,
)
sample_estimate.shape

#### If quantilereg: Plot pricing scenarios for each quantile regression for each RCP and year
#### If central (climateonly): Plot 6 pricing scenarios for the central damage function for each RCP and year

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(14, 8))

for i in range(sample_estimate.shape[0]):
    ax.plot(
        np.arange(START_YEAR, 2301),
        sample_estimate[i, 0, :],
        color=matplotlib.cm.Blues(float(i) / sample_estimate.shape[0]),
        label="rcp45 {}".format(" ".join(str(damage_specification.index.values[i]))),
    )

for i in range(sample_estimate.shape[0]):
    ax.plot(
        np.arange(START_YEAR, 2301),
        sample_estimate[i, 1, :],
        color=matplotlib.cm.Reds(float(i) / sample_estimate.shape[0]),
        label="rcp85 {}".format(" ".join(str(damage_specification.index.values[i]))),
    )

# Shrink current axis by 20%
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])

# Put a legend to the right of the current axis
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))

#### Plot the climate simulation's output

In [None]:
# fig, axs = plt.subplots(1, 3, figsize=(15, 3))
# # concentration
# ax = axs[0]
# _ = ax.plot(sample_modelout[0, 1, :].T)  # rcp85
# _ = ax.plot(sample_modelout[0, 0, :].T)  # rcp45
# ax.set_title("delta co2 concentration")
# # total RF
# ax = axs[1]
# _ = ax.plot(sample_modelout[1, 1, :].T)  # rcp85
# _ = ax.plot(sample_modelout[1, 0, :].T)  # rcp45

# ax.set_title("delta total RF")
# ax = axs[2]
# _ = ax.plot(sample_modelout[2, 1, :].T)  # rcp85
# _ = ax.plot(sample_modelout[2, 0, :].T)  # rcp45
# ax.set_title("delta T")

### Test functions on a handful of rows
Run a set of climate params at once

Returns a block of damages with dims = `[climate param set, pricing scenario, rcp, year]`

In [None]:
TESTRANGE = np.arange(100, 110)  # (10,20)

# test block of damages: on a block of climate params
sample_block = produce_mortality_estimate_from_parameter_block(
    original_climate_params[TESTRANGE, :], damage_spec=damage_specification
)

sample_block.shape  # (parameter sets x mortality specs x rcps x time)

For plotting:

In [None]:
# stack the various mortality cost types with climate monte carlo params
rcp45_to_plot = plot_array_columns_as_lines(
    sample_block[:, :, 0, :].reshape(
        (sample_block.shape[0] * sample_block.shape[1], sample_block.shape[3])
    )
)  # the sample_block is reshaped to (60, 286), for one rcp scenario

rcp85_to_plot = plot_array_columns_as_lines(
    sample_block[:, :, 1, :].reshape(
        (sample_block.shape[0] * sample_block.shape[1], sample_block.shape[3])
    )
)

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(16, 5), dpi=300)

cvs = datashader.Canvas(plot_height=400, plot_width=1000)
agg45 = cvs.line(rcp45_to_plot, "x", "y", datashader.count())
img45 = dstf.shade(agg45, how="eq_hist")  # , cmap=matplotlib.cm.Blues, name='rcp45')
img45 = xr.DataArray(img45.where(img45 > 0))
img45["x"] = img45.x.values + PULSE_YEAR
img45 = img45.rename({"x": "year", "y": "marginal damages"})
(
    img45.plot(
        cmap=matplotlib.cm.Blues_r,
        norm=matplotlib.colors.Normalize(),
        add_colorbar=False,
        ax=axs[0],
    )
)
axs[0].set_title("{}: RCP4.5".format(get_damages_string_version()))

agg85 = cvs.line(rcp85_to_plot, "x", "y", datashader.count())
img85 = dstf.shade(agg85, how="eq_hist")  # , cmap=matplotlib.cm.Reds, name='rcp85')
img85 = xr.DataArray(img85.where(img85 > 0))
img85["x"] = img85.x.values + PULSE_YEAR
img85 = img85.rename({"x": "year", "y": "marginal damages"})
(
    img85.plot(
        cmap=matplotlib.cm.Reds_r,
        norm=matplotlib.colors.Normalize(),
        add_colorbar=False,
        ax=axs[1],
    )
)
axs[1].set_title("{}: RCP8.5".format(get_damages_string_version()))

# dstf.Images(img45, img85)

### Map simulation across all sample parameters
Make sure write directories exist:

In [None]:
get_damage_coefs_date(), get_damage_name_slug(), get_climate_output_date(), FP_PREFIX, FP_SUBDIR

### Prep filenames for saving intermediate files

In [None]:
BASEDIR = BASEDIR_FP.format(sector=SECTOR)

In [None]:
INTERMEDIATE_FP = (
    BASEDIR
    + "{sector}_damage_coefficients_global_poly4_{date}/{subdir}/"
    + FP_PREFIX
    + "_{sector}_damage_coefficients_{slug}_{date}_"
    + "{varname}_raw_{range1}-{range2}.nc"
).format(
    sector=SECTOR,
    varname=DAMAGE_VARNAME,
    date=get_damage_coefs_date(),
    subdir="{}",
    slug=get_damage_name_slug(),
    range1="{}",
    range2="{}",
)
# The climate output lives in mortality subfolder. TODO: move this data into neutral location. @@
INTERMEDIATE_CLIM_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{climdate}/"
    + "intermediate_climate_results/"
    + "climateonly_rcp_sccpulse_simulations_{climdate}_"
    + "dconc_dtotalf_dT_{range1}-{range2}.nc"
).format(climdate=get_climate_output_date(), range1="{}", range2="{}")

print(INTERMEDIATE_FP, "\n")
print(INTERMEDIATE_CLIM_FP)

### This function is what is mapped to workers
Computes damages and saves intermediate files to disk for later post-processing

In [None]:
def compute_and_save_damages_block(n, step, stop, damage_spec, return_model_vars=False):

    """
    Computes damages for a block of climate parameters and saves output to disk.
    
        `damage_spec`       : central or quantile-regression damage function coefficients
        `return_model_vars` : If True, save climate model (FaIR) output to disk
                              If False, save damages (mortality rate) to disk
    """
    CHUNK_SIZE = 50

    stop = min(stop, n + step)
    chunk = min(CHUNK_SIZE, stop - n)

    climate_dask = dda.from_array(
        original_climate_params[n:stop],
        chunks=(chunk, original_climate_params.shape[1]),
    )

    climate_dask = climate_dask.persist()

    if return_model_vars:  # want back C,totalF, T (pulse - control)

        print(INTERMEDIATE_CLIM_FP.format(n, stop))

        if os.path.isfile(INTERMEDIATE_CLIM_FP.format(n, stop)):
            return

        climate_outputs = climate_dask.map_blocks(
            produce_mortality_estimate_from_parameter_block,
            dtype=np.float64,
            chunks=(chunk, 3, 2, 501),  # sims x modelouts x rcp x year
            drop_axis=1,
            new_axis=[1, 2, 3],
            damage_spec=damage_spec,
            return_model_vars=True,
        )
        model_da = xr.DataArray(
            climate_outputs,
            name="output",
            dims=["simulation", "variable", "rcp", "year"],
            coords=[
                np.arange(n, n + step),
                ["concentration", "total_forcing", "temperature"],
                ["rcp45", "rcp85"],
                np.arange(2000, 2501),
            ],
        )

        model_da = model_da.compute()

        print(model_da)
        print("writing to {}".format(INTERMEDIATE_CLIM_FP.format(n, stop)))
        model_da.to_dataset(name="output").to_netcdf(
            INTERMEDIATE_CLIM_FP.format(n, stop)
        )

    else:  # just damages

        if os.path.isfile(INTERMEDIATE_FP.format(FP_SUBDIR, n, stop)):
            return

        sample_mortality_estimate = climate_dask.map_blocks(
            produce_mortality_estimate_from_parameter_block,
            dtype=np.float64,
            chunks=(chunk, len(damage_spec.index), 2, 286),
            drop_axis=1,
            new_axis=[1, 2, 3],
            damage_spec=damage_spec,
        )

        da = xr.DataArray(
            sample_mortality_estimate,
            dims=["simulation", "scenario", "rcp", "year"],
            coords=[
                np.arange(n, n + step),
                damage_spec.index,
                ["rcp45", "rcp85"],
                np.arange(START_YEAR, 2301),
            ],
        )

        da = da.compute()

        print("writing to {}".format(INTERMEDIATE_FP.format(FP_SUBDIR, n, stop)))

        (
            da.to_dataset(name="mortrate")
            .unstack("scenario")
            .to_netcdf(INTERMEDIATE_FP.format(FP_SUBDIR, n, stop))
        )

In [None]:
# SCC functions
def compute_scc_discounted_timeseries(ds):

    rates = [
        0.01,
        0.015,
        0.02,
        0.025,
        0.03,
        0.05,
    ]  # NOTE: as of 7/13/2020, adding in 1.5% and 2% discount rates

    discounted_timeseries = xr.concat(
        [(ds / (1 + r) ** (ds.year - PULSE_YEAR)) for r in rates],
        dim=pd.Index(rates, name="discrate"),
    )

    discounted_timeseries = discounted_timeseries.rename(
        {"mortrate": "discounted_mortrate"}
    )
    return discounted_timeseries


def compute_scc_from_discounted_ts(ds, var_to_rename="discounted_mortrate"):

    return ds.sum(dim="year").rename({var_to_rename: "scc"})


def compute_scc(ds, var_to_rename="discounted_mortrate"):

    return compute_scc_from_discounted(
        compute_scc_discounted_timeseries(ds, var_to_rename=var_to_rename)
    )



### Check versions and filepath for saving the simulations 

In [None]:
"damages", get_damages_string_version(), "climate", get_climate_string_version()

In [None]:
INTERMEDIATE_FP

## Here we can map all calculations to workers, be it central damage function, or quantile regression. Same calls
## run all 100,000 sims
Do this in 5 blocks of jobs. These jobs each save an intermediate output file

In [None]:
step = 250  # this specifies how many are in the "block" of climate params
return_model_vars = False  # True # CHANGE THIS VAR NAME TO INDICATE SAVE DAMAGES VS CLIMATE VARS. False gives damages. True gives climate vars.

In [None]:
# map blocks of simulations up to the first 20000 params, each map block is 250 params
# This takes a little < 7 mins for central damage function, and separately for quantilereg.

futures1 = client.map(
    compute_and_save_damages_block,
    np.arange(0, 20000, step),
    step=step,
    stop=20000,
    damage_spec=damage_specification,
    return_model_vars=return_model_vars,
)

In [None]:
dd.progress(futures1)

In [None]:
futures2 = client.map(
    compute_and_save_damages_block,
    np.arange(20000, 40000, step),
    step=step,
    stop=40000,
    damage_spec=damage_specification,
    return_model_vars=return_model_vars,
)

In [None]:
dd.progress(futures2)

In [None]:
futures3 = client.map(
    compute_and_save_damages_block,
    np.arange(40000, 60000, step),
    step=step,
    stop=60000,
    damage_spec=damage_specification,
    return_model_vars=return_model_vars,
)

In [None]:
dd.progress(futures3)

In [None]:
futures4 = client.map(
    compute_and_save_damages_block,
    np.arange(60000, 80000, step),
    step=step,
    stop=80000,
    damage_spec=damage_specification,
    return_model_vars=return_model_vars,
)

In [None]:
dd.progress(futures4)

In [None]:
futures5 = client.map(
    compute_and_save_damages_block,
    np.arange(80000, 100000, step),
    step=step,
    stop=100000,
    damage_spec=damage_specification,
    return_model_vars=return_model_vars,
)

In [None]:
dd.progress(futures5)

In [None]:
cluster.scale(0)
client.restart()

# ==================================== end running all 100,000 simulations

# Postprocessing MonteCarlos

This section reads in all intermediate files saved to disk (in chunks by simulation) and computes the discounted time series and SCC for all simulations. Make sure the above sections that map jobs to workers have been run and all files have been saved before moving to this section.

It is a good idea to restart the client (`client.restart()`) and delete all the `futures` from above, before moving on. Or, restart the notebook kernel, run the setup and function definition cells at the top, commission a cluster, and skip to this step for post-processing.

For each uncertainty type, the post-processing steps are
1. read in all the time series of marginal damages by valuation type (ultimately there will be 100,000 time series for each valuation)
2. discount all the time series. Because there are multiple valuation types and discount rates, this has a large memory footprint
3. filter out simulations that don't meet criteria (previously determined) 
4. compute quantiles over discounted time series for the primary valuation type as an intermediate output
5. sum all simulations and to SCC
6. filter out sccs for simulations that don't meet criteria
7. quantile SCCs
8. save output files

### 1. Open all intermediate files: central damage func 
Only run this if you've computed the central damages and saved intermediate outputs above.


In [None]:
BASEDIR = BASEDIR_FP.format(sector=SECTOR)

In [None]:
CENTRAL_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/intermediate_climateuncertainty_results/"
    + "climateonly_mortality_damage_coefficients_{slug}_{date}_"
    + "mortrate_raw_*.nc"
).format(date=get_damage_coefs_date(), slug=get_damage_name_slug())
print(CENTRAL_FP)

In [None]:

cds = xr.open_mfdataset(CENTRAL_FP, concat_dim="simulation", combine="nested")

if ( # repeat booleans here -- should it be V2p2, V2p3, V2p4, V2p5? TO-DO
    MORTALITY_DAMAGE_V2p3
    or MORTALITY_DAMAGE_V2p3
    or MORTALITY_DAMAGE_V2p3
    or MORTALITY_DAMAGE_V2p5
):
    cds = cds.isel(
        SSP=0
    )  # select the singleton SSP to maintain consistency with quantile-reg data format
print(cds)

In [None]:
# rechunk for memory purposes
cds = cds.chunk(
    {
        "simulation": 100000,
        "rcp": 1,
        "vsl_value": 1,
        "heterogeneity": 1,
        "age_adjustment": 1,
        "year": 286,
    }
)

In [None]:
cds = cds.persist()

In [None]:
dd.progress(cds)

### Open all intermediate files: climate response
This is unnecessary for SCC calculation

In [None]:
# CLIM_FP = (MORTALITY_BASEDIR
#                         + 'mortality_damage_coefficients_global_poly4_{climdate}/'
#                         + 'intermediate_climate_results/'
#                         + 'climateonly_rcp_sccpulse_simulations_{climdate}_'
#                         + 'dconc_dtotalf_dT_*.nc').format(climdate=get_climate_output_date())

# climds = xr.open_mfdataset(CLIM_FP, concat_dim='simulation')
# print(climds)

## Compute the SCC from mortality damages
Break the SCC calc up into two steps so can save the time series of present values

For all simulations, pricing scenarios, rcps, compute discounted time series (or time series of present values of mortality response), and from that, the SCC

### central damage function SCC (or skip if doing full uncertainty)
### 2. Discount & 5. Compute SCC

In [None]:
c_discounted_timeseries = compute_scc_discounted_timeseries(cds)
c_scc = compute_scc_from_discounted_ts(c_discounted_timeseries)

### 3. filter out bad runs before quantiling

In [None]:
c_discounted_ts_selected_params = c_discounted_timeseries.where(
    get_filter_mask(), drop=True
)

In [None]:
c_discounted_ts_selected_params = c_discounted_ts_selected_params.persist()
dd.progress(c_discounted_ts_selected_params)

### 4. quantile discounted time series for main valuation scenario

In [None]:

# select out a valuation scenario. 'c' is for the central df
vsl_value_c = "epa"

################### update for each #########################
heterogeneity_c = "scaled"
age_adjustment_c = "vly"
#############################################################

slicers_c = dict(
    age_adjustment=age_adjustment_c, heterogeneity=heterogeneity_c, vsl_value=vsl_value_c
)

c_discounted_ts_mainresults_quantiles = (
    c_discounted_ts_selected_params.sel(**slicers_c)
    .compute()["discounted_{}".format("mortrate")]
    .quantile([0.05, 0.17, 0.25, 0.5, 0.75, 0.83, 0.95], dim="simulation")
)

In [None]:
c_scc = c_scc.compute()

### 6. filter SCC 

In [None]:
c_scc_selected_params = c_scc.where(get_filter_mask(), drop=True)

### 7. quantile SCC

In [None]:
c_scc_selected_params_quantiles = c_scc_selected_params.quantile(
    [0.01, 0.05, 0.17, 0.25, 0.5, 0.75, 0.83, 0.95, 0.99], dim="simulation"
)

In [None]:
c_scc_selected_params_quantiles

## end of central damage function part of workflow. Start of quantile regression workflow. 

This is memory intensive. This was run one by one for each of the below valuation scenarios. 

Six unique valuation scenarios 

- vly-epa-scaled (main results 7/24/2020)
- vly-epa-popavg
- mt-epa-scaled
- mt-epa-popavg 
- vsl-epa-scaled 
- vsl-epa-popavg 

### 1. Open all intermediate files: quantile regression damage funcs
Only run this if you've computed the quantile regression damages and saved all the intermediate outputs above.


In [None]:
"damages", get_damage_coefs_date(), get_damage_name_slug()

In [None]:
QR_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/intermediate_fulluncertainty_results/"
    + "quantilereg_mortality_damage_coefficients_{slug}_{date}_"
    + "mortrate_raw_*TEST.nc"
).format(date=get_damage_coefs_date(), slug=get_damage_name_slug())
print(QR_FP)

In [None]:
qrds = xr.open_mfdataset(
    QR_FP, concat_dim="simulation", parallel=True, combine="nested"
)
print(qrds)

In [None]:
qrds.mortrate.data.nbytes / (
    1024 ** 3
)  # this is small if testing. About 48Gb for V2p5 (19 quantiles)

In [None]:
qrds = qrds.persist()

In [None]:
dd.progress(qrds)

Note that `qrds` has an extra dimension compared to `cds`, which is `pctile` for the quantile regression quantile

#### Note also that due to memory issues: we are saving one valuation scenario at a time for quantile regression damage func damages

In [None]:
# select out a valuation scenario - note slicers is defined for each of the dfs (central and quantile reg). Central df above
vsl_value = "epa"

################### update for each #########################
heterogeneity = "scaled"
age_adjustment = "vly"
#############################################################

selected_scenario = f"{age_adjustment}-{vsl_value}-{heterogeneity}"#SSP3-vsl-epa-scaled_quantiles

slicers = dict(
    age_adjustment=age_adjustment, heterogeneity=heterogeneity, vsl_value=vsl_value
)
qrds_valscen = qrds.sel(**slicers)

In [None]:
qrds_valscen

In [None]:
qrds_valscen.nbytes/(1024**3)

In [None]:
assert qrds_valscen.age_adjustment == age_adjustment
assert qrds_valscen.heterogeneity == heterogeneity

### 2. discount & 5. compute SCC

In [None]:
qr_discounted_timeseries_valscen = compute_scc_discounted_timeseries(qrds_valscen)
qr_scc_valscen = compute_scc_from_discounted_ts(qr_discounted_timeseries_valscen)

qr_discounted_timeseries_valscen.nbytes / (
    1024 ** 3
)  

### 3. Filter out the unreasonable climate simulations before quantiling

In [None]:
qr_discounted_ts_selected_params_valscen = qr_discounted_timeseries_valscen.where(
    lcp.get_filter_mask(version=CLIM_VERS), drop=True
)

In [None]:
qr_discounted_ts_selected_params_valscen = (
    qr_discounted_ts_selected_params_valscen.persist()
)
dd.progress(qr_discounted_ts_selected_params_valscen)

In [None]:
qr_discounted_ts_selected_params_valscen.nbytes / (1024 ** 3)

### Note how big this is! Do not attempt to bring it into memory or you will regret it :) 

Persisting this object means it'll be computed and kept on the workers for use further down (but not brought into notebook memory, and it doesn't hold up execution of other cells). 

### set up functions for weighted quantiling for quantile regression df damages
quantiles need to be weighted by the mass of the distribution that is represented by the quantile regression

In [None]:
def get_weights(quantiles):
    """
        Compute the weights of each quantile regression.
        `quantiles` must be between 0-1! 
    """
    quantiles = np.array(quantiles)
    # find midpoints between quantiles
    bounds = np.array([0] + ((quantiles[:-1] + quantiles[1:]) / 2).tolist() + [1])
    if (bounds > 1).any():
        raise RuntimeError("quantiles must be between 0-1")
    weights = np.diff(bounds)
    return weights

def quantile_weight_quantilereg(
    ds,
    arrayname="discounted_mortrate",
    quantiles=[0.01, 0.05, 0.167, 0.25, 0.5, 0.75, 0.833, 0.95, 0.99],
):
    """ Produce quantile weights of the quantile regression damages.
        qr_quantiles: the quantile regression quantiles for damages. (Must be 0-1!)
    """
    import impactlab_tools.utils.weighting

    qr_quantiles = ds.pctile.values
    ds_stacked = ds[arrayname].stack(obs=("simulation", "pctile"))
    weights = xr.DataArray(
        get_weights(qr_quantiles), dims=["pctile"], coords=[qr_quantiles]
    )
    weights_by_obs = weights.sel(pctile=ds_stacked.obs.pctile)
    # these are quantiles of the full uncertainty, weighted by the quantile regression quantiles
    ds_quantiles = impactlab_tools.utils.weighting.weighted_quantile_xr(
        ds_stacked, quantiles, sample_weight=weights_by_obs.values, dim="obs"
    )
    return ds_quantiles

 
# these functions can accept raw damage time series or already discounted time series

def compute_discounted_timeseries(ds, disc_type='constant', **kwargs):
    ''' Compute discounted time series based on `disc_type`.
    
        `disctype` == "constant": Compute discounted time series over dim `year` for all `rates`
        `disctype` == "ramsey": Compute discounted time series over dim `year` for all `rho` and `eta` combinations.
                                Note current implementation reads pre-computed rho-eta combination discount factors in from disk.
    '''
    if 'discrate' in ds.dims:
        # already discounted
        return ds
    else:
        if disc_type == 'constant':
            discounted_timeseries = compute_constant_discounted_timeseries(ds, **kwargs)
        elif disc_type == 'ramsey':
            discounted_timeseries = compute_ramsey_discounted_timeseries(ds, **kwargs)
            
        
        return discounted_timeseries    

def compute_discounted_timeseries_main(ds, disc_type='constant', filtered = False, **kwargs):
    ''' Compute discounted time series over dim `year` for all `rates`
        and return main pricing scenario (`var_type`). Result is PERSISTED.
        
        If `filtered` = True, filter on dim `simulation` and return only filtered MCs
    '''
    import load_climate_parameters as lcp

    
    ds = compute_discounted_timeseries(ds, disc_type=disc_type, **kwargs)# will not re-compute if discrate is a dim
    
    # extract the central pricing scenario
    ds_disc_main = ds.sel(var_type=MAIN_PRICING_SCEN)
    
    if filtered:

        if len(ds.simulation)>1:
            ds_disc_main = (
                ds_disc_main.where(lcp.get_filter_mask(version=CLIM_VERS), 
                                   drop=True).persist())#.compute())
        else:
            # statistical uncertainty ds has singleton simulation dim. Do not filter
            ds_disc_main = ds_disc_main.persist()
    else:
        ds_disc_main = ds_disc_main.persist()#.compute()
        
    return ds_disc_main


def compute_discounted_timeseries_main_quantiles(ds, disc_type='constant',
                                                 quantiles=[0.05, 0.1, 0.17, 0.25, 0.5, 0.75, 0.83, 0.95], **kwargs):
    
    if 'var_type' in ds.dims:
        # this means still need to select main scenario
        ds = compute_discounted_timeseries_main(ds, disc_type=disc_type, filtered = True, **kwargs)
        
    if 'pctile' in ds.dims:
        # quantile regression data --> use special funcs
        ds = quantile_weight_quantilereg(ds, quantiles = quantiles)
    else:
        ds = ds.quantile(quantiles, dim='simulation')

    return ds


### 4. Compute quantiles over full uncertainty for discounted time series for "main results", which is the main "pricing scenario" shown in the paper
Map this over discount rate, otherwise too much



In [None]:
qr_discounted_ts_selected_params_valscen.sel(discrate=0.02).nbytes/(1024**3)

####  brute force way to compute discounted timeseries quantiles:
took this code from the Energy operational ensemble notebook. Compute each discount rate separately

In [None]:
# Brute force
qr_disc_main_quantdt = {}

for dr in qr_discounted_ts_selected_params_valscen.discrate.values:
    qr_disc_main_quantdt[dr] = compute_discounted_timeseries_main_quantiles(qr_discounted_ts_selected_params_valscen.sel(discrate=dr))
    print(dr)
print(qr_disc_main_quantdt[0.02].nbytes / (1024 ** 3))



In [None]:
# concat the discrates in to one object
qr_disc_main_quantiles = xr.concat(
    qr_disc_main_quantdt.values(),
    dim=pd.Index(
        qr_discounted_ts_selected_params_valscen.discrate.values, name="discrate"
    ),
)

In [None]:
# save the discounted time series for the selected valuation scenario
SAVEDIR = '{basedir}{sector}_damage_coefficients_global_poly4_{date}'.format(basedir=BASEDIR,
                                                                sector="mortality",
                                                                date=get_damage_coefs_date())

discounted_fp = (SAVEDIR
                + '/{sector}_{uncertainty}_discounted_SSP3-{main}'
                 + '_quantiles_timeseries_{version}.csv').format(sector="mortality",
                                                                 date=get_damage_coefs_date(),
                                                                 uncertainty=FP_UNCERTAINTY,
                                                                 main=selected_scenario,
                                                                 version=get_damages_string_version())


qr_disc_main_quantiles.to_dataframe(name='present_value').to_csv(discounted_fp)


print("saved discounted damages to %s" %discounted_fp)



In [None]:
qr_disc_main_quantiles.plot.line(x="year", col="discrate",row="rcp",)

### 6. filter SCC & 7. compute SCC quantiles 

In [None]:
qr_scc_valscen_filtered = qr_scc_valscen.where(
    lcp.get_filter_mask(version=CLIM_VERS), drop=True
)

In [None]:
qr_scc_valscen_filtered = qr_scc_valscen_filtered.persist()
dd.progress(qr_scc_valscen_filtered)

In [None]:
# quantile the SCCs after filtering
qr_scc_selected_params_quantiles_valscen = quantile_weight_quantilereg(
    qr_scc_valscen_filtered,
    arrayname="scc",
    quantiles=[0.01, 0.05, 0.17, 0.25, 0.5, 0.75, 0.83, 0.95, 0.99],
)

In [None]:
# this is small, bring it into memory to save
qr_scc_valscen_filtered = qr_scc_valscen_filtered.compute()

In [None]:
SCC_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "{sector}_scc_{uncertainty}_{age}-{vsl}-{het}_{date}_{version}.nc"
).format(
    date=get_damage_coefs_date(),
    sector=SECTOR,
    uncertainty=FP_UNCERTAINTY,
    version=get_damages_string_version(),
    vsl=vsl_value,
    het=heterogeneity,
    age=age_adjustment,
)
print(SCC_FP)
qr_scc_valscen_filtered.to_netcdf(SCC_FP)

In [None]:
# save for one valuation scenario

SCC_QUANT_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "mortality_scc_{uncertainty}_{age}-{vsl}-{het}_summary_quantiles_{prefix}_{date}_{version}.csv"
).format(
    date=get_damage_coefs_date(),
    uncertainty=FP_UNCERTAINTY,
    prefix=FP_PREFIX,
    version=get_damages_string_version(),
    vsl=vsl_value,
    het=heterogeneity,
    age=age_adjustment,
)


qr_scc_selected_params_quantiles_valscen.to_series().unstack("quantile").to_csv(
    SCC_QUANT_FP
)
print(SCC_QUANT_FP)

In [None]:
cluster.scale(0)
client.restart()


In [None]:
# If doing v2.5.1 stop here, do not do anymore for the "without costs" version of results. Only need main valuation scenario.
# If doing v2.5 continue

### for `v2.5` reload filtered, quantiled SCCs as DataFrames for each valuation scenario and concatenate 

In [None]:
dfs = []
for age_adjustment in qrds.age_adjustment.values:
    for heterogeneity in qrds.heterogeneity.values:
        print("%s %s %s" % (age_adjustment, "epa", heterogeneity))

        df_dir = (
            "/gcs/impactlab-data/gcp/outputs/mortality/fair_global_scc_montecarlo/"
            "mortality_damage_coefficients_global_poly4_2020-05-11"
        )
        scc_file = (
            "mortality_scc_fulluncertainty_{age_adjustment}-epa-{heterogeneity}_summary_quantiles_"
            "quantilereg_2020-05-11_v2.5.csv"
        )

        df_valscen = pd.read_csv(
            os.path.join(
                df_dir,
                scc_file.format(
                    heterogeneity=heterogeneity, age_adjustment=age_adjustment
                ),
            )
        )

        # add DataFrame columns for valuation scenario specs so we can concat
        df_valscen.insert(
            2, "age_adjustment", [age_adjustment] * df_valscen.shape[0], True
        )
        df_valscen.insert(3, "vsl_value", ["epa"] * df_valscen.shape[0], True)
        df_valscen.insert(
            4, "heterogeneity", [heterogeneity] * df_valscen.shape[0], True
        )

        dfs.append(df_valscen)

In [None]:
# concatenate dataframes
# sccs_all_scenarios = pd.concat(dfs)
from functools import reduce

sccs_all_scenarios = reduce(lambda df1, df2: df1.merge(df2, "outer"), dfs)

In [None]:
sccs_all_scenarios.head()

### save SCCs for all valuation scenarios in one csv 

In [None]:
SCC_QUANT_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "mortality_scc_{uncertainty}_summary_quantiles_{prefix}_{date}_{version}.csv"
).format(
    date=get_damage_coefs_date(),
    uncertainty=FP_UNCERTAINTY,
    prefix=FP_PREFIX,
    version=get_damages_string_version(),
)


sccs_all_scenarios.to_csv(SCC_QUANT_FP)

# 8. Now start saving output files, some for posterity, some to use for figures, some for passing to econ team

## The following cells save
1. csv of discounted time series full uncertainty quantiles (after filtering) for the "main results" (vly, epa, scaled) --> used in paper figure
2. netcdf of all SCC values for all 5 percentiles of quantile reg for 100000 climate simulations and all scenarios
3. csv of SCC full uncertainty quantiles (after filtering) for all scenarios --> goes to econ team

### Save time series of present value for the "main results" for full uncertainty -- to be used in paper Fig. 12


In [None]:
# these are filtered

DISCOUNTED_FP = (
    BASEDIR
    + "{sector}_damage_coefficients_global_poly4_{date}/"
    + "{sector}_{uncertainty}_discounted_SSP3-{age_adjustment}-{vsl_value}-{heterogeneity}_quantiles_timeseries_{version}.csv"
)

if DO_QUANTILE_REG:
    fp = DISCOUNTED_FP.format(
        sector=SECTOR,
        date=get_damage_coefs_date(),
        uncertainty=FP_UNCERTAINTY,
        age_adjustment=age_adjustment,
        vsl_value=vsl_value,
        heterogeneity=heterogeneity,
        version=get_damages_string_version()
    )
    qr_discounted_ts_valscen_quantiles.to_dataframe(name="present_value").to_csv(fp)
else:
    fp = DISCOUNTED_FP.format(
        sector=SECTOR,
        date=get_damage_coefs_date(),
        uncertainty=FP_UNCERTAINTY,
        age_adjustment=age_adjustment_c,
        vsl_value=vsl_value_c,
        heterogeneity=heterogeneity_c,
        version=get_damages_string_version()
    )
    c_discounted_ts_mainresults_quantiles.to_dataframe(name="present_value").to_csv(fp)

print('saved',fp)
# NOTE: These files were previously saved locally

In [None]:
qr_discounted_ts_valscen_quantiles.nbytes / (1024 ** 3)

### Save all SCC values to netcdf -- This section looks like it's not updated 
This section is saving `qr_scc` which is no longer an object in the notebook. But it can be created by concatenating valuation-specific SCC outputs.


In [None]:
get_climate_output_date(), get_climate_string_version(), FP_UNCERTAINTY

In [None]:
# This filename has been updated from earlier versions for the demo/test notebook and going forward.

print(
    "saving {} SCC ({})".format(get_damage_coefs_date(), get_damages_string_version())
)

SCC_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "{sector}_scc_{uncertainty}_{date}_{version}.nc"
).format(
    date=get_damage_coefs_date(),
    sector=SECTOR,
    uncertainty=FP_UNCERTAINTY,
    version=get_damages_string_version(),
)

SCC_ATTRS = {
    "short_name": "scc",
    "long_name": (
        "{sector} partial social cost of carbon estimates by simulation, valuation, "
        "climate, and economic scenario, and by discount rate"
    ).format(sector=SECTOR),
    "units": "{} International Dollars (PPP)".format(PULSE_YEAR),
    "pulse_year": str(PULSE_YEAR),
    "pulse_year_description": (
        "year in which pulse will be emitted. "
        "Pulse deflator has been updated accordingly"
    ),
    "pulse_amount": "{} Gt C ({} Mt CO2)".format(
        PULSE_AMT, PULSE_AMT * 12.011 / 44.0098
    ),
    "damage_deflator": "{}".format(DAMAGES_DEFLATOR),
    "damage_estimate_year": "2005",
    "damage_deflator_desription": "dollar year deflator used for damage year estimate",
    "pulse_deflator": "{}".format(PULSE_DEFLATOR),
    "pulse_deflator_description": "dollar year deflator used for pulse year",
    "deflator_source": "World Bank US GDP Deflator",
}
SCC_FILE_ATTRS = {
    "damage_function": "mortality_damage_coefficients_{slug}_{prefix}, monte carlo quantiles".format(
        slug=get_damage_name_slug(), prefix=FP_PREFIX
    ),
    "damage_function_version": "{version}, {date}".format(
        version=get_damages_string_version(), date=get_damage_coefs_date()
    ),
    "damage_function_author": "Tamma Carleton",
    "climate_parameters": "istrictrwf_iptdiagnoseemissions_filter_no_histreg",
    "climate_parameter_version": "2019-02-20",
    "climate_parameter_author": "Kelly McCusker",
    "climate_output_version": "{}, {}".format(
        get_climate_string_version(), get_climate_output_date()
    ),
    "repository": "https://gitlab.com/ClimateImpactLab/Climate/pFAIR",
    "tag": "mortality-submission-{}".format(get_damage_coefs_date()),
    "history": HISTORY,
    "author": AUTHOR_NAME,
    "contact": AUTHOR_EMAIL,
}

if DO_QUANTILE_REG:
    qr_scc.scc.attrs.update(SCC_ATTRS)
    qr_scc.attrs.update(SCC_FILE_ATTRS)
    qr_scc.to_netcdf(SCC_FP)
else:
    c_scc.scc.attrs.update(SCC_ATTRS)
    c_scc.attrs.update(SCC_FILE_ATTRS)
    c_scc.to_netcdf(SCC_FP)
print("saved {}".format(SCC_FP))

# start here if just want to compute SCC quantiles from existing SCCs

In [None]:
# this just checks to see if you've just computed the scc and it's in notebook already, or if we need to open the dataset.

try:
    qr_mortality_scc = qr_scc.compute()
except NameError:
    qr_mortality_scc = xr.open_dataset(SCC_FP)

qr_mortality_scc

These SCCs are all 100,000 and so are unfiltered. Filter out unreasonable climate sims before quantiling

In [None]:
qr_mortality_scc_selected_params = qr_scc.where(get_filter_mask(), drop=True)

This section has been assuming we have the full uncertainty of SCCs (100,000 for each quantile regression) and therefore we have to use a special quantile function that takes into account which quantile the 100,000 are associated with. Use `quantile_weight_quantilereg()` (TO-DO This func needs updating for the additional quantiles!). 

If we were working with the "climate only uncertainty" central damage function set of SCCs, we could just call a straight `quantile` on the DataArray using xarray or numpy.

In [None]:
qr_mortality_scc_selected_params_quantiles = quantile_weight_quantilereg(
    qr_mortality_scc_selected_params,
    arrayname="scc",
    quantiles=[0.01, 0.05, 0.17, 0.25, 0.5, 0.75, 0.83, 0.95, 0.99],
)
qr_mortality_scc_selected_params_quantiles

## Save SCC quantiles (after filtering) to csv : This is the primary result file we share with econ team

In [None]:
SCC_QUANT_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "mortality_scc_{uncertainty}_summary_quantiles_{prefix}_{date}_{version}.csv"
).format(
    date=get_damage_coefs_date(),
    uncertainty=FP_UNCERTAINTY,
    prefix=FP_PREFIX,
    version=get_damages_string_version(),
)

if DO_QUANTILE_REG:
    qr_mortality_scc_selected_params_quantiles.to_series().unstack("quantile").to_csv(
        SCC_QUANT_FP
    )
else:
    c_scc_selected_params_quantiles.scc.to_series().unstack("quantile").to_csv(
        SCC_QUANT_FP
    )

    print("saved", SCC_QUANT_FP)

In [None]:
# save mainresults
SCC_QUANT_FP = (
    BASEDIR
    + "mortality_damage_coefficients_global_poly4_{date}/"
    + "mortality_scc_{uncertainty}_mainresults_summary_quantiles_{prefix}_{date}_{version}.csv"
).format(
    date=get_damage_coefs_date(),
    uncertainty=FP_UNCERTAINTY,
    prefix=FP_PREFIX,
    version=get_damages_string_version(),
)


qr_scc_selected_params_quantiles_mainresults.to_series().unstack("quantile").to_csv(
    SCC_QUANT_FP
)