# Calculate vertical ensemble mean on regular pressure levels 
CESM2-LE ONLY

* **Description**: Reads in and creates seasonal and ensemble means and vertically interpolates
* **Input data**: CESM2-LE output in timeseries format from intake-esm
* **Output data**: Netcdf file with output
* **Creator**: Alice DuVivier
* **Date**: March 2022

In [1]:
import xarray as xr
import numpy as np
from datetime import timedelta
import glob

import pop_tools

import matplotlib.pyplot as plt
import matplotlib.path as mpath
from matplotlib.gridspec import GridSpec

import geocat.datafiles as gdf
import geocat.viz.util as gvutil
from geocat.viz import cmaps as gvcmaps
import geocat.comp as gcomp

import cartopy.crs as ccrs
import cartopy.feature as cfeature
from scipy.stats import linregress,pearsonr, t

import dask
import intake
from distributed import Client
from ncar_jobqueue import NCARCluster

  from distributed.utils import tmpfile


In [2]:
# spin up dask cluster

import dask

# Use dask jobqueue
from dask_jobqueue import PBSCluster

# Import a client
from dask.distributed import Client

# Setup your PBSCluster
cluster = PBSCluster(
    cores=36, # The number of cores you want
    memory='300 GB', # Amount of memory
    processes=9, # How many processes
    queue='casper', # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory='$TMPDIR', # Use your local directory
    resource_spec='select=1:ncpus=36:mem=300GB', # Specify resources
    project='P93300665', # Input your project ID here
    walltime='06:00:00', # Amount of wall time
    interface='ib0', # Interface to use
)
# Scale up
cluster.scale(jobs=8)

# Change your url to the dask dashboard so you can see it
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Setup your client
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44867 instead
  f"Port {expected} is already in use.\n"


In [3]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/duvivier/proxy/44867/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/duvivier/proxy/44867/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.58:40326,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/duvivier/proxy/44867/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Manually set variables

In [4]:
# list the variables to load
var_in_1 = 'Z3'

## Load the CESM-LE data 

We will use [`intake-esm`](https://intake-esm.readthedocs.io/en/latest/), which is a data catalog tool.
It enables querying a database for the files we want, then loading those directly as an `xarray.Dataset`.

First step is to set the "collection" for the CESM-LE, which depends on a json file conforming to the [ESM Catalog Specification](https://github.com/NCAR/esm-collection-spec).

In [5]:
catalog_file = '/glade/collections/cmip/catalog/intake-esm-datastore/catalogs/glade-cesm2-le.json'

cat = intake.open_esm_datastore(catalog_file)

  self._df, self.catalog_file = _fetch_catalog(self.esmcol_data, esmcol_obj, csv_kwargs)


In [6]:
forcing = 'cmip6'  # do not want smbb data
expt = 'ssp370'
comp = 'atm'
freq = 'month_1'

subset_1 = cat.search(variable=var_in_1, forcing_variant=forcing, experiment=expt, component=comp, frequency=freq )

In [7]:
subset_1.df.head()

Unnamed: 0,component,stream,case,member_id,variable,start_time,end_time,time_range,long_name,units,vertical_levels,frequency,path,experiment,forcing_variant,cesm_member_id,control_branch_year,cmip_experiment_id
0,atm,cam.h0,b.e21.BSSP370cmip6.f09_g17.LE2-1001.001,r1i1001p1f1,Z3,2015-01,2024-12,201501-202412,Geopotential Height (above sea level),m,1.0,month_1,/glade/campaign/cgd/cesm/CESM2-LE/timeseries/a...,ssp370,cmip6,1001.001,1001,CESM2_ssp370_r1i1001p1f1
1,atm,cam.h0,b.e21.BSSP370cmip6.f09_g17.LE2-1001.001,r1i1001p1f1,Z3,2025-01,2034-12,202501-203412,Geopotential Height (above sea level),m,1.0,month_1,/glade/campaign/cgd/cesm/CESM2-LE/timeseries/a...,ssp370,cmip6,1001.001,1001,CESM2_ssp370_r1i1001p1f1
2,atm,cam.h0,b.e21.BSSP370cmip6.f09_g17.LE2-1001.001,r1i1001p1f1,Z3,2035-01,2044-12,203501-204412,Geopotential Height (above sea level),m,1.0,month_1,/glade/campaign/cgd/cesm/CESM2-LE/timeseries/a...,ssp370,cmip6,1001.001,1001,CESM2_ssp370_r1i1001p1f1
3,atm,cam.h0,b.e21.BSSP370cmip6.f09_g17.LE2-1001.001,r1i1001p1f1,Z3,2045-01,2054-12,204501-205412,Geopotential Height (above sea level),m,1.0,month_1,/glade/campaign/cgd/cesm/CESM2-LE/timeseries/a...,ssp370,cmip6,1001.001,1001,CESM2_ssp370_r1i1001p1f1
4,atm,cam.h0,b.e21.BSSP370cmip6.f09_g17.LE2-1001.001,r1i1001p1f1,Z3,2055-01,2064-12,205501-206412,Geopotential Height (above sea level),m,1.0,month_1,/glade/campaign/cgd/cesm/CESM2-LE/timeseries/a...,ssp370,cmip6,1001.001,1001,CESM2_ssp370_r1i1001p1f1


In [8]:
# check that we only have cmip6, not smbb, data
member_id = list(subset_1.df.experiment.unique())
print(member_id)

['ssp370']


In [None]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dsets_1 = subset_1.to_dataset_dict(cdf_kwargs={'chunks': {'time':50}, 'decode_times': True})
#    dsets_1 = subset_1.to_dataset_dict(cdf_kwargs={'chunks': {'time':240}, 'decode_times': True})


--> The keys in the returned dictionary of datasets are constructed as follows:
	'component.experiment.stream.forcing_variant.variable'


In [None]:
# load in the future datasets
futures_1 = []
for key in sorted(dsets_1.keys()):
    futures_1.append(dsets_1[key])
    print(key)

In [None]:
future_ds_1 = xr.concat(futures_1, dim='member_id')

In [None]:
future_ds_1.time

In [None]:
# Shift months by one to be center of time period.
# Take average of the time bounds to get middle of month
# will lose some attributes with time, so may need to put this back in later...
future_ds_1['time'] = future_ds_1.time_bnds.load().mean(dim='nbnd').sel(member_id='r1i1281p1f1')

In [None]:
# get just NH slice
future_ds_1_masked = future_ds_1.isel(lat=slice(164,192))

In [None]:
# grab variables of interest
Z_le = future_ds_1_masked[var_in_1]

In [None]:
Z_le.persist()

## Calculate geopotential height on regular pressure levels

### Get surface pressure

In [None]:
# We will need surface pressure and reference pressure for interpolation

# set a surface reference pressure [Pa]
p0 = 100000 

# set variable name
var_in_3 = 'PS'

In [None]:
# use same input parameters as above
#forcing = 'cmip6'  # do not want smbb data
#expt = 'ssp370'
#comp = 'atm'
#freq = 'month_1'

subset_3 = cat.search(variable=var_in_3, forcing_variant=forcing, experiment=expt, component=comp, frequency=freq )

In [None]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dsets_3 = subset_3.to_dataset_dict(cdf_kwargs={'chunks': {'time':240}, 'decode_times': True})

In [None]:
# load in the future datasets
futures_3 = []
for key in sorted(dsets_3.keys()):
    futures_3.append(dsets_3[key])
    print(key)

In [None]:
future_ds_3 = xr.concat(futures_3, dim='member_id')

In [None]:
# Shift months by one to be center of time period.
# Take average of the time bounds to get middle of month
# will lose some attributes with time, so may need to put this back in later...
future_ds_3['time'] = future_ds_3.time_bnds.load().mean(dim='nbnd').sel(member_id='r1i1281p1f1')

In [None]:
# get just NH slice
future_ds_3_masked = future_ds_3.isel(lat=slice(164,192))

In [None]:
# grab variables of interest
PS_le = future_ds_3_masked[var_in_3]

### Load the vertical parameters

In [None]:
# grab parameters for interpolation
hyam_le = future_ds_1_masked["hyam"]
hybm_le = future_ds_1_masked["hybm"]

### Interpolate to new pressure levels from hybrid levels

In [None]:
# Specify output pressure levels
new_levels = np.array([925, 850, 700, 500])  # in millibars
new_levels = new_levels * 100  # convert to Pascals

In [None]:
%%time
Z_le_interp = gcomp.interp_hybrid_to_pressure(Z_le,
                                               PS_le,
                                               hyam_le, hybm_le, p0 = p0,
                                               new_levels=new_levels,
                                               method='linear')

In [None]:
Z_le_interp.persist()

## Calculate Seasonal Means

In [None]:
season_names = ['OND','JFM', 'AMJ', 'JAS']

In [None]:
# find total years
xarr_le = Z_le_interp.coords['time.year'][(Z_le_interp.coords['time.month']==1)]

In [None]:
# Loop through seasons - le

# make numpy array to fill and specify dimensions we want
seas_array_le_1 = np.zeros([len(season_names),len(xarr_le),len(Z_le_interp.member_id),len(Z_le_interp.plev),len(Z_le_interp.lat),len(Z_le_interp.lon)])

for s_count, ss in enumerate(season_names):
#for s_count, ss in enumerate(season_names[0:1]):
    print(ss)
    ### Z PLEV
    # get temporary array of just these month by season
    if ss == 'JFM':
        temp1 = Z_le_interp.isel(time=Z_le_interp.time.dt.month.isin([1,2,3]))
    if ss == 'AMJ':
        temp1 = Z_le_interp.isel(time=Z_le_interp.time.dt.month.isin([4,5,6]))
    if ss == 'JAS':
        temp1 = Z_le_interp.isel(time=Z_le_interp.time.dt.month.isin([7,8,9]))
    if ss == 'OND':
        temp1 = Z_le_interp.isel(time=Z_le_interp.time.dt.month.isin([10,11,12]))
    # now loop through years to get the seasonal average by year for each ensemble member
    for y_count, yy in enumerate(xarr_le):
    #for y_count, yy in enumerate(xarr_le[0:1]):
        # select only the indexes for this year
        temp1a = temp1.isel(time=temp1.time.dt.year.isin([yy])).mean(dim='time')
        seas_array_le_1[s_count,y_count,:,:,:,:] = temp1a   


In [None]:
print(seas_array_le_1.shape)

In [None]:
# convert the numpy array to a xarray for easier plotting
Z_seas_le = xr.DataArray(seas_array_le_1,dims=('season','time','member_id','plev','lat','lon'))

In [None]:
# set coordinate arrays
Z_seas_le['season'] = season_names
Z_seas_le['time'] = xarr_le
Z_seas_le['member_id'] = Z_le_interp['member_id']
Z_seas_le['plev'] = Z_le_interp['plev'].values
Z_seas_le['lat'] = Z_le_interp['lat'].values
Z_seas_le['lon'] = Z_le_interp['lon'].values


## Write out files

In [None]:
# quick and dirty way to save a file!

# save rufmod expt, rename the variable so it makes sense
#fout = 'rufmod_vertical_seas_ens_mean_WS'
#
#WS_seas_ens_mean_rufmod.to_dataset(name='vert_ws').to_netcdf(fout+'.nc')

### Z @ PLEVS

In [None]:
#set info to write out
out_tag = 'Z'
units = 'm'
longname = 'geopotential height at given plevels'

fout = 'CESM2-LE_seas_'+out_tag

In [None]:
ds_to_save = Z_seas_le

In [None]:
# check how big this will be to write out in GB
ds_to_save.nbytes/(1024**3)

In [None]:
# assign some attributes
refdata = {'Author': 'Alice DuVivier', 'units':units, 'longname':longname}

ds_to_save.attrs = refdata

In [None]:
# check data
ds_to_save


In [None]:
ds_to_save.to_netcdf(fout+'.nc')  # how to save file