Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fixing code to assume msg_type and msg_id are top-level.

* I have gone through and looked for instances of ['msg_type'] and
  ['msg_id'] and tried to make sure that I added ['header'] so
  pull the values out of the header.
* But there are many cases where I can't tell if the dict is the
  full message or the header already. This is especially true
  of the msg_id in the parallel db parts of the code.
* Tests pass, but this is scary.
  • Loading branch information...
commit 36064ac1834200c257f58e9bd7ce196fa872958f 1 parent ad0a4e9
@ellisonbg authored
View
2  IPython/frontend/qt/base_frontend_mixin.py
@@ -96,7 +96,7 @@ def _dispatch(self, msg):
""" Calls the frontend handler associated with the message type of the
given message.
"""
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
handler = getattr(self, '_handle_' + msg_type, None)
if handler:
handler(msg)
View
6 IPython/frontend/qt/kernelmanager.py
@@ -66,7 +66,7 @@ def call_handlers(self, msg):
self.message_received.emit(msg)
# Emit signals for specialized message types.
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
signal = getattr(self, msg_type, None)
if signal:
signal.emit(msg)
@@ -122,7 +122,7 @@ def call_handlers(self, msg):
# Emit the generic signal.
self.message_received.emit(msg)
# Emit signals for specialized message types.
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
signal = getattr(self, msg_type + '_received', None)
if signal:
signal.emit(msg)
@@ -155,7 +155,7 @@ def call_handlers(self, msg):
self.message_received.emit(msg)
# Emit signals for specialized message types.
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
if msg_type == 'input_request':
self.input_requested.emit(msg)
View
8 IPython/parallel/client/client.py
@@ -670,7 +670,7 @@ def _flush_notifications(self):
while msg is not None:
if self.debug:
pprint(msg)
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
handler = self._notification_handlers.get(msg_type, None)
if handler is None:
raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -684,7 +684,7 @@ def _flush_results(self, sock):
while msg is not None:
if self.debug:
pprint(msg)
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
handler = self._queue_handlers.get(msg_type, None)
if handler is None:
raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -729,7 +729,7 @@ def _flush_iopub(self, sock):
msg_id = parent['msg_id']
content = msg['content']
header = msg['header']
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
# init metadata:
md = self.metadata[msg_id]
@@ -994,7 +994,7 @@ def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None,
msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
subheader=subheader, track=track)
- msg_id = msg['msg_id']
+ msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
if ident:
# possibly routed to a specific engine
View
4 IPython/parallel/client/view.py
@@ -523,7 +523,7 @@ def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, tra
ident=ident)
if track:
trackers.append(msg['tracker'])
- msg_ids.append(msg['msg_id'])
+ msg_ids.append(msg['header']['msg_id'])
tracker = None if track is False else zmq.MessageTracker(*trackers)
ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
if block:
@@ -980,7 +980,7 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
subheader=subheader)
tracker = None if track is False else msg['tracker']
- ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
+ ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
if block:
try:
View
4 IPython/parallel/controller/hub.py
@@ -494,7 +494,7 @@ def dispatch_query(self, msg):
return
# print client_id, header, parent, content
#switch on message type:
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
self.log.info("client::client %r requested %r"%(client_id, msg_type))
handler = self.query_handlers.get(msg_type, None)
try:
@@ -791,7 +791,7 @@ def save_iopub_message(self, topics, msg):
self.log.error("iopub::invalid IOPub message: %r"%msg)
return
msg_id = parent['msg_id']
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
content = msg['content']
# ensure msg_id is in db
View
2  IPython/parallel/controller/scheduler.py
@@ -216,7 +216,7 @@ def dispatch_notification(self, msg):
self.log.warn("task::Unauthorized message from: %r"%idents)
return
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
handler = self._notification_handlers.get(msg_type, None)
if handler is None:
View
6 IPython/parallel/engine/kernelstarter.py
@@ -44,7 +44,7 @@ def dispatch_request(self, raw_msg):
except:
print ("bad msg: %s"%msg)
- msgtype = msg['msg_type']
+ msgtype = msg['header']['msg_type']
handler = self.handlers.get(msgtype, None)
if handler is None:
self.downstream.send_multipart(raw_msg, copy=False)
@@ -58,7 +58,7 @@ def dispatch_reply(self, raw_msg):
except:
print ("bad msg: %s"%msg)
- msgtype = msg['msg_type']
+ msgtype = msg['header']['msg_type']
handler = self.handlers.get(msgtype, None)
if handler is None:
self.upstream.send_multipart(raw_msg, copy=False)
@@ -227,4 +227,4 @@ def make_starter(up_addr, down_addr, *args, **kwargs):
starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
starter.start()
loop.start()
-
+
View
12 IPython/parallel/engine/streamkernel.py
@@ -150,7 +150,7 @@ def abort_queue(self, stream):
self.log.info("Aborting:")
self.log.info(str(msg))
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
@@ -205,9 +205,9 @@ def dispatch_control(self, msg):
header = msg['header']
msg_id = header['msg_id']
- handler = self.control_handlers.get(msg['msg_type'], None)
+ handler = self.control_handlers.get(msg['header']['msg_type'], None)
if handler is None:
- self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
+ self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
else:
handler(self.control_stream, idents, msg)
@@ -386,14 +386,14 @@ def dispatch_queue(self, stream, msg):
if self.check_aborted(msg_id):
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
- reply_type = msg['msg_type'].split('_')[0] + '_reply'
+ reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
status = {'status' : 'aborted'}
reply_msg = self.session.send(stream, reply_type, subheader=status,
content=status, parent=msg, ident=idents)
return
- handler = self.shell_handlers.get(msg['msg_type'], None)
+ handler = self.shell_handlers.get(msg['header']['msg_type'], None)
if handler is None:
- self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
+ self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
else:
handler(stream, idents, msg)
View
4 IPython/parallel/tests/test_db.py
@@ -56,8 +56,8 @@ def load_records(self, n=1):
msg = self.session.msg('apply_request', content=dict(a=5))
msg['buffers'] = []
rec = init_record(msg)
- msg_ids.append(msg['msg_id'])
- self.db.add_record(msg['msg_id'], rec)
+ msg_ids.append(msg['header']['msg_id'])
+ self.db.add_record(msg['header']['msg_id'], rec)
return msg_ids
def test_add_record(self):
View
6 IPython/zmq/ipkernel.py
@@ -133,11 +133,11 @@ def do_one_iteration(self):
# Print some info about this message and leave a '--->' marker, so it's
# easier to trace visually the message chain when debugging. Each
# handler prints its message at the end.
- self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
+ self.log.debug('\n*** MESSAGE TYPE:'+str(msg['header']['msg_type'])+'***')
self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
# Find and call actual handler for message
- handler = self.handlers.get(msg['msg_type'], None)
+ handler = self.handlers.get(msg['header']['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
else:
@@ -375,7 +375,7 @@ def _abort_queue(self):
"Unexpected missing message part."
self.log.debug("Aborting:\n"+str(Message(msg)))
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
reply_msg = self.session.send(self.shell_socket, reply_type,
{'status' : 'aborted'}, msg, ident=ident)
View
2  IPython/zmq/pykernel.py
@@ -190,7 +190,7 @@ def _abort_queue(self):
else:
assert ident is not None, "Missing message part."
self.log.debug("Aborting: %s"%Message(msg))
- msg_type = msg['msg_type']
+ msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
self.log.debug(Message(reply_msg))
View
3  IPython/zmq/session.py
@@ -359,9 +359,7 @@ def msg(self, msg_type, content=None, parent=None, subheader=None):
"""
msg = {}
msg['header'] = self.msg_header(msg_type)
- msg['msg_id'] = msg['header']['msg_id']
msg['parent_header'] = {} if parent is None else extract_header(parent)
- msg['msg_type'] = msg_type
msg['content'] = {} if content is None else content
sub = {} if subheader is None else subheader
msg['header'].update(sub)
@@ -651,7 +649,6 @@ def unpack_message(self, msg_list, content=True, copy=True):
if not len(msg_list) >= minlen:
raise TypeError("malformed message, must have at least %i elements"%minlen)
message['header'] = self.unpack(msg_list[1])
- message['msg_type'] = message['header']['msg_type']
message['parent_header'] = self.unpack(msg_list[2])
if content:
message['content'] = self.unpack(msg_list[3])
View
4 IPython/zmq/tests/test_session.py
@@ -37,10 +37,8 @@ def test_msg(self):
self.assertTrue(isinstance(msg['content'],dict))
self.assertTrue(isinstance(msg['header'],dict))
self.assertTrue(isinstance(msg['parent_header'],dict))
- self.assertEquals(msg['msg_type'], 'execute')
+ self.assertEquals(msg['header']['msg_type'], 'execute')
-
-
def test_args(self):
"""initialization arguments for Session"""
s = self.session
Please sign in to comment.
Something went wrong with that request. Please try again.