In [None]:
import glob
import os

import geopandas as gpd
import nivapy3 as nivapy
import pandas as pd
from sqlalchemy import text
from tqdm.notebook import tqdm

# Create PostGIS database for TEOTIL3

To streamline the workflow for TEOTIL3, I propose migrating all key datasets from Oracle to the Hub's PostGIS database. The code in this notebook creates a new schema named `teotil3` within the `general` database and loads relevant datasets to PostGIS.

In [None]:
# Connect to PostGIS
# NOTE(review): `admin=True` presumably requests an admin-level login, which the
# schema/table DDL further down would require - confirm against the nivapy docs.
# The resulting `eng` object is used by every database call in this notebook
eng = nivapy.da.connect_postgis(admin=True)

## 1. Create schema and set permissions

In [None]:
# Drop schema if it already exists - USE WITH CAUTION!
# CASCADE also removes every object contained in the schema. IF EXISTS turns the
# statement into a no-op (rather than an error) when the schema has not been
# created yet, so the notebook can be run top-to-bottom on a fresh database
sql = "DROP SCHEMA IF EXISTS teotil3 CASCADE"
eng.execute(sql)

In [None]:
# Make sure the 'teotil3' schema is present (does nothing if it already exists)
eng.execute("CREATE SCHEMA IF NOT EXISTS teotil3")

In [None]:
# Give the default Jovyan user "ReadOnly" access to the schema: usage rights on
# the schema itself, SELECT on all tables currently in it, and a default
# privilege granting SELECT on tables
read_only_grants = (
    "GRANT USAGE ON SCHEMA teotil3 TO jovyan",
    "GRANT SELECT ON ALL TABLES IN SCHEMA teotil3 TO jovyan",
    "ALTER DEFAULT PRIVILEGES IN SCHEMA teotil3 GRANT SELECT ON TABLES TO jovyan",
)
for grant_sql in read_only_grants:
    eng.execute(grant_sql)

## 2. Basic non-spatial tables

### 2.1. Tables from Oracle

In [None]:
# Definitions of the basic non-spatial tables, as (table name, column and
# constraint clauses) pairs. The order is significant: tables that are the
# target of a foreign key must be created before the tables referencing them
table_definitions = [
    (
        "point_source_locations",
        "  site_id text NOT NULL, "
        "  name text NOT NULL, "
        "  sector text NOT NULL, "
        "  type text NOT NULL, "
        "  year integer NOT NULL, "
        "  geom geometry(Point, 25833) NOT NULL, "
        "  PRIMARY KEY (site_id, year), "
        "  CONSTRAINT site_unique UNIQUE (site_id, name, sector, type, year, geom) ",
    ),
    (
        "input_param_definitions",
        "  in_par_id integer NOT NULL, "
        "  oracle_par_id integer, "
        "  name text NOT NULL, "
        "  unit text NOT NULL, "
        "  comment text, "
        "  PRIMARY KEY (in_par_id), "
        "  CONSTRAINT in_par_unique UNIQUE (in_par_id, name, unit) ",
    ),
    (
        "output_param_definitions",
        "  out_par_id integer NOT NULL, "
        "  name text NOT NULL, "
        "  unit text NOT NULL, "
        "  PRIMARY KEY (out_par_id), "
        "  CONSTRAINT out_par_unique UNIQUE (out_par_id, name, unit) ",
    ),
    (
        # Links each input parameter to an output parameter via a factor
        "input_output_param_conversion",
        "  in_par_id integer NOT NULL, "
        "  out_par_id integer NOT NULL, "
        "  factor numeric NOT NULL, "
        "  PRIMARY KEY (in_par_id, out_par_id), "
        "  CONSTRAINT in_par_id_fkey FOREIGN KEY (in_par_id) "
        "      REFERENCES teotil3.input_param_definitions (in_par_id), "
        "  CONSTRAINT out_par_id_fkey FOREIGN KEY (out_par_id) "
        "      REFERENCES teotil3.output_param_definitions (out_par_id) ",
    ),
    (
        # One value per site, input parameter and year
        "point_source_values",
        "  site_id text NOT NULL, "
        "  in_par_id integer NOT NULL, "
        "  year integer NOT NULL, "
        "  value numeric NOT NULL, "
        "  PRIMARY KEY (site_id, in_par_id, year), "
        "  CONSTRAINT site_id_fkey FOREIGN KEY (site_id, year) "
        "      REFERENCES teotil3.point_source_locations (site_id, year), "
        "  CONSTRAINT in_par_id_fkey FOREIGN KEY (in_par_id) "
        "      REFERENCES teotil3.input_param_definitions (in_par_id) ",
    ),
    (
        # One value per 'komnr', input parameter and year
        "spredt_inputs",
        "  komnr text NOT NULL, "
        "  in_par_id integer NOT NULL, "
        "  year integer NOT NULL, "
        "  value numeric NOT NULL, "
        "  PRIMARY KEY (komnr, in_par_id, year), "
        "  CONSTRAINT in_par_id_fkey FOREIGN KEY (in_par_id) "
        "      REFERENCES teotil3.input_param_definitions (in_par_id) ",
    ),
]

for tbl, clauses in table_definitions:
    # Remove any previous version of the table (and objects depending on it)
    sql = text(f"DROP TABLE IF EXISTS teotil3.{tbl} CASCADE")
    eng.execute(sql)

    # Re-create the table from its definition
    sql = text(f"CREATE TABLE teotil3.{tbl} ( {clauses})")
    eng.execute(sql)

# # agri_inputs_teotil2
# sql = text("DROP TABLE IF EXISTS teotil3.agri_inputs_teotil2 CASCADE")
# eng.execute(sql)

# sql = text(
#     "CREATE TABLE teotil3.agri_inputs_teotil2 "
#     "( "
#     "  region text NOT NULL, "
#     "  year integer NOT NULL, "
#     "  diff_totn_kg numeric NOT NULL, "
#     "  point_totn_kg numeric NOT NULL, "
#     "  back_totn_kg numeric NOT NULL, "
#     "  diff_totp_kg numeric NOT NULL, "
#     "  point_totp_kg numeric NOT NULL, "
#     "  back_totp_kg numeric NOT NULL, "
#     "  PRIMARY KEY (region, year) "
#     ")"
# )
# eng.execute(sql)

In [None]:
# Options common to every upload in this cell: write to the 'teotil3' schema,
# leave out the dataframe index, and append rows to the existing table
upload_opts = {"schema": "teotil3", "index": False, "if_exists": "append"}

# input_param_definitions
in_par_df = pd.read_csv(r"../../data/postgis_input_parameter_definitions.csv")
in_par_df.to_sql("input_param_definitions", eng, **upload_opts)

# output_param_definitions
out_par_df = pd.read_csv(r"../../data/postgis_output_parameter_definitions.csv")
out_par_df.to_sql("output_param_definitions", eng, **upload_opts)

# input_output_param_conversion
# Rows containing any missing value are discarded before uploading
conv_df = pd.read_csv(r"../../data/postgis_input_output_param_conversion.csv")
conv_df = conv_df.dropna(how="any")
conv_df.to_sql("input_output_param_conversion", eng, **upload_opts)

### 2.2. New tables

In [None]:
# NOTE: Every statement in this cell is commented out, so running it does nothing.
# As written, the disabled code would read each CSV listed in 'csv_dict' from
# 'data_fold', write it to a table named '<file name without .csv>_<data_year>'
# in the 'teotil3' schema (replacing any existing table), and then add a primary
# key on the column given in 'csv_dict'.

# data_fold = r"../../data"
# data_year = 2022

# # key: value = CSV file name: primary key column
# csv_dict = {
#     "ar50_artype_classes.csv": "artype",
#     "lake_residence_times_10m_dem.csv": "vatnLnr",
#     "offshore_hierarchy.csv": "regine",
#     "regine_retention_transmission_10m_dem.csv": "regine",
#     "spatially_static_background_coefficients.csv": "variable",
#     "spatially_variable_background_coefficients.csv": "regine",
#     "spatiotemporally_variable_background_coefficients.csv": "regine",
#     "vassdragsomrader_ospar_regions.csv": "vassom",
# }

# for fname, pk_col in csv_dict.items():
#     table_name = fname[:-4] + f"_{data_year}"
#     fpath = os.path.join(data_fold, fname)
#     df = pd.read_csv(fpath)
#     df.to_sql(table_name, eng, schema="teotil3", index=False, if_exists="replace")

#     sql = (
#         f"ALTER TABLE teotil3.{table_name} "
#         f"ADD CONSTRAINT {table_name}_pk "
#         f'PRIMARY KEY ("{pk_col}")'
#     )
#     eng.execute(sql)

## 3. Spatial tables

In [None]:
# Folder holding the core TEOTIL3 spatial data
spatial_data_fold = "/home/jovyan/shared/common/teotil3/core_data"

# Read the 'regines' layer from the geopackage in the 'tidied' sub-folder
teo_gpkg = os.path.join(spatial_data_fold, "tidied", "teotil3_data.gpkg")
reg_gdf = gpd.read_file(teo_gpkg, layer="regines", driver="GPKG")

# Preview the first rows
reg_gdf.head()

In [None]:
table_name = "regines"

# Upload the regines GeoDataFrame to the 'teotil3' schema, replacing any existing
# table. No primary key is created here (create_pk=False); it is added below.
# NOTE(review): the fifth argument is presumably the spatial index name - confirm
nivapy.da.gdf_to_postgis(
    reg_gdf,
    table_name,
    "teotil3",
    eng,
    table_name + "_spidx",
    create_pk=False,
    index=False,
    if_exists="replace",
)

# Make the 'regine' column the primary key of the new table
pk_clauses = [
    f"ALTER TABLE teotil3.{table_name}",
    f"ADD CONSTRAINT {table_name}_pk",
    'PRIMARY KEY ("regine")',
]
sql = " ".join(pk_clauses)
eng.execute(sql)

## 4. HBV modelled discharge from NVE

In [None]:
# First and last data delivery years to upload. For each of these, a whole
# time series from 1990 to the delivery year will be added
st_yr = 2016
end_yr = 2022

# How to treat an existing table: "replace" rebuilds the entire table,
# "append" adds the new rows to it
if_exists = "replace"

In [None]:
# Folder containing modelled data
data_fold = r"/home/jovyan/shared/common/teotil3/nve_hbv_data"

# Every series is expected to start on this date
series_start = pd.Timestamp("1990-01-01")

years = range(st_yr, end_yr + 1)
df_list = []
for year in years:
    search_path = os.path.join(data_fold, f"RID_{year}", "hbv_*.var")
    flist = glob.glob(search_path)

    # Expected end date and number of days between 1990 and year of interest
    series_end = pd.Timestamp(f"{year}-12-31")
    days = len(pd.date_range(start=series_start, end=series_end, freq="D"))

    for fpath in flist:
        # The vassdragsområde code is the three characters preceding the
        # four-character extension in the second "_"-separated part of the
        # file name
        name = os.path.split(fpath)[1]
        vassom = name.split("_")[1][-7:-4]

        # Files are whitespace-delimited with no header row. sep=r"\s+" is the
        # documented replacement for the deprecated 'delim_whitespace=True'
        df = pd.read_csv(
            fpath, sep=r"\s+", header=None, names=["date", "flow_m3/s"]
        )
        df["date"] = pd.to_datetime(df["date"], format="%Y%m%d/1200")
        df["vassom"] = vassom

        # Data ending in 'year' are stored with supply year 'year + 1'
        # NOTE(review): presumably because the series is delivered the year
        # after it ends - confirm
        df["data_supply_year"] = year + 1
        df = df[["data_supply_year", "vassom", "date", "flow_m3/s"]]

        # Check st, end and length
        assert df["date"].iloc[0] == series_start, "Series does not start on 01/01/1990."
        assert (
            df["date"].iloc[-1] == series_end
        ), f"Series does not end on 31/12/{year}."
        assert len(df) == days, "Unexpected length for new series."

        df_list.append(df)

# Fail with a clear message (instead of pandas' "No objects to concatenate")
# when the search did not match any files
if not df_list:
    raise ValueError(
        f"No 'hbv_*.var' files found in '{data_fold}' for {st_yr} to {end_yr}."
    )

# Combine all series and check that each (supply year, vassom, date)
# combination occurs only once
df = pd.concat(df_list, axis="rows")
assert df.duplicated(["data_supply_year", "vassom", "date"], keep=False).sum() == 0

print(f"{len(df)/1e6:.1f} million rows to insert.")

In [None]:
%%time

# The database can't cope with writing 20 M rows directly from pandas
# Instead, manually split the dataframe into chunks and write one
# at a time
chunk_size = 100000

table_name = "nve_hbv_discharge"
pk_name = f"{table_name}_pk"

if if_exists == "replace":
    # Replace with empty table (zero rows, same columns as 'df')
    df.iloc[:0].to_sql(
        table_name,
        eng,
        schema="teotil3",
        index=False,
        if_exists=if_exists,
    )

# Write chunks in append mode. Rows are selected by position (.iloc), since
# the dataframe's index labels are not used for chunking
chunks = [df.iloc[i : i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
for chunk in tqdm(chunks):
    chunk.to_sql(
        table_name,
        eng,
        schema="teotil3",
        index=False,
        if_exists="append",
        method="multi",
    )

# (Re)create the primary key. When appending to an existing table the
# constraint is already present and a plain ADD CONSTRAINT would fail after all
# the data had been written, so drop any existing constraint first
sql = f"ALTER TABLE teotil3.{table_name} DROP CONSTRAINT IF EXISTS {pk_name}"
eng.execute(sql)

sql = (
    f"ALTER TABLE teotil3.{table_name} "
    f"ADD CONSTRAINT {pk_name} "
    f"PRIMARY KEY (data_supply_year, vassom, date)"
)
eng.execute(sql)