Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

...
  • 5 commits
  • 2 files changed
  • 0 commit comments
  • 1 contributor
Showing with 91 additions and 42 deletions.
  1. +42 −21 gevent_zeromq/core.py
  2. +49 −21 gevent_zeromq/core.pyx
63 gevent_zeromq/core.py
View
@@ -58,6 +58,9 @@ class _Socket(_original_Socket):
respectively. This also ensures that at most one waiting greenlet is awoken
by send and recv.
+ getsockopt also consumes socket state change events, thus we also wake
+ waiting senders and receivers after an invocation of getsockopt.
+
Some doubleunderscore prefixes are used to minimize pollution of
:class:`zmq.core.socket.Socket`'s namespace.
"""
@@ -92,36 +95,45 @@ def close(self):
def __setup_events(self):
self.__readable = Event()
self.__writable = Event()
+
+ self.__readable_set = self.__readable.set
+ self.__readable_clear = self.__readable.clear
+ self.__readable_wait = self.__readable.wait
+
+ self.__writable_set = self.__writable.set
+ self.__writable_clear = self.__writable.clear
+ self.__writable_wait = self.__writable.wait
+
callback = allow_unbound_disappear(
_Socket.__state_changed, self, _Socket)
try:
- self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
+ self._state_event = get_hub().loop.io(self.__getsockopt(FD), 1) # read state watcher
self._state_event.start(callback)
except AttributeError:
# for gevent<1.0 compatibility
from gevent.core import read_event
- self._state_event = read_event(self.getsockopt(FD), callback, persist=True)
+ self._state_event = read_event(self.__getsockopt(FD), callback, persist=True)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
# if the socket has entered a close state resume any waiting greenlets
- self.__writable.set()
- self.__readable.set()
+ self.__writable_set()
+ self.__readable_set()
return
- events = self.getsockopt(zmq.EVENTS)
- if events & zmq.POLLOUT:
- self.__writable.set()
- if events & zmq.POLLIN:
- self.__readable.set()
+ events = self.__getsockopt(EVENTS)
+ if events & POLLOUT:
+ self.__writable_set()
+ if events & POLLIN:
+ self.__readable_set()
def _wait_write(self):
- self.__writable.clear()
- self.__writable.wait()
+ self.__writable_clear()
+ self.__writable_wait()
def _wait_read(self):
- self.__readable.clear()
- self.__readable.wait()
+ self.__readable_clear()
+ self.__readable_wait()
def send(self, data, flags=0, copy=True, track=False):
@@ -129,7 +141,7 @@ def send(self, data, flags=0, copy=True, track=False):
if flags & NOBLOCK:
# check if the send lock is taken in a non-blocking manner
if not self.__send_lock.acquire(blocking=False):
- raise ZMQError(zmq.EAGAIN)
+ raise ZMQError(EAGAIN)
self.__send_lock.release()
return super(_Socket, self).send(data, flags, copy, track)
@@ -144,8 +156,8 @@ def send(self, data, flags=0, copy=True, track=False):
if e.errno != EAGAIN:
raise
finally:
- # wake a waiting reader as the readable state may have changed and send consumes this event
- self.__readable.set()
+ # wake a waiting reader as the readable state may have changed and send consumes socket state change events
+ self.__readable_set()
# we got EAGAIN, wait for socket to change state
self._wait_write()
@@ -163,9 +175,9 @@ def recv(self, flags=0, copy=True, track=False):
if flags & NOBLOCK:
# check if the recv lock is taken in a non-blocking manner
if not self.__recv_lock.acquire(blocking=False):
- raise ZMQError(zmq.EAGAIN)
+ raise ZMQError(EAGAIN)
self.__recv_lock.release()
- return _original_Socket.recv(self, flags, copy, track)
+ return super(_Socket, self).recv(flags, copy, track)
# Lock to wait for recv/recv_multipart to complete. This will also ensure that at most
# one greenlet at a time is waiting for a socket readable state change in case we get EAGAIN.
@@ -178,8 +190,8 @@ def recv(self, flags=0, copy=True, track=False):
if e.errno != EAGAIN:
raise
finally:
- # wake a waiting writer as the writable state may have changed and recv consumes this event
- self.__writable.set()
+ # wake a waiting writer as the writable state may have changed and recv consumes socket state change events
+ self.__writable_set()
# we got EAGAIN, wait for socket to change state
self._wait_read()
@@ -189,6 +201,15 @@ def recv_multipart(self, flags=0, copy=True, track=False):
# so we use a lock to ensure that there's only ony greenlet
# calling recv_multipart at any time.
with self.__recv_lock:
- return _original_Socket.recv_multipart(self, flags, copy, track)
+ return super(_Socket, self).recv_multipart(flags, copy, track)
+ def __getsockopt(self, option):
+ return super(_Socket, self).getsockopt(option)
+ def getsockopt(self, option):
+ try:
+ return self.__getsockopt(option)
+ finally:
+ # wake a waiting reader and a writer as the writable/readable state may have changed and getsockopt consumes socket state change events
+ self.__writable_set()
+ self.__readable_set()
70 gevent_zeromq/core.pyx
View
@@ -60,6 +60,9 @@ cdef class _Socket(_original_Socket):
respectively. This also ensures that at most one waiting greenlet is awoken
by send and recv.
+ getsockopt also consumes socket state change events, thus we also wake
+ waiting senders and receivers after an invocation of getsockopt.
+
Some doubleunderscore prefixes are used to minimize pollution of
:class:`zmq.core.socket.Socket`'s namespace.
"""
@@ -70,6 +73,13 @@ cdef class _Socket(_original_Socket):
cdef object __weakref__
cdef public object _state_event
+ cdef object __readable_set
+ cdef object __writable_set
+ cdef object __writable_clear
+ cdef object __readable_clear
+ cdef object __writable_wait
+ cdef object __readable_wait
+
def __init__(self, _Context context, int socket_type):
super(_Socket, self).__init__(context, socket_type)
self.__setup_events()
@@ -89,44 +99,53 @@ cdef class _Socket(_original_Socket):
cdef __setup_events(self) with gil:
self.__readable = Event()
self.__writable = Event()
+
+ self.__readable_set = self.__readable.set
+ self.__readable_clear = self.__readable.clear
+ self.__readable_wait = self.__readable.wait
+
+ self.__writable_set = self.__writable.set
+ self.__writable_clear = self.__writable.clear
+ self.__writable_wait = self.__writable.wait
+
callback = allow_unbound_disappear(
_Socket.__state_changed, self, _Socket)
try:
- self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
+ self._state_event = get_hub().loop.io(self.__getsockopt(FD), 1) # read state watcher
self._state_event.start(callback)
except AttributeError, e:
# for gevent<1.0 compatibility
from gevent.core import read_event
- self._state_event = read_event(self.getsockopt(FD), callback, persist=True)
+ self._state_event = read_event(self.__getsockopt(FD), callback, persist=True)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
# if the socket has entered a close state resume any waiting greenlets
- self.__writable.set()
- self.__readable.set()
+ self.__writable_set()
+ self.__readable_set()
return
- cdef int events = self.getsockopt(EVENTS)
+ cdef int events = self.__getsockopt(EVENTS)
if events & POLLOUT:
- self.__writable.set()
+ self.__writable_set()
if events & POLLIN:
- self.__readable.set()
+ self.__readable_set()
- cdef _wait_write(self) with gil:
- self.__writable.clear()
- self.__writable.wait()
+ cdef inline _wait_write(self) with gil:
+ self.__writable_clear()
+ self.__writable_wait()
- cdef _wait_read(self) with gil:
- self.__readable.clear()
- self.__readable.wait()
+ cdef inline _wait_read(self) with gil:
+ self.__readable_clear()
+ self.__readable_wait()
- def send(self, object data, int flags=0, copy=True, track=False):
+ cpdef send(self, object data, int flags=0, copy=True, track=False):
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & NOBLOCK:
# check if the send lock is taken in a non-blocking manner
if not self.__send_lock.acquire(blocking=False):
- raise ZMQError(zmq.EAGAIN)
+ raise ZMQError(EAGAIN)
self.__send_lock.release()
return _original_Socket.send(self, data, flags, copy, track)
@@ -141,8 +160,8 @@ cdef class _Socket(_original_Socket):
if e.errno != EAGAIN:
raise
finally:
- # wake a waiting reader as the readable state may have changed and send consumes this event
- self.__readable.set()
+ # wake a waiting reader as the readable state may have changed and send consumes socket state change events
+ self.__readable_set()
# we got EAGAIN, wait for socket to change state
self._wait_write()
@@ -154,13 +173,13 @@ cdef class _Socket(_original_Socket):
with self.__send_lock:
return _original_Socket.send_multipart(self, msg_parts, flags, copy, track)
- def recv(self, int flags=0, copy=True, track=False):
+ cpdef recv(self, int flags=0, copy=True, track=False):
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & NOBLOCK:
# check if the recv lock is taken in a non-blocking manner
if not self.__recv_lock.acquire(blocking=False):
- raise ZMQError(zmq.EAGAIN)
+ raise ZMQError(EAGAIN)
self.__recv_lock.release()
return _original_Socket.recv(self, flags, copy, track)
@@ -175,8 +194,8 @@ cdef class _Socket(_original_Socket):
if e.errno != EAGAIN:
raise
finally:
- # wake a waiting writer as the writable state may have changed and recv consumes this event
- self.__writable.set()
+ # wake a waiting writer as the writable state may have changed and recv consumes socket state change events
+ self.__writable_set()
# we got EAGAIN, wait for socket to change state
self._wait_read()
@@ -188,4 +207,13 @@ cdef class _Socket(_original_Socket):
with self.__recv_lock:
return _original_Socket.recv_multipart(self, flags, copy, track)
+ cdef inline __getsockopt(self, int option):
+ return _original_Socket.getsockopt(self, option)
+ def getsockopt(self, int option):
+ try:
+ return self.__getsockopt(option)
+ finally:
+ # wake a waiting reader and a writer as the writable/readable state may have changed and getsockopt consumes socket state change events
+ self.__writable_set()
+ self.__readable_set()

No commit comments for this range

Something went wrong with that request. Please try again.