From 0ef7133a5becb312805e5999f30b439ce9264d79 Mon Sep 17 00:00:00 2001 From: Matthias Strubel Date: Wed, 24 Sep 2025 16:11:15 +0200 Subject: [PATCH 1/2] defaul logic: enhance charging calculation and messages --- src/batcontrol/logic/default.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/batcontrol/logic/default.py b/src/batcontrol/logic/default.py index ad678cee..3bfcac35 100644 --- a/src/batcontrol/logic/default.py +++ b/src/batcontrol/logic/default.py @@ -81,7 +81,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput, if self.calculation_output is None: logger.error("Calculation output is not set. Please call calculate() first.") - return None + raise ValueError("Calculation output is not set. Please call calculate() first.") net_consumption = calc_input.consumption - calc_input.production prices = calc_input.prices @@ -101,32 +101,41 @@ def calculate_inverter_mode(self, calc_input: CalculationInput, logger.debug('Discharging is NOT allowed') inverter_control_settings.allow_discharge = False charging_limit_percent = self.calculation_parameters.max_charging_from_grid_limit * 100 - required_recharge_energy = self.__get_required_recharge_energy( - calc_input, - net_consumption[:max_hour], - prices - ) charge_limit_capacity = self.common.max_capacity * \ self.calculation_parameters.max_charging_from_grid_limit is_charging_possible = calc_input.stored_energy < charge_limit_capacity - logger.debug('Charging allowed: %s', - is_charging_possible) + # Defaults to 0, only calculate if charging is possible + required_recharge_energy = 0 + + logger.debug('Charging allowed: %s', is_charging_possible) if is_charging_possible: logger.debug('Charging is allowed, because SOC is below %.0f%%', charging_limit_percent ) + required_recharge_energy = self.__get_required_recharge_energy( + calc_input, + net_consumption[:max_hour], + prices + ) else: logger.debug('Charging is NOT allowed, because SOC is above %.0f%%', charging_limit_percent ) if required_recharge_energy > 0: - logger.debug( + allowed_charging_energy = charge_limit_capacity - calc_input.stored_energy + if required_recharge_energy > allowed_charging_energy: + required_recharge_energy = allowed_charging_energy + logger.debug( + 'Required recharge energy limited by max. charging limit to %0.1f Wh', + required_recharge_energy + ) + logger.info( 'Get additional energy via grid: %0.1f Wh', required_recharge_energy ) - else: + elif required_recharge_energy == 0 and is_charging_possible: logger.debug( 'No additional energy required or possible price found.') From 47c4569d9098a01aba0ff4aac64be67d403c299d Mon Sep 17 00:00:00 2001 From: Matthias Strubel Date: Wed, 24 Sep 2025 16:40:51 +0200 Subject: [PATCH 2/2] default logic: add tests for charge rate calculation --- src/batcontrol/logic/default.py | 7 +- tests/batcontrol/logic/test_default.py | 170 +++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 3 deletions(-) diff --git a/src/batcontrol/logic/default.py b/src/batcontrol/logic/default.py index 3bfcac35..4cc65559 100644 --- a/src/batcontrol/logic/default.py +++ b/src/batcontrol/logic/default.py @@ -1,6 +1,7 @@ import logging import datetime import numpy as np +from typing import Optional from .logic_interface import LogicInterface from .logic_interface import CalculationParameters, CalculationInput @@ -40,7 +41,7 @@ def set_timezone(self, timezone: datetime.timezone): """ Set the timezone for the logic calculations """ self.timezone = timezone - def calculate(self, input_data: CalculationInput, calc_timestamp:datetime = None) -> bool: + def calculate(self, input_data: CalculationInput, calc_timestamp: Optional[datetime.datetime] = None) -> bool: """ Calculate the inverter control settings based on the input data """ logger.debug("Calculating inverter control settings...") @@ -69,7 +70,7 @@ def get_inverter_control_settings(self) -> InverterControlSettings: return self.inverter_control_settings def calculate_inverter_mode(self, calc_input: CalculationInput, - calc_timestamp:datetime = None) -> InverterControlSettings: + calc_timestamp: Optional[datetime.datetime] = None) -> InverterControlSettings: """ Main control logic for battery control """ # default settings inverter_control_settings = InverterControlSettings( @@ -159,7 +160,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput, def __is_discharge_allowed(self, calc_input: CalculationInput, net_consumption: np.ndarray, prices: dict, - calc_timestamp:datetime = None) -> bool: + calc_timestamp: Optional[datetime.datetime] = None) -> bool: """ Evaluate if the battery is allowed to discharge - Check if battery is above always_allow_discharge_limit diff --git a/tests/batcontrol/logic/test_default.py b/tests/batcontrol/logic/test_default.py index 81b447c4..86f3075c 100644 --- a/tests/batcontrol/logic/test_default.py +++ b/tests/batcontrol/logic/test_default.py @@ -156,5 +156,175 @@ def test_discharge_allowed_because_highest_price(self): result = self.logic.get_inverter_control_settings() self.assertTrue(result.allow_discharge, "Discharge should be allowed") + def test_charge_calculation_when_charging_possible(self): + """Test charge calculation when charging is possible due to low SOC""" + stored_energy = 2000 # 2 kWh, well below charging limit (79% = 7.9 kWh) + stored_usable_energy, free_capacity = self._calculate_battery_values( + stored_energy, self.max_capacity + ) + + # Setup scenario with high future prices to trigger charging + consumption = np.array([1000, 2000, 1500]) # Higher consumption in future hours + production = np.array([0, 0, 0]) # No production + + calc_input = CalculationInput( + consumption=consumption, + production=production, + prices={0: 0.20, 1: 0.35, 2: 0.30}, # Low current price, high future prices + stored_energy=stored_energy, + stored_usable_energy=stored_usable_energy, + free_capacity=free_capacity, + ) + + # Test at 30 minutes past the hour to test charge rate calculation + calc_timestamp = datetime.datetime(2025, 6, 20, 12, 30, 0, tzinfo=datetime.timezone.utc) + self.assertTrue(self.logic.calculate(calc_input, calc_timestamp)) + result = self.logic.get_inverter_control_settings() + calc_output = self.logic.get_calculation_output() + + # Verify charging is enabled + self.assertFalse(result.allow_discharge, "Discharge should not be allowed when charging needed") + self.assertTrue(result.charge_from_grid, "Should charge from grid when energy needed for high price hours") + self.assertGreater(result.charge_rate, 0, "Charge rate should be greater than 0") + self.assertGreater(calc_output.required_recharge_energy, 0, "Should calculate required recharge energy") + + def test_charge_calculation_when_charging_not_possible_high_soc(self): + """Test charge calculation when charging is not possible due to high SOC""" + # Set SOC above charging limit (79%) + stored_energy = 8000 # 8.0 kWh, above charging limit of 7.9 kWh + stored_usable_energy, free_capacity = self._calculate_battery_values( + stored_energy, self.max_capacity + ) + + # Create scenario where discharge is not triggered by always_allow_discharge_limit + # Use prices where current price is low and future prices are higher, but not enough + # stored energy to satisfy future high consumption + consumption = np.array([1000, 3000, 2500]) # High future consumption that requires reserves + production = np.array([0, 0, 0]) # No production + + calc_input = CalculationInput( + consumption=consumption, + production=production, + prices={0: 0.20, 1: 0.35, 2: 0.30}, # Low current, high future prices + stored_energy=stored_energy, + stored_usable_energy=stored_usable_energy, + free_capacity=free_capacity, + ) + + calc_timestamp = datetime.datetime(2025, 6, 20, 12, 30, 0, tzinfo=datetime.timezone.utc) + self.assertTrue(self.logic.calculate(calc_input, calc_timestamp)) + result = self.logic.get_inverter_control_settings() + calc_output = self.logic.get_calculation_output() + + # At high SOC (above charging limit), charging should not be possible + self.assertFalse(result.charge_from_grid, "Should not charge from grid when SOC above charging limit") + self.assertEqual(result.charge_rate, 0, "Charge rate should be 0 when charging not allowed") + self.assertEqual(calc_output.required_recharge_energy, 0, "Required recharge energy should be 0 when charging not possible") + + # Discharge behavior depends on reserved energy calculation vs stored usable energy + # This test primarily focuses on charging behavior when SOC is too high + + def test_charge_energy_limited_by_max_charging_limit(self): + """Test that the actual charging energy is limited by max charging capacity""" + # Set SOC close to but below charging limit + stored_energy = 6000 # 6.0 kWh, just below limit of 7.9 kWh (only 1.9 Wh remaining) + stored_usable_energy, free_capacity = self._calculate_battery_values( + stored_energy, self.max_capacity + ) + + # Create scenario with high future consumption to trigger large recharge requirement + consumption = np.array([2000, 8000, 8000]) # High future consumption + production = np.array([0, 0, 0]) # No production + + calc_input = CalculationInput( + consumption=consumption, + production=production, + prices={0: 0.15, 1: 0.40, 2: 0.35}, # Very low current, high future prices + stored_energy=stored_energy, + stored_usable_energy=stored_usable_energy, + free_capacity=free_capacity, + ) + + calc_timestamp = datetime.datetime(2025, 6, 20, 12, 0, 0, tzinfo=datetime.timezone.utc) + self.assertTrue(self.logic.calculate(calc_input, calc_timestamp)) + result = self.logic.get_inverter_control_settings() + + # With only 50 Wh remaining to charge limit, the system should still allow charging + # but the energy should be limited + charge_limit_capacity = self.max_capacity * self.calculation_parameters.max_charging_from_grid_limit + max_allowed_charging = charge_limit_capacity - stored_energy + + self.assertAlmostEqual(max_allowed_charging, 1900.0, delta=1.0, msg="Expected about 1900 Wh remaining capacity") + + self.assertTrue(result.charge_from_grid, "Should charge when capacity available") + + self.assertGreater(result.charge_rate, 0, "Should have positive charge rate") + # The charge rate should be reasonable for the small remaining capacity + remaining_time = (60 - calc_timestamp.minute) / 60 + if remaining_time > 0: + # Maximum theoretical charge energy in the remaining time + max_charge_energy = result.charge_rate * remaining_time / self.common.charge_rate_multiplier + # This should be roughly limited to the available capacity + self.assertLessEqual(max_charge_energy, max_allowed_charging + 10, + "Charge energy should be roughly limited by available capacity") + + def test_calculate_inverter_mode_error_without_calculation_output(self): + """Test that calculate_inverter_mode raises ValueError when calculation_output is None""" + logic = DefaultLogic() + logic.set_calculation_parameters(self.calculation_parameters) + + stored_energy = 5000 + stored_usable_energy, free_capacity = self._calculate_battery_values( + stored_energy, self.max_capacity + ) + + calc_input = CalculationInput( + consumption=np.array([500, 600, 700]), + production=np.array([0, 0, 0]), + prices={0: 0.25, 1: 0.30, 2: 0.35}, + stored_energy=stored_energy, + stored_usable_energy=stored_usable_energy, + free_capacity=free_capacity, + ) + + # Call calculate_inverter_mode directly without calling calculate() first + with self.assertRaises(ValueError) as context: + logic.calculate_inverter_mode(calc_input) + + self.assertIn("Calculation output is not set", str(context.exception)) + + def test_charge_rate_calculation_with_remaining_time(self): + """Test that charge rate is correctly calculated based on remaining time in hour""" + stored_energy = 3000 # 3 kWh + stored_usable_energy, free_capacity = self._calculate_battery_values( + stored_energy, self.max_capacity + ) + + consumption = np.array([1000, 3000, 2000]) # High consumption in hour 1 + production = np.array([0, 0, 0]) + + calc_input = CalculationInput( + consumption=consumption, + production=production, + prices={0: 0.20, 1: 0.35, 2: 0.25}, # High price in hour 1 + stored_energy=stored_energy, + stored_usable_energy=stored_usable_energy, + free_capacity=free_capacity, + ) + + # Test at 45 minutes past the hour (15 minutes remaining) + calc_timestamp = datetime.datetime(2025, 6, 20, 12, 45, 0, tzinfo=datetime.timezone.utc) + self.assertTrue(self.logic.calculate(calc_input, calc_timestamp)) + result = self.logic.get_inverter_control_settings() + calc_output = self.logic.get_calculation_output() + + if calc_output.required_recharge_energy > 0: + expected_remaining_time = (60 - 45) / 60 # 15 minutes = 0.25 hours + expected_charge_rate_before_multiplier = calc_output.required_recharge_energy / expected_remaining_time + + # The actual charge rate should be higher due to charge_rate_multiplier (1.1) + self.assertGreater(result.charge_rate, expected_charge_rate_before_multiplier, + "Charge rate should be adjusted by charge_rate_multiplier") + if __name__ == '__main__': unittest.main()