# RFF emulator

Method: https://www.overleaf.com/project/618bf194c5e93dd6ff7e1bf4

Original script: https://bitbucket.org/ClimateImpactLab/socioeconomics/src/master/emulator/rff_slice.py

This was adapted from the bitbucket code on 12/9/2021.

In [7]:
## UPDATE ME!!!

HEADER = """oneline: Emulation parameters for RFF sample

version: EMU-RFF.2021-12-09
dependencies: GDPPC-RFF.2021-11-05
variables:
    iso: Country ISO (from RFF file) [str]

description: "Generated by socioeconomics/emulator/rff_slice.py"
"""

In [8]:
import rhg_compute_tools.utils as rhgu
import fsspec
import os

In [10]:
import pandas as pd
import numpy as np
from scipy.sparse import coo_matrix
try:
    import gurobipy as gp
    from gurobipy import GRB
    engine = "Gurobi"
except:
    from scipy.optimize import linprog
    engine = "SciPy"
    
print("Engine: %s" % engine)

recipe = 'fivebean'

fs_kwargs = {'token': '/opt/gcsfuse_tokens/impactlab-data.json'}

fs = fsspec.filesystem('gs', **fs_kwargs)

outdir = os.path.join(
    'gs://impactlab-data/gcp/integration/integration_raw_data/rff-weights/',
    recipe,
    pd.Timestamp.now(tz='US/Pacific').strftime('%Y%m%d.%H%M'),
)

rffpath = "gs://impactlab-data/gcp/integration/integration_raw_data/rff/feather"


# os.makedirs(outdir, exist_ok=True)

assert not fs.isdir(outdir)

print(outdir)


## https://www.overleaf.com/project/618bf194c5e93dd6ff7e1bf4
## Parameters are d_it (d = |y - z|), alpha_is

# @rhgu.block_globals(whitelist=['engine','gp'])
def cook(recipe, df, wantdf):

    output = []

    assert recipe in ['delimeat', 'coleslaw', 'fivebean']

    if recipe == 'delimeat':
        header = ['iso', 'param', 'name', 'value']
        groups = pd.unique(df.iso)
    elif recipe == 'coleslaw':
        header = ['isoyear', 'param', 'name', 'value']
        groups = pd.unique(df.isoyear)
    elif recipe == 'fivebean':
        header = ['year', 'param', 'name', 'value']
        groups = pd.unique(df.year)
    
    for group in groups:

        if recipe == 'delimeat':
            iso = group
            idf = df[df.iso == iso]
            wantidf = wantdf[wantdf.iso == iso]
        elif recipe == 'coleslaw':
            iso = group[:3]
            idf = df[df.isoyear == group]
            wantidf = wantdf[wantdf.isoyear == group]
        elif recipe == 'fivebean':
            year = group
            idf = df[df.year == group]
            wantidf = wantdf[wantdf.year == group]

        if wantidf.shape[0] == 0:
            continue
            
        isoyears = pd.unique(idf.isoyear)

        ## Create parameters list

        if recipe in ['delimeat', 'coleslaw']:
            alphaparams = pd.unique(idf.isoscen)
        elif recipe == 'fivebean':
            alphaparams = pd.unique(idf.yearscen)

        # Drop the first entry for each subgroup
        alphaparams_butfirst = np.delete(alphaparams, 0)
        params = np.concatenate((isoyears, alphaparams_butfirst))

        paramindex_alpha0 = len(isoyears)

        # Construct objective function

        if 'weight' in wantdf.columns:
            weights = [wantidf.weight[(wantidf.isoyear == isoyear)].values[0] for isoyear in isoyears]
        else:
            weights = np.ones(len(isoyears))

        if recipe in ['delimeat', 'fivebean']:
            objfunc = np.concatenate((weights, np.zeros(len(alphaparams_butfirst))))
        elif recipe == 'coleslaw':
            # Prefer lower SSPs
            objfunc = np.concatenate((weights, 1e-3 * np.arange(1, len(alphaparams_butfirst) + 1)))

        # Contruct constraints, all in the form of A x < b
        AA_rows = []
        AA_cols = []
        AA_data = []
        bb = []

        def add_AA_cell(row, col, value):
            AA_rows.append(row)
            AA_cols.append(col)
            AA_data.append(value)

        ## Rows defining absolute values

        # d_it > y_it - (sum_s>1 alpha_is y_sit + (1 - sum_s>1 alpha_is) y_1it)
        # -d_it - (sum_s>1 alpha_is y_sit - (sum_s>1 alpha_is) y_1it) < -y_it + y_1it

        # d_it > -(y_it - (sum_s>1 alpha_is y_sit + (1 - sum_s>1 alpha_is) y_1it))
        # -d_it + (sum_s>1 alpha_is y_sit - (sum_s>1 alpha_is) y_1it) < y_it - y_1it

        for ii, isoyear in enumerate(isoyears):
            subdf = idf[idf.isoyear == isoyear]
            if subdf.shape[0] == 0 or not np.any(wantidf.isoyear == isoyear):
                continue

            try:
                if recipe in ['delimeat', 'coleslaw']:
                    y1it = subdf.loginc[subdf.isoscen == alphaparams[0]].values[0]
                elif recipe == 'fivebean':
                    y1it = subdf.loginc[subdf.yearscen == alphaparams[0]].values[0]
            except Exception as ex:
                # print(ii,isoyear,ex)
                # print("Exception! Keep going..") # KM added
                continue

            add_AA_cell(len(bb), ii, -1)
            add_AA_cell(len(bb) + 1, ii, -1)

            for jj, alphaparam in enumerate(alphaparams_butfirst):
                if recipe in ['delimeat', 'coleslaw']:
                    ysit = subdf.loginc[subdf.isoscen == alphaparam].values[0]
                elif recipe == 'fivebean':
                    ysit = subdf.loginc[subdf.yearscen == alphaparam].values[0]
        
                add_AA_cell(len(bb), paramindex_alpha0 + jj, -ysit + y1it)
                add_AA_cell(len(bb) + 1, paramindex_alpha0 + jj, ysit - y1it)

            bb.append(-wantidf.loginc[(wantidf.isoyear == isoyear)].values[0] + y1it)
            bb.append(wantidf.loginc[(wantidf.isoyear == isoyear)].values[0] - y1it)

        constrindex_alphag10 = len(bb)
    
        # sum alpha_is < 1
        for jj in range(paramindex_alpha0, len(params)):
            add_AA_cell(constrindex_alphag10, jj, 1)
        bb.append(1)

        constrindex_end = constrindex_alphag10 + 1

        AA = coo_matrix((AA_data, (AA_rows, AA_cols)), shape=(constrindex_end, len(params)))

        ## Also constrained so that d_it > 0 and alpha_is > 0

        if engine == 'SciPy':
            res = linprog(objfunc, A_ub=AA, b_ub=bb)
            errors = np.around(res.x[:paramindex_alpha0], 6)
            alphas = np.around(res.x[paramindex_alpha0:], 6)
        elif engine == 'Gurobi':
            mod = gp.Model("gourmet")
            xx = mod.addMVar(shape=len(objfunc), vtype=GRB.CONTINUOUS, name="xx")
            mod.setObjective(objfunc @ xx, GRB.MINIMIZE)
            bb = np.array(bb)
            mod.addConstr(AA @ xx <= bb)
            mod.optimize()
            errors = np.around(xx.X[:paramindex_alpha0], 6)
            alphas = np.around(xx.X[paramindex_alpha0:], 6)


        for ii, error in enumerate(errors):
            output.append([group, 'error', isoyears[ii], error])

        output.append([group, 'alpha', alphaparams[0], max(0, 1 - sum(alphas))])
        for ii, alpha in enumerate(alphas):
            output.append([group, 'alpha', alphaparams_butfirst[ii], alpha])

    out_df = pd.DataFrame(output, columns=header)

    return out_df


Engine: SciPy
gs://impactlab-data/gcp/integration/integration_raw_data/rff-weights/fivebean/20220225.1054


In [13]:
# @rhgu.block_globals(whitelist=['HEADER','rffpath'])
def process_rff_sample(i, df, outdir, recipe, **storage_options):
    fs = fsspec.filesystem('gs', **storage_options)
    read_fp = (
        "gs://impactlab-data/gcp/integration/integration_raw_data/rff/gdppc/gdppc-nohier-%d.csv"
        % i
    )

    with fs.open(read_fp) as f:
        wantdf = pd.read_csv(f, skiprows=11)

    wantdf['loginc'] = np.log(wantdf.value)
    wantdf['isoyear'] = wantdf.apply(lambda row: "%s:%d" % (row.iso, row.year), axis=1)

    read_feather = (
        os.path.join(rffpath, "run_%d.feather" % i)
    )
    with fs.open(read_feather) as f:
        weights = pd.read_feather(f)

    weights.rename(columns={'Year': 'year', 'Country': 'iso'}, inplace=True)
    wantdf = pd.merge(wantdf, weights, on=['year', 'iso'], how='left')
    wantdf.rename(columns={'GDP': 'weight'}, inplace=True)
    print(wantdf.iso[np.isnan(wantdf.weight)])
    wantdf.weight[np.isnan(wantdf.weight)] = np.exp(np.nanmean(np.log(wantdf.weight))) # not dominated by large values

    out_df = cook(recipe, df, wantdf)

    write_file = os.path.join(outdir, "emulate-%s-%d.csv") % (recipe, i)

    protocol = write_file.split('://')[0] if '://' in write_file else ''
    write_options = storage_options if protocol != '' else {}
    fs = fsspec.filesystem(protocol, **write_options)

    with fs.open(write_file, 'w') as outf:
        outf.write(HEADER.strip() + '\n')
        out_df.to_csv(outf, index=False)
        print("writing",write_file)

In [14]:
ssp_fp = "gs://impactlab-data/gcp/integration/integration_raw_data/gdppc-merged-nohier.csv"
with fs.open(ssp_fp) as f:
    df = pd.read_csv(f, skiprows=11)

df = df[(df.scenario != 'SSP1') & (df.scenario != 'SSP5')]
df = df[df.year >= 2010] # low only starts in 2010
df['loginc'] = np.log(df.value)
df['isoyear'] = df.apply(lambda row: "%s:%d" % (row.iso, row.year), axis=1)
df['isoscen'] = df.apply(lambda row: "%s:%s/%s" % (row.iso, row.model, row.scenario), axis=1)
df['yearscen'] = df.apply(lambda row: "%d:%s/%s" % (row.year, row.model, row.scenario), axis=1)    

### Test a couple RFF-SPs
ran rff_sp=1 and rff_sp=2


In [35]:
print(outdir)
process_rff_sample(
    1,
    df=df,
    outdir=outdir,
    recipe=recipe,
    token="/opt/gcsfuse_tokens/impactlab-data.json",
)

gs://impactlab-data/gcp/integration/integration_raw_data/rff-weights/fivebean/20220224.1242
0        ABW
1        ABW
2        ABW
60       AFG
61       AFG
        ... 
11097    ZMB
11155    ZWE
11156    ZWE
11157    ZWE
11158    ZWE
Name: iso, Length: 728, dtype: object


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  wantdf.weight[np.isnan(wantdf.weight)] = np.exp(np.nanmean(np.log(wantdf.weight))) # not dominated by large values
  res = linprog(objfunc, A_ub=AA, b_ub=bb)
  res = linprog(objfunc, A_ub=AA, b_ub=bb)
  res = linprog(objfunc, A_ub=AA, b_ub=bb)


writing gs://impactlab-data/gcp/integration/integration_raw_data/rff-weights/fivebean/20220224.1242/emulate-fivebean-1.csv


### TODO: compare output to original fivebean to make sure this produces the same outputs



### Map all RFF-SPs: Below has not yet been tested

In [None]:
import rhg_compute_tools.kubernetes as rhgk
import dask.distributed as dd

In [None]:
client, cluster = rhgk.get_micro_cluster()
cluster.scale(20)
cluster

In [None]:
futures = client.map(
    process_rff_sample,
    list(range(1, 10001)),
    df=df,
    outdir=outdir,
    recipe=recipe,
    token='/opt/gcsfuse_tokens/impactlab-data.json',
)

In [None]:
dd.progress(futures)

# The following lines clean up / shut down your cluster

In [None]:
# this will block until futures finish, so you can just run this notebook straight through.
# if you're trying to kill the job, run the next cell without running this one.
dd.wait(futures)

In [None]:
client.restart()
cluster.scale(0)
client.close()
cluster.close()
del client
del cluster