-
Notifications
You must be signed in to change notification settings - Fork 0
/
serialmanager.py
219 lines (189 loc) · 7.25 KB
/
serialmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import time
import serial
import re
import serial.tools.list_ports
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot
class SerialManager(QObject):
"""Class that handles the serial connection."""
data_ready = pyqtSignal(str)
no_port_sel = pyqtSignal()
sleep_finished = pyqtSignal()
line_written = pyqtSignal()
serial_test_succeeded = pyqtSignal(str)
serial_test_failed = pyqtSignal(str)
port_unavailable_signal = pyqtSignal()
version_signal = pyqtSignal(str)
no_version = pyqtSignal()
serial_error_signal = pyqtSignal()
file_not_found_signal = pyqtSignal(str)
generic_error_signal = pyqtSignal(str)
def __init__(self):
super().__init__()
self.ser = serial.Serial(None, 115200, timeout=15,
parity=serial.PARITY_NONE, rtscts=False,
xonxoff=False, dsrdtr=False)
self.end = b"\r\n>"
def scan_ports():
"""Scan and return list of connected comm ports."""
return serial.tools.list_ports.comports()
def rs485_write_command(self, command: str):
"""Write a command by sending individual chars and wait for echo back,
because its a RS485 interface: half-duplex with no control flow.
"""
for c in command:
self.ser.write(c.encode())
self.ser.flush()
time.sleep(0.05)
self.ser.read(self.ser.in_waiting)
self.ser.write(b"\r\n")
self.ser.flush()
time.sleep(0.1)
@pyqtSlot(str)
def send_command(self, command):
"""Checks connection to the serial port and sends a command."""
if self.ser.is_open:
try:
self.flush_buffers()
self.rs485_write_command(command)
try:
response = self.ser.read_until(self.end).decode()
self.data_ready.emit(response)
except UnicodeDecodeError:
self.serial_error_signal.emit()
return
except serial.serialutil.SerialException:
self.no_port_sel.emit()
else:
self.no_port_sel.emit()
@pyqtSlot()
def version_check(self):
command = "version"
p = r"[0-9]+\.[0-9]+[a-z]"
if self.ser.is_open:
try:
self.flush_buffers()
self.rs485_write_command(command)
try:
response = self.ser.read_until(self.end).decode()
except UnicodeDecodeError:
self.serial_error_signal.emit()
return
# Ensure version matches format, otherwise emit error signal.
if re.search(p, response):
self.version_signal.emit(re.search(p, response).group())
return
else:
self.no_version.emit()
return
except serial.serialutil.SerialException:
self.port_unavailable_signal.emit()
except Exception as e:
self.generic_error_signal.emit()
else:
self.no_port_sel.emit()
@pyqtSlot()
def one_wire_test(self):
"""Sends command for one wire test and evaluates the result."""
if self.ser.is_open:
try:
self.flush_buffers()
self.rs485_write_command("1-wire-test")
time.sleep(0.5)
self.ser.write(" ".encode())
time.sleep(0.3)
self.ser.write(".".encode())
data = self.ser.read_until(self.end).decode()
self.data_ready.emit(data)
except serial.serialutil.SerialException:
self.no_port_sel.emit()
else:
self.no_port_sel.emit()
@pyqtSlot()
def reprogram_one_wire(self):
"""Sends command to reprogram one wire master."""
if self.ser.is_open:
try:
self.rs485_write_command("reprogram-1-wire-master")
# Wait for serial buffer to fill
time.sleep(5)
num_bytes = self.ser.in_waiting
data = self.ser.read(num_bytes).decode()
self.data_ready.emit(data)
except serial.serialutil.SerialException:
self.no_port_sel.emit()
else:
self.no_port_sel.emit()
@pyqtSlot(str)
def write_hex_file(self, file_path):
"""Writes hex file line-by-line with appropriate delay between
each name."""
if self.ser.is_open:
try:
with open(file_path, "rb") as f:
for line in f:
self.ser.write(line)
self.line_written.emit()
# minimum of 50 ms delay required after each line
time.sleep(0.060)
except serial.serialutil.SerialException:
self.no_port_sel.emit()
except FileNotFoundError:
self.file_not_found_signal.emit("1-wire-master")
time.sleep(3)
data = self.ser.read_until(self.end).decode()
self.data_ready.emit(data)
else:
self.no_port_sel.emit()
@pyqtSlot(str)
def set_serial(self, serial_num):
"""Sets the serial port."""
if self.ser.is_open:
try:
self.flush_buffers
s = serial_num + "\r\n"
self.ser.write(s.encode())
time.sleep(0.3)
data = self.ser.read_until(self.end).decode()
# Try to get serial number twice
if serial_num not in data:
self.flush_buffers
self.ser.write(s.encode())
time.sleep(0.3)
data = self.ser.read_until(self.end).decode()
if serial_num not in data:
self.serial_test_failed.emit(data)
return
self.serial_test_succeeded.emit(serial_num)
except serial.serialutil.SerialException:
self.no_port_sel.emit()
@pyqtSlot(int)
def sleep(self, interval):
"""Wait for a specified time period."""
time.sleep(interval)
self.sleep_finished.emit()
def is_connected(self, port):
"""Checks for serial connection."""
try:
self.ser.write(b"\r\n")
time.sleep(0.1)
self.ser.read(self.ser.in_waiting)
except serial.serialutil.SerialException:
return False
return self.ser.port == port and self.ser.is_open
def open_port(self, port: str) -> bool:
"""Opens serial port and checks that board is available."""
try:
self.ser.close()
self.ser.port = port
self.ser.open()
except serial.serialutil.SerialException:
self.port_unavailable_signal.emit()
def flush_buffers(self):
"""Flushes the serial buffer by writing to the buffer and then reading
all the available bytes."""
self.ser.write("\r\n".encode())
time.sleep(0.5)
self.ser.read(self.ser.in_waiting)
def close_port(self):
"""Closes serial port."""
self.ser.close()