# Visualization of Streamflow Conditions at Streamgages

This notebook provides a demonstration of the use of [hyswap](https://doi-usgs.github.io/hyswap/) python package for calculating streamflow percentiles and then visualizing streamflow conditions at multiple streamflow gages. 

This example notebook relies on use of the [dataretrieval](https://github.com/DOI-USGS/dataRetrieval) package for downloading streamflow information from USGS Water Data for the Nation as well as the [geopandas](https://geopandas.org/) package for mapping functionality. All of the other packages used in this notebook are either part of the standard python library or are dependencies of hyswap.


In [None]:
# Run commented lines below to install geopandas and mapping dependencies from within the notebook
#import sys
#!{sys.executable} -m pip install geopandas folium mapclassify

In [None]:
from dataretrieval import waterdata
from hyswap import utils, percentiles
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from tqdm import tqdm # used for progress bar indicators
import warnings
warnings.filterwarnings('ignore') # ignore warnings from dataretrieval

**NOTE:** The `tqdm` package is used in for-loops in this notebook to show a data download progress bar, which may be informative to the user. The specification below (`disable_tdqm`) determines whether this progress bar is displayed when the notebook renders. It is set to `True` when rendering the notebook in the `hyswap` GitHub documentation site. To see the progress bars in this notebook, set `disable_tqdm=False`.

In [None]:
disable_tqdm=True

## Define Helper Functions
The `hyswap` package provides functionality for calculating non-interpretive streamflow statistics but does not provide functionality for correcting invalid data or geospatial capabilities for mapping. Here we setup some simple helper functions we can re-use throughout the notebook to QAQC data and create maps. We will be using data from `dataretrieval`, which returns geodataframes if `geopandas` is installed in your environment. If your dataset does not include geometry information, you will need to manually specify it using `geopandas.GeoDataFrame()`.

In [None]:
# Data QAQC function for provisional USGS data
def qaqc_usgs_data(df, data_column_name):
    #replace invalid -999999 values with NA
    df[data_column_name] = df[data_column_name].replace(-999999, np.nan)
    # add any additional QAQC steps needed
    return df

In [None]:
def create_gage_condition_map(gage_df, flow_data_type, flow_data_col, map_schema, streamflow_data_type):
        # Format date and set to str type for use in map tooltips
        if flow_data_type == 'instantaneous':
                gage_df['Date'] = gage_df['time'].dt.strftime('%Y-%m-%d %H:%M')
        elif flow_data_type == 'daily':
                gage_df['Date'] = gage_df['time'].dt.strftime('%Y-%m-%d')
        gage_df = gage_df.drop('time', axis=1)
        # create colormap for map from hyswap schema
        schema = utils.retrieve_schema(map_schema)
        flow_cond_cmap = schema['colors']
        if 'low_color' in schema:
                flow_cond_cmap = [schema['low_color']] + flow_cond_cmap
        if 'high_color' in schema:
                flow_cond_cmap = flow_cond_cmap + [schema['high_color']]
        # if creating a drought map, set handling of non-drought flows
        if map_schema in ['WaterWatch_Drought', 'NIDIS_Drought']:
                gage_df['flow_cat'] = gage_df['flow_cat'].cat.add_categories('Other')
                gage_df.loc[gage_df['flow_cat'].isnull(), 'flow_cat'] = 'Other'
                flow_cond_cmap = flow_cond_cmap + ['#e3e0ca'] # light taupe
        # set NA values to "Not Ranked" category
        gage_df['flow_cat'] = gage_df['flow_cat'].cat.add_categories('Not Ranked')
        gage_df.loc[gage_df['est_pct'].isna(), 'flow_cat'] = 'Not Ranked'
        flow_cond_cmap = flow_cond_cmap + ['#d3d3d3'] # light grey
        # renaming columns with user friendly names for map
        gage_df = gage_df.rename(columns={flow_data_col:'Discharge (cfs)',
                                                'est_pct':'Estimated Percentile',
                                                'monitoring_location_id':'USGS Gage ID',
                                                'monitoring_location_name':'Streamgage Name',
                                                'flow_cat':'Streamflow Category'})
        # Create map
        m = gage_df.set_crs(crs="EPSG:4326").to_crs("EPSG:5070").explore(column="Streamflow Category",
                                cmap=flow_cond_cmap,
                                tooltip=["USGS Gage ID", "Streamgage Name", "Streamflow Category", "Discharge (cfs)", "Estimated Percentile", "Date"],
                                tiles="CartoDB Positron",
                                marker_kwds=dict(radius=5),
                                legend_kwds=dict(caption=streamflow_data_type + '<br> Streamflow  Category'))
        return m #returns a folium map object

## Data Downloading and Processing
Utilize an example state to select streamgages for generating various flow condition maps. Certain past days selected in the notebook are relevant to using the state of Vermont as an example, but the notebook can be run for any state. Next, find all stream sites active in the last year within the state.

In [None]:
#| tbl-cap: List of streamgage sites active within the last week
state = 'Vermont'
# Query Water Data APIs for what monitoring locations were active within the last week
active_time_series,_ = waterdata.get_time_series_metadata(
    state_name=state,
    parameter_code='00060',
    end_utc='P1W',
    skip_geometry=True
    )
# Figure out which of these monitoring locations are streams
# and grab their latitude/longitude information
active_stream_gages,_ = waterdata.get_monitoring_locations(
    monitoring_location_id=active_time_series['monitoring_location_id'].unique().tolist(),
    site_type_code='ST',
    properties=['monitoring_location_id', 'agency_code', 'monitoring_location_name', 'county_name', 
                'drainage_area', 'site_type', 'hydrologic_unit_code', 'altitude', 'vertical_datum', 'original_horizontal_datum']
)

display(active_stream_gages.head())

### Retrieve Streamflow Data from the Water Data APIs
For the monitoring locations identified above, download all historical daily streamflow data, from the beginning of each gage's flow record up until present. 

In [None]:
# create a python dictionary of dataframes by site id number
flow_data = {}

for StaID in tqdm(active_stream_gages['monitoring_location_id'].tolist(), disable=disable_tqdm, desc="Downloading USGS Flow Data for Sites"):
    flow_data[StaID],_ = waterdata.get_daily(
        monitoring_location_id=StaID,
        parameter_code='00060',
        statistic_id='00003', # mean daily discharge
        skip_geometry=True
        )

### Calculate Variable Streamflow Percentile Thresholds
For the sites identified above, calculate streamflow percentile thresholds at 0, 1, 5, 10, ... , 90, 95, 99, 100 percentiles.

Note that when using the default settings of [calculate_fixed_percentile_threshold()](https://doi-usgs.github.io/hyswap/reference/index.html#hyswap.percentiles.calculate_variable_percentile_thresholds_by_day) it is common for NA values to be returned for the highest/lowest percentile thresholds such as 1 and 99. This is because a very long streamflow record (100+ years) is required to have sufficient observations to calculate the 99th or 1st percentile of streamflow for a given day when using the default settings of `method=weibull` with `mask_out_of_range=True`. 

In [None]:
# Define what percentile levels (thresholds) that we want to calculate.
# Intervals of 5 or less are recommended to have sufficient levels to interpolate between in later calculations. 
# Note that 0 and 100 percentile levels are ignored, refer to min and max values returned instead.
percentile_levels = np.concatenate((np.array([1]), np.arange(5,96,5), np.array([99])), axis=0)
print(percentile_levels)

In [None]:
percentile_values = {}
for StaID in tqdm(active_stream_gages['monitoring_location_id'], disable=disable_tqdm, desc="Processing Sites"):
    if flow_data[StaID].shape[0] > 0:
        # Filter data as only approved data in the Water Data APIs should be used to calculate statistics
        df = utils.filter_approved_data(flow_data[StaID], 'approval_status')
        percentile_values[StaID] = percentiles.calculate_variable_percentile_thresholds_by_day(
            df, 
            data_column_name='value',
            date_column_name='time',
            percentiles=percentile_levels
            )
    else:
        print('No standard discharge data found for site ' + StaID + ', skipping')

In [None]:
#| tbl-cap: Sample of calculated variable streamflow percentile thresholds for first site in list
display(percentile_values[list(percentile_values.keys())[0]].head())

## Create a Current Flow Conditions Map for Daily Mean Streamflow

### Retrieve most recent (yesterday) daily mean streamflow
Data from yesterday will still be provisional, so they will not have been used to generate historical percentiles. We will pull these values from the `flow_data` dictionary and make a dataframe for calculating percentiles and mapping. 

In [None]:
yesterday = datetime.strftime(datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(1), '%Y-%m-%d')

recent_dvs = pd.DataFrame()

for StaID in active_stream_gages['monitoring_location_id'].tolist():
    df = flow_data[StaID]
    yesterday_row = df[df['time'] == yesterday]
    recent_dvs = pd.concat([recent_dvs, yesterday_row])

recent_dvs = qaqc_usgs_data(recent_dvs, 'value').drop(columns='geometry')

### Categorize streamflow based on calculated percentile values
Calculate estimated streamflow percentile for the new data by interpolating against the previously calculated percentile threshold levels.

In [None]:
# estimate percentiles
df = pd.DataFrame()
for StaID, site_df in recent_dvs.groupby(by="monitoring_location_id", group_keys=False):
    if StaID in list(percentile_values.keys()):
        if not percentile_values[StaID].isnull().all().all():
            percentile_df = percentiles.calculate_multiple_variable_percentiles_from_values(
            site_df,
            data_column_name = 'value',
            percentile_df = percentile_values[StaID],
            date_column_name = 'time')
            df = pd.concat([df, percentile_df])
# categorize streamflow by the estimated streamflow percentiles
df = utils.categorize_flows(df, 'est_pct', schema_name="NWD")
df = df.reset_index(level='time')
# Prep Data for mapping by joining site information and flow data  
gage_df = pd.merge(active_stream_gages, df, how="right", on="monitoring_location_id")

### Create Map of Streamflow Conditions

In [None]:
#| fig-cap: Map showing most recent daily mean streamflow and corresponding flow conditions
map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='daily',
    flow_data_col='value',
    map_schema='NWD',
    streamflow_data_type='Current Daily Mean'
    )
display(map)

### Create Map of Streamflow Conditions using Alternative Categorization Schema

In [None]:
#| fig-cap: Map showing most recent daily mean streamflow and corresponding flow conditions using a brown-blue schema

# Prep Data for mapping by joining site information and flow data 
map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='daily',
    flow_data_col='value',
    map_schema='WaterWatch_BrownBlue',
    streamflow_data_type='Current Daily Mean'
    )
display(map)

## Create a "Real-Time" Flow Conditions Map for Instantaneous Streamflow

### Retrieve most recent instantaneous streamflow records
Download data from the Water Data APIs and calculate corresponding streamflow percentile for the most recent instantaneous discharge measurement

In [None]:
recent_ivs,_ = waterdata.get_latest_continuous(
    monitoring_location_id=active_stream_gages['monitoring_location_id'].tolist(),
    parameter_code='00060',
    skip_geometry=True
    )
recent_ivs = qaqc_usgs_data(recent_ivs, 'value')

### Categorize streamflow based on calculated percentile values
Calculate estimated streamflow percentile for the new instantaneous data by interpolating against the previously calculated percentile threshold levels from daily streamflow records.

In [None]:
# estimate percentiles
df = pd.DataFrame()
for StaID, site_df in recent_ivs.groupby(by="monitoring_location_id", group_keys=False):
    if StaID in list(percentile_values.keys()):
        if not percentile_values[StaID].isnull().all().all():
            percentile_df = percentiles.calculate_multiple_variable_percentiles_from_values(
            site_df,
            data_column_name = 'value',
            percentile_df = percentile_values[StaID],
            date_column_name = 'time')
            df = pd.concat([df, percentile_df])
# categorize streamflow by the estimated streamflow percentiles
df = utils.categorize_flows(df, 'est_pct', schema_name="NWD")
df = df.tz_convert(tz='US/Eastern')
df = df.reset_index(level='time')
# Prep Data for mapping by joining site information and flow data  
gage_df = pd.merge(active_stream_gages, df, how="right", on="monitoring_location_id")

### Create Map of Real-Time Streamflow Conditions

In [None]:
#| fig-cap: Map showing most real-time streamflow conditions
map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='instantaneous',
    flow_data_col='value',
    map_schema='NWD',
    streamflow_data_type='Real-Time Instantaneous'
    )
display(map)

## Create a Current Flow Conditions Map for n-Day Daily Streamflow

### Retrieve daily streamflow records for past n-days
Let's download data from the Water Data APIs for the past 7 days and calculate a 7-day average. We will define the "n"-day in the beginning of this section. If instead we want to calculate a different n-day average, like a 14-day or 28-day, we would simply change the value of `n_days`. Also recall that we defined "yesterday" above, and will use it to download our data up to that date.

In [None]:
# Define your n-day period here.
# Choose from 1, 7, 14, or 28 days
n_days = 7

seven_days_back = datetime.strftime(datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(n_days), '%Y-%m-%d')

past_dvs,_ = waterdata.get_daily(
    monitoring_location_id=active_stream_gages['monitoring_location_id'].tolist(), 
    parameter_code='00060',
    statistic_id='00003',
    time=f"{seven_days_back}/{yesterday}",
    skip_geometry=True
)
past_dvs = qaqc_usgs_data(past_dvs, 'value').set_index('time')
past_dvs_n_d = pd.DataFrame()
for StaID, new_df in past_dvs.groupby('monitoring_location_id'):
    df = utils.rolling_average(new_df, 'value', f'{n_days}D').round(2)
    past_dvs_n_d=pd.concat([past_dvs_n_d, df])
past_dvs_n_d = past_dvs_n_d.dropna(subset=['value'])

### Calculate 7-day average streamflow and corresponding variable percentile thresholds

In [None]:
flow_data_n_d = {}
for StaID in tqdm(active_stream_gages['monitoring_location_id'].tolist(), disable=disable_tqdm):
    if flow_data[StaID].shape[0] > 0:
        flow_data_n_d[StaID] = utils.rolling_average(flow_data[StaID].set_index('time'), 'value', f'{n_days}D').round(2)
    else:
        print('No standard discharge data column found for site ' + StaID + ', skipping')

In [None]:
percentile_values_n_d = {}
for StaID in tqdm(active_stream_gages['monitoring_location_id'], disable=disable_tqdm, desc="Processing"):
    if flow_data[StaID].shape[0] > 0:
        # Filter data down to only approved records to calculate statistics
        df = utils.filter_approved_data(flow_data_n_d[StaID], 'approval_status')
        # We are not defining the date column here, since it was switched to the index
        # when calculating the rolling n-day average.
        percentile_values_n_d[StaID] = percentiles.calculate_variable_percentile_thresholds_by_day(
            df, 
            data_column_name='value',
            percentiles=percentile_levels
            )
    else:
        print('No standard discharge data column found for site ' + StaID + ', skipping')

### Categorize streamflow based on calculated percentile values
Calculate estimated streamflow percentile for the new data by interpolating against the previously calculated percentile threshold levels.

In [None]:
# estimate percentiles
df = pd.DataFrame()
for StaID, site_df in past_dvs_n_d.groupby("monitoring_location_id"):
    if StaID in list(percentile_values_n_d.keys()):
        if not percentile_values[StaID].isnull().all().all():
            month_day = site_df.index.strftime('%m-%d')[0]
            site_df['est_pct'] = percentiles.calculate_variable_percentile_from_value(
            site_df['value'][0], percentile_values[StaID], month_day)
            df = pd.concat([df, site_df])
# categorize streamflow by the estimated streamflow percentiles
df = utils.categorize_flows(df, 'est_pct', schema_name="NWD")
# keep only most recent 7-day average flow for plotting
df = df[df.index.get_level_values('time') == yesterday]
df = df.reset_index(level='time')
# Prep Data for mapping by joining site information and flow data  
gage_df = pd.merge(active_stream_gages, df, how="right", on="monitoring_location_id")

### Create Map of 7-Day Average Streamflow Conditions

In [None]:
#| fig-cap: Map showing most recent 7-day average streamflow and corresponding flow conditions

map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='daily',
    flow_data_col='value',
    map_schema='NWD',
    streamflow_data_type=f'Current {n_days}-Day Average'
    )
display(map)

## Create a Drought Conditions Map for a Previous Day's Streamflow

### Retrieve daily streamflow records from a past day
Download data from the Water Data APIs and calculate corresponding streamflow percentiles for the given day's streamflow. Remember that we are comparing past days to the historical percentles calculated from the full streamflow record up to present. 

In [None]:
past_day = "2023-05-30"

past_dvs,_ = waterdata.get_daily(
    monitoring_location_id=active_stream_gages['monitoring_location_id'].tolist(),
    parameter_code='00060',
    statistic_id='00003',
    time=past_day,
    skip_geometry=True
    )
past_dvs = qaqc_usgs_data(past_dvs, 'value')

### Categorize streamflow based on calculated percentile values

In [None]:
# Calculate estimated streamflow percentile for the new data by interpolating against
# the previously calculated percentile threshold levels
df = pd.DataFrame()
for StaID, site_df in past_dvs.groupby(by="monitoring_location_id", group_keys=False):
    if StaID in list(percentile_values.keys()):
        if not percentile_values[StaID].isnull().all().all():
            percentile_df = percentiles.calculate_multiple_variable_percentiles_from_values(
            site_df,
            data_column_name = 'value',
            percentile_df = percentile_values[StaID],
            date_column_name = 'time')
            df = pd.concat([df, percentile_df])
# categorize streamflow by the estimated streamflow percentiles
df = utils.categorize_flows(df, 'est_pct', schema_name="WaterWatch_Drought")
df = df.reset_index(level='time')
# Prep Data for mapping by joining site information and flow data  
gage_df = pd.merge(active_stream_gages, df, how="right", on="monitoring_location_id")

### Create Map of Streamflow Drought Conditions

In [None]:
#| fig-cap: Map showing historical daily mean streamflow and corresponding flow conditions using a drought categorization schema
map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='daily',
    flow_data_col='value',
    map_schema='WaterWatch_Drought',
    streamflow_data_type='Daily Mean'
    )
display(map)

## Create a Flood Conditions Map for a past Day's Streamflow
This example uses fixed percentiles that are not calculated by  day of year, but instead across all days of the year together. Flow categories are therefore relative to absolute streamflow levels rather than what is normal for that day of the year.

### Retrieve daily streamflow records from a past day
Download data from the Water Data APIs and calculate corresponding fixed streamflow percentiles for the given day's streamflow

In [None]:
past_day = "2023-07-10"

past_dvs,_ = waterdata.get_daily(
    monitoring_location_id=active_stream_gages['monitoring_location_id'].tolist(),
    parameter_code='00060',
    statistic_id='00003',
    time=past_day,
    skip_geometry=True
    )
past_dvs = qaqc_usgs_data(past_dvs, 'value')

In [None]:
fixed_percentile_values = {}

for StaID in tqdm(active_stream_gages['monitoring_location_id'], disable=disable_tqdm):
    if flow_data[StaID].shape[0] > 0:
        # Filter data down to only approved records to calculate statistics
        df = utils.filter_approved_data(flow_data[StaID], 'approval_status')
        if not df.empty:
            fixed_percentile_values[StaID] = percentiles.calculate_fixed_percentile_thresholds(
                df,
                data_column_name='value',
                date_column_name='time',
                percentiles=percentile_levels
                )
        else:
            print(StaID + ' has no approved data, skipping')
    else:
        print(StaID + ' does not have standard discharge data column, skipping')

### Categorize streamflow based on calculated percentile values

In [None]:
# estimate percentiles
for StaID in past_dvs['monitoring_location_id'].unique().tolist():
    if StaID in list(fixed_percentile_values.keys()):
        past_dvs.loc[(past_dvs['monitoring_location_id'] == StaID) & (past_dvs['time'] == past_day), 'est_pct'] = percentiles.calculate_fixed_percentile_from_value(
            past_dvs.loc[(past_dvs['monitoring_location_id'] == StaID) & (past_dvs['time'] == past_day), 'value'],
            fixed_percentile_values[StaID]
        )
# categorize streamflow by the estimated streamflow percentiles
df = utils.categorize_flows(past_dvs, 'est_pct', schema_name="WaterWatch_Flood")
# Prep Data for mapping by joining site information and flow data  
gage_df = pd.merge(active_stream_gages, df, how="right", on="monitoring_location_id")

### Create Map of Streamflow High-Flow Conditions

In [None]:
#| fig-cap: Map showing historical daily mean streamflow and corresponding flow conditions using a high-flow categorization schema
map = create_gage_condition_map(
    gage_df=gage_df,
    flow_data_type='daily',
    flow_data_col='value',
    map_schema='WaterWatch_Flood',
    streamflow_data_type='Daily Mean'
    )
display(map)