In [26]:
import os
import numpy as np
import xarray as xr
import cftime
import pandas as pd
import glob
from datetime import date
import functools
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [27]:
# Setup PBSCluster
cluster = PBSCluster(
    cores=1,                                                   # The number of cores you want
    memory='25GB',                                             # Amount of memory
    processes=1,                                               # How many processes
    queue='casper',                                            # The type of queue to utilize
    local_directory='/glade/work/afoster',                     # Use your local directory
    resource_spec='select=1:ncpus=1:mem=25GB',                 # Specify resources
    log_directory='/glade/derecho/scratch/afoster/dask_logs',  # log directory
    account='P08010000',                                       # Input your project ID here
    walltime='02:00:00',                                       # Amount of wall time
    interface='ext')                                           # Interface to use

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43017 instead


In [28]:
cluster.scale(30)
dask.config.set({
    'distributed.dashboard.link': 'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'
})
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/afoster/proxy/43017/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/afoster/proxy/43017/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.91:44503,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/afoster/proxy/43017/status,Total threads: 0
Started: Just now,Total memory: 0 B


### Directories:

In [30]:
def post_process(dir, data_vars, key_df, default=False, fates=False, fates_params=False):
    
    files = sorted(glob.glob(os.path.join(dir, 'lnd', 'hist/') + "*clm2.h0*.nc"))
    flen = len(files)
    
    if flen < 16:
        return None

    ds = xr.open_mfdataset(files, combine='nested', concat_dim='time',
                       preprocess=functools.partial(preprocess,
                                                    varset=data_vars),
                       parallel=True, autoclose=True)
    ds['time'] = xr.cftime_range(str(2000), periods=len(ds.time), freq='MS')
    ds = ds.sel(time=slice("2060-01-01", "2074-12-31"))
    ds['time'] = xr.cftime_range(str(2000), periods=len(ds.time), freq='MS')

    if fates:
        ds['GPP'] = ds['FATES_GPP']*ds['FATES_FRACTION']  # kg m-2 s-1
        ds['GPP'].attrs['units'] = ds['FATES_GPP'].attrs['units']
        ds['GPP'].attrs['long_name'] = ds['FATES_GPP'].attrs['long_name']

        ds['LAI'] = ds['FATES_LAI']*ds['FATES_FRACTION']  # m m-2 
        ds['LAI'].attrs['units'] = ds['FATES_LAI'].attrs['units']
        ds['LAI'].attrs['long_name'] = ds['FATES_LAI'].attrs['long_name']

    else:
        ds['GPP'] = ds['FPSN']*1E-6*12.011/1000.0 # kg m-2 s-1
        ds['GPP'].attrs['units'] = 'kg m-2 s-1'
        ds['GPP'].attrs['long_name'] = ds['FPSN'].attrs['long_name']
    
        ds['LAI'] = ds['TLAI'] # m m-2 
        ds['LAI'].attrs['units'] = ds['TLAI'].attrs['units']
        ds['LAI'].attrs['long_name'] = ds['TLAI'].attrs['long_name']

    sh = ds.FSH
    le = ds.EFLX_LH_TOT
    energy_threshold = 20
    
    sh = sh.where((sh > 0) & (le > 0) & ((le + sh) > energy_threshold))
    le = le.where((sh > 0) & (le > 0) & ((le + sh) > energy_threshold))
    ds['EF'] = le/(le + sh)
    ds['EF'].attrs['units'] = 'unitless'
    ds['EF'].attrs['long_name'] = 'Evaporative fraction'

    ds['ASA'] = ds.FSR/ds.FSDS.where(ds.FSDS > 0)
    ds['ASA'].attrs['units'] = 'unitless'
    ds['ASA'].attrs['long_name'] = 'All sky albedo'

    ds['RLNS'] = ds.FLDS - ds.FIRE
    ds['RLNS'].attrs['units'] = 'W m-2'
    ds['RLNS'].attrs['long_name'] = 'surface net longwave radiation'

    ds['RN'] = ds.FLDS - ds.FIRE + ds.FSDS - ds.FSR
    ds['RN'].attrs['units'] = 'W m-2'
    ds['RN'].attrs['long_name'] = 'surface net radiation'

    ds['Temp'] = ds.TSA - 273.15
    ds['Temp'].attrs['units'] = 'degrees C'
    ds['Temp'].attrs['long_name'] = ds['TSA'].attrs['long_name']

    if not default:
        ensemble_key = os.path.basename(files[0]).split('_')[-1].split('.')[0]

        if fates_params:
            key_df_sub = key_df[key_df.key == int(int(ensemble_key))]
            ds['ensemble'] = ensemble_key
        else:
            ds['ensemble'] = ensemble_key.replace('CLM6SPoaat', '')
            key_df_sub = key_df[key_df.key == ensemble_key]
            
        if len(key_df_sub) == 0:
            return None
        
        ds['parameter'] = key_df_sub.param.values[0]
        ds['minmax'] = key_df_sub.minmax.values[0]
    else:
        ds['ensemble'] = '0'
        ds['parameter'] = 'default'
        ds['minmax'] = 'default'

    ds0 = xr.open_dataset(files[0])
    extras = ['grid1d_lat', 'grid1d_lon']
    for extra in extras:
        ds[extra] = ds0[extra]

    ds.attrs['Date'] = str(date.today())
    ds.attrs['Author'] = 'afoster@ucar.edu'
    ds.attrs['Original'] = files[0]
    
    return ds

In [31]:
def get_ensemble(files, whittaker_ds):

    # read in dataset and attach other info
    ds = xr.open_mfdataset(files, combine='nested',
                           concat_dim=['ens'],
                           parallel=True)
    
    ds['biome'] = whittaker_ds.biome
    ds['biome_name'] = whittaker_ds.biome_name
    
    return ds

In [32]:
def annual_mean(da, cf):

    days_per_month = da['time.daysinmonth']
    ann_mean = cf*(days_per_month*da).groupby('time.year').sum()
    ann_mean.name = da.name

    return ann_mean

In [34]:
def calculate_vars(da, biome, domain, land_area, cf_area, cf_time, units):
    
    mean_val = annual_mean(area_mean(da, biome, domain, cf_area, land_area), cf_time)

    average_vals = mean_val.mean(dim='year') 
    interannual_mean = mean_val.std(dim='year')

    data_var = da.name

    # save the reduced data
    out = xr.Dataset()
    out[f'{data_var}_mean'] = average_vals
    out[f'{data_var}_mean'].attrs= {'units': units,
                                 'long_name': da.attrs['long_name']}
    out[f'{data_var}_iav']  = interannual_mean
    out[f'{data_var}_iav'].attrs= {'units': units,
                                'long_name': da.attrs['long_name']}


    return out

In [35]:
def write_history_files(top_dir, data_vars, param_key, postp_dir, fates=False, fates_params=False):
    
    dirs = sorted(os.listdir(top_dir))
    keys_finished = []

    for dir in dirs:
        ensemble = dir.split('_')[-1]
        out_file = os.path.join(postp_dir, f"{dir}.nc")
        
        if not os.path.isfile(out_file):
            
            ds = post_process(os.path.join(top_dir, dir), data_vars, param_key, fates=fates, fates_params=fates_params)
            if ds is not None:
                ds.to_netcdf(out_file)
                keys_finished.append(ensemble)
        else:
            keys_finished.append(ensemble)
    
    all_keys = param_key.key.values
    not_finished = []
    for key in all_keys:
        if key not in keys_finished:
            not_finished.append(key)
    return not_finished

In [36]:
def combine_history_vars(postp_dir, whit, vars):
    
    files = sorted([os.path.join(postp_dir, f) for f in os.listdir(postp_dir)])
    ds = get_ensemble(files, whit)
    ds = ds.chunk({'gridcell': 20, 'ens': 20, 'time': 20})
    
    data_vars = []
    for var in vars:
        var_ds = calculate_vars(ds[var], ds.biome, 'global', land_area,
                                cfs_area[var], cfs_time[var], var_units[var])
        var_ds['parameter'] = ds.parameter
        var_ds['minmax'] = ds.minmax
        data_vars.append(var_ds)
    ensemble_ds = xr.merge(data_vars)

    return ensemble_ds

In [37]:
def get_default(archive_dir, case_dir, data_vars, param_key, whit, out_vars, fates=False):

    default_dir = os.path.join(archive_dir, case_dir)
    default = post_process(default_dir, data_vars, param_key, default=True, fates=fates)
    default['biome'] = whit.biome
    default['biome_name'] = whit.biome_name
    data_vars = []
    for var in out_vars:
        var_ds = calculate_vars(default[var], default.biome, 'global', land_area,
                                cfs_area[var], cfs_time[var], var_units[var])
        var_ds['parameter'] = default.parameter
        var_ds['minmax'] = default.minmax
        data_vars.append(var_ds)
    default_ds = xr.merge(data_vars)

    return default_ds

In [38]:
cfs_area = {'GPP': 1e-6,
          'LAI': 'intrinsic',
          'BTRANMN': 'intrinsic',
          'TV': 'intrinsic',
          'EFLX_LH_TOT': 'intrinsic',
          'FSH': 'intrinsic',
          'EF': 'intrinsic',
          'SOILWATER_10CM': 1e-9,
          'ASA': 'intrinsic',
          'FSR': 'intrinsic',
          'FSA': 'intrinsic',
          'FIRE': 'intrinsic',
          'RLNS': 'intrinsic',
          'RN': 'intrinsic'}
cfs_time = {'GPP': 24*60*60,
          'BTRANMN': 1/365,
          'TV': 1/365,
          'LAI': 1/365,
          'EFLX_LH_TOT': 1/365,
          'FSH': 1/365,
          'EF': 1/365,
          'SOILWATER_10CM': 1/365,
          'ASA': 1/365,
          'FSR': 1/365,
          'FSA': 1/365,
          'FIRE': 1/365,
          'RLNS': 1/365,
          'RN': 1/365}
var_units = {'GPP': 'PgC yr$^{-1}$',
          'BTRANMN': '',
          'TV': 'K',
          'LAI': 'm$^2$ m$^{-2}$',
          'EFLX_LH_TOT': 'W m$^{-2}$',
          'FSH': 'W m$^{-2}$',
          'EF': '',
          'SOILWATER_10CM': 'TtH$_2$O yr$^{-1}$',
          'ASA': '',
          'FSR': 'W m$^{-2}$',
          'FSA': 'W m$^{-2}$',
          'FIRE': 'W m$^{-2}$',
          'RLNS': 'W m$^{-2}$',
          'RN': 'W m$^{-2}$'}

In [39]:
clm_data_vars = ['FPSN', 'TLAI', 'QVEGE', 'QVEGT', 'EFLX_LH_TOT',
                 'FSH', 'FSR', 'FSDS', 'FSA', 'FIRE', 'FLDS', 'FCTR',
                 'FCEV', 'FGEV', 'BTRANMN', 'FGR', 'SOILWATER_10CM', 'TWS',
                 'QRUNOFF', 'SNOWDP', 'TG', 'TSA', 'TV', 'landfrac', 'area']

fates_data_vars = ['FATES_FRACTION', 'FATES_GPP', 'FATES_LAI', 'QVEGE',
              'QVEGT', 'EFLX_LH_TOT', 'FSH', 'FSR', 'FSDS', 'FSA',
              'FIRE', 'FLDS', 'FCTR', 'FCEV', 'FGEV', 'BTRANMN', 
              'FGR', 'SOILWATER_10CM', 'TWS', 'QRUNOFF', 'SNOWDP',
              'TV', 'TG', 'TSA', 'landfrac', 'area']

In [40]:
out_vars = ['GPP', 'LAI', 'EFLX_LH_TOT', 'FSH', 'EF', 'SOILWATER_10CM', 'ASA',
        'FSR', 'FSA', 'FIRE', 'RLNS', 'RN', 'BTRANMN', 'TV']

In [41]:
# whittaker biomes
whit = xr.open_dataset('/glade/work/afoster/FATES_calibration/CLM5PPE/pyth/whit/whitkey.nc')

# fetch the sparsegrid landarea - needed for unit conversion
land_area_file = '/glade/work/afoster/FATES_calibration/CLM5PPE/postp/sparsegrid_landarea.nc'
land_area = xr.open_dataset(land_area_file).landarea  #km2

In [42]:
clm_param_key_file = '/glade/work/afoster/FATES_calibration/parameter_files/clm6sp_oaat.csv' 
clm_param_key = pd.read_csv(clm_param_key_file, header=None)
clm_param_key.columns = ['key', 'param', 'minmax']

fates_param_key_file = '/glade/work/afoster/FATES_calibration/parameter_files/fates_param_oaat/fates_oaat_key.csv'
fates_param_key = pd.read_csv(fates_param_key_file, index_col=0)
fates_param_key.columns = ['key', 'minmax', 'param']

In [43]:
file_dict = {'clm_hydro': 
                {'top_dir': '/glade/derecho/scratch/afoster/FATES_calibration/ctsm_sp_oaat/archive',
                'postp_dir': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_oaat',
                'out_file': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_oaat.nc',
                'default_case': 'ctsm60SP_bigleaf_sparsegrid',
                'default_file': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_default.nc',
                'fates_params': False,
                'fates': False},
            'clm_btran':
                {'top_dir': '/glade/derecho/scratch/afoster/FATES_calibration/ctsm_sp_oaat_btran/archive',
                'postp_dir': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_oaat_btran',
                'out_file': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_oaat_btran.nc',
                'default_case': 'ctsm60SP_bigleaf_sparsegrid_btran',
                'default_file': '/glade/work/afoster/FATES_calibration/history_files/ctsm_sp_default_btran.nc',
                'fates_params': False,
                'fates': False},
            'fates_oaat':
                {'top_dir': '/glade/derecho/scratch/afoster/FATES_calibration/fates_sp_oaat/archive',
                'postp_dir': '/glade/work/afoster/FATES_calibration/history_files/fates_sp_oaat',
                'out_file': '/glade/work/afoster/FATES_calibration/history_files/fates_sp_oaat.nc',
                'default_case': 'ctsm60SP_fates_sparsegrid',
                'default_file': '/glade/work/afoster/FATES_calibration/history_files/fates_sp_oaat_default.nc',
                'fates_params': True,
                'fates': True},
            'fates_clmpars_oaat':
                {'top_dir': '/glade/derecho/scratch/afoster/FATES_calibration/fates_clmpars_sp_oaat/archive',
                'postp_dir': '/glade/work/afoster/FATES_calibration/history_files/fates_clmpars_sp_oaat',
                'out_file': '/glade/work/afoster/FATES_calibration/history_files/fates_clmpars_sp_oaat.nc',
                'default_case': None,
                'default_file': None,
                'fates_params': False,
                'fates': True},
            }

In [49]:
simulation = 'fates_clmpars_oaat'

In [53]:
if file_dict[simulation]['fates']:
    data_vars = fates_data_vars
else:
    data_vars = clm_data_vars
if file_dict[simulation]['fates_params']:
    key = fates_param_key
else:
    key = clm_param_key

In [51]:
if file_dict[simulation]['default_case'] is not None:
    default_ds = get_default('/glade/derecho/scratch/afoster/archive',
                             file_dict[simulation]['default_case'],
                             data_vars, key, whit, out_vars,
                             fates=file_dict[simulation]['fates'])
    default_ds.to_netcdf(file_dict[simulation]['default_file'])

In [None]:
not_finished = write_history_files(file_dict[simulation]['top_dir'], data_vars,
                                   key, file_dict[simulation]['postp_dir'],
                                   fates=file_dict[simulation]['fates'],
                                   fates_params=file_dict[simulation]['fates_params'])

In [47]:
ensemble_ds = combine_history_vars(file_dict[simulation]['postp_dir'], whit, out_vars)

In [48]:
ensemble_ds.to_netcdf(file_dict[simulation]['out_file'])

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [None]:
client.shutdown()