Permalink
Browse files

Now supports Python 2.5

  • Loading branch information...
1 parent f2c2627 commit 568b03119ded3692956733faa9ae61d7fa4ed7b3 @ask committed Apr 20, 2012
Showing with 120 additions and 35 deletions.
  1. +7 −1 CHANGES.txt
  2. +66 −0 Modules/_billiard/connection.h
  3. +21 −0 billiard/compat.py
  4. +4 −3 billiard/connection.py
  5. +2 −2 billiard/heap.py
  6. +12 −22 billiard/process.py
  7. +8 −7 billiard/synchronize.py
View
@@ -1,9 +1,15 @@
+2.7.3.1 - 2012-04-20
+--------------------
+
+- Python 2.5 support added.
+
2.7.3.0 - 2012-04-20
--------------------
- Updated from Python 2.7.3
-- Python 2.5 support removed (may be added again later).
+- Python 2.4 support removed, now only supports 2.5, 2.6 and 2.7.
+ (may consider py3k support at some point).
- Pool improvments from Celery.
@@ -185,6 +185,7 @@ Billiard_connection_recvbytes(BilliardConnectionObject *self, PyObject *args)
return result;
}
+#ifdef HAS_NEW_PY_BUFFER
static PyObject *
Billiard_connection_recvbytes_into(BilliardConnectionObject *self, PyObject *args)
{
@@ -251,6 +252,71 @@ Billiard_connection_recvbytes_into(BilliardConnectionObject *self, PyObject *arg
result = NULL;
goto _cleanup;
}
+# else /* old buffer protocol */
+
+static PyObject *
+Billiard_connection_recvbytes_into(BilliardConnectionObject *self, PyObject *args)
+{
+ char *freeme = NULL, *buffer = NULL;
+ Py_ssize_t length = 0, res, offset = 0;
+ PyObject *result = NULL;
+
+ CHECK_READABLE(self);
+
+ if (!PyArg_ParseTuple(args, "w#|", F_PY_SSIZE_T,
+ &buffer, &length, &offset))
+ return NULL;
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError, "negative offset");
+ goto _error;
+ }
+
+ if (offset > 0) {
+ PyErr_SetString(PyExc_ValueError, "offset out of bounds");
+ goto _error;
+ }
+
+ res = Billiard_conn_recv_string(self, buffer+offset, length-offset,
+ &freeme, PY_SSIZE_T_MAX);
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS;
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ Billiard_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ result = PyInt_FromSsize_t(res);
+ } else {
+ result = PyObject_CallFunction(BufferTooShort,
+ F_RBUFFER "#",
+ freeme, res);
+ PyMem_Free(freeme);
+ if (result) {
+ PyErr_SetObject(BufferTooShort, result);
+ Py_DECREF(result);
+ }
+ goto _error;
+ }
+ }
+
+_cleanup:
+ return result;
+
+_error:
+ result = NULL;
+ goto _cleanup;
+
+}
+# endif /* buffer */
+
/*
* Functions for transferring objects
View
@@ -0,0 +1,21 @@
+import errno
+import os
+import sys
+
+try:
+ bytes
+except NameError:
+ bytes = str
+
+
+try:
+ closerange = os.closerange
+except AttributeError:
+
+ def closerange(fd_low, fd_high): # noqa
+ for fd in reversed(xrange(fd_low, fd_high)):
+ try:
+ os.close(fd)
+ except OSError, exc:
+ if exc.errno != errno.EBADF:
+ raise
View
@@ -21,6 +21,7 @@
from . import current_process, AuthenticationError
from .util import get_temp_dir, Finalize, sub_debug, debug
from .forking import duplicate, close
+from .compat import bytes
#
@@ -361,9 +362,9 @@ def PipeClient(address):
MESSAGE_LENGTH = 20
-CHALLENGE = b'#CHALLENGE#'
-WELCOME = b'#WELCOME#'
-FAILURE = b'#FAILURE#'
+CHALLENGE = bytes('#CHALLENGE#')
+WELCOME = bytes('#WELCOME#')
+FAILURE = bytes('#FAILURE#')
def deliver_challenge(connection, authkey):
import hmac
View
@@ -62,9 +62,9 @@ def __init__(self, size, fileno=-1):
if fileno == -1 and not _forking_is_enabled:
name = os.path.join(
get_temp_dir(),
- 'pym-%d-%d' % (os.getpid(), next(self._counter)))
+ 'pym-%d-%d' % (os.getpid(), self._counter.next()))
self.fileno = os.open(
- name, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0o600)
+ name, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0600)
os.unlink(name)
os.ftruncate(self.fileno, size)
self.buffer = mmap.mmap(self.fileno, self.size)
View
@@ -19,6 +19,8 @@
import itertools
import binascii
+from .compat import bytes
+
#
#
#
@@ -136,40 +138,28 @@ def is_alive(self):
self._popen.poll()
return self._popen.returncode is None
- @property
- def name(self):
+ def _get_name(self):
return self._name
- @name.setter
- def name(self, name):
+ def _set_name(self, value):
assert isinstance(name, basestring), 'name must be a string'
- self._name = name
+ self._name = value
+ name = property(_get_name, _set_name)
- @property
- def daemon(self):
- '''
- Return whether process is a daemon
- '''
+ def _get_daemon(self):
return self._daemonic
- @daemon.setter
- def daemon(self, daemonic):
- '''
- Set whether process is a daemon
- '''
+ def _set_daemon(self, daemonic):
assert self._popen is None, 'process has already started'
self._daemonic = daemonic
+ daemon = property(_get_daemon, _set_daemon)
- @property
- def authkey(self):
+ def _get_authkey(self):
return self._authkey
- @authkey.setter
- def authkey(self, authkey):
- '''
- Set authorization key of process
- '''
+ def _set_authkey(self, authkey):
self._authkey = AuthenticationString(authkey)
+ authkey = property(_get_authkey, _set_authkey)
@property
def exitcode(self):
View
@@ -25,6 +25,7 @@
from .process import current_process
from .util import Finalize, register_after_fork, debug
from .forking import assert_spawning, Popen
+from .compat import bytes, closerange
# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
@@ -100,7 +101,7 @@ def __setstate__(self, state):
@staticmethod
def _make_name():
return '/%s-%s-%s' % (current_process()._semprefix,
- os.getpid(), next(SemLock._counter))
+ os.getpid(), SemLock._counter.next())
#
# Semaphore
@@ -352,14 +353,14 @@ def wait(self, timeout=None):
#
def _cleanup_semaphore_if_leaked(name):
- name = name.encode('ascii') + b'\0'
+ name = name.encode('ascii') + bytes('\0')
if len(name) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('name too long')
fd = _get_unlinkfd()
- bytes = os.write(fd, name)
- assert bytes == len(name)
+ bits = os.write(fd, name)
+ assert bits == len(name)
def _get_unlinkfd():
cp = current_process()
@@ -388,8 +389,8 @@ def _collect_names_then_unlink(r):
MAXFD = os.sysconf("SC_OPEN_MAX")
except:
MAXFD = 256
- os.closerange(0, r)
- os.closerange(r+1, MAXFD)
+ closerange(0, r)
+ closerange(r+1, MAXFD)
# collect data written to pipe
data = []
@@ -406,7 +407,7 @@ def _collect_names_then_unlink(r):
data.append(s)
# attempt to unlink each collected name
- for name in b''.join(data).split(b'\0'):
+ for name in bytes('').join(data).split(bytes('\0')):
try:
sem_unlink(name.decode('ascii'))
except:

0 comments on commit 568b031

Please sign in to comment.