# Define the library

In [61]:
from typing import Final, Optional
from copy import deepcopy
from functools import reduce
import operator
from __future__ import annotations  # noqa: F404
from dataclasses import dataclass
from scipy.optimize import root_scalar
import pandas as pd 
from pandas import DataFrame, Series
import numpy as np
from numpy.typing import NDArray

## Zones and data classes

In [94]:
@dataclass(frozen=True)
class HydroForcing:
    precip: float
    temp: float
    pet: float

    @staticmethod 
    def combine(forc: list[HydroForcing], scales: list[float]) -> HydroForcing:
        """Combine a series of values into a single value. Note that this only 
        scales the temperature, as precip and PET are volumes
        """
        raise NotImplementedError()

@dataclass(frozen=True)
class ForcingInternal:
    precip: float 
    temp: float 
    pet: float
    q_in: float

@dataclass(frozen=True)
class HydroStep:
    state: float
    forc_flux: float
    vap_flux: float
    lat_flux: float
    vert_flux: float


@dataclass(frozen=True)
class HydrologicZone:

    def vert_flux(self, s: float, d: HydroForcing) -> float:
        """Calculate the vertical flux"""
        return 0.0

    def lat_flux(self, s: float, d: HydroForcing) -> float:
        """Calculate the lateral flux"""
        return 0.0

    def vap_flux(self, s: float, d: HydroForcing) -> float:
        """Calculate the vaporization flux"""
        return 0.0

    def forc_flux(self, s: float, d: HydroForcing) -> float:
        """Forcing flux"""
        return 0.0

    def mass_balance(self, s: float, d: HydroForcing) -> float:
        return (
            self.forc_flux(s, d)
            - self.vert_flux(s, d)
            - self.lat_flux(s, d)
            - self.vap_flux(s, d)
        )

    def param_list(self) -> list[float]:
        return []

    def step(self, s: float, d: HydroForcing, dt: float) -> HydroStep:
        def f(s_new: float) -> float:
            return (s_new - s) - dt * self.mass_balance(0.5 * (s + s_new), d)

        s_new: float = root_scalar(f, s).root

        return HydroStep(
            s_new,
            self.forc_flux(s_new, d),
            self.vap_flux(s_new, d),
            self.lat_flux(s_new, d),
            self.vert_flux(s_new, d),
        )
    
    @classmethod
    def default(cls: type) -> HydrologicZone:
        return HydrologicZone()

    def name(self) -> str:
        return "unnamed_zone"

@dataclass(frozen=True)
class SnowZone(HydrologicZone):
    tt: float
    fmax: float

    def forc_flux(self, s: float, d: HydroForcing) -> float:
        if d.temp < self.tt:
            return d.precip
        else:
            return 0.0

    def vert_flux(self, s: float, d: HydroForcing) -> float:
        return max(0.0, self.fmax * (d.temp - self.tt))

    def param_list(self) -> list[float]:
        return [self.tt, self.fmax]
    

    @classmethod
    def default(cls: type) -> SnowZone:
        return SnowZone(0.0, 1.0)


@dataclass(frozen=True)
class SoilZone(HydrologicZone):
    tt: float
    fc: float
    lp: float
    beta: float
    k0: float
    thr: float

    def vap_flux(self, s: float, d: HydroForcing) -> float:
        return d.pet * min(1, s / self.fc)

    def lat_flux(self, s: float, d: HydroForcing) -> float:
        return max(0.0, self.k0 * (s - self.thr))

    def forc_flux(self, s: float, d: HydroForcing) -> float:
        if d.temp >= self.tt:
            return d.precip
        else:
            return 0.0

    def vert_flux(self, s: float, d: HydroForcing) -> float:
        if d.temp >= self.tt:
            return d.precip * (s / self.fc) ** self.beta
        else:
            return 0.0

    def param_list(self) -> list[float]:
        return [self.tt, self.fc, self.lp, self.beta, self.k0, self.thr]


    @classmethod
    def default(cls: type) -> SoilZone:
        return SoilZone(0.0, 100.0, 1.0, 1.0, 0.1, 50)


@dataclass(frozen=True)
class GroundZone(HydrologicZone):
    k: float
    alpha: float
    perc: float

    def vert_flux(self, s: float, d: HydroForcing) -> float:
        return min(self.perc, s)


    def lat_Flux(self, s: float, d: HydroForcing) -> float:
        return self.k * s**self.alpha


    def param_list(self) -> list[float]:
        return [self.k, self.alpha, self.perc]


    @classmethod
    def default(cls: type) -> GroundZone:
        return GroundZone(1e-3, 1.0, 1.0)
    

In [95]:
@dataclass(frozen=True)
class ForcingData:
    """Forcing data for a single location
    """
    precip: Series
    temp: Series
    pet: Series

@dataclass(frozen=True)
class Layer:
    zones: list[HydrologicZone]

    def size(self) -> int:
        return len(self.zones)
    
    def __getitem__(self, ind: int) -> Optional[HydrologicZone]:
        if ind < self.size():
            return self.zones[ind]
        else:
            return None

@dataclass(frozen=True)
class Hillslope:
    layers: list[Layer]

    def size(self) -> int:
        return reduce(operator.add, map(lambda x: x.size(), self.layers), 0)
    
    def __getitem__(self, ind: int) -> Optional[Layer]:
        if ind < len(self):
            return self.layers[ind]
        else:
            return None

    def __len__(self) -> int:
        return len(self.layers)

    def flatten(self) -> list[HydrologicZone]:
        return reduce(operator.add, map(lambda layer: layer.zones, self.layers), [])

    def get_scales(self, scales: list[float]) -> list[float]:
        """Get the scales of all zones in this hillslope based on the size of the top

        Args:
            scales (list[float]): Scales of the top zones

        Returns:
            list[float]: Relative proportion of the watershed
        """
        out_scales: list[float] = deepcopy(scales)
        cur_scales: list[float] = deepcopy(scales)

        cur_layer: Layer = self.layers[0]
        layer: Layer
        for layer in self.layers[1:]:
            if cur_layer.size() == layer.size():
                out_scales += cur_scales
            else:
                if layer.size() != 1:
                    raise ValueError(f"Cannot have layer of size {cur_layer.size()} flow into layer of size {layer.size()}")
                else:
                    out_scales += [sum(cur_scales)]
                    cur_scales = [sum(cur_scales)]
            cur_layer = layer
            
        return out_scales

    def proportion_matrix(self, scales: list[float]) -> NDArray:
        """Generate a matrix that describes how each 
        zone in this hillslope combines different proportions
        of the surface

        Args:
            scales (list[float]): The proportions of the surface

        Returns:
            NDArray: The matrix describing the connections
        """
        mat: NDArray = np.zeros((self.size(), len(scales)), dtype=float)

        for i, scale_i in enumerate(scales):
            mat[i, i] = scale_i

        cur_zone: int = len(scales)
        layer: Layer
        for i, layer in enumerate(self.layers[1:]):
            if layer.size() == self.layers[0].size():
                for j, scales_j in enumerate(scales):
                    mat[cur_zone, j] = scales_j 
                    cur_zone += 1
            else:
                for j, scales_j in enumerate(scales):
                    mat[cur_zone, j] = scales_j
                cur_zone += 1

        return mat

    def scale_forcing(self, forcing: list[ForcingData], scales: list[float]) -> list[list[HydroForcing]]:
        """Scale a set of forcing data for each of the time series

        Args:
            forcing (list[ForcingData]): Incoming forcing series
            scales (list[float]): Scales used here

        Returns:
            list[list[HydroForcing]]: Scaled forcing data in flattened form
        """
        prop_mat: NDArray = self.proportion_matrix(scales)
        
        zone_series: list[tuple[Series, Series, Series]] = []
        precips: list[Series] = [f.precip for f in forcing]
        temps: list[Series] = [f.temp for f in forcing]
        pets: list[Series] = [f.pet for f in forcing]

        row: NDArray
        for row in prop_mat:
            ppt: Series = sum([scale_i * ppt_i] for scale_i, ppt_i in zip(scales, precips))
            temp: Series = sum([scale_i * temp_i] for scale_i, temp_i in zip(scales, temps))
            pet: Series = sum([scale_i * pet_i] for scale_i, pet_i in zip(scales, pets))
            zone_series.append((ppt, temp, pet))

        zone_forcings: list[list[HydroForcing]] = [
            [
                HydroForcing(precip.iloc[i], temp.iloc[i], pet.iloc[i]) for i in range(len(precip))
            ] for (precip, temp, pet) in zone_series
        ]

        return zone_forcings

class HydrologicModel:

    def __init__(self, hillslopes: list[Hillslope], scales: list[list[float]]) -> None:
        self.__hillslopes: list[Hillslope] = hillslopes
        self.__raw_scales: list[list[float]] = scales
        self.__flat_scales: list[float] = self.flatten_scales()
        self.__flat_model: list[AnnotatedZone] = self.flatten()  # Model in linear order to be evaluated
        self.__size = self.size()  # Number of zones in the model
        self.__lat_mat = self.construct_lat_matrix()
        self.__vert_mat = self.construct_vert_matrix()

    @property
    def hillslopes(self) -> list[Hillslope]:
        return self.__hillslopes

    @property
    def scales(self) -> list[list[float]]:
        return self.__raw_scales
    
    @property
    def flat_scales(self) -> list[float]:
        return self.__flat_scales

    @property 
    def flat_model(self) -> list[AnnotatedZone]:
        return self.__flat_model

    @property 
    def lat_matrix(self) -> NDArray:
        return self.__lat_mat
    
    @property 
    def vert_matrix(self) -> NDArray:
        return self.__vert_mat

    def __len__(self) -> int:
        return self.__size

    def __getitem__(self, ind: int) -> Optional[Hillslope]:
        if ind < self.size():
            return self.hillslopes[ind]
        else:
            return None

    def flatten_zones(self) -> list[HydrologicZone]:
        return reduce(operator.add, map(lambda hs: hs.flatten(), self.hillslopes), [])

    def flatten_scales(self) -> list[float]:
        hs_scales: list[list[float]] = [
            hs.get_scales(self.__raw_scales[i]) for i, hs in enumerate(self.hillslopes)
        ]
        scales: list[float] = reduce(operator.add, hs_scales, [])
        return scales

    def flatten(self) -> list[AnnotatedZone]:
        positions: list[ZonePosition] = []
        flat_zones: list[HydrologicZone] = self.flatten_zones()
        hs_scales: list[list[float]] = [
            hs.get_scales(self.__raw_scales[i]) for i, hs in enumerate(self.hillslopes)
        ]
        scales: list[float] = reduce(operator.add, hs_scales, [])
        cur_zone: int = 0

        hillslope_id: int
        hillslope: Hillslope
        for hillslope_id, hillslope in enumerate(self.hillslopes):
            layer_id: int
            layer: Layer
            for layer_id, layer in enumerate(hillslope.layers):
                zone_id: int
                zone: HydrologicZone
                for zone_id, zone in enumerate(layer.zones):
                    positions.append(
                        ZonePosition(cur_zone, zone_id, layer_id, hillslope_id)
                    )
                    cur_zone += 1

        ann_zones: list[AnnotatedZone] = [AnnotatedZone(flat_zones[i], scales[i], positions[i]) for i, _ in enumerate(positions)]

        return ann_zones

    def size(self) -> int:
        return reduce(operator.add, map(lambda x: x.size(), self.hillslopes), 0)

    def coord_to_index(self, hs_id: int, ly_id: int, zone_id: int) -> int:
        """Convert the series of indices in the nested structure to the linear index
        """
        for z in self.flat_model:
            if z.pos.hillslope_id == hs_id and z.pos.layer_id == ly_id and z.pos.zone_id == zone_id:
                return z.pos.model_id

    def construct_lat_matrix(self) -> NDArray:
        """Construct the matrix describing how all zones are laterally connected
        """
        mat: NDArray = np.zeros((len(self), len(self)))

        def drains_into(hs: int, ly: int, z: int) -> Optional[int]:
            """Get the lower cell that this zone drains into, if possible
            """
            hillslope: Hillslope = self[hs]
            this_layer: Layer = hillslope[ly] # type: ignore

            if this_layer[z + 1] is not None:
                return self.coord_to_index(hs, ly, z+1)
            else:
                return None

        for hs_id, hs in enumerate(self.hillslopes):
            for ly_id, ly in enumerate(hs.layers):
                for z_id, _ in enumerate(ly.zones):
                    this_zone: int = self.coord_to_index(hs_id, ly_id, z_id)
                    next_zone: Optional[int] = drains_into(hs_id, ly_id, z_id)

                    if next_zone is not None:
                        mat[next_zone, this_zone] = 1.0

        return mat
  
    def construct_vert_matrix(self) -> NDArray:
        mat: NDArray = np.zeros((len(self), len(self)))

        def drains_into(hs: int, ly: int, z: int) -> Optional[int]:
            """Get the lower cell that this zone drains into, if possible
            """
            hillslope: Hillslope = self[hs]

            if hillslope[ly + 1] is not None:
                this_layer: Layer = hillslope[ly] # type: ignore
                next_layer: Layer = hillslope[ly + 1] # type: ignore

                if next_layer.size() == this_layer.size(): # Just drains to the one immediately below
                    return self.coord_to_index(hs, ly+1, z)
                else:
                    if next_layer.size() != 1:
                        raise ValueError(f"Invalid structure: cannot have layer with size {this_layer.size()} drain to layer of size {next_layer.size()}")
                    else:
                        return self.coord_to_index(hs, ly+1, 0)
            else:
                return None


        hs: Hillslope
        ly: Layer
        z: HydrologicZone
        for hs_id, hs in enumerate(self.hillslopes):
            for ly_id, ly in enumerate(hs.layers):
                for z_id, z in enumerate(ly.zones):
                    this_id: int = self.coord_to_index(hs_id, ly_id, z_id)
                    next_id: Optional[int] = drains_into(hs_id, ly_id, z_id)
                    if next_id is not None:
                        mat[next_id, this_id] = 1.0

        return mat

    def step(
        self, states: list[HydroStep], ds: list[HydroForcing], dt: float
    ) -> list[HydroStep]:
        """Step the hydrologic model given the forcing and the time step"""
        res: list[HydroStep] = []
        new_state: NDArray = np.zeros(len(self))
        forc_flux: NDArray = np.zeros_like(new_state) # Forcing flux
        vert_flux: NDArray = np.zeros_like(new_state) # Vertical flux
        lat_flux: NDArray = np.zeros_like(new_state) # Lateral flux
        vap_flux: NDArray = np.zeros_like(new_state) # Vaporization flux

        for i, zone in enumerate(self.flat_model):
            d_i: HydroForcing = ds[i]
            q_in: float = vert_flux.dot(self.vert_matrix[i]) + lat_flux.dot(self.lat_matrix[i])
            d: ForcingInternal = ForcingInternal(d_i.precip, d_i.temp, d_i.pet, q_in)
            step_res: HydroStep = zone.zone.step(states[i].state, d, dt)
            res.append(step_res)
            new_state[i] = step_res.state
            forc_flux[i] = step_res.forc_flux
            vert_flux[i] = step_res.vert_flux
            lat_flux[i] = step_res.lat_flux
            vap_flux[i] = step_res.vap_flux

        return res
    
    def prepare_forcing(self, forcing: list[list[ForcingData]]) -> list[list[HydroForcing]]:
        """Construct the forcing data as it can be used by the model
        """
        forcings: list[list[HydroForcing]] = []
        hillslope: Hillslope
        for i, hillslope in enumerate(self.hillslopes):
            forcings += hillslope.scale_forcing(forcing[i], self.__raw_scales[i])
        
        return forcings

    def prepare_output(self, results: list[list[HydroStep]]) -> DataFrame:
        num_zones: Final[int] = len(results[0])
        zone_lists: list[list[HydroStep]] = [ # Each list is a list of the steps
            [row[i] for row in results] for i in range(num_zones)
        ]
        zone_names: list[str] = [x.zone.name() for x in self.flat_model]
        col_names: list[str] = reduce(operator.add, map(lambda x_i: [f"{x_i}_state", f"{x_i}_forc", f"{x_i}_vap", f"{x_i}_lat", f"{x_i}_vert"], zone_names), [])

        df_cols: list[Series] = []
        for i, z in enumerate(zone_lists):
            df_cols.append(Series([x.state for x in z]))
            df_cols.append(Series([x.forc_flux for x in z]))
            df_cols.append(Series([x.vap_flux for x in z]))
            df_cols.append(Series([x.lat_flux for x in z]))
            df_cols.append(Series([x.vert_flux for x in z]))
        
        return DataFrame(data=df_cols, columns=col_names)

    def run(self, init_state: NDArray, forcing: list[ForcingData], dt: float) -> DataFrame:
        states: list[HydroStep] = [
            HydroStep(s_i, 0, 0, 0, 0) for i, s_i in enumerate(init_state)
        ]

        # Construct forcing data from the input
        prepared_forcing: list[list[HydroForcing]] = self.prepare_forcing(forcing)

        # Run the model
        results: list[list[HydroStep]] = deepcopy(states)
        for i, d_i in enumerate(prepared_forcing):
            states = self.step(states, d_i, dt)
            results.append(deepcopy(states))

        # Construct the output files
        out_df: DataFrame = self.prepare_output(results)

        return out_df

@dataclass(frozen=True)
class ZonePosition:
    model_id: int  # The index of this zone in the model
    zone_id: int  # The index of this zone in the layer
    layer_id: int  # The index of this layer's zone
    hillslope_id: int  # The index of the hillslope this zone is in

@dataclass(frozen=True)
class AnnotatedZone:
    zone: HydrologicZone  # The hydrologic zone
    size: float  # Proportion of the catchment area this zone takes up
    pos: ZonePosition  # Position of this zone in the model


# Run the model

## Load the data

In [96]:
data_df = pd.read_csv("../input/Sleepers_Results.txt", sep=r"\s+")
q_obs: Series = data_df["Qobs"]
ppt: Series = data_df["Precipitation"]
temp: Series = data_df["Temperature"]
pet: Series = data_df["PET"]
dates: Series = pd.to_datetime(data_df["Date"], format="%Y%m%d")
data_df.head()

Unnamed: 0,Date,Qsim,Qobs,Precipitation,Temperature,AET,PET,Snow,Snowcover,SM,Recharge,SUZ,SLZ,Q0,Q1,Q2,Qsim_rain,Qsim_snow
0,20161001,0.49827,0.138,0.0,8.7,1.29,2.27,0.0,0.0,92.7,0.0,0.0,6.3,0.0,0.0,0.49827,0.496,0.002
1,20161002,0.461498,0.17,0.0,9.8,0.94,1.68,0.0,0.0,91.7,0.0,0.0,5.8,0.0,0.0,0.461498,0.46,0.002
2,20161003,0.477553,0.207,4.3,11.7,1.08,1.91,0.0,0.0,94.3,0.7,0.0,6.0,0.0,0.0,0.477553,0.476,0.002
3,20161004,0.44231,0.195,0.0,11.6,1.09,1.9,0.0,0.0,93.2,0.0,0.0,5.6,0.0,0.0,0.44231,0.441,0.002
4,20161005,0.409667,0.163,0.0,12.1,1.3,2.3,0.0,0.0,91.9,0.0,0.0,5.1,0.0,0.0,0.409667,0.408,0.002


In [97]:
forcing_data: ForcingData = ForcingData(ppt, temp, pet)

## Create the model

In [98]:
snow: Layer = Layer([SnowZone.default()])
soil: Layer = Layer([SoilZone.default()])
ground: Layer = Layer([GroundZone.default()])

hillslope: Hillslope = Hillslope([snow, soil, ground])
scales: list[list[float]] = [[1.0]]

model: HydrologicModel = HydrologicModel([hillslope], scales)

## Run the model one time

In [100]:
init_state: NDArray = np.array([0.0, 10.0, 100.0])
res = model.run(init_state, [[forcing_data]], 1.0)

TypeError: unsupported operand type(s) for +: 'int' and 'list'