# Generate annual/yearly zarr stores from hourly ERA5 data files

In [1]:
import glob
import re
import matplotlib as plt
import numpy as np
import scipy as sp
import xarray as xr
import intake
import intake_esm

In [2]:
import dask
from dask.distributed import Client, performance_report
from dask_jobqueue import PBSCluster

In [3]:
# File paths
rda_scratch = "/gpfs/csfs1/collections/rda/scratch/harshah"
rda_data    = "/gpfs/csfs1/collections/rda/data/"
#########
rda_url           = 'https://data.rda.ucar.edu/'
era5_catalog      = rda_url + 'pythia_era5_24/pythia_intake_catalogs/era5_catalog.json'
#alternate_catalog = rda_data + 'pythia_era5_24/pythia_intake_catalogs/era5_catalog_opendap.json'
annual_means      =  rda_data + 'pythia_era5_24/annual_means/'
print(era5_catalog)

https://data.rda.ucar.edu/pythia_era5_24/pythia_intake_catalogs/era5_catalog.json


## Spin up a PBS cluster

In [4]:
# Create a PBS cluster object
cluster = PBSCluster(
    job_name = 'dask-wk24-hpc',
    cores = 1,
    memory = '8GiB',
    processes = 1,
    local_directory = rda_scratch+'/dask/spill',
    log_directory = rda_scratch +'/dask/',
    resource_spec = 'select=1:ncpus=1:mem=8GB',
    queue = 'casper',
    walltime = '3:30:00',
    #interface = 'ib0'
    interface = 'ext'
)

In [5]:
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.94:40231,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [6]:
cluster.scale(25)

## Find data using intake catalog 

In [7]:
era5_cat = intake.open_esm_datastore(era5_catalog)
era5_cat

  df = pd.read_csv(


Unnamed: 0,unique
era_id,1
datatype,2
level_type,1
step_type,7
table_code,4
param_code,164
variable,212
long_name,212
units,33
year,85


In [8]:
## Select 

In [9]:
intake.__version__

'0.7.0'

In [10]:
temp_cat = era5_cat.search(variable='VAR_2T',frequency = 'hourly')
temp_cat

Unnamed: 0,unique
era_id,1
datatype,1
level_type,0
step_type,1
table_code,1
param_code,1
variable,1
long_name,1
units,1
year,85


In [11]:
# Define the xarray_open_kwargs with a compatible engine, for example, 'scipy'
xarray_open_kwargs = {
    'engine': 'h5netcdf',
    'chunks': {},  # Specify any chunking if needed
    'backend_kwargs': {}  # Any additional backend arguments if required
}

In [12]:
%%time
dsets = temp_cat.to_dataset_dict(xarray_open_kwargs=xarray_open_kwargs)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'datatype.step_type'


CPU times: user 1min 33s, sys: 3.67 s, total: 1min 37s
Wall time: 4min 7s


In [13]:
dsets.keys()

dict_keys(['an.sfc'])

In [14]:
temp_2m = dsets['an.sfc'].VAR_2T
temp_2m

Unnamed: 0,Array,Chunk
Bytes,2.79 TiB,3.97 MiB
Shape,"(737784, 721, 1440)","(27, 139, 277)"
Dask graph,9339456 chunks in 3031 graph layers,9339456 chunks in 3031 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.79 TiB 3.97 MiB Shape (737784, 721, 1440) (27, 139, 277) Dask graph 9339456 chunks in 3031 graph layers Data type float32 numpy.ndarray",1440  721  737784,

Unnamed: 0,Array,Chunk
Bytes,2.79 TiB,3.97 MiB
Shape,"(737784, 721, 1440)","(27, 139, 277)"
Dask graph,9339456 chunks in 3031 graph layers,9339456 chunks in 3031 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.81 MiB,2.91 kiB
Shape,"(737784,)","(744,)"
Dask graph,1010 chunks in 2021 graph layers,1010 chunks in 2021 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 2.81 MiB 2.91 kiB Shape (737784,) (744,) Dask graph 1010 chunks in 2021 graph layers Data type int32 numpy.ndarray",737784  1,

Unnamed: 0,Array,Chunk
Bytes,2.81 MiB,2.91 kiB
Shape,"(737784,)","(744,)"
Dask graph,1010 chunks in 2021 graph layers,1010 chunks in 2021 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray


In [15]:
temp_2m_annual = temp_2m.resample(time='1Y').mean()
temp_2m_annual

Unnamed: 0,Array,Chunk
Bytes,336.65 MiB,150.40 kiB
Shape,"(85, 721, 1440)","(1, 139, 277)"
Dask graph,28560 chunks in 3630 graph layers,28560 chunks in 3630 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 336.65 MiB 150.40 kiB Shape (85, 721, 1440) (1, 139, 277) Dask graph 28560 chunks in 3630 graph layers Data type float32 numpy.ndarray",1440  721  85,

Unnamed: 0,Array,Chunk
Bytes,336.65 MiB,150.40 kiB
Shape,"(85, 721, 1440)","(1, 139, 277)"
Dask graph,28560 chunks in 3630 graph layers,28560 chunks in 3630 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [16]:
temp_2m_annual = temp_2m_annual.chunk({'latitude':721,'longitude':1440})
temp_2m_annual

Unnamed: 0,Array,Chunk
Bytes,336.65 MiB,3.96 MiB
Shape,"(85, 721, 1440)","(1, 721, 1440)"
Dask graph,85 chunks in 3631 graph layers,85 chunks in 3631 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 336.65 MiB 3.96 MiB Shape (85, 721, 1440) (1, 721, 1440) Dask graph 85 chunks in 3631 graph layers Data type float32 numpy.ndarray",1440  721  85,

Unnamed: 0,Array,Chunk
Bytes,336.65 MiB,3.96 MiB
Shape,"(85, 721, 1440)","(1, 721, 1440)"
Dask graph,85 chunks in 3631 graph layers,85 chunks in 3631 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


#### Save annual mean to pythia_era5_24/annual_means folder within rda_data

In [None]:
%%time
temp_2m_annual.to_dataset().to_zarr(annual_means + 'temp_2m_annual.zarr',mode='w')

In [None]:
temp_2m_annual = xr.open_zarr(annual_means + 'temp_2m_annual.zarr').VAR_2T
temp_2m_annual

### Close up the cluster

In [None]:
cluster.close()