# Check flat10 model output files with Dask

In [1]:
import numpy as np
import numpy.matlib
import numpy.ma as ma

import xarray as xr
time_coder = xr.coders.CFDatetimeCoder(use_cftime=True) #create time coder with cftime

import time
import cftime
import netCDF4 as nc
from datetime import timedelta

import pandas as pd

import glob



In [2]:
# load custom functions for analyzing flat10

from loading_function_flat10 import load_flat10, load_grid

In [None]:
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client

################
##### Dask #####
################

def get_ClusterClient(
        ncores=1,
        nmem='200GB',
        walltime='01:00:00',
        account='UWAS0155'):
    """
    Start a PBS-backed Dask cluster on the 'casper' queue and connect a client to it.

    Code from Daniel Kennedy
    More info about Dask on HPC - https://ncar.github.io/dask-tutorial/notebooks/05-dask-hpc.html

    Parameters
    ----------
    ncores : int
        Cores requested per PBS job; also used as the number of worker processes per job.
    nmem : str
        Memory per PBS job (e.g. '200GB'); given to Dask and to the PBS select line.
    walltime : str
        PBS wall-clock limit, 'HH:MM:SS'.
    account : str
        PBS project ID charged for the jobs.

    Returns
    -------
    tuple
        (cluster, client): the PBSCluster and a distributed Client attached to it.
    """
    # PBS resource request: one chunk with exactly the cores/memory Dask is told about
    pbs_select = f'select=1:ncpus={ncores}:mem={nmem}'

    cluster = PBSCluster(
        cores=ncores,
        memory=nmem,
        processes=ncores,
        queue='casper',
        resource_spec=pbs_select,
        account=account,
        walltime=walltime,
        interface='ext',
    )
    return cluster, Client(cluster)

In [3]:
# container for the loaded datasets; filled by load_grid and by the loading loop below
data_dict = dict()

# Zonal correction for NorESM

In [4]:
outputdir= '/glade/campaign/cgd/tss/people/aswann/flat10/'

# full model list (formerly modellist_orig); restrict modellist to the models to check:
#   'ACCESS-ESM1-5', 'CESM2', 'GFDL-ESM4', 'GISS_E2.1', 'NorESM2-LM',
#   'MPI-ESM1-2-LR', 'CNRM-ESM2-1', 'HadCM3LC-Bris'
modellist=['NorESM2-LM']

# runlist: run names used in the data_dict keys ('<model>_<run>').
# runlist_wc: folder wildcards matching each run, because the folders and runs are
# named differently across models. The two lists are paired by position, so when
# shortening one for debugging, shorten the other the same way.
runlist = ['flat10','flat10_zec','flat10_cdr']
runlist_wc = ['*lat10','*zec','*cdr']
assert len(runlist) == len(runlist_wc), 'runlist and runlist_wc must be paired one-to-one'

# variables read from disk; other candidates: 'fgco2', 'ra', 'npp'
# NOTE(review): an earlier note said variables beyond nbp were "not working" for NorESM -- verify gpp/rh
varlist_load=['cVeg','cSoil','cLitter','nbp','gpp','rh']
# cTot is derived after loading (cVeg + cSoil [+ cLitter]), so it appears here but not in varlist_load
varlist_analyze=['cVeg','cSoil','cTot','cLitter','nbp','gpp','rh']
varlist=varlist_load

In [None]:
## Create Dask cluster and client, scale up to n_workers workers
# NOTE(review): with get_ClusterClient's defaults (ncores=1, nmem='200GB') each worker is
# presumably its own PBS job asking for 200GB, i.e. n_workers * 200GB in total -- confirm intended.
n_workers = 20
cluster, client = get_ClusterClient(walltime='02:00:00')
cluster.scale(n_workers)
client.wait_for_workers(n_workers)  # block until all requested workers have joined

## Lists active workers and their status
cluster.workers

In [5]:
# load grid
# load_grid (imported from loading_function_flat10) takes and returns data_dict;
# presumably it adds grid info for each model in modellist -- see that module for the exact keys.
data_dict = load_grid(data_dict,modellist)

NorESM2-LM getting grid info


In [6]:
# load all data
##data_dict=load_flat10(data_dict, modellist, runlist, runlist_wc, varlist)

In [None]:
import numpy as np
import numpy.matlib
import numpy.ma as ma

import xarray as xr
#xr.set_options(enable_cftimeindex=True)
#from xarray.coding.times import CFTimedeltaCoder
time_coder = xr.coders.CFDatetimeCoder(use_cftime=True) #create time coder with cftime

import time
import cftime
import netCDF4 as nc
from datetime import timedelta

import pandas as pd

import glob


## notes on packages to add to this kernel
import nc_time_axis





# data location
outputdir= '/glade/campaign/cgd/tss/people/aswann/flat10/'

# Chunking passed to xr.open_dataset. {} = dask-backed arrays using each file's native
# chunks, so the work below actually runs on the Dask cluster started earlier;
# set to None to go back to plain lazily-loaded (non-dask) arrays.
open_chunks = {}


def _open_concat(filenames):
    """Open every file in `filenames` and concatenate them along 'time'.

    Returns None when `filenames` is empty, so the caller can distinguish
    "no files found" from "file found but the variable is missing".
    """
    # some variables are stored in multiple files; each is opened separately because
    # xr.open_mfdataset was not loading all of the time points
    datasets = [xr.open_dataset(fname, decode_times=time_coder, chunks=open_chunks)
                for fname in filenames]
    if not datasets:
        return None
    if len(datasets) == 1:
        return datasets[0]
    # a single concat over the whole list, instead of re-concatenating a growing result per file
    return xr.concat(datasets, dim='time')


def _fix_units(ds, model, var):
    """Apply the model-specific renames and unit conversions to `ds` (in place) and return it."""
    if model == 'NorESM2-LM' and 'PRECC' in ds:  # NorESM
        ds['pr'] = ds['PRECC']
        # .attrs.get: a missing units attribute skips the conversion instead of raising
        if ds['pr'].attrs.get('units') == 'm/s':
            ds['pr'] = ds['pr'] * (1e3)
            ds['pr'].attrs['units'] = 'kg m-2 s-1'  # equivalent is mm/s
    if model == 'CESM2' and var in ds:
        if ds[var].attrs.get('units') == 'gC/m^2/s':
            ds[var] = ds[var] * (1 / 1000)  # convert from gC to kgC
            ds[var].attrs['units'] = 'kg m-2 s-1'
        # TODO: stock variables are not converted here (only 'gC/m^2/s' fluxes are)
    return ds


def _nan_placeholder(template):
    """Return an all-NaN (time, lat, lon) DataArray on `template`'s grid.

    Used as a blank variable so later loops over variables still work. Returns
    None if `template` lacks any of 'time', 'lat' or 'lon'.
    """
    if not all(name in template for name in ('time', 'lat', 'lon')):
        return None
    shape = (len(template['time']), len(template['lat']), len(template['lon']))
    return xr.DataArray(np.full(shape, np.nan),
                        coords={'lon': template['lon'], 'lat': template['lat'], 'time': template['time']},
                        dims=['time', 'lat', 'lon'])


def _add_total_carbon(ds):
    """Add cTot, the sum of the carbon pools, when cVeg, cSoil and cLitter are all present; returns `ds`."""
    if all(name in ds for name in ('cVeg', 'cSoil', 'cLitter')):
        # litter is sometimes missing. Would be good to make this more general but dealing with this problem for now.
        if ds['cLitter'].notnull().all():
            ds['cTot'] = ds['cVeg'] + ds['cSoil'] + ds['cLitter']
        else:
            ds['cTot'] = ds['cVeg'] + ds['cSoil']
    return ds


#----loop over models----#
for model in modellist:
    print('loading model: ' +model)
    #----loop over experiments----#
    # runlist gives the dict-key name, runlist_wc the matching folder wildcard
    for run_name, run in zip(runlist, runlist_wc):
        print('loading run: ' +run)
        datasets_v = []  # one dataset per variable that had files on disk
        no_files = []    # variables with no matching files for this model/run
        #----loop over variables----#
        for var in varlist:
            print('loading variable: ' +var)

            searchpath = outputdir +model +'/' +run +'/*' +var +'_*.nc'
            # sort in time order, xarray was having trouble arranging some of them in time dim
            filenamelist = np.sort(glob.glob(searchpath))

            dsmerge_f = _open_concat(filenamelist)
            if dsmerge_f is None:
                # never fall back to the dataset left over from a previous variable/run
                print('  no files found: ' +searchpath)
                no_files.append(var)
                continue

            #----check units and convert if necessary----#
            dsmerge_f = _fix_units(dsmerge_f, model, var)

            if var not in dsmerge_f:
                # file exists but lacks the variable: add a blank one on the file's grid
                placeholder = _nan_placeholder(dsmerge_f)
                if placeholder is None:
                    print('  cannot add blank ' +var +': file has no time/lat/lon')
                else:
                    dsmerge_f[var] = placeholder
            datasets_v.append(dsmerge_f)

        if not datasets_v:
            print('no files found for ' +model +' ' +run_name +'; not adding to dict')
            continue

        #----merge all variables into one dataset----#
        # compat='override': where datasets conflict, the earliest variable's dataset wins
        if len(datasets_v) == 1:
            dsmerge_v = datasets_v[0].copy()
        else:
            dsmerge_v = xr.merge(datasets_v, compat='override')

        # blank variables for those with no files, built on the merged grid of this run
        for var in no_files:
            placeholder = _nan_placeholder(dsmerge_v)
            if placeholder is not None:
                dsmerge_v[var] = placeholder

        # add a new variable that is the sum of all carbon pools (once, after merging)
        dsmerge_v = _add_total_carbon(dsmerge_v)

        #----save output to a dictionary----#
        print('adding ' +model +' ' +run_name +' to dict')
        data_dict[model +'_' +run_name] = dsmerge_v





loading model: NorESM2-LM
loading run: *lat10
loading variable: cVeg
loading variable: cSoil
loading variable: cLitter
loading variable: nbp
loading variable: gpp
loading variable: rh
adding NorESM2-LM flat10 to dict
loading run: *zec
loading variable: cVeg
loading variable: cSoil


In [None]:
## Once done, shut down the Dask cluster
# Client.shutdown() shuts down the connected scheduler and its workers, so run this only
# after all Dask-backed results needed later have been computed or loaded.
client.shutdown()

In [None]:
!rm ./dask-worker.e*
!rm ./dask-worker.o*