diff --git a/monk_tf/serial_io.py b/monk_tf/serial_io.py index 06f45ea..5cb2659 100644 --- a/monk_tf/serial_io.py +++ b/monk_tf/serial_io.py @@ -17,7 +17,7 @@ import serial -class SerialConsole(serial.Serial): +class SerialIO(serial.Serial): """ console like interface on top of pyserial The job of this class is to allow a console like input-output behaviour @@ -36,13 +36,14 @@ def __init__(self, *args, **kwargs): :param prompt: set a prompt for the communication """ self._logger = logging.getLogger(__name__) - self.prompt = Serial._DEFAULT_PROMPT - if args: - self.prompt = prompt - super(Serial,self).__init__(*args, **kwargs) + self.prompt = SerialIO._DEFAULT_PROMPT + if "prompt" in args: + self.prompt = args["prompt"] + args.pop("prompt") + super(SerialIO,self).__init__(*args, **kwargs) - def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"): + def cmd(self, msg, prompt=None, sleep_time=.1, timeout=5, linesep="\n"): """ send a command and retrieve it's response. Text that might be in the buffer before the command is ignored, as well @@ -53,7 +54,7 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"): The basic structure of a command is like that:: - # optional (EOLs inside the command output are treated as any character) + # optional (EOLs inside the command output are treated as any character) We try to just grab the ```` and return it. @@ -68,7 +69,7 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"): default is used and if that is also not set the default python prompt is used: ``>>> ``. - :param sleep_length: defines how long the process should sleep until + :param sleep_time: defines how long the process should sleep until the next iteration of the loop is started. :param timeout: defines in seconds how long this method should take at @@ -79,20 +80,25 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"): :param return: the command output. """ - # prepare + prompt = prompt if prompt else self.prompt cmd = msg.strip() + linesep self.write(cmd) - self.__last_cmd = cmd - self.__last_confidence, self.__last_output = self.read_until( - cmd,prompt, sleep_length, timeout) + self._last_cmd = cmd + self._logger.debug(str((self, cmd, prompt, sleep_time, timeout))) + self._last_confidence, self._last_output = self.read_until( + prompt, + cmd, + sleep_time, + timeout + ) return self.last_output - def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5): + def read_until(self, end_strip, start_strip=None, sleep_time=.1, timeout=5): """ read until end strip found This function reads everything available in the buffer, then waits - ``sleep_length`` seconds and then starts again, either until it finds + ``sleep_time`` seconds and then starts again, either until it finds ``end_strip`` in the buffered text or the ``timeout`` runs out. Everything will be deleted, beginning from the ``end_strip``, because that is expected to be knowledge the user already has. @@ -101,13 +107,19 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5): ``start_strip``, which will delete everything in the buffer until the start_strip is found. + All of that is a generalization of how the method is used in cmd(). The + idea is that a command line execution over serial repeats the written + command, then writes the command output and finally prints a new + prompt. Therefore it makes sense to look for the command and the prompt + as the delimiters of the desired part of the output. + :param end_strip: The text which terminates the search. A string as excepted by :py:func:`str.find`. :param start_strip: The text which starts the search. A string as expected by :py:func:`str.find`. - :param sleep_length: defines how long the process should sleep until + :param sleep_time: defines how long the process should sleep until the next iteration of the loop is started. :param timeout: defines in seconds how long this method should take at @@ -117,24 +129,23 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5): timeout didn't deplete. The second param contains the output as far as it was received. """ - state = ReadState.LEFT_OVER if start_strip else ReadState.FOUND_START + state = SerialIO.ReadState.LEFT_OVER if start_strip else SerialIO.ReadState.FOUND_START out = "" start_time = time.time() while True: if time.time() - start_time >= timeout: - self.__last_confidence = False - self.__last_cmd = msg - self.__last_output = out - return out - time.sleep(sleep_length) + return False, out + # this should be also executed before first loop + # because this function is also used by write + time.sleep(sleep_time) out += self.read(self.inWaiting()) - if state == ReadState.LEFT_OVER: + if state == SerialIO.ReadState.LEFT_OVER: pos = out.find(start_strip) if pos >= 0: # forget everything up to including the start_strip out = out[pos+len(start_strip):] - state = ReadState.FOUND_START - elif state == ReadState.FOUND_START: + state = SerialIO.ReadState.FOUND_START + elif state == SerialIO.ReadState.FOUND_START: pos = out.find(end_strip) if pos >= 0: # strip the end_strip and everything afterwards @@ -146,20 +157,20 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5): @property def prompt(self): try: - return self.__prompt + return self._prompt except AttributeError: return None @prompt.setter def prompt(self,new_prompt): - self.__prompt = new_prompt + self._prompt = new_prompt @property def last_confidence(self): try: - return self.__last_confidence + return self._last_confidence except AttributeError: return None @@ -167,7 +178,7 @@ def last_confidence(self): @property def last_cmd(self): try: - return self.__last_cmd + return self._last_cmd except AttributeError: return None @@ -175,6 +186,6 @@ def last_cmd(self): @property def last_output(self): try: - return self.__last_output + return self._last_output except AttributeError: return None diff --git a/test/test_serial_io.py b/test/test_serial_io.py new file mode 100644 index 0000000..7f6535b --- /dev/null +++ b/test/test_serial_io.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# MONK Automated Testing Framework +# +# Copyright (C) 2012-2013 DResearch Fahrzeugelektronik GmbH, project-monk@dresearch-fe.de +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version +# 3 of the License, or (at you option) any later version. +# + +import logging + +from nose import tools as nt + +from monk_tf import serial_io as sio + + +def test_simple(): + """serial_io: check wether creating a SerialIO object works + """ + #nothing to prepare + # + #execute + sut = sio.SerialIO() + #assert + nt.ok_(sut, "should contain a monk_tf.serial_io.SerialIO object, but instead contains this: '{}'".format(sut)) + + +def test_cmd_set_attribs(): + """serial_io: check wether cmd automatically updates the attributes + """ + # prepare + send_cmd = "qwer" + expected_calls = ['write', 'read_until'] + expected_cmd = send_cmd + "\n" + expected_confidence = True + expected_output = 'abcd' + sut = MockSerialIOCmd((True, expected_output)) + # execute + sut.cmd(send_cmd) + # evaluate + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_cmd, sut.last_cmd) + nt.eq_(expected_confidence, sut.last_confidence) + nt.eq_(expected_output, sut.last_output) + # clean up + # not needed + + +def test_read_until_strips_end(): + """serial_io: check wether reading really strips end_strip + """ + # prepare + expected_calls = ["read"] + expected_out = "trewq\n" + expected_confidence = True + in_strip = "abcd" + in_sleep = 0.0 + in_mock_output = expected_out + in_strip + sut = MockSerialIORead(in_mock_output) + # execute + out_confidence, output = sut.read_until(in_strip, sleep_time=in_sleep) + # evalute + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_confidence, out_confidence) + nt.eq_(expected_out, output) + # clean up + # not needed + + +def test_read_until_strips_start(): + """serial_io: check wether reading really strips start_strip + """ + # prepare + expected_calls = ["read", "read"] + expected_out = "trewq\n" + expected_confidence = True + in_start_strip = "abcd\n" + in_stop_strip = "dcba" + in_sleep = 0.0 + in_mock_output = in_start_strip + expected_out + in_stop_strip + sut = MockSerialIORead(in_mock_output) + # execute + out_confidence, output = sut.read_until( + in_stop_strip, + start_strip=in_start_strip, + sleep_time=in_sleep + ) + # evalute + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_confidence, out_confidence) + nt.eq_(expected_out, output) + # clean up + # not needed + + +class MockSerialIOCmd(sio. SerialIO): + """ mocks specifically for testing the cmd() method + """ + + def __init__(self, readout=None): + self.calls = [] + self.readout = readout + super(MockSerialIOCmd, self).__init__() + + + def write(self,cmd=None): + self.calls.append("write") + + + def read_until(self, cmd=None, prompt=None, sleep_time=None, timeout=None, + start_strip=None): + self.calls.append("read_until") + return self.readout + + +class MockSerialIORead(sio.SerialIO): + + def __init__(self, readout=None): + self.calls = [] + self.readout = readout + super(MockSerialIORead, self).__init__() + + + def write(self,cmd=None): + self.calls.append("write") + + + def read(self, number=None): + self.calls.append("read") + return self.readout + + def inWaiting(self): + return 0