Skip to content

Commit

Permalink
#9: continuous stream of images found and passed through 0MQ pipe
Browse files Browse the repository at this point in the history
this stream is asynchronous to plan data collection, tested with
bp.count()
  • Loading branch information
prjemian committed Dec 7, 2017
1 parent 9a95fa8 commit a8c5dbe
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions APS_BlueSky_tools/zmq_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def client_example(filename, host=None):
print("\nEnding 0MQ client")


def mona_zmq_sender(sender, key, document, detector):
def mona_zmq_sender(sender, key, document, detector, signal_name):
'''
send documents from BlueSky events for the MONA project via a ZMQ pair
Expand Down Expand Up @@ -158,10 +158,9 @@ def receiver(self, key, document):
import json
sender.send_string(key)
sender.send_string(json.dumps(document))
if key == "event" and detector is not None:
dname = detector.name + "_array_counter"
# print(dname, "?", document["data"])
if document["data"].get(dname) is not None:
if key == "event" and detector is not None and signal_name is not None:
# print(signal_name, "?", document["data"])
if document["data"].get(signal_name) is not None:
# Is it faster to pick this up by EPICS CA?
# Using 0MQ, no additional library is needed
# print("... sending image ...")
Expand Down Expand Up @@ -201,14 +200,23 @@ def mona_zmq_receiver(filename):

def process_message():
"""
get messages from 0MQ and convert them back to original form
Messages are in the form of (key, buffer) pairs
where key is one of (start descriptor event stop bulk_events image)
and buffer is json for all except image, which is binary data.
"""
msg = listener.receive()
if str(msg) == str(listener.eot_signal_text):
return ()
key = msg.decode()
if key in ("start", "descriptor", "event", "stop", "bulk_events"):
document = listener.receive().decode()
return key, json.loads(document)
try:
return key, json.loads(document)
except Exception as msg:
print(msg)
return ()
elif key == "image":
s = listener.receive().decode().rstrip(')').lstrip('(').split(',')
shape = tuple(map(int, s))
Expand All @@ -235,7 +243,11 @@ def process_message():
nexus.close()

while True:
results = process_message()
try:
results = process_message()
except Exception as msg:
print(msg)
continue
if len(results) == 0:
break
key, document = results
Expand Down

0 comments on commit a8c5dbe

Please sign in to comment.