Skip to content

Commit

Permalink
Refactor RPC module
Browse files Browse the repository at this point in the history
Put all the orthogonal things in small submodules.
  • Loading branch information
coldfix committed Apr 24, 2015
1 parent 36a2932 commit 8b992cb
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 371 deletions.
381 changes: 10 additions & 371 deletions cpymad/_rpc.py

Large diffs are not rendered by default.

Empty file added cpymad/rpc_util/__init__.py
Empty file.
105 changes: 105 additions & 0 deletions cpymad/rpc_util/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
RPC client utilities.
"""

from __future__ import absolute_import

import sys

from . import ipc


__all__ = [
'RemoteProcessClosed',
'RemoteProcessCrashed',
'Client',
]


class RemoteProcessClosed(RuntimeError):
"""The MAD-X remote process has already been closed."""
pass


class RemoteProcessCrashed(RuntimeError):
"""The MAD-X remote process has crashed."""
pass


class Client(object):

"""
Base class for a very lightweight synchronous RPC client.
Uses a connection that shares the interface with :class:`Connection` to
do synchronous RPC. Synchronous IO means that currently callbacks /
events are impossible.
"""

def __init__(self, conn):
"""Initialize the client with a :class:`Connection` like object."""
self._conn = conn

def __del__(self):
"""Close the client and the associated connection with it."""
try:
self.close()
except (RemoteProcessCrashed, RemoteProcessClosed):
# catch ugly follow-up warnings after a MAD-X process has crashed
pass

@classmethod
def spawn_subprocess(cls, **Popen_args):
"""
Create client for a backend service in a subprocess.
You can use the keyword arguments to pass further arguments to
Popen, which is useful for example, if you want to redirect STDIO
streams.
"""
args = [sys.executable, '-m', cls.__module__]
conn, proc = ipc.spawn_subprocess(args, **Popen_args)
return cls(conn), proc

def close(self):
"""Close the connection gracefully, stop the remote service."""
try:
self._conn.send(('close', ()))
except ValueError: # already closed
pass
self._conn.close()

@property
def closed(self):
"""Check if connection is closed."""
return self._conn.closed

def _request(self, kind, *args):
"""Communicate with the remote service synchronously."""
try:
self._conn.send((kind, args))
except ValueError:
if self.closed:
raise RemoteProcessClosed()
raise
except IOError:
raise RemoteProcessCrashed()
try:
response = self._conn.recv()
except EOFError:
raise RemoteProcessCrashed()
return self._dispatch(response)

def _dispatch(self, response):
"""Dispatch an answer from the remote service."""
kind, args = response
handler = getattr(self, '_dispatch_%s' % (kind,))
return handler(*args)

def _dispatch_exception(self, exc_info):
"""Dispatch an exception."""
raise exc_info

def _dispatch_data(self, data):
"""Dispatch returned data."""
return data
69 changes: 69 additions & 0 deletions cpymad/rpc_util/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
IPC connection.
"""

from __future__ import absolute_import

import os

try:
# python2's cPickle is an accelerated (C extension) version of pickle:
import cPickle as pickle
except ImportError:
# python3's pickle automatically uses the accelerated version and falls
# back to the python version, see:
# http://docs.python.org/3.3/whatsnew/3.0.html?highlight=cpickle
import pickle


__all__ = [
'Connection',
]


class Connection(object):

"""
Pipe-like IPC connection using file objects.
For most purposes this should behave like the connection objects
returned by :func:`multiprocessing.Pipe`.
This class combines two orthogonal functionalities. In general this is
bad practice, meaning the class should be refactored into two classes,
but for our specific purpose this will do it.
- build a bidirectional stream from two unidirectional streams
- build a serialized connection from pure data streams (pickle)
"""

def __init__(self, recv, send):
"""Create duplex connection from two unidirectional streams."""
self._recv = recv
self._send = send

def recv(self):
"""Receive a pickled message from the remote end."""
return pickle.load(self._recv)

def send(self, data):
"""Send a pickled message to the remote end."""
# '-1' instructs pickle to use the latest protocol version. This
# improves performance by a factor ~50-100 in my tests:
return pickle.dump(data, self._send, -1)

def close(self):
"""Close the connection."""
self._recv.close()
self._send.close()

@property
def closed(self):
"""Check if the connection is fully closed."""
return self._recv.closed and self._send.closed

@classmethod
def from_fd(cls, recv_fd, send_fd):
"""Create a connection from two file descriptors."""
return cls(os.fdopen(recv_fd, 'rb', 0),
os.fdopen(send_fd, 'wb', 0))
85 changes: 85 additions & 0 deletions cpymad/rpc_util/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
IPC utilities.
"""

from __future__ import absolute_import

import os
import subprocess
import sys

from .connection import Connection

if sys.platform == 'win32':
from .windows import Handle
else:
from .posix import Handle


__all__ = [
'close_all_but',
'create_ipc_connectin',
'spawn_subprocess',
'prepare_subprocess_ipc',
]


def close_all_but(keep):
"""Close all but the given file descriptors."""
# first, let the garbage collector run, it may find some unreachable
# file objects (on posix forked processes) and close them:
import gc
gc.collect()
# close all ranges in between the file descriptors to be kept:
keep = sorted(set([-1] + keep + [subprocess.MAXFD]))
for s, e in zip(keep[:-1], keep[1:]):
if s+1 < e:
os.closerange(s+1, e)


def create_ipc_connection():
"""
Create a connection that can be used for IPC with a subprocess.
Return (local_connection, remote_recv_handle, remote_send_handle).
"""
local_recv, _remote_send = Handle.pipe()
_remote_recv, local_send = Handle.pipe()
remote_recv = _remote_recv.dup_inheritable()
remote_send = _remote_send.dup_inheritable()
conn = Connection.from_fd(local_recv.detach_fd(),
local_send.detach_fd())
return conn, remote_recv, remote_send


def spawn_subprocess(argv, **Popen_args):
"""
Spawn a subprocess and pass to it two IPC handles.
You can use the keyword arguments to pass further arguments to
Popen, which is useful for example, if you want to redirect STDIO
streams.
Return (ipc_connection, process).
"""
conn, remote_recv, remote_send = create_ipc_connection()
args = argv + [str(int(remote_recv)), str(int(remote_send))]
proc = subprocess.Popen(args, close_fds=False, **Popen_args)
return conn, proc


def prepare_subprocess_ipc(args):
"""
Prepare this process for IPC with its parent. Close all the open handles
except for the STDIN/STDOUT/STDERR and the IPC handles. Return a
:class:`Connection` to the parent process.
"""
handles = [int(arg) for arg in args]
recv_fd = Handle(handles[0]).detach_fd()
send_fd = Handle(handles[1]).detach_fd()
conn = Connection.from_fd(recv_fd, send_fd)
close_all_but([sys.stdin.fileno(),
sys.stdout.fileno(),
sys.stderr.fileno(),
recv_fd, send_fd])
return conn
82 changes: 82 additions & 0 deletions cpymad/rpc_util/posix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
Linux specific low level stuff.
"""

from __future__ import absolute_import

import os


__all__ = [
'Handle',
]


try:
# python3: handles must be made inheritable explicitly:
_set_inheritable = os.set_inheritable
except AttributeError:
# python2: handles are inheritable by default, so nothing to do
def _set_inheritable(fd, inheritable):
pass


class Handle(object):

"""
Wrap a native file descriptor. Close on deletion.
The API is a compromise aimed at compatibility with win32.
"""

def __init__(self, handle):
"""Store a file descriptor (int)."""
self.handle = handle

@classmethod
def from_fd(cls, fd):
"""Create a :class:`Handle` instance from a file descriptor (int)."""
return cls(fd)

@classmethod
def pipe(cls):
"""
Create a unidirectional pipe.
Return a pair (recv, send) of :class:`Handle`s.
"""
recv, send = os.pipe()
return cls(recv), cls(send)

def __int__(self):
"""Get the underlying file descriptor."""
return self.handle

def __del__(self):
"""Close the file descriptor."""
self.close()

def __enter__(self):
"""Enter `with` context."""
return self

def __exit__(self, *exc_info):
"""Close the file descriptor."""
self.close()

def close(self):
"""Close the file descriptor."""
if self.handle is not None:
os.close(self.handle)
self.handle = None

def detach_fd(self):
"""Un-own and return the file descriptor."""
fd, self.handle = self.handle, None
return fd

def dup_inheritable(self):
"""Make the file descriptor inheritable."""
dup = os.dup(self.handle)
_set_inheritable(dup, True)
return self.__class__(dup)

0 comments on commit 8b992cb

Please sign in to comment.