# Generate Monte Carlo outputs for each gridded site in a corefile

## Setup

In [None]:
import sys

sys.path.append("../../../")

In [3]:
import dask.distributed as dd
import numpy as np
import parameterize_jobs as pj
from cloudpathlib import GSPath
from scipy.io import loadmat
from shared import (
    DIR_SCRATCH,
    DIR_SLR_AR5_IFILES_INT,
    FS,
    LOCALIZESL_COREFILES,
    PATH_LOCALIZESL,
    _to_fuse,
    start_dask_cluster,
)

In [None]:
# How many gridded sites a single Octave `get_lslr` call (one dask job) handles.
BATCH_SIZE = 32

# Worker count handed to start_dask_cluster. Leave as None for the default
# (a LocalCluster using the local machine's CPU count); on a larger, scalable
# dask cluster set an explicit number instead, e.g. 64.
N_WORKERS = None

# LocalizeSL MFILES as shipped, and the scratch location they are copied to
# so that every worker can reach them.
DIR_MFILES_SRC = PATH_LOCALIZESL / "MFILES"
TMP_MFILES_DIR = DIR_SCRATCH / "MFILES"

## Copy MFILES to a location accessible by the workers

In [1]:
# Stage the LocalizeSL MFILES in scratch storage so every dask worker can read them.
cp_output = TMP_MFILES_DIR.upload_from(DIR_MFILES_SRC)

# Octave's addpath (used by get_lslr) needs a filesystem path rather than a
# cloud URL, so when scratch lives on GCS swap in the fuse-mounted path
# (presumably what _to_fuse returns) and make sure that directory exists.
if isinstance(TMP_MFILES_DIR, GSPath):
    TMP_MFILES_DIR = _to_fuse(TMP_MFILES_DIR)
    TMP_MFILES_DIR.mkdir(parents=True, exist_ok=True)

NameError: name 'TMP_MFILES_DIR' is not defined

## Start cluster

In [None]:
# Launch the dask cluster; N_WORKERS=None keeps start_dask_cluster's default sizing.
client = start_dask_cluster(n_workers=N_WORKERS)
# Bare expression so the notebook displays the client as this cell's output.
client

## Octave function used to run LocalizeSL

In [None]:
# Octave source for `get_lslr`, eval'd inside each worker's Octave session by
# `run_batch`. This is an f-string: the two `{...}` fields are filled in from
# Python when the cell runs, so any literal braces added to the Octave code
# below must be doubled.
get_lslr_func = f"""
function this_ids = get_lslr(ix_start, ix_end, corefile_name, subcorefile_choice, dir_out)
    % Run LocalizeSL for one batch of gridded sites and write Monte Carlo tables.
    %   ix_start, ix_end    1-based inclusive indices into the gridded site ids;
    %                       (0, 0) instead runs site id 0 (global sea level).
    %   corefile_name       corefile base name; '<name>_v5.mat' is loaded from ifilesdir.
    %   subcorefile_choice  for the SEJ corefile, 'H' selects corefileH, anything else corefileL.
    %   dir_out             output prefix; file names are appended directly, so it
    %                       must end with a path separator.
    % Returns the site ids processed in this batch.
    ifilesdir='{str(DIR_SLR_AR5_IFILES_INT)}';
    mfilesdir='{str(TMP_MFILES_DIR)}';

    addpath(ifilesdir);
    addpath(mfilesdir);

    pkg load statistics

    f = [corefile_name '_v5.mat'];

    corefilewrapper=load(fullfile(ifilesdir, f));

    mkdir(dir_out);

    ccclab = "SROCC";
    if strcmp(corefile_name, 'SLRProjections170113GRIDDEDcore')
        ccclab = "170113";
    end
    if strcmp(corefile_name, 'SLRProjections190726core_SEJ_full')
        disp("Make sure to run twice. Once for corefileL and once for corefileH");
        ccclab = "SEJ";
        if strcmp(subcorefile_choice, "H")
            corefile = corefilewrapper.corefileH;
        else
            corefile = corefilewrapper.corefileL;
        end
    else
        corefile = corefilewrapper;
    end
    disp(["Corefile: " corefile_name]);
    disp(["Corefile label: " ccclab]);
    disp(["Corefile subgroup: " subcorefile_choice]);

    % Rate projections always come from the SEJ corefileL.
    rateproj_corefile = load(fullfile(ifilesdir, 'SLRProjections190726core_SEJ_full_v5.mat')).corefileL;

    % For every target region name shared by both corefiles, overwrite this
    % corefile's rateprojs / rateprojssd with the SEJ corefileL values.
    [~, idx_this_corefile, idx_rateproj_corefile] = intersect(corefile.targregionnames, rateproj_corefile.targregionnames);
    corefile.rateprojs(idx_this_corefile) = rateproj_corefile.rateprojs(idx_rateproj_corefile);
    corefile.rateprojssd(idx_this_corefile) = rateproj_corefile.rateprojssd(idx_rateproj_corefile);

    % Fixed seed, intended to make the sample seeds identical across batches/workers.
    % NOTE(review): rand("seed", ...) selects Octave's legacy generator; confirm that
    % randperm below is actually driven by that state.
    rand("seed", 0);
    corefile.seeds = [];
    for rrr=1:size(corefile.samps, 2)
        % One row per column of samps: a random permutation of evenly spaced
        % standard-normal quantiles, one per row of samps.
        seeds=linspace(0,1,size(corefile.samps, 1) + 2);
        seeds=seeds(2:end-1);
        seeds=norminv(seeds(randperm(length(seeds))));

        corefile.seeds = [corefile.seeds; seeds];
    end

    siteids = int64(corefile.targregions);

    % Subset `siteids` to the really high-numbered ones (the gridded ones, rather than those indexed by PSMSL stations)
    gridids = siteids(siteids > 100000000);

    disp(["Number of sites: " mat2str(length(gridids))]);

    if (ix_start == 0 && ix_end == 0)
        this_ids = [0];
    else
        this_ids = gridids(ix_start:ix_end);
    end

    % numel rather than size(...)(1): if gridids is a row vector, counting rows
    % would process only the first site of the batch.
    n_ids = numel(this_ids);
    for i=1:n_ids
        [sampslocrise,sampsloccomponents,siteids,sitenames,targyears,scens,cols] = LocalizeStoredProjections(this_ids(i),corefile);
        if this_ids(i) == 0
            sl_str = "GSL";
        else
            sl_str = "LSL";
            WriteTableMC(sampsloccomponents,24,siteids,sitenames,targyears,scens,[dir_out sl_str 'proj_MC_' ccclab '_baseline_']);
        end
        WriteTableMC(sampsloccomponents,[],siteids,sitenames,targyears,scens,[dir_out sl_str 'proj_MC_' ccclab '_']);
    end
"""

## Python wrappers for running the Octave function

In [None]:
def run_batch(start, end, corefile, sub_corefile, dir_out):
    from oct2py import octave

    octave.eval(get_lslr_func)
    return octave.get_lslr(start, end, corefile, sub_corefile, str(dir_out) + "/")


def get_num_sites(corefile, sub_corefile=None):
    """Count the gridded sites stored in a LocalizeSL corefile.

    A site counts as gridded when its ``targregions`` id exceeds 100000000,
    the same threshold the Octave ``get_lslr`` function uses for ``gridids``.

    Parameters
    ----------
    corefile : str
        Corefile base name; ``<corefile>_v5.mat`` is read from
        ``DIR_SLR_AR5_IFILES_INT``.
    sub_corefile : str, optional
        When given, read ``targregions`` from the nested
        ``corefile<sub_corefile>`` struct (e.g. ``"H"`` / ``"L"``); when None,
        read it from the top level of the file.

    Returns
    -------
    numpy integer
        Number of gridded site ids.
    """
    mat_contents = loadmat(
        DIR_SLR_AR5_IFILES_INT / (corefile + "_v5.mat"), squeeze_me=True
    )

    if sub_corefile is not None:
        # With squeeze_me the nested struct presumably comes back as a 0-d
        # record array; .item() unwraps the stored targregions array.
        region_ids = mat_contents["corefile" + sub_corefile]["targregions"].item()
    else:
        region_ids = mat_contents["targregions"]

    is_gridded = region_ids > 100000000
    return is_gridded.sum()

## Run jobs on workers

In [None]:
# Submit one dask job per batch of gridded sites, plus one global-sea-level job,
# for every (corefile, sub-corefile) pair. The result is
# futures[corefile][sub_corefile] -> list of futures, one per batch.
futures = dict()

n_corefiles = len(sum(LOCALIZESL_COREFILES.values(), []))
for corefile, sub_corefiles in LOCALIZESL_COREFILES.items():
    futures[corefile] = dict()
    for sub_corefile in sub_corefiles:
        num_sites = get_num_sites(corefile, sub_corefile)

        # 1-based, inclusive [start, end] ranges into the Octave `gridids` vector.
        # The arange stop must be num_sites + 1: with a stop of num_sites, the
        # final one-site batch was dropped whenever num_sites % BATCH_SIZE == 1,
        # leaving `starts` one element shorter than `ends`. Deriving `ends` from
        # `starts` keeps the lengths equal and also handles num_sites == 0.
        starts = np.arange(1, num_sites + 1, BATCH_SIZE)
        ends = np.minimum(starts + BATCH_SIZE - 1, num_sites)

        # Prepend (0, 0), which tells get_lslr to run global sea level instead of a site batch.
        starts = np.hstack(([0], starts))
        ends = np.hstack(([0], ends))

        # NOTE(review): every sub-corefile of a corefile writes into this same
        # directory, and get_lslr's file prefix does not include the sub-corefile.
        # Confirm that the H and L runs of the SEJ corefile cannot overwrite each
        # other's tables.
        dir_out = TMP_MFILES_DIR.parent / "mcs" / corefile / "mc_tsv"
        dir_out.mkdir(parents=True, exist_ok=True)

        # One job per (start, end) pair; corefile, sub_corefile and dir_out are shared.
        jobs = pj.Constant(
            corefile=corefile, sub_corefile=sub_corefile, dir_out=dir_out
        ) * pj.ParallelComponentSet(start=starts, end=ends)

        futures[corefile][sub_corefile] = client.map(pj.expand_kwargs(run_batch), jobs)

## Wait for jobs, close the cluster, and clean up temporary MFILES

In [None]:
# Flatten futures[corefile][sub_corefile] -> [Future, ...] into a single list.
all_futures = [
    fut
    for by_sub_corefile in futures.values()
    for batch_futures in by_sub_corefile.values()
    for fut in batch_futures
]

# dd.wait returns once every future has finished, but it does not raise for
# jobs that errored. Collect failures explicitly while the client is still
# connected and can fetch the remote exception.
dd.wait(all_futures)
failed = [fut for fut in all_futures if fut.status == "error"]
first_error = failed[0].exception() if failed else None

# Close the client before the cluster it is connected to.
cluster = client.cluster
client.close()
if cluster is not None:
    cluster.close()

if failed:
    raise RuntimeError(
        f"{len(failed)} of {len(all_futures)} LocalizeSL batch jobs failed; "
        f"first error: {first_error!r}"
    ) from first_error

In [None]:
# TMP_MFILES_DIR is the fuse path under /gcs at this point. Strip that prefix
# to get the bucket-relative path FS expects, then delete the staged MFILES.
tmp_mfiles_remote = str(TMP_MFILES_DIR.relative_to("/gcs"))
FS.rm(tmp_mfiles_remote, recursive=True)