In [1]:
from taqsim.node import Source, Storage, PassThrough, Splitter, Demand, Sink, TimeSeries, LossRule, SplitStrategy, ReleaseRule, LossReason, EVAPORATION, SEEPAGE, WaterReceived, NodeEvent
from taqsim.edge import Edge, EdgeLossRule
from taqsim.system import WaterSystem
from taqsim.common import summarize_losses

from dataclasses import dataclass

In [2]:
@dataclass
class EqualSplit:
    """Split strategy that gives every target an identical share of the flow."""

    def split(self, amount: float, targets: list[str], t: int) -> dict[str, float]:
        """Return a mapping of target id to its equal portion of ``amount``.

        Args:
            amount: Total volume to distribute.
            targets: Ids of the downstream targets.
            t: Timestep index; accepted for interface compatibility, not used.

        Returns:
            ``{target: amount / len(targets)}`` for each target, or an empty
            dict when there are no targets.
        """
        if targets:
            return dict.fromkeys(targets, amount / len(targets))
        return {}

# Shared instance, used as the junction's split strategy further down.
equal_split = EqualSplit()


@dataclass
class ProportionalSplit:
    """Distribute flow among targets in proportion to per-target weights.

    Targets that have no entry in ``weights`` are treated as weight 0.
    """
    weights: dict[str, float]  # target id -> relative weight

    def split(self, amount: float, targets: list[str], t: int) -> dict[str, float]:
        """Return a mapping of target id to its weighted portion of ``amount``.

        Args:
            amount: Total volume to distribute.
            targets: Ids of the downstream targets.
            t: Timestep index; accepted for interface compatibility, not used.

        Returns:
            ``amount * weight / total_weight`` per target. When the weights of
            the given targets sum to zero, every target receives ``0.0``.
        """
        # The loop variable is named ``target`` so it does not shadow the
        # timestep parameter ``t``.
        total = sum(self.weights.get(target, 0) for target in targets)
        if total == 0:
            # Float zeros keep the return value consistent with dict[str, float].
            return {target: 0.0 for target in targets}
        return {
            target: amount * self.weights.get(target, 0) / total
            for target in targets
        }


# === EDGE LOSS RULES ===

@dataclass
class ZeroEdgeLoss:
    """Edge loss rule under which nothing is lost in transit."""

    def calculate(self, flow: float, capacity: float, t: int, dt: float) -> dict[LossReason, float]:
        """Return an empty loss mapping, whatever the arguments are."""
        return dict()

# Shared instance for edges that carry water without transport losses.
no_loss = ZeroEdgeLoss()


@dataclass
class PercentageEdgeLoss:
    """Edge loss rule that removes a constant fraction of the flow."""
    rate: float  # fraction of the flow that is lost, e.g. 0.05 for 5%
    reason: LossReason = SEEPAGE  # key under which the loss is reported

    def calculate(self, flow: float, capacity: float, t: int, dt: float) -> dict[LossReason, float]:
        """Return ``{reason: flow * rate}``; ``capacity``, ``t`` and ``dt`` are not used."""
        lost = flow * self.rate
        return {self.reason: lost}

# 5% seepage loss, used on the junction -> farm edge.
seepage = PercentageEdgeLoss(rate=0.05, reason=SEEPAGE)


# === NODE LOSS RULES (for Storage) ===

@dataclass
class ZeroLoss:
    """Storage loss rule under which the stored volume never shrinks."""

    def calculate(self, storage: float, capacity: float, t: int, dt: float) -> dict[LossReason, float]:
        """Return an empty loss mapping, whatever the arguments are."""
        return dict()


@dataclass
class EvaporationLoss:
    """Storage loss rule with a constant evaporation rate."""
    rate: float  # evaporated volume per unit of ``dt``

    def calculate(self, storage: float, capacity: float, t: int, dt: float) -> dict[LossReason, float]:
        """Return the evaporation loss for one step, never more than ``storage``.

        ``capacity`` and ``t`` are accepted for interface compatibility, not used.
        """
        wanted = self.rate * dt
        # Cap at the stored volume: the smaller of the two values wins.
        evaporated = storage if storage < wanted else wanted
        return {EVAPORATION: evaporated}


@dataclass
class CombinedLoss:
    """Storage loss rule combining evaporation and seepage.

    Each mechanism is reported only when its rate is positive. The combined
    loss is never allowed to exceed the volume currently in storage.
    """
    evaporation_rate: float = 0.0  # evaporated volume per unit of ``dt``
    seepage_rate: float = 0.0  # seeped volume per unit of ``dt``

    def calculate(self, storage: float, capacity: float, t: int, dt: float) -> dict[LossReason, float]:
        """Return the per-reason losses for one step.

        Args:
            storage: Volume currently stored; upper bound for the total loss.
            capacity: Storage capacity; accepted for interface compatibility, not used.
            t: Timestep index; not used.
            dt: Step length the rates are multiplied by.

        Returns:
            Mapping with ``EVAPORATION`` and/or ``SEEPAGE`` entries. When the
            requested total exceeds ``storage``, every entry is scaled down by
            the same factor so the total equals the available volume.
        """
        losses = {}
        if self.evaporation_rate > 0:
            losses[EVAPORATION] = self.evaporation_rate * dt
        if self.seepage_rate > 0:
            losses[SEEPAGE] = self.seepage_rate * dt

        # Same guarantee as EvaporationLoss: never lose more than is stored.
        available = max(storage, 0.0)
        requested = sum(losses.values())
        if requested > available:
            # requested > available >= 0, so the division is safe.
            scale = available / requested
            losses = {reason: amount * scale for reason, amount in losses.items()}
        return losses


# === RELEASE RULES (for Storage) ===

@dataclass
class FixedRelease:
    """Release rule that aims for a constant volume every timestep."""
    amount: float  # desired release per timestep

    def release(self, storage: float, capacity: float, inflow: float, t: int, dt: float) -> float:
        """Return ``amount``, limited to the water on hand (``storage + inflow``).

        ``capacity``, ``t`` and ``dt`` are accepted for interface compatibility, not used.
        """
        available = storage + inflow
        # Release the target amount unless less water than that is available.
        return available if available < self.amount else self.amount


@dataclass
class PercentageRelease:
    """Release rule that lets go of a fraction of the available water."""
    fraction: float  # share of the available water to release, e.g. 0.1 for 10%

    def release(self, storage: float, capacity: float, inflow: float, t: int, dt: float) -> float:
        """Return ``fraction`` of the available water, bounded to what exists.

        Args:
            storage: Volume currently stored.
            capacity: Storage capacity; accepted for interface compatibility, not used.
            inflow: Volume arriving this step.
            t: Timestep index; not used.
            dt: Step length; not used.

        Returns:
            ``(storage + inflow) * fraction``, clamped to the range
            ``[0, storage + inflow]`` so that a fraction outside ``[0, 1]``
            can neither release more water than is available nor a negative
            volume.
        """
        available = storage + inflow
        requested = available * self.fraction
        # Like FixedRelease, never release more than the water on hand.
        return max(0.0, min(requested, available))

# Release rule passed to the "dam" Storage node: a fixed release of 50.0 per timestep.
rule = FixedRelease(amount=50.0)
# Loss rule passed to the "dam" Storage node: evaporation 0.5 and seepage 0.2, each multiplied by dt.
losses = CombinedLoss(evaporation_rate=0.5, seepage_rate=0.2)

In [3]:
# --- Nodes: created standalone; no targets are given here ---
source = Source(id="river", inflow=TimeSeries([100.0] * 12))
dam = Storage(id="dam", capacity=1000, release_rule=rule, loss_rule=losses)
turbine = PassThrough(id="turbine")
junction = Splitter(id="junction", split_strategy=equal_split)
farm = Demand(id="farm", requirement=TimeSeries([30.0] * 12))
city = Demand(id="city", requirement=TimeSeries([80.0] * 12))
farm_sink = Sink(id="farm_sink")
city_sink = Sink(id="city_sink")

# --- System with a timestep of 1 (one day) ---
system = WaterSystem(dt=1)

# Register the nodes in the order they were created.
for node in (source, dam, turbine, junction, farm, city, farm_sink, city_sink):
    system.add_node(node)

# --- Topology: (edge id, source node id, target node id, capacity, loss rule) ---
edge_specs = [
    ("e1", "river", "dam", 500, no_loss),
    ("e2", "dam", "turbine", 500, no_loss),
    ("e3", "turbine", "junction", 500, no_loss),
    ("e4", "junction", "farm", 200, seepage),
    ("e5", "junction", "city", 100, no_loss),
    ("e6", "farm", "farm_sink", 200, no_loss),
    ("e7", "city", "city_sink", 100, no_loss),
]
for edge_id, upstream, downstream, edge_capacity, edge_loss in edge_specs:
    system.add_edge(
        Edge(
            id=edge_id,
            source=upstream,
            target=downstream,
            capacity=edge_capacity,
            loss_rule=edge_loss,
        )
    )

# Validate (derives targets from edges)
system.validate()

# Run the simulation for 12 timesteps
system.simulate(timesteps=12)

# Sum the amounts of all WaterReceived events recorded on the city node
total_received = sum(
    received.amount for received in city.events_of_type(WaterReceived)
)

In [6]:
# Print every event recorded on the turbine node, one per line.
for event in turbine.events:
    print(event)

WaterReceived(amount=50.0, source_id='e2', t=0)
WaterPassedThrough(amount=50.0, t=0)
WaterOutput(amount=50.0, t=0)
WaterReceived(amount=50.0, source_id='e2', t=1)
WaterPassedThrough(amount=50.0, t=1)
WaterOutput(amount=50.0, t=1)
WaterReceived(amount=50.0, source_id='e2', t=2)
WaterPassedThrough(amount=50.0, t=2)
WaterOutput(amount=50.0, t=2)
WaterReceived(amount=50.0, source_id='e2', t=3)
WaterPassedThrough(amount=50.0, t=3)
WaterOutput(amount=50.0, t=3)
WaterReceived(amount=50.0, source_id='e2', t=4)
WaterPassedThrough(amount=50.0, t=4)
WaterOutput(amount=50.0, t=4)
WaterReceived(amount=50.0, source_id='e2', t=5)
WaterPassedThrough(amount=50.0, t=5)
WaterOutput(amount=50.0, t=5)
WaterReceived(amount=50.0, source_id='e2', t=6)
WaterPassedThrough(amount=50.0, t=6)
WaterOutput(amount=50.0, t=6)
WaterReceived(amount=50.0, source_id='e2', t=7)
WaterPassedThrough(amount=50.0, t=7)
WaterOutput(amount=50.0, t=7)
WaterReceived(amount=50.0, source_id='e2', t=8)
WaterPassedThrough(amount=50.0, 