diff --git a/monk_tf/serial_io.py b/monk_tf/serial_io.py new file mode 100644 index 0000000..cc5c3d1 --- /dev/null +++ b/monk_tf/serial_io.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# MONK automated test framework +# +# Copyright (C) 2013 DResearch Fahrzeugelektronik GmbH +# Written and maintained by MONK Developers +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version +# 3 of the License, or (at your option) any later version. +# + +import os +import time +import logging + +import serial + +class SerialIO(serial.Serial): + """ console like interface on top of pyserial + + The job of this class is to allow a console like input-output behaviour + between the user of this class and a serial interface. To achieve this some + abstraction layers are necessary. + """ + + + def __init__(self, *args, **kwargs): + """ + :param prompt: set a prompt for the communication + """ + self._logger = logging.getLogger(__name__) + if "linesep" in args: + self.linesep = args["linesep"] + args.pop("linesep") + super(SerialIO,self).__init__(*args, **kwargs) + + + def cmd(self, msg): + """ send a command and receive it's output + + :param msg: the command that shall be sent, with or without line + separator in the end + :return: the output of the command as created by target device + """ + # a command will only be executed, if it ends in a linebreak + self.write(msg.strip() + self.linesep) + # remove first line (the cmd itself), last line (the next prompt) + # and unnecesary \r characters from the output + return self.linesep.join(self.readall().replace("\r","").split("\n")[1:-1]) + + @property + def linesep(self): + """ In most cases ``os.linesep``. + """ + try: + return self._linesep + except AttributeError: + self.linesep = os.linesep + return self._linesep + + @linesep.setter + def linesep(self, lsep): + self._linesep = lsep diff --git a/test/test_serial_io.py b/test/test_serial_io.py new file mode 100644 index 0000000..7f6535b --- /dev/null +++ b/test/test_serial_io.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# MONK Automated Testing Framework +# +# Copyright (C) 2012-2013 DResearch Fahrzeugelektronik GmbH, project-monk@dresearch-fe.de +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version +# 3 of the License, or (at you option) any later version. +# + +import logging + +from nose import tools as nt + +from monk_tf import serial_io as sio + + +def test_simple(): + """serial_io: check wether creating a SerialIO object works + """ + #nothing to prepare + # + #execute + sut = sio.SerialIO() + #assert + nt.ok_(sut, "should contain a monk_tf.serial_io.SerialIO object, but instead contains this: '{}'".format(sut)) + + +def test_cmd_set_attribs(): + """serial_io: check wether cmd automatically updates the attributes + """ + # prepare + send_cmd = "qwer" + expected_calls = ['write', 'read_until'] + expected_cmd = send_cmd + "\n" + expected_confidence = True + expected_output = 'abcd' + sut = MockSerialIOCmd((True, expected_output)) + # execute + sut.cmd(send_cmd) + # evaluate + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_cmd, sut.last_cmd) + nt.eq_(expected_confidence, sut.last_confidence) + nt.eq_(expected_output, sut.last_output) + # clean up + # not needed + + +def test_read_until_strips_end(): + """serial_io: check wether reading really strips end_strip + """ + # prepare + expected_calls = ["read"] + expected_out = "trewq\n" + expected_confidence = True + in_strip = "abcd" + in_sleep = 0.0 + in_mock_output = expected_out + in_strip + sut = MockSerialIORead(in_mock_output) + # execute + out_confidence, output = sut.read_until(in_strip, sleep_time=in_sleep) + # evalute + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_confidence, out_confidence) + nt.eq_(expected_out, output) + # clean up + # not needed + + +def test_read_until_strips_start(): + """serial_io: check wether reading really strips start_strip + """ + # prepare + expected_calls = ["read", "read"] + expected_out = "trewq\n" + expected_confidence = True + in_start_strip = "abcd\n" + in_stop_strip = "dcba" + in_sleep = 0.0 + in_mock_output = in_start_strip + expected_out + in_stop_strip + sut = MockSerialIORead(in_mock_output) + # execute + out_confidence, output = sut.read_until( + in_stop_strip, + start_strip=in_start_strip, + sleep_time=in_sleep + ) + # evalute + nt.eq_(expected_calls, sut.calls) + nt.eq_(expected_confidence, out_confidence) + nt.eq_(expected_out, output) + # clean up + # not needed + + +class MockSerialIOCmd(sio. SerialIO): + """ mocks specifically for testing the cmd() method + """ + + def __init__(self, readout=None): + self.calls = [] + self.readout = readout + super(MockSerialIOCmd, self).__init__() + + + def write(self,cmd=None): + self.calls.append("write") + + + def read_until(self, cmd=None, prompt=None, sleep_time=None, timeout=None, + start_strip=None): + self.calls.append("read_until") + return self.readout + + +class MockSerialIORead(sio.SerialIO): + + def __init__(self, readout=None): + self.calls = [] + self.readout = readout + super(MockSerialIORead, self).__init__() + + + def write(self,cmd=None): + self.calls.append("write") + + + def read(self, number=None): + self.calls.append("read") + return self.readout + + def inWaiting(self): + return 0