In [None]:
# import necessary libraries
import minimalmodbus
import serial.tools.list_ports
import time
from datetime import datetime
import json

# List the serial ports currently visible to the OS so the user can pick the
# one that belongs to the RS485-to-USB converter.
ports = list(serial.tools.list_ports.comports())
if len(ports)==0:
    print('Please Plug A USB to RS485 Serial Converter in to Computer')
else:
    for p in ports:
        print (p)

# input() already returns a str; strip stray whitespace so the port name can be
# used exactly as typed.
PORT = input("Which of the COM Ports above is your RS485 Bus Line?").strip()
SLAVE_ADDRESS = int(input("What is the Slave Address or RTU# of your sensor?"))

twinprox = minimalmodbus.Instrument(PORT, SLAVE_ADDRESS)

# Update the slave settings with the TwinProx defaults.
# twinprox.serial.port holds the serial port name and twinprox.address the slave
# address (set this to the last 2 digits of the serial number of the TwinProx
# that you want to communicate with); both were supplied to Instrument() above.
twinprox.serial.baudrate = 115200           # Baudrate fixed 115200
twinprox.serial.bytesize = 8                # Bytesize fixed 8
twinprox.serial.parity   = "N"              # Parity fixed None
twinprox.serial.stopbits = 1                # Stopbits fixed 1
twinprox.serial.timeout  = 0.30             # Seconds
twinprox.close_port_after_each_call = True  # Helps communication for Windows Devices (can be set to false on many Linux devices)
twinprox.mode = minimalmodbus.MODE_RTU      # modbus mode fixed RTU Mode
twinprox.clear_buffers_before_each_transaction = True


print(twinprox)                             # check updated slave communication settings

def stability_data():
    """Poll the module-level ``twinprox`` instrument once and return one reading.

    Holding registers 176-179 are read with function code 3 (in that order),
    followed by a two-register float starting at register 201.

    Returns:
        dict: ``temperature`` (float as returned by ``read_float``) plus
        ``distance_a``, ``displacement_a``, ``distance_b`` and
        ``displacement_b``, each being the raw register value divided by 100.
    """
    # (result key, holding register) pairs, listed in the order they are polled.
    register_map = (
        ("distance_a", 176),
        ("distance_b", 177),
        ("displacement_a", 178),
        ("displacement_b", 179),
    )
    raw = {
        label: twinprox.read_register(address, functioncode=3)
        for label, address in register_map
    }
    temperature = twinprox.read_float(201, functioncode=3, number_of_registers=2, byteorder=3)

    # Assemble the result in the same key order the JSON log has always used.
    reading = {"temperature": temperature}
    for label in ("distance_a", "displacement_a", "distance_b", "displacement_b"):
        reading[label] = raw[label] / 100
    return reading

def get_time():
    """Return the current Unix time in whole seconds as a string.

    The fractional part of ``time.time()`` is truncated explicitly instead of
    slicing the first ten characters of the float's string form, which only
    produced whole seconds while the integer part was exactly ten digits long.

    Returns:
        str: Decimal digits of the current epoch second, e.g. ``"1700000000"``.
    """
    return str(int(time.time()))

# Take 25 readings (reads 0..24), 15 minutes apart, keyed by epoch second.
TOTAL_READS = 25
READ_INTERVAL_SECONDS = 900

reads = 0
logs = {}
data = []
try:
    while reads < TOTAL_READS:
        logs[get_time()] = stability_data()
        print(f"logs: {logs}")
        reads += 1
        # Only wait between readings; there is nothing to wait for after the last one.
        if reads < TOTAL_READS:
            time.sleep(READ_INTERVAL_SECONDS)
finally:
    # Persist whatever was collected even if a read failed or the run was
    # interrupted, so hours of samples are not lost. Skip the write when
    # nothing was collected to avoid overwriting an earlier result file.
    if logs:
        encoded_json = json.JSONEncoder().encode(logs)
        file_name = "stability_test"+".json"
        with open(file_name, "w") as f:
            f.write(encoded_json)

In [None]:
import minimalmodbus
import serial.tools.list_ports
import time
import json
from datetime import datetime
import logging
from typing import Dict, Any

class TwinProxSensorLogger:
    """Logger for TwinProx sensor using Modbus RTU communication.

    Wraps a ``minimalmodbus.Instrument``, polls it at a fixed interval and
    writes the collected readings as JSON files in the current working
    directory. Diagnostics go to ``sensor_log.log`` via the ``logging`` module.
    """

    def __init__(self, port: str, slave_address: int):
        """Initialize TwinProx sensor logger.

        Args:
            port: Serial port name passed to ``minimalmodbus.Instrument``.
            slave_address: Modbus slave address of the sensor.

        Raises:
            Exception: Any error raised while creating or configuring the
                instrument is logged and re-raised unchanged.
        """
        # basicConfig is a no-op when the root logger already has handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename='sensor_log.log'
        )
        self.logger = logging.getLogger(__name__)

        try:
            self.instrument = minimalmodbus.Instrument(port, slave_address)
            self._configure_instrument()
            self.logger.info(f"Initialized TwinProx sensor on {port} with address {slave_address}")
        except Exception as e:
            self.logger.error(f"Sensor initialization failed: {e}")
            raise

    def _configure_instrument(self):
        """Configure Modbus instrument communication parameters."""
        self.instrument.serial.baudrate = 115200
        self.instrument.serial.bytesize = 8
        self.instrument.serial.parity = "N"
        self.instrument.serial.stopbits = 1
        self.instrument.serial.timeout = 0.30  # seconds
        self.instrument.close_port_after_each_call = True
        self.instrument.mode = minimalmodbus.MODE_RTU
        self.instrument.clear_buffers_before_each_transaction = True

    def read_stability_data(self) -> Dict[str, Any]:
        """Read and return sensor stability data.

        Returns:
            Dict with ``temperature`` plus ``distance_a``, ``distance_b``,
            ``displacement_a`` and ``displacement_b`` (raw registers 176-179
            divided by 100).

        Raises:
            Exception: Any read error is logged and re-raised.
        """
        try:
            return {
                "temperature": self._read_temperature(),
                "distance_a": self._read_register(176) / 100,
                "distance_b": self._read_register(177) / 100,
                "displacement_a": self._read_register(178) / 100,
                "displacement_b": self._read_register(179) / 100
            }
        except Exception as e:
            self.logger.error(f"Data read failed: {e}")
            raise

    def _read_temperature(self) -> float:
        """Read temperature from sensor (two-register float at register 201)."""
        return self.instrument.read_float(
            registeraddress=201,
            functioncode=3,
            number_of_registers=2,
            byteorder=3
        )

    def _read_register(self, register: int) -> int:
        """Read a single holding register (function code 3) and return its raw value."""
        return self.instrument.read_register(register, functioncode=3)

    def log_data(self, duration_hours: float = 24, interval_seconds: float = 10) -> None:
        """Log sensor data over specified duration.

        Takes ``int(duration_hours * 3600 / interval_seconds)`` readings keyed
        by ISO timestamp, checkpoints every fifth reading to
        ``intermediate_stability_log.json`` and finally writes everything to a
        timestamped ``stability_test_*.json`` file.

        A failure while reading is logged and not re-raised, but the readings
        collected so far are still written to the final file so they are not
        lost. The same applies when the run is interrupted (e.g. Ctrl-C); the
        interrupt itself still propagates.

        Args:
            duration_hours: Total logging time in hours.
            interval_seconds: Pause between consecutive readings, in seconds.
        """
        logs = {}
        iterations = int((duration_hours * 3600) / interval_seconds)

        try:
            for index in range(iterations):
                current_time = datetime.now().isoformat()
                logs[current_time] = self.read_stability_data()
                self.logger.info(f"Recorded data at {current_time}")

                if len(logs) % 5 == 0:
                    self._save_intermediate_logs(logs)

                # Only wait between readings; nothing follows the last one.
                if index < iterations - 1:
                    time.sleep(interval_seconds)
        except Exception as e:
            self.logger.error(f"Data logging failed: {e}")
        finally:
            # Runs on success, failure and interruption alike so that partial
            # data always reaches disk.
            try:
                self._save_logs_to_file(logs)
            except Exception as e:
                self.logger.error(f"Data logging failed: {e}")

    def _save_intermediate_logs(self, logs: Dict[str, Any]) -> None:
        """Save intermediate logs to prevent data loss.

        Overwrites ``intermediate_stability_log.json``; I/O errors are logged
        and swallowed so a failed checkpoint does not stop the run.
        """
        try:
            with open("intermediate_stability_log.json", "w") as f:
                json.dump(logs, f, indent=2)
        except IOError as e:
            self.logger.error(f"Intermediate log save failed: {e}")

    def _save_logs_to_file(self, logs: Dict[str, Any]) -> None:
        """Save final logs to a timestamped JSON file.

        I/O errors are logged and swallowed.
        """
        try:
            filename = f"stability_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(filename, "w") as f:
                json.dump(logs, f, indent=2)

            self.logger.info(f"Logs saved to {filename}")
        except IOError as e:
            self.logger.error(f"Final log save failed: {e}")

def find_available_ports():
    """List the serial ports reported by pyserial and echo each one.

    Returns:
        list: The detected port entries, or an empty list (after printing a
        hint to connect a converter) when none are present.
    """
    detected = list(serial.tools.list_ports.comports())

    if detected:
        for entry in detected:
            print(entry)
        return detected

    print('No COM ports found. Connect a USB to RS485 Serial Converter.')
    return []

def main():
    """Run the TwinProx sensor logging process.

    Lists the available serial ports, asks for the port and slave address,
    then runs ``TwinProxSensorLogger.log_data`` with its defaults. Returns
    without logging when no port is present or when the slave address is not
    an integer; errors raised while logging are reported on stdout.
    """
    available_ports = find_available_ports()

    if not available_ports:
        return

    port = input("Enter COM Port for RS485 Bus Line: ").strip()
    raw_address = input("Enter Slave Address (RTU#): ")

    # Validate the address here so a typo gives a readable message instead of
    # an uncaught ValueError traceback.
    try:
        slave_address = int(raw_address)
    except ValueError:
        print(f"Logging error: invalid slave address {raw_address!r} (expected an integer)")
        return

    try:
        sensor_logger = TwinProxSensorLogger(port, slave_address)
        sensor_logger.log_data()
        print("data_logged")
    except Exception as e:
        print(f"Logging error: {e}")

# Entry point: call main() only when this code runs as the main module,
# not when it is imported.
if __name__ == "__main__":
    main()

In [None]:
import minimalmodbus
import serial.tools.list_ports
import time
import json
from datetime import datetime
import logging
from typing import Dict, Any

class TwinProxSensorLogger:
    """Logger for a TwinProx sensor over Modbus RTU with optional console output.

    Wraps a ``minimalmodbus.Instrument``, polls it at a fixed interval and
    writes the collected readings as JSON files in the current working
    directory. Diagnostics go to ``sensor_log.log``; when ``verbose`` is true
    they are also printed.
    """

    def __init__(self, port: str, slave_address: int, verbose: bool = True):
        """Initialize TwinProx sensor logger with optional verbose mode.

        Args:
            port: Serial port name passed to ``minimalmodbus.Instrument``.
            slave_address: Modbus slave address of the sensor.
            verbose: When true, progress and errors are also printed.

        Raises:
            Exception: Any error raised while creating or configuring the
                instrument is reported and re-raised unchanged.
        """
        self.verbose = verbose
        # basicConfig is a no-op when the root logger already has handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename='sensor_log.log'
        )
        self.logger = logging.getLogger(__name__)

        try:
            self.instrument = minimalmodbus.Instrument(port, slave_address)
            self._configure_instrument()
            if self.verbose:
                print(f"Initialized TwinProx sensor on {port} with address {slave_address}")
            self.logger.info(f"Initialized TwinProx sensor on {port} with address {slave_address}")
        except Exception as e:
            self._report_error(f"Sensor initialization failed: {e}")
            raise

    def _report_error(self, message: str) -> None:
        """Print ``message`` when verbose and record it at ERROR level."""
        if self.verbose:
            print(message)
        self.logger.error(message)

    def _configure_instrument(self):
        """Configure Modbus instrument communication parameters."""
        self.instrument.serial.baudrate = 115200
        self.instrument.serial.bytesize = 8
        self.instrument.serial.parity = "N"
        self.instrument.serial.stopbits = 1
        self.instrument.serial.timeout = 0.30  # seconds
        self.instrument.close_port_after_each_call = True
        self.instrument.mode = minimalmodbus.MODE_RTU
        self.instrument.clear_buffers_before_each_transaction = True

    def read_stability_data(self) -> Dict[str, Any]:
        """Read and return sensor stability data.

        Returns:
            Dict with ``temperature`` plus ``distance_a``, ``distance_b``,
            ``displacement_a`` and ``displacement_b`` (raw registers 176-179
            divided by 100).

        Raises:
            Exception: Any read error is reported and re-raised.
        """
        try:
            data = {
                "temperature": self._read_temperature(),
                "distance_a": self._read_register(176) / 100,
                "distance_b": self._read_register(177) / 100,
                "displacement_a": self._read_register(178) / 100,
                "displacement_b": self._read_register(179) / 100
            }
            if self.verbose:
                print(f"Sensor data: {data}")
            return data
        except Exception as e:
            self._report_error(f"Data read failed: {e}")
            raise

    def _read_temperature(self) -> float:
        """Read temperature from sensor (two-register float at register 201)."""
        return self.instrument.read_float(
            registeraddress=201,
            functioncode=3,
            number_of_registers=2,
            byteorder=3
        )

    def _read_register(self, register: int) -> int:
        """Read a single holding register (function code 3) and return its raw value."""
        return self.instrument.read_register(register, functioncode=3)

    def log_data(self, duration_hours: float = 1, interval_seconds: float = 10) -> Dict[str, Any]:
        """Log sensor data over specified duration.

        Takes ``int(duration_hours * 3600 / interval_seconds)`` readings keyed
        by ISO timestamp, checkpoints every fifth reading to
        ``intermediate_stability_log.json`` and finally writes everything to a
        timestamped ``stability_test_*.json`` file.

        If a reading fails or the run is interrupted, the readings collected
        so far are written to a timestamped file before the error propagates.

        Args:
            duration_hours: Total logging time in hours (fractions allowed).
            interval_seconds: Pause between consecutive readings, in seconds.

        Returns:
            The collected readings, keyed by ISO timestamp.

        Raises:
            Exception: Any read or final-save error is reported and re-raised.
        """
        logs = {}
        iterations = int((duration_hours * 3600) / interval_seconds)

        try:
            try:
                for index in range(iterations):
                    current_time = datetime.now().isoformat()
                    logs[current_time] = self.read_stability_data()
                    self.logger.info(f"Recorded data at {current_time}")

                    if len(logs) % 5 == 0:
                        self._save_intermediate_logs(logs)

                    # Only wait between readings; nothing follows the last one.
                    if index < iterations - 1:
                        time.sleep(interval_seconds)
            except BaseException:
                # Covers read failures and Ctrl-C: keep what was gathered,
                # then let the original error continue unchanged.
                self._save_partial_logs(logs)
                raise

            self._save_logs_to_file(logs)
            return logs

        except Exception as e:
            self._report_error(f"Data logging failed: {e}")
            raise

    def _save_partial_logs(self, logs: Dict[str, Any]) -> None:
        """Best-effort save of readings gathered before a failure; never raises."""
        if not logs:
            return
        try:
            self._save_logs_to_file(logs)
        except Exception as e:
            # Report but do not raise, so the original failure is the one that propagates.
            self._report_error(f"Partial log save failed: {e}")

    def _save_intermediate_logs(self, logs: Dict[str, Any]) -> None:
        """Save intermediate logs to prevent data loss.

        Overwrites ``intermediate_stability_log.json``; I/O errors are
        reported and swallowed so a failed checkpoint does not stop the run.
        """
        try:
            with open("intermediate_stability_log.json", "w") as f:
                json.dump(logs, f, indent=2)
            if self.verbose:
                print("Saved intermediate logs")
        except IOError as e:
            self._report_error(f"Intermediate log save failed: {e}")

    def _save_logs_to_file(self, logs: Dict[str, Any]) -> str:
        """Save final logs to a timestamped JSON file.

        Returns:
            The name of the file that was written.

        Raises:
            IOError: Reported and re-raised when the file cannot be written.
        """
        try:
            filename = f"stability_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(filename, "w") as f:
                json.dump(logs, f, indent=2)

            if self.verbose:
                print(f"Logs saved to {filename}")
            self.logger.info(f"Logs saved to {filename}")
            return filename
        except IOError as e:
            self._report_error(f"Final log save failed: {e}")
            raise

def find_available_ports(verbose: bool = True):
    """Find and (optionally) print available COM ports.

    Args:
        verbose: When true, print each detected port, or a hint to connect a
            converter if none is found. When false, nothing is printed.

    Returns:
        list: The detected port entries, or an empty list when none are present.
    """
    ports = list(serial.tools.list_ports.comports())
    if not ports:
        msg = 'No COM ports found. Connect a USB to RS485 Serial Converter.'
        if verbose:
            print(msg)
        return []

    # Honour the verbose flag here too; the listing used to be printed
    # unconditionally.
    if verbose:
        for p in ports:
            print(p)

    return ports

def main():
    """Interactive entry point: choose a port, log for a short run, show the last sample.

    Any exception raised along the way (including a non-integer slave address)
    is caught and printed as a ``Logging error``.
    """
    try:
        # Nothing to do when no serial port is connected.
        if not find_available_ports():
            return

        port_name = input("Enter COM Port for RS485 Bus Line: ")
        rtu_number = int(input("Enter Slave Address (RTU#): "))

        # 0.1 h = 6 minutes instead of 24 hours.
        readings = TwinProxSensorLogger(port_name, rtu_number).log_data(duration_hours=0.1)
        print("Data logging completed successfully.")

        if readings:
            last_point = list(readings.values())[-1]
        else:
            last_point = "No data"
        print("Last data point:", last_point)

    except Exception as exc:
        print(f"Logging error: {exc}")

# Entry point: call main() only when this code runs as the main module,
# not when it is imported.
if __name__ == "__main__":
    main()

COM2 - Communications Port (COM2)
COM4 - USB Serial Port (COM4)


Enter COM Port for RS485 Bus Line:  com4
Enter Slave Address (RTU#):  10


Initialized TwinProx sensor on com4 with address 10
Sensor data: {'temperature': 43.3770751953125, 'distance_a': 34.58, 'distance_b': 41.04, 'displacement_a': 0.16, 'displacement_b': 0.16}
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.57, 'distance_b': 41.04, 'displacement_a': 0.16, 'displacement_b': 0.15}
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.58, 'distance_b': 41.05, 'displacement_a': 0.18, 'displacement_b': 0.15}
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.56, 'distance_b': 41.03, 'displacement_a': 0.16, 'displacement_b': 0.15}
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.55, 'distance_b': 41.01, 'displacement_a': 0.17, 'displacement_b': 0.15}
Saved intermediate logs
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.57, 'distance_b': 41.03, 'displacement_a': 0.17, 'displacement_b': 0.15}
Sensor data: {'temperature': 43.46569061279297, 'distance_a': 34.54, 'distance_b': 41.01, 'displa