# Generate annual/yearly zarr stores from hourly ERA5 NetCDF files on NCAR's Geoscience Data Exchange


## Overview and Warning: Please Read
- ERA5 data on NCAR's Geoscience Data Exchange (GDEX) is stored in hourly NetCDF files. Therefore, it is necessary to create intermediate ARCO datasets for fast processing.
- In this notebook, we read hourly data from NCAR's publicly accessible ERA5 collection using an intake catalog, compute the annual means and store the result using zarr stores.
- If you don't have write permision to save to NCAR's GDEX, please save the result to your local folder.
- If you need annual means for the following variables, please don't run the commented cells in this notebook. The data has already been calculated and can be accessed via https from https://data.gdex.ucar.edu/special_projects/pythia_2024/annual_means/

  1) Air temperature at 2 m/ [VAR_2T](https://data.gdex.ucar.edu/special_projects/pythia_2024/annual_means/temp_2m_annual_1940_2023.zarr)
     
- Otherwise, please run this script once to generate the annual means.

## Prerequisites

| Concepts | Importance | Notes |
| --- | --- | --- |
| [Intro to Xarray](https://foundations.projectpythia.org/core/xarray) | Necessary | |
| [Intro to Intake](https://projectpythia.org/intake-cookbook/notebooks/intake_introduction) | Necessary | |
| [Understanding of Zarr](https://zarr.readthedocs.io/en/stable/) | Helpful | |

- **Time to learn**: 30 minutes

## Imports

In [1]:
import glob
import re
import matplotlib as plt
import numpy as np
import scipy as sp
import xarray as xr
import intake
import intake_esm
import pandas as pd

In [2]:
import dask
from dask.distributed import Client, performance_report,LocalCluster
from dask_jobqueue import PBSCluster

In [3]:
######## File paths ################
gdex_scratch   = "/lustre/desc1/scratch/harshah"
era5_catalog = 'https://data.gdex.ucar.edu/special_projects/pythia_2024/pythia_intake_catalogs/era5_catalog.json'
annual_means      = '/gdex/data/special_projects/pythia_2024/annual_means/' #Posix path, only used for writing data 
annual_means_https= 'https://data.gdex.ucar.edu/special_projects/pythia_2024/annual_means/'

print(era5_catalog)

https://data.gdex.ucar.edu/special_projects/pythia_2024/pythia_intake_catalogs/era5_catalog.json


## Create a Dask cluster

#### Dask Introduction

[Dask](https://www.dask.org/) is a solution that enables the scaling of Python libraries. It mimics popular scientific libraries such as numpy, pandas, and xarray that enables an easier path to parallel processing without having to refactor code. 

There are 3 components to parallel processing with Dask: the client, the scheduler, and the workers. 

The Client is best envisioned as the application that sends information to the Dask cluster. In Python applications this is handled when the client is defined with `client = Client(CLUSTER_TYPE)`. A Dask cluster comprises of a single scheduler that manages the execution of tasks on workers. The `CLUSTER_TYPE` can be defined in a number of different ways.

- There is LocalCluster, a cluster running on the same hardware as the application and sharing the available resources, directly in Python with `dask.distributed`. 

- In certain JupyterHubs Dask Gateway may be available and a dedicated dask cluster with its own resources can be created dynamically with `dask.gateway`. 

- On HPC systems `dask_jobqueue` is used to connect to the HPC Slurm and PBS job schedulers to provision resources.

The `dask.distributed` client python module can also be used to connect to existing clusters. A Dask Scheduler and Workers can be deployed in containers, or on Kubernetes, without using a Python function to create a dask cluster. The `dask.distributed` Client is configured to connect to the scheduler either by container name, or by the Kubernetes service name.   

#### Select the Dask cluster type

The default will be `LocalCluster` as that can run on any system.

If running on a HPC computer with a PBS Scheduler, set to True. Otherwise, set to False.

In [4]:
USE_PBS_SCHEDULER = False

If running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.

In [5]:
USE_DASK_GATEWAY = False

**Python function for a PBS cluster**

In [6]:
# Create a PBS cluster object
def get_pbs_cluster():
    """ Create cluster through dask_jobqueue.   
    """
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        job_name = 'dask-pythia-24',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = gdex_scratch + '/dask/spill',
        resource_spec = 'select=1:ncpus=1:mem=8GB',
        queue = 'casper',
        walltime = '1:00:00',
        #interface = 'ib0'
        interface = 'ext'
    )
    return cluster

**Python function for a Gateway Cluster**

In [7]:
def get_gateway_cluster():
    """ Create cluster through dask_gateway
    """
    from dask_gateway import Gateway

    gateway = Gateway()
    cluster = gateway.new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster

**Python function for a Local Cluster**

In [8]:
def get_local_cluster():
    """ Create cluster using the Jupyter server's resources
    """
    from distributed import LocalCluster, performance_report
    cluster = LocalCluster()    
    cluster.scale(6)
    return cluster

**Python logic to select the Dask Cluster type**

This uses True/False boolean logic based on the variables set in the previous cells

In [9]:
# Obtain dask cluster in one of three ways

if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to cluster
from distributed import Client
client = Client(cluster)

# Display cluster dashboard URL
cluster

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 18,Total memory: 48.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:63981,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:63995,Total threads: 3
Dashboard: http://127.0.0.1:63999/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63984,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-nv71i7m1,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-nv71i7m1

0,1
Comm: tcp://127.0.0.1:63993,Total threads: 3
Dashboard: http://127.0.0.1:64000/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63986,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-5x6cs6vp,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-5x6cs6vp

0,1
Comm: tcp://127.0.0.1:63994,Total threads: 3
Dashboard: http://127.0.0.1:63998/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63988,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-klj1fek1,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-klj1fek1

0,1
Comm: tcp://127.0.0.1:63996,Total threads: 3
Dashboard: http://127.0.0.1:63997/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63990,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-ykie8s7z,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-ykie8s7z

0,1
Comm: tcp://127.0.0.1:64013,Total threads: 3
Dashboard: http://127.0.0.1:64014/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:64007,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-s3aseglu,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-s3aseglu

0,1
Comm: tcp://127.0.0.1:64012,Total threads: 3
Dashboard: http://127.0.0.1:64015/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:64009,
Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-jjdohiq7,Local directory: /var/folders/4b/mqx319l57glgzq_jyh58q9c80000gp/T/dask-scratch-space/worker-jjdohiq7


## Find data using intake catalog 

In [None]:
era5_cat = intake.open_esm_datastore(era5_catalog)
era5_cat

In [None]:
era5_cat.df[['long_name','variable']].drop_duplicates().head()

### Search for variable of interest

:::{hint}
- Examples of searching for other variables 
  1. olr_cat = era5_cat.search(variable ='MTNLWRF'), where MTNLWRF = Outgoing Long Wave Radiation (upto a sign),        Mean Top Net Long Wave Radiative Flux
  2. rh_cat = era5_cat.search(variable= 'R') # R =  Relative Humidity
:::

In [None]:
############ Access temperature data ###########
temp_cat = era5_cat.search(variable='VAR_2T',frequency = 'hourly')
temp_cat

In [None]:
# Define the xarray_open_kwargs with a compatible engine, for example, 'scipy'
xarray_open_kwargs = {
    'engine': 'h5netcdf',
    #'engine': 'netcdf4',
    'chunks': {},  # Specify any chunking if needed
    'backend_kwargs': {}  # Any additional backend arguments if required
}

In [None]:
%%time
dset_temp = temp_cat.to_dataset_dict(xarray_open_kwargs=xarray_open_kwargs)

In [None]:
dset_temp.keys()

In [None]:
temp_2m = dset_temp['an.sfc'].VAR_2T
temp_2m

### Resample hourly data to annual data and save 

:::{warning}
Uncomment and run the cells in this sub-section only if you want to recalculate the annual means! This is a computationally expensive step.
:::

In [None]:
# temp_2m_annual = temp_2m.resample(time='YE').mean()
# temp_2m_annual

In [None]:
# temp_2m_annual = temp_2m_annual.chunk({'latitude':721,'longitude':1440})
# temp_2m_annual = temp_2m_annual.drop_isel({'time':-1}) # Drop 2024 data
# temp_2m_annual

- Save annual mean to annual_means folder within gdex_data
- You may not have permissions to write to this folder. 
- If so, please replace the folder path in the cell below with your own path where you have write permissions


In [None]:
# %%time
# temp_2m_annual.to_dataset().to_zarr(annual_means + 'temp_2m_annual_1940_2023.zarr',mode='w')

### Open the saved file and make a test plot

In [None]:
temp_2m_annual = xr.open_zarr(annual_means_https + 'temp_2m_annual_1940_2023.zarr').VAR_2T
temp_2m_annual 

In [None]:
%%time
temp_2m_annual.isel(time=0).plot()

### Close up the cluster

In [None]:
cluster.close()