Skip to content
This repository has been archived by the owner on Mar 22, 2021. It is now read-only.

Commit

Permalink
serial_io.py: simplify cmd() method
Browse files Browse the repository at this point in the history
According to the research in Issue #60 (#60),
the simplest implementation for best case scenarios is only 2 lines. This
commit implements those 2 lines instead of the complex behaviour before.

Issue #69 (#69) shall improve or replace that
method with a more practical implementation, if that is not already finished
with Issue #61 (#61).

See Github Issue #60 for details.

Signed-off-by: Erik Bernoth <erik.bernoth@gmail.com>
Acked-by: Steffen Sledz <sledz@dresearch-fe.de>
Acked-by: Eik Binschek <binschek@dresearch-fe.de>
  • Loading branch information
erikbgithub committed Jul 9, 2013
1 parent a1c8f5c commit cf7441b
Showing 1 changed file with 22 additions and 148 deletions.
170 changes: 22 additions & 148 deletions monk_tf/serial_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,167 +25,41 @@ class SerialIO(serial.Serial):
abstraction layers are necessary.
"""

_DEFAULT_PROMPT=">>> "

class ReadState:
LEFT_OVER=1
FOUND_START=2

def __init__(self, *args, **kwargs):
"""
:param prompt: set a prompt for the communication
"""
self._logger = logging.getLogger(__name__)
self.prompt = SerialIO._DEFAULT_PROMPT
if "prompt" in args:
self.prompt = args["prompt"]
args.pop("prompt")
if "linesep" in args:
self.linesep = args["linesep"]
args.pop("linesep")
super(SerialIO,self).__init__(*args, **kwargs)


def cmd(self, msg, prompt=None, sleep_time=.1, timeout=5, linesep="\n"):
""" send a command and retrieve it's response.
Text that might be in the buffer before the command is ignored, as well
as text beginning from the prompt after the command. The command text
itself and the prompt afterwards are the borders of the command output,
that is returned.
The basic structure of a command is like that::
<something left in the buffer><our command><EOL>
<command output> # optional (EOLs inside the command output are treated as any character)
<prompt><space>
We try to just grab the ``<command output>`` and return it.
.. note:: The results from the last output can be optained with the
attributes ``last_cmd``, ``last_confidence`` and ``last_output``.
:param msg: the shell command you want to execute over the serial line.
:param prompt: the prompt that signals that a command is treated and
the next command can be sent. If None, the object's
default is used and if that is also not set the default
python prompt is used: ``>>> ``.
:param sleep_time: defines how long the process should sleep until
the next iteration of the loop is started.
:param timeout: defines in seconds how long this method should take at
most.
:param linesep: the line separator used for the communication. It
defaults to ``\n``
:param return: the command output.
"""
prompt = prompt if prompt else self.prompt
cmd = msg.strip() + linesep
self.write(cmd)
self._last_cmd = cmd
self._logger.debug(str((self, cmd, prompt, sleep_time, timeout)))
self._last_confidence, self._last_output = self.read_until(
prompt,
cmd,
sleep_time,
timeout
)
return self.last_confidence, self.last_output


def read_until(self, end_strip, start_strip=None, sleep_time=.1, timeout=5):
""" read until end strip found
This function reads everything available in the buffer, then waits
``sleep_time`` seconds and then starts again, either until it finds
``end_strip`` in the buffered text or the ``timeout`` runs out.
Everything will be deleted, beginning from the ``end_strip``, because
that is expected to be knowledge the user already has.
The same way as ``end_strip`` works on the end, you can also define a
``start_strip``, which will delete everything in the buffer until the
start_strip is found.
All of that is a generalization of how the method is used in cmd(). The
idea is that a command line execution over serial repeats the written
command, then writes the command output and finally prints a new
prompt. Therefore it makes sense to look for the command and the prompt
as the delimiters of the desired part of the output.
:param end_strip: The text which terminates the search. A string as
excepted by :py:func:`str.find`.
:param start_strip: The text which starts the search. A string as
expected by :py:func:`str.find`.
:param sleep_time: defines how long the process should sleep until
the next iteration of the loop is started.
def cmd(self, msg):
""" send a command and receive it's output
:param timeout: defines in seconds how long this method should take at
most.
:return: (boolean, string) - The first param is True as long as the
timeout didn't deplete. The second param contains the output
as far as it was received.
:param msg: the command that shall be sent, with or without line
separator in the end
:return: the output of the command as created by target device
"""
state = SerialIO.ReadState.LEFT_OVER if start_strip else SerialIO.ReadState.FOUND_START
out = ""
start_time = time.time()
while True:
if time.time() - start_time >= timeout:
return False, out
# this should be also executed before first loop
# because this function is also used by write
time.sleep(sleep_time)
out += self.read(self.inWaiting())
if state == SerialIO.ReadState.LEFT_OVER:
pos = out.find(start_strip)
if pos >= 0:
# forget everything up to including the start_strip
out = out[pos+len(start_strip):]
state = SerialIO.ReadState.FOUND_START
elif state == SerialIO.ReadState.FOUND_START:
pos = out.find(end_strip)
if pos >= 0:
# strip the end_strip and everything afterwards
out = out[:pos]
return True, out



@property
def prompt(self):
try:
return self._prompt
except AttributeError:
return None


@prompt.setter
def prompt(self,new_prompt):
self._prompt = new_prompt

# a command will only be executed, if it ends in a linebreak
self.write(msg.strip() + self.linesep)
# remove first line (the cmd itself), last line (the next prompt)
# and unnecesary \r characters from the output
return self.linesep.join(self.readall().replace("\r","").split("\n")[1:-1])

@property
def last_confidence(self):
try:
return self._last_confidence
except AttributeError:
return None


@property
def last_cmd(self):
def linesep(self):
""" In most cases ``os.linesep``.
"""
try:
return self._last_cmd
return self._linesep
except AttributeError:
return None

self.linesep = os.linesep
return self._linesep

@property
def last_output(self):
try:
return self._last_output
except AttributeError:
return None
@linesep.setter
def linesep(self, lsep):
self._linesep = lsep

0 comments on commit cf7441b

Please sign in to comment.