From 703c31f2ecf08adf2492de9fd38621ddb0c52852 Mon Sep 17 00:00:00 2001 From: Alexander Stolyarov Date: Fri, 19 Jan 2024 13:24:18 +0300 Subject: [PATCH] el modbus examples --- README.md | 108 ++- read_el_control_board_serial.py | 115 +++ read_el_device_model.py | 129 +++ read_el_errors.py | 198 ++++ read_el_params.py | 226 +++++ requirements.txt | 1 + run_el_maintenance.py | 1539 ++++++++++++++++++++++++++++++ write_el_production_rate.py | 128 +++ write_el_reboot.py | 138 +++ write_el_syslog_skip_priority.py | 375 ++++++++ 10 files changed, 2955 insertions(+), 2 deletions(-) create mode 100644 read_el_control_board_serial.py create mode 100644 read_el_device_model.py create mode 100644 read_el_errors.py create mode 100644 read_el_params.py create mode 100644 requirements.txt create mode 100644 run_el_maintenance.py create mode 100644 write_el_production_rate.py create mode 100644 write_el_reboot.py create mode 100644 write_el_syslog_skip_priority.py diff --git a/README.md b/README.md index ebd0967..df8934c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,106 @@ -# modbus-scripts -Examples of interaction with Enapter devices over Modbus communication protocol +## Introduction + +This repository contains examples of interaction with Enapter devices over +Modbus communication protocol. Writing holding registers allows to execute +commands (e.g. reboot), set specific parameters or update configuration. + +Reading registers (both holding and inputs) allows to get current hardware status +(e.g. switches states, H2 production parameters, timings, etc.), current configuration, +detect configuration problems. + +All information regarding registers, events (errors/warnings), etc. is available at [Enapter Handbook](https://handbook.enapter.com). + +### Requirements + +Python is used as programming language, version >= 3.10 is required. +Please refer to the actual [downloads](https://www.python.org/downloads/) and [documentation](https://www.python.org/doc/). + +Git is required to clone the repository. +Please refer to the actual [downloads](https://www.git-scm.com/downloads) and [documentation](https://www.git-scm.com/doc). + +[pyModbusTCP](https://pypi.org/project/pyModbusTCP/0.2.1/) package is required. +Please refer to the actual documentation regarding [usage of virtual environments](https://docs.python.org/3/library/venv.html) and [packages installation](https://packaging.python.org/en/latest/tutorials/installing-packages/). + +### Running scripts + +Please refer to the actual [documentation](https://docs.python.org/3/using/cmdline.html) regarding general information about running Python scripts. + +Each script requires two parameters - Modbus IP address and Modbus port. +IP address is required parameter, default port is _502_. + +**Running script with default port:** +``` +python3 /.py --modbus-ip
+``` + +**Running script with custom port:** +``` +python3 /.py --modbus-ip
--modbus-port +``` + +### Scripts description + +**_read_el_control_board_serial.py_** + +Read and decode control board serial number input (6) to a human-readable string value (e.g. '9E25E695-A66A-61DD-6570-50DB4E73652D'). + +**_read_el_device_model.py_** + +Read and decode device model input register (0) to a human-readable string value (e.g. 'EL21', 'EL40', etc.). + +**_read_el_errors.py_** + +Read and decode errors input register (832) to a list of human-readable strings with error name and hex +value (e.g. 'WR_20 (0x3194)'). Since new firmwares may add new events, UNKNOWN errors may be identified by hex value. + +**_read_el_params.py_** + +Read and decode current hardware parameters: +- system state (input, 18) +- uptime (input, 22) +- total H2 production (input, 1006) +- production rate (holding, 1002) +- high electrolyte level switch (input, 7000) +- very high electrolyte level switch (input, 7001) +- low electrolyte level switch (input, 7002) +- medium electrolyte level switch (input, 7003) +- electrolyte tank high pressure switch (input, 7004) +- electronic compartment high temperature Switch (input, 7007) +- chassis water presence switch (input, 7009)). + +**_run_el_maintenance.py_** + +Interactive script to perform maintenance on EL2.1/4.x by following the instructions in console. + +**ATTENTION!** Maintenance requires manual actions with electrolyser such as electrolyte draining, +flushing (for 4.x) and refilling. + +If script is terminated for some reason (e.g. due to network failure), in most cases it can be re-run and +maintenance will continue. + +Only refilling is performed in case of first maintenance (from factory state). + +**_write_el_production_rate.py_** + +- Read current value of the production rate percent holding register (1002) +- Write random value in 90-99 range +- Read register again to check that it contains new value + +- write_el_reboot.py + +- Write 1 to reboot holding register (4) +- Wait until electrolyser is rebooted +- Read state input register (1200) + +**_write_el_syslog_skip_priority.py_** + +- Read current value of the log skip priority holding register (4042) +- Check that there is no other configuration in progress (read configuration in progress input register (4000)) +- Begin configuration (write 1 to the configuration begin holding register (4000)) +- Ensure that configuration source is Modbus (read configuration over modbus input register (4001)) +- Write random value in 0-6 range (excluding current value) to the log skip priority holding register (4042) +- Check that configuration is OK (read configuration last result input register (4002)) +- Read log skip priority holding register (4042) again to check that it contains new value + +NOTICE. Log skip priority holding register (4042) has int32 type, so it may contain any value in the appropriate +range. Values less than 0 are considered as DISABLE_LOGGING (0), values greater than 6 are considered as ALL_MESSAGES (6). diff --git a/read_el_control_board_serial.py b/read_el_control_board_serial.py new file mode 100644 index 0000000..e264264 --- /dev/null +++ b/read_el_control_board_serial.py @@ -0,0 +1,115 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys +import uuid + +from typing import Final + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Register address +CONTROL_BOARD_SERIAL_INPUT: Final[int] = 6 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Reading EL control board serial with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + # Read control board serial input register, address is 6. Register type + # is uint128, so number of registers to read is 128 / 16 = 8. + raw_board_serial: list[int] = modbus_client.read_input_registers( + reg_addr=CONTROL_BOARD_SERIAL_INPUT, reg_nb=8 + ) + + print(f'Got raw control board serial data: {raw_board_serial}') + + # Convert raw response to single int value. pyModbusTCP utils has no + # built-in method for uint128, combining with 'manual' conversion. + long_long_list: list[int] = utils.word_list_to_long( + val_list=raw_board_serial, long_long=True + ) + + converted_board_serial: int = ( + long_long_list[0] << 64 | long_long_list[1] + ) + + print( + f'Got converted int value: {converted_board_serial}' + ) + + # Decode converted int to human-readable mainboard id which in fact is + # string representation of UUID. + decoded_board_serial: str = str( + uuid.UUID(int=converted_board_serial) + ).upper() + + print( + f'Got decoded human-readable serial number: {decoded_board_serial}' + ) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/read_el_device_model.py b/read_el_device_model.py new file mode 100644 index 0000000..1ec65fc --- /dev/null +++ b/read_el_device_model.py @@ -0,0 +1,129 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +from enum import StrEnum +from typing import Any, Final, Self + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Register address +DEVICE_MODEL_INPUT: Final[int] = 0 + + +class DeviceModel(StrEnum): + """ + Values for ProjectId input register (0). + """ + + UNKNOWN = 'UNKNOWN' + + # Specific value for EL21. + EL21 = 'EL21' + + # Specific values for EL40. + EL40 = 'EL40' + ES40 = 'ES40' + + # Specific value for EL41. + ES41 = 'ES41' + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Reading EL device model with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + # Read ProjectId input register, address is 0. Register type is uint32, + # so number of registers to read is 32 / 16 = 2. + raw_device_model: list[int] = modbus_client.read_input_registers( + reg_addr=DEVICE_MODEL_INPUT, reg_nb=2 + ) + + print(f'Got raw device model data: {raw_device_model}') + + # Convert raw response to single int value with pyModbusTCP utils. + converted_device_model: int = utils.word_list_to_long( + val_list=raw_device_model + )[0] + + print(f'Got converted int value: {converted_device_model}') + + # Decode converted int to human-readable device model. + decoded_device_model: DeviceModel = DeviceModel( + bytes.fromhex(f'{converted_device_model:x}').decode() + ) + + print( + f'Got decoded human-readable device model: {decoded_device_model}' + ) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/read_el_errors.py b/read_el_errors.py new file mode 100644 index 0000000..eacc8d0 --- /dev/null +++ b/read_el_errors.py @@ -0,0 +1,198 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +from enum import IntEnum +from typing import Any, Final, Self + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Register address +ERRORS_INPUT: Final[int] = 832 + + +class Error(IntEnum): + """ + Values for errors input register (832). Names may be used for human-readable + description. + """ + + UNKNOWN = -1 + + # Common errors for EL2.1 and EL4.x. + INTERNAL_ERROR = 0x0FFF + FP_01 = 0x1F81 + FP_02 = 0x1F82 + FP_03 = 0x1F83 + FP_06 = 0x1F86 + FC_10 = 0x108A + FD_20 = 0x1114 + FR_10 = 0x118A + FR_20 = 0x1194 + FR_50 = 0x11B2 + FR_51 = 0x11B3 + FR_52 = 0x11B4 + FR_40 = 0x11A8 + FS_01 = 0x1201 + FS_10 = 0x120A + FT_10 = 0x128A + ET_10 = 0x228A + EU_10 = 0x230A + FX_03 = 0x1403 + FX_04 = 0x1404 + FX_07 = 0x1407 + FX_11 = 0x140B + FX_12 = 0x140C + FX_30 = 0x141E + FX_31 = 0x141F + FX_33 = 0x1421 + FX_35 = 0x1423 + FX_36 = 0x1424 + FX_37 = 0x1425 + FX_38 = 0x1426 + FX_39 = 0x1427 + FF_10 = 0x148A + FL_01 = 0x1501 + FO_30 = 0x159E + + # Specific values for EL2.1. + EX_01 = 0x2401 + FX_02 = 0x1402 + FX_05 = 0x1405 + FX_08 = 0x1408 + FX_09 = 0x1409 + FX_10 = 0x140A + FX_32 = 0x1420 + FX_34 = 0x1422 + FX_40 = 0x1428 + + # Specific values for EL4.x. + FX_13 = 0x140D + FX_42 = 0x142A + FB_10 = 0x170A + FB_11 = 0x170B + FB_20 = 0x1714 + FB_21 = 0x1715 + FY_10 = 0x178A + FY_20 = 0x1794 + FY_21 = 0x1795 + FY_22 = 0x1796 + FY_23 = 0x1797 + FY_24 = 0x1798 + FY_25 = 0x1799 + FY_26 = 0x179A + FY_27 = 0x179B + FY_28 = 0x179C + FY_29 = 0x179D + FY_30 = 0x179E + FY_31 = 0x179F + FY_33 = 0x1721 + FY_51 = 0x17B3 + FY_52 = 0x17B4 + FY_53 = 0x17B5 + FY_54 = 0x17B6 + FY_55 = 0x17B7 + FY_56 = 0x17B8 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Reading EL errors with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + # Read input register with errors array, address is 832. This register + # has specific structure - first uint16 contains total amount of error + # events. Number of registers to read is 528 / 16 = 33. + raw_errors_data: list[int] = modbus_client.read_input_registers( + reg_addr=ERRORS_INPUT, reg_nb=33 + ) + + print(f'Got raw errors data: {raw_errors_data}') + + if errors_count := raw_errors_data[0]: + # Total amount of errors is not 0. + print(f'Got total errors count: {errors_count}') + + decoded_errors: list[str] = [ + f'{Error(error).name} ({hex(error)})' for error in + raw_errors_data[1:errors_count + 1] + ] + + print( + f'Got decoded errors: {", ".join(decoded_errors)}\nErrors' + f' description is available at https://handbook.enapter.com' + ) + + else: + # Total amount of errors is 0. + print('There are no errors') + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/read_el_params.py b/read_el_params.py new file mode 100644 index 0000000..d0086c6 --- /dev/null +++ b/read_el_params.py @@ -0,0 +1,226 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +from enum import IntEnum +from typing import Any, Final, Self + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + + +class Holdings(IntEnum): + """ + Holding registers. + """ + + PRODUCTION_RATE = 1002 + + +class Inputs(IntEnum): + """ + Input registers. + """ + + SYSTEM_STATE = 18 + UPTIME = 22 + TOTAL_H2_PRODUCTION = 1006 + LSH102B_IN = 7000 + LSHH102A_IN = 7001 + LSL102D_IN = 7002 + LSM102C_IN = 7003 + PSH102_IN = 7004 + TSH108_IN = 7007 + WPS104_IN = 7009 + + +class SystemState(IntEnum): + """ + Values for state input register (18). + """ + + UNKNOWN = -1 + + INTERNAL_ERROR_SYSTEM_NOT_INITIALIZED_YET = 0 + SYSTEM_IN_OPERATION = 1 + ERROR = 2 + SYSTEM_IN_MAINTENANCE_MODE = 3 + FATAL_ERROR = 4 + SYSTEM_IN_EXPERT_MODE = 5 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +def _read_input_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read input registers. + """ + return modbus_client.read_input_registers(reg_addr=address, reg_nb=count) + + +def _read_holding_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read holding registers. + """ + return modbus_client.read_holding_registers(reg_addr=address, reg_nb=count) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Reading current EL params with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + # Read and decode system state input register, address is 18. Register + # type is enum16, technically it's similar to uint16, so number of + # registers to read is 16 / 16 = 1. + system_state: SystemState = SystemState( + _read_input_registers( + modbus_client=modbus_client, + address=Inputs.SYSTEM_STATE.value, count=1 + )[0] + ) + + print(f'Got system state: {system_state.name}') + + # Read uptime input register, address is 22. Register type is uint32, + # so number of registers to read is 32 / 16 = 2. + raw_uptime_data: list[int] = _read_input_registers( + modbus_client=modbus_client, address=Inputs.UPTIME.value, count=2 + ) + + # Convert raw response to single int value with pyModbusTCP utils. + converted_uptime: int = utils.word_list_to_long( + val_list=raw_uptime_data + )[0] + + print( + f'Got uptime in seconds: {converted_uptime}' + ) + + # Read stack total H2 production input register, address is 1006. + # Register type is float32, so number of registers to read is + # 32 / 16 = 2. + raw_h2_production_data: list[int] = _read_input_registers( + modbus_client=modbus_client, + address=Inputs.TOTAL_H2_PRODUCTION.value, count=2 + ) + + # Convert raw response to single float value with pyModbusTCP utils. + converted_h2_production: float = utils.decode_ieee( + val_int=utils.word_list_to_long(val_list=raw_h2_production_data)[0] + ) + + print(f'Got total H2 production in NL: {converted_h2_production}') + + # Read production rate holding register, address is 1002. Register type + # is float32, so number of registers to read is 32 / 16 = 2. + raw_production_rate_data: list[int] = ( + _read_holding_registers( + modbus_client=modbus_client, + address=Holdings.PRODUCTION_RATE.value, count=2 + ) + ) + + # Convert raw response to single float value with pyModbusTCP utils. + converted_production_rate: float = utils.decode_ieee( + val_int=utils.word_list_to_long( + val_list=raw_production_rate_data + )[0] + ) + + print(f'Got production rate in %: {converted_production_rate}') + + # Read 7000, 7001, 7002, 7003, 7004, 7007 and 7009 input registers + # (switches). Each register type is boolean, technically it's similar + # to uint16, so number of registers to read is 16 / 16 = 1. + for register, description in ( + (Inputs.LSH102B_IN, 'High Electrolyte Level Switch'), + (Inputs.LSHH102A_IN, 'Very High Electrolyte Level Switch'), + (Inputs.LSL102D_IN, 'Low Electrolyte Level Switch'), + (Inputs.LSM102C_IN, 'Medium Electrolyte Level Switch'), + (Inputs.PSH102_IN, 'Electrolyte Tank High Pressure Switch'), + ( + Inputs.TSH108_IN, + 'Electronic Compartment High Temperature Switch' + ), + (Inputs.WPS104_IN, 'Chassis Water Presence Switch') + + ): + switch_value: bool = bool( + _read_input_registers( + modbus_client=modbus_client, address=register.value, count=1 + )[0] + ) + + print(f'{register.name} ({description}) is {switch_value}') + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..32aebe3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pyModbusTCP==0.2.1 diff --git a/run_el_maintenance.py b/run_el_maintenance.py new file mode 100644 index 0000000..2caf306 --- /dev/null +++ b/run_el_maintenance.py @@ -0,0 +1,1539 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import sys +import time + +from enum import IntEnum, StrEnum +from typing import Any, Final, Self, TypeAlias, Union + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Windows platform id +WIN32: Final[str] = 'win32' + +# Timeout (seconds) to complete draining +DRAINING_TIMEOUT: Final[int] = 300 * 2 + +# Timeout (seconds) to complete refilling +REFILLING_TIMEOUT: Final[int] = 300 + +# Timeout (seconds) to check refilling state +REFILLING_STATE_CHECK_TIMEOUT: Final[int] = 5 + +# Timeout (seconds) to check electrolyte presence while draining/refilling +ELECTROLYTE_PRESENCE_CHECK_TIMEOUT: Final[int] = 10 + +# Timeout (seconds) after writing to somehow guarantee that value is updated +REGISTER_WRITE_TIMEOUT: Final[int] = 2 + +# Electrolyte presence check is required after water pipe connection +MAX_WATER_PIPE_CONNECT_ATTEMPTS: Final[int] = 5 + +INPUT_CONFIRMATION: Final[str] = 'YES' + + +class MaintenanceModeException(RuntimeError): + """ + Custom exception to indicate runtime problems during Maintenance. + """ + + pass + + +class WaterPipeException(RuntimeError): + """ + Custom exception to indicate problems with water pipe. + """ + + pass + + +class HighElectrolyteLevelException(RuntimeError): + """ + Custom exception to indicate unexpected high electrolyte level. + """ + + pass + + +class ElectrolyteLevel(IntEnum): + """ + Representation of electrolyte levels for convenience. + """ + + EMPTY = 0 + LOW = 1 + MEDIUM = 2 + HIGH = 3 + VERY_HIGH = 4 + + +class ConsoleColor(StrEnum): + """ + Console colors including the end tag. + """ + + CYAN = '\033[96m' + GREEN = '\033[32m' + RED = '\033[91m' + YELLOW = '\033[33m' + + # Special tag to end colored text. + END = '\033[0m' + + +class Holdings(IntEnum): + """ + Holding registers. + """ + + MAINTENANCE = 1013 + FLUSHING = 1015 + REFILLING_MINWATERPPESSURE_BAR = 4400 + REFILLING_MAXWATERPPESSURE_BAR = 4402 + REFILLING_MIXINGTIME_MS = 4418 + + +class Inputs(IntEnum): + """ + Input registers. + """ + + DEVICE_MODEL = 0 + WARNINGS = 768 + ERRORS = 832 + STATE = 1200 + REFILLING_STATE = 1201 + LSH102B_IN = 7000 # High Electrolyte Level + LSHH102A_IN = 7001 # Very High Electrolyte Level + LSL102D_IN = 7002 # Low Electrolyte Level + LSM102C_IN = 7003 # Medium Electrolyte Level + PT105_IN_BAR = 7516 # Water Inlet Pressure + + +class DeviceModel(StrEnum): + """ + Values for ProjectId input register (0). + """ + + UNKNOWN = 'UNKNOWN' + + # Specific value for EL21. + EL21 = 'EL21' + + # Specific values for EL40. + EL40 = 'EL40' + ES40 = 'ES40' + + # Specific value for EL41. + ES41 = 'ES41' + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +class State(IntEnum): + """ + Enum values for Electrolyser state input register (1200). + """ + + UNKNOWN = -1 + + # Common states for EL2.1 and EL4.x. + HALTED = 0 + MAINTENANCE_MODE = 1 + IDLE = 2 + STEADY = 3 + STAND_BY = 4 + CURVE = 5 + BLOWDOWN = 6 + + # Specific state for EL40. + RECOMBINER = 7 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +class RefillingState(IntEnum): + """ + Enum values for Electrolyser refilling state input register (1201). + """ + + UNKNOWN = -1 + + NONE = 0 + HALT = 1 + IDLE = 2 + FILLING = 3 + DRAINING = 4 + MAINTENANCE = 5 + KOH_REFILLING = 6 + MAINTENANCE_REFILLING = 7 + KOH_REFILLING_FINISH = 8 + FINAL_REFILLING = 9 + DEMAND_REFILLING = 10 + WAIT_FLUSHING = 11 + FLUSHING = 12 + SERVICE_REFILLING = 13 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +class ElError(IntEnum): + """ + Values for errors register (832). Names may be used for human-readable + description. + """ + + UNKNOWN = -1 + + # Common errors for EL2.1 and EL4.x. + + INTERNAL_ERROR = 0x0FFF + FP_01 = 0x1F81 + FP_02 = 0x1F82 + FP_03 = 0x1F83 + FP_06 = 0x1F86 + FC_10 = 0x108A + FD_20 = 0x1114 + FR_10 = 0x118A + FR_20 = 0x1194 + FR_50 = 0x11B2 + FR_51 = 0x11B3 + FR_52 = 0x11B4 + FR_40 = 0x11A8 + FS_01 = 0x1201 + FS_10 = 0x120A + FT_10 = 0x128A + ET_10 = 0x228A + EU_10 = 0x230A + FX_03 = 0x1403 + FX_04 = 0x1404 + FX_07 = 0x1407 + FX_11 = 0x140B + FX_12 = 0x140C + FX_30 = 0x141E + FX_31 = 0x141F + FX_33 = 0x1421 + FX_35 = 0x1423 + FX_36 = 0x1424 + FX_37 = 0x1425 + FX_38 = 0x1426 + FX_39 = 0x1427 + FF_10 = 0x148A + FL_01 = 0x1501 + FO_30 = 0x159E + + # Specific values for EL21. + EX_01 = 0x2401 + FX_02 = 0x1402 + FX_05 = 0x1405 + FX_08 = 0x1408 + FX_09 = 0x1409 + FX_10 = 0x140A + FX_32 = 0x1420 + FX_34 = 0x1422 + FX_40 = 0x1428 + + # Specific values for EL4.x. + FX_13 = 0x140D + FX_42 = 0x142A + FB_10 = 0x170A + FB_11 = 0x170B + FB_20 = 0x1714 + FB_21 = 0x1715 + FY_10 = 0x178A + FY_20 = 0x1794 + FY_21 = 0x1795 + FY_22 = 0x1796 + FY_23 = 0x1797 + FY_24 = 0x1798 + FY_25 = 0x1799 + FY_26 = 0x179A + FY_27 = 0x179B + FY_28 = 0x179C + FY_29 = 0x179D + FY_30 = 0x179E + FY_31 = 0x179F + FY_33 = 0x1721 + FY_51 = 0x17B3 + FY_52 = 0x17B4 + FY_53 = 0x17B5 + FY_54 = 0x17B6 + FY_55 = 0x17B7 + FY_56 = 0x17B8 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +class ElWarning(IntEnum): + """ + Values for warnings input register (768). Names may be used for + human-readable description. + """ + + UNKNOWN = -1 + + # Common warnings for EL2.1 and EL4.x. + WP_04 = 0x3F84 + WP_05 = 0x3F85 + WR_10 = 0x318A + WR_20 = 0x3194 + WR_21 = 0x3195 + WR_22 = 0x3196 + WR_51 = 0x31B3 + WR_52 = 0x31B4 + WR_53 = 0x31B5 + WS_21 = 0x3215 + WS_22 = 0x3216 + WS_30 = 0x321E + WT_20 = 0x3294 + WU_10 = 0x330A + WX_52 = 0x3434 + WX_53 = 0x3435 + WX_55 = 0x3437 + WX_59 = 0x343B + WX_60 = 0x343C + WX_61 = 0x343D + WX_62 = 0x343E + WX_63 = 0x343F + WX_64 = 0x3440 + WX_65 = 0x3441 + WX_66 = 0x3442 + WX_67 = 0x3443 + WX_69 = 0x3445 + WL_10 = 0x350A + WO_10 = 0x358A + WO_20 = 0x3594 + WH_10 = 0x360A + WH_11 = 0x360B + WH_12 = 0x360C + WZ_10 = 0x368A + + # Specific values for EL2.1. + WT_21 = 0x3295 + WX_50 = 0x3432 + WX_51 = 0x3433 + WX_54 = 0x3436 + WX_56 = 0x3438 + WX_57 = 0x3439 + + # Specific values for EL4.x. + WR_23 = 0x3197 + WR_54 = 0x31B6 + WX_14 = 0x340E + WX_15 = 0x340F + WX_70 = 0x3446 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +Event: TypeAlias = Union[type(ElError), type(ElWarning)] + + +def _text_color(text: str, color: ConsoleColor) -> str: + """ + Add color tags to string. + """ + return f'{color}{text}{ConsoleColor.END.value}' + + +def _print_cyan(text: str) -> None: + """ + Print console text with cyan color. + """ + print(_text_color(text=text, color=ConsoleColor.CYAN)) + + +def _print_green(text: str) -> None: + """ + Print console text with green color. + """ + print(_text_color(text=text, color=ConsoleColor.GREEN)) + + +def _print_red(text: str) -> None: + """ + Print console text with red color. + """ + print(_text_color(text=text, color=ConsoleColor.RED)) + + +def _print_yellow(text: str) -> None: + """ + Print console text with yellow color. + """ + print(_text_color(text=text, color=ConsoleColor.YELLOW)) + + +def _read_input_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read input registers. + """ + return modbus_client.read_input_registers(reg_addr=address, reg_nb=count) + + +def _read_holding_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read holding registers. + """ + return modbus_client.read_holding_registers(reg_addr=address, reg_nb=count) + + +def _write_single_register( + modbus_client: client.ModbusClient, address: int, value: int +) -> None: + """ + Write 16 bits register. + """ + modbus_client.write_single_register(reg_addr=address, reg_value=value) + + time.sleep(REGISTER_WRITE_TIMEOUT) + + +def _format_event(name: str, value: int) -> str: + """ + Format event as string representation and hex value. + """ + return f'{name} ({hex(value)})' + + +def _format_warning(warning: ElWarning) -> str: + """ + Format warning event. + """ + return _format_event(name=warning.name, value=warning.value) + + +def _decode_events( + modbus_client: client.ModbusClient, register: Inputs, event_type: Event +) -> list[str]: + # Read and decode input register with events (warnings or errors) array. + # These registers have specific structure - first uint16 contains total + # amount of events. Number of registers to read is 528 / 16 = 33. + events: list[str] = [] + + raw_events_data: list[int] = modbus_client.read_input_registers( + reg_addr=register.value, reg_nb=33 + ) + + if events_count := raw_events_data[0]: + # Total amount of events is not 0. + events = [ + _format_event(name=event_type(event).name, value=event) + for event in raw_events_data[1:events_count + 1] + ] + + return events + + +def _decode_warnings(modbus_client: client.ModbusClient) -> list[str]: + """ + Read and decode input register with warnings, address is 768. + """ + return _decode_events( + modbus_client=modbus_client, register=Inputs.WARNINGS, + event_type=ElWarning + ) + + +def _decode_errors(modbus_client: client.ModbusClient) -> list[str]: + """ + Read and decode input register with errors, address is 832. + """ + return _decode_events( + modbus_client=modbus_client, register=Inputs.ERRORS, + event_type=ElError + ) + + +def _check_refilling_warnings(modbus_client: client.ModbusClient) -> None: + """ + Check electrolyser refilling warnings. + """ + print('Checking if any refilling warnings exist...') + + if active_warnings := _decode_warnings(modbus_client=modbus_client): + if any( + warning in ( + _format_warning(warning=ElWarning.WR_10), + _format_warning(warning=ElWarning.WR_20), + _format_warning(warning=ElWarning.WR_21), + _format_warning(warning=ElWarning.WR_22), + ) + for warning in active_warnings + ): + raise MaintenanceModeException( + f'Seems like something went wrong while refilling, active' + f' warnings are: {", ".join(active_warnings)}\nPlease' + ' contact Enapter customer support' + ) + + else: + _print_yellow( + text=( + f'Seems like there are no refilling issues, but other' + f' active warnings found: {", ".join(active_warnings)}' + ) + ) + + else: + print('No warnings found') + + +def _actual_state(modbus_client: client.ModbusClient) -> State: + """ + Read and decode state input register, address is 1200. Register type is + enum16, technically it's similar to uint16, so number of registers to read + is 16 / 16 = 1. + """ + state: State = State( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.STATE.value, count=1 + )[0] + ) + + _print_cyan(text=f'{state.name} system state detected...') + + return state + + +def _actual_water_inlet_pressure(modbus_client: client.ModbusClient) -> float: + """ + Read and decode water inlet pressure input register, address is 7516. + Register type is float32, so number of registers to read is 32 / 16 = 2. + """ + return utils.decode_ieee( + val_int=utils.word_list_to_long( + val_list=_read_input_registers( + modbus_client=modbus_client, address=Inputs.PT105_IN_BAR.value, + count=2 + ) + )[0] + ) + + +def _actual_refilling_state( + modbus_client: client.ModbusClient +) -> RefillingState: + """ + Read and decode refilling state input register, address is 1201. Register + type is enum16, technically it's similar to uint16, so number of registers + to read is 16 / 16 = 1. + """ + return RefillingState( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.REFILLING_STATE.value, + count=1 + )[0] + ) + + +def _actual_refilling_mixing_time(modbus_client: client.ModbusClient) -> int: + """ + Read and decode refilling mixing time holding register, address is 4418. + Register type is uint32, so number of registers to read is 32 / 16 = 2. + Decoded value is in ms, return value is in seconds. + """ + return utils.word_list_to_long( + val_list=_read_holding_registers( + modbus_client=modbus_client, + address=Holdings.REFILLING_MIXINGTIME_MS.value, count=2 + ) + )[0] // 1000 + + +def _actual_refilling_min_water_pressure( + modbus_client: client.ModbusClient +) -> float: + """ + Read and decode refilling min water pressure holding register, address is + 4400. Register type is float32, so number of registers to read is + 32 / 16 = 2. + """ + return utils.decode_ieee( + val_int=utils.word_list_to_long( + val_list=_read_holding_registers( + modbus_client=modbus_client, + address=Holdings.REFILLING_MINWATERPPESSURE_BAR.value, count=2 + ) + )[0] + ) + + +def _actual_refilling_max_water_pressure( + modbus_client: client.ModbusClient +) -> float: + """ + Read and decode refilling max water pressure holding register, address is + 4402. Register type is float32, so number of registers to read is + 32 / 16 = 2. + """ + return utils.decode_ieee( + val_int=utils.word_list_to_long( + val_list=_read_holding_registers( + modbus_client=modbus_client, + address=Holdings.REFILLING_MAXWATERPPESSURE_BAR.value, count=2 + ) + )[0] + ) + + +def _low_level_switch(modbus_client: client.ModbusClient) -> bool: + """ + Read electrolyte low level switch (7002). Register type is boolean, + technically it's similar to uint16, so number of registers to read is + 16 / 16 = 1. + """ + return bool( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.LSL102D_IN.value, + count=1 + )[0] + ) + + +def _medium_level_switch(modbus_client: client.ModbusClient) -> bool: + """ + Read electrolyte medium level switch (7003). Register type is boolean, + technically it's similar to uint16, so number of registers to read is + 16 / 16 = 1. + """ + return bool( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.LSM102C_IN.value, + count=1 + )[0] + ) + + +def _high_level_switch(modbus_client: client.ModbusClient) -> bool: + """ + Read electrolyte high level switch (7000). Register type is boolean, + technically it's similar to uint16, so number of registers to read is + 16 / 16 = 1. + """ + return bool( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.LSH102B_IN.value, + count=1 + )[0] + ) + + +def _very_high_level_switch(modbus_client: client.ModbusClient) -> bool: + """ + Read electrolyte very high level switch (7001). Register type is boolean, + technically it's similar to uint16, so number of registers to read is + 16 / 16 = 1. + """ + return bool( + _read_input_registers( + modbus_client=modbus_client, address=Inputs.LSHH102A_IN.value, + count=1 + )[0] + ) + + +def _search_top_switch(switches: list[bool], enabled: bool) -> int: + """ + Search first from the top (highest) enabled or disabled electrolyte level + switch. + """ + return next( + ( + len(switches) - index for index, switch in + enumerate(reversed(switches), start=0) if switch is enabled + ), 0 + ) + + +def _electrolyte_level( + modbus_client: client.ModbusClient, logging: bool = True +) -> ElectrolyteLevel: + """ + Read and report electrolyte level. + """ + + switches: list[bool] = [ + _low_level_switch(modbus_client=modbus_client), + _medium_level_switch(modbus_client=modbus_client), + _high_level_switch(modbus_client=modbus_client), + _very_high_level_switch(modbus_client=modbus_client) + ] + + electrolyte_level: ElectrolyteLevel = ElectrolyteLevel( + _search_top_switch(switches=switches, enabled=True) + ) + + if not all(switches_below_max := switches[:electrolyte_level.value]): + top_disabled_level: ElectrolyteLevel = ElectrolyteLevel( + _search_top_switch( + switches=switches_below_max, enabled=False + ) + ) + + raise MaintenanceModeException( + f'Detected enabled {electrolyte_level.name} level switch while at' + f' least one switch below ({top_disabled_level.name} level) is' + f' disabled. Please contact Enapter customer support.' + ) + + if logging: + _print_cyan( + text=f'{electrolyte_level.name} electrolyte level detected...' + ) + + return electrolyte_level + + +def _toggle_maintenance( + modbus_client: client.ModbusClient, enable: bool +) -> None: + """ + Write maintenance holding register, address is 1013. Register type is + boolean, technically it's similar to uint16. We must write 1/0 to turn + Maintenance mode on/off. + """ + _write_single_register( + modbus_client=modbus_client, address=Holdings.MAINTENANCE.value, + value=int(enable) + ) + + +def _toggle_flushing(modbus_client: client.ModbusClient, enable: bool) -> None: + """ + Write flushing holding register, address is 1015. Register type is + boolean, technically it's similar to uint16. We must write 0/1 to turn + flushing on/off (logic is reversed since in fact register means 'skip + flushing'). + """ + _write_single_register( + modbus_client=modbus_client, address=Holdings.FLUSHING.value, + value=int(not enable) + ) + + +def _flushing_on(modbus_client: client.ModbusClient) -> None: + """ + Turn flushing on. + """ + print('Turning flushing on...') + + _toggle_flushing(modbus_client=modbus_client, enable=True) + + print('Successfully turned flushing on') + + +def _maintenance_on( + modbus_client: client.ModbusClient, initial_state: State +) -> None: + """ + Turn Maintenance mode on. + """ + if initial_state is State.IDLE: + print(f'Turning Maintenance mode on...') + + _toggle_maintenance(modbus_client=modbus_client, enable=True) + + if ( + state := _actual_state(modbus_client=modbus_client) + ) is not State.MAINTENANCE_MODE: + raise MaintenanceModeException( + f'Got state {state.name} instead of ' + f'{State.MAINTENANCE_MODE.name}, please contact Enapter ' + f'customer support' + ) + + print('Successfully turned Maintenance mode on') + + else: + print('Maintenance mode is already on') + + +def _maintenance_off(modbus_client: client.ModbusClient) -> None: + """ + Turn Maintenance mode off. + """ + print(f'Turning Maintenance mode off...') + + _toggle_maintenance(modbus_client=modbus_client, enable=False) + + if (state := _actual_state(modbus_client=modbus_client)) is not State.IDLE: + raise MaintenanceModeException( + f'Got state {state.name} instead of {State.IDLE.name}, please ' + f'contact Enapter customer support' + ) + + print('Successfully turned Maintenance mode off') + + +def _check_refilling_state( + modbus_client: client.ModbusClient, expected_refilling_state: RefillingState +) -> None: + """ + Check specific refilling state. + """ + print('Checking refilling state...') + + if ( + actual_refilling_state := _actual_refilling_state( + modbus_client=modbus_client + ) + ) is not expected_refilling_state: + raise MaintenanceModeException( + f'Wrong refilling state detected, expected ' + f'{expected_refilling_state.name}, actual ' + f'{actual_refilling_state.name}. Please contact Enapter customer ' + f'support.' + ) + + _print_cyan( + f'Detected expected {actual_refilling_state.name} refilling state\n' + ) + + +def _handle_very_high_electrolyte_level( + modbus_client: client.ModbusClient, el_21: bool +) -> None: + """ + Handle very high electrolyte level while refilling. + """ + if _electrolyte_level( + modbus_client=modbus_client, logging=False + ) is ElectrolyteLevel.VERY_HIGH: + _print_red( + f'Detected {ElectrolyteLevel.VERY_HIGH.name} electrolyte level.' + ) + + if el_21: + _wait_confirmation( + prompt=( + f'Please drain electrolyte to {ElectrolyteLevel.HIGH.name}' + f' level.\nType {INPUT_CONFIRMATION} and press Enter to' + f' proceed:' + ) + ) + + _wait_electrolyte_level( + modbus_client=modbus_client, + expected_level=ElectrolyteLevel.HIGH, timeout=REFILLING_TIMEOUT + ) + + else: + raise MaintenanceModeException( + 'Please contact Enapter customer support' + ) + + +def _perform_draining(modbus_client: client.ModbusClient) -> None: + """ + Draining procedure with required checks. + """ + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.DRAINING + ) + + print( + f'\n=================================================================\n' + f'============================ DRAINING ===========================\n\n' + ) + _wait_confirmation( + prompt=( + f'Electrolyser is ready for draining.\nCurrent electrolyte level' + f' will be reported every {ELECTROLYTE_PRESENCE_CHECK_TIMEOUT}' + f' seconds until draining is complete.\nType {INPUT_CONFIRMATION}' + f' and press Enter to proceed:' + ) + ) + + _wait_electrolyte_level( + modbus_client=modbus_client, + expected_level=ElectrolyteLevel.EMPTY, timeout=DRAINING_TIMEOUT + ) + + +def _perform_flushing(modbus_client: client.ModbusClient) -> None: + """ + Flushing procedure with required checks. + """ + print( + f'\n=================================================================\n' + f'============================ FLUSHING ===========================\n\n' + ) + + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.WAIT_FLUSHING + ) + + _wait_confirmation( + prompt=( + f'Flushing will be performed.\n' + f'1. Disconnect pipe from FILL/DRAIN port\n' + f'2. Connect water pipe\n' + f'3. Type {INPUT_CONFIRMATION} and press Enter when ready:\n' + ) + ) + + _check_water_pipe_connection( + modbus_client=modbus_client, expect_no_electrolyte=True + ) + + _wait_confirmation( + prompt=( + f'\nNow electrolyser will be automatically refilled with' + f' water.\nCurrent electrolyte level will be reported every' + f' {ELECTROLYTE_PRESENCE_CHECK_TIMEOUT} seconds until flushing is' + f' complete.\nType {INPUT_CONFIRMATION} and press Enter to proceed:' + ) + ) + + _flushing_on(modbus_client=modbus_client) + + _wait_electrolyte_level( + modbus_client=modbus_client, expected_level=ElectrolyteLevel.HIGH, + timeout=REFILLING_TIMEOUT, refilling=True + ) + + _check_refilling_warnings(modbus_client=modbus_client) + + _print_yellow( + text=( + f'Now waiting until water is mixed with pump, this process will' + f' be monitored automatically...' + ) + ) + + _wait_refilling_state( + modbus_client=modbus_client, expected_state=RefillingState.DRAINING, + timeout=_actual_refilling_mixing_time(modbus_client=modbus_client) * 2 + ) + + _perform_draining(modbus_client=modbus_client) + + +def _perform_refilling( + modbus_client: client.ModbusClient, el_21: bool = False +) -> None: + """ + Refilling procedure with required checks. + """ + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.MAINTENANCE + ) + + print( + f'\n=================================================================\n' + f'=========================== REFILLING ===========================\n\n' + ) + + refill_to_level: ElectrolyteLevel = ( + ElectrolyteLevel.HIGH if el_21 else ElectrolyteLevel.MEDIUM + ) + + electrolyte_type: str = ( + '3.6L of 1% KOH' if el_21 else '2L of 1.54% KOH' + ) + + _wait_confirmation( + prompt=( + f'1. Prepare electrolyte bag with {electrolyte_type} solution\n' + f'2. Connect refilling pipe to FILL/DRAIN port\n' + f'3. Type {INPUT_CONFIRMATION} and press Enter when ready:\n' + ) + ) + _print_yellow( + text=( + f'Now carefully raise the electrolyte bag above the device to fill' + f' to {ElectrolyteLevel.LOW.name} level.\nPlease don\'t fill more' + f' at this step.' + ) + ) + + if not el_21: + _print_red(text='WARNING! Overfill may damage Electrolyser!') + + _print_yellow( + text=f'Waiting for {ElectrolyteLevel.LOW.name} level electrolyte...' + ) + + try: + _wait_electrolyte_level( + modbus_client=modbus_client, expected_level=ElectrolyteLevel.LOW, + timeout=REFILLING_TIMEOUT, refilling=True + ) + + except HighElectrolyteLevelException: + _print_yellow( + f'Higher than {ElectrolyteLevel.LOW.name} electrolyte level' + f' detected...' + ) + + _handle_very_high_electrolyte_level( + modbus_client=modbus_client, el_21=el_21 + ) + + else: + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.KOH_REFILLING + ) + + finally: + if el_21 and _electrolyte_level( + modbus_client=modbus_client + ).value < ElectrolyteLevel.HIGH.value: + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.KOH_REFILLING + ) + + if _electrolyte_level( + modbus_client=modbus_client + ).value < refill_to_level.value: + _print_yellow( + text=( + f'Now carefully raise the electrolyte bag above the device to' + f' fill to {refill_to_level.name} level.\nPlease don\'t fill' + f' more at this step.' + ) + ) + + if not el_21: + _print_red(text='WARNING! Overfill may damage Electrolyser!') + + _print_yellow( + text=f'Waiting for {refill_to_level.name} level electrolyte...' + ) + + try: + _wait_electrolyte_level( + modbus_client=modbus_client, expected_level=refill_to_level, + timeout=REFILLING_TIMEOUT, refilling=True + ) + + except HighElectrolyteLevelException: + _handle_very_high_electrolyte_level( + modbus_client=modbus_client, el_21=el_21 + ) + + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.KOH_REFILLING_FINISH + ) + + +def _finish_refilling(modbus_client: client.ModbusClient) -> None: + """ + Finish refilling procedure with required checks. + """ + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.SERVICE_REFILLING + ) + + _wait_electrolyte_level( + modbus_client=modbus_client, expected_level=ElectrolyteLevel.HIGH, + timeout=REFILLING_TIMEOUT, refilling=True + ) + + _print_yellow( + text=( + f'Now waiting until water is mixed with pump, this process will' + f' be monitored automatically...' + ) + ) + + _wait_refilling_state( + modbus_client=modbus_client, expected_state=RefillingState.IDLE, + timeout=_actual_refilling_mixing_time(modbus_client=modbus_client) * 2 + ) + + _check_refilling_warnings(modbus_client=modbus_client) + + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.IDLE + ) + + +def _wait_electrolyte_level( + modbus_client: client.ModbusClient, expected_level: ElectrolyteLevel, + timeout: int, refilling: bool = False +) -> None: + """ + Wait for specific electrolyte level. + """ + wait_until: float = time.time() + timeout + + while time.time() < wait_until: + if ( + electrolyte_level := _electrolyte_level(modbus_client=modbus_client) + ) is expected_level: + break + + elif ( + refilling and electrolyte_level.value > expected_level.value + ): + raise HighElectrolyteLevelException( + f'Higher electrolyte level detected. Expected' + f' {expected_level.name}, actual {electrolyte_level.name}. If' + f' this happened during automatic refilling, please contact' + f' Enapter customer support.' + ) + + time.sleep(ELECTROLYTE_PRESENCE_CHECK_TIMEOUT) + + else: + raise MaintenanceModeException( + f'Failed to wait for {expected_level.name} level during' + f' {timeout // 60} minutes.' + ) + + +def _wait_refilling_state( + modbus_client: client.ModbusClient, expected_state: RefillingState, + timeout: int +) -> None: + """ + Wait for specific refilling state. + """ + wait_until: float = time.time() + timeout + + while time.time() < wait_until: + if _actual_refilling_state( + modbus_client=modbus_client + ) is expected_state: + break + + time.sleep(REFILLING_STATE_CHECK_TIMEOUT) + + else: + raise MaintenanceModeException( + f'Failed to wait for {expected_state.name} refilling state during' + f' {timeout} seconds.' + ) + + +def _check_water_pipe_connection( + modbus_client: client.ModbusClient, expect_no_electrolyte: bool = False, + attempts: int = MAX_WATER_PIPE_CONNECT_ATTEMPTS +) -> None: + """ + Ensure that water pipe is connected properly. Check pressure and + electrolyte presence if necessary. + """ + water_pipe_connect_attempts: int = 0 + + pressure_ok: bool = False + electrolyte_ok: bool = False + + min_pressure: float = _actual_refilling_min_water_pressure( + modbus_client=modbus_client + ) + + max_pressure: float = _actual_refilling_max_water_pressure( + modbus_client=modbus_client + ) + + while water_pipe_connect_attempts < attempts: + print('Now checking water inlet pressure...') + + if ( + min_pressure < ( + actual_water_inlet_pressure := _actual_water_inlet_pressure( + modbus_client=modbus_client + ) + ) < max_pressure + ): + pressure_ok = True + + print('Pressure is OK') + + else: + _print_yellow( + text=( + f'Water inlet pressure {actual_water_inlet_pressure} is not' + f' in allowed bounds ({min_pressure}, {max_pressure})' + ) + ) + + if expect_no_electrolyte: + print('Now re-checking electrolyte presence...') + + if _electrolyte_level( + modbus_client=modbus_client + ) is ElectrolyteLevel.EMPTY: + print('Electrolyte is OK') + + electrolyte_ok = True + + else: + _print_yellow( + text=( + 'Electrolyte presence detected. This is normal and may' + ' occur while attaching a water pipe' + ) + ) + + _perform_draining(modbus_client=modbus_client) + + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.WAIT_FLUSHING + ) + + else: + electrolyte_ok = True + + if pressure_ok and electrolyte_ok: + break + + _wait_confirmation( + prompt=( + f'Problems with water pipe detected.\nPlease double check water' + f' pipe, type {INPUT_CONFIRMATION} and press Enter when' + f' ready:\n' + ) + ) + + water_pipe_connect_attempts += 1 + + else: + raise WaterPipeException( + f'Water pipe problems detected after {attempts} connection' + f' attempt{"s" if attempts > 1 else ""}. Please contact Enapter' + f' customer support' + ) + + +def _wait_confirmation(prompt: str) -> None: + """ + Print prompt and wait for user confirmation. + """ + print(prompt) + + while input().upper() != INPUT_CONFIRMATION: + _print_yellow(text='Wrong confirmation') + + +def _run_maintenance_21( + modbus_client: client.ModbusClient, initial_state: State +) -> None: + """ + Run Maintenance actions for EL2.1. + """ + print('Running Maintenance for EL2.1...') + + _maintenance_on(modbus_client=modbus_client, initial_state=initial_state) + + match actual_refilling_state := _actual_refilling_state( + modbus_client=modbus_client + ): + case RefillingState.DRAINING: + draining_required: bool = True + + case RefillingState.MAINTENANCE: + draining_required: bool = False + + print( + 'Seems like it\'s first refilling. Draining procedure will be' + ' skipped.' + ) + + case _: + raise MaintenanceModeException( + f'Got unexpected refilling state {actual_refilling_state.name},' + f' please contact Enapter customer support' + ) + + if draining_required: + _perform_draining(modbus_client=modbus_client) + + _perform_refilling(modbus_client=modbus_client, el_21=True) + + _check_refilling_warnings(modbus_client=modbus_client) + + _maintenance_off(modbus_client=modbus_client) + + _check_refilling_state( + modbus_client=modbus_client, + expected_refilling_state=RefillingState.IDLE + ) + + +def _run_maintenance_4x( + modbus_client: client.ModbusClient, initial_state: State +) -> None: + """ + Run Maintenance actions for EL4.x. + """ + print('Running Maintenance for EL4.x...') + + _maintenance_on(modbus_client=modbus_client, initial_state=initial_state) + + match actual_refilling_state := _actual_refilling_state( + modbus_client=modbus_client + ): + case RefillingState.DRAINING: + draining_required: bool = True + flushing_required: bool = True + + case RefillingState.WAIT_FLUSHING: + draining_required: bool = False + flushing_required: bool = True + + case RefillingState.MAINTENANCE: + draining_required: bool = False + flushing_required: bool = False + + print( + 'Seems like it\'s first refilling. Draining and flushing' + ' procedures will be skipped.' + ) + + case _: + raise MaintenanceModeException( + f'Got unexpected refilling state {actual_refilling_state.name},' + f' please contact Enapter customer support' + ) + + if draining_required: + _perform_draining(modbus_client=modbus_client) + + # Handle situation when script was somehow interrupted and flushing is + # already complete. + if _actual_refilling_state( + modbus_client=modbus_client + ) is RefillingState.MAINTENANCE: + flushing_required = False + + if flushing_required: + _perform_flushing(modbus_client=modbus_client) + + _perform_refilling(modbus_client=modbus_client) + + _wait_confirmation( + prompt=( + f'Now Maintenance mode will be turned off. Check that water pipe' + f' is connected to finish refilling automatically.\nType' + f' {INPUT_CONFIRMATION} and press Enter when ready:' + ) + ) + + _maintenance_off(modbus_client=modbus_client) + + try: + _check_water_pipe_connection(modbus_client=modbus_client, attempts=1) + + except WaterPipeException: + _print_yellow( + f'Failed to fill water to {ElectrolyteLevel.HIGH} level.\nPlease' + f' make sure that water pipe is connected properly.\nAfter pipe is' + f' OK, water will be added automatically when required.' + ) + + else: + _finish_refilling(modbus_client=modbus_client) + + +def _run_maintenance( + modbus_client: client.ModbusClient, initial_state: State +) -> None: + """ + Run actions required for Maintenance mode depending on initial state and + electrolyser model. + """ + + # Read and decode ProjectId input register, address is 0. Register type is + # uint32, so number of registers to read is 32 / 16 = 2. First convert raw + # response to single int value with pyModbusTCP utils. + converted_device_model: int = utils.word_list_to_long( + val_list=_read_input_registers( + modbus_client=modbus_client, address=Inputs.DEVICE_MODEL.value, + count=2 + ) + )[0] + + # Decode converted int to human-readable device model. + match DeviceModel( + decoded_device_name := bytes.fromhex( + f'{converted_device_model:x}' + ).decode() + ): + case DeviceModel.EL21: + _run_maintenance_21( + modbus_client=modbus_client, initial_state=initial_state + ) + + case DeviceModel.EL40 | DeviceModel.ES40 | DeviceModel.ES41: + _run_maintenance_4x( + modbus_client=modbus_client, initial_state=initial_state + ) + + case _: + raise MaintenanceModeException( + f'Got unknown device model: {decoded_device_name}, please ' + f'contact Enapter customer support.' + ) + + print('Maintenance mode completed OK') + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Run EL maintenance with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + if sys.platform == WIN32: + # Enable colored output for Windows console + os.system('color') + + if input( + _text_color( + text=( + 'WARNING! Please read carefully!\nThis script turns' + f' Maintenance mode on.\nMaintenance mode requires manual' + f' actions with electrolyser such as electrolyte draining,' + f' flushing (for EL4.x) and refilling.\nIf you fill electrolyte' + f' for the first time, only refilling is required.\nIf at some' + f' step electrolyte level is reported incorrectly, it may' + f' indicate hardware problems.\nIn this case terminate script' + f' with Ctrl+C and contact Enapter customer support.\nScript' + f' will be terminated anyway if draining/refilling is not' + f' complete during' + f' {DRAINING_TIMEOUT // 60}/{REFILLING_TIMEOUT // 60} minutes' + f' correspondingly.\nType {INPUT_CONFIRMATION} and press Enter' + f' if you really want to continue. Press Ctrl+ะก or Enter to' + f' exit:\n' + ), color=ConsoleColor.RED + ) + ).upper() != INPUT_CONFIRMATION: + sys.exit('Maintenance mode script execution stopped by user') + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + _print_yellow('Checking if Maintenance can be performed...') + + if _electrolyte_level( + modbus_client=modbus_client + ) is ElectrolyteLevel.VERY_HIGH: + sys.exit( + 'Very High Electrolyte Level Switch is enabled, Maintenance' + ' mode can\'t be turned on.\nIf you use Electrolyser 2.1,' + ' please drain some electrolyte, reboot device and run script' + ' again.\nOtherwise please contact Enapter customer support.' + ) + + if (state := _actual_state(modbus_client=modbus_client)) not in ( + State.IDLE, State.MAINTENANCE_MODE + ): + sys.exit( + f'Electrolyser state is {state.name}, can\'t turn Maintenance' + f' mode on. Switch electrolyser to either {State.IDLE.name} or' + f' {State.MAINTENANCE_MODE.name} and run script again.' + ) + + _print_yellow(text='Conditions for Maintenance are OK') + + _run_maintenance(modbus_client=modbus_client, initial_state=state) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + active_errors: str = ( + ', '.join(_decode_errors(modbus_client=modbus_client)) or + 'No errors' + ) + + active_warnings: str = ( + ', '.join(_decode_warnings(modbus_client=modbus_client)) or + 'No warnings' + ) + + _print_red( + text=( + f'Please provide the following errors and warnings to Enapter' + f' customer support:\nErrors: {active_errors}\nWarnings: ' + f'{active_warnings}\nEvents description is available at' + f' https://handbook.enapter.com' + ) + ) + + raise + + +if __name__ == '__main__': + main() diff --git a/write_el_production_rate.py b/write_el_production_rate.py new file mode 100644 index 0000000..fcfecfd --- /dev/null +++ b/write_el_production_rate.py @@ -0,0 +1,128 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import random +import sys +import time + +from typing import Final + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Register address +PRODUCTION_RATE_HOLDING: Final[int] = 1002 + +# Timeout after writing to somehow guarantee that value is updated +REGISTER_WRITE_TIMEOUT: Final[int] = 2 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Writing EL production rate with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def _read_production_rate(modbus_client: client.ModbusClient) -> float: + """ + Read production rate holding register, address is 1002. Register type is + float32, so number of registers to read is 32 / 16 = 2. Convert raw + response to single float value with pyModbusTCP utils. + """ + return utils.decode_ieee( + val_int=utils.word_list_to_long( + val_list=modbus_client.read_holding_registers( + reg_addr=PRODUCTION_RATE_HOLDING, reg_nb=2 + ) + )[0] + ) + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + print( + f'Got initial production rate in %: ' + f'{_read_production_rate(modbus_client=modbus_client)}' + ) + + random_production_rate: float = random.uniform(a=90.0, b=99.0) + + print(f'Generated random production rate %: {random_production_rate}') + + print('Writing new production rate...') + + # Write production rate holding register, address is 1002. Convert + # generated float value with pyModbusTCP utils. + modbus_client.write_multiple_registers( + regs_addr=PRODUCTION_RATE_HOLDING, + regs_value=utils.long_list_to_word( + val_list=[utils.encode_ieee(val_float=random_production_rate)] + ) + ) + + time.sleep(REGISTER_WRITE_TIMEOUT) + + print( + f'Got updated production rate in %: ' + f'{_read_production_rate(modbus_client=modbus_client)}' + ) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/write_el_reboot.py b/write_el_reboot.py new file mode 100644 index 0000000..8dacb6a --- /dev/null +++ b/write_el_reboot.py @@ -0,0 +1,138 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys +import time + +from enum import IntEnum +from typing import Any, Final, Self + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Registers addresses +REBOOT_HOLDING: Final[int] = 4 +STATE_INPUT: Final[int] = 1200 + + +class State(IntEnum): + """ + Enum values for Electrolyser state input register (1200). + """ + + UNKNOWN = -1 + + # Common states for EL2.1 and EL4.x. + HALTED = 0 + MAINTENANCE_MODE = 1 + IDLE = 2 + STEADY = 3 + STAND_BY = 4 + CURVE = 5 + BLOWDOWN = 6 + + # Specific state for EL40. + RECOMBINER = 7 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Reboot EL with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + # Write reboot holding register, address is 4. Register type is + # boolean, technically it's similar to uint16. + modbus_client.write_single_register( + reg_addr=REBOOT_HOLDING, reg_value=1 + ) + + print('Rebooting...') + + # Reading electrolyser state input register, address is 1200. Register + # type is enum16, technically it's similar to uint16, so number of + # registers to read is 16 / 16 = 1. Result is None if Modbus is not + # ready (client generates connection error, reading function returns + # None). + while ( + ( + state_raw_data := modbus_client.read_input_registers( + reg_addr=STATE_INPUT, reg_nb=1 + ) + ) is None + ): + print(f'Waiting for Modbus initialization...') + + time.sleep(1) + + print( + f'Got electrolyser state {State(state_raw_data[0]).name} ' + f'after reboot' + ) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main() diff --git a/write_el_syslog_skip_priority.py b/write_el_syslog_skip_priority.py new file mode 100644 index 0000000..0d6d909 --- /dev/null +++ b/write_el_syslog_skip_priority.py @@ -0,0 +1,375 @@ +# Copyright 2024 Enapter +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import random +import sys +import time + +from enum import IntEnum +from typing import Any, Final, Optional, Self + +try: + from pyModbusTCP import client, utils + +except ImportError: + print( + 'No pyModbusTCP module installed.\n.' + '1. Create virtual environment\n' + '2. Run \'pip install pyModbusTCP==0.2.1\'' + ) + + raise + + +# Supported Python version +MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 10) + +# Timeout after writing to somehow guarantee that value is updated +REGISTER_WRITE_TIMEOUT: Final[int] = 2 + + +class ModbusWriteException(RuntimeError): + """ + Custom exception to indicate runtime problems writing modbus registers. + """ + + pass + + +class Holdings(IntEnum): + """ + Holding registers. + """ + + CONFIGURATION_BEGIN = 4000 + CONFIGURATION_COMMIT = 4001 + LOG_SKIP_PRIORITY = 4042 + + +class Inputs(IntEnum): + """ + Input registers. + """ + + CONFIGURATION_IN_PROGRESS = 4000 + CONFIGURATION_OVER_MODBUS = 4001 + CONFIGURATION_LAST_RESULT = 4002 + CONFIGURATION_INVALIDATED_HOLDING = 4004 + + +class LogSkipPriority(IntEnum): + """ + Enum values for Log_SyslogSkipPriority holding register (4042). + """ + + UNKNOWN = -1 + + DISABLE_LOGGING = 0 + FATAL_ERRORS = 1 + ALL_ERRORS = 2 + ERRORS_AND_WARNINGS = 3 + ERRORS_AND_WARNINGS_AND_IMPORTANT_MESSAGES = 4 + ERRORS_AND_WARNINGS_AND_MESSAGES_EXCEPT_DEBUG = 5 + ALL_MESSAGES = 6 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + @classmethod + def values(cls, exclude_values: Optional[list[Self]] = None) -> list[Self]: + """ + List enum values without UNKNOWN with possibility to exclude specific + values. + """ + values_: list[LogSkipPriority] = list(cls) + + values_.remove(cls.UNKNOWN) + + if exclude_values is not None: + for value in exclude_values: + if value is not cls.UNKNOWN: + values_.remove(value) + + return values_ + + +class ConfigurationLastResult(IntEnum): + """ + Enum values for Configuration-LastResult input register (4002). + """ + + UNKNOWN = -1 + + OK = 0 + PERMANENT = 1 + NO_ENTRY = 2 + I_O = 5 + TRY_AGAIN = 11 + ACCESS_DENIED = 13 + BUSY = 16 + INVALID = 22 + + @classmethod + def _missing_(cls, value: Any) -> Self: + return cls.UNKNOWN + + +def _write_single_register( + modbus_client: client.ModbusClient, address: int, value: int +) -> None: + """ + Write 16 bits register. + """ + modbus_client.write_single_register(reg_addr=address, reg_value=value) + + time.sleep(REGISTER_WRITE_TIMEOUT) + + +def _write_multiple_registers( + modbus_client: client.ModbusClient, address: int, values: list[int] +) -> None: + """ + Write over 16 bits register. + """ + modbus_client.write_multiple_registers(regs_addr=address, regs_value=values) + + time.sleep(REGISTER_WRITE_TIMEOUT) + + +def _read_input_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read input registers. + """ + return modbus_client.read_input_registers(reg_addr=address, reg_nb=count) + + +def _read_holding_registers( + modbus_client: client.ModbusClient, address: int, count: int +) -> list[int]: + """ + Read holding registers. + """ + return modbus_client.read_holding_registers(reg_addr=address, reg_nb=count) + + +def _read_syslog_skip_priority( + modbus_client: client.ModbusClient +) -> LogSkipPriority: + """ + Read system logs priority holding register, address is 4042. Register type + is int32, so number of registers to read is 32 / 16 = 2. Convert raw + response to single int value with pyModbusTCP utils. + """ + return LogSkipPriority( + utils.get_list_2comp( + val_list=utils.word_list_to_long( + val_list=_read_holding_registers( + modbus_client=modbus_client, + address=Holdings.LOG_SKIP_PRIORITY.value, count=2 + ) + ), val_size=32 + )[0] + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Writing EL syslog skip priority with Modbus' + ) + + parser.add_argument( + '--modbus-ip', '-i', help='Modbus IP address', required=True + ) + + parser.add_argument( + '--modbus-port', '-p', help='Modbus port', type=int, default=502 + ) + + return parser.parse_args() + + +def main() -> None: + if sys.version_info < MIN_PYTHON_VERSION: + raise RuntimeError( + f'Python version >=' + f' {".".join(str(version) for version in MIN_PYTHON_VERSION)} is' + f' required' + ) + args: argparse.Namespace = parse_args() + + modbus_client: client.ModbusClient = client.ModbusClient( + host=args.modbus_ip, port=args.modbus_port + ) + + try: + initial_syslog_skip_priority: LogSkipPriority = ( + _read_syslog_skip_priority(modbus_client=modbus_client) + ) + + print( + f'Got initial system logs priority: ' + f'{initial_syslog_skip_priority.name}' + ) + + # 4xxx holding registers are configuration registers. First we must + # check if another configuration is in progress. Reading + # Configuration-InProgress input register, address is 4000. Register + # type is boolean, technically it's similar to uint16, so number of + # registers to read is 16 / 16 = 1. + if bool( + _read_input_registers( + modbus_client=modbus_client, + address=Inputs.CONFIGURATION_IN_PROGRESS.value, count=1 + )[0] + ): + raise ModbusWriteException( + 'Seems like another configuration is in progress' + ) + + try: + print('Begin configuration...') + + # Write Configuration-Begin holding register, address is 4000. + # Register type is boolean, technically it's similar to uint16. + _write_single_register( + modbus_client=modbus_client, + address=Holdings.CONFIGURATION_BEGIN.value, value=1 + ) + + print('Check configuration source...') + + # Read Configuration-OverModbus input register, address is 4001. + # Register type is boolean, technically it's similar to uint16, + # so number of registers to read is 16 / 16 = 1. + if not bool( + _read_input_registers( + modbus_client=modbus_client, + address=Inputs.CONFIGURATION_OVER_MODBUS.value, count=1 + )[0] + ): + raise ModbusWriteException( + 'Seems like configuration source is not Modbus' + ) + + new_syslog_skip_priority: LogSkipPriority = random.choice( + LogSkipPriority.values( + exclude_values=[initial_syslog_skip_priority] + ) + ) + + print(f'Write new priority ({new_syslog_skip_priority.name})...') + + # Write system logs priority holding register, address is 4042. + # Register type is int32, so technically we can write any supported + # value. Values less than 0 are considered as DISABLE_LOGGING, + # values great than 6 are considered as ALL_MESSAGES. Convert + # generated value with pyModbusTCP utils. + _write_multiple_registers( + modbus_client=modbus_client, + address=Holdings.LOG_SKIP_PRIORITY.value, + values=utils.long_list_to_word( + val_list=[ + utils.get_2comp( + val_int=new_syslog_skip_priority.value, val_size=32 + ) + ] + ) + ) + + print('Check configuration last result...') + + # Read Configuration-LastResult input register, address is 4002. + # Register type is int32, so number of registers to read is + # 32 / 16 = 2. Convert raw response to single int value with + # pyModbusTCP utils. + configuration_last_result: ConfigurationLastResult = ( + ConfigurationLastResult( + utils.get_list_2comp( + val_list=utils.word_list_to_long( + val_list=_read_input_registers( + modbus_client=modbus_client, + address=Inputs.CONFIGURATION_LAST_RESULT.value, + count=2 + ) + ), val_size=32 + )[0] + ) + ) + + if configuration_last_result != ConfigurationLastResult.OK: + # Read Configuration-InvalidatedHolding input register, address + # is 4004. Register type is uint32, so number of registers to + # read is 16 / 16 = 1. + configuration_invalidated_holding: int = ( + _read_input_registers( + modbus_client=modbus_client, + address=Inputs.CONFIGURATION_INVALIDATED_HOLDING.value, + count=1 + )[0] + ) + + raise ModbusWriteException( + f'Configuration last result is' + f'{configuration_last_result.name}. Possible problematic' + f'register is {configuration_invalidated_holding}' + ) + + print('Commit configuration...') + + # Write Configuration-Commit holding register, address is 4001. + # Register type is boolean, technically it's similar to uint16. + # We must write 1 to commit configuration. + _write_single_register( + modbus_client=modbus_client, + address=Holdings.CONFIGURATION_COMMIT.value, value=1 + ) + + except ModbusWriteException: + print('Rollback configuration...') + + # Write Configuration-Commit holding register, address is 4001. + # Register type is boolean, technically it's similar to uint16. + # We must write 0 to rollback configuration. + _write_single_register( + modbus_client=modbus_client, + address=Holdings.CONFIGURATION_COMMIT.value, value=0 + ) + + raise + + updated_syslog_skip_priority: LogSkipPriority = ( + _read_syslog_skip_priority(modbus_client=modbus_client) + ) + + print( + f'Got updated system logs priority: ' + f'{updated_syslog_skip_priority.name}' + ) + + except Exception as e: + # If something went wrong, we can access Modbus error/exception info. + # For example, in case of connection problems, reading register will + # return None and script will fail with error while data converting, + # but real problem description will be stored in client. + print(f'Exception occurred: {e}') + print(f'Modbus error: {modbus_client.last_error_as_txt}') + print(f'Modbus exception: {modbus_client.last_except_as_txt}') + + raise + + +if __name__ == '__main__': + main()