In [1]:
from pathlib import Path
import json
from netCDF4 import Dataset,chartostring
import numpy as np
import os

In [2]:
#establish mnt and important data locations in reference to mnt
mnt=Path('/fs/ess/PAS1926/Fix_SWOT_SHCQ/confluence_latest/latest_mnt')
confluence_input=mnt.joinpath('input')
flpedir=mnt.joinpath('flpe')
output=flpedir.joinpath('consensus')

In [3]:
def algo_metadata():
        algos={
            'momma': {
                'qvar':'Q'
            },
            'hivdi': {
                'qvar':'Q'
            },
            'neobam':{
                'qvar':'q/q'
            },
            'metroman':{
                'qvar':'allq'
            },
            'sic4dvar':{
                'qvar':'Q_da'
            },
            'sad':{
                'qvar':'Qa'
            }
        }

        return algos

In [4]:
def collect_data(reach_id,flpedir,MD):
    FLPEs_to_check=os.listdir(flpedir)
    q=[]
    flpe=[]
    for ALGO in FLPEs_to_check:
        if ALGO !='consensus':
            if ALGO !='lakeflow': #avoide possibly recursivly generating consensus, or ever including lakeflow
                NOWdir=flpedir.joinpath(ALGO)
                FN=str(reach_id)+'_'+ALGO+'.nc'
                try:
                    D=Dataset(NOWdir.joinpath(FN))
                    algoQ=D[MD[ALGO]['qvar']][:].filled(np.nan)
                    q.append(algoQ)
                    flpe.append(ALGO)
                    
                        
                except:
                    optional_print=False
                    if optional_print:
                        print('no '+ALGO+' discharge file at '+str(reach_id))
        flpedict={
        'rid':reach_id,
        'q':q,
        'FLPEs':flpe
    }
        
    return flpedict

In [None]:
def remove_low_cv_and_calc_consensus(arrs, CV_thresh):
    """
    For a list of discharge arrays:
    - Removes arrays with CV < threshold
    - Recalculates consensus using the remaining arrays
    Parameters
    ----------
    arrs : list of np.ndarray
        Discharge arrays from each algorithm.
    CV_thresh : float
        Coefficient of variation threshold below which arrays are excluded.

    Returns
    -------
    np.ndarray
        Cleaned and recalculated consensus array.
    """
    cleaned_arrs = []
    for arr in arrs:
        mean = np.nanmean(arr)
        std = np.nanstd(arr)
        cv = std / mean if mean != 0 else np.nan
        if not np.isnan(cv) and cv > CV_thresh:
            cleaned_arrs.append(arr)

    if not len(cleaned_arrs):
        print("All algorithms removed due to low CV; returning NaN array.")
        return np.full_like(arrs[0], np.nan)

    return np.nanmedian(np.stack(cleaned_arrs, axis=0), axis=0)


In [5]:
def calculate_consensus(indict):
    
    conlen=len(indict['q'][0])
    #Calculate median based consensus
    ALLQ=np.full((len(indict['q']),  conlen), np.nan)
    
    contributed=[]
    for row in range(len(indict['q'])):
        ALGv=indict['q'][row]
        if np.size(ALGv)==conlen:
            ALLQ[row,:]=ALGv
            if np.any(ALGv>0):
                contributed.append(indict['FLPEs'][row])
        
    
    consensus=np.nanmedian(ALLQ,axis=0)
    consensus_cv=remove_low_cv_and_calc_consensus([ALLQ[i, :] for i in range(ALLQ.shape[0])], CV_thresh=0.5)
        
        
    consensus_dictionary={
            'rid':indict['rid'],
            'q_all':consensus,
            'q':consensus_cv,
            'FLPEs':contributed,
            'n_contributers':len(contributed)
        }
    return consensus_dictionary

In [6]:
def write_consensus(indict,output_dir):
    reach_id=indict['rid']
    nt=len(indict['q'])
    # output     
    outfile= os.path.join(output_dir, f'{reach_id}_consensus.nc')
    dsout = Dataset(outfile, 'w', format="NETCDF4") #output dataset
    dsout.n_algos=str(indict['n_contributers'])
    dsout.contributing_algos=indict['FLPEs']
    dsout.createDimension("nt",nt)
    fillvalue = np.nan
    consensus_q= dsout.createVariable("consensus_q","f8",("nt"),fill_value=fillvalue)
    consensus_q.long_name= 'consensus discharge'
    consensus_q[:]=indict['q']
    dsout.close
    
        


In [9]:
#main

#read the input reaches.json file that is driving the entire run
reach_list=confluence_input.joinpath('reaches.json')
try:
    with open(reach_list, 'r') as file:
        RL = json.load(file)        
except FileNotFoundError:
    print("Error: 'reaches.json' not found. Please ensure the file exists.")
except json.JSONDecodeError:
    print("Error: Could not decode JSON from 'reaches.json'. Check file format.")
#loop through reaches in the list and calculate consensus for each
MetaData=algo_metadata()#this function generated a dictionary of q variables from the various FLPEs
for reach in RL:   
    rid=reach['reach_id']
    FLPEdict=collect_data(rid,flpedir,MetaData)#generates a dictionary of all FLPEs at that reach
    CONdict=calculate_consensus(FLPEdict)#generates a dictionary of consensus and metadata
    write_consensus(CONdict,output)#writes file with consensus to flpe directory

  consensus=np.nanmedian(ALLQ,axis=0)
