# CONFLUENCE Tutorial 1a — Point-Scale Workflow (Paradise SNOTEL)

## Introduction

This notebook demonstrates the point-scale modeling workflow in **CONFLUENCE**, a framework for reproducible and modular computational hydrology. At the point scale, we simulate vertical energy and water fluxes at a single site, independent of routing or lateral flow, to isolate and evaluate model process representations.

Here, we focus on the **Paradise SNOTEL station (ID 679)**, located at 1,630 m elevation in Washington’s Cascade Range. This site represents a transitional snow climate and provides long-term observations of snow water equivalent (SWE) and soil moisture across multiple depths. By reproducing the observed seasonal snow and soil moisture dynamics, this tutorial demonstrates how CONFLUENCE structures a controlled, transparent, and fully reproducible point-scale experiment.

Through this example, you will see how configuration-driven workflows manage experiment setup, geospatial definition, input data preprocessing, model instantiation, and performance evaluation—building a foundation for more complex distributed modeling studies later in the series.


# Step 1 — Configuration (pick or generate)

We begin by selecting (or programmatically generating) a single configuration file that fully specifies the experiment. This keeps the workflow reproducible and makes initialization a one-liner.


In [None]:
# Step 1 — Create a site-specific configuration for the Paradise SNOTEL example

from pathlib import Path
import yaml

# Path to the default template configuration
config_template = Path("../0_config_files/config_point_template.yaml")

# Load the base configuration
with open(config_template, "r") as f:
    config = yaml.safe_load(f)

# === Modify key entries for the Paradise SNOTEL point-scale case ===

# Define code directory — ensures relative paths resolve correctly
config["CONFLUENCE_CODE_DIR"] = str(Path("../").resolve())

# Define data directory — location of required input and observational data
config["CONFLUENCE_DATA_DIR"] = str(Path("/path/to/CONFLUENCE_data").resolve())

# Restrict the spatial domain to a single site using latitude/longitude bounds
# This ensures domain setup treats it as a point-scale (non-routed) experiment
config["DOMAIN_DEFINITION_METHOD"] = "point"
config["DOMAIN_DISCRETIZATION"] = "GRUs"
config["BOUNDING_BOX_COORDS"] = "46.781/-121.751/46.779/-121.749"
config["POUR_POINT_COORDS"] = "46.78/-121.75"

# Enable automatic download of SNOTEL data for this station
config["DOWNLOAD_SNOTEL"] = True

# Specify model and forcing dataset used in this example
config["HYDROLOGICAL_MODEL"] = "SUMMA"     # SUMMA is the process-based model used here
config["FORCING_DATASET"] = "ERA5"         # ERA5 reanalysis for meteorological inputs

# Define the temporal extent of the experiment
config["EXPERIMENT_TIME_START"] = "2000-01-01 01:00"
config["EXPERIMENT_TIME_END"] = "2002-12-31 23:00"
config['CALIBRATION_PERIOD'] = "2000-10-01, 2001-09-30"
config['EVALUATION_PERIOD'] = "2001-10-01, 2002-09-30"
config['SPINUP_PERIOD'] = "2000-01-01, 2000-09-30"

# Assign a descriptive domain name and experiment ID
config["DOMAIN_NAME"] = "paradise"
config["EXPERIMENT_ID"] = "run_1"

# MAF paths and settings (only needed if you have access to MAF data stored on DRAC, ACCESS Anvil, or UCalgary ARC)
config['DATATOOL_DATASET_ROOT'] = '/path/to/meteorological-data/'           # Path to datatool datasets root directory
config['GISTOOL_DATASET_ROOT'] = '/path/to/geospatial-data/'                # Path to gistool datasets root directory
config['TOOL_CACHE'] = '/path/to/cache/dir'                                 # Path to gistool cache directory
config['CLUSTER_JSON'] = '/path/to/cluster.json'                            # Path to cluster json config
config['SNOW_DATA_SOURCE'] = 'SNOTEL'                                # Snow data source: SNOTEL, manual
config['SNOW_STATIONS'] = '679'                                        # Snow station IDs
config['ISMN_NETWORK'] = 'SCAN'                                        # ISMN network name
config['ISMN_STATIONS'] = '679'                                        # ISMN station IDs

# Optimisation settings
config['PARAMS_TO_CALIBRATE'] = 'tempCritRain,k_soil,vGn_n,theta_sat'

# === Save the customized configuration ===
out_config = Path("../0_config_files/config_paradise.yaml")
with open(out_config, "w") as f:
    yaml.dump(config, f, default_flow_style=False, sort_keys=False)

print(f"✅ New configuration written to: {out_config}")
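
Before handing the file to CONFLUENCE, it can help to sanity-check the coordinates and periods set above. The sketch below is not part of CONFLUENCE: `parse_period` and `check_point_config` are hypothetical helpers, and the code assumes the bounding box is ordered `lat_max/lon_min/lat_min/lon_max` (as the values above suggest) and that periods use the `start, end` string format shown in the cell.

```python
from datetime import datetime

def parse_period(text):
    """Split a 'YYYY-MM-DD, YYYY-MM-DD' string into two datetimes."""
    start, end = (datetime.strptime(part.strip(), "%Y-%m-%d") for part in text.split(","))
    return start, end

def check_point_config(cfg):
    """Return a list of problems found (an empty list means none were found)."""
    problems = []

    # Assumed order: lat_max/lon_min/lat_min/lon_max
    lat_max, lon_min, lat_min, lon_max = map(float, cfg["BOUNDING_BOX_COORDS"].split("/"))
    lat, lon = map(float, cfg["POUR_POINT_COORDS"].split("/"))
    if not (lat_min <= lat <= lat_max and lon_min <= lon <= lon_max):
        problems.append("pour point lies outside the bounding box")

    exp_start = datetime.strptime(cfg["EXPERIMENT_TIME_START"], "%Y-%m-%d %H:%M")
    exp_end = datetime.strptime(cfg["EXPERIMENT_TIME_END"], "%Y-%m-%d %H:%M")
    first_day = exp_start.replace(hour=0, minute=0)  # compare periods at day resolution
    for key in ("SPINUP_PERIOD", "CALIBRATION_PERIOD", "EVALUATION_PERIOD"):
        start, end = parse_period(cfg[key])
        if start > end:
            problems.append(f"{key} ends before it starts")
        if start < first_day or end > exp_end:
            problems.append(f"{key} falls outside the experiment window")
    return problems

# Same values as in the configuration cell above
example = {
    "BOUNDING_BOX_COORDS": "46.781/-121.751/46.779/-121.749",
    "POUR_POINT_COORDS": "46.78/-121.75",
    "EXPERIMENT_TIME_START": "2000-01-01 01:00",
    "EXPERIMENT_TIME_END": "2002-12-31 23:00",
    "SPINUP_PERIOD": "2000-01-01, 2000-09-30",
    "CALIBRATION_PERIOD": "2000-10-01, 2001-09-30",
    "EVALUATION_PERIOD": "2001-10-01, 2002-09-30",
}
print(check_point_config(example))
```

With the tutorial values the list comes back empty. Moving the pour point outside the box, or pushing a period past the experiment end, adds a message to the list.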

### Step 1b — Initialize CONFLUENCE

With the configuration prepared, we now initialize **CONFLUENCE**.  
This step reads the configuration file, sets up the project directory, and registers all workflow managers (data, domain, model, and evaluation).  


In [None]:
# Step 1b — Initialize CONFLUENCE

from confluence import CONFLUENCE  # adjust if your import path differs

config_path = "../0_config_files/config_paradise.yaml"
confluence = CONFLUENCE(config_path)

print("✅ CONFLUENCE initialized successfully.")
print(f"Configuration loaded from: {config_path}")

### Step 1c — Project structure setup

We now create the standardized project directory and a pour-point feature for the site.  
This anchors the experiment in a clear, reproducible file layout and records the site location for downstream domain and data steps.


In [None]:
# Step 1c — Project structure setup

from pathlib import Path

# 1) Create the standardized project layout (logs, config link, data/output folders, etc.)
project_dir = confluence.managers['project'].setup_project()

# 2) Create a pour-point feature (the site reference geometry for point-scale workflows)
pour_point_path = confluence.managers['project'].create_pour_point()

print("✅ Project structure created.")
print(f"Project root: {project_dir}")
print(f"Pour point:   {pour_point_path}")

# 3) Brief top-level directory preview
print("\nTop-level structure:")
for p in sorted(Path(project_dir).iterdir()):
    if p.is_dir():
        print(f"├── {p.name}")

# Step 2 — Domain definition (point-scale GRU)

For the Paradise SNOTEL example, the domain is a **single GRU** representing the site footprint.  
This keeps the workflow strictly point-scale (no routing), aligning the geometry with the pour point created in Step 1.

### Step 2a — Geospatial attribute acquisition

We first acquire site attributes (elevation, land cover, soils, etc.).  
These are model-agnostic inputs used to parameterize vertical energy and water balance at the site.


In [None]:
# Step 2a — Acquire attributes (model-agnostic)
confluence.managers['data'].acquire_attributes()
print("✅ Attribute acquisition complete")

### Step 2b — Domain definition (point-scale)

With attributes prepared, we define a point-scale domain consistent with the pour point.  
For this example, the domain is a minimal footprint around the Paradise SNOTEL site.

In [None]:
# Step 2b — Define the point-scale domain
watershed_path = confluence.managers['domain'].define_domain()
print("✅ Domain definition complete")
print(f"Domain file: {watershed_path}")
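
To get a feel for how small this footprint is, the bounding box from the configuration can be converted to approximate distances. This is only a rough sketch: it assumes a spherical Earth with a 6,371 km radius and the `lat_max/lon_min/lat_min/lon_max` ordering, and `bbox_size_m` is a hypothetical helper, not a CONFLUENCE function.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius (spherical approximation)

def bbox_size_m(bbox):
    """Approximate (north-south, east-west) extent in meters of a
    'lat_max/lon_min/lat_min/lon_max' bounding-box string."""
    lat_max, lon_min, lat_min, lon_max = map(float, bbox.split("/"))
    mid_lat = math.radians((lat_max + lat_min) / 2)
    # Arc length = angle (radians) * radius; east-west arcs shrink with cos(latitude)
    height = math.radians(lat_max - lat_min) * EARTH_RADIUS_M
    width = math.radians(lon_max - lon_min) * EARTH_RADIUS_M * math.cos(mid_lat)
    return height, width

height, width = bbox_size_m("46.781/-121.751/46.779/-121.749")
print(f"{height:.0f} m north-south, {width:.0f} m east-west")
```

For the Paradise box this gives roughly 220 m by 150 m, far smaller than a typical ERA5 grid cell.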

### Step 2c — Discretization (required even for 1 GRU = 1 HRU)

Discretization writes the **catchment HRU shapefile** and related artifacts required by downstream steps.  
For the point-scale case we set `DOMAIN_DISCRETIZATION: GRUs`, which creates a **single HRU** identical to the GRU while still generating the standardized outputs.


In [None]:
# Step 2c — Discretization (GRUs → HRUs 1:1, but files are still created)
hru_path = confluence.managers['domain'].discretize_domain()
print("✅ Domain discretization complete")
print(f"HRU file: {hru_path}")

### Step 2d — Verification & inspection (Paradise SNOTEL)

We verify that discretization produced the expected shapefiles in the standardized locations, then plot a minimal GRU–HRU overlay.

**Expected files**
- `domain_dir/shapefiles/river_basins/paradise_riverBasins_point.shp` (GRU)
- `domain_dir/shapefiles/catchment/paradise_HRUs_GRUs.shp` (HRU)


In [None]:
# Step 2d — Verify domain outputs and inspect geometry

from pathlib import Path
import geopandas as gpd
import matplotlib.pyplot as plt
import yaml

# 1) Read config to get data dir and domain name
with open("../0_config_files/config_paradise.yaml") as f:
    cfg = yaml.safe_load(f)

data_dir   = Path(cfg["CONFLUENCE_DATA_DIR"])
domain_dir = data_dir / f"domain_{cfg['DOMAIN_NAME']}"
shp_dir    = domain_dir / "shapefiles"

# 2) Explicit expected shapefiles for Paradise
gru_fp = shp_dir / "river_basins" / "paradise_riverBasins_point.shp"
hru_fp = shp_dir / "catchment"     / "paradise_HRUs_GRUs.shp"

# 3) Verify presence
for label, path in [("GRU", gru_fp), ("HRU", hru_fp)]:
    if not path.exists():
        raise FileNotFoundError(f"❌ Expected {label} file not found: {path}")
    print(f"✅ {label} file found: {path}")

# 4) Minimal overlay plot
gru = gpd.read_file(gru_fp)
hru = gpd.read_file(hru_fp)
if hru.crs != gru.crs:
    hru = hru.to_crs(gru.crs)

ax = gru.plot(figsize=(6, 6))
hru.plot(ax=ax, facecolor="none")
ax.set_title("Paradise SNOTEL — GRU vs HRU")
ax.set_xlabel("")
ax.set_ylabel("")
ax.set_aspect("equal")
plt.tight_layout()
plt.show()

# Step 3 — Input preprocessing (model-agnostic)

We prepare inputs in three steps:
1) acquire **meteorological forcings**,  
2) process **observations** (SNOTEL), and  
3) run **model-agnostic preprocessing** to standardize time steps, variables, and units for downstream use.

### Step 3a — Acquire meteorological forcings (ERA5)

Downloads/subsets the forcings for the Paradise domain.


In [None]:
# Step 3a — Forcings
confluence.managers['data'].acquire_forcings()
print("✅ Forcing data acquisition complete")


### Step 3b — Process observations (SNOTEL)

Parses site observations (e.g., SWE, soil moisture), applies basic QA/QC, and stores standardized outputs.


In [None]:
# Step 3b — Observations
confluence.managers['data'].process_observed_data()
print("✅ Observational data processing complete")

### Step 3c — Model-agnostic preprocessing

Standardizes variable names, units, and time steps (and fills required diagnostics) so multiple models can consume the same inputs consistently.

In [None]:
# Step 3c — Model-agnostic preprocessing
confluence.managers['data'].run_model_agnostic_preprocessing()
print("✅ Model-agnostic preprocessing complete")
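
As an illustration of the kind of unit standardization this step involves, the sketch below converts ERA5-style hourly precipitation depth (meters of water accumulated over one hour) to a mean rate in kg m-2 s-1, the unit SUMMA uses for its precipitation forcing. It is not CONFLUENCE's implementation; the function name and sample values are made up for the example.

```python
WATER_DENSITY = 1000.0     # kg m-3, so 1 m of water depth equals 1000 kg m-2
SECONDS_PER_HOUR = 3600.0

def precip_m_per_hour_to_rate(tp_m):
    """Convert hourly accumulated precipitation depth (m) to a mean rate (kg m-2 s-1)."""
    return tp_m * WATER_DENSITY / SECONDS_PER_HOUR

# Three hypothetical hourly accumulations: dry, 1.8 mm, 3.6 mm
hourly_depth_m = [0.0, 0.0018, 0.0036]
rates = [precip_m_per_hour_to_rate(v) for v in hourly_depth_m]
print(rates)
```

A depth of 3.6 mm in one hour corresponds to a rate of about 0.001 kg m-2 s-1.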

### Step 3d — Quick verification

We confirm the expected folders exist and contain files:

- `forcing/raw_data/`
- `forcing/basin_averaged_data/`
- `observations/snow/{raw,processed}/`
- `observations/soil_moisture/{raw,processed}/`


In [None]:
from pathlib import Path
import yaml

# Derive paths from the config (no hard-coding)
with open("../0_config_files/config_paradise.yaml") as f:
    cfg = yaml.safe_load(f)

data_dir   = Path(cfg["CONFLUENCE_DATA_DIR"])
domain_dir = data_dir / f"domain_{cfg['DOMAIN_NAME']}"

targets = {
    "forcing/raw_data":                  domain_dir / "forcing" / "raw_data",
    "forcing/basin_averaged_data":       domain_dir / "forcing" / "basin_averaged_data",
    "observations/snow/raw":             domain_dir / "observations" / "snow" / "raw",
    "observations/snow/processed":       domain_dir / "observations" / "snow" / "processed",
    "observations/soil_moisture/raw":    domain_dir / "observations" / "soil_moisture" / "raw",
    "observations/soil_moisture/processed": domain_dir / "observations" / "soil_moisture" / "processed",
}

def count_files(p: Path) -> int:
    return sum(1 for x in p.iterdir() if x.is_file()) if p.exists() else 0

for label, path in targets.items():
    exists = path.exists()
    n = count_files(path)
    status = "✅" if exists and n > 0 else ("⚠️ empty" if exists else "❌ missing")
    suffix = f"({n} files)" if exists else ""
    print(f"{status} {label}  {suffix}")

# Step 4 — Model-specific preprocessing & model run (SUMMA)

We now convert the model-agnostic inputs into **SUMMA-ready inputs**, then instantiate and run the model for the Paradise point-scale case.


### Step 4a — SUMMA-specific preprocessing

Creates the SUMMA input bundle (metadata, parameter tables, forcing links) from the standardized inputs.


In [None]:
# Step 4a — SUMMA-specific preprocessing
confluence.managers['model'].preprocess_models()
print("✅ Model-specific preprocessing complete")

### Step 4b — Instantiate & run the model

Instantiates the model using the prepared inputs and executes the point-scale simulation.


In [None]:
# Step 4b — Instantiate & run SUMMA
print(f"Running {confluence.config['HYDROLOGICAL_MODEL']} for point-scale simulation…")
confluence.managers['model'].run_models()
print("✅ Point-scale model run complete")

### Step 4c - Quick verification

Print where SUMMA inputs and run outputs were written (paths are derived from the configuration).


In [None]:
from pathlib import Path
import yaml

with open("../0_config_files/config_paradise.yaml") as f:
    cfg = yaml.safe_load(f)

data_dir   = Path(cfg["CONFLUENCE_DATA_DIR"])
domain_dir = data_dir / f"domain_{cfg['DOMAIN_NAME']}"

# Common locations used by the model manager
summa_in   = domain_dir / "forcing" / "SUMMA_input"
results    = domain_dir / "simulations" / cfg['EXPERIMENT_ID'] / 'SUMMA' 

print("SUMMA input dir:", summa_in if summa_in.exists() else "(not found)")
print("Results dir:",    results if results.exists()    else "(not found)")

In [None]:
# Step 4d — Minimal evaluation: SWE & soil moisture (obs vs sim)

from pathlib import Path
import yaml, pandas as pd, numpy as np, xarray as xr
import matplotlib.pyplot as plt
import re

# --- Paths from config ---
with open("../0_config_files/config_paradise.yaml") as f:
    cfg = yaml.safe_load(f)

data_dir   = Path(cfg["CONFLUENCE_DATA_DIR"])
domain_dir = data_dir / f"domain_{cfg['DOMAIN_NAME']}"

# Find a daily SUMMA output (e.g., *_day.nc) under the domain folder.
# Sorting makes the choice deterministic when several files match.
nc_files = sorted(domain_dir.rglob("*_day.nc"))
if not nc_files:
    raise FileNotFoundError(f"No daily netCDF found under {domain_dir} (pattern '*_day.nc').")
nc_path = nc_files[0]

# Observations (processed)
swe_obs_csv = domain_dir / "observations" / "snow" / "processed" / f"{cfg['DOMAIN_NAME']}_swe_processed.csv"
sm_obs_csv  = domain_dir / "observations" / "soil_moisture" / "processed" / f"{cfg['DOMAIN_NAME']}_sm_processed.csv"

# --- Helpers ---
def rmse(a, b): 
    d = (a - b).to_numpy(dtype=float)
    return np.sqrt(np.nanmean(d**2))

def bias(a, b):
    return float((a - b).mean())

def first_numeric_col(df, include_prefix=None):
    cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    if include_prefix:
        cols = [c for c in cols if c.startswith(include_prefix)] or cols
    return cols[0] if cols else None

# --- Load simulation (daily) and skip the first calendar year as spinup (a simple proxy for SPINUP_PERIOD) ---
ds = xr.open_dataset(nc_path)
start_year = int(ds["time"].dt.year.min()) + 1
ds_eval = ds.sel(time=slice(f"{start_year}-01-01", None))

# SWE (simulation) — try common SUMMA variable names
swe_var_candidates = ["scalarSWE", "scalarSnowWaterEquivalent", "SWE"]
swe_var = next((v for v in swe_var_candidates if v in ds_eval.data_vars), None)
if swe_var is None:
    raise KeyError(f"SWE variable not found. Tried: {swe_var_candidates}")
swe_sim = ds_eval[swe_var].squeeze(drop=True).to_series()  # drop singleton dims (e.g., hru) so the index is time only

# Soil moisture (simulation) — pick first layer from typical variable names
sm_var_candidates = ["mLayerVolFracLiq", "mLayerVolFracWat", "mLayerVolFrac", "mLayerSoilMoist"]
sm_var = next((v for v in sm_var_candidates if v in ds_eval.data_vars), None)
if sm_var is None:
    raise KeyError(f"Soil moisture variable not found. Tried: {sm_var_candidates}")

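# Try common layer dimension names (SUMMA layer variables typically use e.g. "midToto" or "midSoil")
layer_dim = next((d for d in ds_eval[sm_var].dims if re.search("layer|soil|toto", d, re.I)), None)
sm_top = ds_eval[sm_var].isel({layer_dim: 0}) if layer_dim else ds_eval[sm_var]
sm_sim = sm_top.squeeze(drop=True).to_series()  # drop singleton dims (e.g., hru) so the index is time only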

# --- Load observations ---
# SWE obs: parse dates; pick first numeric column (e.g., SWE_mm)
swe_obs = pd.read_csv(swe_obs_csv, parse_dates=True, index_col=0)
if not isinstance(swe_obs.index, pd.DatetimeIndex):
    # fall back to common date column names
    swe_obs = pd.read_csv(swe_obs_csv)
    date_col = next(c for c in swe_obs.columns if re.search("date|time", c, re.I))
    swe_obs[date_col] = pd.to_datetime(swe_obs[date_col])
    swe_obs = swe_obs.set_index(date_col)
swe_obs_col = first_numeric_col(swe_obs)
swe_obs_series = swe_obs[swe_obs_col].dropna()

# Soil moisture obs: columns like sm_<depth>, pick the first
sm_obs = pd.read_csv(sm_obs_csv, parse_dates=True, index_col=0)
if not isinstance(sm_obs.index, pd.DatetimeIndex):
    sm_obs = pd.read_csv(sm_obs_csv)
    date_col = next(c for c in sm_obs.columns if re.search("date|time", c, re.I))
    sm_obs[date_col] = pd.to_datetime(sm_obs[date_col])
    sm_obs = sm_obs.set_index(date_col)
sm_obs_col = first_numeric_col(sm_obs, include_prefix="sm_")
sm_obs_series = sm_obs[sm_obs_col].dropna()

# --- Align and compute metrics ---
def align(a, b):
    idx = a.index.intersection(b.index)
    return a.loc[idx].astype(float), b.loc[idx].astype(float)

swe_sim_a, swe_obs_a = align(swe_sim, swe_obs_series)
sm_sim_a,  sm_obs_a  = align(sm_sim, sm_obs_series)

def corr(a, b):
    return float(pd.Series(a).corr(pd.Series(b)))

# Metrics
swe_metrics = dict(RMSE=rmse(swe_sim_a, swe_obs_a),
                   Bias=bias(swe_sim_a, swe_obs_a),
                   r=corr(swe_sim_a, swe_obs_a))
sm_metrics  = dict(RMSE=rmse(sm_sim_a, sm_obs_a),
                   Bias=bias(sm_sim_a, sm_obs_a),
                   r=corr(sm_sim_a, sm_obs_a))

print("SWE metrics:", {k: (round(v,3) if isinstance(v,(int,float)) else v) for k,v in swe_metrics.items()})
print("Soil moisture metrics:", {k: (round(v,3) if isinstance(v,(int,float)) else v) for k,v in sm_metrics.items()})

# --- Minimal plots ---
fig, axes = plt.subplots(2, 1, figsize=(8, 6), sharex=True)

axes[0].plot(swe_obs_a.index, swe_obs_a.values, label="Obs")
axes[0].plot(swe_sim_a.index, swe_sim_a.values, label="Sim")
axes[0].set_title("SWE (obs vs sim)")
axes[0].legend()

axes[1].plot(sm_obs_a.index, sm_obs_a.values, label=f"Obs ({sm_obs_col})")
axes[1].plot(sm_sim_a.index, sm_sim_a.values, label="Sim (top layer)")
axes[1].set_title("Soil moisture (obs vs sim)")
axes[1].legend()

plt.tight_layout()
plt.show()

ds.close()
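
The metrics above are computed as simulation minus observation, so the sign of the bias tells you the direction of the error. A small hand-checkable example makes the convention concrete. The numbers are invented for illustration, and the helpers are plain-Python stand-ins for the pandas-based functions in the cell.

```python
import math

def bias(sim, obs):
    """Mean of (sim - obs); positive values mean the simulation is too high."""
    return sum(s - o for s, o in zip(sim, obs)) / len(sim)

def rmse(sim, obs):
    """Root-mean-square error of (sim - obs)."""
    return math.sqrt(sum((s - o) ** 2 for s, o in zip(sim, obs)) / len(sim))

# Hypothetical SWE values (mm) on four matching dates
sim = [12.0, 22.0, 33.0, 41.0]
obs = [10.0, 20.0, 30.0, 40.0]

print(bias(sim, obs))            # 2.0   (differences 2, 2, 3, 1 average to 2)
print(round(rmse(sim, obs), 3))  # 2.121 (square root of 18 / 4)
```

Here the simulation overestimates on every date, which shows up as a positive bias. RMSE is slightly larger than the bias because it weights the largest difference more heavily.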

# Step 5 — Calibration (SUMMA, Differential Evolution)

We enable **iterative calibration** for SUMMA, set the **calibration/evaluation periods**, choose **parameters**, and pick a **single objective** (KGE).  
CONFLUENCE exposes a one-liner to run calibration once the configuration is set.
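
For readers unfamiliar with Differential Evolution, the sketch below shows the classic rand/1/bin scheme on a toy two-parameter objective. It is not CONFLUENCE's optimizer: the function names, control parameters (`f_weight`, `crossover`, population size), and the toy objective are all illustrative assumptions. The sketch minimizes an error; maximizing KGE is equivalent to minimizing `1 - KGE`.

```python
import random

def differential_evolution(objective, bounds, pop_size=20, generations=200,
                           f_weight=0.6, crossover=0.9, seed=0):
    """Minimize `objective` inside box `bounds` using rand/1/bin DE."""
    rng = random.Random(seed)
    dim = len(bounds)
    pop = [[rng.uniform(lo, hi) for lo, hi in bounds] for _ in range(pop_size)]
    scores = [objective(ind) for ind in pop]

    for _ in range(generations):
        for i in range(pop_size):
            # Pick three distinct members other than the target i
            a, b, c = rng.sample([j for j in range(pop_size) if j != i], 3)
            forced = rng.randrange(dim)  # at least one gene comes from the mutant
            trial = []
            for k, (lo, hi) in enumerate(bounds):
                if rng.random() < crossover or k == forced:
                    value = pop[a][k] + f_weight * (pop[b][k] - pop[c][k])
                    value = min(max(value, lo), hi)  # clip to the parameter bounds
                else:
                    value = pop[i][k]
                trial.append(value)
            trial_score = objective(trial)
            # Greedy selection; members are replaced in place as soon as a trial is no worse
            if trial_score <= scores[i]:
                pop[i], scores[i] = trial, trial_score

    best = min(range(pop_size), key=scores.__getitem__)
    return pop[best], scores[best]

def toy_error(params):
    """Stand-in for a model error surface with its minimum at (1.0, -0.5)."""
    return (params[0] - 1.0) ** 2 + (params[1] + 0.5) ** 2

best_params, best_score = differential_evolution(toy_error, [(-5.0, 5.0), (-5.0, 5.0)])
print(best_params, best_score)
```

In a real calibration each call to the objective is a full model run, so the population size and number of generations dominate the cost.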


### Step 5a — Minimal config (what matters)

Add or confirm these entries in `config_paradise.yaml`, then re-run Step 1b so the updated configuration is loaded:

```yaml
# Enable iterative calibration with DE, use KGE on the calibration window
OPTIMISATION_METHODS: [iteration]
ITERATIVE_OPTIMIZATION_ALGORITHM: DE      # DE, DDS, PSO, SCE-UA, NSGA-II
OPTIMIZATION_METRIC: KGE                  # KGE, NSE, RMSE, MAE, KGEp

# Parameters to calibrate (point-scale set)
PARAMS_TO_CALIBRATE: tempCritRain,k_soil,vGn_n,theta_sat
```

In [None]:
# Step 5b — Run calibration (DE + KGE)

results_file = confluence.managers['optimization'].calibrate_model()  
print("Calibration results file:", results_file)
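
The calibration objective chosen above is the Kling-Gupta efficiency (KGE), which combines correlation, a variability ratio, and a bias ratio into one score with an optimum of 1. The sketch below implements the original 2009 formulation in plain Python for illustration. It is not CONFLUENCE's implementation, and it ignores details such as missing values or a zero-variance series.

```python
import math

def kge(sim, obs):
    """Kling-Gupta efficiency (2009 form) for two equal-length sequences."""
    n = len(obs)
    mean_s, mean_o = sum(sim) / n, sum(obs) / n
    std_s = math.sqrt(sum((s - mean_s) ** 2 for s in sim) / n)
    std_o = math.sqrt(sum((o - mean_o) ** 2 for o in obs) / n)
    cov = sum((s - mean_s) * (o - mean_o) for s, o in zip(sim, obs)) / n
    r = cov / (std_s * std_o)   # linear correlation
    alpha = std_s / std_o       # variability ratio
    beta = mean_s / mean_o      # bias ratio
    return 1.0 - math.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)

# Hypothetical SWE observations (mm)
obs = [0.0, 50.0, 200.0, 400.0, 150.0]
print(kge(obs, obs))                     # perfect match
print(kge([1.2 * v for v in obs], obs))  # simulation 20% high everywhere
```

A perfect simulation scores 1 (up to floating-point rounding). A simulation that is uniformly 20% too high keeps a correlation of 1 but is penalized on both the variability and bias terms, giving a KGE of about 0.72.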