# Load Post-Event Evaluation Data
This notebook walks through the steps to load NWM operational forecasts and verifying observations into a cache of parquet files to enable interactive, visual evaluation of the forecasts. The steps of the loading process include the following:

1. Start a cluster
2. Read the root path and geometry data
3. Define the event (dates and region)
4. Select datasets to load
5. Load the data
6. Monitor progress on the cluster dashboard  
(Repeat steps 4-6 for each forecast configuration)

### First load necessary packages 

In [18]:
# import packages
import dashboard_utils as du
import teehr.loading.nwm_point_data as tlp
import teehr.loading.usgs as tlu
import teehr.loading.nwm_grid_data as tlg
from pathlib import Path
import pandas as pd
import geopandas as gpd
import panel as pn
import geoviews as gv
import holoviews as hv
import cartopy.crs as ccrs
import datetime as dt
import time
import json
pn.extension()
hv.extension('bokeh', logo=False)
gv.extension('bokeh', logo=False)

import importlib

## 1. Start a cluster
Before loading data, start a cluster of nodes for distributed computing to make the loading faster. The method (GatewayCluster or LocalCluster) depends on whether you are running this notebook locally or in the TEEHR Hub, which is detected automatically below (based on the JupyterHub global username 'jovyan'). The cluster will remain active until you shut it down, so you only need to run this step once. (i.e., no need to rerun to load data for a new event, region or forecast configuration). Note that the cluster does not shut down automatically after a period of inactivity, so it is important to manually shut down the cluster when you are finished loading data (last cell in this notebook). 

**To monitor data loading progress**:
- Click on the Dashboard URL that appears after running the cell below
- Got to the dashboard after launching the data loading step further below

**If running in TEEHR Hub**, select the number of workers in the GatewayCluster panel:
- Select "Manual Scaling"
- Enter the # of desired workers (initial testing indicates 16 is roughly optimal)
- Click "Scale" and wait for the # of workers in the left side of the GatewayCluster panel to update

*On TEEHR Hub it may take a several minutes for the workers to spin up if the server is inactive.

In [2]:
if 'client' not in locals():
    if 'jovyan' in list(Path().absolute().parts):
        run_location = 'teehrhub'       
        from dask_gateway import Gateway
        gateway = Gateway("https://teehr-hub.rtiamanzi.org/services/dask-gateway", auth="jupyterhub")
        cluster = gateway.new_cluster()
        client = cluster.get_client()
    else:
        from dask.distributed import Client
        client = Client()
        run_location = 'local'

display(client)
if run_location == 'teehrhub':
    display(cluster)

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 15.64 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:65320,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 15.64 GiB

0,1
Comm: tcp://127.0.0.1:65352,Total threads: 3
Dashboard: http://127.0.0.1:65353/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:65323,
Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-rilvqmu5,Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-rilvqmu5

0,1
Comm: tcp://127.0.0.1:65345,Total threads: 3
Dashboard: http://127.0.0.1:65348/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:65324,
Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-ai3vn56v,Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-ai3vn56v

0,1
Comm: tcp://127.0.0.1:65335,Total threads: 3
Dashboard: http://127.0.0.1:65336/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:65325,
Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-_o30rrdy,Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-_o30rrdy

0,1
Comm: tcp://127.0.0.1:65344,Total threads: 3
Dashboard: http://127.0.0.1:65346/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:65326,
Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-yjczdhfx,Local directory: C:\Users\KVANWE~1\AppData\Local\Temp\dask-scratch-space\worker-yjczdhfx


## 2. Read the root path and geometry data
For running on the TEEHR Hub, the config file ```teehrhub_config.json``` and is included in the repo. For running locally, only a sample config file (```local_sample.json```) is included in the repo for security purposes (so local paths are not exposed). As described in the README, to work with the notebooks locally, copy ```local_sample.json``` to ```local_config.json``` and edit the path. ```local_config.json``` is included in the .gitignore file to prevent it from being pushed to the repo.

Geometry-related data read below include:
- ID crosswalks
- points/polygons geometry needed to align NWM forecasts with observations
- polygons to display in maps to facilitate region selection
- grid weights for HUC10 mean areal calculations.

Note that HUC10s are currently the default area unit for precipitation processing in this notebook, however grid weights can be externally calculated in TEEHR for any polygon layer.

In [3]:
if 'jovyan' in list(Path().absolute().parts):
    config_file = 'teehrhub_config.json'
else:
    config_file = 'local_config.json'

# Read the root path in the config file, set geometry subdir name under the root dir
root_dir = du.read_root_dir(Path(Path().absolute().parents[0], 'config', config_file))
geo_dir = Path(root_dir, 'geo')
json_dir = Path(root_dir, 'zarr')
event_defs_path = Path(root_dir, 'events', 'event_definitions.json')

# read crosswalks
usgs_nwm_crosswalk=pd.read_parquet(Path(geo_dir, 'usgs_nwm22_crosswalk.conus.parquet'))
usgs_huc12_crosswalk=pd.read_parquet(Path(geo_dir, 'usgs_huc12_crosswalk.conus.parquet'))
nwm_huc12_crosswalk=pd.read_parquet(Path(geo_dir, 'nwm22_huc12_crosswalk.conus.parquet'))

# read geometry
huc2_gdf = gpd.read_parquet(Path(geo_dir, 'huc2_geometry.conus.parquet'))
states_gdf = gpd.read_parquet(Path(geo_dir, 'states_geometry.conus.parquet')) 
usgs_gdf = gpd.read_parquet(Path(geo_dir, 'usgs_point_geometry.conus.parquet'))
huc10_gdf = gpd.read_parquet(Path(geo_dir, 'huc10_geometry.conus.parquet'))
nwm_gdf = gpd.read_parquet(Path(geo_dir, 'nwm22_centroid_geometry.conus.parquet'))

# read huc10 grid weights
huc10_grid_weights = pd.read_parquet(Path(geo_dir, 'huc10_grid_weights.conus.parquet'))

## 3. Define the event

To load and organize the data, the event name, dates, and regional extents are needed.  The event name is used to organize parquet files by event (for faster querying) and for reference in subsequent notebooks. The dates selected are the start and end dates **of the event** (use the same date for a single-day event). The forecast reference times will be identified that include any timesteps overlapping with the event dates.  Those forecasts, and observations corresponding to the value times in those forecasts, will be loaded.  Selecting a subregion (for conus) will also save some time and disk space rather than loading the entire domain. This is particularly true if you are loading forcing data. For simplicity (for now), the widgets below allow you to subset by one or more HUC2 subregions and further subset by lat/lon bounds if desired.

Event name and specifications are stored in ```ROOT_DIR/post-event/events/event_definitions.json```

### First select whether this is a new or previously defined event, then show/define the event characteristics
If you would like to download data again for a previously defined event, select that event name. Otherwise leave the selection as the default "define new event" to launch the date/region selection widgets. To change the dates or region of a previously defined event, treat it as a new event and use the same name (it will overwrite the values in event_definitions.json)

(TO DO - add other domains)

In [4]:
existing_events = du.read_event_definitions(event_defs_path)
select_event_name = pn.widgets.Select(name='Select new or previously defined event:', options=['define new event'] + list(existing_events.keys()))
pn.Row(select_event_name, pn.Column(pn.Spacer(height=30)))

In [5]:
define_event_panel, widgets, selection = du.select_event_widgets(huc2_gdf, states_gdf, existing_events, select_event_name)
define_event_panel

### Confirm selected features and polygons
This step extracts the NWM features within the selected region and forecast reference times overlapping the event date range. A map will appear below with a red boundary around the selected area - confirm that the region and printed forecast reference times match the intended selection. 

In [8]:
print(f"\033[1mSubregion:\033[0m")
huc2_list = du.get_selected_huc2_list(huc2_gdf, selection.index)
if huc2_list == [] and select_event_name != 'define new event':
    huc2_list = du.get_existing_event(existing_events, select_event_name)['huc2_list']
event_specs = dict(
    name = widgets['event_name_input'].value,
    start_date = widgets['start_picker'].value,
    end_date = widgets['end_picker'].value,
    huc2_list = huc2_list,
    lat_limits = widgets['lat_slider'].value,
    lon_limits = widgets['lon_slider'].value,
)
huc2_selected = event_specs['huc2_list']
latlon_box = du.get_lat_lon_box_from_limits(event_specs['lat_limits'], event_specs['lon_limits'])

# get the features and hucs within the selected region
usgs_ids = du.get_usgs_id_list_as_str(huc2_selected, latlon_box, usgs_gdf, usgs_huc12_crosswalk)
huc10_selected_gdf = du.get_hucs_selected(huc2_selected, latlon_box, huc10_gdf, huc_level = 10)
nwm_selected_gdf = du.get_nwm_subset_by_huc10s(huc10_selected_gdf['id'].to_list(), nwm_gdf, nwm_huc12_crosswalk, usgs_ids, usgs_nwm_crosswalk, 'all reaches')
print(f"{len(usgs_ids)} USGS gages selected\nLoading map...")
outer_bound = du.get_outer_bound(points = nwm_selected_gdf, polys = huc10_selected_gdf)
huc2s = gv.Polygons(huc2_gdf, vdims=['huc2'],crs=ccrs.GOOGLE_MERCATOR)
states = gv.Polygons(states_gdf, vdims=['STUSPS'], crs=ccrs.GOOGLE_MERCATOR)   

button = pn.widgets.Button(name='Update/Store selected event', button_type='primary')
def update_event_defs_file(event):
    updated_event_defs = du.update_event_definitions(existing_events, event_specs.copy())
    du.write_event_definitions(event_defs_path, updated_event_defs)
button.on_click(update_event_defs_file)

pn.Column(
    pn.Row(
        pn.Column(pn.Spacer(height=25), button),
        states.opts(color_index=None, fill_color='lightgray', nonselection_alpha=1, line_color='white', tools=[''], 
                    title=f'Selecting reaches within the red boundary') \
        * huc2s.opts(color_index=None, line_color='darkgray', fill_color='none', width=700, height=450) \
        * gv.Polygons(outer_bound, crs=ccrs.GOOGLE_MERCATOR).opts(color_index=None, fill_color='none', line_color = 'red', line_width=2),
    ),
)

[1mSubregion:[0m
1897 HUC10 polygons selected
274960 features selected
612 USGS gages selected
Loading map...


### Define datasets to load
Next define the datasets you want to load for the above defined event and the set of reaches (gaged only or all NWM features).  Note that loading functions search for and load all data available on a given date (if the hour is specified, it is ignored).

In [21]:
define_data_panel, data_widgets = du.select_data_widgets()
date_strings = du.list_nwm_dates_for_event_dates(widgets['start_picker'].value, widgets['end_picker'].value)
for key in date_strings.keys():
    print(date_strings[key])
pn.Column(pn.Spacer(height=10), define_data_panel, pn.Spacer(height=10))

Short range forecasts references times that overlap with event dates (through current): 2023-08-18 00:00:00 through 2023-08-20 09:00:00
Short range forecasts valid times that overlap with event dates (through current): 2023-08-18 00:00:00 through 2023-08-20 11:00:00
Medium range forecasts that overlap with event dates (through current): 2023-08-09 00:00:00 through 2023-08-20 00:00:00
Medium range forecasts valid times that overlap with event dates (through current): 2023-08-09 00:00:00 through 2023-08-20 11:00:00


## Verify dates

In [111]:
reach_set = data_widgets['select_reach_set'].value
if reach_set == 'gaged reaches':
    nwm_selected_gdf = du.get_nwm_subset_by_huc10s(huc10_selected_gdf['id'].to_list(), nwm_gdf, nwm_huc12_crosswalk, usgs_ids, usgs_nwm_crosswalk, reach_set)
nwm_ids = du.get_nwm_id_list_as_int(nwm_selected_gdf, nwm_huc12_crosswalk)

forecast_configuration = data_widgets['select_forecast_config'].value[0]
ref_start = data_widgets['select_ref_start'].value
ref_end = data_widgets['select_ref_end'].value
adj_ref_end = du.validate_dates(ref_start, ref_end, forecast_configuration, "reference")
ref_n_days = (adj_ref_end - ref_start).days + 1
val_start = data_widgets['select_value_start'].value
val_end = data_widgets['select_value_end'].value
adj_val_end = du.validate_dates(val_start, val_end, "usgs", "valid")

612 features selected
reference start date: 2023-08-18, end date: 2023-08-20
valid start date: 2023-08-18, end date: 2023-08-19


# Load streamflow data
Load the streamflow data for the forecast configuration, data sources (forecast and/or observed) and reach set defined above. If selecting 'all reaches', USGS data will be loaded as observations for gaged reaches and NWM extended analysis/assim data will be loaded for ungaged reaches to serve as a proxy for 'observed'. 

In [None]:
variable_name = 'streamflow'
output_type = 'channel_rt'
if forecast_configuration == 'medium_range_mem1':
    output_type = 'channel_rt_1'

ts_dir = Path(root_dir, 'events', event_specs['name'], 'timeseries')

if forecast_configuration != 'none':
    t_start = time.time()
    ts_dir_config = Path(ts_dir, forecast_configuration)
    json_dir_config = Path(json_dir, forecast_configuration)
    print(f"Loading {forecast_configuration} {variable_name} from {ref_start} to {ref_end}")
    tlp.nwm_to_parquet(
        forecast_configuration,
        output_type,
        variable_name,
        ref_start,
        ref_n_days,
        nwm_ids,
        json_dir_config,
        ts_dir_config
    )
    print(f"...{forecast_configuration} {variable_name} loading complete in {(time.time() - t_start)/60} minutes\n")       
    

In [92]:
# get corresponding observed data if requested
if 'USGS*' in data_widgets['select_observed_source'].value:
    t_start = time.time()
    ts_dir_config = Path(ts_dir, 'usgs')
    print(f"Loading USGS {variable_name} from {val_start} to {val_end}") 
    tlu.usgs_to_parquet(
        usgs_ids,
        dt.datetime.combine(val_start, dt.time(hour=0)),
        dt.datetime.combine(val_end, dt.time(hour=23)),
        ts_dir_config,
        chunk_by = 'day'
    )
    print(f"...USGS loading complete in {(time.time() - t_start)/60} minutes\n") 

Loading USGS streamflow from 2023-08-18 to 2023-08-20
...USGS loading complete in 0.17939512729644774 minutes



In [110]:
ana_list = ['analysis_assim_extend', 'analysis_assim', 'analysis_assim_extend_no_da', 'analysis_assim_no_da']
for ana in ana_list:
    adj_val_end = du.adj_valtime_end(val_end, ana)
    val_n_days = (adj_val_end - val_start).days + 1
    if ana in data_widgets['select_observed_source'].value:
        t_start = time.time()
        ts_dir_config = Path(ts_dir, ana)
        json_dir_config = Path(json_dir, ana)
        print(f"Loading {ana} {variable_name} from {val_start} to {adj_val_end}") 
        if 'extend' in ana:
            tm_range = range(0,28)
        else:
            tm_range = range(0,2)
        tlp.nwm_to_parquet(
            ana,
            output_type,
            variable_name,
            val_start,
            val_n_days,        
            nwm_ids,
            json_dir_config,
            ts_dir_config,
            tm_range
        )
        print(f"...{ana} {variable_name} loading complete in {(time.time() - t_start)/60} minutes\n")

Loading analysis_assim streamflow from 2023-08-18 to 2023-08-19
...analysis_assim streamflow loading complete in 0.6941436688105266 minutes



# Load mean areal precipitation data

Now (if desired) calculate and load mean areal precipitation data for the forecast configuration and data sources (forecast and/or observed). NWM extended analysis/assim forcing data (StageIV) will be loaded as the best proxy for 'observed'.  Note that HUC10s are currently the default area unit for precipitation processing in this notebook, however grid weights can be externally calculated in TEEHR for any polygon layer.me period

In [None]:
forcing_forecast_configuration = 'forcing_' + forecast_configuration
if forcing_forecast_configuration == 'forcing_medium_range_mem1':
    forcing_forecast_configuration = 'forcing_medium_range'
variable_name = 'RAINRATE'
output_type = 'forcing'
json_dir_config = Path(json_dir, forcing_forecast_configuration)
ts_dir_config = Path(ts_dir, forcing_forecast_configuration)

# write subset of weights to temporary file (necessary to avoid memory issues when passing in memory for distributed computing)
grid_weights_subset = huc10_grid_weights[huc10_grid_weights['zone'].isin(huc10_selected_gdf['id'])]
grid_weights_subset.to_parquet(Path(geo_dir, 'temp_grid_weights_subset.parquet'))

if forecast_configuration != 'none':
    t_start = time.time()
    json_dir_config = Path(json_dir, forcing_forecast_configuration)
    ts_dir_config = Path(ts_dir, forcing_forecast_configuration)
    print(f"Loading {forcing_forecast_configuration} {variable_name} from {ref_start} to {ref_end}")
    tlg.nwm_grids_to_parquet(
        forcing_forecast_configuration,
        output_type,
        variable_name,
        ref_start,
        ref_n_days,
        Path(geo_dir, 'temp_grid_weights_subset.parquet'),
        json_dir_config,
        ts_dir_config
    )
    print(f"...{forcing_forecast_configuration} {variable_name} loading complete in {(time.time() - t_start)/60} minutes\n")

ana_list = ['analysis_assim_extend', 'analysis_assim']
for ana in list(set(ana_list) & set(data_widgets['select_observed_source'].value)):
    adj_val_end = du.adj_valtime_end(val_end, ana)
    val_n_days = (adj_val_end - val_start).days + 1
    if ana in data_widgets['select_observed_source'].value:
        t_start = time.time()
        forcing_ana_config = 'forcing_' + ana
        ts_dir_config = Path(ts_dir, forcing_ana_config)
        json_dir_config = Path(json_dir, forcing_ana_config)
        print(f"Loading {forcing_ana_config} {variable_name} from {val_start} to {val_end}") 
        if 'extend' in forcing_ana_config:
            tm_range = range(0,28)
        else:
            tm_range = range(0,2)          
        tlg.nwm_grids_to_parquet(
            forcing_ana_config,
            output_type,
            variable_name,
            val_start,
            val_n_days, 
            Path(geo_dir, 'temp_grid_weights_subset.parquet'),
            json_dir_config,
            ts_dir_config,
            tm_range
        )
        print(f"...{forcing_ana_config} {variable_name} loading complete in {(time.time() - t_start)/60} minutes\n")

Loading forcing_short_range RAINRATE from 2023-08-18 to 2023-08-20
...forcing_short_range RAINRATE loading complete in 0.0 minutes

Loading forcing_analysis_assim RAINRATE from 2023-08-18 to 2023-08-19


In [None]:
gateway.stop_cluster(cluster_name=cluster.name)

In [None]:
gateway.list_clusters()

In [None]:
gateway.stop_cluster(cluster_name='teehr-hub.d1b3846d52244b599e0498129d86c245')