In [None]:
import time
import pandas as pd
import matplotlib.pyplot as plt
from dask_gateway import Gateway
from dask.distributed import Client
from IPython.display import IFrame
from rich import print

# Default figure size (width, height in inches) for every plot in this notebook.
plt.rcParams['figure.figsize'] = (15,10)
# IPython magic: render matplotlib figures inline in the cell output.
%matplotlib inline

In [None]:
def summarize_inputs(recipe, ninputs=8901):
    """Print an abbreviated listing of a recipe's inputs.

    Iterates over ``recipe.file_pattern.items()`` and prints each
    ``(index, url)`` pair whose leading index component is below 3 or
    above ``ninputs - 4``. A single ``...`` line is printed for the entry
    whose leading index equals ``ninputs - 4``; every other entry is
    skipped silently.

    Parameters
    ----------
    recipe : object exposing ``file_pattern.items()``
    ninputs : int
        Total number of inputs, used to locate the tail of the listing.
    """
    ellipsis_at = ninputs - 4
    for key, source_url in recipe.file_pattern.items():
        position = key[0]
        if position >= 3 and position <= ellipsis_at:
            # Middle of the listing: mark the gap once, skip everything else.
            if position == ellipsis_at:
                print("...")
            continue
        print(key, source_url)

In [None]:
# I'll start and connect to this cluster ahead of time, but time it, so you'll know how long it took!
# perf_counter() is a monotonic clock, so the measured interval cannot be
# skewed by system clock adjustments the way a time.time() difference can.
start = time.perf_counter()

# Request a new cluster through Dask Gateway, let it scale adaptively
# (minimum=1, maximum=20), and attach a distributed client to it.
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=1, maximum=20)
client = Client(cluster)
print(f"Connected to Dask client in {round(time.perf_counter()-start, 2)} seconds")
# Last expression of the cell: display the client summary.
client

# Pangeo Forge: ETL for analysis-ready, cloud-optimized (ARCO) data stores

**Charles Stern** ([@cisaacstern](http://github.com/cisaacstern)), Data Infrastructure Engineer, Lamont-Doherty Earth Observatory (LDEO)

Presentation Repo: https://github.com/cisaacstern/zarr-vs-download

## CMEMS sea surface altimetry data

For this example we will use [gridded sea-surface altimetry data](http://marine.copernicus.eu/services-portfolio/access-to-products/?option=com_csw&view=details&product_id=SEALEVEL_GLO_PHY_L4_REP_OBSERVATIONS_008_047) from the Copernicus Marine Environment Monitoring Service (CMEMS), a dataset widely used in physical oceanography and climate science.

## CMEMS `ftp` index

```
ftp://my.cmems-du.eu:...
 ├──/1993
 │   ├──/01
 │   │   ├── dt_global...19930101.nc  (7789577 bytes)
 │   │  ...
 │   │   └── dt_global...19930131.nc  (7853172 bytes)
 │  ...
 │   └──/12
...
 └──/2020
```

In [None]:
# Our target range includes 8901 files:

# date_range defaults to a daily frequency and includes both endpoints.
dates = pd.date_range("1993-01-01", "2017-05-15")
# Size in bytes of one source file (the 19930131 entry in the FTP listing above).
avg_bytes = 7_853_172
print(f"{len(dates)} files")

### The old way: start downloading

This will take a while!

And the end result is likely not well suited for parallel computation.

# A better way: Pangeo Forge

`pangeo-forge-recipes` provides logic for transforming all of these source files into a single consolidated zarr store.

## What's a `recipe`?

A `recipe` is a Python file which can "see" all of the source files, and also knows how to logically arrange them into a cohesive dataset.

In [None]:
from cmems_recipe import recipe

# Size the summary from the date range built earlier instead of relying on
# the helper's hard-coded default, so the two values cannot drift apart.
summarize_inputs(recipe, ninputs=len(dates))

# Zarr build: steps

1. Cache files to cloud
2. Write to Zarr store according to alignment **and chunking** logic

> Chunking is usually **not** 1:1 with alignment. For parallel computation with Dask, ~50-100 MB chunks tend to work well.

In [None]:
from intake import open_catalog
cat = open_catalog("catalog.yaml")

# Open each catalog entry via `to_dask()`, keep it by name, and report its
# size (nbytes / 1e9) and data variables.
datasets = {}
for source in ["full_altimetry", "anomalies_only"]:
    opened = cat[source].to_dask()
    datasets[source] = opened
    print(f"'{source}' is {round(opened.nbytes/1e9, 2)} GBs and contains {opened.data_vars} \n")

# Later cells read `ds.sla`. Bind `ds` explicitly to the anomalies-only
# dataset instead of depending on whichever entry the loop visited last.
ds = datasets["anomalies_only"]

In [None]:
# Display `ds`, which at this point is the last dataset opened by the
# catalog loop ('anomalies_only').
ds

## Example calculation: timeseries of Global Mean Sea Level

Here we make a simple yet fundamental calculation: the rate of increase of global mean sea level over the observational period.

In [None]:
# the number of GB involved in the reduction
# (`nbytes` of the `sla` variable, converted from bytes using 1 GB = 1e9 bytes)
ds.sla.nbytes/1e9

In [None]:
# Embed the connected client's Dask dashboard in the notebook output
# as a 900 x 550 IFrame.
IFrame(client.dashboard_link, width=900, height=550)

In [None]:
# the computationally intensive step
# Average `sla` over the latitude and longitude dimensions, then call
# `.load()` so the result is computed and held in memory.
# NOTE(review): this is an unweighted mean over latitude/longitude. If the
# grid is regular in degrees, cells are not equal-area and high latitudes
# are over-represented; consider area (cos-latitude) weighting — confirm.
sla_timeseries = ds.sla.mean(dim=('latitude', 'longitude')).load()

In [None]:
# Draw the full series and its centred 365-step rolling mean on one set of axes.
fig, ax = plt.subplots()
rolling_annual_mean = sla_timeseries.rolling(time=365, center=True).mean()

sla_timeseries.plot(ax=ax, label='full data')
rolling_annual_mean.plot(ax=ax, label='rolling annual mean')

# Set the axis label and title after plotting so they replace the
# defaults that xarray's plot call fills in.
ax.set_ylabel('Sea Level Anomaly [m]')
ax.set_title('Global Mean Sea Level')
ax.legend()
ax.grid()

## Today
- Start working on a recipe: https://pangeo-forge.readthedocs.io/en/latest/
- Ask me for help with it: https://github.com/pangeo-forge/staged-recipes/issues
- Add it to the queue for automated builds: https://github.com/pangeo-forge/staged-recipes/pulls
- Build it using a notebook


## Soon
- Build your recipe in an automated "Bakery"
- Browse (and contribute to) a STAC catalog of available Zarr datasets
