In [1]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, LocalCluster
import xarray as xr
import numpy as np
import pandas as pd
import intake
import dask

  from distributed.utils import tmpfile


In [2]:
'''
A selection of traditional statistical metrics for comparing against d-score components
'''

import numpy as np

def nse(obs, mod):
    """
    Calculate the Nash-Sutcliffe Efficiency (NSE)
    (https://www.sciencedirect.com/science/article/pii/0022169470902556?via%3Dihub)
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Nash-Sutcliffe Efficiency
    """
    return 1 - (mse(obs, mod) / np.var(obs))


def mse(obs, mod):
    """
    Calculate the mean squared error (MSE)
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        mean squared error
    """
    return np.mean((obs - mod) ** 2)


def pbias(obs, mod):
    """
    Calculate the percent bias
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Percent bias
    """
    return 100 * ((np.sum(mod - obs)) / (np.sum(obs)))


def pbias_percentile(obs, model, percentile, fun):
    """
    Calculate the percent bias for a percentile bin
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
        percentile: float
        fun: comparison function (e.g., np.greater)
    Returns:
        Percent bias for bin
    """
    threshold = np.percentile(obs, q=percentile)
    i = fun(obs, threshold)
    
    return pbias(obs[i], model[i])
    


def pearson_r(obs, mod):
    """
    Calculate Pearson's r
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Pearson's r
    """
    #return np.cov(mod, obs) / np.sqrt( np.var(mod) * np.var(obs))
    return np.corrcoef(mod, obs)[0,1]


def spearman_r(obs, mod):
    """
    Calculate Spearman's r
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Spearman's r
    """
    return pearson_r(np.argsort(mod), np.argsort(obs))


def kge(obs, mod):
    """
    Calculate the Kling-Gupta Efficiency (KGE)
    (https://www.sciencedirect.com/science/article/pii/S0022169409004843)
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Kling-Gupta Efficiency
    """
    #d_obs = obs - np.mean(obs)
    #d_mod = mod - np.mean(mod)
    #r = np.sum(d_obs * d_mod) / np.sqrt(np.sum(d_mod ** 2) * np.sum(d_obs ** 2))
    r = pearson_r(obs, mod)
    #alpha = np.std(mod) / np.std(obs)
    alpha = sd_ratio(obs, mod)
    beta = np.sum(mod) / np.sum(obs)

    ED = np.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)
    return 1 - ED

def sd_ratio(obs, mod):
    """
    Calculate the standard deviation ratio of the model predictions and observations
    
    Args:
        obs: numpy array of observed values
        mod: numpy array of modeled values
    Returns:
        Standard deviation ratio   
    """
    return np.std(mod) / np.std(obs)


## Start cluster

In [4]:
## Denali setup
#cluster = SLURMCluster() #TOH: my config defaults to Denali, but this won't work for other users

#cluster = LocalCluster(threads_per_worker=1)
# Tallgrass setup
cluster = SLURMCluster(queue='cpu', cores=1, interface='ib0',
                       job_extra=['--nodes=1', '--ntasks-per-node=1', '--cpus-per-task=1'],
                       scheduler_options={'dashboard_address':36999},
                      #cores=1, extra=['--resources processes=1']
                       memory='6GB')

## PC setup
#import os
#n_cores = os.cpu_count() # set to match your machine

client = Client(cluster)
#client

In [5]:
cluster.adapt(maximum_jobs=100)

<distributed.deploy.adaptive.Adaptive at 0x2aab6049c640>

In [6]:
#client.close()

# Intake catalog
We use an Intake catalog to help manage the various datasets that might be used in an evaluation.

In [7]:
url = 'https://raw.githubusercontent.com/nhm-usgs/data-pipeline-helpers/main/hytest/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
print(list(cat))

['conus404-40year-onprem', 'conus404-2017-onprem', 'conus404-2017-cloud', 'nwis-streamflow-usgs-gages-onprem', 'nwis-streamflow-usgs-gages-cloud', 'nwm21-streamflow-usgs-gages-onprem', 'nwm21-streamflow-usgs-gages-cloud', 'nwm21-streamflow-cloud']


In [8]:
# read in intake
obs_ds = cat['nwis-streamflow-usgs-gages-onprem'].to_dask()
model_ds = cat['nwm21-streamflow-usgs-gages-onprem'].to_dask()


obs = obs_ds['streamflow']
mod = model_ds['streamflow'].astype('float32')

obs.name = 'observed'
mod.name = 'predicted'

In [9]:
#client.scatter(obs)
#client.scatter(mod)
#client.scatter(ds_results)

In [10]:
%%time
# selecting a single gage is fast
gage_id = 'USGS-01030350'
x = obs.sel(gage_id=gage_id).load(scheduler='threads').to_series()
#x = obs.sel(gage_id=gage_id).load(scheduler='single-threaded')

CPU times: user 23 ms, sys: 4.86 ms, total: 27.9 ms
Wall time: 25.2 ms




In [11]:
def compute_metrics(gage_id):
    # select the data for the given gage_id
    # TODO the selection may be distributed, but can we force it onto a single node? Maybe by allocating 2 cores?
    obs1 = obs.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series()
    mod1 = mod.sel(gage_id=gage_id).load(scheduler='single-threaded').to_series().resample('1D', offset='5h').mean() # Resampling could be done in preanalysis
    # make sure the indices match
    obs1.index = obs1.index.to_period('D')
    mod1.index = mod1.index.to_period('D')


    # merge obs and predictions and drop nans.
    df = pd.merge(obs1, mod1, left_index=True, right_index=True).dropna(how='any')
    obs1 = df['observed']
    mod1 = df['predicted']
    
    # compute log flow for use in log NSE
    threshold = 0.01
    log_obs = np.log(obs1.where(obs1 > threshold, threshold))
    log_model = np.log(mod1.where(mod1 > threshold, threshold))
    
    scores = pd.Series(dtype='float')
    scores['nse'] = nse(obs1, mod1)
    scores['log_nse'] = nse(log_obs, log_model)
    scores['kge'] = kge(obs1, mod1)
    
    scores['pbias'] = pbias(obs1, mod1)
    scores['pearson_r'] = pearson_r(obs1, mod1)
    scores['spearman_r'] = spearman_r(obs1, mod1)
    scores['sd_ratio'] = sd_ratio(obs1, mod1)
    
    # compute high flow and low flow bias
    high_percentile = 98
    low_percentile = 30
    
    scores['pbias_q' + str(high_percentile)] = pbias_percentile(obs1, mod1, high_percentile, np.greater)
    scores['pbias_q' + str(low_percentile)] = pbias_percentile(obs1, mod1, high_percentile, np.less_equal)
    scores.name = gage_id
    
    return scores

In [12]:
%%time
# run for a single site using 1 core
gage_id = 'USGS-01030350'
compute_metrics(gage_id)

CPU times: user 75.3 ms, sys: 3.06 ms, total: 78.4 ms
Wall time: 73 ms


nse            0.610186
log_nse        0.437533
kge            0.581806
pbias        -12.679163
pearson_r      0.799410
spearman_r    -0.003219
sd_ratio       0.655655
pbias_q98    -43.865916
pbias_q30     -7.173589
Name: USGS-01030350, dtype: float64

In [13]:
gages = list(obs.gage_id.values)

In [14]:
len(gages)

7994

In [15]:
gages[0]

'USGS-01030350'

#### Wrap compute metrics in a try

In [16]:
def try_compute_metrics(gage_id):
    """Wrapper function
    """
    try:
        return compute_metrics(gage_id)
    except:
        return None

#### Try Dask Delayed, computing a list of dask delayed objects (NOT WORKING)

In [17]:
#%%time
# Not working; tried setting scheduler and without
#results = dask.compute(*[dask.delayed(try_compute_metrics)(str(gage)) for gage in gages[:1000]], retries=10, scheduler='processes');

#### Try Dask Bag

In [18]:
import dask.bag as db

b = db.from_sequence(gages[:1000], npartitions=10)
b = b.map(try_compute_metrics)

In [19]:
%%time
results = b.compute()

CPU times: user 1.61 s, sys: 457 ms, total: 2.07 s
Wall time: 23.1 s


In [20]:
results = [i for i in results if i is not None] # Drop entries where compute_metrics failed

df = pd.concat(results, axis=1)
df1 = df.T.reset_index()
ds_results = xr.Dataset.from_dataframe(df1)
ds_results