In [1]:
import time
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.exceptions import ModbusException
import matplotlib.pyplot as plt
import time
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.exceptions import ModbusException, ModbusIOException
import sys

In [None]:
# Configuration - Adjust these as per your setup
SERIAL_PORT = 'COM3'  # Updated based on successful test
BAUDRATE = 115200  # From Config.h
TIMEOUT = 1  # Seconds
PARITY = 'E'  # Even parity as per SERIAL_8E1 in firmware
STOPBITS = 1
BYTESIZE = 8
SLAVE_ID = 1  # From Config.h

# Updated lists based on Config.h ModbusReg
# Holding registers (writeable)
HOLDING_REGISTERS = (
    list(range(0, 15)) +  # DUTY_BASE 0-14
    list(range(100, 115)) +  # FREQ_BASE 100-114
    [500, 501] +  # MOTOR_TEMP_CRIT, MOTOR_CURR_CRIT
    [900] +  # START_REG_ADDR
    list(range(911, 915)) +  # DEV_STATUS_BASE 911-914
    [921, 922] +  # AIR_TEMP_LOW, AIR_TEMP_HIGH
    [931, 932]  # WATER_TEMP_LOW, WATER_TEMP_HIGH
)

# Input registers (read-only)
INPUT_REGISTERS = (
    list(range(200, 215)) +  # CURR_BASE 200-214
    list(range(300, 315)) +  # TEMP_BASE 300-314
    list(range(400, 415)) +  # STATUS_BASE 400-414
    [901] +  # TIME_LOW
    [920] +  # AIR_TEMP_REG
    [930]  # WATER_TEMP_REG
)

def test_modbus_reads(client, verbose=True):
    holding_values = {}
    input_values = {}

    if verbose:
        print("Testing Holding Register Reads:")
    for addr in HOLDING_REGISTERS:
        try:
            response = client.read_holding_registers(addr, 1, unit=SLAVE_ID)
            if response.isError():
                if verbose:
                    print(f"Error reading holding register {addr}: {response}")
            else:
                holding_values[addr] = response.registers[0]
                if verbose:
                    print(f"Holding register {addr}: {response.registers[0]}")
        except ModbusIOException as e:
            if verbose:
                print(f"IO Error on holding {addr}: {e} - Possible communication issue, check wiring/parity/baudrate.")
        except ModbusException as e:
            if verbose:
                print(f"Modbus Exception on holding {addr}: {e} - e.g., Illegal Address if out of range.")
        except Exception as e:
            if verbose:
                print(f"Unexpected error on holding {addr}: {e}")
        time.sleep(0.05)  # Small delay to avoid flooding

    if verbose:
        print("\nTesting Input Register Reads:")
    for addr in INPUT_REGISTERS:
        try:
            response = client.read_input_registers(addr, 1, unit=SLAVE_ID)
            if response.isError():
                if verbose:
                    print(f"Error reading input register {addr}: {response}")
            else:
                input_values[addr] = response.registers[0]
                if verbose:
                    print(f"Input register {addr}: {response.registers[0]}")
        except ModbusIOException as e:
            if verbose:
                print(f"IO Error on input {addr}: {e} - Possible interference or timeout.")
        except ModbusException as e:
            if verbose:
                print(f"Modbus Exception on input {addr}: {e}")
        except Exception as e:
            if verbose:
                print(f"Unexpected error on input {addr}: {e}")
        time.sleep(0.05)

    return holding_values, input_values

def looped_read(client, duration=60, interval=1):
    print(f"\nStarting looped read every {interval} second for {duration} seconds...")
    prev_holding = {}
    prev_input = {}
    for t in range(duration // interval):
        print(f"\n--- Iteration {t+1} at time {t*interval} seconds ---")
        holding_values, input_values = test_modbus_reads(client, verbose=False)
        
        # Check for changes in holding registers
        changes = False
        print("Holding Register Changes:")
        for addr, val in holding_values.items():
            if addr in prev_holding and prev_holding[addr] != val:
                print(f"  {addr}: {prev_holding[addr]} -> {val}")
                changes = True
        if not changes:
            print("  No changes")
        
        # Check for changes in input registers
        changes = False
        print("Input Register Changes:")
        for addr, val in input_values.items():
            if addr in prev_input and prev_input[addr] != val:
                print(f"  {addr}: {prev_input[addr]} -> {val}")
                changes = True
        if not changes:
            print("  No changes")
        
        prev_holding = holding_values
        prev_input = input_values
        time.sleep(interval)

def test_modbus_writes(client):
    print("\nTesting Writes to Holding Registers (e.g., to trigger changes):")
    test_addr = 0  # DUTY_BASE + 0
    test_value = 500  # 50% duty (0-1000 scale)
    try:
        response = client.write_register(test_addr, test_value, unit=SLAVE_ID)
        if response.isError():
            print(f"Error writing to {test_addr}: {response}")
        else:
            print(f"Successfully wrote {test_value} to {test_addr}")
            # Read back to verify
            read_back = client.read_holding_registers(test_addr, 1, unit=SLAVE_ID)
            if not read_back.isError():
                print(f"Read back: {read_back.registers[0]}")
    except Exception as e:
        print(f"Error during write test: {e}")

if __name__ == "__main__":
    client = ModbusClient(method='rtu', port=SERIAL_PORT, baudrate=BAUDRATE,
                          timeout=TIMEOUT, parity=PARITY, stopbits=STOPBITS, bytesize=BYTESIZE)
    
    if not client.connect():
        print("Failed to connect to serial port. Check port name, permissions, or if device is connected.")
        sys.exit(1)
    
    print(f"Connected to {SERIAL_PORT} at {BAUDRATE} baud with parity {PARITY}.")

    # Initial full read with verbose output
    test_modbus_reads(client)

    # Looped read to monitor changes over 1 minute
    looped_read(client)
    
    # Optional write test
    test_modbus_writes(client)
    
    client.close()
    print("Test complete. Review changes in looped reads for dynamic behavior (e.g., if input registers update over time). If static, check firmware update loops.")

Connected to COM3 at 115200 baud with parity E.
Testing Holding Register Reads:
Holding register 0: 0
Holding register 1: 0
Holding register 2: 0
Holding register 3: 0
Holding register 4: 0
Holding register 5: 0
Holding register 6: 0
Holding register 7: 0
Holding register 8: 0
Holding register 9: 0
Holding register 10: 0
Holding register 11: 0
Holding register 12: 0
Holding register 13: 0
Holding register 14: 0
Holding register 100: 1000
Holding register 101: 1000
Holding register 102: 1000
Holding register 103: 1000
Holding register 104: 1000
Holding register 105: 1000
Holding register 106: 1000
Holding register 107: 1000
Holding register 108: 1000
Holding register 109: 1000
Holding register 110: 1000
Holding register 111: 1000
Holding register 112: 1000
Holding register 113: 1000
Holding register 114: 1000
Holding register 500: 6000
Holding register 501: 900
Holding register 900: 0
Holding register 911: 0
Holding register 912: 0
Holding register 913: 0
Holding register 914: 0
Holding

KeyboardInterrupt: 