## CONUS Water Budget Components from WRF-Hydro

#### Notes on CONUS Water Budget Processing

Aubrey has already processed the data from 3-hourly to monthly summaries on the native WRF-Hydro/NWM grids. We will use this temporally-aggregated output to begin the spatial aggregation for the sake of reduced processing time.

| Output File Type      | Description | Path on Glade |
| ----------- | ----------- | ----------- |
| **LDASOUT** | WRF-Hydro 1km output files | `/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files/water_*.nc` |
| **CHRTOUT** | WRF-Hydro/NWM Network files | `/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files/chrt_*.nc` |
| **LDASIN** | Noah-MP LSM Input files | `/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files/clim_*.nc` |
| **GWOUT** | WRF-Hydro/NWM Groundwater basins | `/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files/gw_*.nc` |

For water budget components and how they map to the above files, refer to the spreadsheet here: https://docs.google.com/spreadsheets/d/1m486t3jXBcrvST3AtlVktyeCnjJqZgn1Gd4X70Bso10/edit#gid=0

## Background

This notebook is intended to process the zonal (spatial) statistics between NWM Retrospective outputs and a set of gridded 'zones', which can be any spatial unit such as counties, states, HUCs, etc. Those inputs must already be resolved on the intended NWM grid (LSM - 1km, or routing - 250m) and optionally subset to any spatial subset of the NWM retrospective data (i.e. the grids must match exactly). This script assumes all 'zone' datasets are written in typical GIS fashion from north to south. If an LSM grid is requested, the zone dataset will be flipped south-to-north in this script. 

A 2D groupby operation is performed using the `flox` python package, though this functionality has been incorporated into certain branches of the `xarray` library and may become available soon in the main branch.

https://github.com/dask/dask/issues/5085#issuecomment-513043034

## Processing Environment

The Python environment used is a conda environment `analysis`, here:

* /glade/work/ksampson/conda-envs/analysis

### Imports

In [1]:
# --- Import Modules --- #

# Import Python Core Modules
import sys
import os
import time
import math
import tracemalloc
import datetime
from pathlib import Path

# Some environment variables important to dask
os.environ["MALLOC_TRIM_THRESHOLD_"] = "0"
os.environ["DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY_MANAGER__START"] = "True"
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION"] = "1.2"
if 'DASK_ROOT_CONFIG' in os.environ:
    del os.environ['DASK_ROOT_CONFIG']    # This seems pretty important
import dask
from dask.distributed import Client, progress, LocalCluster, performance_report
from dask_jobqueue import PBSCluster
from dask.diagnostics import ProgressBar
import dask.array as da

# Import Additional Modules
import numpy as np
import xarray as xr
import pandas as pd
import zarr
import flox.xarray

# Import functions from local repository
sys.path.append(r'/glade/scratch/ksampson/Water_Budget/usgs_water/daily_retro')
from usgs_common import *

tic = time.time()
print('Process initiated at {0}'.format(time.ctime()))
# --- End Import Modules --- #

Process initiated at Mon Oct 30 10:56:02 2023


## Define the input files and other relevant local variables

In [2]:
# NWM output type ['LDASOUT', 'RTOUT', 'Precip']
NWM_type = 'LDASOUT'

# Variable to process - list form, from LDASOUT, and LDASIN
variables = ['deltaACCET',
             'deltaACSNOW',
             'deltaSNEQV',
             'deltaSOILM',
             'deltaUGDRNOFF',
             'deltaSOILM_depthmean',
             'avgSNEQV',
             'avgSOILM',
             'avgSOILM_depthmean',
             'avgSOILM_wltadj_depthmean',
             'avgSOILSAT',
             'avgSOILSAT_wltadj_top1',
             'totPRECIP',
             'avgT2D']

# Give a name to the zone dataset, which will be the name of the zone variable
zone_name = 'WBDHU12'

# Perform temporal subset on inputs?
temporal_subset = False

# Choose the temporal range, if temporal_subset is true
#time_subset_bounds = slice('2009-10-01', '2019-09-30')    # First IWAAS batch
time_subset_bounds = slice('2019-10-01', '2021-09-30')     # Second IWAAS batch
#time_subset_bounds = slice('2009-10-01', '2021-09-30')     # Second IWAAS batch

## Define the output files and other relevant variables to outputs

In [3]:
# Output directory
outDir = r'/glade/scratch/ksampson/USGS/CONUS_Water_Budget/Water_Budget'

# Basename for output files - extension will be applied later
#output_pattern = 'CONUS_HUC12_WB_2D_19791001_20001231_notFinal'
output_pattern = 'CONUS_HUC12_WB_2D_19791001_20220930'

# Other variables to help with the file output naming convention
write_CSV = True
write_NC = True

# Apply a landmask to the weight grid so that water cells are not considered in the spatial statistics? 
# Only applies to LSM grid variables
landmask_results = True

# Variables that will be normalized to the full land area (not landmasked land area)
non_landmask_vars = ['Precip', 'landmask']

# Add variables that we want to process spatial stats for
addVars = ['total_gridded_area'] + non_landmask_vars    # For all other processing
#addVars = ['total_gridded_area']                        # For the soil moisture top layer variables

# Calculate percent soil saturation as a derived output variable
pct_sat = False

### Handle the processing of input variables if the source is raw NWM

Use the NWM_type to define the input Zarr store, and any other processing requirements (unit conversion, time resampling, etc.)

In [4]:
# We will construct a list of files. They must all contain the same time and other dimensions in order to be concatenated using open_mfdataset
convert_to_mm = False

# Specify the directory where the precip (LDASIN, clim_*.nc) files are stored. This can be different in some cases than the othe files
#clim_dir = r'/glade/p/ral/hap/adugger/projects/USGS_HyTEST/WaterBudget/datasets/IWAA_WH/monthly_files/wrfhydro_10yr'
#clim_dir = r'/glade/scratch/adugger/USGS_HyTEST/IWAA/monthly_files_42yr/final_outputs'
clim_dir = r'/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files'

# Specify the directory where the LDASOUT (water_*.nc) files are stored.
#land_dir = r'/glade/p/ral/hap/adugger/projects/USGS_HyTEST/WaterBudget/datasets/IWAA_WH/monthly_files/wrfhydro_10yr'
#land_dir = r'/glade/scratch/adugger/USGS_HyTEST/IWAA/monthly_files_42yr/final_outputs'
land_dir = r'/glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files'

# Add a second set of variables from a different set of files
file_in = get_files_wildcard(land_dir, 
                             file_pattern='water_*.nc', 
                             recursive=False)

# Obtain list of files from wildcard
file_in2 = get_files_wildcard(clim_dir, 
                             file_pattern='clim_*.nc', 
                             recursive=False)

# If no additional datasets are needed
#file_in2 = []

Found 516 files using wildcard "water_*.nc" (recursive=False) in the input directory:
	 /glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files
Found 516 files using wildcard "clim_*.nc" (recursive=False) in the input directory:
	 /glade/campaign/ncar/USGS_Water/adugger/IWAA/retro_42yr/monthly_files


### Spin up a Dask Cluster

https://ncar.github.io/esds/posts/2021/casper_pbs_dask/index.html

Currently, the best way to execute this notebook is on a Casper PBS login node, though it may also be run on a Casper batch node with sufficient memory such that the individual workers do not incur >80% memory utilization. The current upper limit on the batch nodes is 36 cores per node and 1494Gb per node. Thus, for some datasets, such as routing grid resolution and high temporal frequncies, the script may overload the workers and stop responding. 

Suggested parameters for a Casper batch node are:

* Casper share nodes
* 500-1494Gb per node (109 is Cheyenne batch node limit)
* 2-36 cores
* 1-24 hour wallclock time

In [5]:
%%time

# Change your url to the dask dashboard so you can see it
dask.config.config.get('distributed').get('dashboard').update({'link':'{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})

# # If you are already on a cluster, use the local cluster, aliased with Client
# client = Client()
# client

#project_key = 'NHAP0007'    # IWAA, Cheyenne only
project_key = 'P48500028'

# To spin up a PBS cluster (Cheyenne, Casper, etc.), use the syntax below
cluster = PBSCluster(
    cores=1, 
    queue='casper', 
    project=project_key, 
    memory='10GiB', 
    walltime='02:00:00',  
    death_timeout=75,
    resource_spec = 'select=1:ncpus=1:mem=10GB',
    local_directory=os.path.join(outDir, "dask"),
    job_script_prologue=["export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.2"],)
    # interface='ib0',)
    
print(cluster.job_script())

# For a set number of cores
# n_jobs = 10
# n_jobs_start = n_jobs
# cluster.scale(jobs=n_jobs)
# client = Client(cluster)
# client.wait_for_workers(n_jobs_start)

# For an adaptable number of cores
cluster.adapt(minimum=10, maximum=100)

# Start the client
client = Client(cluster)

# Display the client
client



#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A P48500028
#PBS -l select=1:ncpus=1:mem=10GB
#PBS -l walltime=02:00:00
export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.2
/glade/work/ksampson/conda-envs/analysis/bin/python -m distributed.cli.dask_worker tcp://128.117.208.96:44245 --nthreads 1 --memory-limit 10.00GiB --name dummy-name --nanny --death-timeout 75 --local-directory /glade/scratch/ksampson/USGS/CONUS_Water_Budget/Water_Budget/dask

CPU times: user 233 ms, sys: 44.4 ms, total: 277 ms
Wall time: 390 ms


Perhaps you already have a cluster running?
Hosting the HTTP server on port 45953 instead


0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /stable/user/ksampson//proxy/45953/status,

0,1
Dashboard: /stable/user/ksampson//proxy/45953/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.96:44245,Workers: 0
Dashboard: /stable/user/ksampson//proxy/45953/status,Total threads: 0
Started: Just now,Total memory: 0 B


#### Open the input file and read some useful information

In [6]:
%%time

def extract_dates(in_paths=[], format_str='%Y%m'):
    '''
    This function will take an input path and extract a date object from the filename. 
    Assumes that the filename ends with "_{datestring}.nc" (default = YYYYMM)
    '''
    dt_strings = [os.path.basename(in_path).split('.nc')[0].split('_')[1] for in_path in in_paths] 
    dt_obj = pd.to_datetime(dt_strings, format=format_str)
    return dt_obj

# Open the selected dataset(s), dropping variables as necessary
drop_vars = [var_in for var_in in xr.open_dataset(file_in[0]).variables if var_in not in variables+[time_coord]]
if len(file_in2)>1:
    drop_vars += [var_in for var_in in xr.open_dataset(file_in2[0]).variables if var_in not in variables+[time_coord]]
drop_vars = list(set(drop_vars)) # Eliminate redundancy
print('Dropping {0} from input file.'.format(drop_vars))

# Only use this method if datasets are coming from multiple directories or file types
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    # This is a little complicated because we will be building multiple datasets
    ds_list = [xr.open_mfdataset(in_list, 
                           combine='nested',
                           decode_cf=False, 
                           concat_dim='time',
                           chunks='auto', 
                           drop_variables=drop_vars) for in_list in [file_in, file_in2] if len(in_list)>0]
    datetimes = [extract_dates(in_list) for in_list in [file_in, file_in2] if len(in_list)>0]
    ds_list = [ds.assign_coords(time=datetimes_in) for ds, datetimes_in in zip(ds_list, datetimes)]
    ds = xr.merge(ds_list)
    del ds_list, datetimes
    
# Perform temporal subset, or not
if temporal_subset:
    ds = ds.loc[{time_coord:time_subset_bounds}]
    
# Obtain and print information about the input file
ds, timesteps, x_chunk_sizes, y_chunk_sizes, time_chunk_sizes = report_structure(ds, variable=list(ds.data_vars.keys())[0])
ds

Dropping ['crs', 'y', 'x'] from input file.




Found 516 timestep(s) in input file
Size of input dataset:  714.29 Gb
Size of 1 timesteps in dataset:  0.066 Gb
Size of 1 time chunk (1 timesteps) for full extent of dataset:  0.066 Gb
Time chunk sizes [first, last]: 1, 1
Size of 1 chunk, (2766,2304) cells, of the dataset:  0.024 Gb
X chunk sizes [first, last]: 2766, 920
Y chunk sizes [first, last]: 2304, 768
CPU times: user 21.6 s, sys: 4.55 s, total: 26.1 s
Wall time: 3min 5s


Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,136.05 GiB,72.93 MiB
Shape,"(516, 3840, 4, 4608)","(1, 2304, 3, 2766)"
Dask graph,9288 chunks in 1034 graph layers,9288 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 136.05 GiB 72.93 MiB Shape (516, 3840, 4, 4608) (1, 2304, 3, 2766) Dask graph 9288 chunks in 1034 graph layers Data type float32 numpy.ndarray",516  1  4608  4  3840,

Unnamed: 0,Array,Chunk
Bytes,136.05 GiB,72.93 MiB
Shape,"(516, 3840, 4, 4608)","(1, 2304, 3, 2766)"
Dask graph,9288 chunks in 1034 graph layers,9288 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1550 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,136.05 GiB,72.93 MiB
Shape,"(516, 3840, 4, 4608)","(1, 2304, 3, 2766)"
Dask graph,9288 chunks in 1034 graph layers,9288 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 136.05 GiB 72.93 MiB Shape (516, 3840, 4, 4608) (1, 2304, 3, 2766) Dask graph 9288 chunks in 1034 graph layers Data type float32 numpy.ndarray",516  1  4608  4  3840,

Unnamed: 0,Array,Chunk
Bytes,136.05 GiB,72.93 MiB
Shape,"(516, 3840, 4, 4608)","(1, 2304, 3, 2766)"
Dask graph,9288 chunks in 1034 graph layers,9288 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1550 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1550 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1550 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1550 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1550 graph layers,4644 chunks in 1550 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 34.01 GiB 24.31 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float32 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,34.01 GiB,24.31 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,68.03 GiB,48.62 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 68.03 GiB 48.62 MiB Shape (516, 3840, 4608) (1, 2304, 2766) Dask graph 4644 chunks in 1034 graph layers Data type float64 numpy.ndarray",4608  3840  516,

Unnamed: 0,Array,Chunk
Bytes,68.03 GiB,48.62 MiB
Shape,"(516, 3840, 4608)","(1, 2304, 2766)"
Dask graph,4644 chunks in 1034 graph layers,4644 chunks in 1034 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


#### Obtain the spatial aggregation array

In [7]:
%%time

# Choose a method for spatial aggregation
raster_zones = True
spatial_weights = False

# Use a 2D grid of zone IDs to perform spatial aggregation.
# This is a representation of the zones on the same grid as the analysis data.
if raster_zones:
    
    # Sort out resolution and input files
    if NWM_type == 'RTOUT':
        zone_raster = r'/glade/scratch/ksampson/USGS/CONUS_Water_Budget/HUCs/HUC12s_on_250m_grid.tif'
        LSM_grid = False
    elif NWM_type == 'LDASOUT':
        zone_raster = r'/glade/scratch/ksampson/USGS/CONUS_Water_Budget/HUCs/HUC12s_on_1000m_grid.tif'
        LSM_grid = True
    print('Using raster grid of zones for spatial aggregation: {0}'.format(zone_raster))
    
    # Data value to define nodata in the zone rater (anywhere that a zone does not exist).
    zone_nodata = 0

    # Read in the raster that defines the zones
    zone_arr, zone_ndv = return_raster_array(zone_raster)

    # Flip the raster if necessary - easier than flipping each input array from the model data
    if LSM_grid:
        zone_arr = zone_arr[flip_dim(['y', 'x'], DimToFlip='y')]

    # Replace nodata values with np.nan, which requires converting to floating point.    
    zone_arr = zone_arr.astype('float')    
    zone_arr[zone_arr==zone_nodata] = np.nan

    # Obtain unique values
    zone_uniques = np.unique(zone_arr)
    zones_unique = zone_uniques[zone_uniques!=np.nan]
    print('{0} zones found in the input dataset'.format(zones_unique.shape[0]-1))
    del zone_uniques, zones_unique
    
    # Add zones to the Xarray DataSet object
    zones = xr.DataArray(zone_arr, dims=("y", "x"), name=zone_name)
    #ds[zone_name] = zones.fillna(-1).astype(int)   # workaround flox bug
    ds[zone_name] = zones.fillna(-1).astype(np.int64)   # workaround flox bug
    del zones
    
    # Obtain landmask grid
    if landmask_results and NWM_type == 'LDASOUT':
        print('  Masking zone grid to LSM LANDMASK variable')
        landmask = xr.open_dataset(geogrid)['LANDMASK'].squeeze()
        zone_masked = zone_arr.copy()
        zone_masked[landmask==0] = np.nan
        masked_zone_name = '{0}_masked'.format(zone_name)
        zones_ma = xr.DataArray(zone_masked, dims=("y", "x"), name=masked_zone_name)
        
        # Filling NaN areas (water or ocean) with -1 removes it from that HUC.
        #ds[masked_zone_name] = zones_ma.fillna(-1).astype(int)   # workaround flox bug
        ds[masked_zone_name] = zones_ma.fillna(-1).astype(np.int64)   # workaround flox bug
        
        # Save the landmask (1s and 0s)
        landmask_da = xr.DataArray(landmask, dims=("y", "x"), name='landmask')
        ds['landmask'] = landmask_da.fillna(0).astype(int)   # workaround flox bug
        del landmask, zones_ma
    
        # Obtain unique values
        zone_uniques = np.unique(zone_masked)
        zones_unique = zone_uniques[zone_uniques!=np.nan]
        print('{0} zones found in the input dataset after land-masking'.format(zones_unique.shape[0]-1))
        del zone_uniques, zones_unique, zone_masked
        
    del zone_arr
    
# Use a 1D array of pixel weights to perform spatial aggregation
### NOT YET WORKING!
elif spatial_weights:
    sw_file = r'/glade/scratch/ksampson/USGS/CONUS_Water_Budget/Spatial_Weights/CONUS_HUC12_NWM1km_spatialweights.nc'
    print('Using pre-computed NWM-style spatial weight file for spatial aggregation: {0}'.format(sw_file))
    
    # If the raster used to create spatial weights was created in GIS, then it will start with 0,0 in UL corner. 
    # To flip to south_north, select flip_raster==True
    flip_raster = True
    
    # Open the spatial weight file
    sw_ds = xr.open_dataset(sw_file)

    # Subset the spatial weight file to just one zone
    sw_ds = sw_ds.drop(['overlaps', 'polyid', 'regridweight'])
    sw_ds.load()
    
    display(sw_ds)

    # For now, flox need an integer for the zone IDs
    sw_ds['IDmask'] = sw_ds['IDmask'].astype(np.int64)
    sw_ds = sw_ds.rename({'IDmask':zone_name})

    # Obtain indexer arrays and alter the indices to 'flip' the y dimension if requested.
    indexer_i = sw_ds['i_index'].astype(int).data
    if flip_raster:
        indexer_j = LSM_grid_size_y - sw_ds['j_index'].astype(int).data
    else:
        indexer_j = sw_ds['j_index'].astype(int).data
        
    # Add the spatial weight variables to the dataset
    ds = xr.merge([ds, sw_ds])

Using raster grid of zones for spatial aggregation: /glade/scratch/ksampson/USGS/CONUS_Water_Budget/HUCs/HUC12s_on_1000m_grid.tif
    Reversing order of dimension 'y'
86723 zones found in the input dataset
  Masking zone grid to LSM LANDMASK variable
86604 zones found in the input dataset after land-masking
CPU times: user 1.31 s, sys: 423 ms, total: 1.73 s
Wall time: 2.81 s


## Iterate over time, processing the zonal statistics

### Perform 2D Groupby operation

This codeblock will execute the 2D groupby (zonal statistic) operation using the `flox` method `xarray_reduce` or `groupby_reduce`.

#### Method of operation

For some datasets there may be a memory limitation that will cause individual workers to pause once they reach 80% memory utilization. Thus, we have to carefully select the size of chunks to process. Currently, we use the existing chunk size in the input Zarr store, establishing our iteration strategy on how many time-chunks from the input we can process at once. Keep in mind that the full 2D dataset will be used at each timestep, so only the time chunk will be considered. The `time_chunk_factor` is used to multiply the time-chunk to determine the number of timesteps processed at each iteration. Keep in mind that processing times appear to scale linearly, so this may not be an important factor.

Currently, for certain variables, we calculate the sum over a third dimension, such as soil_layers_stag for the `SOIL_M` variable. 

Currently, the statistical operations provided in the `numpy_groupies` python library are supported:
* `sum`, `nansum`
* `prod`, `nanprod`
* `mean`, `nanmean`
* `var`, `nanvar`
* `std`, `nanstd`
* `min`, `nanmin`
* `max`, `nanmax`
* `first`, `nanfirst`
* `last`, `nanlast`
* `argmax`, `nanargmax`
* `argmin`, `nanargmin`

An output CSV is issued for each iteration and each statistic requested.

Other configurations are set to assist in the chunking of the data. A variable `time_chunk_factor` is used to calculate how many timestep chunks to use for each iteration. One CSV file is written out per iteration, per statistic calculated (currently `mean` and `max` are supported).

In [8]:
%%time
print('Process initiated at {0}'.format(time.ctime()))
        
# Output to file
with performance_report(filename=os.path.join(outDir, "dask-report_2D_2.html")):  
    
    # Determine how many time chunks we can process at once
    time_chunks = [timesteps]    # To process all times at once, provide nested list containing all timesteps
    print('There will be {0} iterations over time.'.format(len(time_chunks)))

    # Iterate over variables
    datetime_strings = []
    print('There will be up to {0} variables processed.'.format(len(variables)))
    for varnum, variable in enumerate(addVars+variables):
        tic1 = time.time()
        #if variable not in ds:
        #    print('Skipping variable {0}'.format(variable))
        #    continue
        print('Processing variable {0}'.format(variable))

        # Set the appropriate zone mask
        if variable in non_landmask_vars+['Precip']:
            # Use full basin zone array for spatial aggregation. No land-masking
            print('  Using full basin mask for variable {0}'.format(variable))
            zone_da = ds[zone_name]

            # Special case where we re-use a variable to produce a secondary result
            if variable == 'Precip':
                da = ds['totPRECIP']
                da.name = variable
        else:
            print('  Using land/water mask to remove water cells from analysis')
            # Use land-masked zone array for spatial aggregation
            zone_da = ds[masked_zone_name]

        # Subset the variable to a DataArray
        if variable in ds:
            da = ds[variable]

        # Special case to gather gridded area considered for each basin
        elif variable == 'total_gridded_area':
            # Make an array of ones to collect the total gridded area for each basin
            da = xr.ones_like(ds['landmask'])
            da.name = variable

        # Initialize list to store temporary partial DataArrays
        outputs = []

        # Iterate over time-chunks and process zonal statistics
        for n,time_chunk in enumerate(time_chunks):

            # Interpret times as strings - for later input to CSV files as a time index
            datetime_strings += [pd.to_datetime(time_chunk).strftime('%Y%m%d%H')]

            # Subset in time if necessary
            if 'time' in da.dims:
                data = da.loc[dict(time=slice(time_chunk[0], time_chunk[-1]))]
            else:
                data = da

            # Handle total soil moisture depth
            if NWM_type == 'LDASOUT' and variable in ['SOIL_M','deltaSOILM','avgSOILM']:
                print('\tConverting soil mositure value to total water depth (mm) in soil column.')

                # For Soil Moisture, apply weights to soil depths to get total volume (in mm) in soil column.
                soil_dict = dict(soil_weights=("soil_layers_stag", [0,1,2,3]))
                weights = xr.DataArray(soil_depths_mm, dims=("soil_layers_stag",), coords=soil_dict)

                # Multiply by depth and sum the values over depth dimension
                data = (data * weights).sum(dim='soil_layers_stag')
                data.name = variable  # reset the dataarray name

            # Apply groupby operation
            if raster_zones:
                if variable == 'total_gridded_area':
                    flox_function = 'sum'
                else:
                    flox_function = 'mean'
                print('\t[{0}]    Calculating zonal {1}.'.format(varnum, flox_function))
                output = run_flox(data, zone_da, flox_function=flox_function, n=n)
            elif spatial_weights:
                # Convert from 2D to 1D array using indexer_j and indexer_i
                flox_function = 'sum'
                print('\t[{0}]    Calculating spatially weighted value.'.format(varnum, flox_function))
                output = run_flox(data.data[indexer_j, indexer_i] * ds['weight'], 
                                  zone_da, 
                                  flox_function=flox_function, 
                                  n=n)
            if variable not in non_landmask_vars+['Precip']:
                output = output.rename({masked_zone_name:zone_name})
            outputs.append(output)
            del data
        print('\t[{0}] Spatial aggregation step completed in {0:3.2f} seconds.'.format(varnum, time.time()-tic1))

        # Merge all outputs together
        output = xr.merge(outputs)

        # Re-arrange dimensions so that time is the fastest varying dimension
        if 'time' in output.dims:
            output = output[[zone_name, time_coord, variable]]

        #if varnum == 0:
        if not 'out_ds' in locals():
            out_ds = output
        else:
            out_ds[variable] = output[variable]
        print('\t[{0}] Iteration completed in {1:3.2f} seconds.'.format(varnum, time.time()-tic1))
    out_ds

Process initiated at Mon Oct 30 10:59:11 2023
There will be 1 iterations over time.
There will be up to 14 variables processed.
Processing variable total_gridded_area
  Using land/water mask to remove water cells from analysis
	[0]    Calculating zonal sum.
	[0]    Calculated zonal sum in 1.83 seconds.
	[0] Spatial aggregation step completed in 0.00 seconds.
	[0] Iteration completed in 1.87 seconds.
Processing variable Precip
  Using full basin mask for variable Precip
	[1]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 120.97 seconds.
	[1] Spatial aggregation step completed in 1.00 seconds.
	[1] Iteration completed in 121.24 seconds.
Processing variable landmask
  Using full basin mask for variable landmask
	[2]    Calculating zonal mean.
	[0]    Calculated zonal mean in 1.94 seconds.
	[2] Spatial aggregation step completed in 2.00 seconds.
	[2] Iteration completed in 1.95 seconds.
Processing variable deltaACCET
  Using land/water mask to remove water cells from analysis
	[3]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 97.82 seconds.
	[3] Spatial aggregation step completed in 3.00 seconds.
	[3] Iteration completed in 97.84 seconds.
Processing variable deltaACSNOW
  Using land/water mask to remove water cells from analysis
	[4]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 88.46 seconds.
	[4] Spatial aggregation step completed in 4.00 seconds.
	[4] Iteration completed in 88.50 seconds.
Processing variable deltaSNEQV
  Using land/water mask to remove water cells from analysis
	[5]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 85.76 seconds.
	[5] Spatial aggregation step completed in 5.00 seconds.
	[5] Iteration completed in 85.77 seconds.
Processing variable deltaSOILM
  Using land/water mask to remove water cells from analysis
	Converting soil mositure value to total water depth (mm) in soil column.
	[6]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 302.51 seconds.
	[6] Spatial aggregation step completed in 6.00 seconds.
	[6] Iteration completed in 302.59 seconds.
Processing variable deltaUGDRNOFF
  Using land/water mask to remove water cells from analysis
	[7]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 93.35 seconds.
	[7] Spatial aggregation step completed in 7.00 seconds.
	[7] Iteration completed in 93.36 seconds.
Processing variable deltaSOILM_depthmean
  Using land/water mask to remove water cells from analysis
	[8]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 81.28 seconds.
	[8] Spatial aggregation step completed in 8.00 seconds.
	[8] Iteration completed in 81.30 seconds.
Processing variable avgSNEQV
  Using land/water mask to remove water cells from analysis
	[9]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 80.72 seconds.
	[9] Spatial aggregation step completed in 9.00 seconds.
	[9] Iteration completed in 80.74 seconds.
Processing variable avgSOILM
  Using land/water mask to remove water cells from analysis
	Converting soil mositure value to total water depth (mm) in soil column.
	[10]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 283.20 seconds.
	[10] Spatial aggregation step completed in 10.00 seconds.
	[10] Iteration completed in 283.29 seconds.
Processing variable avgSOILM_depthmean
  Using land/water mask to remove water cells from analysis
	[11]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 80.87 seconds.
	[11] Spatial aggregation step completed in 11.00 seconds.
	[11] Iteration completed in 80.91 seconds.
Processing variable avgSOILM_wltadj_depthmean
  Using land/water mask to remove water cells from analysis
	[12]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 90.49 seconds.
	[12] Spatial aggregation step completed in 12.00 seconds.
	[12] Iteration completed in 90.51 seconds.
Processing variable avgSOILSAT
  Using land/water mask to remove water cells from analysis
	[13]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 92.01 seconds.
	[13] Spatial aggregation step completed in 13.00 seconds.
	[13] Iteration completed in 92.03 seconds.
Processing variable avgSOILSAT_wltadj_top1
  Using land/water mask to remove water cells from analysis
	[14]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 84.43 seconds.
	[14] Spatial aggregation step completed in 14.00 seconds.
	[14] Iteration completed in 84.45 seconds.
Processing variable totPRECIP
  Using land/water mask to remove water cells from analysis
	[15]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 88.05 seconds.
	[15] Spatial aggregation step completed in 15.00 seconds.
	[15] Iteration completed in 88.07 seconds.
Processing variable avgT2D
  Using land/water mask to remove water cells from analysis
	[16]    Calculating zonal mean.


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


	[0]    Calculated zonal mean in 116.16 seconds.
	[16] Spatial aggregation step completed in 16.00 seconds.
	[16] Iteration completed in 116.18 seconds.
CPU times: user 8min 47s, sys: 42.9 s, total: 9min 30s
Wall time: 29min 55s


### Remove unecessary attributes

In [9]:
# Eliminate any unecessary variable attributes (such as spatial metadata)
for variable in out_ds.data_vars:
    if 'grid_mapping' in out_ds[variable].attrs:
        del out_ds[variable].attrs['grid_mapping']
    if 'esri_pe_string' in out_ds[variable].attrs:
        del out_ds[variable].attrs['esri_pe_string']
    if 'proj4' in out_ds[variable].attrs:
        del out_ds[variable].attrs['proj4']
    if variable == 'landmask':
        out_ds[variable].attrs = {'description':'Fraction of gridded land area in each HUC12'}
    if variable == 'total_gridded_area':
        out_ds[variable].attrs = {'description':'Number of 1km grid cells for HUC12. Equivalend to square kilometers. Based on grid association of each HUC12'}
        
# Now eliminate unnecessary global attributes
if 'grid_mapping' in out_ds.attrs:
    del out_ds.attrs['grid_mapping']
if 'units' in out_ds.attrs:
    del out_ds.attrs['units']  
if 'esri_pe_string' in out_ds.attrs:
    del out_ds.attrs['esri_pe_string'] 
if 'long_name' in out_ds.attrs:
    del out_ds.attrs['long_name'] 
if '_FillValue' in out_ds.attrs:
    del out_ds.attrs['_FillValue'] 
if 'missing_value' in out_ds.attrs:
    del out_ds.attrs['missing_value'] 
out_ds

In [10]:
out_ds = out_ds.where(out_ds[zone_name]!=-1, drop=True)
out_ds

### Output to disk

In [11]:
%%time

# Read into memory before writing to disk?
out_ds.compute()

# Write output file (CSV)
if write_CSV:
    tic1 = time.time()
    out_file = os.path.join(outDir, output_pattern+'_2.csv')
    print('  Writing output to {0}'.format(out_file))
    if os.path.exists(out_file):
        tic1 = time.time()
        df_in = pd.read_csv(out_file)
        df_out = pd.concat([df_in, out_ds.to_dataframe()])
        df_out.to_csv(out_file)
        print('\t      Output file written in {0:3.2f} seconds.'.format(time.time()-tic1))
    else:
        write_csv(out_ds, out_file, columns=output[zone_name], index=[datetime_strings])
    print('\tExport to CSV completed in {0:3.2f} seconds.'.format(time.time()-tic1))
    
# Write output file (netCDF)
if write_NC:
    tic1 = time.time()
    out_file = os.path.join(outDir, output_pattern+'_2.nc')
    if os.path.exists(out_file):
        in_ds = xr.open_dataset(out_file).load()
        out_ds2 = xr.merge([in_ds, out_ds.transpose()])
        in_ds.close()
        del in_ds
        print('  Writing output to {0}'.format(out_file))
        out_ds2.to_netcdf(out_file, mode='w', format="NETCDF4", compute=True)
        del out_ds2
    else:
        print('  Writing output to {0}'.format(out_file))
        out_ds.transpose().to_netcdf(out_file, mode='w', format="NETCDF4", compute=True)
    print('\tExport to netCDF completed in {0:3.2f} seconds.'.format(time.time()-tic1))

  Writing output to /glade/scratch/ksampson/USGS/CONUS_Water_Budget/Water_Budget/CONUS_HUC12_WB_2D_19791001_20220930_2.csv
	      Output file written in 1817.52 seconds.
	Export to CSV completed in 1817.52 seconds.
  Writing output to /glade/scratch/ksampson/USGS/CONUS_Water_Budget/Water_Budget/CONUS_HUC12_WB_2D_19791001_20220930_2.nc
	Export to netCDF completed in 13.53 seconds.
CPU times: user 28min 38s, sys: 32.7 s, total: 29min 11s
Wall time: 30min 31s


## Spin Down the Cluster and Close datasets
##### After we are done, we can spin down our cluster

In [12]:
# Close the Dask cluster
client.close()
cluster.close()

2023-10-30 12:00:13,358 - distributed.deploy.adaptive_core - INFO - Adaptive stop


In [13]:
# Close dataset
ds.close()
print('Process completed in {0: 3.2f} seconds.'.format(time.time()-tic))

Process completed in  3852.22 seconds.


### The following is an attempt to compute a spatially weighted value

In [None]:
# %%time

# import dask.array as da

# varnum = 0
# variable = 'deltaACCET'
# unit = 'mm'

# print('Process initiated at {0}'.format(time.ctime()))

# print('Processing variable {0}'.format(variable))
# da_in = ds[variable]

# # Setup vindex on dask array to re-shape the data
# #if spatial_weights:
# da_out = da_in.isel({'time': [0,1]})
# da_out = da_in.data.vindex[:, indexer_j, indexer_i].compute()
# da_out

#### Pre-processing step, if the input is raw NWM and has not already been pre-processed (subset, de-accumulate, time aggregation, etc.)

In [None]:
# %%time
# print('Process initiated at {0}'.format(time.ctime()))
      

# Resample the time variable to daily if t_resample is True - only relevant if t_resample == True
# Also used in the output variable name, so choose something that indicates the temporal resolution
# #resample_time_period = "1D"
# resample_time_period = '1M'

# # Remove accumulated signal from accumulated variables?
# de_accumulate = False

# # List the accumulated variables in this dataset. Only necessary when de_accumulate==True
# accum_vars = ['ACCET', 'UGDRNOFF']  
# accum_var_renamed = {'ACCET':'ET'}

# # Determine how many time chunks we can process at once
# time_chunk_factor = 1
# time_chunks = split_given_size(timesteps, int(time_chunk_sizes[0]*time_chunk_factor))
# time_chunks = [time_chunks[0]]

# # To process all times at once
# #time_chunks = [timesteps]    

# # Iterate over time-chunks and process zonal statistics
# print('There will be {0} iterations.'.format(len(time_chunks)))
# for n,time_chunk in enumerate(time_chunks):
#     tic1 = time.time()
    
#     # Interpret times as strings
#     datetime_strings = pd.to_datetime(time_chunk).strftime('%Y%m%d%H')

#     # Subset in time if necessary
#     #data = ds[variable].loc[dict(time=slice(time_chunk[0], time_chunk[-1]))]
#     data = ds.loc[dict(time=slice(time_chunk[0], time_chunk[-1]))]

#     # For accumulated variables, we will attempt to difference over time then aggregagte
#     if variable in accum_vars:

#         if de_accumulate:
#             stat = 'sum'
#             print('De-accumulating the {0} variable.'.format(variable))
#             rename_var = accum_var_renamed.get(variable, variable)
#             if rename_var != variable:
#                 print('Renaming variable from {0} to {1}.'.format(variable, rename_var))
#                 data = data.rename({variable:rename_var})
#                 variable = rename_var
#             max_factor = 1.2            
#             de_accumulated = data[variable].diff(dim='time').rename(rename_var)

#             # Build a mask, wherever the difference is greater than the max difference for each cell time-series
#             print('Calculating max difference for each cell.')
#             mask_val = -de_accumulated.max(dim='time') * max_factor
#             mask_val.load()
#             restart_mask = de_accumulated >= mask_val

#             # The block below helps diagnose if the criteria is sufficient to determine the model restart times
#             #restart_locations = de_accumulated.where(de_accumulated < mask_val, drop=True)
#             #print('Found {0} timesteps where a restart may have occured based on the criteria'.format(restart_locations.shape[0]))

#             # Alter values after each restart location to the non differenced values from the original accumulated array
#             if t_resample:
#                 da = (de_accumulated.where(
#                     de_accumulated >= mask_val, 
#                     other = data[variable].isel(time=slice(1,None,None)),
#                     drop = False)
#                     .resample(time=resample_time_period)
#                     .sum(dim='time'))  
#             else:
#                 da = (de_accumulated.where(
#                     de_accumulated >= mask_val, 
#                     other = data[variable].isel(time=slice(1,None,None)),
#                     drop = False)
#         else:
#             # For accumulated variables, we want the LAST value of each time aggregation
#             stat = 'last'
#             print('Writing LAST value of each day to output.')
#             if t_resample:
#                 da = (
#                     data[variable]
#                     .resample(time=resample_time_period, closed='right')
#                     .last(skipna=False))  # significant user decision
#             else:
#                 da = data[variable]
                
#     # Handle percent saturation if requested
#     elif pct_sat and NWM_type == 'LDASOUT' and variable in ['SOIL_M',]:
#         print('Converting soil mositure value to soil saturation percent.')
#         data = soil_water_pct_sat(data)
#         variable = 'soil_water_pct_sat'

#         # For Soil Moisture, apply weights to soil depths to get total volume (in mm) in soil column.
#         stat = 'mean'
#         print('Converting to mm and summing over soil_layers_stag')
#         soil_dict = dict(soil_weights=("soil_layers_stag", [0,1,2,3]))
#         weights = xr.DataArray(soil_depths_mm, dims=("soil_layers_stag",), coords=soil_dict)
#         data[variable] = data[variable] * weights

#         # Sum the variables over depth dimension
#         if t_resample:
#             da = data[variable].sum(dim='soil_layers_stag').resample(time=resample_time_period).mean(dim='time') 
#         else:
#             da = data[variable].sum(dim='soil_layers_stag')

#     # For all other variables, calculate the mean of the variable over the aggregated timestep
#     else:
#         if t_resample:
#             stat = 'mean'
#             print('Writing MEAN across each {0} to output'.format(resample_time_period))
#             da = (
#                 data[variable]
#                 .resample(time=resample_time_period)
#                 .mean(dim='time'))
#         else:
#             da = data[variable]
            
#     # Apply groupby operation  
#     flox_function = 'mean'
#     print('\t[{0}]    Calculating zonal {1}.'.format(n, flox_function))
#     output = run_flox(data, data[zone_name], flox_function=flox_function, n=n)

#     # convert from rate (m^3/s) to depth (m) over a day (86400s) or hour (3600s)
#     if convert_to_mm and variable in ['RAINRATE'] and flox_function=='mean':
#         print('Converting from a rate (mm/s) to sum across each {0} to output'.format(resample_time_period))
#         if t_resample:
#             time_duration = pd.Timedelta(resample_time_period)
#             seconds = time_duration.total_seconds()
#         else:
#             seconds = input_timestep_seconds
#         print('Multiplying rate from mm/s to mm over timestep ({0} seconds.)'.format(seconds))
#         output = (output*seconds)
    
#     # Write output file (CSV)
#     if write_CSV:
#         out_file = os.path.join(outDir, '{0}_{1}_{2}_{3}.csv'.format(region, variable, flox_function, n))
#         write_csv(output.data, out_file, columns=output[zone_name], index=[datetime_strings], drops=[-1])
#         del output, out_file, flox_function            

#     # Write output file (netCDF)
#     if write_NC:
#         # Add geospatial data to output from the GEOGRID LDASOUT Spatial Metadata file
#         # output = output.to_dataset(name=variable)
#         if add_SM:
#             output = add_SM_to_ds(output, variable, ds_geo=geo_input, grid_type=NWM_type)
        
#         out_file = os.path.join(outDir, nc_output_pattern.format(filetype=NWM_type, variable=variable, region=region, unit=units, time=resample_time_period, stat=flox_function))
#         print('  Writing output to {0}'.format(out_file))
#         output.to_netcdf(out_file, mode='w', format="NETCDF4", compute=True)
        
#     del data, datetime_strings
#     print('\t[{0}] Iteration completed in {1:3.2f} seconds.'.format(n, time.time()-tic1))