diff --git a/malcolm/modules/pandablocks/controllers/pandablocksclient.py b/malcolm/modules/pandablocks/controllers/pandablocksclient.py old mode 100644 new mode 100755 index 19907ac77..0cf650e47 --- a/malcolm/modules/pandablocks/controllers/pandablocksclient.py +++ b/malcolm/modules/pandablocks/controllers/pandablocksclient.py @@ -1,20 +1,73 @@ from collections import namedtuple, OrderedDict import logging +import struct # Create a module level logger log = logging.getLogger(__name__) +# States +WAIT_HEADER_START = 0 +WAIT_HEADER_END = 1 +WAIT_DATA_START = 2 +RECV_DATA = 3 +DATA_END = 4 + + BlockData = namedtuple("BlockData", "number,description,fields") FieldData = namedtuple("FieldData", "field_type,field_subtype,description,labels") +class SocketReader(object): + """Non threadsafe socket reader""" + def __init__(self, socket, hostname, port): + self._socket = socket + self._socket.settimeout(1.0) + self._socket.connect((hostname, port)) + self._buf = "" + + def shutdown(self): + import socket + self._socket.shutdown(socket.SHUT_RDWR) + + def close(self): + self._socket.close() + + def send(self, message): + self._socket.send(message) + + def _recv_more_data(self): + rx = None + while not rx: + import socket + try: + rx = self._socket.recv(4096) + except socket.error: + rx = None + else: + log.debug("Received %d bytes: %r", len(rx), rx) + self._buf += rx + + def recv_line(self): + while "\n" not in self._buf: + self._recv_more_data() + line, self._buf = self._buf.split("\n", 1) + return line + + def recv_bytes(self, size): + while len(self._buf) < size: + self._recv_more_data() + bytes, self._buf = self._buf[:size], self._buf[size:] + return bytes + + class PandABlocksClient(object): # Sentinel that tells the send_loop and recv_loop to stop STOP = object() - def __init__(self, hostname="localhost", port=8888, queue_cls=None): + def __init__(self, hostname="localhost", port=8888, queue_cls=None, + dataport=8889): if queue_cls is None: try: # Python 2 @@ -25,6 +78,7 @@ def __init__(self, hostname="localhost", port=8888, queue_cls=None): self.queue_cls = queue_cls self.hostname = hostname self.port = port + self.dataport = dataport # Completed lines for a response in progress self._completed_response_lines = [] # True if the current response is multiline @@ -32,17 +86,19 @@ def __init__(self, hostname="localhost", port=8888, queue_cls=None): # True when we have been started self.started = False # Filled in on start - self._socket = None + self._reader = None self._send_spawned = None self._send_queue = None self._recv_spawned = None + self._datareader = None + self._data_spawned = None self._response_queues = None self._thread_pool = None def start(self, spawn=None, socket_cls=None): if spawn is None: from multiprocessing.pool import ThreadPool - self._thread_pool = ThreadPool(2) + self._thread_pool = ThreadPool(3) spawn = self._thread_pool.apply_async if socket_cls is None: from socket import socket as socket_cls @@ -51,21 +107,29 @@ def start(self, spawn=None, socket_cls=None): self._send_queue = self.queue_cls() # Holds response_queue to send next self._response_queues = self.queue_cls() - self._socket = socket_cls() - self._socket.connect((self.hostname, self.port)) + self._reader = SocketReader(socket_cls(), self.hostname, self.port) self._send_spawned = spawn(self._send_loop) self._recv_spawned = spawn(self._recv_loop) + # If asked to do dataport + if self.dataport: + self._datareader = SocketReader( + socket_cls(), self.hostname, self.dataport) + self._data_spawned = spawn(self._data_loop) self.started = True def stop(self): assert self.started, "Send and recv threads not started" self._send_queue.put((self.STOP, None)) self._send_spawned.wait() - import socket - self._socket.shutdown(socket.SHUT_RDWR) + self._reader.shutdown() self._recv_spawned.wait() - self._socket.close() - self._socket = None + self._reader.close() + self._reader = None + if self.dataport: + self._datareader.shutdown() + self._data_spawned.wait() + self._datareader.close() + self._datareader = None self.started = False if self._thread_pool is not None: self._thread_pool.close() @@ -106,22 +170,10 @@ def _send_loop(self): break try: self._response_queues.put(response_queue) - self._socket.send(message) + self._reader.send(message) except Exception: # pylint:disable=broad-except log.exception("Exception sending message %s", message) - def _get_lines(self): - buf = "" - while True: - lines = buf.split("\n") - for line in lines[:-1]: - yield line - buf = lines[-1] - # Get something new from the socket - rx = self._socket.recv(4096) - if rx: - buf += rx - def _respond(self, resp): """Respond to the person waiting""" response_queue = self._response_queues.get(timeout=0.1) @@ -133,10 +185,9 @@ def _recv_loop(self): """Service socket recv, returning responses to the correct queue""" self._completed_response_lines = [] self._is_multiline = None - lines_iterator = self._get_lines() while True: try: - line = next(lines_iterator) + line = self._reader.recv_line() if self._is_multiline is None: self._is_multiline = line.startswith("!") or line == "." if line.startswith("ERR"): @@ -155,6 +206,53 @@ def _recv_loop(self): log.exception("Exception receiving message") raise + def _data_loop(self): + """Service data socket, throwing away frames for now""" + state = WAIT_HEADER_START + header = "" + self._datareader.send("XML FRAMED SCALED\n") + while True: + try: + log.debug("State: %d", state) + if state == WAIT_HEADER_START: + line = self._datareader.recv_line() + if line == "
": + header += line + "\n" + state = WAIT_HEADER_END + elif state == WAIT_HEADER_END: + line = self._datareader.recv_line() + header += line + "\n" + if line == "
": + log.debug("Got header:\n%s", header) + # Eat the extra newline + line = self._datareader.recv_line() + assert line == "", "Expected \\n, got %r" % line + state = WAIT_DATA_START + elif state == WAIT_DATA_START: + bytes = self._datareader.recv_bytes(4) + if bytes == "BIN ": + state = RECV_DATA + elif bytes == "END ": + state = DATA_END + else: + raise ValueError("Bad data %r" % bytes) + elif state == RECV_DATA: + bytes = self._datareader.recv_bytes(4) + # Read message length as uint32 LE + length = struct.unpack("