In [76]:
import os
import numpy as np
import xarray as xr
import cftime
import pandas as pd
import glob
from datetime import date
import functools
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [77]:
# Create the PBS-backed Dask cluster. Nothing is submitted here; jobs are
# requested later when the cluster is scaled.
cluster = PBSCluster(
    # --- where the jobs run ---
    queue='casper',                               # PBS queue to submit to (/glade/u/apps/dav/opt/usr/bin/execcasper)
    project='P93300041',                          # Project ID the jobs are charged to
    walltime='02:00:00',                          # Wall-time limit of each job
    # --- what each job provides ---
    cores=1,                                      # Cores per job
    processes=1,                                  # Worker processes per job
    memory='10GB',                                # Memory per job
    resource_spec='select=1:ncpus=1:mem=10GB',    # Matching PBS resource request
    # --- node-local settings ---
    local_directory='/glade/work/afoster',        # Directory used by the workers for local files
    interface='ext',                              # Network interface used for communication
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42087 instead


In [78]:
# Ask the cluster for two workers (one PBS job each with processes=1).
n_workers = 2
cluster.scale(n_workers)

In [79]:
# Connect this notebook session to the cluster's scheduler.
client = Client(address=cluster)

In [80]:
def preprocess(ds, varset):
    """Reduce a freshly opened dataset to the requested variables.

    Intended as the ``preprocess`` callback of ``xr.open_mfdataset`` (bound
    with ``functools.partial``), so it is applied to each file before the
    files are concatenated.

    Parameters
    ----------
    ds : indexable dataset
        Object supporting ``ds[varset]`` (an ``xarray.Dataset`` in this
        notebook).
    varset : list of str
        Names of the variables to keep.

    Returns
    -------
    The result of ``ds[varset]``.
    """
    # Select with the argument rather than the notebook-level ``data_vars``
    # global, so the function honours the list it is given and does not
    # depend on which cells have already been executed.
    return ds[varset]

In [81]:
def postprocess(top_dir, histdir, data_vars, postp_dir):
    """Post-process the land history files of one ensemble member.

    Every ``*clm2.h0*.nc`` file under ``<top_dir>/<histdir>/lnd/hist/`` is
    opened, reduced to ``data_vars`` and concatenated along ``time``. The
    time axis is rewritten, a ten-year window is kept, derived variables
    (``ASA``, ``GPP``, ``Temp``) and the grid coordinates are added, the
    member's parameter information is attached, and the result is written
    as a single NetCDF file into ``postp_dir``.

    Parameters
    ----------
    top_dir : str
        Archive directory containing one sub-directory per ensemble member.
    histdir : str
        Name of the ensemble member's sub-directory inside ``top_dir``.
    data_vars : list of str
        History variables to keep. Must include ``FSR``, ``FSDS``,
        ``FATES_GPP``, ``FATES_FRACTION`` and ``TSA``, which the derived
        variables are computed from.
    postp_dir : str
        Directory the post-processed file is written to.

    Raises
    ------
    FileNotFoundError
        If no history file matches the search pattern.

    Notes
    -----
    Reads the notebook-level ``param_key`` table (columns
    ``ensemble_member``, ``param`` and ``minmax``), which must exist before
    this function is called.
    """
    pattern = os.path.join(top_dir, histdir, 'lnd', 'hist/') + "*clm2.h0*.nc"
    files = sorted(glob.glob(pattern))
    if not files:
        # Fail early with the offending pattern instead of an opaque error
        # from xarray or an IndexError on files[0] further down.
        raise FileNotFoundError(
            f"no history files matching {pattern!r} (ensemble directory '{histdir}')"
        )

    first_name = os.path.basename(files[0])

    opened = xr.open_mfdataset(files, combine='nested', concat_dim='time',
                               preprocess=functools.partial(preprocess, varset=data_vars),
                               parallel=True, autoclose=True)
    try:
        ds = opened

        # Fix time bug: overwrite the time coordinate with month-start dates
        # beginning in 2005. Requires exactly 12*60 time steps.
        ds['time'] = xr.cftime_range(str(2005), periods=12*60, freq='MS')
        # Keep the ten years 2055-2064 ...
        ds = ds.sel(time=slice("2055-01-01", "2064-12-31"))
        # ... and relabel them as 120 months starting in 2005.
        # NOTE(review): the reason for relabelling is not visible here.
        ds['time'] = xr.cftime_range(str(2005), periods=12*10, freq='MS')

        ## calculate some variables
        # Albedo is left missing wherever incoming radiation is not positive.
        ds['ASA'] = ds.FSR/ds.FSDS.where(ds.FSDS>0)
        ds['ASA'].attrs['units'] = 'unitless'
        ds['ASA'].attrs['long_name'] = 'All sky albedo'

        ds['GPP'] = ds['FATES_GPP']*ds['FATES_FRACTION'] # kg m-2 s-1
        ds['GPP'].attrs['units'] = ds['FATES_GPP'].attrs['units']
        ds['GPP'].attrs['long_name'] = ds['FATES_GPP'].attrs['long_name']

        ds['Temp'] = ds.TSA-273.15
        ds['Temp'].attrs['units'] = 'degrees C'
        ds['Temp'].attrs['long_name'] = ds['TSA'].attrs['long_name']

        # Copy the grid coordinates from the first file. They are loaded into
        # memory so that the file can be closed again right away.
        with xr.open_dataset(files[0]) as ds0:
            for extra in ('grid1d_lat', 'grid1d_lon'):
                ds[extra] = ds0[extra].load()

        # Ensemble-member key: text after the last '_' and before the first
        # following '.' in the first file's name.
        key = first_name.split('_')[-1].split('.')[0]
        this_member = param_key[param_key.ensemble_member == key]

        ds['param'] = this_member.param.values
        ds['minmax'] = this_member.minmax.values

        ds.attrs['Date'] = str(date.today())
        ds.attrs['Author'] = 'afoster@ucar.edu'
        ds.attrs['Original'] = files[0]

        out_file = os.path.join(postp_dir, first_name.split('.')[0]+'.nc')
        ds.to_netcdf(out_file)
    finally:
        # Release the history files even if processing or writing failed.
        opened.close()
    

In [82]:
# Parameter key of the ensemble, one row per entry of its `key` column.
param_file = '/glade/work/afoster/FATES_calibration/FATES_SP_OAAT/FATES_SP_OAAT_param_key.csv'
param_key = pd.read_csv(param_file)

# The third '_'-separated field of `key` identifies the ensemble member.
key_fields = param_key['key'].str.split('_', expand=True)
param_key['ensemble_member'] = key_fields[2]

In [83]:
# History variables kept from every file (order preserved).
data_vars = [
    'FATES_GPP', 'EFLX_LH_TOT', 'FSR', 'FSDS', 'QRUNOFF', 'FATES_FRACTION',
    'SNOWDP', 'SOILWATER_10CM', 'TV', 'FATES_LAI', 'TWS', 'FSH',
    'QVEGE', 'TG', 'TSA', 'RAIN', 'SNOW', 'TBOT',
]

In [84]:
# Input archive and output directory for the post-processed files.
postp_dir = '/glade/work/afoster/FATES_calibration/FATES_SP_OAAT/hist/'
top_dir = '/glade/derecho/scratch/afoster/FATES_SP_OAAT/archive'

# Every entry of the archive directory, in sorted order.
dirs = os.listdir(top_dir)
dirs.sort()

In [87]:
# Post-process each entry of the archive in turn; one output file per entry.
for histdir in dirs:
    postprocess(
        top_dir=top_dir,
        histdir=histdir,
        data_vars=data_vars,
        postp_dir=postp_dir,
    )