# Reproduction of CESM2-MARBL Figures

The goal of this example is to show we can use `esds-funnel` to reproduce figures looking at marine biogeochemistry (BGC) diagnostics from CESM2-MARBL

## Epoch Mean Global Map

One of the plots we are aiming to reproduce is a mean over some time period (epoch mean), plotted at the global scale (lat/lon) plot. 

An example of this is given below, where in our case, we will have a model vs. model comparison instead of observations.

![lat-lon-plot](https://github.com/marbl-ecosys/cesm2-marbl/blob/main/notebooks/figures/13-chl-surface-maps.png?raw=true)

## Epoch Mean Zonal Average

We can then create a slight modification of the preivous flow, calculating a zonal average instead of looking at both lat/lon. 

An example of this is given below, where in our case, we will have a model vs. model comparison instead of observations (GLODAP).

![zonal-avg](https://github.com/marbl-ecosys/cesm2-marbl/blob/main/notebooks/figures/02-za-obs-comparison-pCFC11.png?raw=true)

## Global Annual Average

The last plot we are recreating is a global annual average. This will include an average of fields around the globe, as well as integrals for some fields.

![timeseries](https://github.com/marbl-ecosys/cesm2-marbl/blob/main/notebooks/figures/24-global-timeseries-FG_CO2.png?raw=true)

## Imports
We start first with importing the "ingredients" of `esds-funnel`, including:
* [xpersist](https://xpersist.readthedocs.io/en/latest/)
* [xcollection](https://xcollection.readthedocs.io/en/latest/)
* [Prefect](https://www.prefect.io/)
* [intake-esm](https://intake-esm.readthedocs.io/en/stable/)



In [2]:
import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"

import tempfile
import time

import numpy as np
import xarray as xr
import xcollection as xc
from distributed import Client
import intake
from ncar_jobqueue import NCARCluster
from prefect import Flow, Parameter, task
from xpersist import CacheStore, XpersistResult

## Spin up a Dask Cluster

In [3]:
cluster = NCARCluster()
cluster.scale(40)
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39435 instead


0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/39435/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/39435/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.63:43476,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/39435/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Configure Your Cache Store
Before we get started with the diagnostics, we need to setup a place for our cache. In this case, we use our `tempfile` directory, adding an additional directory `marbl-example-cache`

In [4]:
store = CacheStore(f'{tempfile.gettempdir()}/marbl-example-cache')
store

CacheStore(path='/glade/scratch/mgrover/marbl-example-cache', readonly=False, on_duplicate_key=<DuplicateKeyEnum.skip: 'skip'>, storage_options={})

## Epoch Mean Global Map Example
As mentioned before, in this first example, we will generate an analysis where we average over some `time` dimension, and plot a global map of some field(s)

### Setup the Operators

In [5]:
def epoch_mean(ds):
    """Take the average over some time dimension"""
    return ds.mean(dim='time')

### Setup the Tasks

In [7]:
@task
def read_catalog(path, multivar_row=False):
    """
    Reads in an intake-esm catalog from a given path

    Parameters
    ----------
    path: str, path to the associated intake-esm catalog json

    multivar_row: boolean, default=False, whether multiple variables are in each row

    Returns
    -------
    intake_esm.Catalog
    """
    if multivar_row:
        read_csv_kwargs = {"converters": {"variables": ast.literal_eval}}
    else:
        read_csv_kwargs = None

    return intake.open_esm_datastore(path, read_csv_kwargs=read_csv_kwargs)


@task
def subset_catalog(intake_esm_catalog, search_dict):
    """
    Subsets an intake-esm catalog given a search dictionary

    Parameters
    ----------
    intake_esm_catalog: intake_esm.Catalog

    search_dict: dict, dictionary to use for searching the catalog

    Returns
    -------
    catalog_subset: intake_esm.Catalog, catalog subsetted for given search
    """
    return intake_esm_catalog.search(**search_dict)


@task
def load_catalog(intake_esm_catalog, cdf_kwargs={}):
    """
    Loads the intake-esm catalog datasets into an xcollection.Collection

    Parameters
    ----------
    intake_esm_catalog: intake_esm.Catalog, catalog to read the datasets from

    cdf_kwargs: dict, dataset kwargs for intake-esm (cdf_kwargs)

    Returns
    -------
    collection: xcollection.Collection
    """
    return intake_esm_catalog.to_collection(cdf_kwargs=cdf_kwargs)


@task
def get_task_attr(obj, attr):
    """
    Retrives an attribute from a Prefect.Task

    Parameters
    ----------
    obj: Prefect.Task, the task you would like to retrieve the attributes from

    attr: str, attribute you would like to retrieve

    Returns
    -------
    object_out: Desired attribute
    """
    return list(getattr(obj, attr)())


@task(
    target="{task_name}-{key}.zarr",
    result=XpersistResult(store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}),
)
def epoch_average(ds, key):
    return epoch_mean(ds)

In [9]:
with Flow('epoch_mean') as epoch_mean_flow:
    collection_path = Parameter(
        'collection_path',
        default='/glade/work/mgrover/cesm2_le_test_data/catalogs/cesm2-le-subset.json',
    )
    multivar_row = Parameter('multi_var_row', default=False)
    search_dict = Parameter('search_dict', default={})
    cdf_kwargs = Parameter('cdf_kwargs', default=None)

    # Load the intake-esm catalog into memory
    data_catalog = read_catalog(collection_path, multivar_row)

    # Subset the catalog
    data_catalog_subset = subset_catalog(data_catalog, search_dict)

    # Convert to catalog to an xcollection
    collection = load_catalog(data_catalog_subset, cdf_kwargs=cdf_kwargs)

    values = get_task_attr(collection, "values")
    keys = get_task_attr(collection, "keys")

    # Apply the epoch average operator
    epoch_average_collection = epoch_average.map(values, keys)

In [10]:
run_flow = epoch_mean_flow.run(search_dict={'stream': 'pop.h', 'variable': 'PO4'}, cdf_kwargs={})

[2022-01-05 14:50:20-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'epoch_mean'
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'multi_var_row': Starting task run...
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'multi_var_row': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'search_dict': Starting task run...
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'search_dict': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'collection_path': Starting task run...
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'collection_path': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'cdf_kwargs': Starting task run...
[2022-01-05 14:50:20-0700] INFO - prefect.TaskRunner | Task 'cdf_kwargs': Finished task run for task with final state: 'Succ

[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'load_catalog': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'get_task_attr': Starting task run...
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'get_task_attr': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'get_task_attr': Starting task run...
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'get_task_attr': Finished task run for task with final state: 'Success'
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'epoch_average': Starting task run...
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'epoch_average': Finished task run for task with final state: 'Mapped'
[2022-01-05 14:50:32-0700] INFO - prefect.TaskRunner | Task 'epoch_average[0]': Starting task run...
[2022-01-05 14:50:56-0700] INFO - prefect.TaskRunner | Task 'epoch_average[0]':

In [32]:
store.keys()

['epoch_mean-epoch_average-ocn.pop.h.b.e21.BHISTsmbb.f09_g17.LE2-1301.013.zarr',
 'epoch_average-ocn.pop.h.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.zarr',
 'epoch_average-ocn.pop.h.b.e21.BHISTsmbb.f09_g17.LE2-1301.013.zarr',
 'epoch_mean-epoch_average-ocn.pop.h.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.zarr']