# Server-side Compute with Globus Compute
<img src="images/globus-logo.png" width=250 alt="Globus logo" style="display:inline-block">
<img src="images/esgf.png" width=250 alt="ESGF logo" style="float:left">

## The Use Case: Custom Computations
What if we need a computation other than the typical averaging, subsetting, or regridding workflows?

An example is the El Niño Southern Oscillation (ENSO) index:
![ENSO Index](https://www.ncdc.noaa.gov/monitoring-content/teleconnections/nino-regions.gif)

## The Solution: Globus Compute

Thankfully, there is an existing solution for packaging custom computations behind a common API, allowing pre-defined functions to run close to the datasets. According to the Globus Compute documentation (https://www.globus.org/compute), its capabilities match our requirements:

✅ …figuring out credentials and different authentication mechanisms

✅ …configuring and managing batch jobs and schedulers

✅ …interacting with resource managers, waiting in queues and scaling nodes

✅ …configuring the execution environment for different compute systems

✅ **…retrieving and sharing computation results**

## So I have a function I would like to share - where do I start?

### Step 1. Write, register, and test your function
As someone with access to the ESGF data holdings in a data center, you would:
1. Write a function that locally accesses the data using `intake-esgf`
2. Register the function with `globus-compute`
3. Test the function on your local machine, using the unique ID returned when you registered it.

### Step 2. Share your function with the community
Now that you have a registered function, you can share that with a user group by:
1. Creating a shared user group
2. Adding that group as collaborators on your function using the web interface at globus.org

## An Example of Calculating ENSO with Globus Compute

### Imports and Pre-Requirements
These imports and associated code would be run **within the data center, which has access to petabytes of earth system model output**.

In [1]:
import hvplot.xarray
import holoviews as hv
import numpy as np
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import intake_esgf
from intake_esgf import ESGFCatalog
import xarray as xr
import cf_xarray
import warnings
import os
import time
from globus_compute_sdk import Executor, Client
warnings.filterwarnings("ignore")

hv.extension("bokeh")

### Writing, Registering, and Testing our Function
As mentioned in the introduction, we are reusing functions from the previous ENSO notebooks. To run them with Globus Compute, they must meet the following requirements:
- All libraries/packages used in the function must be installed on the Globus Compute endpoint
- All imports and helper functions must be defined inside the function that is executed
- The output from the function must be serializable (e.g., `xarray.Dataset`, a NumPy array)
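A cheap way to check the last two requirements before registering anything is to keep every import inside the function and confirm the return value survives a serialization round trip. The sketch below is only a local proxy: it assumes that a plain `pickle` round trip is a reasonable first check, while Globus Compute uses its own serialization layer (built on dill), which accepts some objects that `pickle` rejects. `compute_summary` and `is_serializable` are hypothetical names used for illustration.

```python
import pickle


def compute_summary(values):
    # All imports live inside the function body, so it does not depend on
    # anything imported at the top of the notebook
    import statistics

    return {"mean": statistics.fmean(values), "n": len(values)}


def is_serializable(obj):
    """Return True if obj survives a pickle round trip.

    Assumption: pickle is only a rough proxy for what Globus Compute's
    serializer accepts; passing this check is not a guarantee.
    """
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


result = compute_summary([1.0, 2.0, 3.0])
print(result, is_serializable(result))

# A generator cannot be pickled, so this check fails
print(is_serializable(x for x in range(3)))
```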

Using these constraints, we set up the following function, whose key parameter, `source_id`, selects which model to analyze. Two examples here are the Community Earth System Model v2 (CESM2) and the Model for Interdisciplinary Research on Climate v6 (MIROC6).

In [2]:
def compute_enso(source_id, return_path=False):
    import numpy as np
    import matplotlib.pyplot as plt
    import intake_esgf
    from intake_esgf import ESGFCatalog
    import xarray as xr
    import cf_xarray
    import warnings
    warnings.filterwarnings("ignore")

    def search_esgf(source_id):

        # Configure intake-esgf to only look at data from Argonne, which will be local
        # for the Globus endpoint to which we are submitting this work.
        intake_esgf.conf.set(
            indices={
                "anl-dev": True,
                "ornl-dev": False,
            }
        )

        # Search and load the ocean surface temperature (tos)
        cat = ESGFCatalog().search(
            activity_id="CMIP",
            experiment_id="historical",
            variable_id="tos",
            source_id=source_id,
            table_id="Omon",
            grid_label='gn'
        )

        # There will be many ensemble members, but we just want the 'main' one. You can
        # filter out all others by using the intake-esgf catalog.
        cat.remove_ensembles()

        try:
            tos_ds = cat.to_dataset_dict()["tos"]
        except Exception as exc:
            print(f"There was an issue with the '{source_id}' dataset")
            print(exc)
            return xr.Dataset()

        # Store the session log in the attributes so we can see what happened
        tos_ds.attrs["intake-esgf log"] = cat.session_log()

        return tos_ds

    def calculate_enso(ds):

        # Subset the Niño 3.4 region (5°S to 5°N, 190°E to 240°E, i.e. 170°W to 120°W)
        dso = ds.where(
            (ds.cf["latitude"] < 5)
            & (ds.cf["latitude"] > -5)
            & (ds.cf["longitude"] > 190)
            & (ds.cf["longitude"] < 240),
            drop=True,
        )

        # Group by calendar month to build the monthly climatology
        gb = dso.tos.groupby('time.month')

        # Subtract the monthly averages, returning the anomalies
        tos_nino34_anom = gb - gb.mean(dim='time')

        # Determine the non-time dimensions and average using these
        non_time_dims = set(tos_nino34_anom.dims)
        non_time_dims.remove(ds.tos.cf["T"].name)
        weighted_average = tos_nino34_anom.weighted(ds["areacello"]).mean(dim=list(non_time_dims))

        # Calculate the rolling average
        rolling_average = weighted_average.rolling(time=5, center=True).mean()
        std_dev = weighted_average.std()
        return rolling_average / std_dev

    def add_enso_thresholds(da, threshold=0.4):

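        # Convert the xr.DataArray into an xr.Dataset
        ds = da.to_dataset()

        # Convert cftime-based time coordinates to pandas datetimes where possible,
        # then apply the thresholds
        try:
            ds["time"] = ds.indexes["time"].to_datetimeindex()
        except Exception:
            # Already a pandas DatetimeIndex, or the calendar cannot be converted
            pass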
        ds["tos_gt_04"] = ("time", ds.tos.where(ds.tos >= threshold, threshold).data)
        ds["tos_lt_04"] = ("time", ds.tos.where(ds.tos <= -threshold, -threshold).data)

        # Add fields for the thresholds
        ds["el_nino_threshold"] = ("time", np.zeros_like(ds.tos) + threshold)
        ds["la_nina_threshold"] = ("time", np.zeros_like(ds.tos) - threshold)

        return ds
    
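    ds = search_esgf(source_id)

    # search_esgf returns an empty Dataset on failure; hand it back instead of crashing
    if "tos" not in ds:
        return ds

    enso_index = add_enso_thresholds(calculate_enso(ds).compute())
    enso_index.attrs = ds.attrs
    enso_index.attrs["source_id"] = source_id

    return enso_index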
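The arithmetic inside `calculate_enso` (monthly anomalies, area-weighted spatial mean, 5-month centered rolling mean, normalization by the standard deviation) can be checked on synthetic data without ESGF access. The sketch below is a plain NumPy approximation, not the notebook's implementation: `enso_index_sketch` is a hypothetical name, and it assumes a regular `(time, lat, lon)` array that is already subset to the region and has no missing (land) values, whereas the real function relies on xarray's `cf` accessor and NaN-aware reductions.

```python
import numpy as np


def enso_index_sketch(tos, area, months, window=5):
    """Normalized Niño 3.4-style index from a (time, ny, nx) SST array.

    tos:    sea surface temperature, already subset to the region
    area:   cell areas, shape (ny, nx), used as averaging weights
    months: calendar month (1-12) of each time step, shape (time,)
    """
    tos = np.asarray(tos, dtype=float)
    months = np.asarray(months)

    # Monthly climatology: subtract each calendar month's mean from its own steps
    anom = np.empty_like(tos)
    for m in np.unique(months):
        sel = months == m
        anom[sel] = tos[sel] - tos[sel].mean(axis=0)

    # Area-weighted spatial mean: sum(w * x) / sum(w) over the two grid axes
    w = np.asarray(area, dtype=float)
    series = (anom * w).sum(axis=(1, 2)) / w.sum()

    # Centered rolling mean; edges without a full window stay NaN, like
    # xarray's rolling(time=window, center=True).mean() with default min_periods
    smoothed = np.full_like(series, np.nan)
    half = window // 2
    for i in range(half, len(series) - half):
        smoothed[i] = series[i - half:i + half + 1].mean()

    # Normalize by the standard deviation of the unsmoothed series
    return smoothed / series.std()


# Synthetic example: 10 years of monthly data on a small 3 x 4 grid
rng = np.random.default_rng(0)
n_years, ny, nx = 10, 3, 4
months = np.tile(np.arange(1, 13), n_years)
seasonal = 2.0 * np.sin(2 * np.pi * months / 12)[:, None, None]
tos = 27.0 + seasonal + rng.normal(0, 0.5, size=(months.size, ny, nx))
area = np.ones((ny, nx))

index = enso_index_sketch(tos, area, months)
print(index.shape, np.isnan(index[:2]).all())
```

Subtracting the per-month climatology removes the seasonal cycle before smoothing, which is why the synthetic sine term does not dominate the resulting index.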

In [3]:
compute_enso("CESM2")


#### Set up your Compute Endpoint on an HPC system or your local machine

Once you have tested your function locally, it is time to set up your compute endpoint! To start a Globus Compute endpoint on your system, you need to log in, configure a [conda environment](https://foundations.projectpythia.org/foundations/how-to-run-python.html#installing-and-managing-python-with-conda), and install `globus-compute-endpoint`.


You will then run:

```bash
globus-compute-endpoint configure sc2024-esgf

globus-compute-endpoint start sc2024-esgf
```

This returns a unique ID for the endpoint! **Make a note of that ID; we will use it next!**

In [4]:
compute_endpoint_id = "YOUR_COMPUTE_ID_HERE"
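Rather than hard-coding the ID in a notebook you might share, one option is to read it from an environment variable and check that it looks like a UUID before submitting anything. This is a minimal sketch under stated assumptions: the variable name `GLOBUS_COMPUTE_ENDPOINT_ID` and the helper `get_endpoint_id` are hypothetical conventions for this example, not something Globus Compute reads on its own.

```python
import os
import uuid


def get_endpoint_id(env_var="GLOBUS_COMPUTE_ENDPOINT_ID", default=None):
    # Hypothetical convention: keep the endpoint ID out of the notebook source
    value = os.environ.get(env_var, default)
    if value is None:
        raise RuntimeError(f"Set {env_var} to your endpoint ID")
    try:
        # Endpoint IDs are UUIDs; this rejects placeholders such as
        # "YOUR_COMPUTE_ID_HERE" before any task is submitted
        return str(uuid.UUID(value))
    except ValueError:
        raise ValueError(f"{value!r} is not a valid UUID") from None


print(get_endpoint_id(default="b58a22ed-3fcf-4cc0-85e1-e56b306160db"))
```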

#### Setup an Executor to Run the Function(s)
Once we have our compute endpoint ID, we pass it to an `Executor`, which sends our functions from the local machine to the endpoint where they will run.

In [5]:
# Use port 443 for the results connection (useful when the default AMQP port is blocked)
gce = Executor(endpoint_id=compute_endpoint_id, amqp_port=443)
gce

Executor<ep_id:b58a22ed-3fcf-4cc0-85e1-e56b306160db; tg_id:None; bs:128>

#### Test the Function through Globus Compute

Now that we have our function prepared and an executor to run it on, we can test it on our endpoint!

We pass in the function itself, followed by its keyword arguments. For example, let’s compare the CESM2 and MIROC6 simulations.

In [6]:
ncar_task = gce.submit(compute_enso, source_id='CESM2')
miroc_task = gce.submit(compute_enso, source_id='MIROC6')

In [7]:
# submit() returns futures; .result() blocks until the remote task finishes and returns the dataset
ncar_ds = ncar_task.result()
miroc_ds = miroc_task.result()
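Because the Globus Compute `Executor` implements Python's `concurrent.futures.Executor` interface, the two submissions above can be generalized to any list of models, with results collected as each task finishes via `concurrent.futures.as_completed`. The sketch below uses a local `ThreadPoolExecutor` and a hypothetical `fake_compute_enso` stand-in so it runs anywhere; swapping in `gce` and `compute_enso` should follow the same pattern, but treat that as an assumption to verify against your SDK version.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_compute_enso(source_id):
    # Hypothetical stand-in for compute_enso so the pattern runs without ESGF data
    return {"source_id": source_id}


source_ids = ["CESM2", "MIROC6"]

# ThreadPoolExecutor is only a local stand-in; the Globus Compute Executor
# exposes the same submit()/result() interface
with ThreadPoolExecutor(max_workers=2) as ex:
    futures = {ex.submit(fake_compute_enso, source_id=s): s for s in source_ids}
    results = {}
    # as_completed yields each future as soon as it finishes, in any order
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()

print(sorted(results))
```

Keying the results by `source_id` keeps them in order for plotting, even though remote tasks may finish in any order.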

### Visualize our Data Locally
Now that we have the computed datasets, the last step is to visualize the output. In the other example, we stepped through how to use `.hvplot` to create interactive displays of ENSO values. We reuse that functionality here, wrapped in a function.
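The area plots rely on how `add_enso_thresholds` clips the index: `tos_gt_04` equals the index wherever it is at or above the El Niño threshold and equals the threshold elsewhere, so the shaded band between `el_nino_threshold` and `tos_gt_04` has nonzero height only during El Niño months (and the mirror image holds for La Niña). A small NumPy sketch on made-up index values shows the rule; `np.where` here plays the role of xarray's `.where(cond, other)`.

```python
import numpy as np

threshold = 0.4
index = np.array([-1.2, -0.3, 0.1, 0.5, 1.0])  # made-up index values

# Same rule as ds.tos.where(ds.tos >= threshold, threshold): keep values at or
# above the threshold, replace everything else with the threshold itself
tos_gt = np.where(index >= threshold, index, threshold)

# Mirror image for La Niña: keep values at or below -threshold
tos_lt = np.where(index <= -threshold, index, -threshold)

print(tos_gt)
print(tos_lt)
```

For non-missing values this is equivalent to `np.maximum(index, threshold)` and `np.minimum(index, -threshold)`; the `where` form differs only in that it also replaces NaNs with the threshold.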

In [8]:
def plot_enso(ds):
    # Create the El Niño area graphs
    el_nino = ds.hvplot.area(x="time", y2='tos_gt_04', y='el_nino_threshold', color='red', hover=False)
    el_nino_label = hv.Text(ds.isel(time=40).time.values, 2, 'El Niño').opts(text_color='red')

    # Create the La Niña area graphs
    la_nina = ds.hvplot.area(x="time", y2='tos_lt_04', y='la_nina_threshold', color='blue', hover=False)
    la_nina_label = hv.Text(ds.isel(time=-40).time.values, -2, 'La Niña').opts(text_color='blue')

    # Plot a timeseries of the ENSO 3.4 index
    enso = ds.tos.hvplot(x='time', line_width=0.5, color='k', xlabel='Year', ylabel='ENSO 3.4 Index')

    # Combine all the plots into a single plot
    return (el_nino_label * la_nina_label * el_nino * la_nina * enso).opts(title=f'{ds.attrs["source_id"]} \n Ensemble Member: {ds.attrs["variant_label"]}')

# Apply the function to both datasets and stack the plots in a single column.
plots = (plot_enso(ncar_ds) + plot_enso(miroc_ds)).cols(1)

In [9]:
plots

### Share your Function with Others!
Now that we have fully tested our function, let's share it with others! We do this by creating a Globus group and registering the function with that group.

For example, we created one for our ESGF project:

![ESGF group globus](images/globus-groups.png)

**Take note of the ID listed there - we need to use that to register our function and share!**

In [12]:
print("Instantiating a Globus Compute client ...")
gcc = Client()

# Register the function within the Globus ecosystem
print("Registering the Globus Compute function ...")
compute_function_id = gcc.register_function(compute_enso,
                                            description="ESGF Demo Video",
                                            group="d19fa870-f242-11ee-9d91-c94e369cbd56")

print(compute_function_id)

Instantiating a Globus Compute client ...
Registering the Globus Compute function ...
5dd105a0-3b57-4e14-9c5b-41252134dad5


#### Now that we have registered the function, we can run it!

We authenticated prior to this; all authentication is handled through Globus!

In [13]:
task_id = gcc.run(endpoint_id=compute_endpoint_id,
                  function_id=compute_function_id,
                  source_id='CESM2'
                  )

# get_result() raises while the task is still pending, so poll its status first
while gcc.get_task(task_id)["pending"]:
    time.sleep(5)
cesm2_ds = gcc.get_result(task_id)
cesm2_ds
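Remote tasks can take anywhere from seconds to many minutes (data loading, queue waits), so it helps to poll with an upper bound instead of waiting indefinitely. The helper below is a hypothetical sketch, not part of the Globus Compute SDK: it assumes a `check()` callable that returns `None` while the task is pending. With the client above, such a callable could be `lambda: None if gcc.get_task(task_id)["pending"] else gcc.get_result(task_id)`, keeping in mind that a function that legitimately returns `None` would need a different sentinel.

```python
import time


def poll_until(check, timeout=300.0, interval=5.0):
    """Call check() every `interval` seconds until it returns something other
    than None, or raise TimeoutError after `timeout` seconds.

    Hypothetical helper: check() is assumed to return None while pending.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = check()
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} s")
        time.sleep(interval)


# Local demo: a fake task that "finishes" on the third poll
calls = {"n": 0}


def fake_check():
    calls["n"] += 1
    return "done" if calls["n"] >= 3 else None


result = poll_until(fake_check, timeout=5, interval=0.01)
print(result, calls["n"])
```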

In [14]:
# and plot it!
plot_enso(cesm2_ds)

# Key Points

- A good fit when a user needs a custom computation that runs next to the data
- Minimizes data transfer by operating on the data where it is stored
- `intake-esgf` detects the local file system and accesses the data in place