# Building a minimum supply temperature controller for heat consumers in a district heating network

## MinimumSupplyTemperatureController

The `MinimumSupplyTemperatureController` is designed to ensure that the supply temperature at each heat consumer in a district heating network does not fall below a specified minimum value. This is particularly important during periods of low heat demand, when water circulation slows and the supply temperature can drop—sometimes even below the return temperature—leading to operational issues and inefficient heating.

This controller monitors the supply temperature at the heat consumer and, if it detects that the temperature is below the required minimum, it incrementally increases the target return temperature. This adjustment forces the network to increase the supply temperature, thereby maintaining the desired minimum level. The controller raises the target return temperature by a fixed, configurable step in each control iteration, and it is parameterised by a tolerance for convergence and a maximum number of iterations per time step.

Typical use cases include:
- Preventing supply temperatures from dropping below return temperatures during low load or standby conditions.
- Maintaining reliable and efficient operation of district heating systems, especially in networks with variable or low demand.

The controller is flexible and can be integrated into time-series simulations, supporting dynamic adjustment of the minimum supply temperature via an external data source if needed.

In [None]:
import numpy as np
from pandapower.control.basic_controller import BasicCtrl


class MinimumSupplyTemperatureController(BasicCtrl):
    """
    Keep the supply temperature of one heat consumer at or above a minimum value.

    While the consumer's supply (inlet) temperature is below
    ``min_supply_temperature``, every control step raises the consumer's target
    return temperature (``net.heat_consumer.treturn_k``) by
    ``temperature_adjustment_step``. At the start of every time step the
    original target return temperature is restored.

    If ``data_source`` is set (it is ``None`` after construction), the minimum
    supply temperature is read at every time step from
    ``data_source.df.at[time_step, "min_supply_temperature"]``.

    Args:
        net (pandapipesNet): The pandapipes network.
        heat_consumer_idx (int): Index of the heat consumer.
        min_supply_temperature (float, optional): Minimum supply temperature in °C
            (it is compared against ``t_from_k - 273.15``). Defaults to 65.
        tolerance (float, optional): Maximum change of the supply temperature
            between two consecutive convergence checks (in K) for the controller
            to count as converged. Defaults to 2.
        max_iterations (int, optional): Maximum number of control steps per time
            step; once reached, the controller reports convergence. Defaults to 100.
        temperature_adjustment_step (float, optional): Increment (in K) added to the
            target return temperature per control step. Defaults to 1.
        debug (bool, optional): Flag to enable debug output. Defaults to False.
        **kwargs: Additional keyword arguments passed on to ``BasicCtrl``.
    """

    def __init__(self, net, heat_consumer_idx, min_supply_temperature=65, tolerance=2, max_iterations=100,
                 temperature_adjustment_step=1, debug=False, **kwargs):
        super().__init__(net, **kwargs)
        self.heat_consumer_idx = heat_consumer_idx
        self.min_supply_temperature = min_supply_temperature
        self.tolerance = tolerance
        self.max_iterations = max_iterations
        self.temperature_adjustment_step = temperature_adjustment_step
        self.debug = debug

        # Optional external source for a time-dependent minimum supply temperature
        self.data_source = None
        # Baseline target return temperature; captured in time_step()
        self.standard_return_temperature = None
        # Number of control steps performed in the current time step
        self.iteration = 0
        # Most recent supply temperatures (°C) seen by is_converged(); at most two entries
        self.previous_temperatures = []

    def time_step(self, net, time_step):
        """Reset the controller state at the start of each time step.

        On time step 0, or whenever no baseline has been stored yet (e.g. the
        time series does not start at 0), the consumer's current ``treturn_k``
        is saved as the baseline. On every other time step the baseline is
        written back, undoing the adjustments of the previous time step.

        Args:
            net (pandapipesNet): The pandapipes network.
            time_step (int): The current time step.

        Returns:
            int: The current time step.
        """
        self.iteration = 0
        self.previous_temperatures = []

        # getattr keeps objects working that were created before the attribute
        # was initialised in __init__.
        baseline = getattr(self, "standard_return_temperature", None)
        if time_step == 0 or baseline is None:
            self.standard_return_temperature = net.heat_consumer.at[self.heat_consumer_idx, "treturn_k"]
        else:
            # Direct DataFrame.at write: a chained ``df[col].at[idx] = ...`` is not
            # guaranteed to modify the frame (pandas copy-on-write).
            net.heat_consumer.at[self.heat_consumer_idx, "treturn_k"] = baseline

        # Pick up the time-dependent minimum supply temperature, if provided
        if self.data_source is not None:
            self.min_supply_temperature = self.data_source.df.at[time_step, "min_supply_temperature"]

        return time_step

    def get_weighted_average_temperature(self):
        """Calculate the weighted average of the stored supply temperatures.

        The weights are 1, 2, ..., n in storage order, so the most recently
        stored temperature has the largest weight.

        Returns:
            float | None: The weighted average, or None if nothing is stored yet.
        """
        if not self.previous_temperatures:
            return None
        weights = np.arange(1, len(self.previous_temperatures) + 1)
        return np.dot(self.previous_temperatures, weights) / weights.sum()

    def control_step(self, net):
        """Raise the target return temperature if the supply temperature is too low.

        The supply temperature used for the check is the weighted average of the
        stored temperatures if any exist, otherwise the current result value.

        Args:
            net (pandapipesNet): The pandapipes network.
        """
        self.iteration += 1

        if (net.heat_consumer["qext_w"] == 0).all():
            # No consumer draws heat: nothing to adjust (standby)
            if self.debug:
                print("No heat flow detected. Switching to standby mode.")
            return super().control_step(net)

        supply_temperature = self.get_weighted_average_temperature()
        if supply_temperature is None:
            supply_temperature = net.res_heat_consumer.at[self.heat_consumer_idx, "t_from_k"] - 273.15

        if supply_temperature < self.min_supply_temperature:
            new_T_out = net.heat_consumer.at[self.heat_consumer_idx, "treturn_k"] + self.temperature_adjustment_step
            net.heat_consumer.at[self.heat_consumer_idx, "treturn_k"] = new_T_out

            if self.debug:
                # treturn_k is stored in Kelvin; convert for the °C message
                print(f"Minimum supply temperature not met. Adjusted target output temperature to {new_T_out - 273.15:.2f} °C.")
            return super().control_step(net)

    def is_converged(self, net):
        """Check if the controller has converged.

        The controller is converged when no consumer draws heat, when
        ``max_iterations`` control steps have been performed, or when the supply
        temperature is at least ``min_supply_temperature`` and changed by less
        than ``tolerance`` since the previous check.

        Args:
            net (pandapipesNet): The pandapipes network.

        Returns:
            bool: True if converged, False otherwise.
        """
        if (net.heat_consumer["qext_w"] == 0).all():
            return True

        idx = self.heat_consumer_idx
        results = net.res_heat_consumer
        current_T_out = results.at[idx, "t_to_k"] - 273.15
        current_T_in = results.at[idx, "t_from_k"] - 273.15
        current_mass_flow = results.at[idx, "mdot_from_kg_per_s"]

        # Without a previous value the change is unknown -> not converged
        if self.previous_temperatures:
            converged_T_in = abs(current_T_in - self.previous_temperatures[-1]) < self.tolerance
        else:
            converged_T_in = False

        # Remember the latest value; only the last two are kept
        self.previous_temperatures.append(current_T_in)
        if len(self.previous_temperatures) > 2:
            self.previous_temperatures.pop(0)

        # The iteration cap is checked first so that it also ends the loop when
        # the minimum supply temperature cannot be reached.
        if self.iteration >= self.max_iterations:
            if self.debug:
                print(f"Max iterations reached for heat_consumer_idx: {self.heat_consumer_idx}")
            return True

        if current_T_in < self.min_supply_temperature:
            if self.debug:
                print(f"Supply temperature not met for heat_consumer_idx: {self.heat_consumer_idx}. current_temperature_in: {current_T_in}, current_temperature_out: {current_T_out}, current_mass_flow: {current_mass_flow}")
            return False

        if converged_T_in:
            if self.debug:
                print(f"Regler konvergiert: heat_consumer_idx: {self.heat_consumer_idx}, current_temperature_in: {current_T_in}, current_temperature_out: {current_T_out}, current_mass_flow: {current_mass_flow}")
            return True

        return False