diff --git a/.github/workflows/python-3.10.yml b/.github/workflows/python-3.10.yml index bc58dfb..5bd4334 100644 --- a/.github/workflows/python-3.10.yml +++ b/.github/workflows/python-3.10.yml @@ -3,6 +3,9 @@ name: Python 3.10 +permissions: + contents: read + on: push: branches: [ "main" ] diff --git a/.github/workflows/python-3.11.yml b/.github/workflows/python-3.11.yml index 3f7b7d2..b37e33b 100644 --- a/.github/workflows/python-3.11.yml +++ b/.github/workflows/python-3.11.yml @@ -3,6 +3,9 @@ name: Python 3.11 +permissions: + contents: read + on: push: branches: [ "main" ] diff --git a/.github/workflows/python-3.12.yml b/.github/workflows/python-3.12.yml index dec4e7b..f105cdd 100644 --- a/.github/workflows/python-3.12.yml +++ b/.github/workflows/python-3.12.yml @@ -3,6 +3,9 @@ name: Python 3.12 +permissions: + contents: read + on: push: branches: [ "main" ] diff --git a/.github/workflows/python-3.13.yml b/.github/workflows/python-3.13.yml index 9368392..ebba97c 100644 --- a/.github/workflows/python-3.13.yml +++ b/.github/workflows/python-3.13.yml @@ -3,6 +3,9 @@ name: Python 3.13 +permissions: + contents: read + on: push: branches: [ "main" ] diff --git a/.github/workflows/python-3.9.yml b/.github/workflows/python-3.9.yml index c8e470d..ea54ae9 100644 --- a/.github/workflows/python-3.9.yml +++ b/.github/workflows/python-3.9.yml @@ -3,6 +3,9 @@ name: Python 3.9 +permissions: + contents: read + on: push: branches: [ "main" ] diff --git a/.gitignore b/.gitignore index 586ea38..c67907d 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,8 @@ classes/transports/*custom* input_registry.json holding_registry.json + +#ignore pypi / pyproject.toml output +dist/* +build/* +python_protocol_gateway.egg-info/* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 92598e0..cfe9a98 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.9-alpine as base +FROM python:3.13-alpine as base FROM base as builder RUN mkdir /install WORKDIR /install diff --git a/README.md b/README.md index 1d9ebf4..040330c 100644 --- a/README.md +++ b/README.md @@ -5,10 +5,12 @@ ![Python 3.12](https://github.com/HotNoob/PythonProtocolGateway/actions/workflows/python-3.12.yml/badge.svg) ![Python 3.13](https://github.com/HotNoob/PythonProtocolGateway/actions/workflows/python-3.13.yml/badge.svg) +[![PyPI version](https://img.shields.io/pypi/v/python-protocol-gateway.svg)](https://pypi.org/project/python-protocol-gateway/) [![CodeQL](https://github.com/HotNoob/PythonProtocolGateway/actions/workflows/github-code-scanning/codeql/badge.svg)](https://github.com/HotNoob/PythonProtocolGateway/actions/workflows/github-code-scanning/codeql) For advanced configuration help, please checkout the documentation :) -https://github.com/HotNoob/PythonProtocolGateway/tree/main/documentation + +[/documentation](/documentation) # Python Protocol Gateway @@ -17,8 +19,9 @@ Configuration is handled via a small config files. In the long run, Python Protocol Gateway will become a general purpose protocol gateway to translate between more than just modbus and mqtt. For specific device installation instructions please checkout the documentation: -Growatt, EG4, Sigineer, SOK, PACE-BMS -https://github.com/HotNoob/PythonProtocolGateway/tree/main/documentation +Growatt, EG4, Sigineer, SOK, PACE-BMS, etc. + +[/documentation/devices](/documentation/devices) # General Installation Connect the USB port on the inverter into your computer / device. This port is essentially modbus usb adapter. 
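To find which serial port the adapter enumerates as, a quick check with pyserial can help; a minimal sketch, assuming pyserial is installed (it ships alongside the modbus requirements on most installs). The path it prints is what goes into `port = ...` in config.cfg:

```python
# minimal sketch, assuming pyserial is installed; lists candidate serial ports so the
# correct device path (e.g. /dev/ttyUSB0 or COM3) can be copied into config.cfg
from serial.tools import list_ports

for port in list_ports.comports():
    # USB RS485 / RS232 adapters typically appear as /dev/ttyUSB* on Linux or COM* on Windows
    print(port.device, "-", port.description)
```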
@@ -28,7 +31,7 @@ Alternatively, connect a usb adapter to your rs485 / can port with appropriate w ### install as homeassistant add-on checkout: -https://github.com/felipecrs/python-protocol-gateway-hass-addon/tree/master +[PPG HASS Addon](https://github.com/HotNoob/python-protocol-gateway-hass-addon/tree/master) ### install requirements ``` @@ -49,21 +52,30 @@ nano config.cfg manually select protocol in .cfg protocol_version = {{version}} ``` +eg4_v58 = eg4 inverters +eg4_3000ehv_v1 = eg4 inverters v0.14 = growatt inverters 2020+ sigineer_v0.11 = sigineer inverters -growatt_2020_v1.24 = alt protocol for large growatt inverters - currently untested -srne_v3.9 = SRNE inverters - confirmed working-ish -victron_gx_3.3 = Victron GX Devices - Untested -solark_v1.1 = SolarArk 8/12K Inverters - Untested +srne_v3.9 = SRNE inverters + hdhk_16ch_ac_module = some chinese current monitoring device :P -srne_2021_v1.96 = SRNE inverters 2021+ (tested at ASF48100S200-H, ok-ish for HF2430U60-100 ) +``` -eg4_v58 = eg4 inverters ( EG4-6000XP ) - confirmed working -eg4_3000ehv_v1 = eg4 inverters ( EG4_3000EHV ) +Untested Protocols ``` +growatt_2020_v1.24 = alt protocol for large growatt inverters +victron_gx_3.3 = Victron GX Devices +solark_v1.1 = SolarArk 8/12K Inverters +``` + +For a complete list of protocols, explore: +[/Protocols](/protocols) -more details on these protocols can be found in the documentation: -https://github.com/HotNoob/PythonProtocolGateway/tree/main/documentation +For a more complete list of tested devices & protocols: +[Tested Devices & Protocols](documentation/usage/devices_and_protocols.csv) + +more advanced details can be found in the documentation: +[/Documentation](/documentation) ### run as script ``` @@ -108,8 +120,11 @@ once installed; the device should show up on home assistant under mqtt ```Settings -> Devices & Services -> MQTT ``` -more docs on setting up mqtt here: https://www.home-assistant.io/integrations/mqtt -i probably might have missed something. ha is new to me. +more docs on setting up mqtt here: +https://www.home-assistant.io/integrations/mqtt + +#### connect mqtt on home assistant with external mqtt broker +[HowTo Connect External MQTT Broker To HomeAssistant](https://www.youtube.com/watch?v=sP2gYLYQat8) ### general update procedure update files and restart script / service @@ -118,8 +133,6 @@ git pull systemctl restart protocol_gateway.service ``` -**if you installed this when it was called growatt2mqtt-hotnoob or invertermodbustomqtt, you'll need to reinstall if you want to update. ** - ### Unknown Status MQTT Home Assistant If all values appear as "Unknown" This is a bug with home assistant's discovery that some times happens when adding for the first time. just restart the service / script and it will fix itself. @@ -148,17 +161,18 @@ As i dive deeper into solar monitoring and general automation, i've come to the So... don't mind me as i may add other devices such as battery bms' and... i have a home energy monitor on the way! so i'll be adding that when it arrives. -### Rebranding Again... last time. -if you installed this when it was called growatt2mqtt-hotnoob or InverterModBusToMQTT, you'll need to reinstall if you want to update. - - ### donate this took me a while to make; and i had to make it because there werent any working solutions. -donations would be appreciated. 
-![BitCoin Donation](https://github.com/HotNoob/growatt2mqtt-hotnoob/blob/main/images/donate_to_hotnoob.png?raw=true) - -```(btc) bc1qh394vazcguedkw2rlklnuhapdq7qgpnnz9c3t0``` +donations / sponsoring this repo would be appreciated. -### Use Docker - untested +### Use Docker - ```docker build . -t protocol_gateway ``` - ```docker run --device=/dev/ttyUSB0 protocol_gateway``` + +### Use Docker Image +- ``` docker pull hotn00b/pythonprotocolgateway ``` +- ```docker run -v $(pwd)/config.cfg:/app/config.cfg --device=/dev/ttyUSB0 hotn00b/pythonprotocolgateway``` + +See [config.cfg.example](/config.cfg.example) + +[Docker Image Repo](https://hub.docker.com/r/hotn00b/pythonprotocolgateway) diff --git a/RELEASE.md b/RELEASE.md new file mode 100644 index 0000000..44ba12b --- /dev/null +++ b/RELEASE.md @@ -0,0 +1,8 @@ +things todo to perform a release. + +can try to automate some of these later. + +GitHub - https://github.com/HotNoob/PythonProtocolGateway/releases +PyPi Package - https://pypi.org/project/python-protocol-gateway/ +HomeAssistant repo - https://github.com/HotNoob/python-protocol-gateway-hass-addon +Docker Image - https://hub.docker.com/r/hotn00b/pythonprotocolgateway \ No newline at end of file diff --git a/classes/protocol_settings.py b/classes/protocol_settings.py index fb2cb35..7920524 100644 --- a/classes/protocol_settings.py +++ b/classes/protocol_settings.py @@ -188,6 +188,7 @@ class registry_map_entry: register_bit : int register_byte : int ''' byte offset for canbus ect... ''' + variable_name : str documented_name : str unit : str @@ -208,6 +209,9 @@ class registry_map_entry: data_type_size : int = -1 ''' for non-fixed size types like ASCII''' + data_byteorder : str = '' + ''' entry specific byte order little | big | '' ''' + read_command : bytes = None ''' for transports/protocols that require sending a command ontop of "register" ''' @@ -330,6 +334,19 @@ def get_registry_entry(self, name : str, registry_type : Registry_Type) -> regis return item return None + + def get_code_by_value(self, entry : registry_map_entry, value : str, fallback=None) -> str: + ''' case insensitive ''' + + value = value.strip().lower() + + if entry.variable_name+"_codes" in self.codes: + codes = self.codes[entry.variable_name+"_codes"] + for code, val in codes.items(): + if value == val.lower(): + return code + + return fallback def load__json(self, file : str = "", settings_dir : str = ""): if not settings_dir: @@ -512,16 +529,30 @@ def process_row(row): #region data type data_type = Data_Type.USHORT - data_type_len : int = -1 + data_byteorder : str = '' #optional row, only needed for non-default data types if "data type" in row and row["data type"]: + data_type_str : str = '' + matches = data_type_regex.search(row["data type"]) if matches: data_type_len = int(matches.group("length")) - data_type = Data_Type.fromString(matches.group("datatype")) + data_type_str = matches.group("datatype") else: - data_type = Data_Type.fromString(row["data type"]) + data_type_str = row["data type"] + + #check if datatype specifies byteorder + if data_type_str.upper().endswith("_LE"): + data_byteorder = "little" + data_type_str = data_type_str[:-3] + elif data_type_str.upper().endswith("_BE"): + data_byteorder = "big" + data_type_str = data_type_str[:-3] + + + data_type = Data_Type.fromString(data_type_str) + if "values" not in row: @@ -658,6 +689,7 @@ def process_row(row): unit_mod= unit_multiplier, data_type= data_type, data_type_size = data_type_len, + data_byteorder = data_byteorder, concatenate = concatenate, 
concatenate_registers = concatenate_registers, values=values, @@ -857,6 +889,10 @@ def load_registry_map(self, registry_type : Registry_Type, file : str = "", sett def process_register_bytes(self, registry : dict[int,bytes], entry : registry_map_entry): ''' process bytes into data''' + byte_order : str = self.byteorder + if entry.data_byteorder: #allow map entry to override byteorder + byte_order = entry.data_byteorder + if isinstance(registry[entry.register], tuple): register = registry[entry.register][0] #can bus uses tuple for timestamp else: @@ -869,14 +905,15 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma register = register[:entry.data_type_size] if entry.data_type == Data_Type.UINT: - value = int.from_bytes(register[:4], byteorder=self.byteorder, signed=False) + value = int.from_bytes(register[:4], byteorder=byte_order, signed=False) elif entry.data_type == Data_Type.INT: - value = int.from_bytes(register[:4], byteorder=self.byteorder, signed=True) + value = int.from_bytes(register[:4], byteorder=byte_order, signed=True) elif entry.data_type == Data_Type.USHORT: - value = int.from_bytes(register[:2], byteorder=self.byteorder, signed=False) + value = int.from_bytes(register[:2], byteorder=byte_order, signed=False) elif entry.data_type == Data_Type.SHORT: - value = int.from_bytes(register[:2], byteorder=self.byteorder, signed=True) + value = int.from_bytes(register[:2], byteorder=byte_order, signed=True) elif entry.data_type == Data_Type._16BIT_FLAGS or entry.data_type == Data_Type._8BIT_FLAGS or entry.data_type == Data_Type._32BIT_FLAGS: + val = int.from_bytes(register, byteorder=byte_order, signed=False) #16 bit flags start_bit : int = 0 end_bit : int = 16 #default 16 bit @@ -952,11 +989,20 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma # If positive, simply extract the value using the bit mask value = (register >> bit_index) & bit_mask - elif entry.data_type.value > 200 or entry.data_type == Data_Type.BYTE: #bit types + elif entry.data_type == Data_Type.BYTE: #bit types + value = int.from_bytes(register[:1], byteorder=byte_order, signed=False) + elif entry.data_type.value > 200: #bit types bit_size = Data_Type.getSize(entry.data_type) bit_mask = (1 << bit_size) - 1 # Create a mask for extracting X bits bit_index = entry.register_bit + + + if isinstance(register, bytes): + register = int.from_bytes(register, byteorder=byte_order) + value = (register >> bit_index) & bit_mask + + elif entry.data_type == Data_Type.HEX: value = register.hex() #convert bytes to hex elif entry.data_type == Data_Type.ASCII: @@ -986,6 +1032,11 @@ def process_register_bytes(self, registry : dict[int,bytes], entry : registry_ma def process_register_ushort(self, registry : dict[int, int], entry : registry_map_entry ): ''' process ushort type registry into data''' + + byte_order : str = self.byteorder + if entry.data_byteorder: + byte_order = entry.data_byteorder + if entry.data_type == Data_Type.UINT: #read uint if entry.register + 1 not in registry: return @@ -1058,7 +1109,7 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma else: flags : list[str] = [] if end_bit > 0: - end : int = 16 if end_bit >= 16 else end_bit + end : int = 16 if end_bit >= 16 else end_bit for i in range(start_bit, end): # Iterate over each bit position (0 to 15) # Check if the i-th bit is set if (val >> i) & 1: @@ -1074,10 +1125,10 @@ def process_register_ushort(self, registry : dict[int, int], entry : registry_ma bit_index = 
entry.register_bit value = (registry[entry.register] >> bit_index) & bit_mask elif entry.data_type == Data_Type.HEX: - value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=self.byteorder) #convert to ushort to bytes + value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=byte_order) #convert to ushort to bytes value = value.hex() #convert bytes to hex elif entry.data_type == Data_Type.ASCII: - value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=self.byteorder) #convert to ushort to bytes + value = registry[entry.register].to_bytes((16 + 7) // 8, byteorder=byte_order) #convert to ushort to bytes try: value = value.decode("utf-8") #convert bytes to ascii except UnicodeDecodeError as e: diff --git a/classes/transports/influxdb_out.py b/classes/transports/influxdb_out.py new file mode 100644 index 0000000..3765caf --- /dev/null +++ b/classes/transports/influxdb_out.py @@ -0,0 +1,593 @@ +import logging +import os +import pickle +import time +from configparser import SectionProxy + +from influxdb import InfluxDBClient + +from defs.common import strtobool + +from ..protocol_settings import Registry_Type +from .transport_base import transport_base + + +class influxdb_out(transport_base): + ''' InfluxDB v1 output transport that writes data to an InfluxDB server ''' + host: str = "localhost" + port: int = 8086 + database: str = "solar" + username: str = "" + password: str = "" + measurement: str = "device_data" + include_timestamp: bool = True + include_device_info: bool = True + batch_size: int = 100 + batch_timeout: float = 10.0 + force_float: bool = True # Force all numeric fields to be floats to avoid InfluxDB type conflicts + + # Connection monitoring settings + reconnect_attempts: int = 5 + reconnect_delay: float = 5.0 + connection_timeout: int = 10 + + # Exponential backoff settings + use_exponential_backoff: bool = True + max_reconnect_delay: float = 300.0 # 5 minutes max delay + + # Persistent storage settings + enable_persistent_storage: bool = True + persistent_storage_path: str = "influxdb_backlog" + max_backlog_size: int = 10000 # Maximum number of points to store + max_backlog_age: int = 86400 # 24 hours in seconds + + # Periodic reconnection settings + periodic_reconnect_interval: float = 14400.0 # 4 hours in seconds + + client = None + batch_points = [] + last_batch_time = 0 + last_connection_check = 0 + connection_check_interval = 300 # Check connection every 300 seconds + + # Periodic reconnection settings + last_periodic_reconnect_attempt = 0 + + # Persistent storage + backlog_file = None + backlog_points = [] + + def __init__(self, settings: SectionProxy): + self.host = settings.get("host", fallback=self.host) + self.port = settings.getint("port", fallback=self.port) + self.database = settings.get("database", fallback=self.database) + self.username = settings.get("username", fallback=self.username) + self.password = settings.get("password", fallback=self.password) + self.measurement = settings.get("measurement", fallback=self.measurement) + self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp)) + self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info)) + self.batch_size = settings.getint("batch_size", fallback=self.batch_size) + self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout) + self.force_float = strtobool(settings.get("force_float", fallback=self.force_float)) + + # Connection monitoring settings + 
self.reconnect_attempts = settings.getint("reconnect_attempts", fallback=self.reconnect_attempts) + self.reconnect_delay = settings.getfloat("reconnect_delay", fallback=self.reconnect_delay) + self.connection_timeout = settings.getint("connection_timeout", fallback=self.connection_timeout) + + # Exponential backoff settings + self.use_exponential_backoff = strtobool(settings.get("use_exponential_backoff", fallback=self.use_exponential_backoff)) + self.max_reconnect_delay = settings.getfloat("max_reconnect_delay", fallback=self.max_reconnect_delay) + + # Persistent storage settings + self.enable_persistent_storage = strtobool(settings.get("enable_persistent_storage", fallback=self.enable_persistent_storage)) + self.persistent_storage_path = settings.get("persistent_storage_path", fallback=self.persistent_storage_path) + self.max_backlog_size = settings.getint("max_backlog_size", fallback=self.max_backlog_size) + self.max_backlog_age = settings.getint("max_backlog_age", fallback=self.max_backlog_age) + + # Periodic reconnection settings + self.periodic_reconnect_interval = settings.getfloat("periodic_reconnect_interval", fallback=self.periodic_reconnect_interval) + + self.write_enabled = True # InfluxDB output is always write-enabled + super().__init__(settings) + + # Initialize persistent storage + if self.enable_persistent_storage: + self._init_persistent_storage() + + def _init_persistent_storage(self): + """Initialize persistent storage for data backlog""" + try: + # Create storage directory if it doesn't exist + if not os.path.exists(self.persistent_storage_path): + os.makedirs(self.persistent_storage_path) + + # Create backlog file path + self.backlog_file = os.path.join( + self.persistent_storage_path, + f"influxdb_backlog_{self.transport_name}.pkl" + ) + + # Load existing backlog + self._load_backlog() + + self._log.info(f"Persistent storage initialized: {self.backlog_file}") + self._log.info(f"Loaded {len(self.backlog_points)} points from backlog") + + except Exception as e: + self._log.error(f"Failed to initialize persistent storage: {e}") + self.enable_persistent_storage = False + + def _load_backlog(self): + """Load backlog points from persistent storage""" + if not self.backlog_file or not os.path.exists(self.backlog_file): + self.backlog_points = [] + return + + try: + with open(self.backlog_file, 'rb') as f: + self.backlog_points = pickle.load(f) + + # Clean old points based on age + current_time = time.time() + original_count = len(self.backlog_points) + self.backlog_points = [ + point for point in self.backlog_points + if current_time - point.get('_backlog_time', 0) < self.max_backlog_age + ] + + if len(self.backlog_points) < original_count: + self._log.info(f"Cleaned {original_count - len(self.backlog_points)} old points from backlog") + self._save_backlog() + + except Exception as e: + self._log.error(f"Failed to load backlog: {e}") + self.backlog_points = [] + + def _save_backlog(self): + """Save backlog points to persistent storage""" + if not self.backlog_file or not self.enable_persistent_storage: + return + + try: + with open(self.backlog_file, 'wb') as f: + pickle.dump(self.backlog_points, f) + except Exception as e: + self._log.error(f"Failed to save backlog: {e}") + + def _add_to_backlog(self, point): + """Add a point to the backlog""" + if not self.enable_persistent_storage: + return + + # Add timestamp for age tracking + point['_backlog_time'] = time.time() + + self.backlog_points.append(point) + + # Limit backlog size + if len(self.backlog_points) > 
self.max_backlog_size: + removed = self.backlog_points.pop(0) # Remove oldest point + self._log.warning(f"Backlog full, removed oldest point: {removed.get('measurement', 'unknown')}") + + self._save_backlog() + self._log.debug(f"Added point to backlog. Backlog size: {len(self.backlog_points)}") + + def _flush_backlog(self): + """Flush backlog points to InfluxDB""" + if not self.backlog_points or not self.connected: + return + + self._log.info(f"Flushing {len(self.backlog_points)} backlog points to InfluxDB") + + try: + # Remove internal timestamp before sending to InfluxDB + points_to_send = [] + for point in self.backlog_points: + point_copy = point.copy() + point_copy.pop('_backlog_time', None) # Remove internal timestamp + points_to_send.append(point_copy) + + self.client.write_points(points_to_send) + self._log.info(f"Successfully wrote {len(points_to_send)} backlog points to InfluxDB") + + # Clear backlog after successful write + self.backlog_points = [] + self._save_backlog() + + except Exception as e: + self._log.error(f"Failed to flush backlog to InfluxDB: {e}") + # Don't clear backlog on failure - will retry later + + def connect(self): + """Initialize the InfluxDB client connection""" + self._log.info("influxdb_out connect") + + try: + + # Create InfluxDB client with timeout settings + self.client = InfluxDBClient( + host=self.host, + port=self.port, + username=self.username if self.username else None, + password=self.password if self.password else None, + database=self.database, + timeout=self.connection_timeout + ) + + # Test connection + self.client.ping() + + # Create database if it doesn't exist + databases = self.client.get_list_database() + if not any(db['name'] == self.database for db in databases): + self._log.info(f"Creating database: {self.database}") + self.client.create_database(self.database) + + self.connected = True + self.last_connection_check = time.time() + self.last_periodic_reconnect_attempt = time.time() + self._log.info(f"Connected to InfluxDB at {self.host}:{self.port}") + + # Flush any backlog after successful connection + if self.enable_persistent_storage: + self._flush_backlog() + + except ImportError: + self._log.error("InfluxDB client not installed. 
Please install with: pip install influxdb") + self.connected = False + except Exception as e: + self._log.error(f"Failed to connect to InfluxDB: {e}") + self.connected = False + + def _check_connection(self): + """Check if the connection is still alive and reconnect if necessary""" + current_time = time.time() + + # Check for periodic reconnection (even if connected) + if (self.periodic_reconnect_interval > 0 and + current_time - self.last_periodic_reconnect_attempt >= self.periodic_reconnect_interval): + + self.last_periodic_reconnect_attempt = current_time + self._log.info(f"Periodic reconnection check (every {self.periodic_reconnect_interval} seconds)") + + # Force a reconnection attempt to refresh the connection + if self.connected and self.client: + try: + # Test current connection + self.client.ping() + self._log.debug("Periodic connection check: connection is healthy") + except Exception as e: + self._log.warning(f"Periodic connection check failed: {e}") + return self._attempt_reconnect() + else: + # Not connected, attempt reconnection + return self._attempt_reconnect() + + # Only check connection periodically to avoid excessive ping calls + if current_time - self.last_connection_check < self.connection_check_interval: + return self.connected + + self.last_connection_check = current_time + + if not self.connected or not self.client: + return self._attempt_reconnect() + + try: + # Test connection with ping + self.client.ping() + return True + except Exception as e: + self._log.warning(f"Connection check failed: {e}") + return self._attempt_reconnect() + + def _attempt_reconnect(self): + """Attempt to reconnect to InfluxDB with exponential backoff""" + self._log.info(f"Attempting to reconnect to InfluxDB at {self.host}:{self.port}") + + for attempt in range(self.reconnect_attempts): + try: + self._log.info(f"Reconnection attempt {attempt + 1}/{self.reconnect_attempts}") + + # Close existing client if it exists + if self.client: + try: + self.client.close() + except Exception: + pass + + # Create new client + from influxdb import InfluxDBClient + self.client = InfluxDBClient( + host=self.host, + port=self.port, + username=self.username if self.username else None, + password=self.password if self.password else None, + database=self.database, + timeout=self.connection_timeout + ) + + # Test connection + self.client.ping() + + self.connected = True + self.last_periodic_reconnect_attempt = time.time() + self._log.info(f"Successfully reconnected to InfluxDB") + + # Flush any backlog after successful reconnection + if self.enable_persistent_storage: + self._flush_backlog() + + return True + + except Exception as e: + self._log.warning(f"Reconnection attempt {attempt + 1} failed: {e}") + if attempt < self.reconnect_attempts - 1: + # Calculate delay with exponential backoff + if self.use_exponential_backoff: + delay = min(self.reconnect_delay * (2 ** attempt), self.max_reconnect_delay) + self._log.info(f"Waiting {delay:.1f} seconds before next attempt (exponential backoff)") + else: + delay = self.reconnect_delay + self._log.info(f"Waiting {delay:.1f} seconds before next attempt") + + time.sleep(delay) + + self._log.error(f"Failed to reconnect after {self.reconnect_attempts} attempts") + self.connected = False + return False + + def trigger_periodic_reconnect(self): + """Manually trigger a periodic reconnection check""" + self.last_periodic_reconnect_attempt = 0 # Reset timer to force immediate check + return self._check_connection() + + def write_data(self, data: dict[str, str], 
from_transport: transport_base): + """Write data to InfluxDB""" + if not self.write_enabled: + return + + # Check connection status before processing data + if not self._check_connection(): + self._log.warning("Not connected to InfluxDB, storing data in backlog") + # Store data in backlog instead of skipping + self._process_and_store_data(data, from_transport) + return + + self._log.debug(f"write data from [{from_transport.transport_name}] to influxdb_out transport") + self._log.debug(f"Data: {data}") + + # Process and write data + self._process_and_write_data(data, from_transport) + + def _process_and_store_data(self, data: dict[str, str], from_transport: transport_base): + """Process data and store in backlog when not connected""" + if not self.enable_persistent_storage: + self._log.warning("Persistent storage disabled, data will be lost") + return + + # Create InfluxDB point + point = self._create_influxdb_point(data, from_transport) + + # Add to backlog + self._add_to_backlog(point) + + # Also add to current batch for immediate flush when reconnected + self.batch_points.append(point) + + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + self._log.debug(f"Flushing batch to backlog: size={len(self.batch_points)}") + self._flush_batch() + + def _process_and_write_data(self, data: dict[str, str], from_transport: transport_base): + """Process data and write to InfluxDB when connected""" + # Prepare tags for InfluxDB + tags = {} + + # Add device information as tags if enabled + if self.include_device_info: + tags.update({ + "device_identifier": from_transport.device_identifier, + "device_name": from_transport.device_name, + "device_manufacturer": from_transport.device_manufacturer, + "device_model": from_transport.device_model, + "device_serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + }) + self._log.debug(f"Tags: {tags}") + + # Prepare fields (the actual data values) + fields = {} + for key, value in data.items(): + # Check if we should force float formatting based on protocol settings + should_force_float = False + unit_mod_found = None + + # Try to get registry entry from protocol settings to check unit_mod + if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: + # Check both input and holding registries + for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: + registry_map = from_transport.protocolSettings.get_registry_map(registry_type) + for entry in registry_map: + # Match by variable_name (which is lowercase) + if entry.variable_name.lower() == key.lower(): + unit_mod_found = entry.unit_mod + # If unit_mod is not 1.0, this value should be treated as float + if entry.unit_mod != 1.0: + should_force_float = True + self._log.debug(f"Variable {key} has unit_mod {entry.unit_mod}, forcing float format") + break + if should_force_float: + break + + # Try to convert to numeric values for InfluxDB + try: + # Try to convert to float first + float_val = float(value) + + # Always use float for InfluxDB to avoid type conflicts + # InfluxDB is strict about field types - once a field is created as integer, + # it must always be integer. Using float avoids this issue. 
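+                    # Illustration with a hypothetical field: writing {"battery_soc": 55} first
+                    # creates an integer field in InfluxDB v1; a later write of {"battery_soc": 55.5}
+                    # is then rejected with a field type conflict. Forcing 55 -> 55.0 keeps the
+                    # field type consistent for the life of the measurement.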
+ if self.force_float: + fields[key] = float_val + else: + # Only use integer if it's actually an integer and we're not forcing floats + if float_val.is_integer(): + fields[key] = int(float_val) + else: + fields[key] = float_val + + # Log data type conversion for debugging + if self._log.isEnabledFor(logging.DEBUG): + original_type = type(value).__name__ + final_type = type(fields[key]).__name__ + self._log.debug(f"Field {key}: {value} ({original_type}) -> {fields[key]} ({final_type}) [unit_mod: {unit_mod_found}]") + + except (ValueError, TypeError): + # If conversion fails, store as string + fields[key] = str(value) + self._log.debug(f"Field {key}: {value} -> string (conversion failed)") + + # Create InfluxDB point + point = self._create_influxdb_point(data, from_transport) + + # Add to batch + self.batch_points.append(point) + self._log.debug(f"Added point to batch. Batch size: {len(self.batch_points)}") + + # Check if we should flush the batch + current_time = time.time() + if (len(self.batch_points) >= self.batch_size or + (current_time - self.last_batch_time) >= self.batch_timeout): + self._log.debug(f"Flushing batch: size={len(self.batch_points)}, timeout={current_time - self.last_batch_time:.1f}s") + self._flush_batch() + + def _create_influxdb_point(self, data: dict[str, str], from_transport: transport_base): + """Create an InfluxDB point from data""" + # Prepare tags for InfluxDB + tags = {} + + # Add device information as tags if enabled + if self.include_device_info: + tags.update({ + "device_identifier": from_transport.device_identifier, + "device_name": from_transport.device_name, + "device_manufacturer": from_transport.device_manufacturer, + "device_model": from_transport.device_model, + "device_serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + }) + + # Prepare fields (the actual data values) + fields = {} + for key, value in data.items(): + # Check if we should force float formatting based on protocol settings + should_force_float = False + unit_mod_found = None + + # Try to get registry entry from protocol settings to check unit_mod + if hasattr(from_transport, 'protocolSettings') and from_transport.protocolSettings: + # Check both input and holding registries + for registry_type in [Registry_Type.INPUT, Registry_Type.HOLDING]: + registry_map = from_transport.protocolSettings.get_registry_map(registry_type) + for entry in registry_map: + # Match by variable_name (which is lowercase) + if entry.variable_name.lower() == key.lower(): + unit_mod_found = entry.unit_mod + # If unit_mod is not 1.0, this value should be treated as float + if entry.unit_mod != 1.0: + should_force_float = True + break + if should_force_float: + break + + # Try to convert to numeric values for InfluxDB + try: + # Try to convert to float first + float_val = float(value) + + # Always use float for InfluxDB to avoid type conflicts + if self.force_float: + fields[key] = float_val + else: + # Only use integer if it's actually an integer and we're not forcing floats + if float_val.is_integer(): + fields[key] = int(float_val) + else: + fields[key] = float_val + + except (ValueError, TypeError): + # If conversion fails, store as string + fields[key] = str(value) + + # Create InfluxDB point + point = { + "measurement": self.measurement, + "tags": tags, + "fields": fields + } + + # Add timestamp if enabled + if self.include_timestamp: + point["time"] = int(time.time() * 1e9) # Convert to nanoseconds + + return point + + def _flush_batch(self): + """Flush the 
batch of points to InfluxDB""" + if not self.batch_points: + return + + # Check connection before attempting to write + if not self._check_connection(): + self._log.warning("Not connected to InfluxDB, storing batch in backlog") + # Store all points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + return + + try: + self.client.write_points(self.batch_points) + self._log.info(f"Wrote {len(self.batch_points)} points to InfluxDB") + self.batch_points = [] + self.last_batch_time = time.time() + except Exception as e: + self._log.error(f"Failed to write batch to InfluxDB: {e}") + # Don't immediately mark as disconnected, try to reconnect first + if self._attempt_reconnect(): + # If reconnection successful, try to write again + try: + self.client.write_points(self.batch_points) + self._log.info(f"Successfully wrote {len(self.batch_points)} points to InfluxDB after reconnection") + self.batch_points = [] + self.last_batch_time = time.time() + except Exception as retry_e: + self._log.error(f"Failed to write batch after reconnection: {retry_e}") + # Store failed points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + self.connected = False + else: + # Store failed points in backlog + for point in self.batch_points: + self._add_to_backlog(point) + self.batch_points = [] + self.connected = False + + def init_bridge(self, from_transport: transport_base): + """Initialize bridge - not needed for InfluxDB output""" + pass + + def __del__(self): + """Cleanup on destruction - flush any remaining points""" + if self.batch_points: + self._flush_batch() + if self.client: + try: + self.client.close() + except Exception: + pass \ No newline at end of file diff --git a/classes/transports/json_out.py b/classes/transports/json_out.py new file mode 100644 index 0000000..51fe6d4 --- /dev/null +++ b/classes/transports/json_out.py @@ -0,0 +1,109 @@ +import json +import sys +from configparser import SectionProxy +from typing import TextIO + +from defs.common import strtobool + +from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry +from .transport_base import transport_base + + +class json_out(transport_base): + ''' JSON output transport that writes data to a file or stdout ''' + output_file: str = "stdout" + pretty_print: bool = True + append_mode: bool = False + include_timestamp: bool = True + include_device_info: bool = True + + file_handle: TextIO = None + + def __init__(self, settings: SectionProxy): + self.output_file = settings.get("output_file", fallback=self.output_file) + self.pretty_print = strtobool(settings.get("pretty_print", fallback=self.pretty_print)) + self.append_mode = strtobool(settings.get("append_mode", fallback=self.append_mode)) + self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp)) + self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info)) + + self.write_enabled = True # JSON output is always write-enabled + super().__init__(settings) + + def connect(self): + """Initialize the output file handle""" + self._log.info("json_out connect") + + if self.output_file.lower() == "stdout": + self.file_handle = sys.stdout + else: + try: + mode = "a" if self.append_mode else "w" + self.file_handle = open(self.output_file, mode, encoding='utf-8') + self.connected = True + except Exception as e: + self._log.error(f"Failed to open output file {self.output_file}: {e}") + 
self.connected = False + return + + self.connected = True + + def write_data(self, data: dict[str, str], from_transport: transport_base): + """Write data as JSON to the output file""" + if not self.write_enabled or not self.connected: + return + + self._log.info(f"write data from [{from_transport.transport_name}] to json_out transport") + self._log.info(data) + + # Prepare the JSON output structure + output_data = {} + + # Add device information if enabled + if self.include_device_info: + output_data["device"] = { + "identifier": from_transport.device_identifier, + "name": from_transport.device_name, + "manufacturer": from_transport.device_manufacturer, + "model": from_transport.device_model, + "serial_number": from_transport.device_serial_number, + "transport": from_transport.transport_name + } + + # Add timestamp if enabled + if self.include_timestamp: + import time + output_data["timestamp"] = time.time() + + # Add the actual data + output_data["data"] = data + + # Convert to JSON + if self.pretty_print: + json_string = json.dumps(output_data, indent=2, ensure_ascii=False) + else: + json_string = json.dumps(output_data, ensure_ascii=False) + + # Write to file + try: + if self.output_file.lower() != "stdout": + # For files, add a newline and flush + self.file_handle.write(json_string + "\n") + self.file_handle.flush() + else: + # For stdout, just print + print(json_string) + except Exception as e: + self._log.error(f"Failed to write to output: {e}") + self.connected = False + + def init_bridge(self, from_transport: transport_base): + """Initialize bridge - not needed for JSON output""" + pass + + def __del__(self): + """Cleanup file handle on destruction""" + if self.file_handle and self.output_file.lower() != "stdout": + try: + self.file_handle.close() + except: + pass \ No newline at end of file diff --git a/classes/transports/modbus_base.py b/classes/transports/modbus_base.py index d040746..f365813 100644 --- a/classes/transports/modbus_base.py +++ b/classes/transports/modbus_base.py @@ -53,11 +53,10 @@ class modbus_base(transport_base): def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None): super().__init__(settings) - self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled) + self.analyze_protocol_enabled = settings.getboolean(option="analyze_protocol", fallback=self.analyze_protocol_enabled) self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load) - - #get defaults from protocol settings + # get defaults from protocol settings if "send_input_register" in self.protocolSettings.settings: self.send_input_register = strtobool(self.protocolSettings.settings["send_input_register"]) @@ -67,17 +66,13 @@ def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_setti if "batch_delay" in self.protocolSettings.settings: self.modbus_delay = float(self.protocolSettings.settings["batch_delay"]) - #allow enable/disable of which registers to send + # allow enable/disable of which registers to send self.send_holding_register = settings.getboolean("send_holding_register", fallback=self.send_holding_register) self.send_input_register = settings.getboolean("send_input_register", fallback=self.send_input_register) self.modbus_delay = settings.getfloat(["batch_delay", "modbus_delay"], fallback=self.modbus_delay) self.modbus_delay_setting = self.modbus_delay - - if self.analyze_protocol_enabled: - self.connect() - 
self.analyze_protocol() - quit() + # Note: Connection and analyze_protocol will be called after subclass initialization is complete def init_after_connect(self): #from transport_base settings @@ -95,9 +90,18 @@ def connect(self): self.init_after_connect() def read_serial_number(self) -> str: + # First try to read "Serial Number" from input registers (for protocols like EG4 v58) + self._log.info("Looking for serial_number variable in input registers...") + serial_number = str(self.read_variable("Serial Number", Registry_Type.INPUT)) + self._log.info("read SN from input registers: " + serial_number) + if serial_number and serial_number != "None": + return serial_number + + # Then try holding registers (for other protocols) + self._log.info("Looking for serial_number variable in holding registers...") serial_number = str(self.read_variable("Serial Number", Registry_Type.HOLDING)) - self._log.info("read SN: " +serial_number) - if serial_number: + self._log.info("read SN from holding registers: " + serial_number) + if serial_number and serial_number != "None": return serial_number sn2 = "" @@ -121,8 +125,8 @@ def read_serial_number(self) -> str: time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest - print(sn2) - print(sn3) + self._log.debug(f"Serial number sn2: {sn2}") + self._log.debug(f"Serial number sn3: {sn3}") if not re.search("[^a-zA-Z0-9_]", sn2) : serial_number = sn2 @@ -136,15 +140,27 @@ def enable_write(self): self._log.info("Validating Protocol for Writing") self.write_enabled = False - score_percent = self.validate_protocol(Registry_Type.HOLDING) - if(score_percent > 90): - self.write_enabled = True - self._log.warning("enable write - validation passed") - elif self.write_mode == TransportWriteMode.RELAXED: - self.write_enabled = True - self._log.warning("enable write - WARNING - RELAXED MODE") - else: - self._log.error("enable write FAILED - WRITE DISABLED") + + # Add a small delay to ensure device is ready, especially during initialization + time.sleep(self.modbus_delay * 2) + + try: + score_percent = self.validate_protocol(Registry_Type.HOLDING) + if(score_percent > 90): + self.write_enabled = True + self._log.warning("enable write - validation passed") + elif self.write_mode == TransportWriteMode.RELAXED: + self.write_enabled = True + self._log.warning("enable write - WARNING - RELAXED MODE") + else: + self._log.error("enable write FAILED - WRITE DISABLED") + except Exception as e: + self._log.error(f"enable write FAILED due to error: {str(e)}") + if self.write_mode == TransportWriteMode.RELAXED: + self.write_enabled = True + self._log.warning("enable write - WARNING - RELAXED MODE (due to validation error)") + else: + self._log.error("enable write FAILED - WRITE DISABLED") @@ -267,8 +283,8 @@ def analyze_protocol(self, settings_dir : str = "protocols"): else: #perform registry scan ##batch_size = 1, read registers one by one; if out of bound. 
it just returns error - input_registry = self.read_modbus_registers(start=0, end=max_input_register, batch_size=45, registry_type=Registry_Type.INPUT) - holding_registry = self.read_modbus_registers(start=0, end=max_holding_register, batch_size=45, registry_type=Registry_Type.HOLDING) + input_registry = self.read_modbus_registers(start=0, end=max_input_register, registry_type=Registry_Type.INPUT) + holding_registry = self.read_modbus_registers(start=0, end=max_holding_register, registry_type=Registry_Type.HOLDING) if self.analyze_protocol_save_load: #save results if enabled with open(input_save_path, "w") as file: @@ -278,14 +294,14 @@ def analyze_protocol(self, settings_dir : str = "protocols"): json.dump(holding_registry, file) #print results for debug - print("=== START INPUT REGISTER ===") + self._log.debug("=== START INPUT REGISTER ===") if input_registry: - print([(key, value) for key, value in input_registry.items()]) - print("=== END INPUT REGISTER ===") - print("=== START HOLDING REGISTER ===") + self._log.debug([(key, value) for key, value in input_registry.items()]) + self._log.debug("=== END INPUT REGISTER ===") + self._log.debug("=== START HOLDING REGISTER ===") if holding_registry: - print([(key, value) for key, value in holding_registry.items()]) - print("=== END HOLDING REGISTER ===") + self._log.debug([(key, value) for key, value in holding_registry.items()]) + self._log.debug("=== END HOLDING REGISTER ===") #very well possible the registers will be incomplete due to different hardware sizes #so dont assume they are set / complete @@ -369,15 +385,15 @@ def evaluate_score(entry : registry_map_entry, val): #print scores for name in sorted(protocol_scores, key=protocol_scores.get, reverse=True): - print("=== "+str(name)+" - "+str(protocol_scores[name])+" ===") - print("input register score: " + str(input_register_score[name]) + "; valid registers: "+str(input_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.INPUT)))) - print("holding register score : " + str(holding_register_score[name]) + "; valid registers: "+str(holding_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.HOLDING)))) + self._log.debug("=== "+str(name)+" - "+str(protocol_scores[name])+" ===") + self._log.debug("input register score: " + str(input_register_score[name]) + "; valid registers: "+str(input_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.INPUT)))) + self._log.debug("holding register score : " + str(holding_register_score[name]) + "; valid registers: "+str(holding_valid_count[name])+" of " + str(len(protocols[name].get_registry_map(Registry_Type.HOLDING)))) def write_variable(self, entry : registry_map_entry, value : str, registry_type : Registry_Type = Registry_Type.HOLDING): """ writes a value to a ModBus register; todo: registry_type to handle other write functions""" - value = value.strip() + value = value.strip().lower() temp_map = [entry] ranges = self.protocolSettings.calculate_registry_ranges(temp_map, self.protocolSettings.registry_map_size[registry_type], init=True) #init=True to bypass timechecks @@ -387,6 +403,11 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type #current_registers = self.read_modbus_registers(start=entry.register, end=entry.register, registry_type=registry_type) #current_value = current_registers[entry.register] current_value = info[entry.variable_name] + + + #handle codes + value = self.protocolSettings.get_code_by_value(entry, value, 
fallback=value) + current_value = self.protocolSettings.get_code_by_value(entry, current_value, fallback=current_value) if not self.write_mode == TransportWriteMode.UNSAFE: if not self.protocolSettings.validate_registry_entry(entry, current_value): @@ -397,14 +418,6 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type if not self.protocolSettings.validate_registry_entry(entry, value): return self._log.error(f"WRITE_ERROR: Invalid new value, '{value}'. Unsafe to write") - #handle codes - if entry.variable_name+"_codes" in self.protocolSettings.codes: - codes = self.protocolSettings.codes[entry.variable_name+"_codes"] - for key, val in codes.items(): - if val == value: #convert "string" to key value - value = key - break - #apply unit_mod before writing. if entry.unit_mod != 1: value = int(float(value) / entry.unit_mod) # say unitmod is 0.1. 105*0.1 = 10.5. 10.5 / 0.1 = 105. @@ -504,9 +517,20 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr results = self.protocolSettings.process_registery(registers, registry_map) return results[entry.variable_name] - def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = 45, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: + def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, end : int = None, batch_size : int = None, registry_type : Registry_Type = Registry_Type.INPUT ) -> dict: ''' maybe move this to transport_base ?''' + # Get batch_size from protocol settings if not provided + if batch_size is None: + if hasattr(self, 'protocolSettings') and self.protocolSettings: + batch_size = self.protocolSettings.settings.get("batch_size", 45) + try: + batch_size = int(batch_size) + except (ValueError, TypeError): + batch_size = 45 + else: + batch_size = 45 + if not ranges: #ranges is empty, use min max if start == 0 and end is None: return {} #empty @@ -533,22 +557,24 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en time.sleep(self.modbus_delay) #sleep for 1ms to give bus a rest #manual recommends 1s between commands isError = False + register = None # Initialize register variable try: register = self.read_registers(range[0], range[1], registry_type=registry_type) except ModbusIOException as e: - self._log.error("ModbusIOException : ", e.error_code) - if e.error_code == 4: #if no response; probably time out. retry with increased delay - isError = True - else: - isError = True #other erorrs. 
ie Failed to connect[ModbusSerialClient(rtu baud[9600])] + self._log.error("ModbusIOException: " + str(e)) + # In pymodbus 3.7+, ModbusIOException doesn't have error_code attribute + # Treat all ModbusIOException as retryable errors + isError = True - if isinstance(register, bytes) or register.isError() or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string - if isinstance(register, bytes): + if register is None or isinstance(register, bytes) or (hasattr(register, 'isError') and register.isError()) or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string + if register is None: + self._log.error("No response received from modbus device") + elif isinstance(register, bytes): self._log.error(register.decode("utf-8")) else: - self._log.error(register.__str__) + self._log.error(str(register)) self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy if self.modbus_delay > 60: #max delay. 60 seconds between requests should be way over kill if it happens @@ -573,12 +599,13 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en if retry < 0: retry = 0 - - #combine registers into "registry" - i = -1 - while(i := i + 1 ) < range[1]: - #print(str(i) + " => " + str(i+range[0])) - registry[i+range[0]] = register.registers[i] + # Only process registers if we have a valid response + if register is not None and hasattr(register, 'registers') and register.registers is not None: + #combine registers into "registry" + i = -1 + while(i := i + 1 ) < range[1]: + #print(str(i) + " => " + str(i+range[0])) + registry[i+range[0]] = register.registers[i] return registry diff --git a/classes/transports/modbus_rtu.py b/classes/transports/modbus_rtu.py index d4f9147..c3d424a 100644 --- a/classes/transports/modbus_rtu.py +++ b/classes/transports/modbus_rtu.py @@ -38,7 +38,7 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings if "baud" in self.protocolSettings.settings: self.baudrate = strtoint(self.protocolSettings.settings["baud"]) - + #todo better baud/baudrate alias handling self.baudrate = settings.getint("baudrate", self.baudrate) address : int = settings.getint("address", 0) @@ -58,6 +58,8 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings self.client = modbus_base.clients[client_str] return + self._log.debug(f"Creating new client with baud rate: {self.baudrate}") + if "method" in init_signature.parameters: self.client = ModbusSerialClient(method="rtu", port=self.port, baudrate=int(self.baudrate), @@ -102,4 +104,5 @@ def write_register(self, register : int, value : int, **kwargs): def connect(self): self.connected = self.client.connect() + self._log.debug(f"Modbus rtu connected: {self.connected}") super().connect() diff --git a/classes/transports/modbus_tcp.py b/classes/transports/modbus_tcp.py index 594dda9..26dc9a8 100644 --- a/classes/transports/modbus_tcp.py +++ b/classes/transports/modbus_tcp.py @@ -66,9 +66,9 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr kwargs["slave"] = kwargs.pop("unit") if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(start, count, **kwargs ) + return self.client.read_input_registers(start,count=count, **kwargs ) elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(start, count, **kwargs) + return self.client.read_holding_registers(start,count=count, **kwargs) 
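Note on the `count=count` change above: newer pymodbus 3.x releases treat `count` as a keyword argument on the register-read calls, so the old positional form can raise a TypeError. A minimal sketch of the difference, assuming pymodbus 3.x and a reachable Modbus TCP device (host and register values are illustrative only):

```python
# minimal sketch, assuming pymodbus 3.x; host/register values are illustrative only
from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("192.168.0.7", port=502)
if client.connect():
    # pymodbus 2.x style passed count positionally: client.read_holding_registers(0, 10)
    # newer releases expect it as a keyword, matching the calls in this patch:
    rr = client.read_holding_registers(0, count=10)
    if not rr.isError():
        print(rr.registers)
    client.close()
```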
def connect(self): self.connected = self.client.connect() diff --git a/classes/transports/modbus_tls.py b/classes/transports/modbus_tls.py index c7e1f3d..4977888 100644 --- a/classes/transports/modbus_tls.py +++ b/classes/transports/modbus_tls.py @@ -46,9 +46,9 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(start, count, **kwargs) + return self.client.read_input_registers(start, count=count, **kwargs) elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(start, count, **kwargs) + return self.client.read_holding_registers(start, count=count, **kwargs) def connect(self): self.connected = self.client.connect() diff --git a/classes/transports/modbus_udp.py b/classes/transports/modbus_udp.py index a3bfef6..8b96cde 100644 --- a/classes/transports/modbus_udp.py +++ b/classes/transports/modbus_udp.py @@ -25,9 +25,9 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(start, count, **kwargs) + return self.client.read_input_registers(start, count=count, **kwargs) elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(start, count, **kwargs) + return self.client.read_holding_registers(start, count=count, **kwargs) def connect(self): self.connected = self.client.connect() diff --git a/classes/transports/pace.py b/classes/transports/pace.py index 927a92c..c8c088e 100644 --- a/classes/transports/pace.py +++ b/classes/transports/pace.py @@ -309,9 +309,9 @@ def __init__(self, settings : dict[str,str]): def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs): if registry_type == Registry_Type.INPUT: - return self.client.read_input_registers(start, count, **kwargs) + return self.client.read_input_registers(start, count=count, **kwargs) elif registry_type == Registry_Type.HOLDING: - return self.client.read_holding_registers(start, count, **kwargs) + return self.client.read_holding_registers(start, count=count, **kwargs) time.sleep(4) diff --git a/classes/transports/transport_base.py b/classes/transports/transport_base.py index a0c33b8..dc45fa1 100644 --- a/classes/transports/transport_base.py +++ b/classes/transports/transport_base.py @@ -1,5 +1,5 @@ -from enum import Enum import logging +from enum import Enum from typing import TYPE_CHECKING, Callable from classes.protocol_settings import ( @@ -62,7 +62,7 @@ class transport_base: device_model : str = "hotnoob" device_identifier : str = "hotnoob" bridge : str = "" - + write_enabled : bool = False ''' deprecated -- use / move to write_mode''' write_mode : TransportWriteMode = None @@ -98,7 +98,7 @@ def __init__(self, settings : "SectionProxy") -> None: self.device_name = settings.get(["device_name", "name"], fallback=self.device_manufacturer+"_"+self.device_serial_number) self.bridge = settings.get("bridge", self.bridge) self.read_interval = settings.getfloat("read_interval", self.read_interval) - self.max_precision = settings.getint(["max_precision", "precision"], self.max_precision) + self.max_precision = settings.getint(["max_precision", "precision"], fallback=self.max_precision) if "write_enabled" in 
settings or "enable_write" in settings: self.write_enabled = settings.getboolean(["write_enabled", "enable_write"], self.write_enabled) @@ -110,7 +110,7 @@ def __init__(self, settings : "SectionProxy") -> None: #load a protocol_settings class for every transport; required for adv features. ie, variable timing. #must load after settings - self.protocol_version = settings.get("protocol_version") + self.protocol_version = settings.get("protocol_version", fallback='') if self.protocol_version: self.protocolSettings = protocol_settings(self.protocol_version, transport_settings=settings) diff --git a/config.cfg.example b/config.cfg.example index c2600e1..d498bdf 100644 --- a/config.cfg.example +++ b/config.cfg.example @@ -1,18 +1,16 @@ [general] +# Global logging level (DEBUG, INFO, WARNING, ERROR) log_level = DEBUG [transport.0] #name must be unique, ie: transport.modbus -#logging level for transport +# Logging level specific to this transport log_level = DEBUG #rs485 / modbus device #protocol config files are located in protocols/ protocol_version = v0.14 -analyze_protocol = false -write = false -#in -#was unit +# Modbus address address = 1 port = {{serial port, likely /dev/ttyUSB0}} baudrate = 9600 @@ -20,29 +18,46 @@ baudrate = 9600 #modbus tcp/tls/udp example #host = 192.168.0.7 #port = 502 -#override protocol reader +#override protocol's / transport type #transport = modbus_tcp -#the 'transport' that we want to share this with +# The 'transport' that we want to share this with bridge = transport.1 +# Device identity (for MQTT topic structure or HA discovery) manufacturer = HDHK model = HDHK 16CH AC -#optional; leave blank to autofetch serial from device +# Optional; auto-detect if omitted serial_number = HDHK777 +# How often read (in seconds) +# interplays with per register read timings: https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/usage/creating_and_editing_protocols.md#read-interval read_interval = 10 +#advanced users only - see https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/usage/transports.md#writing +write = false + +# incomplete feature to help identify which protocol to use +# will only "analyze" if enabled +analyze_protocol = false [transport.1] -#connect mqtt +# Set transport type to MQTT transport=mqtt + +# MQTT broker settings host = {{mqtt ip / host}} port = 1883 user = {{mqtt username here}} pass = {{mqtt password}} + +# MQTT topic settings base_topic = home/inverter/ error_topic = /error -json = false + +# Home Assistant discovery settings discovery_enabled = true -discovery_topic = homeassistant \ No newline at end of file +discovery_topic = homeassistant + +# If true, values are sent in JSON format +json = false \ No newline at end of file diff --git a/config.influxdb.example b/config.influxdb.example new file mode 100644 index 0000000..da44363 --- /dev/null +++ b/config.influxdb.example @@ -0,0 +1,42 @@ +# +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = solar +username = +password = +measurement = device_data +include_timestamp = true +include_device_info = true +batch_size = 100 +batch_timeout = 10.0 +log_level = INFO + +# Connection monitoring settings (optional) +reconnect_attempts = 5 +reconnect_delay = 5.0 +connection_timeout = 10 + +# Exponential backoff settings (optional) +use_exponential_backoff = true +max_reconnect_delay = 300.0 + +# Periodic reconnection settings (optional) +periodic_reconnect_interval = 14400.0 + +# Persistent storage for long-term outages (optional) 
+enable_persistent_storage = true +persistent_storage_path = influxdb_backlog +max_backlog_size = 10000 +max_backlog_age = 86400 + +# Example bridge configuration +[modbus_rtu_source] +type = modbus_rtu +port = /dev/ttyUSB0 +baudrate = 9600 +protocol_version = growatt_2020_v1.24 +device_serial_number = 123456789 +bridge = influxdb_output +# diff --git a/config.json_out.example b/config.json_out.example new file mode 100644 index 0000000..9772b9a --- /dev/null +++ b/config.json_out.example @@ -0,0 +1,43 @@ +[general] +log_level = INFO + +[transport.modbus_input] +# Modbus input transport - reads data from device +protocol_version = v0.14 +address = 1 +port = /dev/ttyUSB0 +baudrate = 9600 +bridge = transport.json_output +read_interval = 10 + +manufacturer = TestDevice +model = Test Model +serial_number = TEST123 + +[transport.json_output] +# JSON output transport - writes data to stdout +transport = json_out +output_file = stdout +pretty_print = true +include_timestamp = true +include_device_info = true + +# Alternative configurations (uncomment to use): + +# [transport.json_file] +# # JSON output to file +# transport = json_out +# output_file = /var/log/inverter_data.json +# pretty_print = false +# append_mode = true +# include_timestamp = true +# include_device_info = false + +# [transport.json_compact] +# # Compact JSON output +# transport = json_out +# output_file = /tmp/compact_data.json +# pretty_print = false +# append_mode = false +# include_timestamp = true +# include_device_info = false \ No newline at end of file diff --git a/defs/common.py b/defs/common.py index 7c5a82f..fb20b77 100644 --- a/defs/common.py +++ b/defs/common.py @@ -1,6 +1,7 @@ +import os import re -import serial.tools.list_ports +from serial.tools import list_ports def strtobool (val): @@ -46,13 +47,27 @@ def strtoint(val : str) -> int: return int(val) def get_usb_serial_port_info(port : str = "") -> str: - for p in serial.tools.list_ports.comports(): + + # If port is a symlink + if os.path.islink(port): + port = os.path.realpath(port) + + for p in list_ports.comports(): #from serial.tools if str(p.device).upper() == port.upper(): - return "["+hex(p.vid)+":"+hex(p.pid)+":"+str(p.serial_number)+":"+str(p.location)+"]" + vid = hex(p.vid) if p.vid is not None else "" + pid = hex(p.pid) if p.pid is not None else "" + serial = str(p.serial_number) if p.serial_number is not None else "" + location = str(p.location) if p.location is not None else "" + return "["+vid+":"+pid+":"+serial+":"+location+"]" return "" def find_usb_serial_port(port : str = "", vendor_id : str = "", product_id : str = "", serial_number : str = "", location : str = "") -> str: + + # If port is a symlink + if os.path.islink(port): + port = os.path.realpath(port) + if not port.startswith("["): return port @@ -65,7 +80,7 @@ def find_usb_serial_port(port : str = "", vendor_id : str = "", product_id : st serial_number = match.group("serial") if match.group("serial") else "" location = match.group("location") if match.group("location") else "" - for port in serial.tools.list_ports.comports(): + for port in list_ports.comports(): #from serial.tools if ((not vendor_id or port.vid == vendor_id) and ( not product_id or port.pid == product_id) and ( not serial_number or port.serial_number == serial_number) and diff --git a/documentation/README.md b/documentation/README.md index f756603..53ce9b3 100644 --- a/documentation/README.md +++ b/documentation/README.md @@ -32,6 +32,7 @@ This README file contains an index of all files in the documentation directory. 
- [modbus_rtu_to_modbus_tcp.md](usage/configuration_examples/modbus_rtu_to_modbus_tcp.md) - ModBus RTU to ModBus TCP - [modbus_rtu_to_mqtt.md](usage/configuration_examples/modbus_rtu_to_mqtt.md) - ModBus RTU to MQTT +- [influxdb_example.md](usage/configuration_examples/influxdb_example.md) - ModBus RTU to InfluxDB **3rdparty** diff --git a/documentation/installation/docker.md b/documentation/installation/docker.md new file mode 100644 index 0000000..e640ab7 --- /dev/null +++ b/documentation/installation/docker.md @@ -0,0 +1,9 @@ +### Use Docker +- ```docker build . -t protocol_gateway ``` +- ```docker run --device=/dev/ttyUSB0 protocol_gateway``` + +### Use Docker Image +- ``` docker pull hotn00b/pythonprotocolgateway ``` +- ```docker run -v $(pwd)/config.cfg:/app/config.cfg --device=/dev/ttyUSB0 hotn00b/pythonprotocolgateway``` + +[Docker Image Repo](https://hub.docker.com/r/hotn00b/pythonprotocolgateway) \ No newline at end of file diff --git a/documentation/installation/pip.md b/documentation/installation/pip.md new file mode 100644 index 0000000..c9a235f --- /dev/null +++ b/documentation/installation/pip.md @@ -0,0 +1,13 @@ +https://pypi.org/project/python-protocol-gateway/ + +``` +pip install python-protocol-gateway==1.1.9 +``` + +usage: +``` +ppg +``` +``` +ppg config.cfg +``` \ No newline at end of file diff --git a/documentation/installation/script.md b/documentation/installation/script.md new file mode 100644 index 0000000..7c4cdf6 --- /dev/null +++ b/documentation/installation/script.md @@ -0,0 +1,14 @@ +Install as standalone script + +``` +apt install pip python3 -y +pip install -r requirements.txt +``` + +``` +python3 -u ppg.py +``` + +``` +python3 -u ppg.py config.cfg +``` \ No newline at end of file diff --git a/documentation/usage/configuration_examples/canbus_to_mqtt.md b/documentation/usage/configuration_examples/canbus_to_mqtt.md index 55a283c..ec89dab 100644 --- a/documentation/usage/configuration_examples/canbus_to_mqtt.md +++ b/documentation/usage/configuration_examples/canbus_to_mqtt.md @@ -1,3 +1,6 @@ +![ppg canbus diagram drawio](https://github.com/user-attachments/assets/17d1ea02-2414-4289-b295-cd5099679cba) + + ### CanBus to MQTT ``` [general] diff --git a/documentation/usage/configuration_examples/influxdb_example.md b/documentation/usage/configuration_examples/influxdb_example.md new file mode 100644 index 0000000..eeeb4e9 --- /dev/null +++ b/documentation/usage/configuration_examples/influxdb_example.md @@ -0,0 +1,217 @@ +# InfluxDB Output Transport + +The InfluxDB output transport allows you to send data from your devices directly to an InfluxDB v1 server for time-series data storage and visualization. 
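As a rough sketch of what this transport does under the hood, using the `influxdb` v1 client library that this feature depends on (the host, database name, and field values below are illustrative only, not taken from a real device):

```python
from influxdb import InfluxDBClient

# Illustrative values; the transport fills these in from the bridged device's data.
client = InfluxDBClient(host="localhost", port=8086, database="solar")
client.create_database("solar")  # harmless if the database already exists

point = {
    "measurement": "device_data",
    "tags": {"device_identifier": "123456789", "transport": "influxdb_output"},
    "fields": {"battery_voltage": 48.5, "battery_current": 2.1},
}

# Points are normally buffered and flushed in batches; a single write looks like this.
client.write_points([point])
```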
+ +## Features + +- **Batch Writing**: Efficiently batches data points to reduce network overhead +- **Automatic Database Creation**: Creates the database if it doesn't exist +- **Device Information Tags**: Includes device metadata as InfluxDB tags for easy querying +- **Flexible Data Types**: Automatically converts data to appropriate InfluxDB field types +- **Configurable Timeouts**: Adjustable batch size and timeout settings +- **Connection Monitoring**: Automatic connection health checks and reconnection logic +- **Robust Error Handling**: Retries failed writes after reconnection attempts + +## Configuration + +### Basic Configuration + +```ini +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +measurement = device_data +``` + +### Advanced Configuration + +```ini +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +username = admin +password = your_password +measurement = device_data +include_timestamp = true +include_device_info = true +batch_size = 100 +batch_timeout = 10.0 +log_level = INFO + +# Connection monitoring settings +reconnect_attempts = 5 +reconnect_delay = 5.0 +connection_timeout = 10 +``` + +### Configuration Options + +| Option | Default | Description | +|--------|---------|-------------| +| `host` | `localhost` | InfluxDB server hostname or IP address | +| `port` | `8086` | InfluxDB server port | +| `database` | `solar` | Database name (will be created if it doesn't exist) | +| `username` | `` | Username for authentication (optional) | +| `password` | `` | Password for authentication (optional) | +| `measurement` | `device_data` | InfluxDB measurement name | +| `include_timestamp` | `true` | Include timestamp in data points | +| `include_device_info` | `true` | Include device information as tags | +| `batch_size` | `100` | Number of points to batch before writing | +| `batch_timeout` | `10.0` | Maximum time (seconds) to wait before flushing batch | +| `reconnect_attempts` | `5` | Number of reconnection attempts before giving up | +| `reconnect_delay` | `5.0` | Delay between reconnection attempts (seconds) | +| `connection_timeout` | `10` | Connection timeout for InfluxDB client (seconds) | + +## Connection Monitoring + +The InfluxDB transport includes robust connection monitoring to handle network issues and server restarts: + +### Automatic Health Checks +- Performs connection health checks every 30 seconds +- Uses InfluxDB ping command to verify connectivity +- Automatically attempts reconnection if connection is lost + +### Reconnection Logic +- Attempts reconnection up to `reconnect_attempts` times +- Waits `reconnect_delay` seconds between attempts +- Preserves buffered data during reconnection attempts +- Retries failed writes after successful reconnection + +### Error Recovery +- Gracefully handles network timeouts and connection drops +- Maintains data integrity by not losing buffered points +- Provides detailed logging for troubleshooting + +## Data Structure + +The InfluxDB output creates data points with the following structure: + +### Tags (if `include_device_info = true`) +- `device_identifier`: Device serial number (lowercase) +- `device_name`: Device name +- `device_manufacturer`: Device manufacturer +- `device_model`: Device model +- `device_serial_number`: Device serial number +- `transport`: Source transport name + +### Fields +All device data values are stored as fields. 
The transport automatically converts: +- Numeric strings to integers or floats +- Non-numeric strings remain as strings + +### Time +- Uses current timestamp in nanoseconds (if `include_timestamp = true`) +- Can be disabled for custom timestamp handling + +## Example Bridge Configuration + +```ini +# Source device (e.g., Modbus RTU) +[growatt_inverter] +type = modbus_rtu +port = /dev/ttyUSB0 +baudrate = 9600 +protocol_version = growatt_2020_v1.24 +device_serial_number = 123456789 +device_manufacturer = Growatt +device_model = SPH3000 +bridge = influxdb_output + +# InfluxDB output +[influxdb_output] +type = influxdb_out +host = localhost +port = 8086 +database = solar +measurement = inverter_data +``` + +## Installation + +1. Install the required dependency: + ```bash + pip install influxdb + ``` + +2. Or add to your requirements.txt: + ``` + influxdb + ``` + +## InfluxDB Setup + +1. Install InfluxDB v1: + ```bash + # Ubuntu/Debian + sudo apt install influxdb influxdb-client + sudo systemctl enable influxdb + sudo systemctl start influxdb + + # Or download from https://portal.influxdata.com/downloads/ + ``` + +2. Create a database (optional - will be created automatically): + ```bash + echo "CREATE DATABASE solar" | influx + ``` + +## Querying Data + +Once data is flowing, you can query it using InfluxDB's SQL-like query language: + +```sql +-- Show all measurements +SHOW MEASUREMENTS + +-- Query recent data +SELECT * FROM device_data WHERE time > now() - 1h + +-- Query specific device +SELECT * FROM device_data WHERE device_identifier = '123456789' + +-- Aggregate data +SELECT mean(value) FROM device_data WHERE field_name = 'battery_voltage' GROUP BY time(5m) +``` + +## Integration with Grafana + +InfluxDB data can be easily visualized in Grafana: + +1. Add InfluxDB as a data source in Grafana +2. Use the same connection details as your configuration +3. 
Create dashboards using InfluxDB queries + +## Troubleshooting + +### Connection Issues +- Verify InfluxDB is running: `systemctl status influxdb` +- Check firewall settings for port 8086 +- Verify host and port configuration +- Check connection timeout settings if using slow networks + +### Authentication Issues +- Ensure username/password are correct +- Check InfluxDB user permissions + +### Data Not Appearing +- Check log levels for detailed error messages +- Verify database exists and is accessible +- Check batch settings - data may be buffered +- Look for reconnection messages in logs + +### Data Stops After Some Time +- **Most Common Issue**: Network connectivity problems or InfluxDB server restarts +- Check logs for reconnection attempts and failures +- Verify InfluxDB server is stable and not restarting +- Consider increasing `reconnect_attempts` and `reconnect_delay` for unstable networks +- Monitor network connectivity between gateway and InfluxDB server + +### Performance +- Adjust `batch_size` and `batch_timeout` for your use case +- Larger batches reduce network overhead but increase memory usage +- Shorter timeouts provide more real-time data but increase network traffic +- Increase `connection_timeout` for slow networks \ No newline at end of file diff --git a/documentation/usage/configuration_examples/json_out_example.md b/documentation/usage/configuration_examples/json_out_example.md new file mode 100644 index 0000000..14d04af --- /dev/null +++ b/documentation/usage/configuration_examples/json_out_example.md @@ -0,0 +1,144 @@ +# JSON Output Transport + +The `json_out` transport outputs data in JSON format to either a file or stdout. This is useful for logging, debugging, or integrating with other systems that consume JSON data. + +## Configuration + +### Basic Configuration + +```ini +[transport.json_output] +transport = json_out +# Output to stdout (default) +output_file = stdout +# Pretty print the JSON (default: true) +pretty_print = true +# Include timestamp in output (default: true) +include_timestamp = true +# Include device information (default: true) +include_device_info = true +``` + +### File Output Configuration + +```ini +[transport.json_output] +transport = json_out +# Output to a file +output_file = /path/to/output.json +# Append to file instead of overwriting (default: false) +append_mode = false +pretty_print = true +include_timestamp = true +include_device_info = true +``` + +### Bridged Configuration Example + +```ini +[transport.modbus_input] +# Modbus input transport +protocol_version = v0.14 +address = 1 +port = /dev/ttyUSB0 +baudrate = 9600 +bridge = transport.json_output +read_interval = 10 + +[transport.json_output] +# JSON output transport +transport = json_out +output_file = /var/log/inverter_data.json +pretty_print = false +append_mode = true +include_timestamp = true +include_device_info = true +``` + +## Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `output_file` | string | `stdout` | Output destination. 
Use `stdout` for console output or a file path | +| `pretty_print` | boolean | `true` | Whether to format JSON with indentation | +| `append_mode` | boolean | `false` | Whether to append to file instead of overwriting | +| `include_timestamp` | boolean | `true` | Whether to include Unix timestamp in output | +| `include_device_info` | boolean | `true` | Whether to include device metadata in output | + +## Output Format + +The JSON output includes the following structure: + +```json +{ + "device": { + "identifier": "device_serial", + "name": "Device Name", + "manufacturer": "Manufacturer", + "model": "Model", + "serial_number": "Serial Number", + "transport": "transport_name" + }, + "timestamp": 1703123456.789, + "data": { + "variable_name": "value", + "another_variable": "another_value" + } +} +``` + +### Compact Output Example + +With `pretty_print = false` and `include_device_info = false`: + +```json +{"timestamp":1703123456.789,"data":{"battery_voltage":"48.5","battery_current":"2.1"}} +``` + +### File Output with Append Mode + +When using `append_mode = true`, each data read will be written as a separate JSON object on a new line, making it suitable for log files or streaming data processing. + +## Use Cases + +1. **Debugging**: Output data to console for real-time monitoring +2. **Logging**: Write data to log files for historical analysis +3. **Integration**: Feed data to other systems that consume JSON +4. **Data Collection**: Collect data for analysis or backup purposes + +## Examples + +### Console Output for Debugging + +```ini +[transport.debug_output] +transport = json_out +output_file = stdout +pretty_print = true +include_timestamp = true +include_device_info = true +``` + +### Log File for Data Collection + +```ini +[transport.data_log] +transport = json_out +output_file = /var/log/inverter_data.log +pretty_print = false +append_mode = true +include_timestamp = true +include_device_info = false +``` + +### Compact File Output + +```ini +[transport.compact_output] +transport = json_out +output_file = /tmp/inverter_data.json +pretty_print = false +append_mode = false +include_timestamp = true +include_device_info = false +``` \ No newline at end of file diff --git a/documentation/usage/configuration_examples/modbus_rtu_to_mqtt.md b/documentation/usage/configuration_examples/modbus_rtu_to_mqtt.md index da93887..497b6ac 100644 --- a/documentation/usage/configuration_examples/modbus_rtu_to_mqtt.md +++ b/documentation/usage/configuration_examples/modbus_rtu_to_mqtt.md @@ -1,3 +1,6 @@ +![ppg modbus flow drawio](https://github.com/user-attachments/assets/d9d59dc3-2a0a-4b34-8db7-ac054dccc67e) + + ### ModBus RTU to MQTT ``` [general] diff --git a/documentation/usage/creating_and_editing_protocols.md b/documentation/usage/creating_and_editing_protocols.md index 3fa2bfa..1705394 100644 --- a/documentation/usage/creating_and_editing_protocols.md +++ b/documentation/usage/creating_and_editing_protocols.md @@ -35,7 +35,21 @@ Defines the expected data type for the register / map entry | 32BIT_FLAGS | four bytes split into 32 bit flags. see 16BIT_FLAGS | #bit | A unsigned number comprised of # of bits. for example, 3bit is a 3 bit positive number (0 to 7). | ASCII | ascii text representation of data. -| ASCII.# | for protocols with an undefined "registry" size, the length can be specified. ie: ASCII.7 will return a 7 character long string. +| ASCII.# | for protocols with an undefined "registry" size, the length can be specified. ie: ASCII.7 will return a 7 character long string. 
+| HEX | hex text representation of data. + +### data type byte order +in the case of protocols with inconsistent byte order implementations. + +#### big endian +a suffix of "_BE" can be added to a data type to ensure the entry is read with a big endian byte order +ie: ASCII_BE + +#### little endian +a suffix of "_LE" can be added to a data type to ensure the entry is read with a little endian byte order +ie: ASCII_LE + + ### register Register defines the location or for other protocols the main command / id. diff --git a/documentation/usage/devices_and_protocols.csv b/documentation/usage/devices_and_protocols.csv new file mode 100644 index 0000000..e2eda00 --- /dev/null +++ b/documentation/usage/devices_and_protocols.csv @@ -0,0 +1,16 @@ +Brand,Model,Protocol,Transport,ReadMe +AOLithium,51.2V 100Ah,voltronic_bms_2020_03_25,ModBus,https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/devices/AOLithium.md +AOLithium,51.2V 100Ah,victron_gx_generic_canbus,CanBus,https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/devices/AOLithium.md +EG4,EG4 18kPV,eg4_v58,ModBus, +EG4,EG4 3000EHV ,eg4_3000ehv_v1 ,ModBus, +EG4,EG4 6000XP ,eg4_v58,ModBus, +EG4,MPPT100-48HV – unconfirmed,eg4_3000ehv_v1 ,ModBus, +Growatt,SPF 12000T DVM-US MPV,v0.14,ModBus, +Growatt,SPF 5000 ,v0.14,ModBus, +Growatt,SPF 6000 ,v0.14,ModBus, +Selphos,v3,growatt_bms_canbus_v1.04 ,CanBus,https://github.com/HotNoob/PythonProtocolGateway/discussions/88 +Sigineer,M3000H-48LV-3KW ,sigineer_v0.11,ModBus,https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/devices/Sigineer.md +SOK,48v100ah,pace_bms_v1.3 ,ModBus,https://github.com/HotNoob/PythonProtocolGateway/blob/main/documentation/devices/SOK.md +SolArk,Untested,solark_v1.1 ,ModBus, +SRNE,ASF48100S200-H ,srne_v3.9,ModBus, +SRNE,HF2430U60-100 ,srne_v3.9,ModBus, diff --git a/documentation/usage/influxdb_advanced_features.md b/documentation/usage/influxdb_advanced_features.md new file mode 100644 index 0000000..a997d94 --- /dev/null +++ b/documentation/usage/influxdb_advanced_features.md @@ -0,0 +1,413 @@ +# InfluxDB Advanced Features: Exponential Backoff & Persistent Storage + +## Overview + +The InfluxDB transport now includes advanced features to handle network instability and long-term outages: + +1. **Exponential Backoff**: Intelligent reconnection timing to avoid overwhelming the server +2. **Persistent Storage**: Local data storage to prevent data loss during extended outages +3. 
**Periodic Reconnection**: Regular connection health checks even during quiet periods + +## Exponential Backoff + +### How It Works + +Instead of using a fixed delay between reconnection attempts, exponential backoff increases the delay exponentially: + +- **Attempt 1**: 5 seconds delay +- **Attempt 2**: 10 seconds delay +- **Attempt 3**: 20 seconds delay +- **Attempt 4**: 40 seconds delay +- **Attempt 5**: 80 seconds delay (capped at max_reconnect_delay) + +### Configuration + +```ini +[influxdb_output] +# Enable exponential backoff +use_exponential_backoff = true + +# Base delay between attempts (seconds) +reconnect_delay = 5.0 + +# Maximum delay cap (seconds) +max_reconnect_delay = 300.0 + +# Number of reconnection attempts +reconnect_attempts = 5 +``` + +### Benefits + +- **Reduces Server Load**: Prevents overwhelming the InfluxDB server during recovery +- **Network Friendly**: Respects network conditions and server capacity +- **Configurable**: Adjust timing based on your environment + +### Example Scenarios + +#### Short Network Glitch +``` +Attempt 1: 5s delay → Success +Total time: ~5 seconds +``` + +#### Server Restart +``` +Attempt 1: 5s delay → Fail +Attempt 2: 10s delay → Fail +Attempt 3: 20s delay → Success +Total time: ~35 seconds +``` + +#### Extended Outage +``` +Attempt 1: 5s delay → Fail +Attempt 2: 10s delay → Fail +Attempt 3: 20s delay → Fail +Attempt 4: 40s delay → Fail +Attempt 5: 80s delay → Fail +Total time: ~155 seconds, then data stored in backlog +``` + +## Periodic Reconnection + +### How It Works + +Periodic reconnection ensures the connection to InfluxDB remains healthy even during periods when no data is being written: + +- **Regular Health Checks**: Performs connection tests at configurable intervals +- **Connection Refresh**: Re-establishes connection even if it appears healthy +- **Quiet Period Handling**: Maintains connection during low-activity periods +- **Proactive Recovery**: Detects and fixes connection issues before data loss + +### Configuration + +```ini +[influxdb_output] +# Periodic reconnection interval (seconds) +periodic_reconnect_interval = 14400.0 # 4 hours (default) + +# Disable periodic reconnection +periodic_reconnect_interval = 0 +``` + +### Benefits + +- **Connection Stability**: Prevents connection timeouts during quiet periods +- **Proactive Monitoring**: Detects issues before they affect data transmission +- **Network Resilience**: Handles network changes and server restarts +- **Configurable**: Adjust interval based on your environment + +### Example Scenarios + +#### Quiet Periods (No Data) +``` +10:00 AM: Last data written +11:00 AM: Periodic reconnection check → Connection healthy +12:00 PM: Periodic reconnection check → Connection healthy +01:00 PM: Periodic reconnection check → Connection healthy +02:00 PM: New data arrives → Immediate transmission +``` + +#### Network Issues During Quiet Period +``` +10:00 AM: Last data written +11:00 AM: Periodic reconnection check → Connection failed +11:00 AM: Attempting reconnection → Success +12:00 PM: Periodic reconnection check → Connection healthy +``` + +#### Server Restart During Quiet Period +``` +10:00 AM: Last data written +11:00 AM: Periodic reconnection check → Connection failed +11:00 AM: Attempting reconnection → Success (server restarted) +12:00 PM: Periodic reconnection check → Connection healthy +``` + +## Persistent Storage (Data Backlog) + +### How It Works + +When InfluxDB is unavailable, data is stored locally in pickle files: + +1. 
**Data Collection**: Points are stored in memory and on disk +2. **Automatic Cleanup**: Old data is removed based on age limits +3. **Recovery**: When connection is restored, backlog is flushed to InfluxDB +4. **Size Management**: Backlog is limited to prevent disk space issues + +### Configuration + +```ini +[influxdb_output] +# Enable persistent storage +enable_persistent_storage = true + +# Storage directory (relative to gateway directory) +persistent_storage_path = influxdb_backlog + +# Maximum number of points to store +max_backlog_size = 10000 + +# Maximum age of points in seconds (24 hours) +max_backlog_age = 86400 +``` + +### Storage Structure + +``` +influxdb_backlog/ +├── influxdb_backlog_influxdb_output.pkl +├── influxdb_backlog_another_transport.pkl +└── ... +``` + +### Data Recovery Process + +1. **Connection Lost**: Data continues to be collected and stored locally +2. **Reconnection**: When InfluxDB becomes available, backlog is detected +3. **Batch Upload**: All stored points are sent to InfluxDB in batches +4. **Cleanup**: Backlog is cleared after successful upload + +### Example Recovery Log + +``` +[2024-01-15 10:30:00] Connection check failed: Connection refused +[2024-01-15 10:30:00] Not connected to InfluxDB, storing data in backlog +[2024-01-15 10:30:00] Added point to backlog. Backlog size: 1 +... +[2024-01-15 18:45:00] Attempting to reconnect to InfluxDB at localhost:8086 +[2024-01-15 18:45:00] Successfully reconnected to InfluxDB +[2024-01-15 18:45:00] Flushing 2847 backlog points to InfluxDB +[2024-01-15 18:45:00] Successfully wrote 2847 backlog points to InfluxDB +``` + +## Configuration Examples + +### For Stable Networks (Local InfluxDB) + +```ini +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = solar + +# Standard reconnection +reconnect_attempts = 3 +reconnect_delay = 2.0 +use_exponential_backoff = false + +# Periodic reconnection +periodic_reconnect_interval = 1800.0 # 30 minutes + +# Minimal persistent storage +enable_persistent_storage = true +max_backlog_size = 1000 +max_backlog_age = 3600 # 1 hour +``` + +### For Unstable Networks (Remote InfluxDB) + +```ini +[influxdb_output] +transport = influxdb_out +host = remote.influxdb.com +port = 8086 +database = solar + +# Aggressive reconnection with exponential backoff +reconnect_attempts = 10 +reconnect_delay = 5.0 +use_exponential_backoff = true +max_reconnect_delay = 600.0 # 10 minutes + +# Frequent periodic reconnection +periodic_reconnect_interval = 900.0 # 15 minutes + +# Large persistent storage for extended outages +enable_persistent_storage = true +max_backlog_size = 50000 +max_backlog_age = 604800 # 1 week +``` + +### For High-Volume Data + +```ini +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = solar + +# Fast reconnection for high availability +reconnect_attempts = 5 +reconnect_delay = 1.0 +use_exponential_backoff = true +max_reconnect_delay = 60.0 + +# Less frequent periodic reconnection (data keeps connection alive) +periodic_reconnect_interval = 14400.0 # 4 hours (default) + +# Large backlog for high data rates +enable_persistent_storage = true +max_backlog_size = 100000 +max_backlog_age = 86400 # 24 hours + +# Optimized batching +batch_size = 500 +batch_timeout = 5.0 +``` + +## Monitoring and Maintenance + +### Check Backlog Status + +```bash +# Check backlog file sizes +ls -lh influxdb_backlog/ + +# Check backlog contents (Python script) +python3 -c " +import pickle +import os +for file in 
os.listdir('influxdb_backlog'): + if file.endswith('.pkl'): + with open(f'influxdb_backlog/{file}', 'rb') as f: + data = pickle.load(f) + print(f'{file}: {len(data)} points') +" +``` + +### Monitor Logs + +```bash +# Monitor backlog activity +grep -i "backlog\|persistent" /var/log/protocol_gateway.log + +# Monitor reconnection attempts +grep -i "reconnect\|exponential" /var/log/protocol_gateway.log + +# Monitor periodic reconnection +grep -i "periodic.*reconnect" /var/log/protocol_gateway.log +``` + +### Cleanup Old Backlog Files + +```bash +# Remove backlog files older than 7 days +find influxdb_backlog/ -name "*.pkl" -mtime +7 -delete +``` + +## Performance Considerations + +### Memory Usage + +- **Backlog Storage**: Each point uses ~200-500 bytes in memory +- **10,000 points**: ~2-5 MB memory usage +- **100,000 points**: ~20-50 MB memory usage + +### Disk Usage + +- **Backlog Files**: Compressed pickle format +- **10,000 points**: ~1-2 MB disk space +- **100,000 points**: ~10-20 MB disk space + +### Network Impact + +- **Recovery Upload**: Large batches may take time to upload +- **Bandwidth**: Consider network capacity during recovery +- **Server Load**: InfluxDB may experience high load during recovery + +## Troubleshooting + +### Backlog Not Flushing + +**Symptoms:** +- Backlog points remain after reconnection +- No "Flushing X backlog points" messages + +**Solutions:** +- Check InfluxDB server capacity +- Verify database permissions +- Monitor InfluxDB logs for errors + +### Excessive Memory Usage + +**Symptoms:** +- High memory consumption +- Slow performance + +**Solutions:** +- Reduce `max_backlog_size` +- Decrease `max_backlog_age` +- Monitor system resources + +### Disk Space Issues + +**Symptoms:** +- "Backlog full" warnings +- Disk space running low + +**Solutions:** +- Clean up old backlog files +- Reduce `max_backlog_size` +- Move `persistent_storage_path` to larger disk + +### Reconnection Too Aggressive + +**Symptoms:** +- High CPU usage during outages +- Network congestion + +**Solutions:** +- Increase `reconnect_delay` +- Reduce `reconnect_attempts` +- Enable `use_exponential_backoff` + +## Best Practices + +### 1. Size Your Backlog Appropriately + +```ini +# For 1-minute intervals, 24-hour outage +max_backlog_size = 1440 # 24 * 60 + +# For 5-minute intervals, 1-week outage +max_backlog_size = 2016 # 7 * 24 * 12 +``` + +### 2. Monitor and Clean + +- Regularly check backlog file sizes +- Clean up old files automatically +- Monitor disk space usage + +### 3. Test Recovery + +- Simulate outages to test recovery +- Verify data integrity after recovery +- Monitor performance during recovery + +### 4. Plan for Scale + +- Estimate data volume and outage duration +- Size backlog accordingly +- Monitor system resources + +## Migration from Previous Version + +If upgrading from a version without these features: + +1. **No Configuration Changes Required**: Features are enabled by default with sensible defaults +2. **Backward Compatible**: Existing configurations continue to work +3. 
**Gradual Adoption**: Disable features if not needed: + +```ini +[influxdb_output] +# Disable exponential backoff +use_exponential_backoff = false + +# Disable persistent storage +enable_persistent_storage = false +``` \ No newline at end of file diff --git a/documentation/usage/transports.md b/documentation/usage/transports.md index 8e516c5..17c7b1f 100644 --- a/documentation/usage/transports.md +++ b/documentation/usage/transports.md @@ -37,10 +37,20 @@ For ambigious sensitive protocols/transports such as ModBus, a safety mechanism In order to write, the configuration csv file must be at least 90% verifiable. Alternatively a manual verification method will be implemented in the future. This mainly entails that the current values in the writeable register ( probably holding ), be within the value range specified in the csv. -Finally, to enable writing for a transport: -``` -write_enabled = true -``` + +#### Write Safety Modes +``` write = false ``` +default value; writing is disabled + +``` write = true ``` +default "write" behaviour; includes all validations / safeties. + +```write = relaxed ``` ( dangerous - make sure you have the right protocol ) + skips the initial ( score % ) / bulk validation + +``` write = unsafe ``` ( very dangerous ) +skips all write safeties. + Finally, to write, "read" data on any bridged transport. In most cases this will likely be MQTT. @@ -159,6 +169,212 @@ the writable topics are given a prefix of "/write/" ## MQTT Write by default mqtt writes data from the bridged transport. +# JSON Output +``` +###required +transport = json_out +``` + +``` +###optional +output_file = stdout +pretty_print = true +append_mode = false +include_timestamp = true +include_device_info = true +``` + +## JSON Output Configuration + +### output_file +Specifies the output destination. Use `stdout` for console output or provide a file path. +``` +output_file = stdout +output_file = /var/log/inverter_data.json +``` + +### pretty_print +Whether to format JSON with indentation for readability. +``` +pretty_print = true +``` + +### append_mode +Whether to append to file instead of overwriting. Useful for log files. +``` +append_mode = false +``` + +### include_timestamp +Whether to include Unix timestamp in the JSON output. +``` +include_timestamp = true +``` + +### include_device_info +Whether to include device metadata (identifier, name, manufacturer, etc.) in the JSON output. +``` +include_device_info = true +``` + +## JSON Output Format + +The JSON output includes the following structure: + +```json +{ + "device": { + "identifier": "device_serial", + "name": "Device Name", + "manufacturer": "Manufacturer", + "model": "Model", + "serial_number": "Serial Number", + "transport": "transport_name" + }, + "timestamp": 1703123456.789, + "data": { + "variable_name": "value", + "another_variable": "another_value" + } +} +``` + +## JSON Output Use Cases + +1. **Debugging**: Output data to console for real-time monitoring +2. **Logging**: Write data to log files for historical analysis +3. **Integration**: Feed data to other systems that consume JSON +4. **Data Collection**: Collect data for analysis or backup purposes + +# InfluxDB Output +``` +###required +transport = influxdb_out +host = +port = +database = +``` + +``` +###optional +username = +password = +measurement = device_data +include_timestamp = true +include_device_info = true +batch_size = 100 +batch_timeout = 10.0 +``` + +## InfluxDB Output Configuration + +### host +InfluxDB server hostname or IP address.
+``` +host = localhost +host = 192.168.1.100 +``` + +### port +InfluxDB server port (default: 8086). +``` +port = 8086 +``` + +### database +Database name. Will be created automatically if it doesn't exist. +``` +database = solar +database = inverter_data +``` + +### username +Username for authentication (optional). +``` +username = admin +``` + +### password +Password for authentication (optional). +``` +password = your_password +``` + +### measurement +InfluxDB measurement name for storing data points. +``` +measurement = device_data +measurement = inverter_metrics +``` + +### include_timestamp +Whether to include timestamp in data points. +``` +include_timestamp = true +``` + +### include_device_info +Whether to include device metadata as InfluxDB tags. +``` +include_device_info = true +``` + +### batch_size +Number of data points to batch before writing to InfluxDB. +``` +batch_size = 100 +``` + +### batch_timeout +Maximum time (seconds) to wait before flushing batch. +``` +batch_timeout = 10.0 +``` + +## InfluxDB Data Structure + +The InfluxDB output creates data points with the following structure: + +### Tags (if `include_device_info = true`) +- `device_identifier`: Device serial number (lowercase) +- `device_name`: Device name +- `device_manufacturer`: Device manufacturer +- `device_model`: Device model +- `device_serial_number`: Device serial number +- `transport`: Source transport name + +### Fields +All device data values are stored as fields. The transport automatically converts: +- Numeric strings to integers or floats +- Non-numeric strings remain as strings + +### Time +- Uses current timestamp in nanoseconds (if `include_timestamp = true`) +- Can be disabled for custom timestamp handling + +## InfluxDB Output Use Cases + +1. **Time-Series Data Storage**: Store historical device data for analysis +2. **Grafana Integration**: Visualize data with Grafana dashboards +3. **Data Analytics**: Perform time-series analysis and trending +4. **Monitoring**: Set up alerts and monitoring based on data thresholds + +## Example InfluxDB Queries + +```sql +-- Show all measurements +SHOW MEASUREMENTS + +-- Query recent data +SELECT * FROM device_data WHERE time > now() - 1h + +-- Query specific device +SELECT * FROM device_data WHERE device_identifier = '123456789' + +-- Aggregate data +SELECT mean(value) FROM device_data WHERE field_name = 'battery_voltage' GROUP BY time(5m) +``` + # ModBus_RTU ``` ###required diff --git a/documentation/usage/troubleshooting_influxdb.md b/documentation/usage/troubleshooting_influxdb.md new file mode 100644 index 0000000..abfc759 --- /dev/null +++ b/documentation/usage/troubleshooting_influxdb.md @@ -0,0 +1,340 @@ +# InfluxDB Troubleshooting Guide + +## Common Issue: Data Stops Being Written to InfluxDB + +This guide helps you diagnose and fix the issue where data stops being written to InfluxDB after some time. + +## Quick Diagnosis + +### 1. Check Logs +First, enable debug logging to see what's happening: + +```ini +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = solar +log_level = DEBUG +``` + +Look for these log messages: +- `"Not connected to InfluxDB, skipping data write"` +- `"Connection check failed"` +- `"Attempting to reconnect to InfluxDB"` +- `"Failed to write batch to InfluxDB"` + +### 2. 
Check InfluxDB Server +Verify InfluxDB is running and accessible: + +```bash +# Check if InfluxDB is running +systemctl status influxdb + +# Test connection +curl -i http://localhost:8086/ping + +# Check if database exists +echo "SHOW DATABASES" | influx +``` + +### 3. Check Network Connectivity +Test network connectivity between your gateway and InfluxDB: + +```bash +# Test basic connectivity +ping your_influxdb_host + +# Test port connectivity +telnet your_influxdb_host 8086 +``` + +## Root Causes and Solutions + +### 1. Network Connectivity Issues + +**Symptoms:** +- Connection timeouts +- Intermittent data loss +- Reconnection attempts in logs + +**Solutions:** +```ini +[influxdb_output] +# Increase timeouts for slow networks +connection_timeout = 30 +reconnect_attempts = 10 +reconnect_delay = 10.0 +``` + +### 2. InfluxDB Server Restarts + +**Symptoms:** +- Connection refused errors +- Sudden data gaps +- Reconnection success after delays + +**Solutions:** +- Monitor InfluxDB server stability +- Check InfluxDB logs for crashes +- Consider using InfluxDB clustering for high availability + +### 3. Memory/Resource Issues + +**Symptoms:** +- Slow response times +- Connection hangs +- Batch write failures + +**Solutions:** +```ini +[influxdb_output] +# Reduce batch size to lower memory usage +batch_size = 50 +batch_timeout = 5.0 +``` + +### 4. Authentication Issues + +**Symptoms:** +- Authentication errors in logs +- Connection succeeds but writes fail + +**Solutions:** +- Verify username/password in configuration +- Check InfluxDB user permissions +- Test authentication manually: + +```bash +curl -i -u username:password http://localhost:8086/query?q=SHOW%20DATABASES +``` + +### 5. Database/Measurement Issues + +**Symptoms:** +- Data appears in InfluxDB but not in expected measurement +- Type conflicts in logs + +**Solutions:** +- Verify database and measurement names +- Check for field type conflicts +- Use `force_float = true` to avoid type issues + +## Configuration Best Practices + +### Recommended Configuration +```ini +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = solar +measurement = device_data +include_timestamp = true +include_device_info = true + +# Connection monitoring +reconnect_attempts = 5 +reconnect_delay = 5.0 +connection_timeout = 10 + +# Batching (adjust based on your data rate) +batch_size = 100 +batch_timeout = 10.0 + +# Data handling +force_float = true +log_level = INFO +``` + +### For Unstable Networks +```ini +[influxdb_output] +# More aggressive reconnection +reconnect_attempts = 10 +reconnect_delay = 10.0 +connection_timeout = 30 + +# Smaller batches for faster recovery +batch_size = 50 +batch_timeout = 5.0 +``` + +### For High-Volume Data +```ini +[influxdb_output] +# Larger batches for efficiency +batch_size = 500 +batch_timeout = 30.0 + +# Faster reconnection +reconnect_attempts = 3 +reconnect_delay = 2.0 +``` + +## Monitoring and Alerts + +### 1. Monitor Connection Status +Add this to your monitoring system: +```bash +# Check if gateway is writing data +curl -s "http://localhost:8086/query?db=solar&q=SELECT%20count(*)%20FROM%20device_data%20WHERE%20time%20%3E%20now()%20-%201h" +``` + +### 2. Set Up Alerts +Monitor these conditions: +- No data points in the last hour +- Reconnection attempts > 5 in 10 minutes +- Connection failures > 3 in 5 minutes + +### 3. 
Log Monitoring +Watch for these log patterns: +```bash +# Monitor for connection issues +grep -i "connection\|reconnect\|failed" /var/log/protocol_gateway.log + +# Monitor for data flow +grep -i "wrote.*points\|batch.*flush" /var/log/protocol_gateway.log +``` + +## Testing Your Setup + +### 1. Test Connection Monitoring +Run the connection test script: +```bash +python test_influxdb_connection.py +``` + +### 2. Test Data Flow +Create a simple test configuration: +```ini +[test_source] +transport = modbus_rtu +port = /dev/ttyUSB0 +baudrate = 9600 +protocol_version = test_protocol +read_interval = 5 +bridge = influxdb_output + +[influxdb_output] +transport = influxdb_out +host = localhost +port = 8086 +database = test +measurement = test_data +log_level = DEBUG +``` + +### 3. Verify Data in InfluxDB +```sql +-- Check if data is being written +SELECT * FROM test_data ORDER BY time DESC LIMIT 10 + +-- Check data rate +SELECT count(*) FROM test_data WHERE time > now() - 1h +``` + +## Advanced Troubleshooting + +### 1. Enable Verbose Logging +```ini +[general] +log_level = DEBUG + +[influxdb_output] +log_level = DEBUG +``` + +### 2. Check Multiprocessing Issues +If using multiple transports, verify bridge configuration: +```ini +# Ensure bridge names match exactly +[source_transport] +bridge = influxdb_output + +[influxdb_output] +transport = influxdb_out +# No bridge needed for output transports +``` + +### 3. Monitor System Resources +```bash +# Check memory usage +free -h + +# Check disk space +df -h + +# Check network connections +netstat -an | grep 8086 +``` + +### 4. InfluxDB Performance Tuning +```ini +# InfluxDB configuration (influxdb.conf) +[data] +wal-fsync-delay = "1s" +cache-max-memory-size = "1g" +series-id-set-cache-size = 100 +``` + +## Common Error Messages + +### "Failed to connect to InfluxDB" +- Check if InfluxDB is running +- Verify host and port +- Check firewall settings + +### "Failed to write batch to InfluxDB" +- Check InfluxDB server resources +- Verify database permissions +- Check for field type conflicts + +### "Not connected to InfluxDB, skipping data write" +- Connection was lost, reconnection in progress +- Check network connectivity +- Monitor reconnection attempts + +### "Connection check failed" +- Network issue or InfluxDB restart +- Check InfluxDB server status +- Verify network connectivity + +## Getting Help + +If you're still experiencing issues: + +1. **Collect Information:** + - Gateway logs with DEBUG level + - InfluxDB server logs + - Network connectivity test results + - Configuration file (remove sensitive data) + +2. **Test Steps:** + - Run the connection test script + - Verify InfluxDB is accessible manually + - Test with a simple configuration + +3. **Provide Details:** + - Operating system and version + - Python version + - InfluxDB version + - Network setup (local/remote InfluxDB) + - Data volume and frequency + +## Prevention + +### 1. Regular Monitoring +- Set up automated monitoring for data flow +- Monitor InfluxDB server health +- Check network connectivity regularly + +### 2. Configuration Validation +- Test configurations before deployment +- Use connection monitoring settings +- Validate InfluxDB permissions + +### 3. 
Backup Strategies +- Consider multiple InfluxDB instances +- Implement data backup procedures +- Use InfluxDB clustering for high availability \ No newline at end of file diff --git a/ppg.py b/ppg.py new file mode 100644 index 0000000..0df4a41 --- /dev/null +++ b/ppg.py @@ -0,0 +1,9 @@ +#a little wrapper to create a shorthand alias +import sys + +from protocol_gateway import main + +if __name__ == "__main__": + # Pass sys.argv (or the relevant slice) to main() + # assuming your main accepts them as parameters + main(sys.argv[1:]) # pass all args except script name \ No newline at end of file diff --git a/protocol_gateway.py b/protocol_gateway.py index 2f8ea9b..0e1220d 100644 --- a/protocol_gateway.py +++ b/protocol_gateway.py @@ -27,6 +27,7 @@ from classes.protocol_settings import protocol_settings, registry_map_entry from classes.transports.transport_base import transport_base +from defs.common import strtobool __logo = """ @@ -49,31 +50,57 @@ class CustomConfigParser(ConfigParser): def get(self, section, option, *args, **kwargs): - if isinstance(option, list): - fallback = None + fallback = None - if "fallback" in kwargs: #override kwargs fallback, for manually handling here - fallback = kwargs["fallback"] - kwargs["fallback"] = None + if "fallback" in kwargs: #override kwargs fallback, for manually handling here + fallback = kwargs["fallback"] + kwargs["fallback"] = None + if isinstance(option, list): for name in option: - value = super().get(section, name, *args, **kwargs) + try: + value = super().get(section, name, *args, **kwargs) + except NoOptionError: + value = None + if value: break + else: + try: + value = super().get(section, option, *args, **kwargs) + except NoOptionError: + value = None - if not value: - value = fallback + if not value: #apply fallback + value = fallback - if value is None: + if value is None: + if isinstance(option, list): raise NoOptionError(option[0], section) - else: - value = super().get(section, option, *args, **kwargs) + else: + raise NoOptionError(option, section) if isinstance(value, int): return value + if isinstance(value, float): + return value + return value.strip() if value is not None else value + def getint(self, section, option, *args, **kwargs): #bypass fallback bug + value = self.get(section, option, *args, **kwargs) + return int(value) if value is not None else None + + def getfloat(self, section, option, *args, **kwargs): #bypass fallback bug + value = self.get(section, option, *args, **kwargs) + return float(value) if value is not None else None + + def getboolean(self, section, option, *args, **kwargs): #bypass fallback bug + value = self.get(section, option, *args, **kwargs) + return strtobool(value) + + class Protocol_Gateway: """ Main class, implementing the Growatt / Inverters to MQTT functionality @@ -121,11 +148,12 @@ def __init__(self, config_file : str): logging.basicConfig(level=log_level) for section in self.__settings.sections(): - if section.startswith("transport"): - transport_cfg = self.__settings[section] - transport_type = transport_cfg.get("transport", fallback="") - protocol_version = transport_cfg.get("protocol_version", fallback="") + transport_cfg = self.__settings[section] + transport_type = transport_cfg.get("transport", fallback="") + protocol_version = transport_cfg.get("protocol_version", fallback="") + # Process sections that either start with "transport" OR have a transport field + if section.startswith("transport") or transport_type: if not transport_type and not protocol_version: raise ValueError("Missing 
Transport / Protocol Version") @@ -208,24 +236,18 @@ def run(self): traceback.print_exc() self.__log.error(err) - time.sleep(0.7) #change this in future. probably reduce to allow faster reads. + time.sleep(0.07) #change this in future. probably reduce to allow faster reads. -def main(): +def main(args=None): """ main method """ - print(__logo) - ppg = Protocol_Gateway(args.config) - ppg.run() - - -if __name__ == "__main__": # Create ArgumentParser object parser = argparse.ArgumentParser(description="Python Protocol Gateway") @@ -241,4 +263,11 @@ def main(): # If '--config' is provided, use it; otherwise, fall back to the positional or default. args.config = args.config if args.config else args.positional_config + print(__logo) + + ppg = Protocol_Gateway(args.config) + ppg.run() + + +if __name__ == "__main__": main() diff --git a/protocols/debug.holding_registry_map.csv b/protocols/debug.holding_registry_map.csv new file mode 100644 index 0000000..f07581a --- /dev/null +++ b/protocols/debug.holding_registry_map.csv @@ -0,0 +1,2 @@ +variable name;data type;register;documented name;length;writeable;unit;values;note; +;USHORT;15;debug;2byte;R;10mA;;Positive: charging Negative: discharging; diff --git a/protocols/debug.json b/protocols/debug.json new file mode 100644 index 0000000..ae0c62d --- /dev/null +++ b/protocols/debug.json @@ -0,0 +1,4 @@ +{ + "reader" : "pace", + "batch_size": 1 +} \ No newline at end of file diff --git a/protocols/eg4/eg4_v58.input_registry_map.csv b/protocols/eg4/eg4_v58.input_registry_map.csv index a6b71ed..928feda 100644 --- a/protocols/eg4/eg4_v58.input_registry_map.csv +++ b/protocols/eg4/eg4_v58.input_registry_map.csv @@ -129,6 +129,7 @@ Grid Hz,,15,Fac,0.01Hz,0-65535,Utility grid frequency,,,,,,,,,,,, ,8bit,118.b8,SN_7__serial number,,[0-9a-zA-Z],,,,,,,,,,,,, ,8bit,119,SN_8__serial number,,[0-9a-zA-Z],,,,,,,,,,,,, ,8bit,119.b8,SN_9__serial number,,[0-9a-zA-Z],,,,,,,,,,,,, +,ASCII_LE,115~119,Serial Number,,,Serial Number as one string instead of split,,,,,,,,,,,, ,,120,VBusP,0.1V,,,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage,Half BUS voltage ,,121,GenVolt,0.1V,,Generator voltage Voltage of generator for three phase: R phase,,,,,,,,,,,, ,,122,GenFreq,0.01Hz,,Generator frequency,,,,,,,,,,,, diff --git a/protocols/growatt/growatt_bms_canbus_v1.04.registry_map.csv b/protocols/growatt/growatt_bms_canbus_v1.04.registry_map.csv index fbad44e..7100b03 100644 --- a/protocols/growatt/growatt_bms_canbus_v1.04.registry_map.csv +++ b/protocols/growatt/growatt_bms_canbus_v1.04.registry_map.csv @@ -11,7 +11,7 @@ variable name,data type,register,documented name,description,writable,values,uni ,16BIT_FLAGS,x312,Protection Flags,,,"{""b15"": ""OTD (Over Temperature Discharge) protection"", ""b14"": ""OTC (Over Temperature Charge) protection"", ""b13"": ""UTD (Under Temperature Discharge) protection"", ""b12"": ""UTC (Under Temperature Charge) protection"", ""b11"": ""System error"", ""b10"": ""Delta V Fail"", ""b7"": ""DisCharge over current"", ""b6"": ""Charge over current"", ""b5"": ""SCD (Short Circuit Discharge) protection"", ""b4"": ""Cell over voltage"", ""b3"": ""Cell under voltage"", ""b2"": ""Module over voltage"", ""b1"": ""Module under voltage"", ""b0"": ""Soft start fail""}",, ,16BIT_FLAGS,X312.2,Alarm Flags,,,"{""b7"": ""DisCharge over current"", ""b6"": ""Charge over current"", ""b5"": ""Cell over voltage"", ""b4"": ""Cell 
under voltage"", ""b3"": ""Module over voltage"", ""b2"": ""Module under voltage"", ""b1"": """", ""b0"": """", ""b15"": ""OTD (Over Temperature Discharge) protection"", ""b14"": ""OTC (Over Temperature Charge) protection"", ""b13"": ""UTD (Under Temperature Discharge) protection"", ""b12"": ""UTC (Under Temperature Charge) protection"", ""b11"": ""Delta V Fail"", ""b10"": ""Pack before turn off"", ""b9"": ""Internal communication fail"", ""b8"": """"}",, ,,,,,,,, -,SHORT,x313,Average module voltage of system ,,,,0.01V, +Battery Voltage,SHORT,x313,Average module voltage of system ,,,,0.01V, ,SHORT,X313.2,Total current of system,,,,0.1A, ,SHORT,X313.4,Maximum cell temperature,,,,0.1A, ,BYTE,X313.6,Average State of Charge of System,,,,%, diff --git a/protocols/srne/srne_2021_v1.96.holding_registry_map.csv b/protocols/srne/srne_2021_v1.96.holding_registry_map.csv index 461428a..bc9552f 100644 --- a/protocols/srne/srne_2021_v1.96.holding_registry_map.csv +++ b/protocols/srne/srne_2021_v1.96.holding_registry_map.csv @@ -45,15 +45,44 @@ variable name,data type,register,documented name,description,writable,values,uni ,,0x0233,Load Phase_C active power,,R,,W, ,,0x0235,Load Phase_C apparent power,,R,,VA, ,,0x0237,Load Phase_C ratio,,R,0~100,%, -,BYTE,0xF02C,Stats GenerateEnergyToGridTday,,R,,0.1kWh, -,BYTE,0xF02D,Stats BatChgTday,,R,,1AH, -,BYTE,0xF02E,Stats BatDischgTday,,R,,1AH, -,BYTE,0xF02F,Stats GenerateEnergyTday,,R,,0.1kWh, -,BYTE,0xF030,Stats UsedEnergyTday,,R,,0.1kWh, -,BYTE,0xF031,Stats WorkDaysTotal,,R,,1d, -,BYTE,0xF03C,Stats GridChgEnergyTday,,R,,1AH, -,BYTE,0xF03D,Stats LoadConsumLineTday,,R,,0.1kWh, -,BYTE,0xF03E,Stats InvWorkTimeTday,,R,,1min, -,BYTE,0xF03F,Stats GridWorkTimeTday,,R,,1min, -,BYTE,0xF04A,Stats InvWorkTimeTotal,,R,,1h, -,BYTE,0xF04B,Stats GridWorkTimeTotal,,R,,1h, \ No newline at end of file +,,0xF000,Stats PVEnergyYesterday,,R,,0.1kWh, +,,0xF001,Stats PVEnergy2Dayago,,R,,0.1kWh, +,,0xF002,Stats PVEnergy3Dayago,,R,,0.1kWh, +,,0xF003,Stats PVEnergy4Dayago,,R,,0.1kWh, +,,0xF004,Stats PVEnergy5Dayago,,R,,0.1kWh, +,,0xF005,Stats PVEnergy6Dayago,,R,,0.1kWh, +,,0xF006,Stats PVEnergy7Dayago,,R,,0.1kWh, +,,0xF007,Stats BatChgEnergyYesterday,,R,,1AH, +,,0xF008,Stats BatChgEnergy2Dayago,,R,,1AH, +,,0xF009,Stats BatChgEnergy3Dayago,,R,,1AH, +,,0xF00A,Stats BatChgEnergy4Dayago,,R,,1AH, +,,0xF00B,Stats BatChgEnergy5Dayago,,R,,1AH, +,,0xF00C,Stats BatChgEnergy6Dayago,,R,,1AH, +,,0xF00D,Stats BatChgEnergy7Dayago,,R,,1AH, +,,0xF00E,Stats BatDischgEnergyYesterday,,R,,1AH, +,,0xF00F,Stats BatDischgEnergy2Dayago,,R,,1AH, +,,0xF010,Stats BatDischgEnergy3Dayago,,R,,1AH, +,,0xF011,Stats BatDischgEnergy4Dayago,,R,,1AH, +,,0xF012,Stats BatDischgEnergy5Dayago,,R,,1AH, +,,0xF013,Stats BatDischgEnergy6Dayago,,R,,1AH, +,,0xF014,Stats BatDischgEnergy7Dayago,,R,,1AH, +,,0xF015,Stats GridChgEnergyYesterday,,R,,1AH, +,,0xF016,Stats GridChgEnergy2Dayago,,R,,1AH, +,,0xF017,Stats GridChgEnergy3Dayago,,R,,1AH, +,,0xF018,Stats GridChgEnergy4Dayago,,R,,1AH, +,,0xF019,Stats GridChgEnergy5Dayago,,R,,1AH, +,,0xF01A,Stats GridChgEnergy6Dayago,,R,,1AH, +,,0xF01B,Stats GridChgEnergy7Dayago,,R,,1AH, +,,0xF02C,Stats GenerateEnergyToGridTday,,R,,0.1kWh, +,,0xF02D,Stats BatChgTday,,R,,1AH, +,,0xF02E,Stats BatDischgTday,,R,,1AH, +,,0xF02F,Stats GenerateEnergyTday,,R,,0.1kWh, +,,0xF030,Stats UsedEnergyTday,,R,,0.1kWh, +,,0xF031,Stats WorkDaysTotal,,R,,1d, +,,0xF038,Stats GeneratEnergyTotal,,R,,0.1kWh, +,,0xF03C,Stats GridChgEnergyTday,,R,,1AH, +,,0xF03D,Stats GridLoadConsumTday,,R,,0.1kWh, +,,0xF03E,Stats 
InvWorkTimeTday,,R,,1min, +,,0xF03F,Stats GridWorkTimeTday,,R,,1min, +,,0xF04A,Stats InvWorkTimeTotal,,R,,1h, +,,0xF04B,Stats GridWorkTimeTotal,,R,,1h, diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..75b9fb8 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,33 @@ +#pip install build twine +#python -m build +#python -m twine upload dist/* + +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "python-protocol-gateway" +version = "1.1.10" +description = "Python Protocol Gateway reads data via Modbus RTU or other protocols and translates the data for MQTT. In the long run, Python Protocol Gateway will become a general purpose protocol gateway to translate between more than just modbus and mqtt." +readme = "README.md" +license = "Apache-2.0" +authors = [{ name = "HotNoob", email = "hotnoob@hotnoob.com" }] +requires-python = ">=3.9" +dynamic = ["dependencies", "optional-dependencies"] + +[project.scripts] +protocol-gateway = "protocol_gateway:main" +ppg = "protocol_gateway:main" + +[tool.setuptools] +py-modules = ["protocol_gateway"] +license-files = ["LICENSE"] + +[tool.setuptools.dynamic] +dependencies = {file = ["requirements.txt"]} +optional-dependencies = {dev = { file = ["requirements-dev.txt"] }} + +[project.urls] +Homepage = "https://github.com/HotNoob/PythonProtocolGateway" +Repository = "https://github.com/HotNoob/PythonProtocolGateway" diff --git a/pytests/test_influxdb_out.py b/pytests/test_influxdb_out.py new file mode 100644 index 0000000..2b43c81 --- /dev/null +++ b/pytests/test_influxdb_out.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Test for InfluxDB output transport +""" + +import time +import unittest +from protocol_gateway import CustomConfigParser as ConfigParser +from unittest.mock import MagicMock, Mock, patch + +from classes.transports.influxdb_out import influxdb_out + + +class TestInfluxDBOut(unittest.TestCase): + """Test cases for InfluxDB output transport""" + + def setUp(self): + """Set up test fixtures""" + self.config = ConfigParser() + self.config.add_section('influxdb_output') + self.config.set('influxdb_output', 'type', 'influxdb_out') + self.config.set('influxdb_output', 'host', 'localhost') + self.config.set('influxdb_output', 'port', '8086') + self.config.set('influxdb_output', 'database', 'test_db') + + #@patch('classes.transports.influxdb_out.InfluxDBClient') + #@patch('classes.transports.influxdb_out.InfluxDBClient') + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_connect_success(self, mock_influxdb_client): + """Test successful connection to InfluxDB""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'test_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + self.assertTrue(transport.connected) + mock_influxdb_client.assert_called_once_with( + host='localhost', + port=8086, + username=None, + password=None, + database='test_db', + timeout=10 + ) + + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_connect_database_creation(self, mock_influxdb_client): + """Test database creation when it doesn't exist""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'other_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + 
self.assertTrue(transport.connected) + mock_client.create_database.assert_called_once_with('test_db') + + @patch('classes.transports.influxdb_out.InfluxDBClient') + def test_write_data_batching(self, mock_influxdb_client): + """Test data writing and batching""" + # Mock the InfluxDB client + mock_client = Mock() + mock_influxdb_client.return_value = mock_client + mock_client.get_list_database.return_value = [{'name': 'test_db'}] + + transport = influxdb_out(self.config['influxdb_output']) + transport.connect() + + + # Mock source transport + source_transport = Mock() + source_transport.transport_name = 'test_source' + source_transport.device_identifier = 'test123' + source_transport.device_name = 'Test Device' + source_transport.device_manufacturer = 'Test Manufacturer' + source_transport.device_model = 'Test Model' + source_transport.device_serial_number = '123456' + + mock_protocol_settings = Mock() + mock_protocol_settings.get_registry_map.return_value = [] # or list of entries if you want + source_transport.protocolSettings = mock_protocol_settings + + # Test data + test_data = {'battery_voltage': '48.5', 'battery_current': '10.2'} + + transport.last_batch_time = time.time() #stop "flush" from happening and failing test + transport.batch_timeout = 21 + transport.write_data(test_data, source_transport) + + # Check that data was added to batch + self.assertEqual(len(transport.batch_points), 1) + point = transport.batch_points[0] + + self.assertEqual(point['measurement'], 'device_data') + self.assertIn('device_identifier', point['tags']) + self.assertIn('battery_voltage', point['fields']) + self.assertIn('battery_current', point['fields']) + + # Check data type conversion + self.assertEqual(point['fields']['battery_voltage'], 48.5) + self.assertEqual(point['fields']['battery_current'], 10.2) + + def test_configuration_options(self): + """Test configuration option parsing""" + # Add more configuration options + self.config.set('influxdb_output', 'username', 'admin') + self.config.set('influxdb_output', 'password', 'secret') + self.config.set('influxdb_output', 'measurement', 'custom_measurement') + self.config.set('influxdb_output', 'batch_size', '50') + self.config.set('influxdb_output', 'batch_timeout', '5.0') + + transport = influxdb_out(self.config['influxdb_output']) + + self.assertEqual(transport.username, 'admin') + self.assertEqual(transport.password, 'secret') + self.assertEqual(transport.measurement, 'custom_measurement') + self.assertEqual(transport.batch_size, 50) + self.assertEqual(transport.batch_timeout, 5.0) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index 144e14a..4c4b1f2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,2 +1,3 @@ ruff -modbus_tk \ No newline at end of file +modbus_tk +pytest \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 2158c18..af0cebf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ pymodbus==3.7.0 paho-mqtt pyserial python-can +influxdb diff --git a/tools/canbus_server_sim.py b/tools/canbus_server_sim.py new file mode 100644 index 0000000..fb05393 --- /dev/null +++ b/tools/canbus_server_sim.py @@ -0,0 +1,90 @@ +import subprocess +import can +import time +import atexit +import signal +import sys +import os +from pathlib import Path + + + +VCAN_IFACE = 'vcan0' +vcan_messages = [] + + +def load_candump_file(filepath): + os.chdir(Path(__file__).resolve().parent) + + messages = [] + + with 
open(filepath, 'r') as f: + for line in f: + line = line.strip() + if not line or '#' not in line: + continue + + try: + can_id_str, data_str = line.split('#') + can_id = int(can_id_str, 16) + data = bytes.fromhex(data_str) + + msg = can.Message( + arbitration_id=can_id, + data=data, + is_extended_id=False + ) + messages.append(msg) + except Exception as e: + print(f"Failed to parse line '{line}': {e}") + + return messages + + +def emulate_device(): + bus = can.interface.Bus(channel='vcan0', bustype='socketcan') + + while True: + for msg in vcan_messages: + try: + bus.send(msg) + print(f"Sent message: {msg}") + except can.CanError: + print("Message NOT sent") + time.sleep(1) # Send message every 1 second + +def setup_vcan(interface=VCAN_IFACE): + try: + # Load vcan kernel module + subprocess.run(['sudo', 'modprobe', 'vcan'], check=True) + + # Add virtual CAN interface + subprocess.run(['sudo', 'ip', 'link', 'add', 'dev', interface, 'type', 'vcan'], check=True) + + # Bring the interface up + subprocess.run(['sudo', 'ip', 'link', 'set', 'up', interface], check=True) + + print(f"Virtual CAN interface {interface} is ready.") + except subprocess.CalledProcessError as e: + print(f"Failed to set up {interface}: {e}") + + +def cleanup_vcan(interface=VCAN_IFACE): + try: + subprocess.run(['sudo', 'ip', 'link', 'delete', interface], check=True) + print(f"Removed {interface}") + except subprocess.CalledProcessError as e: + print(f"Error removing {interface}: {e}") + +# Register cleanup to run at program exit +atexit.register(cleanup_vcan) + +# Optional: Handle Ctrl+C gracefully +signal.signal(signal.SIGINT, lambda sig, frame: sys.exit(0)) + + +if __name__ == "__main__": + setup_vcan() + vcan_messages = load_candump_file("candump.txt") + emulate_device() + diff --git a/tools/candump.txt b/tools/candump.txt new file mode 100644 index 0000000..58811e6 --- /dev/null +++ b/tools/candump.txt @@ -0,0 +1,3 @@ +313#14C8FFF100E56264 +313#14C8FFF100E56264 +313#14C8FFF100E56264 \ No newline at end of file diff --git a/tools/modbus_server_sim.py b/tools/modbus_server_sim.py index c96bfb9..04f1503 100644 --- a/tools/modbus_server_sim.py +++ b/tools/modbus_server_sim.py @@ -1,7 +1,10 @@ ''' simulate modbus tcp server for testing ppg ''' +import json import sys -from modbus_tk import modbus_tcp, hooks, utils -from modbus_tk.defines import HOLDING_REGISTERS + +from modbus_tk import hooks, modbus_tcp, utils +from modbus_tk.defines import HOLDING_REGISTERS, READ_INPUT_REGISTERS + def on_write_request(request): print(f"Write request: {request}") @@ -9,8 +12,30 @@ def on_write_request(request): server = modbus_tcp.TcpServer(address="0.0.0.0", port=5020) slave = server.add_slave(1) -slave.add_block('0', HOLDING_REGISTERS, 0, 100) # 100 registers -slave.set_values('0', 40, [1] * (55 - 40 + 1)) #regiters 40-55 set to 1. 
for emulating hdhk_16ch_ac_module
+
+#load registries
+input_save_path = "input_registry.json"
+holding_save_path = "holding_registry.json"
+
+#load previous scan if enabled and exists
+with open(input_save_path, "r") as file:
+    input_registry = json.load(file)
+
+with open(holding_save_path, "r") as file:
+    holding_registry = json.load(file)
+
+# Convert keys to integers
+input_registry = {int(key): value for key, value in input_registry.items()}
+holding_registry = {int(key): value for key, value in holding_registry.items()}
+
+slave.add_block('INPUT', READ_INPUT_REGISTERS, 0, max(input_registry.keys()) +1 )
+slave.add_block('HOLDING', HOLDING_REGISTERS, 0, max(holding_registry.keys()) +1)
+
+for address, value in input_registry.items():
+    slave.set_values('INPUT', address, [value])
+
+for address, value in holding_registry.items():
+    slave.set_values('HOLDING', address, [value])
 
 server.start()
 print("Modbus server is running on port 5020...")
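
quick way to sanity check the simulator above: a minimal sketch, assuming the sim is running locally on port 5020 with slave id 1 (as set by `server.add_slave(1)`) and that `input_registry.json` / `holding_registry.json` exist next to the script. this snippet is illustrative only and not part of the patch; it just reads back a few registers with pymodbus (already pinned in requirements.txt).

```
# read a few registers back from tools/modbus_server_sim.py
# assumes: 127.0.0.1:5020, slave id 1
from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=5020)
if client.connect():
    holding = client.read_holding_registers(0, count=8, slave=1)
    inputs = client.read_input_registers(0, count=8, slave=1)
    if not holding.isError():
        print("holding 0-7:", holding.registers)
    if not inputs.isError():
        print("input 0-7:", inputs.registers)
    client.close()
else:
    print("could not connect to simulator on 127.0.0.1:5020")
```

once that reads back sane values, point your config.cfg at the simulator's tcp port to test ppg end to end without real hardware.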