Skip to content

Commit

Permalink
Added data port implementation to pandabox client
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Aug 8, 2017
1 parent cb95356 commit b3d79b5
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 25 deletions.
153 changes: 129 additions & 24 deletions malcolm/modules/pandablocks/controllers/pandablocksclient.py 100644 → 100755
@@ -1,20 +1,73 @@
from collections import namedtuple, OrderedDict
import logging
import struct

# Create a module level logger
log = logging.getLogger(__name__)


# States
WAIT_HEADER_START = 0
WAIT_HEADER_END = 1
WAIT_DATA_START = 2
RECV_DATA = 3
DATA_END = 4


BlockData = namedtuple("BlockData", "number,description,fields")
FieldData = namedtuple("FieldData",
"field_type,field_subtype,description,labels")


class SocketReader(object):
"""Non threadsafe socket reader"""
def __init__(self, socket, hostname, port):
self._socket = socket
self._socket.settimeout(1.0)
self._socket.connect((hostname, port))
self._buf = ""

def shutdown(self):
import socket
self._socket.shutdown(socket.SHUT_RDWR)

def close(self):
self._socket.close()

def send(self, message):
self._socket.send(message)

def _recv_more_data(self):
rx = None
while not rx:
import socket
try:
rx = self._socket.recv(4096)
except socket.error:
rx = None
else:
log.debug("Received %d bytes: %r", len(rx), rx)
self._buf += rx

def recv_line(self):
while "\n" not in self._buf:
self._recv_more_data()
line, self._buf = self._buf.split("\n", 1)
return line

def recv_bytes(self, size):
while len(self._buf) < size:
self._recv_more_data()
bytes, self._buf = self._buf[:size], self._buf[size:]
return bytes


class PandABlocksClient(object):
# Sentinel that tells the send_loop and recv_loop to stop
STOP = object()

def __init__(self, hostname="localhost", port=8888, queue_cls=None):
def __init__(self, hostname="localhost", port=8888, queue_cls=None,
dataport=8889):
if queue_cls is None:
try:
# Python 2
Expand All @@ -25,24 +78,27 @@ def __init__(self, hostname="localhost", port=8888, queue_cls=None):
self.queue_cls = queue_cls
self.hostname = hostname
self.port = port
self.dataport = dataport
# Completed lines for a response in progress
self._completed_response_lines = []
# True if the current response is multiline
self._is_multiline = None
# True when we have been started
self.started = False
# Filled in on start
self._socket = None
self._reader = None
self._send_spawned = None
self._send_queue = None
self._recv_spawned = None
self._datareader = None
self._data_spawned = None
self._response_queues = None
self._thread_pool = None

def start(self, spawn=None, socket_cls=None):
if spawn is None:
from multiprocessing.pool import ThreadPool
self._thread_pool = ThreadPool(2)
self._thread_pool = ThreadPool(3)
spawn = self._thread_pool.apply_async
if socket_cls is None:
from socket import socket as socket_cls
Expand All @@ -51,21 +107,29 @@ def start(self, spawn=None, socket_cls=None):
self._send_queue = self.queue_cls()
# Holds response_queue to send next
self._response_queues = self.queue_cls()
self._socket = socket_cls()
self._socket.connect((self.hostname, self.port))
self._reader = SocketReader(socket_cls(), self.hostname, self.port)
self._send_spawned = spawn(self._send_loop)
self._recv_spawned = spawn(self._recv_loop)
# If asked to do dataport
if self.dataport:
self._datareader = SocketReader(
socket_cls(), self.hostname, self.dataport)
self._data_spawned = spawn(self._data_loop)
self.started = True

def stop(self):
assert self.started, "Send and recv threads not started"
self._send_queue.put((self.STOP, None))
self._send_spawned.wait()
import socket
self._socket.shutdown(socket.SHUT_RDWR)
self._reader.shutdown()
self._recv_spawned.wait()
self._socket.close()
self._socket = None
self._reader.close()
self._reader = None
if self.dataport:
self._datareader.shutdown()
self._data_spawned.wait()
self._datareader.close()
self._datareader = None
self.started = False
if self._thread_pool is not None:
self._thread_pool.close()
Expand Down Expand Up @@ -106,22 +170,10 @@ def _send_loop(self):
break
try:
self._response_queues.put(response_queue)
self._socket.send(message)
self._reader.send(message)
except Exception: # pylint:disable=broad-except
log.exception("Exception sending message %s", message)

def _get_lines(self):
buf = ""
while True:
lines = buf.split("\n")
for line in lines[:-1]:
yield line
buf = lines[-1]
# Get something new from the socket
rx = self._socket.recv(4096)
if rx:
buf += rx

def _respond(self, resp):
"""Respond to the person waiting"""
response_queue = self._response_queues.get(timeout=0.1)
Expand All @@ -133,10 +185,9 @@ def _recv_loop(self):
"""Service socket recv, returning responses to the correct queue"""
self._completed_response_lines = []
self._is_multiline = None
lines_iterator = self._get_lines()
while True:
try:
line = next(lines_iterator)
line = self._reader.recv_line()
if self._is_multiline is None:
self._is_multiline = line.startswith("!") or line == "."
if line.startswith("ERR"):
Expand All @@ -155,6 +206,53 @@ def _recv_loop(self):
log.exception("Exception receiving message")
raise

def _data_loop(self):
"""Service data socket, throwing away frames for now"""
state = WAIT_HEADER_START
header = ""
self._datareader.send("XML FRAMED SCALED\n")
while True:
try:
log.debug("State: %d", state)
if state == WAIT_HEADER_START:
line = self._datareader.recv_line()
if line == "<header>":
header += line + "\n"
state = WAIT_HEADER_END
elif state == WAIT_HEADER_END:
line = self._datareader.recv_line()
header += line + "\n"
if line == "</header>":
log.debug("Got header:\n%s", header)
# Eat the extra newline
line = self._datareader.recv_line()
assert line == "", "Expected \\n, got %r" % line
state = WAIT_DATA_START
elif state == WAIT_DATA_START:
bytes = self._datareader.recv_bytes(4)
if bytes == "BIN ":
state = RECV_DATA
elif bytes == "END ":
state = DATA_END
else:
raise ValueError("Bad data %r" % bytes)
elif state == RECV_DATA:
bytes = self._datareader.recv_bytes(4)
# Read message length as uint32 LE
length = struct.unpack("<I", bytes)[0]
packet = self._datareader.recv_bytes(length - 8)
log.debug("Got packet length %s", length)
state = WAIT_DATA_START
elif state == DATA_END:
# Read out why we finished
line = self._datareader.recv_line()
log.debug("Data completed: %r", line)
header = ""
state = WAIT_HEADER_START
except Exception:
log.exception("Exception receiving message")
raise

def _get_block_numbers(self):
block_numbers = OrderedDict()
for line in self.send_recv("*BLOCKS?\n"):
Expand Down Expand Up @@ -328,3 +426,10 @@ def set_table(self, block, field, int_values):
lines += ["\n"]
resp = self.send_recv("".join(lines))
assert resp == "OK", "Expected OK, got %r" % resp

if __name__ == "__main__":
import logging
logging.basicConfig(filename='panda.log', level=logging.DEBUG)
client = PandABlocksClient("172.23.252.201")
client.start()
raw_input()
Expand Up @@ -46,7 +46,8 @@ def __init__(self, process, parts, params):
# changes left over from last time
self.changes = OrderedDict()
# The PandABlock client that does the comms
self.client = PandABlocksClient(params.hostname, params.port, Queue)
self.client = PandABlocksClient(
params.hostname, params.port, Queue, dataport=None)
# Filled in on reset
self._stop_queue = None
self._poll_spawned = None
Expand Down

0 comments on commit b3d79b5

Please sign in to comment.