# Parallelizing Reference Generation with Coiled
This notebook generates the reference files for the NASA-NEX-GDDP-CMIP6 dataset. It uses `Coiled` to spin up a large number of small workers, which reduces the computation time from 8+ hours on a 32-core, 256 GB RAM AWS instance to 30 minutes with 500 `t4g.small` workers. Processing this 36 TB dataset with Coiled cost approximately $5 in AWS compute.

In [None]:
import coiled
import dask
import fsspec
import pandas as pd
from fsspec.implementations.reference import LazyReferenceMapper
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

In the Coiled cluster setup, we reduce cloud costs by:
1. Choosing small `t4g.small` workers.
1. Using spot instances (falling back to on-demand instances when spot capacity is unavailable).
1. Using workers with the ARM processor architecture, which is cheaper on AWS.

In [None]:
# Many small ARM spot workers (falling back to on-demand) keep compute cost low.
cluster_kwargs = dict(
    n_workers=500,
    worker_vm_types=["t4g.small"],
    spot_policy="spot_with_fallback",
    arm=True,
)
cluster = coiled.Cluster(**cluster_kwargs)
client = cluster.get_client()

In [None]:
def _nasa_nex_df() -> pd.DataFrame:
    """Load the formatted NASA-NEX file inventory CSV from S3.

    Returns
    -------
    pd.DataFrame
        The raw inventory. Later cells read its ``GCM``, ``scenario``,
        ``ensemble_member``, ``variable`` and ``url`` columns.
    """
    inventory_url = "s3://carbonplan-share/nasa-nex-reference/nasa_nex_formatted.csv"
    return pd.read_csv(inventory_url)


def _GCM_scenarios(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse the file inventory to one row per GCM/scenario/ensemble member.

    Parameters
    ----------
    df : pd.DataFrame
        File inventory with ``GCM``, ``scenario``, ``ensemble_member`` and
        ``variable`` columns (e.g. the result of ``_nasa_nex_df()``).

    Returns
    -------
    pd.DataFrame
        Columns ``GCM``, ``scenario``, ``ensemble_member`` and ``variable``.
        ``variable`` holds each group's distinct variable names in the order
        they first appear in *df*.
    """
    # Work on the frame the caller passed in rather than re-downloading the
    # CSV from S3, so the argument is honoured and no extra I/O happens.
    collapsed_df = (
        df.groupby(["GCM", "scenario", "ensemble_member"])["variable"]
        .apply(list)
        .reset_index()
    )
    # dict.fromkeys de-duplicates while keeping first-seen order. set() would
    # give an order that can differ between interpreter runs.
    collapsed_df["variable"] = collapsed_df["variable"].apply(
        lambda names: list(dict.fromkeys(names))
    )

    return collapsed_df


def read_catalog_file(catalog_url):
    """Return the reference catalog CSV at *catalog_url* as a DataFrame."""
    catalog = pd.read_csv(filepath_or_buffer=catalog_url)
    return catalog

In [None]:
catalog_file_url = "s3://carbonplan-share/nasa-nex-reference/reference_catalog_prod.csv"
nasa_nex_df = _nasa_nex_df()
# One row per GCM/scenario/ensemble member, listing that group's variables.
nasa_nex_catalog = _GCM_scenarios(nasa_nex_df)
# Key each row as "<GCM>_<scenario>", the store name gen_all_refs writes to.
nasa_nex_catalog = nasa_nex_catalog.assign(
    ID=nasa_nex_catalog["GCM"] + "_" + nasa_nex_catalog["scenario"]
)

# If we're going to write, we can use this to check what refs exist in our catalog
# kerchunk_ref_catalog = read_catalog_file(catalog_file_url)
# missing_refs_df = nasa_nex_catalog.merge(kerchunk_ref_catalog, how='outer', on='ID',indicator=True).query('_merge != "both"')[['GCM', 'scenario', 'ID']]

In [None]:
# If we're not writing, we can use this for testing: treat every catalog row
# as missing, so a reference store is (re)built for each of them.
missing_refs_df = nasa_nex_catalog

In [None]:
# Preview the rows that will have references generated.
missing_refs_df

In [None]:
# Anonymous S3 filesystem (a fresh instance, not taken from fsspec's instance
# cache) used to open the source files and to list the reference prefix.
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
# Keyword arguments forwarded to ``fs_read.open`` for every source file.
so = {
    "mode": "rb",
    "anon": True,
    "default_fill_cache": False,
    "default_cache_type": "first",
}


def build_reference_catalog(catalog_file_url: str, *, fs=None):
    """Scan the production reference prefix and write a catalog CSV.

    The CSV has two columns. ``ID`` is the store name (``<GCM>_<scenario>``
    as produced by ``gen_all_refs``). ``url`` is the ``s3://`` URL of that
    store's directory.

    Parameters
    ----------
    catalog_file_url : str
        Destination handed to ``DataFrame.to_csv``. This is an S3 URL here,
        but any path or writable buffer works.
    fs : optional, keyword-only
        Filesystem used for the listing. It defaults to the module-level
        ``fs_read`` and is exposed so the function can run without S3.
    """
    prefix = "s3://carbonplan-share/nasa-nex-reference/references_prod/"
    lister = fs_read if fs is None else fs

    # ``ls`` returns bucket-relative keys, so restore the scheme. The prefix's
    # own placeholder key may or may not be listed. Filter it out instead of
    # assuming it is present (list.remove raises ValueError when it is not).
    ref_list = [
        url
        for url in ("s3://" + key for key in lister.ls(prefix))
        if url.rstrip("/") != prefix.rstrip("/")
    ]
    # Splitting "s3://bucket/nasa-nex-reference/references_prod/<ID>" on "/"
    # puts the store name at index 5.
    ids = [url.split("/")[5] for url in ref_list]

    ref_df = pd.DataFrame({"ID": ids, "url": ref_list})
    ref_df.to_csv(catalog_file_url, index=False)

In [None]:
@dask.delayed
def combine_refs(refs, outpath):
    """Combine per-file kerchunk references into one parquet reference store.

    Runs as a dask task. Any existing store at *outpath* is deleted first.

    Parameters
    ----------
    refs : list
        Per-file references, i.e. the outputs of ``generate_json_reference``.
        Dask computes those delayed inputs before this function runs.
    outpath : str
        S3 URL of the parquet reference directory to (re)create.

    Returns
    -------
    The value returned by ``MultiZarrToZarr.translate()``.
    """
    fs = fsspec.filesystem("s3")

    # Start from an empty directory so records from an earlier run of this
    # store cannot mix with the new ones.
    if fs.exists(outpath):
        fs.rm(outpath, recursive=True)
    fs.makedir(outpath)
    # Pass keyword arguments. The positional order of
    # LazyReferenceMapper.create changed between fsspec releases:
    #   older: (record_size, root, fs)
    #   newer: (root, storage_options, fs, record_size)
    # These keyword names exist in both.
    out = LazyReferenceMapper.create(root=outpath, fs=fs, record_size=10000)
    mzz = MultiZarrToZarr(
        refs,
        remote_protocol="s3",
        concat_dims=["time"],
        identical_dims=["lat", "lon"],
        out=out,
    ).translate()
    # Flush the lazy mapper so its buffered reference records are written out.
    out.flush()

    return mzz


@dask.delayed
def generate_json_reference(fil):
    """Scan one HDF5-format source file on S3 and return its kerchunk references.

    Runs as a dask task. *fil* is the file's S3 URL, opened through the
    module-level ``fs_read`` with the ``so`` open options. Chunks smaller than
    300 bytes are inlined into the references.
    """
    with fs_read.open(fil, **so) as h5_file:
        return SingleHdf5ToZarr(h5_file, fil, inline_threshold=300).translate()


def gen_all_refs(row):
    """Build the delayed task that writes one GCM/scenario reference store.

    Parameters
    ----------
    row
        A catalog row (e.g. from ``DataFrame.iterrows``). Only its ``"GCM"``
        and ``"scenario"`` entries are read.

    Returns
    -------
    dask.delayed.Delayed
        The ``combine_refs`` task for this pair. Nothing runs until it is
        computed.
    """
    GCM = row["GCM"]
    scenario = row["scenario"]
    target_root = "s3://carbonplan-share/nasa-nex-reference/references_prod/"
    store_name = f"{GCM}_{scenario}"
    output_file_name = "reference.parquet"
    outpath = target_root + store_name + "/" + output_file_name

    # Use a boolean mask rather than splicing values into a query string, so
    # a name containing a quote cannot break or alter the filter expression.
    # NOTE(review): ensemble_member is not filtered on. If a GCM/scenario pair
    # ever has several members, all their files are combined into one store,
    # and several catalog rows would write the same outpath. Confirm that each
    # pair has a single member.
    mask = (nasa_nex_df["GCM"] == GCM) & (nasa_nex_df["scenario"] == scenario)
    file_pattern = nasa_nex_df[mask]
    refs = [generate_json_reference(fil) for fil in file_pattern["url"].to_list()]

    return combine_refs(refs=refs, outpath=outpath)

In [None]:
# One lazy reference-building graph per catalog row; nothing runs until computed.
row_list = [series for _idx, series in missing_refs_df.iterrows()]
tasks_alt = list(map(gen_all_refs, row_list))

In [None]:
# Execute every reference-building graph on the cluster. The return value
# (a 1-tuple holding the list of per-store results) is only displayed.
dask.compute(tasks_alt)

In [None]:
# Shut down the scheduler and workers once all references are written.
client.shutdown()

## Update Catalog
This optional section calls `build_reference_catalog`, which scans the CarbonPlan S3 reference storage and rewrites the reference catalog CSV. Uncomment the call below to run it.

In [None]:
# build_reference_catalog(catalog_file_url)