diff --git a/custom_components/solaredge_modbus_multi/__init__.py b/custom_components/solaredge_modbus_multi/__init__.py index 9765acdd..a8df7323 100644 --- a/custom_components/solaredge_modbus_multi/__init__.py +++ b/custom_components/solaredge_modbus_multi/__init__.py @@ -1,21 +1,34 @@ """The SolarEdge Modbus Integration.""" import logging from datetime import timedelta +from typing import Any import async_timeout from homeassistant.config_entries import ConfigEntry -from homeassistant.const import (CONF_HOST, CONF_NAME, CONF_PORT, - CONF_SCAN_INTERVAL, Platform) +from homeassistant.const import ( + CONF_HOST, + CONF_NAME, + CONF_PORT, + CONF_SCAN_INTERVAL, + Platform, +) from homeassistant.core import HomeAssistant -from homeassistant.helpers.update_coordinator import (DataUpdateCoordinator, - UpdateFailed) - -from .const import (CONF_DETECT_BATTERIES, CONF_DETECT_METERS, CONF_DEVICE_ID, - CONF_KEEP_MODBUS_OPEN, CONF_NUMBER_INVERTERS, - CONF_SINGLE_DEVICE_ENTITY, DEFAULT_DETECT_BATTERIES, - DEFAULT_DETECT_METERS, DEFAULT_KEEP_MODBUS_OPEN, - DEFAULT_SCAN_INTERVAL, DEFAULT_SINGLE_DEVICE_ENTITY, - DOMAIN) +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .const import ( + CONF_DETECT_BATTERIES, + CONF_DETECT_METERS, + CONF_DEVICE_ID, + CONF_KEEP_MODBUS_OPEN, + CONF_NUMBER_INVERTERS, + CONF_SINGLE_DEVICE_ENTITY, + DEFAULT_DETECT_BATTERIES, + DEFAULT_DETECT_METERS, + DEFAULT_KEEP_MODBUS_OPEN, + DEFAULT_SCAN_INTERVAL, + DEFAULT_SINGLE_DEVICE_ENTITY, + DOMAIN, +) from .hub import DataUpdateFailed, HubInitFailed, SolarEdgeModbusMultiHub _LOGGER = logging.getLogger(__name__) @@ -25,7 +38,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up SolarEdge Modbus from a config entry.""" - + entry_updates: dict[str, Any] = {} if CONF_SCAN_INTERVAL in entry.data: data = {**entry.data} @@ -36,7 +49,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: } if entry_updates: hass.config_entries.async_update_entry(entry, **entry_updates) - + solaredge_hub = SolarEdgeModbusMultiHub( hass, entry.data[CONF_NAME], @@ -49,63 +62,66 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: entry.options.get(CONF_SINGLE_DEVICE_ENTITY, DEFAULT_SINGLE_DEVICE_ENTITY), entry.options.get(CONF_KEEP_MODBUS_OPEN, DEFAULT_KEEP_MODBUS_OPEN), ) - + coordinator = SolarEdgeCoordinator( hass, solaredge_hub, - entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) + entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL), ) - + hass.data.setdefault(DOMAIN, {}) hass.data[DOMAIN][entry.entry_id] = { "hub": solaredge_hub, "coordinator": coordinator, } - + await coordinator.async_config_entry_first_refresh() - + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) - + entry.async_on_unload(entry.add_update_listener(async_reload_entry)) - + return True + async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" - solaredge_hub = hass.data[DOMAIN][entry.entry_id]['hub'] + solaredge_hub = hass.data[DOMAIN][entry.entry_id]["hub"] unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) - + if unload_ok: await solaredge_hub.shutdown() hass.data[DOMAIN].pop(entry.entry_id) - + return unload_ok + async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None: """Handle an options update.""" await hass.config_entries.async_reload(entry.entry_id) + class SolarEdgeCoordinator(DataUpdateCoordinator): def __init__(self, hass, hub, scan_interval): super().__init__( hass, _LOGGER, - name = "SolarEdge Coordinator", - update_interval = timedelta(seconds=scan_interval), + name="SolarEdge Coordinator", + update_interval=timedelta(seconds=scan_interval), ) self._hub = hub - + if scan_interval < 10: _LOGGER.warning("Polling frequency < 10, requiring keep modbus open.") self._hub.keep_modbus_open = True - + async def _async_update_data(self): try: async with async_timeout.timeout(5): return await self._hub.async_refresh_modbus_data() - + except HubInitFailed as e: raise UpdateFailed(f"{e}") - + except DataUpdateFailed as e: raise UpdateFailed(f"{e}") diff --git a/custom_components/solaredge_modbus_multi/config_flow.py b/custom_components/solaredge_modbus_multi/config_flow.py index 5d128b4e..7e35f9fc 100644 --- a/custom_components/solaredge_modbus_multi/config_flow.py +++ b/custom_components/solaredge_modbus_multi/config_flow.py @@ -5,19 +5,28 @@ import voluptuous as vol from homeassistant import config_entries from homeassistant.config_entries import ConfigEntry -from homeassistant.const import (CONF_HOST, CONF_NAME, CONF_PORT, - CONF_SCAN_INTERVAL) +from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT, CONF_SCAN_INTERVAL from homeassistant.core import HomeAssistant, callback from homeassistant.data_entry_flow import FlowResult -from .const import (CONF_DETECT_BATTERIES, CONF_DETECT_METERS, CONF_DEVICE_ID, - CONF_KEEP_MODBUS_OPEN, CONF_NUMBER_INVERTERS, - CONF_SINGLE_DEVICE_ENTITY, DEFAULT_DETECT_BATTERIES, - DEFAULT_DETECT_METERS, DEFAULT_DEVICE_ID, - DEFAULT_KEEP_MODBUS_OPEN, DEFAULT_NAME, - DEFAULT_NUMBER_INVERTERS, DEFAULT_PORT, - DEFAULT_SCAN_INTERVAL, DEFAULT_SINGLE_DEVICE_ENTITY, - DOMAIN) +from .const import ( + CONF_DETECT_BATTERIES, + CONF_DETECT_METERS, + CONF_DEVICE_ID, + CONF_KEEP_MODBUS_OPEN, + CONF_NUMBER_INVERTERS, + CONF_SINGLE_DEVICE_ENTITY, + DEFAULT_DETECT_BATTERIES, + DEFAULT_DETECT_METERS, + DEFAULT_DEVICE_ID, + DEFAULT_KEEP_MODBUS_OPEN, + DEFAULT_NAME, + DEFAULT_NUMBER_INVERTERS, + DEFAULT_PORT, + DEFAULT_SCAN_INTERVAL, + DEFAULT_SINGLE_DEVICE_ENTITY, + DOMAIN, +) def host_valid(host): @@ -29,6 +38,7 @@ def host_valid(host): disallowed = re.compile(r"[^a-zA-Z\d\-]") return all(x and not disallowed.search(x) for x in host.split(".")) + @callback def solaredge_modbus_multi_entries(hass: HomeAssistant): """Return the hosts already configured.""" @@ -36,6 +46,7 @@ def solaredge_modbus_multi_entries(hass: HomeAssistant): entry.data[CONF_HOST] for entry in hass.config_entries.async_entries(DOMAIN) ) + class SolaredgeModbusMultiConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Solaredge Modbus configflow.""" @@ -58,7 +69,7 @@ async def async_step_user(self, user_input=None): errors = {} if user_input is not None: - + if self._host_in_configuration_exists(user_input[CONF_HOST]): errors[CONF_HOST] = "already_configured" elif not host_valid(user_input[CONF_HOST]): @@ -68,11 +79,11 @@ async def async_step_user(self, user_input=None): elif user_input[CONF_PORT] > 65535: errors[CONF_PORT] = "invalid_tcp_port" elif user_input[CONF_DEVICE_ID] > 247: - errors[CONF_DEVICE_ID] = "max_device_id" + errors[CONF_DEVICE_ID] = "max_device_id" elif user_input[CONF_DEVICE_ID] < 1: errors[CONF_DEVICE_ID] = "min_device_id" elif user_input[CONF_NUMBER_INVERTERS] > 32: - errors[CONF_NUMBER_INVERTERS] = "max_inverters" + errors[CONF_NUMBER_INVERTERS] = "max_inverters" elif user_input[CONF_NUMBER_INVERTERS] < 1: errors[CONF_NUMBER_INVERTERS] = "min_inverters" elif user_input[CONF_NUMBER_INVERTERS] + user_input[CONF_DEVICE_ID] > 247: @@ -96,75 +107,85 @@ async def async_step_user(self, user_input=None): step_id="user", data_schema=vol.Schema( { - vol.Optional( - CONF_NAME, default=user_input[CONF_NAME] - ): cv.string, - vol.Required( - CONF_HOST, default=user_input[CONF_HOST] - ): cv.string, - vol.Required( - CONF_PORT, default=user_input[CONF_PORT] - ): vol.Coerce(int), + vol.Optional(CONF_NAME, default=user_input[CONF_NAME]): cv.string, + vol.Required(CONF_HOST, default=user_input[CONF_HOST]): cv.string, + vol.Required(CONF_PORT, default=user_input[CONF_PORT]): vol.Coerce( + int + ), vol.Required( - CONF_NUMBER_INVERTERS, default=user_input[CONF_NUMBER_INVERTERS] + CONF_NUMBER_INVERTERS, + default=user_input[CONF_NUMBER_INVERTERS], ): vol.Coerce(int), vol.Required( CONF_DEVICE_ID, default=user_input[CONF_DEVICE_ID] ): vol.Coerce(int), }, ), - errors = errors + errors=errors, ) + class SolaredgeModbusMultiOptionsFlowHandler(config_entries.OptionsFlow): def __init__(self, config_entry: ConfigEntry): """Initialize options flow.""" self.config_entry = config_entry - async def async_step_init(self, user_input = None) -> FlowResult: + async def async_step_init(self, user_input=None) -> FlowResult: errors = {} """Manage the options.""" if user_input is not None: - + if user_input[CONF_SCAN_INTERVAL] < 1: errors[CONF_SCAN_INTERVAL] = "invalid_scan_interval" elif user_input[CONF_SCAN_INTERVAL] > 86400: errors[CONF_SCAN_INTERVAL] = "invalid_scan_interval" else: - return self.async_create_entry( - title = "", - data = user_input - ) + return self.async_create_entry(title="", data=user_input) else: user_input = { - CONF_SCAN_INTERVAL: self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL), - CONF_SINGLE_DEVICE_ENTITY: self.config_entry.options.get(CONF_SINGLE_DEVICE_ENTITY, DEFAULT_SINGLE_DEVICE_ENTITY), - CONF_KEEP_MODBUS_OPEN: self.config_entry.options.get(CONF_KEEP_MODBUS_OPEN, DEFAULT_KEEP_MODBUS_OPEN), - CONF_DETECT_METERS: self.config_entry.options.get(CONF_DETECT_METERS, DEFAULT_DETECT_METERS), - CONF_DETECT_BATTERIES: self.config_entry.options.get(CONF_DETECT_BATTERIES, DEFAULT_DETECT_BATTERIES), + CONF_SCAN_INTERVAL: self.config_entry.options.get( + CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL + ), + CONF_SINGLE_DEVICE_ENTITY: self.config_entry.options.get( + CONF_SINGLE_DEVICE_ENTITY, DEFAULT_SINGLE_DEVICE_ENTITY + ), + CONF_KEEP_MODBUS_OPEN: self.config_entry.options.get( + CONF_KEEP_MODBUS_OPEN, DEFAULT_KEEP_MODBUS_OPEN + ), + CONF_DETECT_METERS: self.config_entry.options.get( + CONF_DETECT_METERS, DEFAULT_DETECT_METERS + ), + CONF_DETECT_BATTERIES: self.config_entry.options.get( + CONF_DETECT_BATTERIES, DEFAULT_DETECT_BATTERIES + ), } return self.async_show_form( - step_id = "init", - data_schema = vol.Schema( + step_id="init", + data_schema=vol.Schema( { vol.Optional( - CONF_SCAN_INTERVAL, default=user_input[CONF_SCAN_INTERVAL] + CONF_SCAN_INTERVAL, + default=user_input[CONF_SCAN_INTERVAL], ): vol.Coerce(int), vol.Optional( - CONF_SINGLE_DEVICE_ENTITY, default=user_input[CONF_SINGLE_DEVICE_ENTITY] + CONF_SINGLE_DEVICE_ENTITY, + default=user_input[CONF_SINGLE_DEVICE_ENTITY], ): cv.boolean, vol.Optional( - CONF_KEEP_MODBUS_OPEN, default=user_input[CONF_KEEP_MODBUS_OPEN] + CONF_KEEP_MODBUS_OPEN, + default=user_input[CONF_KEEP_MODBUS_OPEN], ): cv.boolean, vol.Optional( - CONF_DETECT_METERS, default=user_input[CONF_DETECT_METERS] + CONF_DETECT_METERS, + default=user_input[CONF_DETECT_METERS], ): cv.boolean, vol.Optional( - CONF_DETECT_BATTERIES, default=user_input[CONF_DETECT_BATTERIES] + CONF_DETECT_BATTERIES, + default=user_input[CONF_DETECT_BATTERIES], ): cv.boolean, }, ), - errors = errors + errors=errors, ) diff --git a/custom_components/solaredge_modbus_multi/const.py b/custom_components/solaredge_modbus_multi/const.py index 6716fae7..b3d1888a 100644 --- a/custom_components/solaredge_modbus_multi/const.py +++ b/custom_components/solaredge_modbus_multi/const.py @@ -28,7 +28,29 @@ SUNSPEC_ACCUM_LIMIT = 4294967295 SUNSPEC_NOT_IMPL_FLOAT32 = 0x7FC00000 -SUNSPEC_SF_RANGE = [-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] +SUNSPEC_SF_RANGE = [ + -10, + -9, + -8, + -7, + -6, + -5, + -4, + -3, + -2, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, +] # parameter names per sunspec DEVICE_STATUS_DESC = { diff --git a/custom_components/solaredge_modbus_multi/helpers.py b/custom_components/solaredge_modbus_multi/helpers.py index 56027208..9ba147a1 100644 --- a/custom_components/solaredge_modbus_multi/helpers.py +++ b/custom_components/solaredge_modbus_multi/helpers.py @@ -4,26 +4,29 @@ def scale_factor(value: int, sf: int): except ZeroDivisionError: return 0 + def watts_to_kilowatts(value): return round(value * 0.001, 3) + def parse_modbus_string(s: str) -> str: s = s.decode(encoding="utf-8", errors="ignore") s = s.replace("\x00", "").rstrip() return str(s) + def update_accum(self, accum_value: int) -> None: - + if self.last is None: self.last = 0 - + if not accum_value > 0: - raise ValueError(f"update_accum must be non-zero value.") - + raise ValueError("update_accum must be non-zero value.") + if accum_value >= self.last: - # doesn't account for accumulator rollover, but it would probably take + # doesn't check accumulator rollover, but it would probably take # several decades to roll over to 0 so we'll worry about it later self.last = accum_value - return accum_value + return accum_value else: - raise ValueError(f"update_accum must be an increasing value.") + raise ValueError("update_accum must be an increasing value.") diff --git a/custom_components/solaredge_modbus_multi/hub.py b/custom_components/solaredge_modbus_multi/hub.py index e8d71d36..6e9f52b2 100644 --- a/custom_components/solaredge_modbus_multi/hub.py +++ b/custom_components/solaredge_modbus_multi/hub.py @@ -2,7 +2,7 @@ import logging import threading from collections import OrderedDict -from typing import Any, Callable, Dict, Optional +from typing import Any, Dict, Optional from homeassistant.core import HomeAssistant from pymodbus.client.sync import ModbusTcpClient @@ -18,32 +18,46 @@ class SolarEdgeError(Exception): """Base class for other exceptions""" + pass + class HubInitFailed(SolarEdgeError): """Raised when an error happens during init""" + pass + class DeviceInitFailed(SolarEdgeError): """Raised when a device can't be initialized""" + pass + class ModbusReadError(SolarEdgeError): """Raised when a modbus read fails""" + pass + class ModbusWriteError(SolarEdgeError): """Raised when a modbus write fails""" + pass + class DataUpdateFailed(SolarEdgeError): """Raised when an update cycle fails""" + pass + class DeviceInvalid(SolarEdgeError): """Raised when a device is not usable or invalid""" + pass + class SolarEdgeModbusMultiHub: def __init__( self, @@ -74,132 +88,162 @@ def __init__( self.inverters = [] self.meters = [] self.batteries = [] - + self._client = ModbusTcpClient(host=self._host, port=self._port) - + self.initalized = False self.online = False - - async def _async_init_solaredge(self) -> None: + + async def _async_init_solaredge(self) -> None: if not self.is_socket_open(): raise HubInitFailed(f"Could not open Modbus/TCP connection to {self._host}") - + if self._detect_batteries: - _LOGGER.warning("Battery registers are not officially supported by SolarEdge. Use at your own risk!") - + _LOGGER.warning( + "Battery registers not officially supported by SolarEdge. ", + "Use at your own risk!", + ) + for inverter_index in range(self.number_of_inverters): inverter_unit_id = inverter_index + self.start_device_id - + try: new_inverter = SolarEdgeInverter(inverter_unit_id, self) await self._hass.async_add_executor_job(new_inverter.init_device) self.inverters.append(new_inverter) - + except Exception as e: """Inverters are required""" _LOGGER.error(f"Inverter device ID {inverter_unit_id}: {e}") raise HubInitFailed(f"Inverter device ID {inverter_unit_id} not found.") - + if self._detect_meters: try: new_meter_1 = SolarEdgeMeter(inverter_unit_id, 1, self) await self._hass.async_add_executor_job(new_meter_1.init_device) - + for meter in self.meters: if new_meter_1.serial == meter.serial: - _LOGGER.warning(f"Duplicate serial {new_meter_1.serial}. Ignoring meter 1 on inverter ID {inverter_unit_id}") - raise DeviceInitFailed(f"Duplicate meter 1 serial {new_meter_1.serial}") - + _LOGGER.warning( + f"Duplicate serial {new_meter_1.serial} ", + f"on meter 1 inverter {inverter_unit_id}", + ) + raise DeviceInitFailed( + f"Duplicate m1 serial {new_meter_1.serial}" + ) + self.meters.append(new_meter_1) _LOGGER.debug(f"Found meter 1 on inverter ID {inverter_unit_id}") - except: + except Exception: pass - + try: new_meter_2 = SolarEdgeMeter(inverter_unit_id, 2, self) await self._hass.async_add_executor_job(new_meter_2.init_device) - + for meter in self.meters: if new_meter_2.serial == meter.serial: - _LOGGER.warning(f"Duplicate serial {new_meter_2.serial}. Ignoring meter 2 on inverter ID {inverter_unit_id}") - raise DeviceInitFailed(f"Duplicate meter 2 serial {new_meter_2.serial}") - + _LOGGER.warning( + f"Duplicate serial {new_meter_2.serial} ", + "on meter 2 inverter {inverter_unit_id}", + ) + raise DeviceInitFailed( + f"Duplicate m2 serial {new_meter_2.serial}" + ) + self.meters.append(new_meter_2) _LOGGER.debug(f"Found meter 2 on inverter ID {inverter_unit_id}") - except: + except Exception: pass - + try: new_meter_3 = SolarEdgeMeter(inverter_unit_id, 3, self) await self._hass.async_add_executor_job(new_meter_3.init_device) for meter in self.meters: if new_meter_3.serial == meter.serial: - _LOGGER.warning(f"Duplicate serial {new_meter_3.serial}. Ignoring meter 3 on inverter ID {inverter_unit_id}") - raise DeviceInitFailed(f"Duplicate meter 3 serial {new_meter_3.serial}") - + _LOGGER.warning( + f"Duplicate serial {new_meter_3.serial} ", + "on meter 3 inverter {inverter_unit_id}", + ) + raise DeviceInitFailed( + f"Duplicate m3 serial {new_meter_3.serial}" + ) + self.meters.append(new_meter_3) _LOGGER.debug(f"Found meter 3 on inverter ID {inverter_unit_id}") - except: + except Exception: pass - + if self._detect_batteries: try: new_battery_1 = SolarEdgeBattery(inverter_unit_id, 1, self) await self._hass.async_add_executor_job(new_battery_1.init_device) - + for battery in self.batteries: if new_battery_1.serial == battery.serial: - _LOGGER.warning(f"Duplicate serial {new_battery_1.serial}. Ignoring battery 1 on inverter ID {inverter_unit_id}") - raise DeviceInitFailed(f"Duplicate battery 1 serial {new_battery_1.serial}") - + _LOGGER.warning( + f"Duplicate serial {new_battery_1.serial} ", + "on battery 1 inverter {inverter_unit_id}", + ) + raise DeviceInitFailed( + f"Duplicate b1 serial {new_battery_1.serial}" + ) + self.batteries.append(new_battery_1) - _LOGGER.debug(f"Found battery 1 on inverter ID {inverter_unit_id}") - except: + _LOGGER.debug(f"Found battery 1 inverter {inverter_unit_id}") + except Exception: pass - + try: new_battery_2 = SolarEdgeBattery(inverter_unit_id, 2, self) await self._hass.async_add_executor_job(new_battery_2.init_device) - + for battery in self.batteries: if new_battery_2.serial == battery.serial: - _LOGGER.warning(f"Duplicate serial {new_battery_2.serial}. Ignoring battery 2 on inverter ID {inverter_unit_id}") - raise DeviceInitFailed(f"Duplicate battery 2 serial {new_battery_1.serial}") - + _LOGGER.warning( + f"Duplicate serial {new_battery_2.serial} ", + "on battery 2 inverter {inverter_unit_id}", + ) + raise DeviceInitFailed( + f"Duplicate b2 serial {new_battery_1.serial}" + ) + self.batteries.append(new_battery_2) - _LOGGER.debug(f"Found battery 2 on inverter ID {inverter_unit_id}") - except: + _LOGGER.debug(f"Found battery 2 inverter {inverter_unit_id}") + except Exception: pass - + try: for inverter in self.inverters: await self._hass.async_add_executor_job(inverter.read_modbus_data) - + for meter in self.meters: await self._hass.async_add_executor_job(meter.read_modbus_data) - + for battery in self.batteries: await self._hass.async_add_executor_job(battery.read_modbus_data) - - except: - raise HubInitFailed(f"Devices not ready.") - + + except Exception: + raise HubInitFailed("Devices not ready.") + self.initalized = True - + async def async_refresh_modbus_data(self, _now: Optional[int] = None) -> bool: - if not self.is_socket_open(): + if not self.is_socket_open(): await self.connect() - + if not self.initalized: await self._async_init_solaredge() - + if not self.is_socket_open(): self.online = False - raise DataUpdateFailed(f"Could not open Modbus/TCP connection to {self._host}") - + raise DataUpdateFailed( + f"Could not open Modbus/TCP connection to {self._host}" + ) + else: - self.online = True + self.online = True try: for inverter in self.inverters: await self._hass.async_add_executor_job(inverter.read_modbus_data) @@ -207,46 +251,46 @@ async def async_refresh_modbus_data(self, _now: Optional[int] = None) -> bool: await self._hass.async_add_executor_job(meter.read_modbus_data) for battery in self.batteries: await self._hass.async_add_executor_job(battery.read_modbus_data) - + except Exception as e: self.online = False raise DataUpdateFailed(f"Failed to update devices: {e}") - + if not self.keep_modbus_open: self.disconnect() - + return True - + @property def name(self): """Return the name of this hub.""" return self._name - + @property def hub_id(self) -> str: return self._id - + def disconnect(self): """Disconnect client.""" with self._lock: self._client.close() - + async def connect(self): """Connect client.""" with self._lock: await self._hass.async_add_executor_job(self._client.connect) - + def is_socket_open(self): """Check client.""" with self._lock: return self._client.is_socket_open() - + async def shutdown(self) -> None: - self.online = False + self.online = False self.disconnect() self._client = None await asyncio.sleep(5) - + def read_holding_registers(self, unit, address, count): """Read holding registers.""" with self._lock: @@ -261,7 +305,7 @@ def __init__(self, device_id: int, hub: SolarEdgeModbusMultiHub) -> None: self.decoded_common = [] self.decoded_model = [] self.has_parent = False - + def init_device(self) -> None: inverter_data = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=40000, count=4 @@ -269,58 +313,73 @@ def init_device(self) -> None: if inverter_data.isError(): _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {inverter_data}") raise ModbusReadError(inverter_data) - + decoder = BinaryPayloadDecoder.fromRegisters( inverter_data.registers, byteorder=Endian.Big ) - - decoded_ident = OrderedDict([ - ('C_SunSpec_ID', decoder.decode_32bit_uint()), - ('C_SunSpec_DID', decoder.decode_16bit_uint()), - ('C_SunSpec_Length', decoder.decode_16bit_uint()), - ]) - + + decoded_ident = OrderedDict( + [ + ("C_SunSpec_ID", decoder.decode_32bit_uint()), + ("C_SunSpec_DID", decoder.decode_16bit_uint()), + ("C_SunSpec_Length", decoder.decode_16bit_uint()), + ] + ) + for name, value in iteritems(decoded_ident): - _LOGGER.debug("%s %s", name, hex(value) if isinstance(value, int) else value) - + _LOGGER.debug( + "%s %s", name, hex(value) if isinstance(value, int) else value + ) + if ( - decoded_ident['C_SunSpec_DID'] == SUNSPEC_NOT_IMPL_UINT16 - or decoded_ident['C_SunSpec_DID'] != 0x0001 - or decoded_ident['C_SunSpec_Length'] != 65 + decoded_ident["C_SunSpec_DID"] == SUNSPEC_NOT_IMPL_UINT16 + or decoded_ident["C_SunSpec_DID"] != 0x0001 + or decoded_ident["C_SunSpec_Length"] != 65 ): raise DeviceInvalid("Inverter {self.inverter_unit_id} not usable.") - + inverter_data = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=40004, count=65 ) if inverter_data.isError(): _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {inverter_data}") raise ModbusReadError(inverter_data) - + decoder = BinaryPayloadDecoder.fromRegisters( inverter_data.registers, byteorder=Endian.Big ) - - self.decoded_common = OrderedDict([ - ('C_Manufacturer', parse_modbus_string(decoder.decode_string(32))), - ('C_Model', parse_modbus_string(decoder.decode_string(32))), - ('C_Option', parse_modbus_string(decoder.decode_string(16))), - ('C_Version', parse_modbus_string(decoder.decode_string(16))), - ('C_SerialNumber', parse_modbus_string(decoder.decode_string(32))), - ('C_Device_address', decoder.decode_16bit_uint()), - ]) - + + self.decoded_common = OrderedDict( + [ + ( + "C_Manufacturer", + parse_modbus_string(decoder.decode_string(32)), + ), + ("C_Model", parse_modbus_string(decoder.decode_string(32))), + ("C_Option", parse_modbus_string(decoder.decode_string(16))), + ("C_Version", parse_modbus_string(decoder.decode_string(16))), + ( + "C_SerialNumber", + parse_modbus_string(decoder.decode_string(32)), + ), + ("C_Device_address", decoder.decode_16bit_uint()), + ] + ) + for name, value in iteritems(self.decoded_common): - _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {name} {hex(value) if isinstance(value, int) else value}") - - self.manufacturer = self.decoded_common['C_Manufacturer'] - self.model = self.decoded_common['C_Model'] - self.option = self.decoded_common['C_Option'] - self.fw_version = self.decoded_common['C_Version'] - self.serial = self.decoded_common['C_SerialNumber'] - self.device_address = self.decoded_common['C_Device_address'] + _LOGGER.debug( + f"Inverter {self.inverter_unit_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + + self.manufacturer = self.decoded_common["C_Manufacturer"] + self.model = self.decoded_common["C_Model"] + self.option = self.decoded_common["C_Option"] + self.fw_version = self.decoded_common["C_Version"] + self.serial = self.decoded_common["C_SerialNumber"] + self.device_address = self.decoded_common["C_Device_address"] self.name = f"{self.hub.hub_id.capitalize()} I{self.inverter_unit_id}" - + self._device_info = { "identifiers": {(DOMAIN, f"{self.model}_{self.serial}")}, "name": self.name, @@ -329,7 +388,7 @@ def init_device(self) -> None: "sw_version": self.fw_version, "hw_version": self.option, } - + def read_modbus_data(self) -> None: inverter_data = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=40069, count=2 @@ -337,97 +396,109 @@ def read_modbus_data(self) -> None: if inverter_data.isError(): _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {inverter_data}") raise ModbusReadError(inverter_data) - + decoder = BinaryPayloadDecoder.fromRegisters( inverter_data.registers, byteorder=Endian.Big ) - - decoded_ident = OrderedDict([ - ('C_SunSpec_DID', decoder.decode_16bit_uint()), - ('C_SunSpec_Length', decoder.decode_16bit_uint()), - ]) - + + decoded_ident = OrderedDict( + [ + ("C_SunSpec_DID", decoder.decode_16bit_uint()), + ("C_SunSpec_Length", decoder.decode_16bit_uint()), + ] + ) + for name, value in iteritems(decoded_ident): - _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + if ( - decoded_ident['C_SunSpec_DID'] == SUNSPEC_NOT_IMPL_UINT16 - or decoded_ident['C_SunSpec_DID'] not in [101,102,103] - or decoded_ident['C_SunSpec_Length'] != 50 + decoded_ident["C_SunSpec_DID"] == SUNSPEC_NOT_IMPL_UINT16 + or decoded_ident["C_SunSpec_DID"] not in [101, 102, 103] + or decoded_ident["C_SunSpec_Length"] != 50 ): raise DeviceInvalid(f"Inverter {self.inverter_unit_id} not usable.") - + inverter_data = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=40071, count=38 ) if inverter_data.isError(): _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {inverter_data}") raise ModbusReadError(inverter_data) - + decoder = BinaryPayloadDecoder.fromRegisters( inverter_data.registers, byteorder=Endian.Big ) - - self.decoded_model = OrderedDict([ - ('C_SunSpec_DID', decoded_ident['C_SunSpec_DID']), - ('AC_Current', decoder.decode_16bit_uint()), - ('AC_Current_A', decoder.decode_16bit_uint()), - ('AC_Current_B', decoder.decode_16bit_uint()), - ('AC_Current_C', decoder.decode_16bit_uint()), - ('AC_Current_SF', decoder.decode_16bit_int()), - ('AC_Voltage_AB', decoder.decode_16bit_uint()), - ('AC_Voltage_BC', decoder.decode_16bit_uint()), - ('AC_Voltage_CA', decoder.decode_16bit_uint()), - ('AC_Voltage_AN', decoder.decode_16bit_uint()), - ('AC_Voltage_BN', decoder.decode_16bit_uint()), - ('AC_Voltage_CN', decoder.decode_16bit_uint()), - ('AC_Voltage_SF', decoder.decode_16bit_int()), - ('AC_Power', decoder.decode_16bit_int()), - ('AC_Power_SF', decoder.decode_16bit_int()), - ('AC_Frequency', decoder.decode_16bit_uint()), - ('AC_Frequency_SF', decoder.decode_16bit_int()), - ('AC_VA', decoder.decode_16bit_int()), - ('AC_VA_SF', decoder.decode_16bit_int()), - ('AC_var', decoder.decode_16bit_int()), - ('AC_var_SF', decoder.decode_16bit_int()), - ('AC_PF', decoder.decode_16bit_int()), - ('AC_PF_SF', decoder.decode_16bit_int()), - ('AC_Energy_WH', decoder.decode_32bit_uint()), - ('AC_Energy_WH_SF', decoder.decode_16bit_uint()), - ('I_DC_Current', decoder.decode_16bit_uint()), - ('I_DC_Current_SF', decoder.decode_16bit_int()), - ('I_DC_Voltage', decoder.decode_16bit_uint()), - ('I_DC_Voltage_SF', decoder.decode_16bit_int()), - ('I_DC_Power', decoder.decode_16bit_int()), - ('I_DC_Power_SF', decoder.decode_16bit_int()), - ('I_Temp_Cab', decoder.decode_16bit_int()), - ('I_Temp_Sink', decoder.decode_16bit_int()), - ('I_Temp_Trns', decoder.decode_16bit_int()), - ('I_Temp_Other', decoder.decode_16bit_int()), - ('I_Temp_SF', decoder.decode_16bit_int()), - ('I_Status', decoder.decode_16bit_int()), - ('I_Status_Vendor', decoder.decode_16bit_int()), - ]) - + + self.decoded_model = OrderedDict( + [ + ("C_SunSpec_DID", decoded_ident["C_SunSpec_DID"]), + ("AC_Current", decoder.decode_16bit_uint()), + ("AC_Current_A", decoder.decode_16bit_uint()), + ("AC_Current_B", decoder.decode_16bit_uint()), + ("AC_Current_C", decoder.decode_16bit_uint()), + ("AC_Current_SF", decoder.decode_16bit_int()), + ("AC_Voltage_AB", decoder.decode_16bit_uint()), + ("AC_Voltage_BC", decoder.decode_16bit_uint()), + ("AC_Voltage_CA", decoder.decode_16bit_uint()), + ("AC_Voltage_AN", decoder.decode_16bit_uint()), + ("AC_Voltage_BN", decoder.decode_16bit_uint()), + ("AC_Voltage_CN", decoder.decode_16bit_uint()), + ("AC_Voltage_SF", decoder.decode_16bit_int()), + ("AC_Power", decoder.decode_16bit_int()), + ("AC_Power_SF", decoder.decode_16bit_int()), + ("AC_Frequency", decoder.decode_16bit_uint()), + ("AC_Frequency_SF", decoder.decode_16bit_int()), + ("AC_VA", decoder.decode_16bit_int()), + ("AC_VA_SF", decoder.decode_16bit_int()), + ("AC_var", decoder.decode_16bit_int()), + ("AC_var_SF", decoder.decode_16bit_int()), + ("AC_PF", decoder.decode_16bit_int()), + ("AC_PF_SF", decoder.decode_16bit_int()), + ("AC_Energy_WH", decoder.decode_32bit_uint()), + ("AC_Energy_WH_SF", decoder.decode_16bit_uint()), + ("I_DC_Current", decoder.decode_16bit_uint()), + ("I_DC_Current_SF", decoder.decode_16bit_int()), + ("I_DC_Voltage", decoder.decode_16bit_uint()), + ("I_DC_Voltage_SF", decoder.decode_16bit_int()), + ("I_DC_Power", decoder.decode_16bit_int()), + ("I_DC_Power_SF", decoder.decode_16bit_int()), + ("I_Temp_Cab", decoder.decode_16bit_int()), + ("I_Temp_Sink", decoder.decode_16bit_int()), + ("I_Temp_Trns", decoder.decode_16bit_int()), + ("I_Temp_Other", decoder.decode_16bit_int()), + ("I_Temp_SF", decoder.decode_16bit_int()), + ("I_Status", decoder.decode_16bit_int()), + ("I_Status_Vendor", decoder.decode_16bit_int()), + ] + ) + for name, value in iteritems(self.decoded_model): - _LOGGER.debug(f"Inverter {self.inverter_unit_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + @property def online(self) -> bool: """Device is online.""" return self.hub.online - + @property def device_info(self) -> Optional[Dict[str, Any]]: return self._device_info - + @property def single_device_entity(self) -> bool: return self.hub._single_device_entity class SolarEdgeMeter: - def __init__(self, device_id: int, meter_id: int, hub: SolarEdgeModbusMultiHub) -> None: + def __init__( + self, device_id: int, meter_id: int, hub: SolarEdgeModbusMultiHub + ) -> None: self.inverter_unit_id = device_id self.hub = hub self.decoded_common = [] @@ -435,7 +506,7 @@ def __init__(self, device_id: int, meter_id: int, hub: SolarEdgeModbusMultiHub) self.start_address = None self.meter_id = meter_id self.has_parent = True - + if self.meter_id == 1: self.start_address = 40000 + 121 elif self.meter_id == 2: @@ -444,63 +515,84 @@ def __init__(self, device_id: int, meter_id: int, hub: SolarEdgeModbusMultiHub) self.start_address = 40000 + 469 else: raise ValueError(f"Invalid meter_id {self.meter_id}") - + def init_device(self) -> None: meter_info = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=self.start_address, count=2 ) if meter_info.isError(): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {meter_info}") + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} ", + f"meter {self.meter_id}: {meter_info}", + ) raise ModbusReadError(meter_info) - + decoder = BinaryPayloadDecoder.fromRegisters( meter_info.registers, byteorder=Endian.Big ) - decoded_ident = OrderedDict([ - ('C_SunSpec_DID', decoder.decode_16bit_uint()), - ('C_SunSpec_Length', decoder.decode_16bit_uint()), - ]) - + decoded_ident = OrderedDict( + [ + ("C_SunSpec_DID", decoder.decode_16bit_uint()), + ("C_SunSpec_Length", decoder.decode_16bit_uint()), + ] + ) + for name, value in iteritems(decoded_ident): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} meter {self.meter_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + if ( - decoded_ident['C_SunSpec_DID'] == SUNSPEC_NOT_IMPL_UINT16 - or decoded_ident['C_SunSpec_DID'] != 0x0001 - or decoded_ident['C_SunSpec_Length'] != 65 + decoded_ident["C_SunSpec_DID"] == SUNSPEC_NOT_IMPL_UINT16 + or decoded_ident["C_SunSpec_DID"] != 0x0001 + or decoded_ident["C_SunSpec_Length"] != 65 ): raise DeviceInvalid("Meter {self.meter_id} not usable.") - + meter_info = self.hub.read_holding_registers( - unit=self.inverter_unit_id, address=self.start_address + 2, count=65 + unit=self.inverter_unit_id, + address=self.start_address + 2, + count=65, ) if meter_info.isError(): _LOGGER.debug(meter_info) raise ModbusReadError(meter_info) - + decoder = BinaryPayloadDecoder.fromRegisters( meter_info.registers, byteorder=Endian.Big ) - self.decoded_common = OrderedDict([ - ('C_Manufacturer', parse_modbus_string(decoder.decode_string(32))), - ('C_Model', parse_modbus_string(decoder.decode_string(32))), - ('C_Option', parse_modbus_string(decoder.decode_string(16))), - ('C_Version', parse_modbus_string(decoder.decode_string(16))), - ('C_SerialNumber', parse_modbus_string(decoder.decode_string(32))), - ('C_Device_address', decoder.decode_16bit_uint()), - ]) + self.decoded_common = OrderedDict( + [ + ( + "C_Manufacturer", + parse_modbus_string(decoder.decode_string(32)), + ), + ("C_Model", parse_modbus_string(decoder.decode_string(32))), + ("C_Option", parse_modbus_string(decoder.decode_string(16))), + ("C_Version", parse_modbus_string(decoder.decode_string(16))), + ( + "C_SerialNumber", + parse_modbus_string(decoder.decode_string(32)), + ), + ("C_Device_address", decoder.decode_16bit_uint()), + ] + ) for name, value in iteritems(self.decoded_common): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {name} {hex(value) if isinstance(value, int) else value}") - - self.manufacturer = self.decoded_common['C_Manufacturer'] - self.model = self.decoded_common['C_Model'] - self.option = self.decoded_common['C_Option'] - self.fw_version = self.decoded_common['C_Version'] - self.serial = self.decoded_common['C_SerialNumber'] - self.device_address = self.decoded_common['C_Device_address'] + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} meter {self.meter_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + + self.manufacturer = self.decoded_common["C_Manufacturer"] + self.model = self.decoded_common["C_Model"] + self.option = self.decoded_common["C_Option"] + self.fw_version = self.decoded_common["C_Version"] + self.serial = self.decoded_common["C_SerialNumber"] + self.device_address = self.decoded_common["C_Device_address"] self.name = f"{self.hub.hub_id.capitalize()} M{self.meter_id}" - + self._device_info = { "identifiers": {(DOMAIN, f"{self.model}_{self.serial}")}, "name": self.name, @@ -509,140 +601,161 @@ def init_device(self) -> None: "sw_version": self.fw_version, "hw_version": self.option, } - + def read_modbus_data(self) -> None: meter_data = self.hub.read_holding_registers( - unit=self.inverter_unit_id, address=self.start_address + 67, count=2 + unit=self.inverter_unit_id, + address=self.start_address + 67, + count=2, ) if meter_data.isError(): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {meter_data}") + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} ", + f"meter {self.meter_id}: {meter_data}", + ) raise ModbusReadError(f"Meter read error: {meter_data}") - + decoder = BinaryPayloadDecoder.fromRegisters( meter_data.registers, byteorder=Endian.Big ) - - decoded_ident = OrderedDict([ - ('C_SunSpec_DID', decoder.decode_16bit_uint()), - ('C_SunSpec_Length', decoder.decode_16bit_uint()), - ]) - + + decoded_ident = OrderedDict( + [ + ("C_SunSpec_DID", decoder.decode_16bit_uint()), + ("C_SunSpec_Length", decoder.decode_16bit_uint()), + ] + ) + for name, value in iteritems(decoded_ident): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} meter {self.meter_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + if ( - decoded_ident['C_SunSpec_DID'] == SUNSPEC_NOT_IMPL_UINT16 - or decoded_ident['C_SunSpec_DID'] not in [201,202,203,204] - or decoded_ident['C_SunSpec_Length'] != 105 + decoded_ident["C_SunSpec_DID"] == SUNSPEC_NOT_IMPL_UINT16 + or decoded_ident["C_SunSpec_DID"] not in [201, 202, 203, 204] + or decoded_ident["C_SunSpec_Length"] != 105 ): - raise DeviceInvalid(f"Meter on inverter {self.inverter_unit_id} not usable.") - + raise DeviceInvalid( + f"Meter on inverter {self.inverter_unit_id} not usable." + ) + meter_data = self.hub.read_holding_registers( - unit=self.inverter_unit_id, address=self.start_address + 69, count=105 + unit=self.inverter_unit_id, + address=self.start_address + 69, + count=105, ) if meter_data.isError(): _LOGGER.error(f"Meter read error: {meter_data}") raise ModbusReadError(f"Meter read error: {meter_data}") - + decoder = BinaryPayloadDecoder.fromRegisters( meter_data.registers, byteorder=Endian.Big ) - - self.decoded_model = OrderedDict([ - ('C_SunSpec_DID', decoded_ident['C_SunSpec_DID']), - ('AC_Current', decoder.decode_16bit_int()), - ('AC_Current_A', decoder.decode_16bit_int()), - ('AC_Current_B', decoder.decode_16bit_int()), - ('AC_Current_C', decoder.decode_16bit_int()), - ('AC_Current_SF', decoder.decode_16bit_int()), - ('AC_Voltage_LN', decoder.decode_16bit_int()), - ('AC_Voltage_AN', decoder.decode_16bit_int()), - ('AC_Voltage_BN', decoder.decode_16bit_int()), - ('AC_Voltage_CN', decoder.decode_16bit_int()), - ('AC_Voltage_LL', decoder.decode_16bit_int()), - ('AC_Voltage_AB', decoder.decode_16bit_int()), - ('AC_Voltage_BC', decoder.decode_16bit_int()), - ('AC_Voltage_CA', decoder.decode_16bit_int()), - ('AC_Voltage_SF', decoder.decode_16bit_int()), - ('AC_Frequency', decoder.decode_16bit_int()), - ('AC_Frequency_SF', decoder.decode_16bit_int()), - ('AC_Power', decoder.decode_16bit_int()), - ('AC_Power_A', decoder.decode_16bit_int()), - ('AC_Power_B', decoder.decode_16bit_int()), - ('AC_Power_C', decoder.decode_16bit_int()), - ('AC_Power_SF', decoder.decode_16bit_int()), - ('AC_VA', decoder.decode_16bit_int()), - ('AC_VA_A', decoder.decode_16bit_int()), - ('AC_VA_B', decoder.decode_16bit_int()), - ('AC_VA_C', decoder.decode_16bit_int()), - ('AC_VA_SF', decoder.decode_16bit_int()), - ('AC_var', decoder.decode_16bit_int()), - ('AC_var_A', decoder.decode_16bit_int()), - ('AC_var_B', decoder.decode_16bit_int()), - ('AC_var_C', decoder.decode_16bit_int()), - ('AC_var_SF', decoder.decode_16bit_int()), - ('AC_PF', decoder.decode_16bit_int()), - ('AC_PF_A', decoder.decode_16bit_int()), - ('AC_PF_B', decoder.decode_16bit_int()), - ('AC_PF_C', decoder.decode_16bit_int()), - ('AC_PF_SF', decoder.decode_16bit_int()), - ('AC_Energy_WH_Exported', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Exported_A', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Exported_B', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Exported_C', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Imported', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Imported_A', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Imported_B', decoder.decode_32bit_uint()), - ('AC_Energy_WH_Imported_C', decoder.decode_32bit_uint()), - ('AC_Energy_WH_SF', decoder.decode_16bit_int()), - ('M_VAh_Exported', decoder.decode_32bit_uint()), - ('M_VAh_Exported_A', decoder.decode_32bit_uint()), - ('M_VAh_Exported_B', decoder.decode_32bit_uint()), - ('M_VAh_Exported_C', decoder.decode_32bit_uint()), - ('M_VAh_Imported', decoder.decode_32bit_uint()), - ('M_VAh_Imported_A', decoder.decode_32bit_uint()), - ('M_VAh_Imported_B', decoder.decode_32bit_uint()), - ('M_VAh_Imported_C', decoder.decode_32bit_uint()), - ('M_VAh_SF', decoder.decode_16bit_int()), - ('M_varh_Import_Q1', decoder.decode_32bit_uint()), - ('M_varh_Import_Q1_A', decoder.decode_32bit_uint()), - ('M_varh_Import_Q1_B', decoder.decode_32bit_uint()), - ('M_varh_Import_Q1_C', decoder.decode_32bit_uint()), - ('M_varh_Import_Q2', decoder.decode_32bit_uint()), - ('M_varh_Import_Q2_A', decoder.decode_32bit_uint()), - ('M_varh_Import_Q2_B', decoder.decode_32bit_uint()), - ('M_varh_Import_Q2_C', decoder.decode_32bit_uint()), - ('M_varh_Export_Q3', decoder.decode_32bit_uint()), - ('M_varh_Export_Q3_A', decoder.decode_32bit_uint()), - ('M_varh_Export_Q3_B', decoder.decode_32bit_uint()), - ('M_varh_Export_Q3_C', decoder.decode_32bit_uint()), - ('M_varh_Export_Q4', decoder.decode_32bit_uint()), - ('M_varh_Export_Q4_A', decoder.decode_32bit_uint()), - ('M_varh_Export_Q4_B', decoder.decode_32bit_uint()), - ('M_varh_Export_Q4_C', decoder.decode_32bit_uint()), - ('M_varh_SF', decoder.decode_16bit_int()), - ('M_Events', decoder.decode_32bit_uint()), - ]) - + + self.decoded_model = OrderedDict( + [ + ("C_SunSpec_DID", decoded_ident["C_SunSpec_DID"]), + ("AC_Current", decoder.decode_16bit_int()), + ("AC_Current_A", decoder.decode_16bit_int()), + ("AC_Current_B", decoder.decode_16bit_int()), + ("AC_Current_C", decoder.decode_16bit_int()), + ("AC_Current_SF", decoder.decode_16bit_int()), + ("AC_Voltage_LN", decoder.decode_16bit_int()), + ("AC_Voltage_AN", decoder.decode_16bit_int()), + ("AC_Voltage_BN", decoder.decode_16bit_int()), + ("AC_Voltage_CN", decoder.decode_16bit_int()), + ("AC_Voltage_LL", decoder.decode_16bit_int()), + ("AC_Voltage_AB", decoder.decode_16bit_int()), + ("AC_Voltage_BC", decoder.decode_16bit_int()), + ("AC_Voltage_CA", decoder.decode_16bit_int()), + ("AC_Voltage_SF", decoder.decode_16bit_int()), + ("AC_Frequency", decoder.decode_16bit_int()), + ("AC_Frequency_SF", decoder.decode_16bit_int()), + ("AC_Power", decoder.decode_16bit_int()), + ("AC_Power_A", decoder.decode_16bit_int()), + ("AC_Power_B", decoder.decode_16bit_int()), + ("AC_Power_C", decoder.decode_16bit_int()), + ("AC_Power_SF", decoder.decode_16bit_int()), + ("AC_VA", decoder.decode_16bit_int()), + ("AC_VA_A", decoder.decode_16bit_int()), + ("AC_VA_B", decoder.decode_16bit_int()), + ("AC_VA_C", decoder.decode_16bit_int()), + ("AC_VA_SF", decoder.decode_16bit_int()), + ("AC_var", decoder.decode_16bit_int()), + ("AC_var_A", decoder.decode_16bit_int()), + ("AC_var_B", decoder.decode_16bit_int()), + ("AC_var_C", decoder.decode_16bit_int()), + ("AC_var_SF", decoder.decode_16bit_int()), + ("AC_PF", decoder.decode_16bit_int()), + ("AC_PF_A", decoder.decode_16bit_int()), + ("AC_PF_B", decoder.decode_16bit_int()), + ("AC_PF_C", decoder.decode_16bit_int()), + ("AC_PF_SF", decoder.decode_16bit_int()), + ("AC_Energy_WH_Exported", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Exported_A", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Exported_B", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Exported_C", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Imported", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Imported_A", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Imported_B", decoder.decode_32bit_uint()), + ("AC_Energy_WH_Imported_C", decoder.decode_32bit_uint()), + ("AC_Energy_WH_SF", decoder.decode_16bit_int()), + ("M_VAh_Exported", decoder.decode_32bit_uint()), + ("M_VAh_Exported_A", decoder.decode_32bit_uint()), + ("M_VAh_Exported_B", decoder.decode_32bit_uint()), + ("M_VAh_Exported_C", decoder.decode_32bit_uint()), + ("M_VAh_Imported", decoder.decode_32bit_uint()), + ("M_VAh_Imported_A", decoder.decode_32bit_uint()), + ("M_VAh_Imported_B", decoder.decode_32bit_uint()), + ("M_VAh_Imported_C", decoder.decode_32bit_uint()), + ("M_VAh_SF", decoder.decode_16bit_int()), + ("M_varh_Import_Q1", decoder.decode_32bit_uint()), + ("M_varh_Import_Q1_A", decoder.decode_32bit_uint()), + ("M_varh_Import_Q1_B", decoder.decode_32bit_uint()), + ("M_varh_Import_Q1_C", decoder.decode_32bit_uint()), + ("M_varh_Import_Q2", decoder.decode_32bit_uint()), + ("M_varh_Import_Q2_A", decoder.decode_32bit_uint()), + ("M_varh_Import_Q2_B", decoder.decode_32bit_uint()), + ("M_varh_Import_Q2_C", decoder.decode_32bit_uint()), + ("M_varh_Export_Q3", decoder.decode_32bit_uint()), + ("M_varh_Export_Q3_A", decoder.decode_32bit_uint()), + ("M_varh_Export_Q3_B", decoder.decode_32bit_uint()), + ("M_varh_Export_Q3_C", decoder.decode_32bit_uint()), + ("M_varh_Export_Q4", decoder.decode_32bit_uint()), + ("M_varh_Export_Q4_A", decoder.decode_32bit_uint()), + ("M_varh_Export_Q4_B", decoder.decode_32bit_uint()), + ("M_varh_Export_Q4_C", decoder.decode_32bit_uint()), + ("M_varh_SF", decoder.decode_16bit_int()), + ("M_Events", decoder.decode_32bit_uint()), + ] + ) + for name, value in iteritems(self.decoded_model): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} meter {self.meter_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} meter {self.meter_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + @property def online(self) -> bool: """Device is online.""" return self.hub.online - + @property def device_info(self) -> Optional[Dict[str, Any]]: return self._device_info - + @property def single_device_entity(self) -> bool: return self.hub._single_device_entity -class SolarEdgeBattery: - def __init__(self, device_id: int, battery_id: int, hub: SolarEdgeModbusMultiHub) -> None: +class SolarEdgeBattery: + def __init__( + self, device_id: int, battery_id: int, hub: SolarEdgeModbusMultiHub + ) -> None: self.inverter_unit_id = device_id self.hub = hub self.decoded_common = [] @@ -650,63 +763,85 @@ def __init__(self, device_id: int, battery_id: int, hub: SolarEdgeModbusMultiHub self.start_address = None self.battery_id = battery_id self.has_parent = True - + if self.battery_id == 1: self.start_address = 57600 elif self.battery_id == 2: self.start_address = 57856 else: raise ValueError("Invalid battery_id {self.battery_id}") - + def init_device(self) -> None: battery_info = self.hub.read_holding_registers( unit=self.inverter_unit_id, address=self.start_address, count=76 ) if battery_info.isError(): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} battery {self.battery_id}: {battery_info}") + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} ", + f"battery {self.battery_id}: {battery_info}", + ) raise ModbusReadError(battery_info) - + decoder = BinaryPayloadDecoder.fromRegisters( - battery_info.registers, byteorder=Endian.Big, wordorder=Endian.Little + battery_info.registers, + byteorder=Endian.Big, + wordorder=Endian.Little, + ) + self.decoded_common = OrderedDict( + [ + ( + "B_Manufacturer", + parse_modbus_string(decoder.decode_string(32)), + ), + ("B_Model", parse_modbus_string(decoder.decode_string(32))), + ("B_Version", parse_modbus_string(decoder.decode_string(32))), + ( + "B_SerialNumber", + parse_modbus_string(decoder.decode_string(32)), + ), + ("B_Device_Address", decoder.decode_16bit_uint()), + ("Reserved", decoder.decode_16bit_uint()), + ("B_RatedEnergy", decoder.decode_32bit_float()), + ("B_MaxChargePower", decoder.decode_32bit_float()), + ("B_MaxDischargePower", decoder.decode_32bit_float()), + ("B_MaxChargePeakPower", decoder.decode_32bit_float()), + ("B_MaxDischargePeakPower", decoder.decode_32bit_float()), + ] ) - self.decoded_common = OrderedDict([ - ('B_Manufacturer', parse_modbus_string(decoder.decode_string(32))), - ('B_Model', parse_modbus_string(decoder.decode_string(32))), - ('B_Version', parse_modbus_string(decoder.decode_string(32))), - ('B_SerialNumber', parse_modbus_string(decoder.decode_string(32))), - ('B_Device_Address', decoder.decode_16bit_uint()), - ('Reserved', decoder.decode_16bit_uint()), - ('B_RatedEnergy', decoder.decode_32bit_float()), - ('B_MaxChargePower', decoder.decode_32bit_float()), - ('B_MaxDischargePower', decoder.decode_32bit_float()), - ('B_MaxChargePeakPower', decoder.decode_32bit_float()), - ('B_MaxDischargePeakPower', decoder.decode_32bit_float()), - ]) - + for name, value in iteritems(self.decoded_common): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} battery {self.battery_id}: {name} {hex(value) if isinstance(value, int) else value}") - - self.decoded_common['B_Manufacturer'] = self.decoded_common['B_Manufacturer'].removesuffix(self.decoded_common['B_SerialNumber']) - self.decoded_common['B_Model'] = self.decoded_common['B_Model'].removesuffix(self.decoded_common['B_SerialNumber']) - - ascii_ctrl_chars = dict.fromkeys(range(32)) - self.decoded_common['B_Manufacturer'] = self.decoded_common['B_Manufacturer'].translate(ascii_ctrl_chars) - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} batt {self.battery_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + + self.decoded_common["B_Manufacturer"] = self.decoded_common[ + "B_Manufacturer" + ].removesuffix(self.decoded_common["B_SerialNumber"]) + self.decoded_common["B_Model"] = self.decoded_common["B_Model"].removesuffix( + self.decoded_common["B_SerialNumber"] + ) + + ascii_ctrl_chars = dict.fromkeys(range(32)) + self.decoded_common["B_Manufacturer"] = self.decoded_common[ + "B_Manufacturer" + ].translate(ascii_ctrl_chars) + if ( - len(self.decoded_common['B_Manufacturer']) == 0 - or len(self.decoded_common['B_Model']) == 0 - or len(self.decoded_common['B_SerialNumber']) == 0 + len(self.decoded_common["B_Manufacturer"]) == 0 + or len(self.decoded_common["B_Model"]) == 0 + or len(self.decoded_common["B_SerialNumber"]) == 0 ): raise DeviceInvalid("Battery {self.battery_id} not usable.") - - self.manufacturer = self.decoded_common['B_Manufacturer'] - self.model = self.decoded_common['B_Model'] - self.option = '' - self.fw_version = self.decoded_common['B_Version'] - self.serial = self.decoded_common['B_SerialNumber'] - self.device_address = self.decoded_common['B_Device_Address'] + + self.manufacturer = self.decoded_common["B_Manufacturer"] + self.model = self.decoded_common["B_Model"] + self.option = "" + self.fw_version = self.decoded_common["B_Version"] + self.serial = self.decoded_common["B_SerialNumber"] + self.device_address = self.decoded_common["B_Device_Address"] self.name = f"{self.hub.hub_id.capitalize()} B{self.battery_id}" - + self._device_info = { "identifiers": {(DOMAIN, f"{self.model}_{self.serial}")}, "name": self.name, @@ -714,63 +849,72 @@ def init_device(self) -> None: "model": self.model, "sw_version": self.fw_version, } - + def read_modbus_data(self) -> None: battery_data = self.hub.read_holding_registers( - unit=self.inverter_unit_id, address=self.start_address + 108, count=46 + unit=self.inverter_unit_id, + address=self.start_address + 108, + count=46, ) if battery_data.isError(): _LOGGER.error(f"Battery read error: {battery_data}") raise ModbusReadError(f"Battery read error: {battery_data}") - + decoder = BinaryPayloadDecoder.fromRegisters( - battery_data.registers, byteorder=Endian.Big, wordorder=Endian.Little + battery_data.registers, + byteorder=Endian.Big, + wordorder=Endian.Little, + ) + + self.decoded_model = OrderedDict( + [ + ("B_Temp_Average", decoder.decode_32bit_float()), + ("B_Temp_Max", decoder.decode_32bit_float()), + ("B_DC_Voltage", decoder.decode_32bit_float()), + ("B_DC_Current", decoder.decode_32bit_float()), + ("B_DC_Power", decoder.decode_32bit_float()), + ("B_Export_Energy_WH", decoder.decode_64bit_uint()), + ("B_Import_Energy_WH", decoder.decode_64bit_uint()), + ("B_Energy_Max", decoder.decode_32bit_float()), + ("B_Energy_Available", decoder.decode_32bit_float()), + ("B_SOH", decoder.decode_32bit_float()), + ("B_SOE", decoder.decode_32bit_float()), + ("B_Status", decoder.decode_32bit_uint()), + ("B_Status_Vendor", decoder.decode_32bit_uint()), + ("B_Event_Log1", decoder.decode_16bit_uint()), + ("B_Event_Log2", decoder.decode_16bit_uint()), + ("B_Event_Log3", decoder.decode_16bit_uint()), + ("B_Event_Log4", decoder.decode_16bit_uint()), + ("B_Event_Log5", decoder.decode_16bit_uint()), + ("B_Event_Log6", decoder.decode_16bit_uint()), + ("B_Event_Log7", decoder.decode_16bit_uint()), + ("B_Event_Log8", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor1", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor2", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor3", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor4", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor5", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor6", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor7", decoder.decode_16bit_uint()), + ("B_Event_Log_Vendor8", decoder.decode_16bit_uint()), + ] ) - - self.decoded_model = OrderedDict([ - ('B_Temp_Average', decoder.decode_32bit_float()), - ('B_Temp_Max', decoder.decode_32bit_float()), - ('B_DC_Voltage', decoder.decode_32bit_float()), - ('B_DC_Current', decoder.decode_32bit_float()), - ('B_DC_Power', decoder.decode_32bit_float()), - ('B_Export_Energy_WH', decoder.decode_64bit_uint()), - ('B_Import_Energy_WH', decoder.decode_64bit_uint()), - ('B_Energy_Max', decoder.decode_32bit_float()), - ('B_Energy_Available', decoder.decode_32bit_float()), - ('B_SOH', decoder.decode_32bit_float()), - ('B_SOE', decoder.decode_32bit_float()), - ('B_Status', decoder.decode_32bit_uint()), - ('B_Status_Vendor', decoder.decode_32bit_uint()), - ('B_Event_Log1', decoder.decode_16bit_uint()), - ('B_Event_Log2', decoder.decode_16bit_uint()), - ('B_Event_Log3', decoder.decode_16bit_uint()), - ('B_Event_Log4', decoder.decode_16bit_uint()), - ('B_Event_Log5', decoder.decode_16bit_uint()), - ('B_Event_Log6', decoder.decode_16bit_uint()), - ('B_Event_Log7', decoder.decode_16bit_uint()), - ('B_Event_Log8', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor1', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor2', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor3', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor4', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor5', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor6', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor7', decoder.decode_16bit_uint()), - ('B_Event_Log_Vendor8', decoder.decode_16bit_uint()), - ]) - + for name, value in iteritems(self.decoded_model): - _LOGGER.debug(f"Inverter {self.inverter_unit_id} battery {self.battery_id}: {name} {hex(value) if isinstance(value, int) else value}") - + _LOGGER.debug( + f"Inverter {self.inverter_unit_id} batt {self.battery_id}: ", + f"{name} {hex(value) if isinstance(value, int) else value}", + ) + @property def online(self) -> bool: """Device is online.""" return self.hub.online - + @property def device_info(self) -> Optional[Dict[str, Any]]: - return self._device_info - + return self._device_info + @property def single_device_entity(self) -> bool: return self.hub._single_device_entity diff --git a/custom_components/solaredge_modbus_multi/sensor.py b/custom_components/solaredge_modbus_multi/sensor.py index 42e06d93..72883c77 100644 --- a/custom_components/solaredge_modbus_multi/sensor.py +++ b/custom_components/solaredge_modbus_multi/sensor.py @@ -1,28 +1,46 @@ import logging import re -from datetime import datetime -from typing import Any, Dict, Optional -from homeassistant.components.sensor import (SensorDeviceClass, SensorEntity, - SensorStateClass) +from homeassistant.components.sensor import ( + SensorDeviceClass, + SensorEntity, + SensorStateClass, +) from homeassistant.config_entries import ConfigEntry -from homeassistant.const import (CONF_NAME, ELECTRIC_CURRENT_AMPERE, - ELECTRIC_POTENTIAL_VOLT, - ENERGY_KILO_WATT_HOUR, FREQUENCY_HERTZ, - PERCENTAGE, POWER_KILO_WATT, - POWER_VOLT_AMPERE, POWER_VOLT_AMPERE_REACTIVE, - POWER_WATT, TEMP_CELSIUS) +from homeassistant.const import ( + ELECTRIC_CURRENT_AMPERE, + ELECTRIC_POTENTIAL_VOLT, + ENERGY_KILO_WATT_HOUR, + FREQUENCY_HERTZ, + PERCENTAGE, + POWER_VOLT_AMPERE, + POWER_VOLT_AMPERE_REACTIVE, + POWER_WATT, + TEMP_CELSIUS, +) from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import CoordinatorEntity -from .const import (BATTERY_STATUS, DEVICE_STATUS, DEVICE_STATUS_DESC, DOMAIN, - ENERGY_VOLT_AMPERE_HOUR, ENERGY_VOLT_AMPERE_REACTIVE_HOUR, - METER_EVENTS, SUNSPEC_ACCUM_LIMIT, SUNSPEC_DID, - SUNSPEC_NOT_ACCUM_ACC32, SUNSPEC_NOT_IMPL_FLOAT32, - SUNSPEC_NOT_IMPL_INT16, SUNSPEC_NOT_IMPL_UINT16, - SUNSPEC_NOT_IMPL_UINT32, SUNSPEC_SF_RANGE, VENDOR_STATUS) +from .const import ( + BATTERY_STATUS, + DEVICE_STATUS, + DEVICE_STATUS_DESC, + DOMAIN, + ENERGY_VOLT_AMPERE_HOUR, + ENERGY_VOLT_AMPERE_REACTIVE_HOUR, + METER_EVENTS, + SUNSPEC_ACCUM_LIMIT, + SUNSPEC_DID, + SUNSPEC_NOT_ACCUM_ACC32, + SUNSPEC_NOT_IMPL_FLOAT32, + SUNSPEC_NOT_IMPL_INT16, + SUNSPEC_NOT_IMPL_UINT16, + SUNSPEC_NOT_IMPL_UINT32, + SUNSPEC_SF_RANGE, + VENDOR_STATUS, +) from .helpers import scale_factor, update_accum, watts_to_kilowatts _LOGGER = logging.getLogger(__name__) @@ -33,12 +51,12 @@ async def async_setup_entry( config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: - - hub = hass.data[DOMAIN][config_entry.entry_id]['hub'] - coordinator = hass.data[DOMAIN][config_entry.entry_id]['coordinator'] - + + hub = hass.data[DOMAIN][config_entry.entry_id]["hub"] + coordinator = hass.data[DOMAIN][config_entry.entry_id]["coordinator"] + entities = [] - + for inverter in hub.inverters: if inverter.single_device_entity: entities.append(SolarEdgeDevice(inverter, config_entry, coordinator)) @@ -50,18 +68,18 @@ async def async_setup_entry( entities.append(DeviceAddress(inverter, config_entry, coordinator)) entities.append(SunspecDID(inverter, config_entry, coordinator)) entities.append(Version(inverter, config_entry, coordinator)) - entities.append(Status(inverter,config_entry, coordinator)) + entities.append(Status(inverter, config_entry, coordinator)) entities.append(StatusVendor(inverter, config_entry, coordinator)) entities.append(ACCurrentSensor(inverter, config_entry, coordinator)) - entities.append(ACCurrentSensor(inverter, config_entry, coordinator, 'A')) - entities.append(ACCurrentSensor(inverter, config_entry, coordinator, 'B')) - entities.append(ACCurrentSensor(inverter, config_entry, coordinator, 'C')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'AB')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'BC')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'CA')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'AN')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'BN')) - entities.append(VoltageSensor(inverter, config_entry, coordinator, 'CN')) + entities.append(ACCurrentSensor(inverter, config_entry, coordinator, "A")) + entities.append(ACCurrentSensor(inverter, config_entry, coordinator, "B")) + entities.append(ACCurrentSensor(inverter, config_entry, coordinator, "C")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "AB")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "BC")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "CA")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "AN")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "BN")) + entities.append(VoltageSensor(inverter, config_entry, coordinator, "CN")) entities.append(ACPower(inverter, config_entry, coordinator)) entities.append(ACFrequency(inverter, config_entry, coordinator)) entities.append(ACVoltAmp(inverter, config_entry, coordinator)) @@ -72,7 +90,7 @@ async def async_setup_entry( entities.append(DCVoltage(inverter, config_entry, coordinator)) entities.append(DCPower(inverter, config_entry, coordinator)) entities.append(HeatSinkTemperature(inverter, config_entry, coordinator)) - + for meter in hub.meters: if meter.single_device_entity: entities.append(SolarEdgeDevice(meter, config_entry, coordinator)) @@ -88,67 +106,67 @@ async def async_setup_entry( entities.append(Version(meter, config_entry, coordinator)) entities.append(MeterEvents(meter, config_entry, coordinator)) entities.append(ACCurrentSensor(meter, config_entry, coordinator)) - entities.append(ACCurrentSensor(meter, config_entry, coordinator, 'A')) - entities.append(ACCurrentSensor(meter, config_entry, coordinator, 'B')) - entities.append(ACCurrentSensor(meter, config_entry, coordinator, 'C')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'LN')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'AN')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'BN')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'CN')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'LL')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'AB')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'BC')) - entities.append(VoltageSensor(meter, config_entry, coordinator, 'CA')) + entities.append(ACCurrentSensor(meter, config_entry, coordinator, "A")) + entities.append(ACCurrentSensor(meter, config_entry, coordinator, "B")) + entities.append(ACCurrentSensor(meter, config_entry, coordinator, "C")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "LN")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "AN")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "BN")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "CN")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "LL")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "AB")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "BC")) + entities.append(VoltageSensor(meter, config_entry, coordinator, "CA")) entities.append(ACFrequency(meter, config_entry, coordinator)) entities.append(ACPower(meter, config_entry, coordinator)) - entities.append(ACPower(meter, config_entry, coordinator, 'A')) - entities.append(ACPower(meter, config_entry, coordinator, 'B')) - entities.append(ACPower(meter, config_entry, coordinator, 'C')) + entities.append(ACPower(meter, config_entry, coordinator, "A")) + entities.append(ACPower(meter, config_entry, coordinator, "B")) + entities.append(ACPower(meter, config_entry, coordinator, "C")) entities.append(ACVoltAmp(meter, config_entry, coordinator)) - entities.append(ACVoltAmp(meter, config_entry, coordinator, 'A')) - entities.append(ACVoltAmp(meter, config_entry, coordinator, 'B')) - entities.append(ACVoltAmp(meter, config_entry, coordinator, 'C')) + entities.append(ACVoltAmp(meter, config_entry, coordinator, "A")) + entities.append(ACVoltAmp(meter, config_entry, coordinator, "B")) + entities.append(ACVoltAmp(meter, config_entry, coordinator, "C")) entities.append(ACVoltAmpReactive(meter, config_entry, coordinator)) - entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, 'A')) - entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, 'B')) - entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, 'C')) + entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, "A")) + entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, "B")) + entities.append(ACVoltAmpReactive(meter, config_entry, coordinator, "C")) entities.append(ACPowerFactor(meter, config_entry, coordinator)) - entities.append(ACPowerFactor(meter, config_entry, coordinator, 'A')) - entities.append(ACPowerFactor(meter, config_entry, coordinator, 'B')) - entities.append(ACPowerFactor(meter, config_entry, coordinator, 'C')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Exported')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Exported_A')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Exported_B')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Exported_C')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Imported')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Imported_A')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Imported_B')) - entities.append(ACEnergy(meter, config_entry, coordinator, 'Imported_C')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Exported')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Exported_A')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Exported_B')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Exported_C')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Imported')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Imported_A')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Imported_B')) - entities.append(MeterVAhIE(meter, config_entry, coordinator, 'Imported_C')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q1')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q1_A')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q1_B')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q1_C')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q2')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q2_A')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q2_B')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Import_Q2_C')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q3')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q3_A')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q3_B')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q3_C')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q4')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q4_A')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q4_B')) - entities.append(MetervarhIE(meter, config_entry, coordinator, 'Export_Q4_C')) - + entities.append(ACPowerFactor(meter, config_entry, coordinator, "A")) + entities.append(ACPowerFactor(meter, config_entry, coordinator, "B")) + entities.append(ACPowerFactor(meter, config_entry, coordinator, "C")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Exported")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Exported_A")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Exported_B")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Exported_C")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Imported")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Imported_A")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Imported_B")) + entities.append(ACEnergy(meter, config_entry, coordinator, "Imported_C")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Exported")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Exported_A")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Exported_B")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Exported_C")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Imported")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Imported_A")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Imported_B")) + entities.append(MeterVAhIE(meter, config_entry, coordinator, "Imported_C")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q1")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q1_A")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q1_B")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q1_C")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q2")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q2_A")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q2_B")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Import_Q2_C")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q3")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q3_A")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q3_B")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q3_C")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q4")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q4_A")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q4_B")) + entities.append(MetervarhIE(meter, config_entry, coordinator, "Export_Q4_C")) + for battery in hub.batteries: if battery.single_device_entity: entities.append(SolarEdgeDevice(battery, config_entry, coordinator)) @@ -165,14 +183,20 @@ async def async_setup_entry( entities.append(SolarEdgeBatteryVoltage(battery, config_entry, coordinator)) entities.append(SolarEdgeBatteryCurrent(battery, config_entry, coordinator)) entities.append(SolarEdgeBatteryPower(battery, config_entry, coordinator)) - entities.append(SolarEdgeBatteryEnergyExport(battery, config_entry, coordinator)) - entities.append(SolarEdgeBatteryEnergyImport(battery, config_entry, coordinator)) + entities.append( + SolarEdgeBatteryEnergyExport(battery, config_entry, coordinator) + ) + entities.append( + SolarEdgeBatteryEnergyImport(battery, config_entry, coordinator) + ) entities.append(SolarEdgeBatteryMaxEnergy(battery, config_entry, coordinator)) - entities.append(SolarEdgeBatteryAvailableEnergy(battery, config_entry, coordinator)) + entities.append( + SolarEdgeBatteryAvailableEnergy(battery, config_entry, coordinator) + ) entities.append(SolarEdgeBatterySOH(battery, config_entry, coordinator)) entities.append(SolarEdgeBatterySOE(battery, config_entry, coordinator)) entities.append(SolarEdgeBatteryStatus(battery, config_entry, coordinator)) - + if entities: async_add_entities(entities) @@ -186,168 +210,181 @@ def __init__(self, platform, config_entry, coordinator): """Initialize the sensor.""" self._platform = platform self._config_entry = config_entry - + @property def device_info(self): return self._platform.device_info - + @property def config_entry_id(self): return self._config_entry.entry_id - + @property def config_entry_name(self): - return self._config_entry.data['name'] - + return self._config_entry.data["name"] + @property def available(self) -> bool: return self._platform.online - + @callback def _handle_coordinator_update(self) -> None: self.async_write_ha_state() + class SolarEdgeDevice(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_device" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Device" - + @property def native_value(self): return self._platform.model - + @property def extra_state_attributes(self): attrs = {} - + try: - attrs["batt_charge_peak"] = self._platform.decoded_common["B_MaxChargePeakPower"] - attrs["batt_discharge_peak"] = self._platform.decoded_common["B_MaxDischargePeakPower"] - attrs["batt_max_charge"] = self._platform.decoded_common["B_MaxChargePower"] - attrs["batt_max_discharge"] = self._platform.decoded_common["B_MaxDischargePower"] - attrs["batt_rated_energy"] = self._platform.decoded_common["B_RatedEnergy"] - + attrs["batt_charge_peak"] = self._platform.decoded_common[ + "B_MaxChargePeakPower" + ] + attrs["batt_discharge_peak"] = self._platform.decoded_common[ + "B_MaxDischargePeakPower" + ] + attrs["batt_max_charge"] = self._platform.decoded_common["B_MaxChargePower"] + attrs["batt_max_discharge"] = self._platform.decoded_common[ + "B_MaxDischargePower" + ] + attrs["batt_rated_energy"] = self._platform.decoded_common["B_RatedEnergy"] + except KeyError: pass - - attrs["device_id"] = self._platform.device_address + + attrs["device_id"] = self._platform.device_address attrs["manufacturer"] = self._platform.manufacturer attrs["model"] = self._platform.model - + if len(self._platform.option) > 0: attrs["option"] = self._platform.option - + if self._platform.has_parent: attrs["parent_device_id"] = self._platform.inverter_unit_id - + attrs["serial_number"] = self._platform.serial - + try: - if self._platform.decoded_model['C_SunSpec_DID'] in SUNSPEC_DID: - attrs["sunspec_device"] = SUNSPEC_DID[self._platform.decoded_model['C_SunSpec_DID']] - + if self._platform.decoded_model["C_SunSpec_DID"] in SUNSPEC_DID: + attrs["sunspec_device"] = SUNSPEC_DID[ + self._platform.decoded_model["C_SunSpec_DID"] + ] + else: attrs["sunspec_device"] = "unknown" - + except KeyError: attrs["sunspec_device"] = None - + try: - attrs["sunspec_did"] = self._platform.decoded_model['C_SunSpec_DID'] + attrs["sunspec_did"] = self._platform.decoded_model["C_SunSpec_DID"] except KeyError: attrs["sunspec_did"] = None - + return attrs + class SerialNumber(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_serial_number" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Serial Number" - + @property def native_value(self): return self._platform.serial + class Manufacturer(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_manufacturer" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Manufacturer" - + @property def native_value(self): return self._platform.manufacturer + class Model(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_model" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Model" - + @property def native_value(self): return self._platform.model + class Option(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_option" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Option" - + @property def entity_registry_enabled_default(self) -> bool: if len(self._platform.option) == 0: return False else: return True - + @property def native_value(self): if len(self._platform.option) > 0: @@ -355,507 +392,608 @@ def native_value(self): else: return None + class Version(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_version" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Version" - + @property def native_value(self): return self._platform.fw_version + class DeviceAddress(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_device_id" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Device ID" - + @property def native_value(self): return self._platform.device_address + class DeviceAddressParent(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_parent_device_id" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Parent Device ID" - + @property def native_value(self): return self._platform.inverter_unit_id + class SunspecDID(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_sunspec_device_id" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} Sunspec Device ID" - + @property def native_value(self): try: - if (self._platform.decoded_model['C_SunSpec_DID'] == SUNSPEC_NOT_IMPL_UINT16): + if self._platform.decoded_model["C_SunSpec_DID"] == SUNSPEC_NOT_IMPL_UINT16: return None - + else: - return self._platform.decoded_model['C_SunSpec_DID'] - + return self._platform.decoded_model["C_SunSpec_DID"] + except TypeError: return None - + @property def extra_state_attributes(self): try: - if self._platform.decoded_model['C_SunSpec_DID'] in SUNSPEC_DID: - return {"description": SUNSPEC_DID[self._platform.decoded_model['C_SunSpec_DID']]} - + if self._platform.decoded_model["C_SunSpec_DID"] in SUNSPEC_DID: + return { + "description": SUNSPEC_DID[ + self._platform.decoded_model["C_SunSpec_DID"] + ] + } + else: return None - + except KeyError: return None + class ACCurrentSensor(SolarEdgeSensorBase): device_class = SensorDeviceClass.CURRENT state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = ELECTRIC_CURRENT_AMPERE - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - - if self._platform.decoded_model['C_SunSpec_DID'] in [101,102,103]: + + if self._platform.decoded_model["C_SunSpec_DID"] in [101, 102, 103]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_UINT16 - elif self._platform.decoded_model['C_SunSpec_DID'] in [201,202,203,204]: + elif self._platform.decoded_model["C_SunSpec_DID"] in [201, 202, 203, 204]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_INT16 else: - raise RuntimeError("ACCurrentSensor: Unknown C_SunSpec_DID {self._platform.decoded_model['C_SunSpec_DID']}") - + raise RuntimeError( + "ACCurrentSensor C_SunSpec_DID ", + f"{self._platform.decoded_model['C_SunSpec_DID']}", + ) + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_current" else: - return f"{self._platform.model}_{self._platform.serial}_ac_current_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_current_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC Current" else: - return f"{self._platform._device_info['name']} AC Current {self._phase.upper()}" - + return ( + f"{self._platform._device_info['name']} ", + f"AC Current {self._phase.upper()}", + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_Current" else: model_key = f"AC_Current_{self._phase.upper()}" - + try: - if (self._platform.decoded_model[model_key] == self.SUNSPEC_NOT_IMPL or - self._platform.decoded_model['AC_Current_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_Current_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == self.SUNSPEC_NOT_IMPL + or self._platform.decoded_model["AC_Current_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_Current_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - return scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_Current_SF']) - + return scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_Current_SF"], + ) + except TypeError: return None + class VoltageSensor(SolarEdgeSensorBase): device_class = SensorDeviceClass.VOLTAGE state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = ELECTRIC_POTENTIAL_VOLT - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - - if self._platform.decoded_model['C_SunSpec_DID'] in [101,102,103]: + + if self._platform.decoded_model["C_SunSpec_DID"] in [101, 102, 103]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_UINT16 - elif self._platform.decoded_model['C_SunSpec_DID'] in [201,202,203,204]: + elif self._platform.decoded_model["C_SunSpec_DID"] in [201, 202, 203, 204]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_INT16 else: - raise RuntimeError("ACCurrentSensor: Unknown C_SunSpec_DID {self._platform.decoded_model['C_SunSpec_DID']}") - + raise RuntimeError( + "ACCurrentSensor C_SunSpec_DID ", + f"{self._platform.decoded_model['C_SunSpec_DID']}", + ) + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_voltage" else: - return f"{self._platform.model}_{self._platform.serial}_ac_voltage_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_voltage_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC Voltage" else: - return f"{self._platform._device_info['name']} AC Voltage {self._phase.upper()}" - + return ( + f"{self._platform._device_info['name']} ", + f"AC Voltage {self._phase.upper()}", + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_Voltage" else: model_key = f"AC_Voltage_{self._phase.upper()}" - + try: - if (self._platform.decoded_model[model_key] == self.SUNSPEC_NOT_IMPL or - self._platform.decoded_model['AC_Voltage_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_Voltage_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == self.SUNSPEC_NOT_IMPL + or self._platform.decoded_model["AC_Voltage_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_Voltage_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_Voltage_SF']) - return round(value, abs(self._platform.decoded_model['AC_Voltage_SF'])) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_Voltage_SF"], + ) + return round(value, abs(self._platform.decoded_model["AC_Voltage_SF"])) + except TypeError: return None + class ACPower(SolarEdgeSensorBase): device_class = SensorDeviceClass.POWER state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = POWER_WATT - icon = 'mdi:solar-power' - + icon = "mdi:solar-power" + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_power" else: - return f"{self._platform.model}_{self._platform.serial}_ac_power_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_power_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC Power" else: - return f"{self._platform._device_info['name']} AC Power {self._phase.upper()}" - + return ( + f"{self._platform._device_info['name']} AC Power {self._phase.upper()}" + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_Power" else: model_key = f"AC_Power_{self._phase.upper()}" - + try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_Power_SF'] == SUNSPEC_NOT_IMPL_INT16 + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_Power_SF"] == SUNSPEC_NOT_IMPL_INT16 ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_Power_SF']) - return round(value, abs(self._platform.decoded_model['AC_Power_SF'])) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_Power_SF"], + ) + return round(value, abs(self._platform.decoded_model["AC_Power_SF"])) + except TypeError: return None + class ACFrequency(SolarEdgeSensorBase): device_class = SensorDeviceClass.FREQUENCY state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = FREQUENCY_HERTZ - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_ac_frequency" - + @property def name(self) -> str: return f"{self._platform._device_info['name']} AC Frequency" - + @property def native_value(self): try: - if (self._platform.decoded_model['AC_Frequency'] == SUNSPEC_NOT_IMPL_UINT16 or - self._platform.decoded_model['AC_Frequency_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_Frequency_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model["AC_Frequency"] == SUNSPEC_NOT_IMPL_UINT16 + or self._platform.decoded_model["AC_Frequency_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_Frequency_SF"] + not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model['AC_Frequency'], self._platform.decoded_model['AC_Frequency_SF']) - return round(value, abs(self._platform.decoded_model['AC_Frequency_SF'])) - + value = scale_factor( + self._platform.decoded_model["AC_Frequency"], + self._platform.decoded_model["AC_Frequency_SF"], + ) + return round( + value, abs(self._platform.decoded_model["AC_Frequency_SF"]) + ) + except TypeError: return None + class ACVoltAmp(SolarEdgeSensorBase): device_class = SensorDeviceClass.APPARENT_POWER state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = POWER_VOLT_AMPERE - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_va" else: - return f"{self._platform.model}_{self._platform.serial}_ac_va_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_va_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC VA" else: return f"{self._platform._device_info['name']} AC VA {self._phase.upper()}" - + @property def entity_registry_enabled_default(self) -> bool: return False - + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_VA" else: model_key = f"AC_VA_{self._phase.upper()}" try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_VA_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_VA_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_VA_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_VA_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_VA_SF']) - return round(value, abs(self._platform.decoded_model['AC_VA_SF'])) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_VA_SF"], + ) + return round(value, abs(self._platform.decoded_model["AC_VA_SF"])) + except TypeError: return None + class ACVoltAmpReactive(SolarEdgeSensorBase): device_class = SensorDeviceClass.REACTIVE_POWER state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = POWER_VOLT_AMPERE_REACTIVE - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_var" else: - return f"{self._platform.model}_{self._platform.serial}_ac_var_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_var_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC var" else: - return f"{self._platform._device_info['name']} AC var {self._phase.upper()}" - + return ( + f"{self._platform._device_info['name']} ", + f"AC var {self._phase.upper()}", + ) + @property def entity_registry_enabled_default(self) -> bool: return False - + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_var" else: model_key = f"AC_var_{self._phase.upper()}" - + try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_var_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_var_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_var_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_var_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_var_SF']) - return round(value, abs(self._platform.decoded_model['AC_var_SF'])) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_var_SF"], + ) + return round(value, abs(self._platform.decoded_model["AC_var_SF"])) + except TypeError: return None + class ACPowerFactor(SolarEdgeSensorBase): device_class = SensorDeviceClass.POWER_FACTOR state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = PERCENTAGE - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase - + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_pf" else: - return f"{self._platform.model}_{self._platform.serial}_ac_pf_{self._phase.lower()}" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"ac_pf_{self._phase.lower()}", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC PF" else: return f"{self._platform._device_info['name']} AC PF {self._phase.upper()}" - + @property def entity_registry_enabled_default(self) -> bool: return False - + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_PF" else: model_key = f"AC_PF_{self._phase.upper()}" - + try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_PF_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['AC_PF_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_PF_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["AC_PF_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_PF_SF']) - return round(value, abs(self._platform.decoded_model['AC_PF_SF'])) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_PF_SF"], + ) + return round(value, abs(self._platform.decoded_model["AC_PF_SF"])) + except TypeError: return None + class ACEnergy(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.TOTAL_INCREASING native_unit_of_measurement = ENERGY_KILO_WATT_HOUR - + def __init__(self, platform, config_entry, coordinator, phase: str = None): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" self._phase = phase self.last = None - - if self._platform.decoded_model['C_SunSpec_DID'] in [101,102,103]: + + if self._platform.decoded_model["C_SunSpec_DID"] in [101, 102, 103]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_UINT16 - elif self._platform.decoded_model['C_SunSpec_DID'] in [201,202,203,204]: + elif self._platform.decoded_model["C_SunSpec_DID"] in [201, 202, 203, 204]: self.SUNSPEC_NOT_IMPL = SUNSPEC_NOT_IMPL_INT16 else: - raise RuntimeError("ACEnergy: Unknown C_SunSpec_DID {self._platform.decoded_model['C_SunSpec_DID']}") - + raise RuntimeError( + "ACEnergy C_SunSpec_DID ", + f"{self._platform.decoded_model['C_SunSpec_DID']}", + ) + @property def icon(self) -> str: if self._phase is None: return None - - elif re.match('import', self._phase.lower()): - return 'mdi:transmission-tower-export' - - elif re.match('export', self._phase.lower()): - return 'mdi:transmission-tower-import' - + + elif re.match("import", self._phase.lower()): + return "mdi:transmission-tower-export" + + elif re.match("export", self._phase.lower()): + return "mdi:transmission-tower-import" + else: return None - + @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform.model}_{self._platform.serial}_ac_energy_kwh" else: - return f"{self._platform.model}_{self._platform.serial}_{self._phase.lower()}_kwh" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"{self._phase.lower()}_kwh", + ) + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: return f"{self._platform._device_info['name']} AC Energy kWh" else: - return f"{self._platform._device_info['name']} {re.sub('_', ' ', self._phase)} kWh" - + return ( + f"{self._platform._device_info['name']} ", + f"{re.sub('_', ' ', self._phase)} kWh", + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: model_key = "AC_Energy_WH" else: model_key = f"AC_Energy_WH_{self._phase}" - + try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 or - self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT or - self._platform.decoded_model['AC_Energy_WH_SF'] == self.SUNSPEC_NOT_IMPL or - self._platform.decoded_model['AC_Energy_WH_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 + or self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT + or self._platform.decoded_model["AC_Energy_WH_SF"] + == self.SUNSPEC_NOT_IMPL + or self._platform.decoded_model["AC_Energy_WH_SF"] + not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['AC_Energy_WH_SF']) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["AC_Energy_WH_SF"], + ) + try: return watts_to_kilowatts(update_accum(self, value)) - except: + except Exception: return None - + except TypeError: return None + class DCCurrent(SolarEdgeSensorBase): device_class = SensorDeviceClass.CURRENT state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = ELECTRIC_CURRENT_AMPERE - icon = 'mdi:current-dc' + icon = "mdi:current-dc" def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) @@ -872,18 +1010,27 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_DC_Current'] == SUNSPEC_NOT_IMPL_UINT16 or - self._platform.decoded_model['I_DC_Current_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_DC_Current_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model["I_DC_Current"] == SUNSPEC_NOT_IMPL_UINT16 + or self._platform.decoded_model["I_DC_Current_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_DC_Current_SF"] + not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model['I_DC_Current'], self._platform.decoded_model['I_DC_Current_SF']) - return round(value, abs(self._platform.decoded_model['I_DC_Current_SF'])) - + value = scale_factor( + self._platform.decoded_model["I_DC_Current"], + self._platform.decoded_model["I_DC_Current_SF"], + ) + return round( + value, abs(self._platform.decoded_model["I_DC_Current_SF"]) + ) + except TypeError: - return None + return None + class DCVoltage(SolarEdgeSensorBase): device_class = SensorDeviceClass.VOLTAGE @@ -905,24 +1052,33 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_DC_Voltage'] == SUNSPEC_NOT_IMPL_UINT16 or - self._platform.decoded_model['I_DC_Voltage_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_DC_Voltage_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model["I_DC_Voltage"] == SUNSPEC_NOT_IMPL_UINT16 + or self._platform.decoded_model["I_DC_Voltage_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_DC_Voltage_SF"] + not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model['I_DC_Voltage'], self._platform.decoded_model['I_DC_Voltage_SF']) - return round(value, abs(self._platform.decoded_model['I_DC_Voltage_SF'])) - + value = scale_factor( + self._platform.decoded_model["I_DC_Voltage"], + self._platform.decoded_model["I_DC_Voltage_SF"], + ) + return round( + value, abs(self._platform.decoded_model["I_DC_Voltage_SF"]) + ) + except TypeError: - return None + return None + class DCPower(SolarEdgeSensorBase): device_class = SensorDeviceClass.POWER state_class = SensorStateClass.MEASUREMENT native_unit_of_measurement = POWER_WATT - icon = 'mdi:solar-power' + icon = "mdi:solar-power" def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) @@ -939,18 +1095,24 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_DC_Power'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_DC_Power_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_DC_Power_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model["I_DC_Power"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_DC_Power_SF"] + == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_DC_Power_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model['I_DC_Power'], self._platform.decoded_model['I_DC_Power_SF']) - return round(value, abs(self._platform.decoded_model['I_DC_Power_SF'])) - + value = scale_factor( + self._platform.decoded_model["I_DC_Power"], + self._platform.decoded_model["I_DC_Power_SF"], + ) + return round(value, abs(self._platform.decoded_model["I_DC_Power_SF"])) + except TypeError: - return None + return None + class HeatSinkTemperature(SolarEdgeSensorBase): device_class = SensorDeviceClass.TEMPERATURE @@ -972,27 +1134,32 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_Temp_Sink'] == 0x0 or - self._platform.decoded_model['I_Temp_Sink'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_Temp_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['I_Temp_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model["I_Temp_Sink"] == 0x0 + or self._platform.decoded_model["I_Temp_Sink"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_Temp_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["I_Temp_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model['I_Temp_Sink'], self._platform.decoded_model['I_Temp_SF']) - return round(value, abs(self._platform.decoded_model['I_Temp_SF'])) - + value = scale_factor( + self._platform.decoded_model["I_Temp_Sink"], + self._platform.decoded_model["I_Temp_SF"], + ) + return round(value, abs(self._platform.decoded_model["I_Temp_SF"])) + except TypeError: - return None + return None + class Status(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_status" @@ -1004,38 +1171,43 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_Status'] == SUNSPEC_NOT_IMPL_INT16): + if self._platform.decoded_model["I_Status"] == SUNSPEC_NOT_IMPL_INT16: return None - + else: - return str(self._platform.decoded_model['I_Status']) - + return str(self._platform.decoded_model["I_Status"]) + except TypeError: return None - + @property def extra_state_attributes(self): attrs = {} - + try: - if self._platform.decoded_model['I_Status'] in DEVICE_STATUS_DESC: - attrs["description"] = DEVICE_STATUS_DESC[self._platform.decoded_model['I_Status']] - - if self._platform.decoded_model['I_Status'] in DEVICE_STATUS: - attrs["status_text"] = DEVICE_STATUS[self._platform.decoded_model['I_Status']] - + if self._platform.decoded_model["I_Status"] in DEVICE_STATUS_DESC: + attrs["description"] = DEVICE_STATUS_DESC[ + self._platform.decoded_model["I_Status"] + ] + + if self._platform.decoded_model["I_Status"] in DEVICE_STATUS: + attrs["status_text"] = DEVICE_STATUS[ + self._platform.decoded_model["I_Status"] + ] + except KeyError: pass return attrs + class StatusVendor(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_status_vendor" @@ -1047,34 +1219,42 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['I_Status_Vendor'] == SUNSPEC_NOT_IMPL_INT16): + if ( + self._platform.decoded_model["I_Status_Vendor"] + == SUNSPEC_NOT_IMPL_INT16 + ): return None - + else: - return str(self._platform.decoded_model['I_Status_Vendor']) - + return str(self._platform.decoded_model["I_Status_Vendor"]) + except TypeError: return None - + @property def extra_state_attributes(self): try: - if self._platform.decoded_model['I_Status_Vendor'] in VENDOR_STATUS: - return {"description": VENDOR_STATUS[self._platform.decoded_model['I_Status_Vendor']]} - + if self._platform.decoded_model["I_Status_Vendor"] in VENDOR_STATUS: + return { + "description": VENDOR_STATUS[ + self._platform.decoded_model["I_Status_Vendor"] + ] + } + else: return None - + except KeyError: return None + class MeterEvents(SolarEdgeSensorBase): entity_category = EntityCategory.DIAGNOSTIC - + def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_meter_events" @@ -1086,30 +1266,33 @@ def name(self) -> str: @property def native_value(self): try: - if (self._platform.decoded_model['M_Events'] == SUNSPEC_NOT_IMPL_UINT32): + if self._platform.decoded_model["M_Events"] == SUNSPEC_NOT_IMPL_UINT32: return None - + else: - return self._platform.decoded_model['M_Events'] - + return self._platform.decoded_model["M_Events"] + except TypeError: return None - + @property def extra_state_attributes(self): try: m_events_active = [] - if int(str(self._platform.decoded_model['M_Events']),16) == 0x0: + if int(str(self._platform.decoded_model["M_Events"]), 16) == 0x0: return {"description": str(m_events_active)} else: - for i in range(2,31): - if (int(str(self._platform.decoded_model['M_Events']),16) & (1 << i)): + for i in range(2, 31): + if int(str(self._platform.decoded_model["M_Events"]), 16) & ( + 1 << i + ): m_events_active.append(METER_EVENTS[i]) return {"description": str(m_events_active)} - + except KeyError: return None + class MeterVAhIE(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.TOTAL_INCREASING @@ -1125,60 +1308,71 @@ def __init__(self, platform, config_entry, coordinator, phase: str = None): def icon(self) -> str: if self._phase is None: return None - - elif re.match('import', self._phase.lower()): - return 'mdi:transmission-tower-export' - - elif re.match('export', self._phase.lower()): - return 'mdi:transmission-tower-import' - + + elif re.match("import", self._phase.lower()): + return "mdi:transmission-tower-export" + + elif re.match("export", self._phase.lower()): + return "mdi:transmission-tower-import" + else: return None @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: raise NotImplementedError else: - return f"{self._platform.model}_{self._platform.serial}_{self._phase.lower()}_vah" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"{self._phase.lower()}_vah", + ) + @property def entity_registry_enabled_default(self) -> bool: return False @property def name(self) -> str: - if self._phase == None: + if self._phase is None: raise NotImplementedError else: - return f"{self._platform._device_info['name']} {re.sub('_', ' ', self._phase)} VAh" - + return ( + f"{self._platform._device_info['name']} ", + f"{re.sub('_', ' ', self._phase)} VAh", + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: raise NotImplementedError else: model_key = f"M_VAh_{self._phase}" try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 or - self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT or - self._platform.decoded_model['M_VAh_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['M_VAh_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 + or self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT + or self._platform.decoded_model["M_VAh_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["M_VAh_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['M_VAh_SF']) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["M_VAh_SF"], + ) + try: return update_accum(self, value, value) - except: + except Exception: return None - + except TypeError: return None + class MetervarhIE(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.TOTAL_INCREASING @@ -1194,60 +1388,71 @@ def __init__(self, platform, config_entry, coordinator, phase: str = None): def icon(self) -> str: if self._phase is None: return None - - elif re.match('import', self._phase.lower()): - return 'mdi:transmission-tower-export' - - elif re.match('export', self._phase.lower()): - return 'mdi:transmission-tower-import' - + + elif re.match("import", self._phase.lower()): + return "mdi:transmission-tower-export" + + elif re.match("export", self._phase.lower()): + return "mdi:transmission-tower-import" + else: return None @property def unique_id(self) -> str: - if self._phase == None: + if self._phase is None: raise NotImplementedError else: - return f"{self._platform.model}_{self._platform.serial}_{self._phase.lower()}_varh" - + return ( + f"{self._platform.model}_{self._platform.serial}_", + f"{self._phase.lower()}_varh", + ) + @property def entity_registry_enabled_default(self) -> bool: return False - + @property def name(self) -> str: - if self._phase == None: + if self._phase is None: raise NotImplementedError else: - return f"{self._platform._device_info['name']} {re.sub('_', ' ', self._phase)} varh" - + return ( + f"{self._platform._device_info['name']} ", + f"{re.sub('_', ' ', self._phase)} varh", + ) + @property def native_value(self): - if self._phase == None: + if self._phase is None: raise NotImplementedError else: model_key = f"M_varh_{self._phase}" try: - if (self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 or - self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT or - self._platform.decoded_model['M_varh_SF'] == SUNSPEC_NOT_IMPL_INT16 or - self._platform.decoded_model['M_varh_SF'] not in SUNSPEC_SF_RANGE + if ( + self._platform.decoded_model[model_key] == SUNSPEC_NOT_ACCUM_ACC32 + or self._platform.decoded_model[model_key] > SUNSPEC_ACCUM_LIMIT + or self._platform.decoded_model["M_varh_SF"] == SUNSPEC_NOT_IMPL_INT16 + or self._platform.decoded_model["M_varh_SF"] not in SUNSPEC_SF_RANGE ): return None - + else: - value = scale_factor(self._platform.decoded_model[model_key], self._platform.decoded_model['M_varh_SF']) - + value = scale_factor( + self._platform.decoded_model[model_key], + self._platform.decoded_model["M_varh_SF"], + ) + try: return update_accum(self, value, value) - except: + except Exception: return None - + except TypeError: return None + class SolarEdgeBatteryAvgTemp(HeatSinkTemperature): @property def unique_id(self) -> str: @@ -1261,17 +1466,19 @@ def name(self) -> str: def native_value(self): try: if ( - self._platform.decoded_model['B_Temp_Average'] == SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_Temp_Average'] == 0xFF7FFFFF - or self._platform.decoded_model['B_Temp_Average'] == 0x7F7FFFFF + self._platform.decoded_model["B_Temp_Average"] + == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_Temp_Average"] == 0xFF7FFFFF + or self._platform.decoded_model["B_Temp_Average"] == 0x7F7FFFFF ): return None - + else: - return round(self._platform.decoded_model['B_Temp_Average'], 1) - + return round(self._platform.decoded_model["B_Temp_Average"], 1) + except TypeError: - return None + return None + class SolarEdgeBatteryMaxTemp(HeatSinkTemperature): @property @@ -1281,7 +1488,7 @@ def unique_id(self) -> str: @property def name(self) -> str: return f"{self._platform._device_info['name']} Max Temperature" - + @property def entity_registry_enabled_default(self) -> bool: return False @@ -1290,85 +1497,89 @@ def entity_registry_enabled_default(self) -> bool: def native_value(self): try: if ( - self._platform.decoded_model['B_Temp_Max'] == SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_Temp_Max'] == 0xFF7FFFFF - or self._platform.decoded_model['B_Temp_Max'] == 0x7F7FFFFF + self._platform.decoded_model["B_Temp_Max"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_Temp_Max"] == 0xFF7FFFFF + or self._platform.decoded_model["B_Temp_Max"] == 0x7F7FFFFF ): return None - + else: - return round(self._platform.decoded_model['B_Temp_Max'], 1) - + return round(self._platform.decoded_model["B_Temp_Max"], 1) + except TypeError: - return None + return None + class SolarEdgeBatteryVoltage(DCVoltage): @property def native_value(self): try: if ( - self._platform.decoded_model['B_DC_Voltage'] == SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_DC_Voltage'] == 0xFF7FFFFF - or self._platform.decoded_model['B_DC_Voltage'] == 0x7F7FFFFF + self._platform.decoded_model["B_DC_Voltage"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_DC_Voltage"] == 0xFF7FFFFF + or self._platform.decoded_model["B_DC_Voltage"] == 0x7F7FFFFF ): return None - - elif self._platform.decoded_model['B_Status'] in [0]: + + elif self._platform.decoded_model["B_Status"] in [0]: return None - + else: - return round(self._platform.decoded_model['B_DC_Voltage'], 2) - + return round(self._platform.decoded_model["B_DC_Voltage"], 2) + except TypeError: - return None + return None + class SolarEdgeBatteryCurrent(DCCurrent): @property def native_value(self): try: if ( - self._platform.decoded_model['B_DC_Current'] == SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_DC_Current'] == 0xFF7FFFFF - or self._platform.decoded_model['B_DC_Current'] == 0x7F7FFFFF + self._platform.decoded_model["B_DC_Current"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_DC_Current"] == 0xFF7FFFFF + or self._platform.decoded_model["B_DC_Current"] == 0x7F7FFFFF ): return None - - elif self._platform.decoded_model['B_Status'] in [0]: + + elif self._platform.decoded_model["B_Status"] in [0]: return None - + else: - return round(self._platform.decoded_model['B_DC_Current'], 2) - + return round(self._platform.decoded_model["B_DC_Current"], 2) + except TypeError: - return None + return None + class SolarEdgeBatteryPower(DCPower): - icon = 'mdi:lightning-bolt' - + icon = "mdi:lightning-bolt" + @property def native_value(self): try: if ( - self._platform.decoded_model['B_DC_Power'] == SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_DC_Power'] == 0xFF7FFFFF - or self._platform.decoded_model['B_DC_Power'] == 0x7F7FFFFF + self._platform.decoded_model["B_DC_Power"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_DC_Power"] == 0xFF7FFFFF + or self._platform.decoded_model["B_DC_Power"] == 0x7F7FFFFF ): return None - - elif self._platform.decoded_model['B_Status'] in [0]: + + elif self._platform.decoded_model["B_Status"] in [0]: return None - + else: - return round(self._platform.decoded_model['B_DC_Power'], 2) - + return round(self._platform.decoded_model["B_DC_Power"], 2) + except TypeError: - return None + return None + class SolarEdgeBatteryEnergyExport(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.TOTAL_INCREASING native_unit_of_measurement = ENERGY_KILO_WATT_HOUR - icon = 'mdi:battery-charging-20' + icon = "mdi:battery-charging-20" def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) @@ -1386,23 +1597,28 @@ def name(self) -> str: @property def native_value(self): try: - if self._platform.decoded_model['B_Export_Energy_WH'] == 0xFFFFFFFFFFFFFFFF: + if self._platform.decoded_model["B_Export_Energy_WH"] == 0xFFFFFFFFFFFFFFFF: return None - - else: + + else: try: - return watts_to_kilowatts(update_accum(self, self._platform.decoded_model['B_Export_Energy_WH'])) - except: + return watts_to_kilowatts( + update_accum( + self, self._platform.decoded_model["B_Export_Energy_WH"] + ) + ) + except Exception: return None - + except TypeError: return None + class SolarEdgeBatteryEnergyImport(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.TOTAL_INCREASING native_unit_of_measurement = ENERGY_KILO_WATT_HOUR - icon = 'mdi:battery-charging-100' + icon = "mdi:battery-charging-100" def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) @@ -1420,18 +1636,23 @@ def name(self) -> str: @property def native_value(self): try: - if self._platform.decoded_model['B_Import_Energy_WH'] == 0xFFFFFFFFFFFFFFFF: + if self._platform.decoded_model["B_Import_Energy_WH"] == 0xFFFFFFFFFFFFFFFF: return None - - else: + + else: try: - return watts_to_kilowatts(update_accum(self, self._platform.decoded_model['B_Import_Energy_WH'])) - except: + return watts_to_kilowatts( + update_accum( + self, self._platform.decoded_model["B_Import_Energy_WH"] + ) + ) + except Exception: return None - + except TypeError: return None + class SolarEdgeBatteryMaxEnergy(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY state_class = SensorStateClass.MEASUREMENT @@ -1440,7 +1661,7 @@ class SolarEdgeBatteryMaxEnergy(SolarEdgeSensorBase): def __init__(self, platform, config_entry, coordinator): super().__init__(platform, config_entry, coordinator) """Initialize the sensor.""" - + @property def unique_id(self) -> str: return f"{self._platform.model}_{self._platform.serial}_max_energy" @@ -1450,16 +1671,17 @@ def name(self) -> str: return f"{self._platform._device_info['name']} Maximum Energy" @property - def native_value(self): + def native_value(self): if ( - self._platform.decoded_model['B_Energy_Max']== SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_Energy_Max'] == 0xFF7FFFFF - or self._platform.decoded_model['B_Energy_Max'] == 0x7F7FFFFF + self._platform.decoded_model["B_Energy_Max"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_Energy_Max"] == 0xFF7FFFFF + or self._platform.decoded_model["B_Energy_Max"] == 0x7F7FFFFF ): return None - + else: - return watts_to_kilowatts(self._platform.decoded_model['B_Energy_Max']) + return watts_to_kilowatts(self._platform.decoded_model["B_Energy_Max"]) + class SolarEdgeBatteryAvailableEnergy(SolarEdgeSensorBase): device_class = SensorDeviceClass.ENERGY @@ -1481,14 +1703,18 @@ def name(self) -> str: @property def native_value(self): if ( - self._platform.decoded_model['B_Energy_Available']== SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_Energy_Available'] == 0xFF7FFFFF - or self._platform.decoded_model['B_Energy_Available'] == 0x7F7FFFFF + self._platform.decoded_model["B_Energy_Available"] + == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_Energy_Available"] == 0xFF7FFFFF + or self._platform.decoded_model["B_Energy_Available"] == 0x7F7FFFFF ): return None - + else: - return watts_to_kilowatts(self._platform.decoded_model['B_Energy_Available']) + return watts_to_kilowatts( + self._platform.decoded_model["B_Energy_Available"] + ) + class SolarEdgeBatterySOH(SolarEdgeSensorBase): state_class = SensorStateClass.MEASUREMENT @@ -1510,15 +1736,16 @@ def name(self) -> str: @property def native_value(self): if ( - self._platform.decoded_model['B_SOH']== SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_SOH'] == 0xFF7FFFFF - or self._platform.decoded_model['B_SOH'] == 0x7F7FFFFF - or self._platform.decoded_model['B_SOH'] < 0 - or self._platform.decoded_model['B_SOH'] > 100 + self._platform.decoded_model["B_SOH"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_SOH"] == 0xFF7FFFFF + or self._platform.decoded_model["B_SOH"] == 0x7F7FFFFF + or self._platform.decoded_model["B_SOH"] < 0 + or self._platform.decoded_model["B_SOH"] > 100 ): return None else: - return round(self._platform.decoded_model['B_SOH'], 0) + return round(self._platform.decoded_model["B_SOH"], 0) + class SolarEdgeBatterySOE(SolarEdgeSensorBase): state_class = SensorStateClass.MEASUREMENT @@ -1540,15 +1767,16 @@ def name(self) -> str: @property def native_value(self): if ( - self._platform.decoded_model['B_SOE']== SUNSPEC_NOT_IMPL_FLOAT32 - or self._platform.decoded_model['B_SOE'] == 0xFF7FFFFF - or self._platform.decoded_model['B_SOE'] == 0x7F7FFFFF - or self._platform.decoded_model['B_SOE'] < 0 - or self._platform.decoded_model['B_SOE'] > 100 + self._platform.decoded_model["B_SOE"] == SUNSPEC_NOT_IMPL_FLOAT32 + or self._platform.decoded_model["B_SOE"] == 0xFF7FFFFF + or self._platform.decoded_model["B_SOE"] == 0x7F7FFFFF + or self._platform.decoded_model["B_SOE"] < 0 + or self._platform.decoded_model["B_SOE"] > 100 ): return None else: - return round(self._platform.decoded_model['B_SOE'], 0) + return round(self._platform.decoded_model["B_SOE"], 0) + class SolarEdgeBatteryStatus(Status): def __init__(self, platform, config_entry, coordinator): @@ -1558,23 +1786,25 @@ def __init__(self, platform, config_entry, coordinator): @property def native_value(self): try: - if (self._platform.decoded_model['B_Status'] == SUNSPEC_NOT_IMPL_UINT32): + if self._platform.decoded_model["B_Status"] == SUNSPEC_NOT_IMPL_UINT32: return None - + else: - return str(self._platform.decoded_model['B_Status']) - + return str(self._platform.decoded_model["B_Status"]) + except TypeError: return None - + @property def extra_state_attributes(self): attrs = {} - + try: - if self._platform.decoded_model['B_Status'] in BATTERY_STATUS: - attrs["status_text"] = BATTERY_STATUS[self._platform.decoded_model['B_Status']] - + if self._platform.decoded_model["B_Status"] in BATTERY_STATUS: + attrs["status_text"] = BATTERY_STATUS[ + self._platform.decoded_model["B_Status"] + ] + except KeyError: pass