Skip to content
This repository has been archived by the owner on Mar 22, 2021. It is now read-only.

Commit

Permalink
serial_io: fundamental unit tests and debugging
Browse files Browse the repository at this point in the history
Added tests for Object creation, a basic cmd() and 2 cases for the read_until
methods. The last method needed an additonal unit test, because start_strip and
end_strip should be tested separately.

See GitHub Issue #60.

Signed-off-by: Erik Bernoth <erik.bernoth@gmail.com>
  • Loading branch information
erikbgithub committed Jun 18, 2013
1 parent 766559b commit e0ff18e
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 29 deletions.
69 changes: 40 additions & 29 deletions monk_tf/serial_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import serial

class SerialConsole(serial.Serial):
class SerialIO(serial.Serial):
""" console like interface on top of pyserial
The job of this class is to allow a console like input-output behaviour
Expand All @@ -36,13 +36,14 @@ def __init__(self, *args, **kwargs):
:param prompt: set a prompt for the communication
"""
self._logger = logging.getLogger(__name__)
self.prompt = Serial._DEFAULT_PROMPT
if args:
self.prompt = prompt
super(Serial,self).__init__(*args, **kwargs)
self.prompt = SerialIO._DEFAULT_PROMPT
if "prompt" in args:
self.prompt = args["prompt"]
args.pop("prompt")
super(SerialIO,self).__init__(*args, **kwargs)


def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"):
def cmd(self, msg, prompt=None, sleep_time=.1, timeout=5, linesep="\n"):
""" send a command and retrieve it's response.
Text that might be in the buffer before the command is ignored, as well
Expand All @@ -53,7 +54,7 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"):
The basic structure of a command is like that::
<something left in the buffer><our command><EOL>
<command output><EOL> # optional (EOLs inside the command output are treated as any character)
<command output> # optional (EOLs inside the command output are treated as any character)
<prompt><space>
We try to just grab the ``<command output>`` and return it.
Expand All @@ -68,7 +69,7 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"):
default is used and if that is also not set the default
python prompt is used: ``>>> ``.
:param sleep_length: defines how long the process should sleep until
:param sleep_time: defines how long the process should sleep until
the next iteration of the loop is started.
:param timeout: defines in seconds how long this method should take at
Expand All @@ -79,20 +80,25 @@ def cmd(self, msg, prompt=None, sleep_length=.1, timeout=5, linesep="\n"):
:param return: the command output.
"""
# prepare
prompt = prompt if prompt else self.prompt
cmd = msg.strip() + linesep
self.write(cmd)
self.__last_cmd = cmd
self.__last_confidence, self.__last_output = self.read_until(
cmd,prompt, sleep_length, timeout)
self._last_cmd = cmd
self._logger.debug(str((self, cmd, prompt, sleep_time, timeout)))
self._last_confidence, self._last_output = self.read_until(
prompt,
cmd,
sleep_time,
timeout
)
return self.last_output


def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5):
def read_until(self, end_strip, start_strip=None, sleep_time=.1, timeout=5):
""" read until end strip found
This function reads everything available in the buffer, then waits
``sleep_length`` seconds and then starts again, either until it finds
``sleep_time`` seconds and then starts again, either until it finds
``end_strip`` in the buffered text or the ``timeout`` runs out.
Everything will be deleted, beginning from the ``end_strip``, because
that is expected to be knowledge the user already has.
Expand All @@ -101,13 +107,19 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5):
``start_strip``, which will delete everything in the buffer until the
start_strip is found.
All of that is a generalization of how the method is used in cmd(). The
idea is that a command line execution over serial repeats the written
command, then writes the command output and finally prints a new
prompt. Therefore it makes sense to look for the command and the prompt
as the delimiters of the desired part of the output.
:param end_strip: The text which terminates the search. A string as
excepted by :py:func:`str.find`.
:param start_strip: The text which starts the search. A string as
expected by :py:func:`str.find`.
:param sleep_length: defines how long the process should sleep until
:param sleep_time: defines how long the process should sleep until
the next iteration of the loop is started.
:param timeout: defines in seconds how long this method should take at
Expand All @@ -117,24 +129,23 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5):
timeout didn't deplete. The second param contains the output
as far as it was received.
"""
state = ReadState.LEFT_OVER if start_strip else ReadState.FOUND_START
state = SerialIO.ReadState.LEFT_OVER if start_strip else SerialIO.ReadState.FOUND_START
out = ""
start_time = time.time()
while True:
if time.time() - start_time >= timeout:
self.__last_confidence = False
self.__last_cmd = msg
self.__last_output = out
return out
time.sleep(sleep_length)
return False, out
# this should be also executed before first loop
# because this function is also used by write
time.sleep(sleep_time)
out += self.read(self.inWaiting())
if state == ReadState.LEFT_OVER:
if state == SerialIO.ReadState.LEFT_OVER:
pos = out.find(start_strip)
if pos >= 0:
# forget everything up to including the start_strip
out = out[pos+len(start_strip):]
state = ReadState.FOUND_START
elif state == ReadState.FOUND_START:
state = SerialIO.ReadState.FOUND_START
elif state == SerialIO.ReadState.FOUND_START:
pos = out.find(end_strip)
if pos >= 0:
# strip the end_strip and everything afterwards
Expand All @@ -146,35 +157,35 @@ def read_until(self, end_strip, start_strip=None, sleep_length=.1, timeout=5):
@property
def prompt(self):
try:
return self.__prompt
return self._prompt
except AttributeError:
return None


@prompt.setter
def prompt(self,new_prompt):
self.__prompt = new_prompt
self._prompt = new_prompt


@property
def last_confidence(self):
try:
return self.__last_confidence
return self._last_confidence
except AttributeError:
return None


@property
def last_cmd(self):
try:
return self.__last_cmd
return self._last_cmd
except AttributeError:
return None


@property
def last_output(self):
try:
return self.__last_output
return self._last_output
except AttributeError:
return None
136 changes: 136 additions & 0 deletions test/test_serial_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
#
# MONK Automated Testing Framework
#
# Copyright (C) 2012-2013 DResearch Fahrzeugelektronik GmbH, project-monk@dresearch-fe.de
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version
# 3 of the License, or (at you option) any later version.
#

import logging

from nose import tools as nt

from monk_tf import serial_io as sio


def test_simple():
"""serial_io: check wether creating a SerialIO object works
"""
#nothing to prepare
#
#execute
sut = sio.SerialIO()
#assert
nt.ok_(sut, "should contain a monk_tf.serial_io.SerialIO object, but instead contains this: '{}'".format(sut))


def test_cmd_set_attribs():
"""serial_io: check wether cmd automatically updates the attributes
"""
# prepare
send_cmd = "qwer"
expected_calls = ['write', 'read_until']
expected_cmd = send_cmd + "\n"
expected_confidence = True
expected_output = 'abcd'
sut = MockSerialIOCmd((True, expected_output))
# execute
sut.cmd(send_cmd)
# evaluate
nt.eq_(expected_calls, sut.calls)
nt.eq_(expected_cmd, sut.last_cmd)
nt.eq_(expected_confidence, sut.last_confidence)
nt.eq_(expected_output, sut.last_output)
# clean up
# not needed


def test_read_until_strips_end():
"""serial_io: check wether reading really strips end_strip
"""
# prepare
expected_calls = ["read"]
expected_out = "trewq\n"
expected_confidence = True
in_strip = "abcd"
in_sleep = 0.0
in_mock_output = expected_out + in_strip
sut = MockSerialIORead(in_mock_output)
# execute
out_confidence, output = sut.read_until(in_strip, sleep_time=in_sleep)
# evalute
nt.eq_(expected_calls, sut.calls)
nt.eq_(expected_confidence, out_confidence)
nt.eq_(expected_out, output)
# clean up
# not needed


def test_read_until_strips_start():
"""serial_io: check wether reading really strips start_strip
"""
# prepare
expected_calls = ["read", "read"]
expected_out = "trewq\n"
expected_confidence = True
in_start_strip = "abcd\n"
in_stop_strip = "dcba"
in_sleep = 0.0
in_mock_output = in_start_strip + expected_out + in_stop_strip
sut = MockSerialIORead(in_mock_output)
# execute
out_confidence, output = sut.read_until(
in_stop_strip,
start_strip=in_start_strip,
sleep_time=in_sleep
)
# evalute
nt.eq_(expected_calls, sut.calls)
nt.eq_(expected_confidence, out_confidence)
nt.eq_(expected_out, output)
# clean up
# not needed


class MockSerialIOCmd(sio. SerialIO):
""" mocks specifically for testing the cmd() method
"""

def __init__(self, readout=None):
self.calls = []
self.readout = readout
super(MockSerialIOCmd, self).__init__()


def write(self,cmd=None):
self.calls.append("write")


def read_until(self, cmd=None, prompt=None, sleep_time=None, timeout=None,
start_strip=None):
self.calls.append("read_until")
return self.readout


class MockSerialIORead(sio.SerialIO):

def __init__(self, readout=None):
self.calls = []
self.readout = readout
super(MockSerialIORead, self).__init__()


def write(self,cmd=None):
self.calls.append("write")


def read(self, number=None):
self.calls.append("read")
return self.readout

def inWaiting(self):
return 0

0 comments on commit e0ff18e

Please sign in to comment.