In [1]:
import datetime as dt

import mvm_python
import nivapy3 as nivapy
import numpy as np
import pandas as pd
from sqlalchemy import types

ERROR 1: PROJ: proj_create_from_database: Open of /opt/conda/share/proj failed


In [2]:
# Connect to the database holding the RESA2 schema (nivapy prompts
# interactively for a username and password)
eng = nivapy.da.connect()

Username:  ········
Password:  ········


Connection successful.


In [3]:
def f(row):
    """Return the detection-limit flag found in ``row["value_"]``.

    '<' takes priority over '>' if both appear; NaN is returned when the
    value text contains neither symbol.
    """
    text = row["value_"]
    for flag in ("<", ">"):
        if flag in text:
            return flag
    return np.nan


# First number in a string, with optional sign, decimals and exponent. The
# exponent matters: str(float) renders values below 1e-4 in scientific
# notation (e.g. "5e-05"), which a pattern without it would read as 5.
_NUMBER_PATTERN = r"([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)"


def _varchar_dtypes(df):
    """Return a ``to_sql`` dtype mapping giving each object column a VARCHAR.

    The VARCHAR length is the longest string in the column (1 if the column
    is entirely null). Presumably this avoids pandas' default text type for
    object columns (a CLOB on Oracle) - confirm.
    """
    dtypes = {}
    for col in df.columns[df.dtypes == "object"]:
        max_len = df[col].str.len().max()
        dtypes[col] = types.VARCHAR(int(max_len) if pd.notna(max_len) else 1)
    return dtypes


def _find_existing_samples(stn_ids, st_dt, end_dt, eng):
    """Return IDs of RESA2 samples for ``stn_ids`` with st_dt <= date < end_dt."""
    # "(1, id) IN ((1, a), (1, b), ...)" is a hack to avoid Oracle's 1000-item
    # limit on IN-lists. See
    # https://stackoverflow.com/questions/17842453/is-there-a-workaround-for-ora-01795-maximum-number-of-expressions-in-a-list-is
    bind_stns = ",".join("(1, %d)" % i for i in stn_ids)
    sql = (
        "SELECT water_sample_id FROM resa2.water_samples "
        "WHERE (1, station_id) IN (%s) "
        "AND sample_date >= :st_dt "
        "AND sample_date < :end_dt" % bind_stns
    )
    par_dict = {"st_dt": st_dt, "end_dt": end_dt}
    existing_df = pd.read_sql(sql, params=par_dict, con=eng)
    return existing_df["water_sample_id"].unique().astype(int).tolist()


def _delete_samples(ws_ids, eng):
    """Delete samples ``ws_ids`` and their chemistry values from RESA2."""
    bind_samps = ",".join("(1, %d)" % i for i in ws_ids)
    # Chemistry values go first, presumably because they reference the
    # samples (foreign key) - confirm against the schema
    eng.execute(
        "DELETE FROM resa2.water_chemistry_values2 "
        "WHERE (1, sample_id) IN (%s) " % bind_samps
    )
    eng.execute(
        "DELETE FROM resa2.water_samples "
        "WHERE (1, water_sample_id) IN (%s) " % bind_samps
    )


def _restructure_mvm_data(mvm_df, par_map_df):
    """Reduce raw MVM data to one value per (sample_date, icpw_method_id).

    Returns a new DataFrame with columns ``sample_date``, ``icpw_method_id``
    and ``value`` (already converted to RESA2 units). The input is not modified.
    """
    mvm_df = mvm_df.copy()
    # Assign instead of column-level fillna(inplace=True): that is chained
    # assignment and silently does nothing under pandas Copy-on-Write
    mvm_df["unit"] = mvm_df["unit"].fillna("none")
    mvm_df["mvm_par_unit"] = mvm_df["par_name"] + "_" + mvm_df["unit"]

    # Keep only parameters in the lookup table, then convert units
    mvm_df = pd.merge(mvm_df, par_map_df, how="inner", on="mvm_par_unit")
    mvm_df["value"] = mvm_df["value"] * mvm_df["factor"]

    # If multiple depths are available, keep the shallowest. Deduplicate per
    # parameter+unit (not just parameter name) so a parameter reported in two
    # units that map to different RESA methods (e.g. Ca in mg/l and mekv/l)
    # keeps both values.
    mvm_df = mvm_df.sort_values(by=["depth1", "depth2"])
    mvm_df = mvm_df.drop_duplicates(
        subset=["mvm_id", "sample_date", "mvm_par_unit"]
    )

    # Occasionally, different MVM parameters map to the same RESA method at
    # the same site-date (e.g. Tot-N_ps and Tot-N_TNb). Average these for now.
    # This also guarantees one value per (sample_date, icpw_method_id).
    mvm_df = (
        mvm_df.groupby(["sample_date", "icpw_method_id"])
        .agg({"value": "mean"})
        .reset_index()
    )

    # Sometimes values of 0 are reported. Drop these as dubious
    mvm_df["value"] = mvm_df["value"].replace(0, np.nan)
    return mvm_df.dropna(subset=["value"])


def _build_chem_df(mvm_df, samp_ids_df):
    """Join RESA2 sample IDs onto tidy MVM data and split out '<'/'>' flags.

    Returns columns ``sample_id``, ``method_id``, ``value`` and ``flag1`` as
    expected by resa2.water_chemistry_values2. Rows whose sample date has no
    matching sample ID are dropped.
    """
    chem_df = (
        pd.merge(mvm_df, samp_ids_df, how="left", on="sample_date")[
            ["water_sample_id", "icpw_method_id", "value"]
        ]
        .rename(
            columns={
                "water_sample_id": "sample_id",
                "icpw_method_id": "method_id",
                "value": "value_",
            }
        )
        .dropna(how="any")
        .copy()
    )
    chem_df["sample_id"] = chem_df["sample_id"].astype(int)

    # Deal with flags. 'value' is numeric after unit conversion, so flags are
    # presumably never present here; the text round-trip is kept as a guard.
    # f() only needs a row-like mapping; a comprehension (unlike
    # DataFrame.apply) also behaves correctly on an empty frame.
    chem_df["value_"] = chem_df["value_"].astype(str)
    chem_df["flag1"] = [f({"value_": text}) for text in chem_df["value_"]]
    chem_df["value"] = (
        chem_df["value_"]
        .str.extract(_NUMBER_PATTERN, expand=False)
        .astype(float)
    )
    chem_df = chem_df[["sample_id", "method_id", "value", "flag1"]]

    # Check flags are consistent (compare non-null flags explicitly rather
    # than relying on NaN identity inside a set)
    if not chem_df["flag1"].dropna().isin(["<", ">"]).all():
        print("Some flags are not valid:")
        print(chem_df["flag1"].unique())

    return chem_df


def upload_mvm_data_to_resa2(
    swe_df,
    st_yr,
    end_yr,
    eng,
    replace=False,
    par_map_xlsx=r"../../../Sweden_MVM_API/map_mvm_pars_to_resa2.xlsx",
    token_xlsx=r"../../../Sweden_MVM_API/md_mvm_tokens.xlsx",
):
    """Download MVM data for Swedish ICPW stations and upload it to RESA2.

    For each station in ``swe_df``, data for calendar years ``st_yr`` to
    ``end_yr`` (inclusive) are fetched via ``mvm_python``, mapped to RESA2
    methods using the lookup table in ``par_map_xlsx``, and appended to
    ``resa2.water_samples`` and ``resa2.water_chemistry_values2``.

    Parameters
    ----------
    swe_df : pandas.DataFrame
        Stations to process, with columns ``station_id`` (RESA2 ID),
        ``nfc_code`` (MVM station ID) and ``station_name``.
    st_yr, end_yr : int
        First and last calendar years of interest (both inclusive).
    eng : database connection
        RESA2 connection (as returned by ``nivapy.da.connect``). It must work
        with ``pd.read_sql``, ``eng.execute`` and ``DataFrame.to_sql``.
    replace : bool, default False
        If truthy, existing samples (and their chemistry values) for these
        stations and years are deleted before uploading.
    par_map_xlsx : str
        Excel lookup table with columns ``mvm_par_unit``, ``factor`` and
        ``icpw_method_id``.
    token_xlsx : str
        Excel file with MVM access tokens, passed to ``mvm_python``.

    Raises
    ------
    ValueError
        If samples already exist for these stations and years and
        ``replace`` is falsy.
    """
    stn_ids = swe_df["station_id"].unique().astype(int).tolist()
    if not stn_ids:
        # An empty IN-list would be invalid SQL
        print("No stations supplied. Nothing to do.")
        return

    # Whole calendar years. The upper bound is exclusive (1st Jan of the next
    # year) so samples timestamped during 31st December are included.
    st_dt = dt.datetime(int(st_yr), 1, 1)
    end_dt = dt.datetime(int(end_yr) + 1, 1, 1)

    ws_ids = _find_existing_samples(stn_ids, st_dt, end_dt, eng)
    if ws_ids and not replace:
        raise ValueError(
            f"{len(ws_ids)} water samples already exist for the specified sites and time period. "
            "Use 'replace=True' to delete these values and upload new data."
        )
    if ws_ids:
        print(
            f"{len(ws_ids)} water samples already exist for the specified sites and time period. "
            "These will be deleted and replaced with new data."
        )
        _delete_samples(ws_ids, eng)

    par_map_df = pd.read_excel(par_map_xlsx)

    for _, row in swe_df.iterrows():
        resa_id = int(row["station_id"])
        mvm_id = row["nfc_code"]
        print("Processing: %s" % row["station_name"])
        print("    Getting data from MVM...")
        mvm_df = mvm_python.query_mvm_station_data(mvm_id, st_yr, end_yr, token_xlsx)
        if len(mvm_df) == 0:
            print("    No data returned. Skipping.")
            continue

        print("    Restructuring...")
        mvm_df = _restructure_mvm_data(mvm_df, par_map_df)
        if mvm_df.empty:
            print("    No parameters of interest found. Skipping.")
            continue

        # One surface sample per date
        new_samples = (
            mvm_df[["sample_date"]]
            .drop_duplicates()
            .sort_values(by="sample_date")
            .assign(station_id=resa_id, depth1=0, depth2=0)
        )[["station_id", "sample_date", "depth1", "depth2"]]

        print("    Writing data to WATER_SAMPLES table...")
        new_samples.to_sql(
            name="water_samples",
            schema="resa2",
            con=eng,
            if_exists="append",
            index=False,
            dtype=_varchar_dtypes(new_samples),
        )

        # Get the database-assigned sample IDs back
        print("    Identifying sample IDs...")
        sql = (
            "SELECT water_sample_id, station_id, sample_date "
            "FROM resa2.water_samples "
            "WHERE station_id = :stn_id"
        )
        samp_ids_df = pd.read_sql(sql, params={"stn_id": resa_id}, con=eng)

        print("    Checking data integrity...")
        chem_df = _build_chem_df(mvm_df, samp_ids_df)

        print("    Writing data to WATER_CHEMISTRY_VALUES2 table...")
        chem_df.to_sql(
            name="water_chemistry_values2",
            schema="resa2",
            con=eng,
            if_exists="append",
            index=False,
            dtype=_varchar_dtypes(chem_df),
        )

        print("    Done.")

# Update Swedish ICPW data

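This notebook downloads water chemistry data for the Swedish ICPW sites via the MVM API and adds it to the RESA2 database (tables `WATER_SAMPLES` and `WATER_CHEMISTRY_VALUES2`). Any samples already in RESA2 for these sites and years are deleted and replaced.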

In [4]:
# Calendar years to transfer (first and last year, both inclusive)
st_yr, end_yr = 2021, 2022

## Parameter mappings from MVM to RESA

In [5]:
# Show the lookup table that maps MVM parameter/unit combinations to RESA methods
par_map_xlsx = r"../../../Sweden_MVM_API/map_mvm_pars_to_resa2.xlsx"
par_map_df = pd.read_excel(io=par_map_xlsx)
par_map_df

Unnamed: 0,mvm_par_unit,factor,icpw_par,icpw_method_id
0,Al_µg/l,1.0,TAL,10249
1,AlI_NAJ_µg/l,1.0,LAL,10292
2,Alk/Acid_mekv/l,1.0,ALKGMILLI,10297
3,As_µg/l,1.0,As,10293
4,Ca_mekv/l,1.0,Ca mekv/l,10551
5,Ca_mg/l,1.0,Ca,10251
6,Cd_µg/l,1.0,Cd,10252
7,Cl_mekv/l,1.0,Cl mekv/l,10556
8,Cl_mg/l,1.0,Cl,10253
9,Cr_µg/l,1.0,Cr,10285


## Swedish sites

There are 92 Swedish stations in the ICPW programme.

In [6]:
# Read the ICPW station list and keep only the Swedish "core" sites
stn_xlsx = r"../../../all_icpw_sites_mar_2023.xlsx"
stn_df = pd.read_excel(stn_xlsx).query("country == 'Sweden'")
print(len(stn_df))
stn_df.head()

92


Unnamed: 0,station_id,station_code,nfc_code,station_name,latitude,longitude,altitude,continent,country,region,group
358,38344,SE_101,101,Brännträsket,65.526248,21.416926,84.0,Europe,Sweden,SoNord,Trends
359,38390,SE_102,102,St. Lummersjön,58.11417,14.10621,240.0,Europe,Sweden,SoNord,Trends
360,38411,SE_105,105,Öjsjön,58.171908,16.211824,101.0,Europe,Sweden,SoNord,Trends
361,38338,SE_1070,1070,Alstern,59.715053,13.910674,162.0,Europe,Sweden,SoNord,Trends
362,38369,SE_1071,1071,Lill-Jangen,59.991355,13.347344,194.0,Europe,Sweden,SoNord,Trends


In [7]:
%%time

upload_mvm_data_to_resa2(stn_df, st_yr, end_yr, eng, replace=True)

73 water samples already exist for the specified sites and time period. These will be deleted and replaced with new data.
Processing: Brännträsket
    Getting data from MVM...
    Restructuring...
    Writing data to WATER_SAMPLES table...
    Identifying sample IDs...
    Checking data integrity...
    Writing data to WATER_CHEMISTRY_VALUES2 table...
    Done.
Processing: St. Lummersjön
    Getting data from MVM...
    Restructuring...
    Writing data to WATER_SAMPLES table...
    Identifying sample IDs...
    Checking data integrity...
    Writing data to WATER_CHEMISTRY_VALUES2 table...
    Done.
Processing: Öjsjön
    Getting data from MVM...
    Restructuring...
    Writing data to WATER_SAMPLES table...
    Identifying sample IDs...
    Checking data integrity...
    Writing data to WATER_CHEMISTRY_VALUES2 table...
    Done.
Processing: Alstern
    Getting data from MVM...
    Restructuring...
    Writing data to WATER_SAMPLES table...
    Identifying sample IDs...
    Checking 