In [None]:
%matplotlib inline
import calendar
import configparser
import glob
import os

import matplotlib.pyplot as plt
import nivapy3 as nivapy
import numpy as np
import pandas as pd
import seaborn as sn

plt.style.use("ggplot")

# Get API key for HydAPI. The key is read from an INI-style file (section
# "Auth", option "key") rather than being hard-coded in the notebook.
key_file = ".nve-hydapi-key"
config = configparser.RawConfigParser()

# `read` silently skips files it cannot open and returns the list of files it
# managed to parse. Fail here with a clear message instead of with a confusing
# NoSectionError from `config.get` below.
if not config.read(key_file):
    raise FileNotFoundError(
        f"Could not read the HydAPI key file '{key_file}' "
        "(path is relative to the current working directory)."
    )
api_key = config.get("Auth", "key")

In [None]:
# Connect to db. The returned object (`eng`) is passed to the pandas SQL
# calls (`pd.read_sql`, `pd.read_sql_query`) in later cells.
eng = nivapy.da.connect()

# Update RID flow datasets

Each year, updated flow datasets (both modelled and observed) are obtained from NVE and added to RESA2.

In [None]:
# Year of interest. Later cells use it to build the HydAPI query window, the
# expected number of daily records, the input folder names and the database
# checks, so it is the only value that needs changing for a new year.
year = 2020

## 1. Observed discharge

Observed time series are used **only** for the 11 main rivers - all other calculations are based on modelled flows (from HBV). This notebook uses NVE's HydAPI to download data for the relevant stations where possible. Other datasets must be obtained directly from NVE (e-mail Trine Fjeldstad). Note that more than 11 discharge stations are involved, because at some chemistry sampling locations the flow is the sum of several NVE discharge series. Note also the following:

 * Chemistry station 29613 should ideally use the sum of NVE series 16.133 and 16.153, but the latter is no longer available. We simply assume the input from 16.153 is constant at 10 $m^3/s$ (which is roughly equal to the long-term average) <br><br>
 
 * The discharge for chemistry station 29614 is **either** NVE station 21.71 **or** 21.11. Series 21.11 is usually available first, but 21.71 can be checked too <br><br> 
 
 * Discharge data for chemistry stations 29617 (NVE ID 2.605) and 36225 (NVE ID 6.78) are often delayed. Contact Trine at NVE early to avoid problems later.

### 1.1. Discharge stations

The discharge stations associated with the 11 main water chemistry sampling locations are shown in the dataframe below.

In [None]:
# Lookup table of discharge stations. Later cells read the "nve_id" and
# "dis_station_id" columns from it.
xl_path = r"../data/rid_resa_nve_discharge_stations.xlsx"
resa_nve_df = pd.read_excel(xl_path, sheet_name="observed_stns")

# Display the whole table without truncating the contents of wide columns
with pd.option_context("display.max_colwidth", None):
    display(resa_nve_df)

### 1.2. Data from HydAPI

In [None]:
# NVE series IDs required for the main rivers
nve_stn_ids = resa_nve_df["nve_id"].values

# Station list from HydAPI, restricted to the series of interest
nve_stn_df = nivapy.da.get_nve_hydapi_stations(api_key=api_key)
nve_stn_df = nve_stn_df[nve_stn_df["station_id"].isin(nve_stn_ids)]

print(f"{len(nve_stn_df)} out of {len(resa_nve_df)} stations found in HydAPI:")
nve_stn_df

In [None]:
# Get discharge for the year of interest
par_ids = [1001]
st_dt, end_dt = f"{year}-01-01", f"{year + 1}-01-01"

# NOTE(review): resolution=1440 is presumably in minutes (i.e. daily values),
# which matches the one-record-per-day checks made later - confirm
q_df = nivapy.da.query_nve_hydapi(
    nve_stn_ids,
    par_ids,
    st_dt,
    end_dt,
    resolution=1440,
    api_key=api_key,
)
q_df.head()

In [None]:
# One record per station per day is expected
days = 366 if calendar.isleap(year) else 365
n_expected = len(nve_stn_df) * days
assert len(q_df) == n_expected, "Number of records is not as expected."

# Columns used to identify a series in the reports below
id_cols = ["station_id", "station_name"]

# Report series with records that have not completed quality control
passed_qc = q_df["quality"] == 3
print(
    "The following series have not completed quality control and will be dropped (i.e. 'quality' < 3;"
)
print("see https://hydapi.nve.no/UserDocumentation/ for details):\n")
print(q_df[~passed_qc][id_cols].drop_duplicates())

# Keep only fully quality-controlled records
q_df = q_df[passed_qc]

# Report series containing missing values
is_nan = q_df["value"].isna()
if is_nan.any():
    print("\n\nThe following records contain NaN values:\n")
    print(q_df[is_nan][id_cols].drop_duplicates())

# Report series containing negative flows
is_neg = q_df["value"] < 0
if is_neg.any():
    print("\n\nThe following records contain values <0:\n")
    print(q_df[is_neg][id_cols].drop_duplicates())

Based on the output above we can **make a data request to NVE**. This typically involves requesting the records for stations **21.11.0** and **12.285.0** (which are not available via HydAPI), **plus** any sites that haven't yet finished quality control (listed above).

The cell below uploads the valid HydAPI data to RESA2.

In [None]:
# List to store output
df_list = []

# Loop over the discharge stations in the lookup table and reformat the
# HydAPI data for each one to match the columns of resa2.discharge_values
for _, row in resa_nve_df.iterrows():
    nve_id = row["nve_id"]
    dis_stn_id = row["dis_station_id"]

    # Get flow for station
    q_stn_df = q_df.query("station_id == @nve_id").copy()

    if len(q_stn_df) == 0:
        print(f"No data for NVE ID {nve_id}.")

    else:
        assert len(q_stn_df) == days

        # Remove HH:MM:SS part from dates. Only "value" is resampled: taking
        # the mean of the whole frame raises a TypeError on the text columns
        # (e.g. "station_name") in pandas >= 2.0, and no other column from the
        # HydAPI frame is used below.
        q_stn_df = q_stn_df.set_index("datetime")[["value"]].resample("D").mean()
        q_stn_df = q_stn_df.reset_index()
        q_stn_df["datetime"] = q_stn_df["datetime"].dt.date

        # Linear interpolation and back-filling of NaN. The result is assigned
        # back explicitly because `inplace=True` on a column selection is not
        # guaranteed to modify the parent frame (copy-on-write), and
        # `fillna(method=...)` is deprecated in favour of `bfill()`.
        q_stn_df["value"] = q_stn_df["value"].interpolate(method="linear").bfill()

        # Add 10 m3/s to 16.133 (RESA2 discharge station ID 59)
        if dis_stn_id == 59:
            q_stn_df["value"] = q_stn_df["value"] + 10

        # Add other required cols and tidy
        q_stn_df["dis_station_id"] = dis_stn_id
        q_stn_df["xcomment"] = np.nan
        q_stn_df["xvalue"] = q_stn_df["value"]
        q_stn_df["xdate"] = q_stn_df["datetime"]

        # Reorder cols
        q_stn_df = q_stn_df[["dis_station_id", "xdate", "xvalue", "xcomment"]]

        # Append to output
        df_list.append(q_stn_df)

        # Check whether data already exist for this year. This only prints a
        # warning; the station is still included in the output.
        sql = (
            "SELECT count(*) FROM resa2.discharge_values "
            "WHERE dis_station_id = %s "
            "AND EXTRACT(YEAR FROM xdate) = %s " % (dis_stn_id, year)
        )
        cnt_df = pd.read_sql(sql, eng)
        cnt = cnt_df.iloc[0, 0]
        if cnt > 0:
            print(
                "%s data already exist for NVE "
                "station %s (RESA2 ID %s)." % (cnt, nve_id, dis_stn_id)
            )

# Stack data. `pd.concat` fails with an unhelpful message on an empty list,
# so report the actual problem instead.
if not df_list:
    raise ValueError("No valid HydAPI data were found for any of the stations.")
hydapi_q_df = pd.concat(df_list, axis=0)
hydapi_q_df.head()

In [None]:
# NOTE(review): the upload below is commented out, presumably so that
# re-running the notebook cannot append the same records twice - confirm,
# then uncomment and run once to write `hydapi_q_df` to the database.
# # Add new rows to database
# hydapi_q_df.to_sql(
#     "discharge_values", con=eng, schema="resa2", if_exists="append", index=False
# )

### 1.3. Data from Trine

Once all the missing datasets identified above have been obtained from NVE, the following code can be used to add them to the database.

In [None]:
# Folder containing data from Trine
tri_fold = f"../../../Data/nve_observed/{year}-{year - 1999}/from_trine"

# Expected number of daily records per station
days = 366 if calendar.isleap(year) else 365

# List to store output
df_list = []

# Get list of files from Trine to process
search_path = os.path.join(tri_fold, "*.csv")
file_list = glob.glob(search_path)
if not file_list:
    # Otherwise `pd.concat` below fails with "No objects to concatenate"
    raise FileNotFoundError(f"No CSV files found matching '{search_path}'.")

# Loop over files from Trine
for file_path in file_list:
    # Get RESA station ID. The part of the file name before the first "_" is
    # taken as the NVE series number; ".0" is appended to build the "nve_id"
    # used in the station lookup table.
    nve_id = os.path.split(file_path)[1].split("_")[0] + ".0"
    stn_match = resa_nve_df.query("nve_id == @nve_id")["dis_station_id"]
    if stn_match.empty:
        raise ValueError(
            f"No discharge station found for NVE ID {nve_id} (file '{file_path}')."
        )
    dis_stn_id = stn_match.iloc[0]

    # Parse file (first row skipped; ";"-separated; -9999 marks missing data)
    q_stn_df = pd.read_csv(
        file_path,
        skiprows=1,
        index_col=0,
        parse_dates=True,
        header=None,
        sep=";",
        names=["xdate", "xvalue"],
        na_values="-9999",
        encoding="cp1252",
    )

    # Get just records for year of interest. Filtering on the calendar year
    # (rather than truncating up to and including 1 January of the next year)
    # ensures a record stamped exactly at midnight on 1 January of the
    # following year is not pulled in as an extra day.
    q_stn_df = q_stn_df[q_stn_df.index.year == year]

    # Remove HH:MM:SS part from dates
    q_stn_df = q_stn_df.resample("D").mean()
    q_stn_df = q_stn_df.reset_index()
    q_stn_df["xdate"] = q_stn_df["xdate"].dt.date

    # Linear interpolation and back-filling of NaN. The result is assigned
    # back explicitly because `inplace=True` on a column selection is not
    # guaranteed to modify the parent frame (copy-on-write), and
    # `fillna(method=...)` is deprecated in favour of `bfill()`.
    q_stn_df["xvalue"] = q_stn_df["xvalue"].interpolate(method="linear").bfill()

    # Add 10 m3/s to 16.133 (RESA2 ID 59)
    if dis_stn_id == 59:
        q_stn_df["xvalue"] = q_stn_df["xvalue"] + 10.0

    # Add dis_id and tidy
    q_stn_df["dis_station_id"] = dis_stn_id
    q_stn_df["xcomment"] = np.nan

    # Reorder cols
    q_stn_df = q_stn_df[["dis_station_id", "xdate", "xvalue", "xcomment"]]

    # Append to output
    df_list.append(q_stn_df)

# Stack data
tri_q_df = pd.concat(df_list, axis=0)

# Sanity checks: one complete year per file, numeric values and no gaps
assert (
    len(tri_q_df) == len(file_list) * days
), "Datasets has an unexpected number of records."
assert tri_q_df["xvalue"].dtypes == np.float64, 'Check for text in "xvalue" column.'
assert pd.isna(tri_q_df["xvalue"]).sum() == 0, 'Check for NaN in "xvalue" column.'

tri_q_df.head()

In [None]:
# NOTE(review): the upload below is commented out, presumably so that
# re-running the notebook cannot append the same records twice - confirm,
# then uncomment and run once to write `tri_q_df` to the database.
# # Add new rows to database
# tri_q_df.to_sql(
#     "discharge_values", con=eng, schema="resa2", if_exists="append", index=False
# )

## 2. Modelled discharge

Each year, Stein Beldring supplies modelled data from HBV for the period from 1990 to the year of interest. These datasets are stored locally here:

    ...Elveovervakingsprogrammet\Data\hbv_modelled

and on the network here:

    K:\Avdeling\Vass\316_Miljøinformatikk\Prosjekter\RID\Vannføring\Modellert

The flow files are named e.g. `hbv_00000001.var`, where the number corresponds to the NVE "vassdragsområde". These are listed in *vassomr.pdf* in the above folder, and they're also included in RESA2's `DISCHARGE_STATIONS` table. The vassdragsområde numbers are stored in the `NVE_SERINUMMER` field.

Tore has an Access database in e.g.

    K:\Avdeling\Vass\316_Miljøinformatikk\Prosjekter\RID\Vannføring\Modellert\NVE_MODELLERT_2016\vannføring

that first deletes the modelled NVE values for each station from 1990 onwards and then adds the new data, which includes everything from 1990 plus the additional year of data. The code below does the same, and performs some basic checking of the data at the same time.

In [None]:
# Folder containing modelled data
data_fold = f"../../../Data/hbv_modelled/RID_{year}"

# Get a list of files to process (only interested in flow here)
search_path = os.path.join(data_fold, "hbv_*.var")
file_list = glob.glob(search_path)
if not file_list:
    # Make it obvious when the loop below has nothing to do
    print(f"WARNING: no files found matching '{search_path}'.")

# Get number of days between 1990 and year of interest
days_new = len(pd.date_range(start="1990-01-01", end="%s-12-31" % year, freq="D"))

# Get number of days between 1990 and year before
days_old = len(pd.date_range(start="1990-01-01", end="%s-12-31" % (year - 1), freq="D"))

# Loop over files
for file_path in file_list:
    # Get name and reg. nr. (the integer between "hbv_" and ".var")
    name = os.path.split(file_path)[1]
    reg_nr = int(name.split("_")[1][:-4])
    print(f"Processing {name}.")

    # Get RESA2 station ID
    sql = (
        "SELECT dis_station_id FROM resa2.discharge_stations "
        "WHERE nve_serienummer = '%s'" % reg_nr
    )
    stn_df = pd.read_sql_query(sql, eng)
    assert (
        len(stn_df) > 0
    ), f"No RESA2 discharge station found with nve_serienummer '{reg_nr}'."
    dis_id = stn_df.iloc[0, 0]

    # Check number of post-1990 records already in db
    # (should equal days_old)
    sql = (
        "SELECT COUNT(*) FROM resa2.discharge_values "
        "WHERE dis_station_id = %s "
        "AND xdate >= DATE '1990-01-01'" % dis_id
    )
    cnt_old = pd.read_sql_query(sql, eng).iloc[0, 0]
    assert cnt_old == days_old, "Unexpected number of records already in database."

    # Read new data (whitespace-separated, no header). `sep=r"\s+"` is the
    # supported equivalent of the deprecated `delim_whitespace=True`.
    df = pd.read_csv(file_path, sep=r"\s+", header=None, names=["XDATE", "XVALUE"])

    # Convert dates (every timestamp in the file carries a "/1200" suffix)
    df["XDATE"] = pd.to_datetime(df["XDATE"], format="%Y%m%d/1200")

    # Check st, end and length
    assert df["XDATE"].iloc[0] == pd.Timestamp(
        "1990-01-01"
    ), "New series does not start on 01/01/1990."
    assert df["XDATE"].iloc[-1] == pd.Timestamp("%s-12-31" % year), (
        "New series does not end on 31/12/%s." % year
    )
    assert len(df) == days_new, "Unexpected length for new series."

    # Add station ID to df
    df["DIS_STATION_ID"] = dis_id

# NOTE(review): the destructive delete/insert step below is commented out, so
# as written the loop above only validates the new files. If `eng` is a
# SQLAlchemy >= 2.0 engine, `eng.execute(sql)` with a plain string will need
# updating before this is re-enabled - confirm.
#     # Drop existing rows post-1990 for this site
#     sql = (
#         "DELETE FROM resa2.discharge_values "
#         "WHERE dis_station_id = %s "
#         "AND xdate >= DATE '1990-01-01'" % dis_id
#     )
#     res = eng.execute(sql)

#     # Add new rows
#     df.to_sql(
#         "discharge_values", con=eng, schema="resa2", if_exists="append", index=False
#     )