diff --git a/custom_components/sat/area.py b/custom_components/sat/area.py index fb3f2a87..58cafd07 100644 --- a/custom_components/sat/area.py +++ b/custom_components/sat/area.py @@ -5,7 +5,10 @@ from homeassistant.const import STATE_UNKNOWN, STATE_UNAVAILABLE from homeassistant.core import HomeAssistant, State +from .heating_curve import HeatingCurve from .helpers import float_value +from .pid import PID +from .pwm import PWM from .util import ( create_pwm_controller, create_pid_controller, @@ -17,13 +20,13 @@ class Area: def __init__(self, config_data: MappingProxyType[str, Any], config_options: MappingProxyType[str, Any], entity_id: str): - self._hass = None - self._entity_id = entity_id + self._entity_id: str = entity_id + self._hass: HomeAssistant | None = None # Create controllers with the given configuration options - self.pid = create_pid_controller(config_options) - self.heating_curve = create_heating_curve_controller(config_data, config_options) - self.pwm = create_pwm_controller(self.heating_curve, config_data, config_options) + self.pid: PID = create_pid_controller(config_options) + self.heating_curve: HeatingCurve = create_heating_curve_controller(config_data, config_options) + self.pwm: PWM = create_pwm_controller(self.heating_curve, config_data, config_options) @property def id(self) -> str: @@ -75,18 +78,20 @@ async def async_added_to_hass(self, hass: HomeAssistant): async def async_control_heating_loop(self, _time=None) -> None: """Asynchronously control the heating loop.""" - if (temperature_error := self.error) is not None: - # Control the integral (if exceeded the time limit) - self.pid.update_integral(temperature_error, self.heating_curve.value) + if self.error is None or self.heating_curve.value is None: + return + + # Control the integral (if exceeded the time limit) + self.pid.update_integral(self.error, self.heating_curve.value) class Areas: - def __init__(self, config_data: MappingProxyType[str, Any], config_options: MappingProxyType[str, Any], entity_ids: list): + def __init__(self, config_data: MappingProxyType[str, Any], config_options: MappingProxyType[str, Any], entity_ids: list[str]): """Initialize Areas with multiple Area instances using shared config data and options.""" - self._entity_ids = entity_ids - self._config_data = config_data - self._config_options = config_options - self._areas = [Area(config_data, config_options, entity_id) for entity_id in entity_ids] + self._entity_ids: list[str] = entity_ids + self._config_data: MappingProxyType[str, Any] = config_data + self._config_options: MappingProxyType[str, Any] = config_options + self._areas: list[Area] = [Area(config_data, config_options, entity_id) for entity_id in entity_ids] @property def errors(self) -> List[float]: diff --git a/custom_components/sat/climate.py b/custom_components/sat/climate.py index c6295dbf..21d804c0 100644 --- a/custom_components/sat/climate.py +++ b/custom_components/sat/climate.py @@ -3,8 +3,9 @@ import asyncio import logging -from datetime import timedelta +from datetime import timedelta, datetime from time import monotonic, time +from typing import Optional from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN from homeassistant.components.climate import ( @@ -68,7 +69,7 @@ async def async_setup_entry(_hass: HomeAssistant, _config_entry: ConfigEntry, _a class SatWarmingUp: - def __init__(self, error: float, boiler_temperature: float = None, started: int = None): + def __init__(self, error: float, boiler_temperature: Optional[float] = None, started: Optional[int] = None): self.error = error self.boiler_temperature = boiler_temperature self.started = started if started is not None else int(time()) @@ -238,7 +239,7 @@ async def async_added_to_hass(self) -> None: # Let the coordinator know we are ready await self._coordinator.async_added_to_hass() - async def _register_event_listeners(self, _time=None): + async def _register_event_listeners(self, _time: Optional[datetime] = None): """Register event listeners.""" self.async_on_remove( async_track_time_interval( @@ -890,7 +891,7 @@ async def async_track_sensor_temperature(self, entity_id): self._sensors.append(entity_id) - async def async_control_heating_loop(self, _time=None) -> None: + async def async_control_heating_loop(self, _time: Optional[datetime] = None) -> None: """Control the heating based on current temperature, target temperature, and outside temperature.""" # If the current, target or outside temperature is not available, do nothing if self.current_temperature is None or self.target_temperature is None or self.current_outside_temperature is None: diff --git a/custom_components/sat/config_flow.py b/custom_components/sat/config_flow.py index 8eec1cc9..68e721d9 100644 --- a/custom_components/sat/config_flow.py +++ b/custom_components/sat/config_flow.py @@ -1,7 +1,7 @@ """Adds config flow for SAT.""" import asyncio import logging -from typing import Any +from typing import Optional, Any import voluptuous as vol from homeassistant import config_entries @@ -25,8 +25,8 @@ from . import SatDataUpdateCoordinatorFactory from .const import * from .coordinator import SatDataUpdateCoordinator -from .manufacturer import ManufacturerFactory, MANUFACTURERS from .helpers import calculate_default_maximum_setpoint, snake_case +from .manufacturer import ManufacturerFactory, MANUFACTURERS from .overshoot_protection import OvershootProtection from .validators import valid_serial_device @@ -519,7 +519,7 @@ async def async_create_coordinator(self) -> SatDataUpdateCoordinator: hass=self.hass, data=self.data, mode=self.data[CONF_MODE], device=self.data[CONF_DEVICE] ) - def _create_mqtt_form(self, step_id: str, default_topic: str = None, default_device: str = None): + def _create_mqtt_form(self, step_id: str, default_topic: Optional[str] = None, default_device: Optional[str] = None): """Create a common MQTT configuration form.""" schema = {vol.Required(CONF_NAME, default=DEFAULT_NAME): str} diff --git a/custom_components/sat/coordinator.py b/custom_components/sat/coordinator.py index f9cda253..5d3cac13 100644 --- a/custom_components/sat/coordinator.py +++ b/custom_components/sat/coordinator.py @@ -354,7 +354,7 @@ async def async_will_remove_from_hass(self) -> None: """Run when an entity is removed from hass.""" pass - async def async_control_heating_loop(self, climate: SatClimate = None, _time=None) -> None: + async def async_control_heating_loop(self, climate: Optional[SatClimate] = None, _time=None) -> None: """Control the heating loop for the device.""" # Update Flame State if not self.flame_active: diff --git a/custom_components/sat/esphome/__init__.py b/custom_components/sat/esphome/__init__.py index 2a8585f6..84391d54 100644 --- a/custom_components/sat/esphome/__init__.py +++ b/custom_components/sat/esphome/__init__.py @@ -12,6 +12,8 @@ from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN, SERVICE_TURN_ON, SERVICE_TURN_OFF from homeassistant.core import HomeAssistant, Event from homeassistant.helpers import device_registry, entity_registry +from homeassistant.helpers.device_registry import DeviceEntry +from homeassistant.helpers.entity_registry import EntityRegistry, RegistryEntry from homeassistant.helpers.event import async_track_state_change_event from ..coordinator import DeviceState, SatDataUpdateCoordinator, SatEntityCoordinator @@ -47,13 +49,13 @@ class SatEspHomeCoordinator(SatDataUpdateCoordinator, SatEntityCoordinator): def __init__(self, hass: HomeAssistant, device_id: str, data: Mapping[str, Any], options: Mapping[str, Any] | None = None) -> None: super().__init__(hass, data, options) - self.data = {} + self.data: dict = {} - self._device = device_registry.async_get(hass).async_get(device_id) - self._mac_address = list(self._device.connections)[0][1] + self._device: DeviceEntry = device_registry.async_get(hass).async_get(device_id) + self._mac_address: str = list(self._device.connections)[0][1] - self._entity_registry = entity_registry.async_get(hass) - self._entities = entity_registry.async_entries_for_device(self._entity_registry, self._device.id) + self._entity_registry: EntityRegistry = entity_registry.async_get(hass) + self._entities: list[RegistryEntry] = entity_registry.async_entries_for_device(self._entity_registry, self._device.id) @property def device_id(self) -> str: @@ -113,7 +115,7 @@ def minimum_hot_water_setpoint(self) -> float: return super().minimum_hot_water_setpoint @property - def maximum_hot_water_setpoint(self) -> float | None: + def maximum_hot_water_setpoint(self) -> float: if (setpoint := self.get(SENSOR_DOMAIN, DATA_DHW_SETPOINT_MAXIMUM)) is not None: return float(setpoint) diff --git a/custom_components/sat/helpers.py b/custom_components/sat/helpers.py index eb82acb3..f80fc179 100644 --- a/custom_components/sat/helpers.py +++ b/custom_components/sat/helpers.py @@ -24,7 +24,7 @@ def seconds_since(start_time: float | None) -> float: return monotonic() - start_time -def convert_time_str_to_seconds(time_str: str) -> float: +def convert_time_str_to_seconds(time_str: str) -> int: """ Convert a time string in the format 'HH:MM:SS' to seconds. @@ -32,11 +32,11 @@ def convert_time_str_to_seconds(time_str: str) -> float: time_str: A string representing a time in the format 'HH:MM:SS'. Returns: - float: The time in seconds. + int: The time in seconds. """ date_time = dt.parse_time(time_str) # Calculate the number of seconds by multiplying the hours, minutes and seconds - return (date_time.hour * 3600) + (date_time.minute * 60) + date_time.second + return round((date_time.hour * 3600) + (date_time.minute * 60) + date_time.second, 0) def calculate_derivative_per_hour(temperature_error: float, time_taken_seconds: float): diff --git a/custom_components/sat/mqtt/__init__.py b/custom_components/sat/mqtt/__init__.py index d1b64e7a..46a023f1 100644 --- a/custom_components/sat/mqtt/__init__.py +++ b/custom_components/sat/mqtt/__init__.py @@ -22,10 +22,10 @@ class SatMqttCoordinator(SatDataUpdateCoordinator, ABC): def __init__(self, hass: HomeAssistant, device_id: str, data: Mapping[str, Any], options: Mapping[str, Any] | None = None) -> None: super().__init__(hass, data, options) - self.data = {} - self._device_id = device_id - self._topic = data.get(CONF_MQTT_TOPIC) - self._store = Store(hass, STORAGE_VERSION, snake_case(f"{self.__class__.__name__}_{device_id}")) + self.data: dict = {} + self._device_id: str = device_id + self._topic: str = data.get(CONF_MQTT_TOPIC) + self._store: Store = Store(hass, STORAGE_VERSION, snake_case(f"{self.__class__.__name__}_{device_id}")) @property def device_id(self) -> str: diff --git a/custom_components/sat/mqtt/ems.py b/custom_components/sat/mqtt/ems.py index c7430105..3fd2308a 100644 --- a/custom_components/sat/mqtt/ems.py +++ b/custom_components/sat/mqtt/ems.py @@ -100,9 +100,9 @@ def member_id(self) -> int | None: # Not supported (yet) return None - async def boot(self) -> SatMqttCoordinator: + async def boot(self) -> None: # Nothing needs to be booted (yet) - return self + pass def get_tracked_entities(self) -> list[str]: return [DATA_BOILER_DATA] diff --git a/custom_components/sat/mqtt/opentherm.py b/custom_components/sat/mqtt/opentherm.py index cd577f50..1a62a50c 100644 --- a/custom_components/sat/mqtt/opentherm.py +++ b/custom_components/sat/mqtt/opentherm.py @@ -86,7 +86,7 @@ def minimum_hot_water_setpoint(self) -> float: return super().minimum_hot_water_setpoint @property - def maximum_hot_water_setpoint(self) -> float | None: + def maximum_hot_water_setpoint(self) -> float: if (setpoint := self.data.get(DATA_DHW_SETPOINT_MAXIMUM)) is not None: return float(setpoint) diff --git a/custom_components/sat/overshoot_protection.py b/custom_components/sat/overshoot_protection.py index 4fd7a38b..8fdda714 100644 --- a/custom_components/sat/overshoot_protection.py +++ b/custom_components/sat/overshoot_protection.py @@ -17,10 +17,10 @@ class OvershootProtection: def __init__(self, coordinator: SatDataUpdateCoordinator, heating_system: str): """Initialize OvershootProtection with a coordinator and heating system configuration.""" - self._alpha = 0.5 - self._coordinator = coordinator - self._stable_temperature = None - self._setpoint = OVERSHOOT_PROTECTION_SETPOINT.get(heating_system) + self._alpha: float = 0.5 + self._stable_temperature: float | None = None + self._coordinator: SatDataUpdateCoordinator = coordinator + self._setpoint: int = OVERSHOOT_PROTECTION_SETPOINT.get(heating_system) if self._setpoint is None: raise ValueError(f"Invalid heating system: {heating_system}") diff --git a/custom_components/sat/pid.py b/custom_components/sat/pid.py index a75b0c6b..66f0b946 100644 --- a/custom_components/sat/pid.py +++ b/custom_components/sat/pid.py @@ -43,38 +43,38 @@ def __init__(self, :param sample_time_limit: The minimum time interval between updates to the PID controller, in seconds. :param version: The version of math calculation for the controller. """ - self._kp = kp - self._ki = ki - self._kd = kd - self._version = version - self._deadband = deadband - self._history_size = max_history - self._heating_system = heating_system - self._automatic_gains = automatic_gains - self._automatic_gains_value = automatic_gain_value - self._derivative_time_weight = derivative_time_weight - self._heating_curve_coefficient = heating_curve_coefficient - - self._last_interval_updated = monotonic() - self._sample_time_limit = max(sample_time_limit, 1) - self._integral_time_limit = max(integral_time_limit, 1) + self._kp: float = kp + self._ki: float = ki + self._kd: float = kd + self._version: int = version + self._deadband: float = deadband + self._history_size: int = max_history + self._heating_system: str = heating_system + self._automatic_gains: bool = automatic_gains + self._automatic_gains_value: float = automatic_gain_value + self._derivative_time_weight: float = derivative_time_weight + self._heating_curve_coefficient: float = heating_curve_coefficient + + self._last_interval_updated: float = monotonic() + self._sample_time_limit: float = max(sample_time_limit, 1) + self._integral_time_limit: float = max(integral_time_limit, 1) self.reset() def reset(self) -> None: """Reset the PID controller.""" - self._last_error = 0.0 - self._time_elapsed = 0 - self._last_updated = monotonic() - self._last_heating_curve_value = 0 - self._last_boiler_temperature = None + self._last_error: float = 0.0 + self._time_elapsed: float = 0 + self._last_updated: float = monotonic() + self._last_heating_curve_value: float = 0 + self._last_boiler_temperature: float | None = None # Reset the integral and derivative - self._integral = 0.0 - self._raw_derivative = 0.0 + self._integral: float = 0.0 + self._raw_derivative: float = 0.0 # Reset all lists - self._times = deque(maxlen=self._history_size) - self._errors = deque(maxlen=self._history_size) + self._times: deque = deque(maxlen=self._history_size) + self._errors: deque = deque(maxlen=self._history_size) def update(self, error: float, heating_curve_value: float, boiler_temperature: float) -> None: """Update the PID controller with the current error, inside temperature, outside temperature, and heating curve value. diff --git a/custom_components/sat/pwm.py b/custom_components/sat/pwm.py index b127c2ee..7462a299 100644 --- a/custom_components/sat/pwm.py +++ b/custom_components/sat/pwm.py @@ -11,6 +11,7 @@ class PWMState(str, Enum): + """The current state of Pulse Width Modulation""" ON = "on" OFF = "off" IDLE = "idle" @@ -21,25 +22,25 @@ class PWM: def __init__(self, heating_curve: HeatingCurve, max_cycle_time: int, automatic_duty_cycle: bool, max_cycles: int, force: bool = False): """Initialize the PWM control.""" - self._alpha = 0.2 - self._force = force - self._last_boiler_temperature = None + self._alpha: float = 0.2 + self._force: bool = force + self._last_boiler_temperature: float | None = None - self._max_cycles = max_cycles - self._heating_curve = heating_curve - self._max_cycle_time = max_cycle_time - self._automatic_duty_cycle = automatic_duty_cycle + self._max_cycles: int = max_cycles + self._heating_curve: HeatingCurve = heating_curve + self._max_cycle_time: int = max_cycle_time + self._automatic_duty_cycle: bool = automatic_duty_cycle # Timing thresholds for duty cycle management - self._on_time_lower_threshold = 180 - self._on_time_upper_threshold = 3600 / self._max_cycles - self._on_time_max_threshold = self._on_time_upper_threshold * 2 + self._on_time_lower_threshold: float = 180 + self._on_time_upper_threshold: float = 3600 / self._max_cycles + self._on_time_max_threshold: float = self._on_time_upper_threshold * 2 # Duty cycle percentage thresholds - self._duty_cycle_lower_threshold = self._on_time_lower_threshold / self._on_time_upper_threshold - self._duty_cycle_upper_threshold = 1 - self._duty_cycle_lower_threshold - self._min_duty_cycle_percentage = self._duty_cycle_lower_threshold / 2 - self._max_duty_cycle_percentage = 1 - self._min_duty_cycle_percentage + self._duty_cycle_lower_threshold: float = self._on_time_lower_threshold / self._on_time_upper_threshold + self._duty_cycle_upper_threshold: float = 1 - self._duty_cycle_lower_threshold + self._min_duty_cycle_percentage: float = self._duty_cycle_lower_threshold / 2 + self._max_duty_cycle_percentage: float = 1 - self._min_duty_cycle_percentage _LOGGER.debug( "Initialized PWM control with duty cycle thresholds - Lower: %.2f%%, Upper: %.2f%%", @@ -50,13 +51,13 @@ def __init__(self, heating_curve: HeatingCurve, max_cycle_time: int, automatic_d def reset(self) -> None: """Reset the PWM control.""" - self._cycles = 0 - self._duty_cycle = None - self._state = PWMState.IDLE - self._last_update = monotonic() + self._cycles: int = 0 + self._state: PWMState = PWMState.IDLE + self._last_update: float = monotonic() + self._duty_cycle: Tuple[int, int] | None = None - self._first_duty_cycle_start = None - self._last_duty_cycle_percentage = None + self._first_duty_cycle_start: float | None = None + self._last_duty_cycle_percentage: float | None = None _LOGGER.info("PWM control reset to initial state.") diff --git a/custom_components/sat/relative_modulation.py b/custom_components/sat/relative_modulation.py index 1b2d2656..ad77f1e4 100644 --- a/custom_components/sat/relative_modulation.py +++ b/custom_components/sat/relative_modulation.py @@ -18,9 +18,9 @@ class RelativeModulationState(str, Enum): class RelativeModulation: def __init__(self, coordinator: SatDataUpdateCoordinator, heating_system: str): """Initialize instance variables""" - self._coordinator = coordinator - self._heating_system = heating_system - self._pulse_width_modulation_enabled = None + self._heating_system: str = heating_system + self._pulse_width_modulation_enabled: bool = False + self._coordinator: SatDataUpdateCoordinator = coordinator _LOGGER.debug("Relative Modulation initialized for heating system: %s", heating_system) @@ -45,4 +45,4 @@ def state(self) -> RelativeModulationState: @property def enabled(self) -> bool: """Check if the relative modulation is enabled based on its current state""" - return self.state != RelativeModulationState.OFF \ No newline at end of file + return self.state != RelativeModulationState.OFF diff --git a/custom_components/sat/simulator/__init__.py b/custom_components/sat/simulator/__init__.py index 7d0333ff..8bc884f1 100644 --- a/custom_components/sat/simulator/__init__.py +++ b/custom_components/sat/simulator/__init__.py @@ -1,7 +1,8 @@ from __future__ import annotations +from datetime import datetime from time import monotonic -from typing import TYPE_CHECKING, Mapping, Any +from typing import Optional, TYPE_CHECKING, Mapping, Any from homeassistant.core import HomeAssistant @@ -84,7 +85,7 @@ async def async_set_control_max_setpoint(self, value: float) -> None: self._maximum_setpoint = value await super().async_set_control_max_setpoint(value) - async def async_control_heating_loop(self, climate: SatClimate = None, _time=None) -> None: + async def async_control_heating_loop(self, climate: Optional[SatClimate] = None, _time: Optional[datetime] = None) -> None: # Calculate the difference, so we know when to slowdown difference = abs(self._boiler_temperature - self.target) self.logger.debug(f"Target: {self.target}, Current: {self._boiler_temperature}, Difference: {difference}") diff --git a/custom_components/sat/switch/__init__.py b/custom_components/sat/switch/__init__.py index df8bcc8b..a9097b49 100644 --- a/custom_components/sat/switch/__init__.py +++ b/custom_components/sat/switch/__init__.py @@ -7,6 +7,7 @@ from homeassistant.const import SERVICE_TURN_ON, SERVICE_TURN_OFF, ATTR_ENTITY_ID, STATE_ON from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry +from homeassistant.helpers.entity_registry import RegistryEntry from ..coordinator import DeviceState, SatDataUpdateCoordinator @@ -21,7 +22,7 @@ def __init__(self, hass: HomeAssistant, entity_id: str, data: Mapping[str, Any], """Initialize.""" super().__init__(hass, data, options) - self._entity = entity_registry.async_get(hass).async_get(entity_id) + self._entity: RegistryEntry = entity_registry.async_get(hass).async_get(entity_id) @property def device_id(self) -> str: