Skip to content

Commit

Permalink
migrated to msgpack as serializer for network traffic to allow connec…
Browse files Browse the repository at this point in the history
…tions from non-python applications
  • Loading branch information
jooste committed Nov 1, 2017
1 parent e1e91e8 commit d8be175
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 240 deletions.
2 changes: 1 addition & 1 deletion bluesky/__init__.py
Expand Up @@ -11,7 +11,7 @@
INIT, OP, HOLD, END = list(range(4))

NUMEVENTS = 16
SetNodeIdType, SetActiveNodeType, AddNodeType, SimStateEventType, BatchEventType, \
SetNodeIdType, SetActiveNodeType, NodesChanged, SimStateEventType, BatchEventType, \
PanZoomEventType, ACDataEventType, SimInfoEventType, StackTextEventType, \
StackInitEventType, ShowDialogEventType, DisplayFlagEventType, \
RouteDataEventType, DisplayShapeEventType, \
Expand Down
1 change: 1 addition & 0 deletions bluesky/io/__init__.py
@@ -1,5 +1,6 @@
from bluesky import settings
from bluesky.io.node import Node
from bluesky.io.iodata import IOData
if not settings.is_sim:
from bluesky.io.iomanager import IOManager
from bluesky.io.client import Client
59 changes: 34 additions & 25 deletions bluesky/io/client.py
@@ -1,27 +1,31 @@
import zmq
import msgpack
from bluesky.tools import Signal
from bluesky.io.npcodec import encode_ndarray, decode_ndarray


class Client(object):
def __init__(self):
ctx = zmq.Context.instance()
self.event_io = ctx.socket(zmq.DEALER)
self.stream_in = ctx.socket(zmq.SUB)
self.poller = zmq.Poller()
self.host_id = b''
self.client_id = 0
self.sender_id = b''
self.event_io = ctx.socket(zmq.DEALER)
self.stream_in = ctx.socket(zmq.SUB)
self.poller = zmq.Poller()
self.host_id = b''
self.client_id = 0
self.sender_id = b''
self.known_nodes = dict()

# Signals
self.nodes_changed = Signal()
self.event_received = Signal()
self.stream_received = Signal()

def connect(self):
self.event_io.connect('tcp://localhost:9000')
self.event_io.send(b'REGISTER')
msg = self.event_io.recv()
self.client_id = 256 * msg[-2] + msg[-1]
self.host_id = msg[:5]
self.send_event(b'REGISTER')
data = self.event_io.recv_multipart()[-1]
self.client_id = 256 * data[-2] + data[-1]
self.host_id = data[:5]
print('Client {} connected to host {}'.format(self.client_id, self.host_id))
self.stream_in.connect('tcp://localhost:9001')
self.stream_in.setsockopt(zmq.SUBSCRIBE, b'')
Expand All @@ -34,29 +38,34 @@ def receive(self):
try:
socks = dict(self.poller.poll(0))
if socks.get(self.event_io) == zmq.POLLIN:
self.sender_id = self.event_io.recv()
data = self.event_io.recv_pyobj()
print('received event data')
res = self.event_io.recv_multipart()
self.sender_id = res[0]
name = res[1]
data = msgpack.unpackb(res[2], object_hook=decode_ndarray, encoding='utf-8')
if name == b'NODESCHANGED':
self.known_nodes.update(data)
self.nodes_changed.emit(data)

print('received {} event data'.format(name))
print(data)
self.event_received.emit(data, self.sender_id)
self.event_received.emit(name, data, self.sender_id)

if socks.get(self.stream_in) == zmq.POLLIN:
nameandid = self.stream_in.recv()
stream_name = nameandid[:-8]
sender_id = nameandid[-8:]
data = self.stream_in.recv_pyobj()
self.stream_received.emit(data, stream_name, sender_id)
res = self.stream_in.recv_multipart()

name = res[0][:-8]
sender_id = res[0][-8:]
data = msgpack.unpackb(res[1], object_hook=decode_ndarray, encoding='utf-8')
self.stream_received.emit(name, data, sender_id)
except zmq.ZMQError:
return False

def addnodes(self, count=1):
self.event_io.send(bytearray((count,)), zmq.SNDMORE)
self.event_io.send(b'ADDNODES')
self.send_event(b'ADDNODES', count)

def send_event(self, data, target=None):
def send_event(self, name, data=None, target=None):
# On the sim side, target is obtained from the currently-parsed stack command
self.event_io.send(target or b'*', zmq.SNDMORE)
self.event_io.send_pyobj(data)
self.event_io.send_multipart([target or b'*', name, msgpack.packb(data, default=encode_ndarray, use_bin_type=True)])

def send_stream(self, data, name):
def send_stream(self, name, data):
pass
49 changes: 34 additions & 15 deletions bluesky/io/iomanager.py
Expand Up @@ -3,34 +3,37 @@
from threading import Thread
from subprocess import Popen
import zmq
import msgpack


class IOManager(Thread):
def __init__(self):
super(IOManager, self).__init__()
self.localnodes = list()
self.spawned_processes = list()
self.running = True
self.nodes = dict()

def addnodes(self, count=1):
for _ in range(count):
p = Popen([sys.executable, 'BlueSky_qtgl.py', '--node'])
self.localnodes.append(p)
self.spawned_processes.append(p)

def run(self):
# Get ZMQ context
ctx = zmq.Context.instance()

# Convenience class for event connection handling
class EventConn:
''' Convenience class for event connection handling. '''
# Generate one host ID for this host
host_id = b'\x00' + os.urandom(5)
host_id = b'\x00' + os.urandom(4)

def __init__(self, endpoint):
def __init__(self, endpoint, connection_key):
self.sock = ctx.socket(zmq.ROUTER)
self.sock.bind(endpoint)
self.namefromid = dict()
self.idfromname = dict()
self.conn_count = 0
self.conn_key = connection_key

def __eq__(self, sock):
# Compare own socket to socket returned from poller.poll
Expand All @@ -40,23 +43,27 @@ def register(self, connid):
# The connection ID consists of the host id plus the index of the
# new connection encoded in two bytes.
self.conn_count += 1
name = self.host_id + bytearray((self.conn_count // 256, self.conn_count % 256))
name = self.host_id + self.conn_key + \
bytearray((self.conn_count // 256, self.conn_count % 256))
self.namefromid[connid] = name
self.idfromname[name] = connid
return name


# Create connection points for clients
fe_event = EventConn('tcp://*:9000')
fe_event = EventConn('tcp://*:9000', b'c')
fe_stream = ctx.socket(zmq.XPUB)
fe_stream.bind('tcp://*:9001')


# Create connection points for sim workers
be_event = EventConn('tcp://*:10000')
be_event = EventConn('tcp://*:10000', b'w')
be_stream = ctx.socket(zmq.XSUB)
be_stream.bind('tcp://*:10001')

# We start with zero nodes
self.nodes[EventConn.host_id] = 0

# Create poller for both event connection points and the stream reader
poller = zmq.Poller()
poller.register(fe_event.sock, zmq.POLLIN)
Expand Down Expand Up @@ -88,19 +95,31 @@ def register(self, connid):
be_stream.send_multipart(msg)
else:
# Select the correct source and destination
src, dest = (fe_event, be_event) if sock == fe_event else (be_event, fe_event)
srcisclient = (sock == fe_event)
src, dest = (fe_event, be_event) if srcisclient else (be_event, fe_event)

# Message format: [sender, target, name, data]
sender, target, eventname, data = msg

if msg[-1] == b'REGISTER':
if eventname == b'REGISTER':
# This is a registration message for a new connection
# Send reply with connection name
sock.send_multipart([msg[0], src.register(msg[0])])
msg[-1] = src.register(msg[0])
src.sock.send_multipart(msg)
if srcisclient:
src.sock.send_multipart([msg[0], src.host_id, b'NODESCHANGED', msgpack.packb(self.nodes, use_bin_type=True)])
else:
self.nodes[src.host_id] += 1
data = msgpack.packb({src.host_id : self.nodes[src.host_id]}, use_bin_type=True)
for connid in dest.namefromid:
dest.sock.send_multipart([connid, src.host_id, b'NODESCHANGED', data])

elif msg[-1] == b'ADDNODES':
elif msg[2] == b'ADDNODES':
# This is a request to start new nodes.
count = msg[-2]
count = msgpack.unpackb(msg[3])
self.addnodes(count)

elif msg[-1] == b'QUIT':
elif msg[2] == b'QUIT':
# TODO: send quit to all
self.running = False

Expand All @@ -119,5 +138,5 @@ def register(self, connid):
dest.sock.send_multipart(msg)

# Wait for all nodes to finish
for n in self.localnodes:
for n in self.spawned_processes:
n.wait()
37 changes: 17 additions & 20 deletions bluesky/io/node.py
@@ -1,9 +1,10 @@
""" Node encapsulates the sim process, and manages process I/O. """
from threading import Thread
import zmq
import msgpack
from bluesky import stack
from bluesky.tools import Timer

from bluesky.io.npcodec import encode_ndarray, decode_ndarray

class IOThread(Thread):
''' Separate thread for node I/O. '''
Expand All @@ -24,9 +25,6 @@ def run(self):
poller.register(be_event, zmq.POLLIN)
poller.register(be_stream, zmq.POLLIN)

fe_event.send(b'REGISTER')
be_event.send(fe_event.recv())

while True:
try:
poll_socks = dict(poller.poll(None))
Expand Down Expand Up @@ -65,12 +63,13 @@ def init(self):

# Start the I/O thread, and receive from it this node's ID
self.iothread.start()
self.nodeid = self.event_io.recv()
self.send_event(b'REGISTER')
self.nodeid = self.event_io.recv_multipart()[-1]
print('Node started, id={}'.format(self.nodeid))

def event(self, data, sender_id):
def event(self, eventname, eventdata, sender_id):
''' Event data handler. Reimplemented in Simulation. '''
print('Received data from {}'.format(sender_id))
print('Received {} data from {}'.format(eventname, sender_id))

def step(self):
''' Perform one iteration step. Reimplemented in Simulation. '''
Expand All @@ -85,7 +84,7 @@ def start(self):
self.run()

# Send quit event to the worker thread and wait for it to close.
self.event_io.send('QUIT')
self.event_io.send(b'QUIT')
self.iothread.join()

def quit(self):
Expand All @@ -97,10 +96,12 @@ def run(self):
while self.running:
# Get new events from the I/O thread
while self.poll():
sender_id = self.event_io.recv()
data = self.event_io.recv_pyobj()
res = self.event_io.recv_multipart()
sender_id = res[0]
name = res[1]
data = msgpack.unpackb(res[2], object_hook=decode_ndarray, encoding='utf-8')
print('Node received event')
self.event(data, sender_id)
self.event(name, data, sender_id)
# Perform a simulation step
self.step()

Expand All @@ -116,15 +117,11 @@ def poll(self):
return False

def addnodes(self, count=1):
self.event_io.send(bytearray((count,)), zmq.SNDMORE)
self.event_io.send(b'ADDNODES')
self.send_event(b'ADDNODES', count)

def send_event(self, data, target=None):
def send_event(self, name, data=None, target=None):
# On the sim side, target is obtained from the currently-parsed stack command
self.event_io.send(stack.sender() or b'*', zmq.SNDMORE)
self.event_io.send_pyobj(data)
self.event_io.send_multipart([stack.sender() or b'*', name, msgpack.packb(data, default=encode_ndarray, use_bin_type=True)])

def send_stream(self, data, name):
# self.stream_out.send(bytearray(name + self.nodeid, 'ascii'), zmq.SNDMORE)
self.stream_out.send(bytearray(name, 'ascii') + self.nodeid, zmq.SNDMORE)
self.stream_out.send_pyobj(data)
def send_stream(self, name, data):
self.stream_out.send_multipart([name + self.nodeid, msgpack.packb(data, default=encode_ndarray, use_bin_type=True)])
16 changes: 16 additions & 0 deletions bluesky/io/npcodec.py
@@ -0,0 +1,16 @@
import numpy as np

def encode_ndarray(o):
'''Msgpack encoder for numpy arrays.'''
if isinstance(o, np.ndarray):
return {b'numpy': True,
b'type': o.dtype.str,
b'shape': o.shape,
b'data': o.tobytes()}
return o

def decode_ndarray(o):
'''Msgpack decoder for numpy arrays.'''
if o.get(b'numpy'):
return np.fromstring(o[b'data'], dtype=np.dtype(o[b'type'])).reshape(o[b'shape'])
return o

0 comments on commit d8be175

Please sign in to comment.