Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 10 additions & 37 deletions benchmesh-serial-service/src/benchmesh_service/drivers/owon_oel.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,18 @@
import serial
import time
from ..transport import SerialTransport

class OwonOEL:
def __init__(self, port, baudrate=115200):
self.port = port
self.baudrate = baudrate
self.ser = None
self.connect()
def __init__(self, port, baudrate=115200, serial_mode='8N1', seol='\r', reol='\r'):
self.t = SerialTransport(port, baudrate, serial_mode=serial_mode, seol=seol, reol=reol).open()

def connect(self):
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0
)
time.sleep(0.2)
def identify(self):
self.t.write_line('*IDN?')
return self.t.read_until_reol(1024)

def write(self, command):
if self.ser and self.ser.is_open:
self.ser.write(command)
def write(self, data: bytes):
self.t.write(data)

def read(self, size=1024):
if self.ser and self.ser.is_open:
return self.ser.read(size)
return None

def check_status(self):
if self.ser and self.ser.is_open:
try:
self.ser.write(b'*IDN?\r')
time.sleep(0.2)
reply = self.ser.read(1024)
return reply
except Exception as e:
print(f"Error checking status: {e}")
self.connect() # Attempt to reconnect on error
return None
return self.t.read(size)

def close(self):
if self.ser:
self.ser.close()
self.t.close()
51 changes: 12 additions & 39 deletions benchmesh-serial-service/src/benchmesh_service/drivers/owon_spm.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,18 @@
import serial
import time
import logging
from ..transport import SerialTransport

class OWONSPM:
def __init__(self, port, baudrate=115200):
self.port = port
self.baudrate = baudrate
self.ser = None
self.connected = False
self.logger = logging.getLogger(__name__)
def __init__(self, port, baudrate=115200, serial_mode='8N1', seol='\r', reol='\r'):
self.t = SerialTransport(port, baudrate, serial_mode=serial_mode, seol=seol, reol=reol).open()

def connect(self):
try:
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
self.connected = True
self.logger.info(f"Connected to OWON SPM on {self.port}")
except serial.SerialException as e:
self.logger.error(f"Failed to connect to OWON SPM on {self.port}: {e}")
def identify(self):
self.t.write_line('*IDN?')
return self.t.read_until_reol(1024)

def disconnect(self):
if self.ser and self.ser.is_open:
self.ser.close()
self.connected = False
self.logger.info(f"Disconnected from OWON SPM on {self.port}")
def write(self, text: str):
self.t.write_line(text)

def send_command(self, command):
if self.connected:
self.ser.write(command.encode() + b'\r')
time.sleep(0.1) # wait for the command to be processed
response = self.ser.read(1024).decode()
return response
else:
self.logger.warning("Attempted to send command while not connected.")
return None
def read(self, size=1024):
return self.t.read(size)

def check_status(self):
if self.connected:
self.logger.info("Checking status of OWON SPM.")
# Implement status check logic here
else:
self.logger.warning("Cannot check status, not connected.")

def __del__(self):
self.disconnect()
def close(self):
self.t.close()
66 changes: 13 additions & 53 deletions benchmesh-serial-service/src/benchmesh_service/drivers/owon_xdm.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,21 @@
import serial
import time
import logging
from ..transport import SerialTransport

class OWONXDM:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.ser = None
self.connected = False
def __init__(self, port, baudrate=115200, serial_mode='8N1', seol='\r', reol='\r'):
self.t = SerialTransport(port, baudrate, serial_mode=serial_mode, seol=seol, reol=reol).open()

def connect(self):
try:
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0
)
self.connected = True
logging.info(f"Connected to OWON XDM on {self.port}")
except serial.SerialException as e:
logging.error(f"Failed to connect to OWON XDM on {self.port}: {e}")
self.connected = False
def identify(self):
self.t.write_line('*IDN?')
return self.t.read_until_reol(1024)

def disconnect(self):
if self.ser and self.ser.is_open:
self.ser.close()
self.connected = False
logging.info(f"Disconnected from OWON XDM on {self.port}")
def write(self, data: bytes):
self.t.write(data)

def send_command(self, command):
if self.connected:
self.ser.write(command)
time.sleep(0.1) # wait for the command to be processed
response = self.ser.read(1024) # read response
return response
else:
logging.warning("Attempted to send command while not connected.")
return None
def read(self, size=1024):
return self.t.read(size)

def check_status(self):
if self.connected:
try:
self.ser.write(b'*IDN?\r')
time.sleep(0.1)
reply = self.ser.read(1024)
logging.info(f"Status check reply: {reply}")
return reply
except Exception as e:
logging.error(f"Error checking status: {e}")
self.disconnect()
return None
else:
logging.warning("Attempted to check status while not connected.")
return None
def close(self):
self.t.close()

def is_connected(self):
return self.connected
return self.t.is_open
60 changes: 14 additions & 46 deletions benchmesh-serial-service/src/benchmesh_service/drivers/tenma_psu.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,20 @@
import serial
import time
import threading
from src.benchmesh_service.logger import logger
from ..transport import SerialTransport
from ..logger import logger

class TenmaPSU:
def __init__(self, port, baudrate=115200):
self.port = port
self.baudrate = baudrate
self.ser = None
self.is_connected = False
def __init__(self, port, baudrate=115200, serial_mode='8N1', seol='', reol=''):
# TENMA manifest declares empty EOLs
self.t = SerialTransport(port, baudrate, serial_mode=serial_mode, seol=seol, reol=reol).open()

def connect(self):
try:
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
self.is_connected = True
logger.info(f"Connected to TENMA PSU on {self.port}")
except serial.SerialException as e:
logger.error(f"Failed to connect to TENMA PSU: {e}")
def identify(self):
self.t.write_line('*IDN?')
return self.t.read_until_reol(1024)

def disconnect(self):
if self.ser and self.ser.is_open:
self.ser.close()
self.is_connected = False
logger.info("Disconnected from TENMA PSU")
def write(self, text: str):
self.t.write_line(text)

def send_command(self, command):
if self.is_connected:
self.ser.write(command.encode() + b'\r\n')
time.sleep(0.1) # wait for the command to be processed
response = self.ser.readline().decode().strip()
return response
else:
logger.warning("Attempted to send command while not connected")
return None
def read(self, size=1024):
return self.t.read(size)

def check_status(self):
if self.is_connected:
logger.info("Checking status of TENMA PSU")
# Example command to check status
response = self.send_command("*IDN?")
logger.info(f"Status response: {response}")
else:
logger.warning("Cannot check status, not connected")

def start_status_check(self):
def status_check_loop():
while self.is_connected:
self.check_status()
time.sleep(0.5) # check status every 500ms

threading.Thread(target=status_check_loop, daemon=True).start()
def close(self):
self.t.close()
15 changes: 11 additions & 4 deletions benchmesh-serial-service/src/benchmesh_service/logger.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
import logging


def setup_logger():
logger = logging.getLogger("benchmesh_service")
# Avoid adding duplicate handlers if called multiple times
if getattr(logger, "_is_configured", False):
return logger

logger.setLevel(logging.DEBUG)

# Create file handler
fh = logging.FileHandler("benchmesh_service.log")
fh.setLevel(logging.DEBUG)

# Create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)

# Create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# Add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

# Prevent propagation to root logger (avoids duplicate logs via root handlers)
logger.propagate = False
# Mark as configured
logger._is_configured = True

return logger


logger = setup_logger()
Loading