This repository has been archived by the owner on Mar 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'f-review-serial-cmd' into dev
Closes GitHub Issue #60.
- Loading branch information
Showing
2 changed files
with
201 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# MONK automated test framework | ||
# | ||
# Copyright (C) 2013 DResearch Fahrzeugelektronik GmbH | ||
# Written and maintained by MONK Developers <project-monk@dresearch-fe.de> | ||
# | ||
# This program is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License | ||
# as published by the Free Software Foundation; either version | ||
# 3 of the License, or (at your option) any later version. | ||
# | ||
|
||
import os | ||
import time | ||
import logging | ||
|
||
import serial | ||
|
||
class SerialIO(serial.Serial): | ||
""" console like interface on top of pyserial | ||
The job of this class is to allow a console like input-output behaviour | ||
between the user of this class and a serial interface. To achieve this some | ||
abstraction layers are necessary. | ||
""" | ||
|
||
|
||
def __init__(self, *args, **kwargs): | ||
""" | ||
:param prompt: set a prompt for the communication | ||
""" | ||
self._logger = logging.getLogger(__name__) | ||
if "linesep" in args: | ||
self.linesep = args["linesep"] | ||
args.pop("linesep") | ||
super(SerialIO,self).__init__(*args, **kwargs) | ||
|
||
|
||
def cmd(self, msg): | ||
""" send a command and receive it's output | ||
:param msg: the command that shall be sent, with or without line | ||
separator in the end | ||
:return: the output of the command as created by target device | ||
""" | ||
# a command will only be executed, if it ends in a linebreak | ||
self.write(msg.strip() + self.linesep) | ||
# remove first line (the cmd itself), last line (the next prompt) | ||
# and unnecesary \r characters from the output | ||
return self.linesep.join(self.readall().replace("\r","").split("\n")[1:-1]) | ||
|
||
@property | ||
def linesep(self): | ||
""" In most cases ``os.linesep``. | ||
""" | ||
try: | ||
return self._linesep | ||
except AttributeError: | ||
self.linesep = os.linesep | ||
return self._linesep | ||
|
||
@linesep.setter | ||
def linesep(self, lsep): | ||
self._linesep = lsep |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# MONK Automated Testing Framework | ||
# | ||
# Copyright (C) 2012-2013 DResearch Fahrzeugelektronik GmbH, project-monk@dresearch-fe.de | ||
# | ||
# This program is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License | ||
# as published by the Free Software Foundation; either version | ||
# 3 of the License, or (at you option) any later version. | ||
# | ||
|
||
import logging | ||
|
||
from nose import tools as nt | ||
|
||
from monk_tf import serial_io as sio | ||
|
||
|
||
def test_simple(): | ||
"""serial_io: check wether creating a SerialIO object works | ||
""" | ||
#nothing to prepare | ||
# | ||
#execute | ||
sut = sio.SerialIO() | ||
#assert | ||
nt.ok_(sut, "should contain a monk_tf.serial_io.SerialIO object, but instead contains this: '{}'".format(sut)) | ||
|
||
|
||
def test_cmd_set_attribs(): | ||
"""serial_io: check wether cmd automatically updates the attributes | ||
""" | ||
# prepare | ||
send_cmd = "qwer" | ||
expected_calls = ['write', 'read_until'] | ||
expected_cmd = send_cmd + "\n" | ||
expected_confidence = True | ||
expected_output = 'abcd' | ||
sut = MockSerialIOCmd((True, expected_output)) | ||
# execute | ||
sut.cmd(send_cmd) | ||
# evaluate | ||
nt.eq_(expected_calls, sut.calls) | ||
nt.eq_(expected_cmd, sut.last_cmd) | ||
nt.eq_(expected_confidence, sut.last_confidence) | ||
nt.eq_(expected_output, sut.last_output) | ||
# clean up | ||
# not needed | ||
|
||
|
||
def test_read_until_strips_end(): | ||
"""serial_io: check wether reading really strips end_strip | ||
""" | ||
# prepare | ||
expected_calls = ["read"] | ||
expected_out = "trewq\n" | ||
expected_confidence = True | ||
in_strip = "abcd" | ||
in_sleep = 0.0 | ||
in_mock_output = expected_out + in_strip | ||
sut = MockSerialIORead(in_mock_output) | ||
# execute | ||
out_confidence, output = sut.read_until(in_strip, sleep_time=in_sleep) | ||
# evalute | ||
nt.eq_(expected_calls, sut.calls) | ||
nt.eq_(expected_confidence, out_confidence) | ||
nt.eq_(expected_out, output) | ||
# clean up | ||
# not needed | ||
|
||
|
||
def test_read_until_strips_start(): | ||
"""serial_io: check wether reading really strips start_strip | ||
""" | ||
# prepare | ||
expected_calls = ["read", "read"] | ||
expected_out = "trewq\n" | ||
expected_confidence = True | ||
in_start_strip = "abcd\n" | ||
in_stop_strip = "dcba" | ||
in_sleep = 0.0 | ||
in_mock_output = in_start_strip + expected_out + in_stop_strip | ||
sut = MockSerialIORead(in_mock_output) | ||
# execute | ||
out_confidence, output = sut.read_until( | ||
in_stop_strip, | ||
start_strip=in_start_strip, | ||
sleep_time=in_sleep | ||
) | ||
# evalute | ||
nt.eq_(expected_calls, sut.calls) | ||
nt.eq_(expected_confidence, out_confidence) | ||
nt.eq_(expected_out, output) | ||
# clean up | ||
# not needed | ||
|
||
|
||
class MockSerialIOCmd(sio. SerialIO): | ||
""" mocks specifically for testing the cmd() method | ||
""" | ||
|
||
def __init__(self, readout=None): | ||
self.calls = [] | ||
self.readout = readout | ||
super(MockSerialIOCmd, self).__init__() | ||
|
||
|
||
def write(self,cmd=None): | ||
self.calls.append("write") | ||
|
||
|
||
def read_until(self, cmd=None, prompt=None, sleep_time=None, timeout=None, | ||
start_strip=None): | ||
self.calls.append("read_until") | ||
return self.readout | ||
|
||
|
||
class MockSerialIORead(sio.SerialIO): | ||
|
||
def __init__(self, readout=None): | ||
self.calls = [] | ||
self.readout = readout | ||
super(MockSerialIORead, self).__init__() | ||
|
||
|
||
def write(self,cmd=None): | ||
self.calls.append("write") | ||
|
||
|
||
def read(self, number=None): | ||
self.calls.append("read") | ||
return self.readout | ||
|
||
def inWaiting(self): | ||
return 0 |