# Access ERA5 data from glade and plot temp anomaly for Colorado

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
import intake_esm
import intake
import pandas as pd
import cartopy.crs as ccrs  # Correct import for coordinate reference systems
import cartopy.feature as cfeature
from holoviews import opts
import geoviews as gv
import holoviews as hv
import aiohttp

### Specify global variables

In [2]:
lat = 40.014
lon =  -105.270 + 360
# Bounding box for Colorado
colorado_right  =  (-109.060253 + 360)%360
colorado_left   = (-102.041524 + 360)%360 
colorado_bottom = 37
colorado_top    = 41
###
baseline_year = 1940
current_year  = 2023
day           = 11
month         = 6

In [13]:
rda_data          = '/gpfs/csfs1/collections/rda/data/'
rda_url           = 'https://data.rda.ucar.edu/'
era5_catalog      = rda_data + 'pythia_intake_catalogs/'
complete_url      = rda_url + 'pythia_intake_catalogs/'  + 'era5_catalog.json'
alternate_catalog = rda_data + 'pythia_intake_catalogs/era5_catalog.json'
print(complete_url)

https://data.rda.ucar.edu/pythia_intake_catalogs/era5_catalog.json


### Create a Dask cluster

#### Dask Introduction

[Dask](https://www.dask.org/) is a solution that enables the scaling of Python libraries. It mimics popular scientific libraries such as numpy, pandas, and xarray that enables an easier path to parallel processing without having to refactor code. 

There are 3 components to parallel processing with Dask: the client, the scheduler, and the workers. 

The Client is best envisioned as the application that sends information to the Dask cluster. In Python applications this is handled when the client is defined with `client = Client(CLUSTER_TYPE)`. A Dask cluster comprises of a single scheduler that manages the execution of tasks on workers. The `CLUSTER_TYPE` can be defined in a number of different ways.

- There is LocalCluster, a cluster running on the same hardware as the application and sharing the available resources, directly in Python with `dask.distributed`. 

- In certain JupyterHubs Dask Gateway may be available and a dedicated dask cluster with its own resources can be created dynamically with `dask.gateway`. 

- On HPC systems `dask_jobqueue` is used to connect to the HPC Slurm and PBS job schedulers to provision resources.

The `dask.distributed` client python module can also be used to connect to existing clusters. A Dask Scheduler and Workers can be deployed in containers, or on Kubernetes, without using a Python function to create a dask cluster. The `dask.distributed` Client is configured to connect to the scheduler either by container name, or by the Kubernetes service name.     

#### Select the Dask cluster type

The default will be LocalCluster as that can run on any system.

If running on a HPC computer with a PBS Scheduler, set to True. Otherwise, set to False.

In [4]:
USE_PBS_SCHEDULER = False

If running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.

In [5]:
USE_DASK_GATEWAY = False

**Python function for a PBS Cluster**

In [6]:
# Create a PBS cluster object
def get_pbs_cluster():
    """ Create cluster through dask_jobqueue.   
    """
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        job_name = 'dask-pythia-24',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = rda_scratch + '/dask/spill',
        resource_spec = 'select=1:ncpus=1:mem=4GB',
        queue = 'casper',
        walltime = '1:00:00',
        #interface = 'ib0'
        interface = 'ext'
    )
    return cluster

**Python function for a Gateway Cluster**

In [7]:
def get_gateway_cluster():
    """ Create cluster through dask_gateway
    """
    from dask_gateway import Gateway

    gateway = Gateway()
    cluster = gateway.new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster

**Python function for a Local Cluster**

In [8]:
def get_local_cluster():
    """ Create cluster using the Jupyter server's resources
    """
    from distributed import LocalCluster, performance_report
    cluster = LocalCluster()    

    cluster.scale(4)
    return cluster

**Python logic to select the Dask Cluster type**

This uses True/False boolean logic based on the variables set in the previous cells

In [9]:
# Obtain dask cluster in one of three ways

if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to cluster
from distributed import Client
client = Client(cluster)

# Display cluster dashboard URL
cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45051 instead


0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/45051/status,Workers: 4
Total threads: 4,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46249,Workers: 4
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/45051/status,Total threads: 4
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:40531,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/41053/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:38123,
Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-k_h0j6pd,Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-k_h0j6pd

0,1
Comm: tcp://127.0.0.1:37095,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/41263/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:41833,
Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-61nlhfti,Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-61nlhfti

0,1
Comm: tcp://127.0.0.1:35097,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/46771/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:40093,
Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-l3itqghb,Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-l3itqghb

0,1
Comm: tcp://127.0.0.1:46151,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/harshah/proxy/44049/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:45057,
Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-nea9q2f6,Local directory: /glade/derecho/scratch/harshah/tmp/dask-scratch-space/worker-nea9q2f6


### Open ERA5 catalog 

In [14]:
era5_cat = intake.open_esm_datastore(complete_url)
#era5_cat = intake.open_esm_datastore(alternate_catalog)
era5_cat

  df = pd.read_csv(


Unnamed: 0,unique
Unnamed: 0,785068
era_id,1
datatype,2
level_type,1
step_type,7
table_code,4
param_code,164
variable,212
long_name,212
units,33


In [16]:
era5_df = era5_cat.df
era5_df.head()

Unnamed: 0.1,Unnamed: 0,era_id,datatype,level_type,step_type,table_code,param_code,variable,long_name,units,year,month,format,frequency,path
0,0,e5,an,,pl,128,60,PV,Potential vorticity,K m**2 kg**-1 s**-1,1940,1,nc,hourly,https://data.rda.ucar.edu/ds633.0/e5.oper.an.p...
1,1,e5,an,,pl,128,60,PV,Potential vorticity,K m**2 kg**-1 s**-1,1940,1,nc,hourly,https://data.rda.ucar.edu/ds633.0/e5.oper.an.p...
2,2,e5,an,,pl,128,60,PV,Potential vorticity,K m**2 kg**-1 s**-1,1940,1,nc,hourly,https://data.rda.ucar.edu/ds633.0/e5.oper.an.p...
3,3,e5,an,,pl,128,60,PV,Potential vorticity,K m**2 kg**-1 s**-1,1940,1,nc,hourly,https://data.rda.ucar.edu/ds633.0/e5.oper.an.p...
4,4,e5,an,,pl,128,60,PV,Potential vorticity,K m**2 kg**-1 s**-1,1940,1,nc,hourly,https://data.rda.ucar.edu/ds633.0/e5.oper.an.p...


- Explore the list of variables to find the name of the 2m air temperature

In [17]:
era5_df['variable'].unique()

array(['PV', 'CRWC', 'CSWC', 'Z', 'T', 'U', 'V', 'Q', 'W', 'VO', 'D', 'R',
       'O3', 'CLWC', 'CIWC', 'CC', 'ALUVP', 'ALUVD', 'ALNIP', 'ALNID',
       'CI', 'ASN', 'RSN', 'SSTK', 'ISTL1', 'ISTL2', 'ISTL3', 'ISTL4',
       'SWVL1', 'SWVL2', 'SWVL3', 'SWVL4', 'CAPE', 'LAILV', 'LAIHV',
       'TCLW', 'TCIW', 'SP', 'TCW', 'TCWV', 'STL1', 'SD', 'CHNK', 'MSL',
       'BLH', 'TCC', 'VAR_10U', 'VAR_10V', 'VAR_2T', 'VAR_2D', 'STL2',
       'STL3', 'LCC', 'MCC', 'HCC', 'SRC', 'TCO3', 'IEWS', 'INSS', 'ISHF',
       'IE', 'SKT', 'STL4', 'TSN', 'FAL', 'FSR', 'FLSR', 'LBLT', 'LTLT',
       'LSHF', 'LICT', 'LICD', 'TCRW', 'TCSW', 'U10N', 'V10N', 'VAR_100U',
       'VAR_100V', 'LMLT', 'LMLD', 'VIMA', 'VIT', 'VIKE', 'VITHE',
       'VIPIE', 'VIPILE', 'VITOE', 'VIEC', 'VIMAE', 'VIMAN', 'VIKEE',
       'VIKEN', 'VITHEE', 'VITHEN', 'VIWVE', 'VIWVN', 'VIGE', 'VIGN',
       'VITOEE', 'VITOEN', 'VIOZE', 'VIOZN', 'VILWD', 'VIIWD', 'VIMAD',
       'VIKED', 'VITHED', 'VIWVD', 'VIGD', 'VITOED', 'VIOZD', 'VILWE

- We find that the variable of interest is called 'VAR_2T'

In [18]:
temp_cat = era5_cat.search(variable='VAR_2T',frequency = 'hourly')
temp_cat

Unnamed: 0,unique
Unnamed: 0,1010
era_id,1
datatype,1
level_type,0
step_type,1
table_code,1
param_code,1
variable,1
long_name,1
units,1


- Convert catalog to dictonary
- Inspect keys

In [19]:
# Define the xarray_open_kwargs with a compatible engine, for example, 'scipy'
xarray_open_kwargs = {
    'engine': 'netcdf4',
    'chunks': {},  # Specify any chunking if needed
    'backend_kwargs': {}  # Any additional backend arguments if required
}

In [20]:
xr.open_dataset ('https://data.rda.ucar.edu/ds633.0/e5.oper.an.sfc/195206/e5.oper.an.sfc.128_167_2t.ll025sc.1952060100_1952063023.nc#mode=bytes',engine='netcdf4')

In [None]:
# # Ensure that the catalog entries are correctly handled
# for key in temp_cat.df['path']:
#     print(f"Processing {key}")

In [21]:
dsets = temp_cat.to_dataset_dict(xarray_open_kwargs=xarray_open_kwargs)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'datatype.step_type'


Key:       _open_dataset-ff49e362-798e-423b-9490-b50afa9c107e
Function:  execute_task
args:      ((<function apply at 0x14ed279299e0>, <function _open_dataset at 0x14ed101a8e00>, ['https://data.rda.ucar.edu/ds633.0/e5.oper.an.sfc/199712/e5.oper.an.sfc.128_167_2t.ll025sc.1997120100_1997123123.nc#mode=bytes', 'VAR_2T'], (<class 'dict'>, [['xarray_open_kwargs', (<class 'dict'>, [['engine', 'netcdf4'], ['chunks', (<class 'dict'>, [])], ['backend_kwargs', (<class 'dict'>, [])]])], ['preprocess', None], ['expand_dims', (<class 'dict'>, [])], ['requested_variables', []], ['data_format', 'netcdf'], ['additional_attrs', (<class 'dict'>, [['Unnamed: 0', 535432], ['era_id', 'e5'], ['datatype', 'an'], ['step_type', 'sfc'], ['table_code', 128], ['param_code', 167], ['variable', 'VAR_2T'], ['long_name', '2 metre temperature'], ['units', 'K'], ['year', 1997], ['month', 12], ['format', 'nc'], ['frequency', 'hourly'], ['path', 'https://data.rda.ucar.edu/ds633.0/e5.oper.an.sfc/199712/e5.oper.an.sfc.128_

ESMDataSourceError: Failed to load dataset with key='an.sfc'
                 You can use `cat['an.sfc'].df` to inspect the assets/files for this key.
                 

In [None]:
temp_cat['an.sfc'].df

In [None]:
dsets.keys()

- Extract dataset using dictionary

In [None]:
temp_2m = dsets['an.sfc'].VAR_2T
temp_2m

- Select data corresponding to the dayofyear and colorado for the years baseline_year and current_year

In [None]:
# Create a pandas Timestamp
current_date  = pd.Timestamp(year=current_year, month=month, day=day)
baseline_date =  pd.Timestamp(year=baseline_year, month=month, day=day)
current_date

In [None]:
t2m_colorado_current = temp_2m.sel(latitude=slice(colorado_top,colorado_bottom),\
                                         longitude=slice(colorado_right,colorado_left)).sel(time=slice(current_date,current_date+  pd.Timedelta(days=1)))
t2m_colorado_current

In [None]:
t2m_colorado_baseline = temp_2m.sel(latitude=slice(colorado_top,colorado_bottom),\
                                         longitude=slice(colorado_right,colorado_left)).sel(time=slice(current_date,current_date+  pd.Timedelta(days=1)))
t2m_colorado_baseline

In [None]:
max_t2m_colorado_anomaly = t2m_colorado_current.max('time') - t2m_colorado_baseline.max('time')
max_t2m_colorado_anomaly

## Let us now plot this data with a map of Colorado in the background

In [None]:
# Extract the latitude, longitude, and data
latitude = max_t2m_colorado_anomaly['latitude'].values
longitude = max_t2m_colorado_anomaly['longitude'].values
max_t2m_colorado_anomaly = max_t2m_colorado_anomaly.data

In [None]:
# Create the plot
proj = ccrs.PlateCarree(central_longitude=180)
tiles = gv.tile_sources.ESRI()

# Define the state borders feature
states = cfeature.NaturalEarthFeature(
    category='cultural',
    name='admin_1_states_provinces_lines',
    scale='110m',
    facecolor='none'
)

# Create a GeoViews plot
plot = gv.Image((longitude, latitude, max_t2m_colorado_anomaly), crs=proj, vdims='value').opts(
    cmap='viridis',
    colorbar=True,
    width=800,
    height=600
)

# Add state borders and other features
state_borders = gv.feature.Feature(states, group='State Borders').opts(line_color='black')
# borders = gv.feature.borders.opts(line_dash='dotted', zorder=2)
# state_lines = gv.feature.states.opts(line_color='black', zorder=3)
# borders = gv.feature.borders.opts(line_dash='dotted')
# state_lines = gv.feature.states.opts(line_color='black')

# Combine everything into a single plot
final_plot = tiles * plot * state_borders * borders * state_lines

# Display the plot
hv.extension('bokeh')
final_plot.opts(
    opts.Overlay(projection=ccrs.PlateCarree(central_longitude=180), global_extent=False),
    opts.Image(tools=['hover']),
    opts.Feature(line_width=1.5)
).opts(
    opts.GridSpace(show_title=True, shared_xaxis=True, shared_yaxis=True, height=400, width=600)
)

# To add grid lines with labels, use the options on the plot directly
final_plot.opts(
    opts.Image(xaxis='bottom', yaxis='left', show_grid=True)
)

## Close the Dask Cluster

It's best practice to close the Dask cluster when it's no longer needed to free up the compute resources used.

In [None]:
cluster.close()