# Pump Control

This example demonstrates how to build a continuous pump speed control environment and how to apply reinforcement learning to it.

In [None]:
# Render an "Open In Colab" badge that links to this notebook on Google Colab
from IPython.display import display, HTML
display(HTML('<a target=\"_blank\" href=\"https://colab.research.google.com/github/WaterFutures/EPyT-Control/blob/main/docs/examples/pump_control.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>'))

In [None]:
%pip install epyt-control --quiet

In [None]:
import numpy as np
import pandas as pd
from stable_baselines3 import SAC
from gymnasium.wrappers import RescaleAction, NormalizeObservation
from epyt_flow.simulation import ScenarioSimulator, ScenarioConfig, ScadaData
from epyt_control.envs import HydraulicControlEnv
from epyt_control.envs.actions import PumpSpeedAction

Create a control environment based on a special version of [Anytown](https://waterfutures.github.io/WaterBenchmarkHub/benchmarks/network-Anytown.html) where three (parallel) pumps next to a reservoir have to be controlled such that some pressure constraints at all nodes are satisfied.
The observations (i.e. input to the controller) are the sensor readings: the pressure at every junction in the network, the efficiency of every pump, and the flow through every pump and every link connected to a tank.

In [None]:
def create_scenario(f_inp_in: str) -> tuple[ScenarioConfig, list[str]]:
    """
    Builds the sensor-equipped scenario for a given .inp file.

    Pressure sensors are placed at every node that is not a tank, pump
    efficiency sensors at every pump, and flow sensors at every link
    adjacent to a tank as well as at every pump.

    Parameters
    ----------
    f_inp_in : `str`
        Path to the .inp file describing the network.

    Returns
    -------
    `tuple[ScenarioConfig, list[str]]`
        Scenario configuration, and the IDs of all links adjacent to a tank.
    """
    with ScenarioSimulator(f_inp_in=f_inp_in) as sim:
        # The sensors placed here are the input to the agent (control strategy)

        # Pressure: start from all nodes and drop the tanks
        pressure_locations = sim.sensor_config.nodes
        for tank_id in sim.sensor_config.tanks:
            pressure_locations.remove(tank_id)
        sim.set_pressure_sensors(sensor_locations=pressure_locations)

        # Efficiency: every pump
        sim.place_pump_efficiency_sensors_everywhere()

        # Flow: every link adjacent to a tank, followed by every pump
        topology = sim.get_topology()
        tank_links = [link_id
                      for tank in topology.get_all_tanks()
                      for link_id, _ in topology.get_adjacent_links(tank)]
        sim.set_flow_sensors(tank_links + sim.sensor_config.pumps)

        # Hand back the configuration together with the tank links
        return sim.get_scenario_config(), tank_links

In [None]:
class ContinuousPumpControlEnv(HydraulicControlEnv):
    """
    Class implementing a continuous pump speed environment --
    i.e. a continuous action space for the pump speed.

    The reward is a weighted sum of three objectives: the share of pressure
    sensors without a bound violation, the share of the pump flow in the total
    (pump + absolute tank) flow, and the mean normalized pump efficiency.
    """
    def __init__(self):
        f_inp_in = "Anytown.inp"
        scenario_config, tank_connections = create_scenario(f_inp_in)

        # IDs of the links adjacent to tanks -- needed for the tank flow objective
        self._tank_connections = tank_connections
        # Pressure bounds checked at every pressure sensor, and the per-pump
        # efficiency (keyed by pump ID) used for normalizing the efficiencies.
        # NOTE(review): 28.1227832 looks like 40 psi expressed in meters -- confirm units
        self._network_constraints = {"min_pressure": 28.1227832,
                                     "max_pressure": 70,
                                     "max_pump_efficiencies": pd.Series({"b1": .65,
                                                                         "b2": .65,
                                                                         "b3": .65})}
        # Weights of the three objectives in the reward (they sum up to one)
        self._objective_weights = {"pressure_violation": .9,
                                   "abs_tank_flow": .02,
                                   "pump_efficiency": .08}

        # One continuous speed action per pump
        super().__init__(scenario_config=scenario_config,
                         pumps_speed_actions=[PumpSpeedAction(pump_id=p_id,
                                                              speed_upper_bound=4.0)
                                              for p_id in scenario_config.sensor_config.pumps],
                         autoreset=True,
                         reload_scenario_when_reset=False)

    def _compute_reward_function(self, scada_data: ScadaData) -> float:
        """
        Computes the reward as a weighted sum of the pressure, tank flow,
        and pump efficiency objectives.

        Parameters
        ----------
        scada_data : `ScadaData`
            Sensor readings to be scored.

        Returns
        -------
        `float`
            Reward.
        """
        pressure_data = scada_data.get_data_pressures()
        tanks_flow_data = scada_data.get_data_flows(sensor_locations=self._tank_connections)
        pumps_flow_data = scada_data.get_data_flows(sensor_locations=scada_data.sensor_config.pumps)
        pump_efficiency = scada_data.get_data_pumps_efficiency()

        # Pressure objective: share of sensors (columns) without any reading
        # (rows) outside of the pressure bounds
        pressure_violations = np.logical_or(
            pressure_data > self._network_constraints["max_pressure"],
            pressure_data < self._network_constraints["min_pressure"]
        ).any(axis=0).sum()
        n_sensors = pressure_data.shape[1]
        pressure_obj = float(1 - pressure_violations / n_sensors)

        # Tank objective: share of the pump flow in the total flow
        total_abs_tank_flow = np.abs(tanks_flow_data).sum(axis=None)
        total_pump_flow = pumps_flow_data.sum(axis=None)
        total_flow = total_pump_flow + total_abs_tank_flow
        if total_flow != 0:
            tank_obj = float(total_pump_flow / total_flow)
        else:
            # No flow at all -- 0/0 would evaluate to NaN and make the reward NaN
            tank_obj = 0.

        # Pump efficiency objective: mean over all pumps of the (column-wise mean)
        # efficiency divided by the pump's entry in "max_pump_efficiencies".
        # The division is aligned by pump ID.
        pump_efficiencies = pd.Series(
            pump_efficiency.mean(axis=0),
            index=scada_data.sensor_config.pumps
        )
        max_pump_efficiencies = self._network_constraints["max_pump_efficiencies"]
        normalized_pump_efficiencies = pump_efficiencies / max_pump_efficiencies
        pump_efficiency_obj = normalized_pump_efficiencies.mean()

        reward = self._objective_weights["pressure_violation"] * pressure_obj + \
            self._objective_weights["abs_tank_flow"] * tank_obj + \
            self._objective_weights["pump_efficiency"] * pump_efficiency_obj

        # Return a plain Python float as stated by the annotation
        return float(reward)

In [None]:
# Instantiate the environment -- its constructor builds the scenario from "Anytown.inp"
env = ContinuousPumpControlEnv()

Improve learning by applying some standard wrappers to the environment for [normalizing the observations](https://gymnasium.farama.org/api/wrappers/observation_wrappers/#gymnasium.wrappers.NormalizeObservation) and [re-scaling the action space](https://gymnasium.farama.org/api/wrappers/action_wrappers/#gymnasium.wrappers.RescaleAction):

In [None]:
# Wrap environment:
# first normalize the observations, then re-scale the action space to [-1, 1]
env = NormalizeObservation(env)
env = RescaleAction(env, min_action=-1, max_action=1)

Use the [Soft Actor Critic (SAC)](https://stable-baselines3.readthedocs.io/en/master/modules/sac.html) method for learning a policy (i.e. control strategy).

Note that increasing the number of time steps might improve the performance.

In [None]:
# Apply a simple policy learner
# You might want to add more wrappers (e.g. normalizing rewards, etc.) and logging here
# Also, increasing the number of time steps might help as well
model = SAC("MlpPolicy", env)
model.learn(total_timesteps=100)
# Store the learned policy on disk
model.save("my_model_pumpspeed.zip")

Do not forget to close the environment by calling the [close()](https://epyt-flow.readthedocs.io/en/stable/epyt_flow.gym.html#epyt_flow.gym.scenario_control_env.ScenarioControlEnv.close) function:

In [None]:
# Close the (wrapped) environment once it is no longer needed
env.close()