# Custom Control Example

In [None]:
from IPython.display import display, HTML
# Render an "Open In Colab" badge linking to this notebook.
# Colab opens GitHub-hosted notebooks through URLs of the form
#   https://colab.research.google.com/github/<org>/<repo>/blob/<branch>/<path>
# so the repository path is given directly instead of embedding a
# raw.githubusercontent.com URL after "/github/".
display(HTML('<a target="_blank" href="https://colab.research.google.com/github/KIOS-Research/EPyT-Flow-Dev/blob/main/docs/examples/control_example.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>'))

This example demonstrates how to implement a custom control module/rules.

In [None]:
from epyt_flow.data.networks import load_net1
from epyt_flow.simulation import ScenarioSimulator
from epyt_flow.simulation import AdvancedControlModule, ScadaData
from epyt_flow.utils import to_seconds, volume_to_level, plot_timeseries_data
from epyt_flow.simulation.events import ActuatorConstants

A control module mimicking the control rules stated in Net1.inp

In [None]:
class MyControl(AdvancedControlModule):
    """
    Two-threshold (on/off) control of a single pump, driven by a tank's water level.

    The pump is opened when the tank level is at or below the lower threshold and
    closed when it is at or above the upper threshold. For levels strictly between
    the two thresholds no command is issued.

    Parameters
    ----------
    tank_id : `str`, optional
        ID of the tank whose water level drives the decision.

        The default is "2".
    pump_id : `str`, optional
        ID of the pump that is opened/closed.

        The default is "9".
    tank_diameter : `float`, optional
        Diameter of the tank; passed to `volume_to_level` to convert the
        tank's water volume into a level.
        Could also be obtained by calling epyt.epanet.getNodeTankData.

        The default is 50.5.
    lower_level_threshold : `float`, optional
        Level at or below which the pump is opened.

        The default is 110.
    upper_level_threshold : `float`, optional
        Level at or above which the pump is closed.

        The default is 140.
    **kwds
        Forwarded unchanged to the parent class' constructor.

    Raises
    ------
    ValueError
        If `lower_level_threshold` is not strictly smaller than
        `upper_level_threshold`.
    """
    def __init__(self, tank_id: str = "2", pump_id: str = "9",
                 tank_diameter: float = 50.5,
                 lower_level_threshold: float = 110,
                 upper_level_threshold: float = 140, **kwds):
        # Both branches in step() would match at once if the band were empty or inverted
        if lower_level_threshold >= upper_level_threshold:
            raise ValueError("'lower_level_threshold' must be smaller than " +
                             "'upper_level_threshold'")

        # Tank and pump ID
        self.__tank_id = tank_id
        self.__pump_id = pump_id

        # Tank diameter -- needed for converting the tank volume into a level
        self.__tank_diameter = tank_diameter

        # Lower and upper threshold on tank level
        self.__lower_level_threshold = lower_level_threshold
        self.__upper_level_threshold = upper_level_threshold

        # Attributes are set before the parent constructor runs (same order as before)
        super().__init__(**kwds)

    def step(self, scada_data: ScadaData) -> None:
        """
        Applies the control rule to the given sensor readings.

        Parameters
        ----------
        scada_data : `ScadaData`
            Sensor readings from which the water volume of the monitored tank is read.
        """
        # Retrieve current water level in the tank.
        # Only one tank is requested; [0, 0] picks the first entry of the 2-D result
        # (presumably first time step, first tank -- TODO confirm axis order).
        tank_volume = scada_data.get_data_tanks_water_volume([self.__tank_id])[0, 0]
        tank_level = volume_to_level(float(tank_volume), self.__tank_diameter)

        # Decide if pump has to be deactivated or re-activated.
        # No command is issued while the level is strictly between the thresholds.
        if tank_level <= self.__lower_level_threshold:
            self.set_pump_status(self.__pump_id, ActuatorConstants.EN_OPEN)
        elif tank_level >= self.__upper_level_threshold:
            self.set_pump_status(self.__pump_id, ActuatorConstants.EN_CLOSED)

Create new simulation based on Net1

In [None]:
# Load the Net1 scenario configuration (quietly), then build the simulator from it
net1_config = load_net1(verbose=False)
sim = ScenarioSimulator(scenario_config=net1_config)

Set simulation duration to two days

In [None]:
# The duration is handed over in seconds -- convert the two days first
two_days_in_seconds = to_seconds(days=2)
sim.set_general_parameters(simulation_duration=two_days_in_seconds)

Monitor states of tank "2" and pump "9"

In [None]:
# Sensor locations: the tank and the pump that the custom control module works with
monitored_tanks = ["2"]
monitored_pumps = ["9"]

sim.set_tank_sensors(sensor_locations=monitored_tanks)
sim.set_pump_sensors(sensor_locations=monitored_pumps)

Remove all existing controls

In [None]:
# Delete the controls currently present in the loaded network through the
# underlying EPANET API object -- presumably so that only the custom control
# module added afterwards acts on the pump.
sim.epanet_api.deleteControls()

Add our custom control module

In [None]:
# Instantiate the custom control module with its default settings and register it
pump_controller = MyControl()
sim.add_control(pump_controller)

Run simulation and show sensor readings over time

In [None]:
# Run the configured scenario; the returned object is kept in `scada_data`
# and queried by the plotting cells that follow.
scada_data = sim.run_simulation()

In [None]:
# Fetch the recorded pump states; the transposed array is what gets plotted
pump_states = scada_data.get_data_pumps_state()
plot_timeseries_data(
    pump_states.T,
    x_axis_label="Time (30min steps)",
    y_axis_label="Pump state",
)

In [None]:
# Fetch the recorded tank water volumes; the transposed array is what gets plotted.
# NOTE(review): the axis label states m^3 -- confirm this matches the unit system
# the scenario is actually simulated in.
tank_volumes = scada_data.get_data_tanks_water_volume()
plot_timeseries_data(
    tank_volumes.T,
    x_axis_label="Time (30min steps)",
    y_axis_label="Water volume in $m^3$",
)

Do not forget to close the simulation!

In [None]:
# Explicitly close the simulator now that the results have been plotted --
# nothing else in this notebook closes it automatically.
sim.close()