Skip to content

Commit

Permalink
+: API change, get_transparent_reply replaced by get_message_payload
Browse files Browse the repository at this point in the history
get_transparent_reply is still used internally
get_message_payload is optimised for the SARAD protocols. It reads the first 3
byte to get the length of the remaining message. This improves the performance
of data transmission.
  • Loading branch information
mistrey committed Jul 20, 2021
1 parent 2c521ec commit a1c2700
Showing 1 changed file with 121 additions and 114 deletions.
235 changes: 121 additions & 114 deletions sarad/sari.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,9 @@ class SaradInst(Generic[SI]):
components: List of sensor or actor components
Public methods:
get_reply()
get_transparent_reply"""
get_message_payload()"""

version = "0.1"
version = "1.0"

class Lock(Enum):
"""Setting of the device. Lock the hardware button."""
Expand Down Expand Up @@ -549,58 +549,26 @@ def __check_answer(answer: bytes) -> CheckedAnswerDict:
"number_of_bytes_in_payload": number_of_bytes_in_payload,
}

# *** __get_message_payload():
# *** get_message_payload():

def __get_message_payload(
self, message: bytes, expected_length_of_reply: int, timeout: int
) -> CheckedAnswerDict:
def get_message_payload(self, message: bytes, timeout: int) -> CheckedAnswerDict:
"""Returns a dictionary of:
is_valid: True if answer is valid, False otherwise
is_control_message: True if control message
payload: Payload of answer
number_of_bytes_in_payload"""

def cmd_cylce():
serial_port = self.__port
baudrate = self.__family["baudrate"]
parity = self.__family["parity"]
write_sleeptime = self.__family["write_sleeptime"]
wait_for_reply = self.__family["wait_for_reply"]
ser = Serial(
serial_port,
baudrate,
bytesize=8,
xonxoff=0,
timeout=timeout,
parity=parity,
rtscts=0,
stopbits=STOPBITS_ONE,
)
for element in message:
byte = (element).to_bytes(1, "big")
ser.write(byte)
time.sleep(write_sleeptime)
time.sleep(wait_for_reply)
answer = ser.read(expected_length_of_reply)
time.sleep(0.1)
while ser.in_waiting:
logger.debug("%s bytes waiting.", ser.in_waiting)
ser.read(ser.in_waiting)
time.sleep(0.5)
ser.close()
return answer

answer = cmd_cylce()
answer = self._get_transparent_reply(message, timeout=timeout, keep=False)
if answer == b"":
# Workaround for firmware bug in SARAD instruments.
logger.debug("Play it again, Sam!")
answer = cmd_cylce()
answer = self._get_transparent_reply(message, timeout=timeout, keep=False)
checked_answer = self.__check_answer(answer)
return {
"is_valid": checked_answer["is_valid"],
"is_control": checked_answer["is_control"],
"payload": checked_answer["payload"],
"number_of_bytes_in_payload": checked_answer["number_of_bytes_in_payload"],
"raw": answer,
}

# *** __str__(self):
Expand Down Expand Up @@ -634,7 +602,7 @@ def _get_description(self) -> bool:
id_cmd = self.family["get_id_cmd"]
length_of_reply = self.family["length_of_reply"]
ok_byte = self.family["ok_byte"]
reply = self.get_reply(id_cmd, length_of_reply)
reply = self.get_reply(id_cmd, length_of_reply, timeout=0.5)
if reply and (reply[0] == ok_byte):
logger.debug("Get description successful.")
try:
Expand Down Expand Up @@ -765,100 +733,139 @@ def _get_parameter(self, parameter_name: str) -> Any:
# ** Public methods:
# *** get_reply():

def get_reply(self, cmd_data: List[bytes], reply_length=50, timeout=1) -> Any:
def get_reply(self, cmd_data: List[bytes], _reply_length=50, timeout=0.1) -> Any:
"""Returns a bytestring of the payload of the instruments reply
to the provided list of 1-byte command and data bytes."""
length = reply_length + 6
msg = self.__make_command_msg(cmd_data)
checked_payload = self.__get_message_payload(msg, length, timeout)
checked_payload = self.get_message_payload(msg, timeout)
if checked_payload["is_valid"]:
return checked_payload["payload"]
logger.debug(checked_payload["payload"])
return False

# *** get_transparent_reply():
# *** _get_transparent_reply():
@staticmethod
def __get_control_bytes(serial):
"""Read 3 Bytes from serial interface"""
perf_time_0 = time.perf_counter()
answer = serial.read(3)
perf_time_1 = time.perf_counter()
logging.debug(
"Receiving %s from serial took me %f s",
answer,
perf_time_1 - perf_time_0,
)
try:
assert answer != b""
except AssertionError:
logging.warning("The instrument did not reply.")
return answer
try:
assert answer.startswith(b"B") is True
except AssertionError:
logging.warning("This seems to be no SARAD instrument.")
answer = b""
return answer
control_byte = answer[1]
neg_control_byte = answer[2]
try:
assert (control_byte ^ 0xFF) == neg_control_byte
except AssertionError:
logging.error("Message corrupted.")
answer = b""
return answer
is_control = bool(control_byte & 0x80)
logging.debug("is_control: %s, control_byte: %s", is_control, control_byte)
# try:
# assert is_control is False
# except AssertionError:
# logging.error("Data message expected, but this is a control message.")
# answer = b""
# return answer
return answer

def get_transparent_reply(self, raw_cmd, reply_length=50, timeout=1, keep=True):
"""Returns the raw bytestring of the instruments reply"""
@staticmethod
def __get_payload_length(first_bytes):
"""Read 3 Bytes from serial interface
to get the length of payload from the control byte."""
control_byte = first_bytes[1]
return (control_byte & 0x7F) + 1

def __get_transparent_reply(raw_cmd, reply_length, timeout, keep):
serial_port = self.__port
baudrate = self.__family["baudrate"]
parity = self.__family["parity"]
write_sleeptime = self.__family["write_sleeptime"]
wait_for_reply = self.__family["wait_for_reply"]
if not keep:
def _get_transparent_reply(self, raw_cmd, timeout=0.1, keep=True):
"""Returns the raw bytestring of the instruments reply"""
if not keep:
ser = Serial(
self.__port,
self.__family["baudrate"],
bytesize=8,
xonxoff=0,
timeout=timeout,
parity=self.__family["parity"],
rtscts=0,
stopbits=STOPBITS_ONE,
)
if not ser.is_open:
ser.open()
logging.debug("Open serial, don't keep.")
else:
try:
ser = self.__ser
logging.debug("Reuse stored serial interface")
if not ser.is_open:
logging.debug("Port is closed. Reopen.")
ser.open()
except AttributeError:
ser = Serial(
serial_port,
baudrate,
self.__port,
self.__family["baudrate"],
bytesize=8,
xonxoff=0,
timeout=timeout,
parity=parity,
parity=self.__family["parity"],
rtscts=0,
stopbits=STOPBITS_ONE,
exclusive=True,
)
if not ser.is_open:
ser.open()
logging.debug("Open serial, don't keep.")
else:
try:
ser = self.__ser
logging.debug("Reuse stored serial interface")
if not ser.is_open:
logging.debug("Port is closed. Reopen.")
ser.open()
except AttributeError:
ser = Serial(
serial_port,
baudrate,
bytesize=8,
xonxoff=0,
timeout=timeout,
parity=parity,
rtscts=0,
stopbits=STOPBITS_ONE,
exclusive=True,
)
if not ser.is_open:
ser.open()
logging.debug("Open serial")
perf_time_0 = time.perf_counter()
logging.debug("Writing %s to serial port", raw_cmd)
for element in raw_cmd:
byte = (element).to_bytes(1, "big")
ser.write(byte)
time.sleep(write_sleeptime)
perf_time_1 = time.perf_counter()
logging.debug(
"Writing the command to serial took me %f s", perf_time_1 - perf_time_0
)
time.sleep(wait_for_reply)
answer = ser.read(reply_length)
perf_time_2 = time.perf_counter()
logging.debug(
"Receiving %s from serial took me %f s",
answer,
perf_time_2 - perf_time_1,
)
# time.sleep(0.1)
while ser.in_waiting:
logging.debug("%d bytes waiting", ser.in_waiting)
ser.read(ser.in_waiting)
time.sleep(0.1)
if not keep:
ser.close()
logging.debug("Serial interface closed.")
else:
logging.debug("Store serial interface")
self.__ser = ser
return answer

answer = __get_transparent_reply(raw_cmd, reply_length, timeout, keep)
if answer == b"":
logging.debug("Open serial")
perf_time_0 = time.perf_counter()
for element in raw_cmd:
byte = (element).to_bytes(1, "big")
ser.write(byte)
time.sleep(self.__family["write_sleeptime"])
perf_time_1 = time.perf_counter()
logging.debug(
"Writing command %s to serial took me %f s",
raw_cmd,
perf_time_1 - perf_time_0,
)
# time.sleep(self.__family["wait_for_reply"])
first_bytes = self.__get_control_bytes(ser)
try:
assert first_bytes != b""
except AssertionError:
return b""
payload_length = self.__get_payload_length(first_bytes)
number_of_remaining_bytes = payload_length + 3
remaining_bytes = ser.read(number_of_remaining_bytes)
while ser.in_waiting:
logging.debug("%d bytes waiting", ser.in_waiting)
ser.read(ser.in_waiting)
time.sleep(0.1)
logging.debug("Play it again Sam!")
answer = __get_transparent_reply(raw_cmd, reply_length, timeout, keep)
perf_time_2 = time.perf_counter()
answer = first_bytes + remaining_bytes
logging.debug(
"Receiving %s from serial took me %f s",
answer,
perf_time_2 - perf_time_1,
)
if not keep:
ser.close()
logging.debug("Serial interface closed.")
else:
logging.debug("Store serial interface")
self.__ser = ser
return answer

# *** start_cycle():
Expand Down

0 comments on commit a1c2700

Please sign in to comment.