forked from jupyter/jupyter_client
/
client.py
72 lines (53 loc) · 2.65 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""Implements an async kernel client"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import zmq.asyncio
from traitlets import Instance, Type
from ..channels import AsyncZMQSocketChannel, HBChannel
from ..client import KernelClient, reqrep
def wrapped(meth, channel):
"""Wrap a method on a channel and handle replies."""
def _(self, *args, **kwargs):
reply = kwargs.pop("reply", False)
timeout = kwargs.pop("timeout", None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id
return self._recv_reply(msg_id, timeout=timeout, channel=channel)
return _
class AsyncKernelClient(KernelClient):
"""A KernelClient with async APIs
``get_[channel]_msg()`` methods wait for and return messages on channels,
raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
"""
context = Instance(zmq.asyncio.Context)
def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
return zmq.asyncio.Context()
# --------------------------------------------------------------------------
# Channel proxy methods
# --------------------------------------------------------------------------
get_shell_msg = KernelClient._async_get_shell_msg
get_iopub_msg = KernelClient._async_get_iopub_msg
get_stdin_msg = KernelClient._async_get_stdin_msg
get_control_msg = KernelClient._async_get_control_msg
wait_for_ready = KernelClient._async_wait_for_ready
# The classes to use for the various channels
shell_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[arg-type]
iopub_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[arg-type]
stdin_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[arg-type]
hb_channel_class = Type(HBChannel) # type:ignore[arg-type]
control_channel_class = Type(AsyncZMQSocketChannel) # type:ignore[arg-type]
_recv_reply = KernelClient._async_recv_reply
# replies come on the shell channel
execute = reqrep(wrapped, KernelClient.execute)
history = reqrep(wrapped, KernelClient.history)
complete = reqrep(wrapped, KernelClient.complete)
is_complete = reqrep(wrapped, KernelClient.is_complete)
inspect = reqrep(wrapped, KernelClient.inspect)
kernel_info = reqrep(wrapped, KernelClient.kernel_info)
comm_info = reqrep(wrapped, KernelClient.comm_info)
is_alive = KernelClient._async_is_alive
execute_interactive = KernelClient._async_execute_interactive
# replies come on the control channel
shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")