In [1]:
from pathlib import Path
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.ndimage import median_filter
from scipy.signal import savgol_filter
import xarray as xr

sys.path.append("../")
from src.analytics.cluster_hex_analysis import (
    aggregate_clusters_to_hex,
    assign_gauges_to_hybrid_classes,
    consolidate_hybrid_combinations,
    map_secondary_cluster_to_hex,
)
from src.plots.hex_utils import build_hex_grid, to_equal_area
from src.readers.geom_reader import load_geodata
from src.utils.logger import setup_logger

plt.rcParams["font.family"] = "DeJavu Serif"
plt.rcParams["font.serif"] = ["Times New Roman"]

log = setup_logger("hybrid_analysis", log_file="../logs/hybrid_analysis.log")

# Load geometry
ws, gauges = load_geodata(folder_depth="../")
gauges.index = gauges.index.map(str)
ws.index = ws.index.map(str)

print(f"Loaded {len(gauges)} gauges and {len(ws)} watersheds")

Loaded 996 gauges and 996 watersheds


## 1. Load Geographical Clusters (9 clusters)

In [2]:
# Load HydroATLAS data
geo_data = pd.read_csv(
    "../data/attributes/hydro_atlas_cis_camels.csv",
    dtype={"gauge_id": str},
)

if "gauge_id" in geo_data.columns:
    geo_data = geo_data.set_index("gauge_id")

geo_data.index = geo_data.index.map(str)

# Static parameters for clustering
static_parameters = [
    "for_pc_use",
    "crp_pc_use",
    "inu_pc_ult",
    "ire_pc_use",
    "lka_pc_use",
    "prm_pc_use",
    "pst_pc_use",
    "cly_pc_uav",
    "slt_pc_uav",
    "snd_pc_uav",
    "kar_pc_use",
    "urb_pc_use",
    "gwt_cm_sav",
    "lkv_mc_usu",
    "rev_mc_usu",
    "slp_dg_uav",
    "sgr_dk_sav",
    "ws_area",
    "ele_mt_uav",
]

# Align indices
available = pd.Index(gauges.index).intersection(geo_data.index)
gauges = gauges.loc[available].copy()
ws = ws.loc[available].copy()
common_index = gauges.index.to_list()

geo_subset = geo_data.loc[common_index, static_parameters]
geo_scaled = (geo_subset - geo_subset.min()) / (geo_subset.max() - geo_subset.min())

# Hierarchical clustering
n_geo_clusters = 9
Z_geo = linkage(geo_scaled.values, method="ward", metric="euclidean")
geo_labels = fcluster(Z_geo, t=n_geo_clusters, criterion="maxclust")

print(f"Geographic clustering: {n_geo_clusters} clusters for {len(common_index)} gauges")

Geographic clustering: 9 clusters for 996 gauges


## 2. Load Hydrological Clusters (10 clusters)

In [3]:
# Load discharge time series
discharge_data = {}
dis_column = "q_mm_day"

for gauge_id in ws.index:
    try:
        with xr.open_dataset(f"../data/nc_all_q/{gauge_id}.nc") as ds:
            df = ds.to_dataframe()
        discharge_data[gauge_id] = df[[dis_column]].squeeze()
    except Exception as e:
        log.error(f"Error loading discharge for {gauge_id}: {e}")
        continue

# Calculate median seasonal cycle
q_df = {}
for gauge_id in discharge_data.keys():
    try:
        ts = discharge_data[gauge_id]
        seasonal_cycle = ts.groupby([ts.index.month, ts.index.day]).median().values
        q_df[gauge_id] = seasonal_cycle
    except Exception as e:
        log.error(f"Error calculating seasonal cycle for {gauge_id}: {e}")
        continue

q_df = pd.DataFrame.from_dict(q_df, orient="columns")
q_df = q_df.loc[:, q_df.max() < 50]

# Spike removal and smoothing
q_df_smoothed = q_df.copy()
for col in q_df_smoothed.columns:
    series = q_df_smoothed[col].values
    smoothed = median_filter(series, size=5, mode="wrap")
    residuals = np.abs(series - smoothed)
    threshold = residuals.std() * 3
    spike_mask = residuals > threshold

    if spike_mask.any():
        valid_indices = np.where(~spike_mask)[0]
        spike_indices = np.where(spike_mask)[0]
        if len(valid_indices) > 1:
            interpolated_values = np.interp(
                spike_indices, valid_indices, series[valid_indices]
            )
            series[spike_indices] = interpolated_values

    series_smoothed = savgol_filter(series, window_length=11, polyorder=3, mode="wrap")
    q_df_smoothed[col] = series_smoothed

q_df = q_df_smoothed.copy()

# Normalize to [0, 1]
q_df_normalized = (q_df - q_df.min()) / (q_df.max() - q_df.min())

# Prepare for clustering (rows=gauges, columns=days)
q_df_clust = q_df_normalized.T.copy()

# Add spatial coordinates
for gauge_id in q_df_clust.index:
    q_df_clust.loc[gauge_id, "lat"] = gauges.loc[gauge_id, "geometry"].y
    q_df_clust.loc[gauge_id, "lon"] = gauges.loc[gauge_id, "geometry"].x

q_df_clust = q_df_clust.dropna()
hydro_index = q_df_clust.index

# Hierarchical clustering
n_hydro_clusters = 10
Z_hydro = linkage(q_df_clust.values, method="ward", metric="euclidean")
hydro_labels = fcluster(Z_hydro, t=n_hydro_clusters, criterion="maxclust")

print(
    f"Hydrological clustering: {n_hydro_clusters} clusters for {len(hydro_index)} gauges"
)

Hydrological clustering: 10 clusters for 996 gauges


## 3. Create Raw Hybrid Combinations (Ф#-Г#)

In [4]:
# Create gauge-level dataframe with both clusters
gauge_clustered = gauges.copy()
gauge_clustered["geo_cluster"] = [f"G{cl}" for cl in geo_labels]
gauge_clustered["hydro_cluster"] = [f"H{cl}" for cl in hydro_labels]
gauge_clustered["hybrid_combo"] = (
    gauge_clustered["geo_cluster"] + "-" + gauge_clustered["hydro_cluster"]
)

# Apply Russian naming
gauge_clustered["geo_cluster_ru"] = gauge_clustered["geo_cluster"].str.replace("G", "Ф")
gauge_clustered["hydro_cluster_ru"] = gauge_clustered["hydro_cluster"].str.replace(
    "H", "Г"
)
gauge_clustered["hybrid_combo_ru"] = (
    gauge_clustered["geo_cluster_ru"] + "-" + gauge_clustered["hydro_cluster_ru"]
)

combo_counts = gauge_clustered["hybrid_combo"].value_counts()
print(f"\nRaw hybrid combinations: {len(combo_counts)}")
print(f"Gauges per combination (mean): {combo_counts.mean():.1f}")
print(f"Gauges per combination (median): {combo_counts.median():.0f}")
print("\nTop 10 combinations:")
print(combo_counts.head(10))


Raw hybrid combinations: 51
Gauges per combination (mean): 19.5
Gauges per combination (median): 11

Top 10 combinations:
hybrid_combo
G1-H5    68
G5-H3    66
G9-H7    64
G1-H1    52
G1-H3    52
G4-H2    50
G3-H8    50
G9-H9    44
G8-H1    43
G5-H2    43
Name: count, dtype: int64


## 4. Consolidate via Hex-Based Aggregation (~16 classes)

In [5]:
# Transform to equal-area projection
aea_crs = (
    "+proj=aea +lat_0=56 +lon_0=100 +lat_1=50 +lat_2=70 "
    "+x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs"
)

ws_aea = to_equal_area(ws, crs=aea_crs)
gauges_aea = to_equal_area(gauge_clustered, crs=aea_crs)

# Build hexagonal grid
from shapely.geometry import box

extent_poly = box(*ws_aea.total_bounds)
hex_grid = build_hex_grid(extent_poly, r_km=80, crs=aea_crs)

print(f"Created {len(hex_grid)} hexagons with radius ~80 km")

# Aggregate geo clusters to hexes
ws_with_geo = ws_aea.copy()
ws_with_geo["geo_cluster"] = gauge_clustered.loc[ws_aea.index, "geo_cluster"]

hexes_with_geo = aggregate_clusters_to_hex(
    watersheds=ws_with_geo,
    hexes=hex_grid,
    cluster_col="geo_cluster",
    method="dominant",
    min_watersheds=2,
)

print(f"Hexes with dominant geo cluster: {len(hexes_with_geo)}")

# Map hydro clusters to hexes
ws_with_hydro = ws_aea.copy()
ws_with_hydro["hydro_cluster"] = gauge_clustered.loc[ws_aea.index, "hydro_cluster"]

hexes_with_both = map_secondary_cluster_to_hex(
    watersheds=ws_with_hydro,
    hexes_with_primary=hexes_with_geo,
    primary_cluster_col="dominant_cluster",
    secondary_cluster_col="hydro_cluster",
    hex_id_col="hex_id",
)

# Create hybrid combo on hex level
hexes_with_both["hybrid_combo"] = (
    hexes_with_both["dominant_cluster"].astype(str)
    + "-"
    + hexes_with_both["dominant_hydro_cluster"].astype(str)
)

hex_combo_counts = hexes_with_both["hybrid_combo"].value_counts()
print(f"\nUnique hex-level combinations: {len(hex_combo_counts)}")

Created 1897 hexagons with radius ~80 km
Hexes with dominant geo cluster: 231

Unique hex-level combinations: 37


## 5. Assign Gauges to Consolidated Classes

In [6]:
# Assign gauges to hex-based hybrid classes
gauges_with_combos = assign_gauges_to_hybrid_classes(
    gauges=gauges_aea,
    watersheds=ws_aea,
    hexes_with_combos=hexes_with_both,
    combo_col="hybrid_combo",
    hex_id_col="hex_id",
    k_neighbors=5,
)

print(f"Gauges with hybrid combos: {gauges_with_combos['hybrid_combo'].notna().sum()}")
print(f"Unique combinations: {gauges_with_combos['hybrid_combo'].nunique()}")

# Consolidate to target N classes
gauges_final, mapping_df = consolidate_hybrid_combinations(
    gauges_with_combos=gauges_with_combos,
    combo_col="hybrid_combo",
    target_n_classes=15,
    min_gauges_per_class=5,
)

print("\n=== FINAL RESULT ===")
print(f"Final hybrid classes: {gauges_final['hybrid_class'].nunique()}")
print(
    f"Gauges per class (mean): {gauges_final['hybrid_class'].value_counts().mean():.1f}"
)
print("\nClass distribution:")
print(gauges_final["hybrid_class"].value_counts().sort_index())

Gauges with hybrid combos: 996
Unique combinations: 37

=== FINAL RESULT ===
Final hybrid classes: 16
Gauges per class (mean): 62.2

Class distribution:
hybrid_class
G1-H1        46
G1-H3        66
G1-H5       110
G2-H4        33
G3-H8       115
G4-H2        54
G5-H1        51
G5-H10       40
G5-H2        41
G5-H3       100
G6-H2        62
G8-H1        75
G8-H10       40
G9-H7       105
G9-H9        43
Other-G7     15
Name: count, dtype: int64


## 6. Apply Russian Naming to Final Classes

In [7]:
# Convert back to WGS84
gauges_final_wgs84 = gauges_final.to_crs(gauges.crs)
gauges_final_wgs84.index = gauge_clustered.index


# Apply Russian naming to hybrid_class
def convert_to_russian(hybrid_class: str) -> str:
    """Convert G#-H# format to Ф#-Г# format."""
    if pd.isna(hybrid_class):
        return hybrid_class
    return hybrid_class.replace("G", "Ф").replace("H", "Г")


gauges_final_wgs84["hybrid_class_ru"] = gauges_final_wgs84["hybrid_class"].apply(
    convert_to_russian
)

# Merge back to gauge_clustered
gauge_clustered = gauge_clustered.join(
    gauges_final_wgs84[["hybrid_class", "hybrid_class_ru"]], how="left"
)

print("\nFinal classes with Russian naming:")
print(gauge_clustered["hybrid_class_ru"].value_counts().sort_index())


Final classes with Russian naming:
hybrid_class_ru
Other-Ф7     15
Ф1-Г1        46
Ф1-Г3        66
Ф1-Г5       110
Ф2-Г4        33
Ф3-Г8       115
Ф4-Г2        54
Ф5-Г1        51
Ф5-Г10       40
Ф5-Г2        41
Ф5-Г3       100
Ф6-Г2        62
Ф8-Г1        75
Ф8-Г10       40
Ф9-Г7       105
Ф9-Г9        43
Name: count, dtype: int64


## 7. Extract DOY-Based Peak Timing from Discharge Patterns

In [8]:
# Calculate peak DOY for each hydrological cluster
from collections import OrderedDict

cluster_q_mm = OrderedDict()
for cluster_id in range(1, n_hydro_clusters + 1):
    cluster_gauges = gauge_clustered[
        gauge_clustered["hydro_cluster"] == f"G{cluster_id}"
    ].index
    if len(cluster_gauges) == 0:
        continue

    # Get normalized patterns for this cluster
    cluster_patterns = q_df_normalized[cluster_gauges]
    median_pattern = cluster_patterns.median(axis=1)
    cluster_q_mm[f"Г{cluster_id}"] = median_pattern.values

# Extract regime characteristics
regime_data = []
for cluster_name, pattern in cluster_q_mm.items():
    peak_doy = np.argmax(pattern) + 1  # +1 for 1-based DOY
    peak_value = np.max(pattern)

    # Seasonal ratios
    winter = pattern[:59].mean()  # Jan-Feb
    spring = pattern[59:151].mean()  # Mar-May
    summer = pattern[151:243].mean()  # Jun-Aug
    autumn = pattern[243:334].mean()  # Sep-Nov

    # Determine regime type
    if peak_doy <= 151:  # Before June
        regime_type = "Spring-dominated (snowmelt)"
    elif peak_doy <= 243:  # June-Aug
        regime_type = "Summer-dominated (rain/glacier)"
    else:
        regime_type = "Autumn-dominated (rain)"

    cv = np.std(pattern) / np.mean(pattern)

    regime_data.append(
        {
            "cluster_name": cluster_name,
            "peak_doy": peak_doy,
            "peak_value": peak_value,
            "regime_type": regime_type,
            "winter_ratio": winter,
            "spring_ratio": spring,
            "summer_ratio": summer,
            "autumn_ratio": autumn,
            "cv": cv,
        }
    )

regime_df = pd.DataFrame(regime_data)
print("\nHydrological Regime Characteristics:")
print(regime_df[["cluster_name", "peak_doy", "regime_type", "cv"]])


Hydrological Regime Characteristics:


KeyError: "None of [Index(['cluster_name', 'peak_doy', 'regime_type', 'cv'], dtype='object')] are in the [columns]"

## 8. Export Gauge Mapping with Hex Assignments

In [None]:
# Create final export dataframe
export_df = gauge_clustered[
    ["geo_cluster_ru", "hydro_cluster_ru", "hybrid_combo_ru", "hybrid_class_ru"]
].copy()
export_df = export_df.reset_index().rename(columns={"index": "gauge_id"})

# Add hex_id from gauges_final_wgs84
if "hex_id" in gauges_final_wgs84.columns:
    export_df = export_df.merge(
        gauges_final_wgs84.reset_index()[["index", "hex_id"]],
        left_on="gauge_id",
        right_on="index",
        how="left",
    ).drop(columns=["index"])

# Export to CSV
output_dir = Path("../res/chapter_one")
output_dir.mkdir(parents=True, exist_ok=True)

output_file = output_dir / "gauge_hybrid_mapping.csv"
export_df.to_csv(output_file, index=False)

print(f"\n✓ Exported gauge mapping to {output_file}")
print(f"Columns: {list(export_df.columns)}")
print(f"Total gauges: {len(export_df)}")
print("\nSample rows:")
print(export_df.head(10))

## 9. Generate YAML Reference Documentation

In [None]:
import yaml

# Build YAML structure
yaml_data = {
    "hybrid_classification": {
        "description": "Hybrid geo-hydrological classification of Russian watersheds",
        "n_geo_clusters": n_geo_clusters,
        "n_hydro_clusters": n_hydro_clusters,
        "n_final_classes": int(gauges_final["hybrid_class"].nunique()),
        "naming_convention": {
            "geo": "Ф# (Физико-географический кластер)",
            "hydro": "Г# (Гидрологический кластер)",
            "hybrid": "Ф#-Г# (гибридная комбинация)",
        },
        "classes": {},
    }
}

# Add class details
for hybrid_class in sorted(gauges_final["hybrid_class"].dropna().unique()):
    class_gauges = gauge_clustered[gauge_clustered["hybrid_class"] == hybrid_class]
    n_gauges = len(class_gauges)

    # Get Russian name
    class_ru = class_gauges["hybrid_class_ru"].iloc[0] if len(class_gauges) > 0 else ""

    # Extract geo and hydro parts
    parts = hybrid_class.split("-")
    geo_part = parts[0] if len(parts) > 0 else ""
    hydro_part = parts[1] if len(parts) > 1 else ""

    # Get regime info if available
    regime_info = None
    hydro_ru = hydro_part.replace("H", "Г")
    regime_match = regime_df[regime_df["cluster_name"] == hydro_ru]
    if not regime_match.empty:
        regime_info = {
            "peak_doy": int(regime_match.iloc[0]["peak_doy"]),
            "regime_type": regime_match.iloc[0]["regime_type"],
            "cv": float(regime_match.iloc[0]["cv"]),
        }

    yaml_data["hybrid_classification"]["classes"][class_ru] = {
        "n_gauges": n_gauges,
        "geo_cluster": geo_part.replace("G", "Ф"),
        "hydro_cluster": hydro_ru,
        "regime": regime_info,
    }

# Export YAML
yaml_file = output_dir / "hybrid_classification_reference.yml"
with open(yaml_file, "w", encoding="utf-8") as f:
    yaml.dump(yaml_data, f, allow_unicode=True, sort_keys=False, default_flow_style=False)

print(f"\n✓ Exported YAML reference to {yaml_file}")