Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/batcontrol/logic/default.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import datetime
import numpy as np
from typing import Optional

from .logic_interface import LogicInterface
from .logic_interface import CalculationParameters, CalculationInput
Expand Down Expand Up @@ -40,7 +41,7 @@ def set_timezone(self, timezone: datetime.timezone):
""" Set the timezone for the logic calculations """
self.timezone = timezone

def calculate(self, input_data: CalculationInput, calc_timestamp:datetime = None) -> bool:
def calculate(self, input_data: CalculationInput, calc_timestamp: Optional[datetime.datetime] = None) -> bool:
""" Calculate the inverter control settings based on the input data """

logger.debug("Calculating inverter control settings...")
Expand Down Expand Up @@ -69,7 +70,7 @@ def get_inverter_control_settings(self) -> InverterControlSettings:
return self.inverter_control_settings

def calculate_inverter_mode(self, calc_input: CalculationInput,
calc_timestamp:datetime = None) -> InverterControlSettings:
calc_timestamp: Optional[datetime.datetime] = None) -> InverterControlSettings:
""" Main control logic for battery control """
# default settings
inverter_control_settings = InverterControlSettings(
Expand All @@ -81,7 +82,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput,

if self.calculation_output is None:
logger.error("Calculation output is not set. Please call calculate() first.")
return None
raise ValueError("Calculation output is not set. Please call calculate() first.")

net_consumption = calc_input.consumption - calc_input.production
prices = calc_input.prices
Expand All @@ -101,32 +102,41 @@ def calculate_inverter_mode(self, calc_input: CalculationInput,
logger.debug('Discharging is NOT allowed')
inverter_control_settings.allow_discharge = False
charging_limit_percent = self.calculation_parameters.max_charging_from_grid_limit * 100
required_recharge_energy = self.__get_required_recharge_energy(
calc_input,
net_consumption[:max_hour],
prices
)
charge_limit_capacity = self.common.max_capacity * \
self.calculation_parameters.max_charging_from_grid_limit
is_charging_possible = calc_input.stored_energy < charge_limit_capacity

logger.debug('Charging allowed: %s',
is_charging_possible)
# Defaults to 0, only calculate if charging is possible
required_recharge_energy = 0

logger.debug('Charging allowed: %s', is_charging_possible)
if is_charging_possible:
logger.debug('Charging is allowed, because SOC is below %.0f%%',
charging_limit_percent
)
required_recharge_energy = self.__get_required_recharge_energy(
calc_input,
net_consumption[:max_hour],
prices
)
else:
logger.debug('Charging is NOT allowed, because SOC is above %.0f%%',
charging_limit_percent
)

if required_recharge_energy > 0:
logger.debug(
allowed_charging_energy = charge_limit_capacity - calc_input.stored_energy
if required_recharge_energy > allowed_charging_energy:
required_recharge_energy = allowed_charging_energy
logger.debug(
'Required recharge energy limited by max. charging limit to %0.1f Wh',
required_recharge_energy
)
logger.info(
'Get additional energy via grid: %0.1f Wh',
required_recharge_energy
)
else:
elif required_recharge_energy == 0 and is_charging_possible:
logger.debug(
'No additional energy required or possible price found.')

Expand All @@ -150,7 +160,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput,
def __is_discharge_allowed(self, calc_input: CalculationInput,
net_consumption: np.ndarray,
prices: dict,
calc_timestamp:datetime = None) -> bool:
calc_timestamp: Optional[datetime.datetime] = None) -> bool:
""" Evaluate if the battery is allowed to discharge

- Check if battery is above always_allow_discharge_limit
Expand Down
170 changes: 170 additions & 0 deletions tests/batcontrol/logic/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,175 @@ def test_discharge_allowed_because_highest_price(self):
result = self.logic.get_inverter_control_settings()
self.assertTrue(result.allow_discharge, "Discharge should be allowed")

def test_charge_calculation_when_charging_possible(self):
"""Test charge calculation when charging is possible due to low SOC"""
stored_energy = 2000 # 2 kWh, well below charging limit (79% = 7.9 kWh)
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy, self.max_capacity
)

# Setup scenario with high future prices to trigger charging
consumption = np.array([1000, 2000, 1500]) # Higher consumption in future hours
production = np.array([0, 0, 0]) # No production

calc_input = CalculationInput(
consumption=consumption,
production=production,
prices={0: 0.20, 1: 0.35, 2: 0.30}, # Low current price, high future prices
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

# Test at 30 minutes past the hour to test charge rate calculation
calc_timestamp = datetime.datetime(2025, 6, 20, 12, 30, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
result = self.logic.get_inverter_control_settings()
calc_output = self.logic.get_calculation_output()

# Verify charging is enabled
self.assertFalse(result.allow_discharge, "Discharge should not be allowed when charging needed")
self.assertTrue(result.charge_from_grid, "Should charge from grid when energy needed for high price hours")
self.assertGreater(result.charge_rate, 0, "Charge rate should be greater than 0")
self.assertGreater(calc_output.required_recharge_energy, 0, "Should calculate required recharge energy")

def test_charge_calculation_when_charging_not_possible_high_soc(self):
"""Test charge calculation when charging is not possible due to high SOC"""
# Set SOC above charging limit (79%)
stored_energy = 8000 # 8.0 kWh, above charging limit of 7.9 kWh
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace after the comment.

Suggested change
stored_energy = 8000 # 8.0 kWh, above charging limit of 7.9 kWh
stored_energy = 8000 # 8.0 kWh, above charging limit of 7.9 kWh

Copilot uses AI. Check for mistakes.
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy, self.max_capacity
)

# Create scenario where discharge is not triggered by always_allow_discharge_limit
# Use prices where current price is low and future prices are higher, but not enough
# stored energy to satisfy future high consumption
consumption = np.array([1000, 3000, 2500]) # High future consumption that requires reserves
production = np.array([0, 0, 0]) # No production

calc_input = CalculationInput(
consumption=consumption,
production=production,
prices={0: 0.20, 1: 0.35, 2: 0.30}, # Low current, high future prices
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

calc_timestamp = datetime.datetime(2025, 6, 20, 12, 30, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
result = self.logic.get_inverter_control_settings()
calc_output = self.logic.get_calculation_output()

# At high SOC (above charging limit), charging should not be possible
self.assertFalse(result.charge_from_grid, "Should not charge from grid when SOC above charging limit")
self.assertEqual(result.charge_rate, 0, "Charge rate should be 0 when charging not allowed")
self.assertEqual(calc_output.required_recharge_energy, 0, "Required recharge energy should be 0 when charging not possible")

# Discharge behavior depends on reserved energy calculation vs stored usable energy
# This test primarily focuses on charging behavior when SOC is too high

def test_charge_energy_limited_by_max_charging_limit(self):
"""Test that the actual charging energy is limited by max charging capacity"""
# Set SOC close to but below charging limit
stored_energy = 6000 # 6.0 kWh, just below limit of 7.9 kWh (only 1.9 Wh remaining)
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy, self.max_capacity
)

# Create scenario with high future consumption to trigger large recharge requirement
consumption = np.array([2000, 8000, 8000]) # High future consumption
production = np.array([0, 0, 0]) # No production

calc_input = CalculationInput(
consumption=consumption,
production=production,
prices={0: 0.15, 1: 0.40, 2: 0.35}, # Very low current, high future prices
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

calc_timestamp = datetime.datetime(2025, 6, 20, 12, 0, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
result = self.logic.get_inverter_control_settings()

# With only 50 Wh remaining to charge limit, the system should still allow charging
# but the energy should be limited
charge_limit_capacity = self.max_capacity * self.calculation_parameters.max_charging_from_grid_limit
max_allowed_charging = charge_limit_capacity - stored_energy

self.assertAlmostEqual(max_allowed_charging, 1900.0, delta=1.0, msg="Expected about 1900 Wh remaining capacity")

self.assertTrue(result.charge_from_grid, "Should charge when capacity available")

self.assertGreater(result.charge_rate, 0, "Should have positive charge rate")
# The charge rate should be reasonable for the small remaining capacity
remaining_time = (60 - calc_timestamp.minute) / 60
if remaining_time > 0:
# Maximum theoretical charge energy in the remaining time
max_charge_energy = result.charge_rate * remaining_time / self.common.charge_rate_multiplier
# This should be roughly limited to the available capacity
self.assertLessEqual(max_charge_energy, max_allowed_charging + 10,
"Charge energy should be roughly limited by available capacity")

def test_calculate_inverter_mode_error_without_calculation_output(self):
"""Test that calculate_inverter_mode raises ValueError when calculation_output is None"""
logic = DefaultLogic()
logic.set_calculation_parameters(self.calculation_parameters)

stored_energy = 5000
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy, self.max_capacity
)

calc_input = CalculationInput(
consumption=np.array([500, 600, 700]),
production=np.array([0, 0, 0]),
prices={0: 0.25, 1: 0.30, 2: 0.35},
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

# Call calculate_inverter_mode directly without calling calculate() first
with self.assertRaises(ValueError) as context:
logic.calculate_inverter_mode(calc_input)

self.assertIn("Calculation output is not set", str(context.exception))

def test_charge_rate_calculation_with_remaining_time(self):
"""Test that charge rate is correctly calculated based on remaining time in hour"""
stored_energy = 3000 # 3 kWh
stored_usable_energy, free_capacity = self._calculate_battery_values(
stored_energy, self.max_capacity
)

consumption = np.array([1000, 3000, 2000]) # High consumption in hour 1
production = np.array([0, 0, 0])

calc_input = CalculationInput(
consumption=consumption,
production=production,
prices={0: 0.20, 1: 0.35, 2: 0.25}, # High price in hour 1
stored_energy=stored_energy,
stored_usable_energy=stored_usable_energy,
free_capacity=free_capacity,
)

# Test at 45 minutes past the hour (15 minutes remaining)
calc_timestamp = datetime.datetime(2025, 6, 20, 12, 45, 0, tzinfo=datetime.timezone.utc)
self.assertTrue(self.logic.calculate(calc_input, calc_timestamp))
result = self.logic.get_inverter_control_settings()
calc_output = self.logic.get_calculation_output()

if calc_output.required_recharge_energy > 0:
expected_remaining_time = (60 - 45) / 60 # 15 minutes = 0.25 hours
expected_charge_rate_before_multiplier = calc_output.required_recharge_energy / expected_remaining_time

# The actual charge rate should be higher due to charge_rate_multiplier (1.1)
self.assertGreater(result.charge_rate, expected_charge_rate_before_multiplier,
"Charge rate should be adjusted by charge_rate_multiplier")

if __name__ == '__main__':
unittest.main()