# Create intake catalog for ERA5 disk access

- This notebook uses an intake catalog that was created using ecgtools
- We will read in data for temperature delete the rda_url and prepend rda_data to file paths and save it as a new catalog
- Note that irrespective of whether the file paths in the catalog are urls or posix paths, the catalog itself has to be read using posix for now !!!

In [1]:
# Display output of plots directly in Notebook
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
import intake
import numpy as np
import pandas as pd
import xarray as xr
import intake_esm
import glob
from ecgtools import Builder
from ecgtools.builder import INVALID_ASSET, TRACEBACK
import re
import pandas as pd
from pathlib import Path
import aiohttp

In [2]:
import dask 
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import performance_report

In [13]:
rda_scratch = '/gpfs/csfs1/collections/rda/scratch/harshah'
rda_data    = '/gpfs/csfs1/collections/rda/data/'
#era5_path   = rda_data + 'ds633.0/e5.oper.an.sfc/'
era5_path   = rda_data + 'ds633.0/'
zarr_path   = rda_scratch + '/tas_zarr/'
#
rda_url     =  'https://data.rda.ucar.edu/'
#This maps to /glade/campaign/collections/rda/transfer/
# rda_zarr    = rda_url + 'harshah/pelican_test/tas_zarr/'
cat_url = '/glade/campaign/collections/rda/data/d850001/catalogs/posix/era5/era5_catalog_posix.json'

In [4]:
# Create a PBS cluster object
cluster = PBSCluster(
    job_name = 'dask-wk24-hpc',
    cores = 1,
    memory = '4GiB',
    processes = 1,
    local_directory = rda_scratch+'/dask/spill',
    log_directory = rda_scratch + '/dask/logs/',
    resource_spec = 'select=1:ncpus=1:mem=4GB',
    queue = 'casper',
    walltime = '5:00:00',
    #interface = 'ib0'
    interface = 'ext'
)

In [5]:
cluster.scale(5)

In [6]:
cluster

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/43833/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.98:34247,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/43833/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Build a custom parser 

## Open the saved catalog and test its properties

In [41]:
catalog = intake.open_esm_datastore(cat_url)
catalog

Unnamed: 0,unique
Unnamed: 0,785068
era_id,1
datatype,2
level_type,1
step_type,7
table_code,4
param_code,164
variable,212
long_name,212
units,33


In [38]:
catalog.df['path'].head().values

array(['/glade/campaign/collections/rda/data/d633000/e5.oper.an.pl/194001/e5.oper.an.pl.128_060_pv.ll025sc.1940010100_1940010123.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.an.pl/194001/e5.oper.an.pl.128_060_pv.ll025sc.1940010200_1940010223.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.an.pl/194001/e5.oper.an.pl.128_060_pv.ll025sc.1940010300_1940010323.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.an.pl/194001/e5.oper.an.pl.128_060_pv.ll025sc.1940010400_1940010423.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.an.pl/194001/e5.oper.an.pl.128_060_pv.ll025sc.1940010500_1940010523.nc'],
      dtype=object)

In [44]:
# catalog.df

In [45]:
cat_surface = catalog.search(level_type ='sfc')
cat_surface.df.head()

Unnamed: 0.1,Unnamed: 0,era_id,datatype,level_type,step_type,table_code,param_code,variable,long_name,units,year,month,format,frequency,path
0,591340,e5,fc,sfc,accumu,128,8,SRO,Surface runoff,m,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
1,591341,e5,fc,sfc,accumu,128,8,SRO,Surface runoff,m,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
2,591342,e5,fc,sfc,accumu,128,9,SSRO,Sub-surface runoff,m,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
3,591343,e5,fc,sfc,accumu,128,9,SSRO,Sub-surface runoff,m,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
4,591344,e5,fc,sfc,accumu,128,44,ES,Snow evaporation,m of water equivalent,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...


In [46]:
cat_surface.df['path'].head().values

array(['/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_008_sro.ll025sc.1940010106_1940011606.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_008_sro.ll025sc.1940011606_1940020106.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_009_ssro.ll025sc.1940010106_1940011606.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_009_ssro.ll025sc.1940011606_1940020106.nc',
       '/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/194001/e5.oper.fc.sfc.accumu.128_044_es.ll025sc.1940010106_1940011606.nc'],
      dtype=object)

In [53]:
cat_vint = catalog.search(step_type ='vinteg')
cat_vint.df.head()

Unnamed: 0.1,Unnamed: 0,era_id,datatype,level_type,step_type,table_code,param_code,variable,long_name,units,year,month,format,frequency,path
0,554980,e5,an,,vinteg,162,53,VIMA,Vertical integral of mass of atmosphere,kg m**-2,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
1,554981,e5,an,,vinteg,162,54,VIT,Vertical integral of temperature,K kg m**-2,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
2,554982,e5,an,,vinteg,162,59,VIKE,Vertical integral of kinetic energy,J m**-2,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
3,554983,e5,an,,vinteg,162,60,VITHE,Vertical integral of thermal energy,J m**-2,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...
4,554984,e5,an,,vinteg,162,61,VIPIE,Vertical integral of potential+internal energy,J m**-2,1940,1,nc,hourly,/glade/campaign/collections/rda/data/d633000/e...


### Inspect the keys

In [None]:
test_ds.keys()

In [None]:
test_ds['an.vinteg']

## Select variable and plot
- I am selecting Vertical Integral of temperature

In [None]:
test_ds['an.vinteg'].VIT

## Spin up cluster

In [None]:
client = Client(cluster)
client

In [None]:
cluster.scale(15)
cluster

### Builder object for all files

In [None]:
#b_an = Builder(paths=[era5_path+'e5.oper.an.*/'],depth=1,exclude_patterns=['*.grb'])
# b_era = Builder(paths=[era5_path],depth=2,exclude_patterns=['*.grb','.html'],joblib_parallel_kwargs = {
#          'n_jobs': 15,  # Utilize all 10 cores
#          'backend': 'loky',  # 'loky' is good for managing processes, especially if you're not using Dask integration
#         })
# b_era

In [None]:
# %%time
# b_era.build(parsing_func= parse_era5)

In [None]:
b_era_df = b_era.df
b_era_df

# Inspect the catalog
- We observe that there are several rows which are NaN, we should drop them before saving our catalog
- We also see that not all files have a `level_type'. So, we should probably not use this column as a groupby attribute

In [None]:
# Check for NaN values in the 'datatype' column
print(b_era_df['datatype'].isnull().value_counts())

In [None]:
# Replace NaN values in the 'datatype' column with the string 'NA'
b_era_df = b_era_df.dropna()
b_era_df

In [None]:
b_era_df['level_type'] = b_era_df['level_type'].replace('NaN', 'NA')
b_era.df = b_era_df  # Update the builder's DataFrame with the modified one
# Check for NaN values in the 'level_type' column
print(b_era_df['level_type'].isnull().value_counts())

In [None]:
b_era_df['level_type']

- Check to see which files were not parsed by calling .invalid_assets

In [None]:
b_era.invalid_assets

In [None]:
# %%time
# b_era.save(
#     name='era5_catalog',
#     path_column_name='path',
#     variable_column_name='variable',
#     data_format='netcdf',
#     groupby_attrs=[
#         'datatype',
#         #'level_type',
#         'step_type'
#     ],
#     aggregations=[
#         {'type': 'union', 'attribute_name': 'variable'},
#         {
#             'type': 'join_existing',
#             'attribute_name': 'time_range',
#             'options': {'dim': 'time', 'coords': 'minimal', 'compat': 'override'},
#         },
#     ],
#     description = 'This is the NetCDF collection of the publicly accessible ERA5 dataset, which is a part of the NCAR glade collection. ',
#     directory = '/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs'
# )

## Test the catalog, load some data and plot

In [None]:
# test = pd.read_json('/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs/era5_catalog.json')
# test

In [None]:
col.df

In [None]:
col = intake.open_esm_datastore('/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs/era5_catalog_test.json')
col

In [None]:
col.df

In [None]:
df1 = col.df

In [None]:
df1['path']= df1['path'].str.replace(rda_data, '')
df1['path'] = rda_url + df1['path'] + '#mode=bytes'
df1

In [None]:
# # Drop the column named 'Unnamed: 0' if it exists
# df1 = df1.loc[:, ~df1.columns.str.contains('^Unnamed')]
# df1

In [None]:
# df1.to_csv('/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs/era5_catalog.csv',index=False)

In [None]:
# test_path = df1['path'][0] + '#mode=bytes'

In [None]:
df1.to_csv("/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs/era5_catalog_test.csv")

In [None]:
col = intake.open_esm_datastore('/gpfs/csfs1/collections/rda/scratch/harshah/intake_catalogs/era5_catalog_test.json')
col.df

In [None]:
%%time
test = xr.open_dataset(col.df['path'][0],engine='netcdf4')
test = test.PV.isel(latitude=0,longitude =1,level=0)

In [None]:
xr.open_mfdataset(col.df.path.values[0:2], engine='netcdf4')

In [None]:
dsets = col.to_dataset_dict()

In [None]:
ds = xr.open_mfdataset(col['an.vinteg'].df.path.values)

In [None]:
cat = col.search(variable='MN2T',frequency='hourly', year=1940)
cat

In [None]:
dsets = cat.to_dataset_dict(aggregate=False)

- Inspect keys

In [None]:
cat.df

In [None]:
%%time
test2 = xr.open_dataset(cat.df['path'][0],engine='netcdf4')
test2

In [None]:
print(cat.df['path'][0])

In [None]:
dsets.keys()

In [None]:
ds = dsets['fc.minmax']
ds

In [None]:
%%time
ds.MN2T.isel(forecast_initial_time=0,forecast_hour=0).plot()