Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

update connection multiplexing code to use epoll when available, fall…

… back to select when not
  • Loading branch information...
commit 595f1898de785af3cb8aade78541bfaddc52ce14 1 parent 19f6fe5
@kylemcc kylemcc authored
View
1  .gitignore
@@ -12,3 +12,4 @@ gearman/._*
MANIFEST
dist
build
+*.swo
View
87 gearman/connection_manager.py
@@ -1,6 +1,6 @@
import logging
-import select as select_lib
+import gearman.io
import gearman.util
from gearman.connection import GearmanConnection
from gearman.constants import _DEBUG_MODE_
@@ -109,50 +109,27 @@ def establish_connection(self, current_connection):
current_handler.initial_state(**self.handler_initial_state)
return current_connection
- def poll_connections_once(self, submitted_connections, timeout=None):
- """Does a single robust select, catching socket errors"""
- select_connections = set(current_connection for current_connection in submitted_connections if current_connection.connected)
-
- rd_connections = set()
- wr_connections = set()
- ex_connections = set()
-
- if timeout is not None and timeout < 0.0:
- return rd_connections, wr_connections, ex_connections
-
- successful_select = False
- while not successful_select and select_connections:
- select_connections -= ex_connections
- check_rd_connections = [current_connection for current_connection in select_connections if current_connection.readable()]
- check_wr_connections = [current_connection for current_connection in select_connections if current_connection.writable()]
-
- try:
- rd_list, wr_list, ex_list = gearman.util.select(check_rd_connections, check_wr_connections, select_connections, timeout=timeout)
- rd_connections |= set(rd_list)
- wr_connections |= set(wr_list)
- ex_connections |= set(ex_list)
-
- successful_select = True
- except (select_lib.error, ConnectionError):
- # On any exception, we're going to assume we ran into a socket exception
- # We'll need to fish for bad connections as suggested at
- #
- # http://www.amk.ca/python/howto/sockets/
- for conn_to_test in select_connections:
- try:
- _, _, _ = gearman.util.select([conn_to_test], [], [], timeout=0)
- except (select_lib.error, ConnectionError):
- rd_connections.discard(conn_to_test)
- wr_connections.discard(conn_to_test)
- ex_connections.add(conn_to_test)
-
- gearman_logger.error('select error: %r' % conn_to_test)
-
- if _DEBUG_MODE_:
- gearman_logger.debug('select :: Poll - %d :: Read - %d :: Write - %d :: Error - %d', \
- len(select_connections), len(rd_connections), len(wr_connections), len(ex_connections))
-
- return rd_connections, wr_connections, ex_connections
+ def poll_connections_once(self, poller, connection_map, timeout=None):
+ # a timeout of -1 when used with epoll will block until there
+ # is activity. Select does not support negative timeouts, so this
+ # is translated to a timeout=None when falling back to select
+ timeout = timeout or -1
+
+ readable = set()
+ writable = set()
+ errors = set()
+ for fileno, events in poller.poll(timeout=timeout):
+ conn = connection_map.get(fileno)
+ if not conn:
+ continue
+ if events & gearman.io.READ:
+ readable.add(conn)
+ if events & gearman.io.WRITE:
+ writable.add(conn)
+ if events & gearman.io.ERROR:
+ errors.add(conn)
+
+ return readable, writable, errors
def handle_connection_activity(self, rd_connections, wr_connections, ex_connections):
"""Process all connection activity... executes all handle_* callbacks"""
@@ -178,14 +155,30 @@ def handle_connection_activity(self, rd_connections, wr_connections, ex_connecti
failed_connections = ex_connections | dead_connections
return rd_connections, wr_connections, failed_connections
+ def _register_connections_with_poller(self, connections, poller):
+ for conn in connections:
+ events = 0
+ if conn.readable():
+ events |= gearman.io.READ
+ if conn.writable():
+ events |= gearman.io.WRITE
+ poller.register(conn, events)
+
def poll_connections_until_stopped(self, submitted_connections, callback_fxn, timeout=None):
"""Continue to poll our connections until we receive a stopping condition"""
stopwatch = gearman.util.Stopwatch(timeout)
submitted_connections = set(submitted_connections)
+ connection_map = {}
any_activity = False
callback_ok = callback_fxn(any_activity)
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)
+ poller = gearman.io.get_connection_poller()
+ if connection_ok:
+ self._register_connections_with_poller(submitted_connections,
+ poller)
+ connection_map = dict([(c.fileno(), c) for c in
+ submitted_connections if c.connected])
while connection_ok and callback_ok:
time_remaining = stopwatch.get_time_remaining()
@@ -193,7 +186,7 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
break
# Do a single robust select and handle all connection activity
- read_connections, write_connections, dead_connections = self.poll_connections_once(submitted_connections, timeout=time_remaining)
+ read_connections, write_connections, dead_connections = self.poll_connections_once(poller, connection_map, timeout=time_remaining)
# Handle reads and writes and close all of the dead connections
read_connections, write_connections, dead_connections = self.handle_connection_activity(read_connections, write_connections, dead_connections)
@@ -206,6 +199,8 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
callback_ok = callback_fxn(any_activity)
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)
+ poller.close()
+
# We should raise here if we have no alive connections (don't go into a select polling loop with no connections)
if not connection_ok:
raise ServerUnavailable('Found no valid connections in list: %r' % self.connection_list)
View
155 gearman/io.py
@@ -0,0 +1,155 @@
+import select
+
+import gearman.errors
+import gearman.util
+
+# epoll event types
+_EPOLLIN = 0x01
+_EPOLLOUT = 0x04
+_EPOLLERR = 0x08
+_EPOLLHUP = 0x10
+
+READ = _EPOLLIN
+WRITE = _EPOLLOUT
+ERROR = _EPOLLERR | _EPOLLHUP
+
+def get_connection_poller():
+ """
+ Returns a select.epoll-like object. Depending on the platform, this will
+ either be:
+ - On modern Linux system, with python >= 2.6: select.epoll
+ - On all other systems: gearman.io._Select: an object that mimics
+ select.epoll, but uses select.select
+ """
+ if hasattr(select, "epoll"):
+ return select.epoll()
+ else:
+ return _Select()
+
+def _find_bad_connections(connections):
+ """
+ Find any bad connections in a list of connections.
+
+ For use with select.select.
+
+ When select throws an exception, it's likely that one of the sockets
+ passed in has died. In order to find the bad connections, they must be
+ checked individually. This will do so and return a list of any bad
+ connections found.
+ """
+ bad = []
+ for conn in connections:
+ try:
+ _, _, _ = gearman.util.select([conn], [], [], timeout=0)
+ except (select.error, gearman.errors.ConnectionError):
+ bad.append(conn)
+ return bad
+
+class _Select(object):
+ """
+ A `select.epoll`-like object that uses select.select.
+
+ Used as a fallback when epoll is not available. Inspired by tornado's
+ fallback mechanism
+ """
+
+ def __init__(self):
+ self.read = set()
+ self.write = set()
+ self.error = set()
+
+ def close(self):
+ """
+ Close the _Select object. For parity with select.epoll. Does nothing
+ here.
+ """
+ pass
+
+ def register(self, fd, evmask):
+ """
+ Register a file descriptor for polling.
+
+ fd: a file descriptor (socket) to be registers
+ evmask: a bit set describing the desired events to report
+
+ Events are similar to those accepted by select.epoll:
+ - gearman.io.READ: report when fd is readable (i.e.: a socket.recv
+ operation likely won't block, and will yield some data)
+ - gearman.io.WRITE: report when fd is writable (i.e.: a socket.send
+ operation likely won't block, and will be able to write some
+ data)
+ - gearman.io.ERROR: report when fd is in an error state
+ """
+ if fd in self.read or fd in self.write or fd in self.error:
+ raise ValueError("Connection already registered: %d" % fd.fileno())
+ if evmask & READ:
+ self.read.add(fd)
+ if evmask & WRITE:
+ self.write.add(fd)
+ if evmask & ERROR:
+ self.error.add(fd)
+
+ def modify(self, fd, evmask):
+ """
+ Update the IO events that should be reported for a given file
+ descriptor. See _Select.register for details on these events
+ """
+ self.unregister(fd)
+ self.register(fd, evmask)
+
+ def unregister(self, fd):
+ """
+ Stop tracking events for a given file descriptor
+ """
+ self.read.discard(fd)
+ self.write.discard(fd)
+ self.error.discard(fd)
+
+ def poll(self, timeout):
+ """
+ Wait for events for any of the of register file descriptors. The
+ maximum time to wait is specified by the timeout value.
+
+ A timeout < 0 will block indefinitely. A timeout of 0 will not block at
+ all. And, a timeout > 0 will block for at most that many seconds. The
+ timeout parameter may be a floating point number.
+ """
+ readable = set()
+ writable = set()
+ errors = set()
+
+ if timeout is not None and timeout < 0.0:
+ # for parity with epoll, negative timeout = block until there
+ # is activity
+ timeout = None
+
+ connections = (self.read|self.write|self.error)
+
+ success = False
+ while not success and connections:
+ connections -= errors
+ try:
+ r, w, e = gearman.util.select(self.read,
+ self.write, self.error, timeout)
+ readable = set(r)
+ writable = set(w)
+ errors |= set(e) #this set could already be populated
+ success = True
+ except (select.error, gearman.errors.ConnectionError):
+ bad_conns = _find_bad_connections(connections)
+ map(self.read.discard, bad_conns)
+ map(self.write.discard, bad_conns)
+ map(self.error.discard, bad_conns)
+ errors |= set(bad_conns)
+
+
+ events = {}
+ for conn in readable:
+ events[conn.fileno()] = events.get(conn.fileno(), 0) | READ
+ for conn in writable:
+ events[conn.fileno()] = events.get(conn.fileno(), 0) | WRITE
+ for conn in errors:
+ events[conn.fileno()] = events.get(conn.fileno(), 0) | ERROR
+
+ return events.items()
+
View
9 tests/_core_testing.py
@@ -35,15 +35,22 @@ def send_data_to_socket(self):
if self._fail_on_write:
self.throw_exception(message='mock write failure')
+ def fileno(self):
+ # 73 is the best number, so why not?
+ return 73
+
def __repr__(self):
return ('<GearmanConnection %s:%d connected=%s> (%s)' %
(self.gearman_host, self.gearman_port, self.connected, id(self)))
class MockGearmanConnectionManager(GearmanConnectionManager):
"""Handy mock client base to test Worker/Client/Abstract ClientBases"""
- def poll_connections_once(self, connections, timeout=None):
+ def poll_connections_once(self, poller, connection_map, timeout=None):
return set(), set(), set()
+ def _register_connections_with_poller(self, connections, poller):
+ pass
+
class _GearmanAbstractTest(unittest.TestCase):
connection_class = MockGearmanConnection
connection_manager_class = MockGearmanConnectionManager
Please sign in to comment.
Something went wrong with that request. Please try again.