/
iopubwatcher.py
executable file
·81 lines (67 loc) · 2.58 KB
/
iopubwatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#! /usr/bin/env python
"""
A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of
engines.
This connects to the default cluster, or you can pass the path to your
ipcontroller-client.json as the first command-line argument.
Try running this script, and then running a few jobs that print (and call
sys.stdout.flush), and you will see the print statements as they arrive,
notably not waiting for the results to finish.
You can use the zeromq SUBSCRIBE mechanism to only receive information from
specific engines, and easily filter by message type.
Authors
-------
* MinRK
"""
import sys
import json
import zmq
from IPython.kernel.zmq.session import Session
from IPython.utils.py3compat import str_to_bytes
from IPython.utils.path import get_security_file
def main(connection_file):
    """Watch the IOPub channel and print stream output and tracebacks.

    Loads the connection info from `connection_file`, subscribes a SUB
    socket to every topic on the IOPub channel, and prints stream output
    and Python tracebacks as they arrive.  Runs until interrupted with
    Ctrl-C, then closes the socket and returns.

    Parameters
    ----------
    connection_file : str
        Path to a JSON connection file (e.g. ipcontroller-client.json)
        providing the 'interface', 'iopub' and 'key' entries.
    """
    ctx = zmq.Context.instance()

    with open(connection_file) as f:
        cfg = json.load(f)

    iopub_url = "%s:%s" % (cfg['interface'], cfg['iopub'])

    session = Session(key=str_to_bytes(cfg['key']))
    sub = ctx.socket(zmq.SUB)

    # This will subscribe to all messages:
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    # Replace b'' with b'engine.1.stdout' to subscribe only to engine 1's
    # stdout.  0MQ subscriptions are simple 'foo*' prefix matches, so
    # 'engine.1.' subscribes to everything from engine 1, but there is no way
    # to subscribe to just stdout from everyone.  Multiple calls to subscribe
    # will add subscriptions, e.g. to subscribe to engine 1's stderr and
    # engine 2's stdout:
    #     sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
    #     sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
    sub.connect(iopub_url)

    try:
        while True:
            idents, msg = session.recv(sub, mode=0)
            # ident always length 1 here
            topic = idents[0]
            # On Python 3 the identity frame is bytes; decode it so it can be
            # concatenated with str and printed without a b'' wrapper.  On
            # Python 2 bytes is str, so the topic is left untouched.
            if not isinstance(topic, str):
                topic = topic.decode('utf8', 'replace')

            msg_type = msg['msg_type']
            if msg_type == 'stream':
                # stdout/stderr
                # stream names are in msg['content']['name'], if you want to
                # handle them differently
                content = msg['content']
                # Message spec 5.0 renamed the payload key 'data' to 'text';
                # accept either so both old and new senders are handled.
                text = content['text'] if 'text' in content else content['data']
                print("%s: %s" % (topic, text))
            elif msg_type in ('pyerr', 'error'):
                # Python traceback ('pyerr' was renamed 'error' in spec 5.0)
                print(topic + ':')
                for line in msg['content']['traceback']:
                    # indent lines
                    print(' ' + line)
    except KeyboardInterrupt:
        # Ctrl-C anywhere in the loop is the normal way to stop watching.
        return
    finally:
        # Release the socket even when leaving via an unexpected error.
        sub.close()
if __name__ == '__main__':
    # An explicit connection file on the command line takes precedence;
    # otherwise fall back to the security file of the default profile.
    cf = (
        sys.argv[1]
        if len(sys.argv) > 1
        else get_security_file('ipcontroller-client.json')
    )
    main(cf)