Skip to content
This repository has been archived by the owner on Apr 18, 2018. It is now read-only.

Commit

Permalink
update connection multiplexing code to use epoll when available, fall…
Browse files Browse the repository at this point in the history
… back to select when not
  • Loading branch information
kylemcc committed Feb 6, 2013
1 parent 19f6fe5 commit 595f189
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 47 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ gearman/._*
MANIFEST
dist
build
*.swo
87 changes: 41 additions & 46 deletions gearman/connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import select as select_lib

import gearman.io
import gearman.util
from gearman.connection import GearmanConnection
from gearman.constants import _DEBUG_MODE_
Expand Down Expand Up @@ -109,50 +109,27 @@ def establish_connection(self, current_connection):
current_handler.initial_state(**self.handler_initial_state)
return current_connection

def poll_connections_once(self, submitted_connections, timeout=None):
    """Run a single select pass over the connected connections.

    Only connections whose `connected` attribute is truthy are polled.
    When select itself raises, every polled connection is probed on its
    own; the ones that fail are moved into the error set and the select
    is retried without them.

    Returns a (readable, writable, errored) tuple of connection sets.
    """
    pending = set(conn for conn in submitted_connections if conn.connected)

    readable, writable, failed = set(), set(), set()

    # A negative timeout yields three empty sets without selecting
    if timeout is not None and timeout < 0.0:
        return readable, writable, failed

    select_done = False
    while pending and not select_done:
        # Never hand a connection that already failed back to select
        pending -= failed
        want_read = [conn for conn in pending if conn.readable()]
        want_write = [conn for conn in pending if conn.writable()]

        try:
            got_read, got_write, got_error = gearman.util.select(want_read, want_write, pending, timeout=timeout)
            readable.update(got_read)
            writable.update(got_write)
            failed.update(got_error)

            select_done = True
        except (select_lib.error, ConnectionError):
            # On any exception, assume we ran into a socket exception and
            # fish for the bad connections one at a time, as suggested at
            #
            # http://www.amk.ca/python/howto/sockets/
            for suspect in pending:
                try:
                    _r, _w, _x = gearman.util.select([suspect], [], [], timeout=0)
                except (select_lib.error, ConnectionError):
                    readable.discard(suspect)
                    writable.discard(suspect)
                    failed.add(suspect)

                    gearman_logger.error('select error: %r' % suspect)

    if _DEBUG_MODE_:
        gearman_logger.debug(
            'select :: Poll - %d :: Read - %d :: Write - %d :: Error - %d',
            len(pending), len(readable), len(writable), len(failed))

    return readable, writable, failed
def poll_connections_once(self, poller, connection_map, timeout=None):
    """Poll once and sort the active connections by the events they reported.

    poller: an epoll-like object whose poll(timeout=...) yields
        (fileno, event mask) pairs
    connection_map: dict mapping a fileno to its connection object;
        events for filenos missing from the map are ignored
    timeout: seconds to wait. None blocks until there is activity and
        0 returns immediately.

    Returns a (readable, writable, errors) tuple of connection sets.
    """
    # a timeout of -1 when used with epoll will block until there
    # is activity. Select does not support negative timeouts, so this
    # is translated to a timeout=None when falling back to select
    #
    # Only None is translated here: `timeout or -1` would also turn a
    # non-blocking timeout of 0 into an indefinite block.
    if timeout is None:
        timeout = -1

    readable = set()
    writable = set()
    errors = set()
    for fileno, events in poller.poll(timeout=timeout):
        conn = connection_map.get(fileno)
        if conn is None:
            # Activity on a descriptor we are not tracking
            continue
        if events & gearman.io.READ:
            readable.add(conn)
        if events & gearman.io.WRITE:
            writable.add(conn)
        if events & gearman.io.ERROR:
            errors.add(conn)

    return readable, writable, errors

def handle_connection_activity(self, rd_connections, wr_connections, ex_connections):
"""Process all connection activity... executes all handle_* callbacks"""
Expand All @@ -178,22 +155,38 @@ def handle_connection_activity(self, rd_connections, wr_connections, ex_connecti
failed_connections = ex_connections | dead_connections
return rd_connections, wr_connections, failed_connections

def _register_connections_with_poller(self, connections, poller):
    """Register every connected connection with the poller.

    connections: iterable of connection objects
    poller: an epoll-like object exposing register(fd, eventmask)

    The event mask is built from the connection's own readable() and
    writable() answers at registration time.
    """
    for conn in connections:
        # Skip connections that are not connected, matching the
        # connection map built in poll_connections_until_stopped.
        # NOTE(review): presumably a disconnected connection has no usable
        # fileno() to register -- confirm against GearmanConnection.
        if not conn.connected:
            continue

        events = 0
        if conn.readable():
            events |= gearman.io.READ
        if conn.writable():
            events |= gearman.io.WRITE
        poller.register(conn, events)

def poll_connections_until_stopped(self, submitted_connections, callback_fxn, timeout=None):
"""Continue to poll our connections until we receive a stopping condition"""
stopwatch = gearman.util.Stopwatch(timeout)
submitted_connections = set(submitted_connections)
connection_map = {}

any_activity = False
callback_ok = callback_fxn(any_activity)
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)
poller = gearman.io.get_connection_poller()
if connection_ok:
self._register_connections_with_poller(submitted_connections,
poller)
connection_map = dict([(c.fileno(), c) for c in
submitted_connections if c.connected])

while connection_ok and callback_ok:
time_remaining = stopwatch.get_time_remaining()
if time_remaining == 0.0:
break

# Do a single robust select and handle all connection activity
read_connections, write_connections, dead_connections = self.poll_connections_once(submitted_connections, timeout=time_remaining)
read_connections, write_connections, dead_connections = self.poll_connections_once(poller, connection_map, timeout=time_remaining)

# Handle reads and writes and close all of the dead connections
read_connections, write_connections, dead_connections = self.handle_connection_activity(read_connections, write_connections, dead_connections)
Expand All @@ -206,6 +199,8 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
callback_ok = callback_fxn(any_activity)
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)

poller.close()

# We should raise here if we have no alive connections (don't go into a select polling loop with no connections)
if not connection_ok:
raise ServerUnavailable('Found no valid connections in list: %r' % self.connection_list)
Expand Down
155 changes: 155 additions & 0 deletions gearman/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import select

import gearman.errors
import gearman.util

# epoll event types
_EPOLLIN = 0x01
_EPOLLOUT = 0x04
_EPOLLERR = 0x08
_EPOLLHUP = 0x10

READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

def get_connection_poller():
    """
    Build the poller used to multiplex connections.

    The result is a select.epoll-like object:
    - select.epoll when the select module provides it
    - otherwise gearman.io._Select, which mimics select.epoll on top of
      select.select
    """
    epoll_factory = getattr(select, "epoll", None)
    if epoll_factory is None:
        return _Select()
    return epoll_factory()

def _find_bad_connections(connections):
    """
    Return the connections that fail when selected on individually.

    For use with select.select: when select throws an exception, it is
    likely that one of the sockets passed in has died. Each connection is
    probed on its own with a zero timeout, and those whose probe raises
    are collected, in their original order.
    """
    def _probe_fails(candidate):
        # A zero timeout keeps the probe from blocking
        try:
            _r, _w, _x = gearman.util.select([candidate], [], [], timeout=0)
        except (select.error, gearman.errors.ConnectionError):
            return True
        return False

    return [candidate for candidate in connections if _probe_fails(candidate)]

class _Select(object):
    """
    A `select.epoll`-like object that uses select.select.

    Used as a fallback when epoll is not available. Inspired by tornado's
    fallback mechanism.
    """

    def __init__(self):
        # Registered connections, grouped by the event they are watched for
        self.read = set()
        self.write = set()
        self.error = set()

    def close(self):
        """
        Close the _Select object. For parity with select.epoll. Does nothing
        here.
        """
        pass

    def register(self, fd, evmask):
        """
        Register a file descriptor for polling.

        fd: a file descriptor (socket) to be registered
        evmask: a bit set describing the desired events to report

        Events are similar to those accepted by select.epoll:
        - gearman.io.READ: report when fd is readable (i.e.: a socket.recv
          operation likely won't block, and will yield some data)
        - gearman.io.WRITE: report when fd is writable (i.e.: a socket.send
          operation likely won't block, and will be able to write some
          data)
        - gearman.io.ERROR: report when fd is in an error state

        Raises ValueError if fd is already registered.
        """
        if fd in self.read or fd in self.write or fd in self.error:
            raise ValueError("Connection already registered: %d" % fd.fileno())
        if evmask & READ:
            self.read.add(fd)
        if evmask & WRITE:
            self.write.add(fd)
        if evmask & ERROR:
            self.error.add(fd)

    def modify(self, fd, evmask):
        """
        Update the IO events that should be reported for a given file
        descriptor. See _Select.register for details on these events.
        """
        self.unregister(fd)
        self.register(fd, evmask)

    def unregister(self, fd):
        """
        Stop tracking events for a given file descriptor. Unknown
        descriptors are ignored.
        """
        self.read.discard(fd)
        self.write.discard(fd)
        self.error.discard(fd)

    def poll(self, timeout):
        """
        Wait for events on any of the registered file descriptors. The
        maximum time to wait is specified by the timeout value.

        A timeout < 0 (or None) will block indefinitely. A timeout of 0 will
        not block at all. And, a timeout > 0 will block for at most that many
        seconds. The timeout parameter may be a floating point number.

        Returns a list of (fileno, event mask) pairs. Connections that made
        select fail are unregistered and reported with the ERROR mask.
        """
        readable = set()
        writable = set()
        errors = set()

        if timeout is not None and timeout < 0.0:
            # for parity with epoll, negative timeout = block until there
            # is activity
            timeout = None

        connections = (self.read | self.write | self.error)

        success = False
        while not success and connections:
            connections -= errors
            try:
                r, w, e = gearman.util.select(self.read,
                    self.write, self.error, timeout)
                readable = set(r)
                writable = set(w)
                errors |= set(e)  # this set could already be populated
                success = True
            except (select.error, gearman.errors.ConnectionError):
                bad_conns = _find_bad_connections(connections)
                # Drop the dead connections eagerly. map() must not be used
                # for this: on Python 3 it is lazy and would never run the
                # discards, so the same failing select would be retried.
                self.read.difference_update(bad_conns)
                self.write.difference_update(bad_conns)
                self.error.difference_update(bad_conns)
                errors |= set(bad_conns)

        events = {}
        for conn in readable:
            events[conn.fileno()] = events.get(conn.fileno(), 0) | READ
        for conn in writable:
            events[conn.fileno()] = events.get(conn.fileno(), 0) | WRITE
        for conn in errors:
            events[conn.fileno()] = events.get(conn.fileno(), 0) | ERROR

        # A real list on every Python version, like select.epoll.poll
        return list(events.items())

9 changes: 8 additions & 1 deletion tests/_core_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,22 @@ def send_data_to_socket(self):
if self._fail_on_write:
self.throw_exception(message='mock write failure')

def fileno(self):
    """Return the fixed descriptor number reported by this mock."""
    # 73 is the best number, so why not?
    fake_descriptor = 73
    return fake_descriptor

def __repr__(self):
    """Describe the connection by host, port, connected flag and object id."""
    details = (self.gearman_host, self.gearman_port, self.connected, id(self))
    return '<GearmanConnection %s:%d connected=%s> (%s)' % details

class MockGearmanConnectionManager(GearmanConnectionManager):
"""Handy mock client base to test Worker/Client/Abstract ClientBases"""
def poll_connections_once(self, connections, timeout=None):
def poll_connections_once(self, poller, connection_map, timeout=None):
    """Pretend to poll: always report no readable, writable or failed connections."""
    no_reads, no_writes, no_errors = set(), set(), set()
    return no_reads, no_writes, no_errors

def _register_connections_with_poller(self, connections, poller):
    """Do nothing: the mock manager never registers connections with a poller."""
    return None

class _GearmanAbstractTest(unittest.TestCase):
connection_class = MockGearmanConnection
connection_manager_class = MockGearmanConnectionManager
Expand Down

0 comments on commit 595f189

Please sign in to comment.