Skip to content

Commit

Permalink
test all selectors via pytest parameterization
Browse files Browse the repository at this point in the history
  • Loading branch information
andymccurdy committed Feb 12, 2019
1 parent 78c181b commit 363c05d
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 68 deletions.
49 changes: 29 additions & 20 deletions redis/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,37 @@ class PollSelector(BaseSelector):
A poll-based selector that should work on (almost?) all versions
of Unix
"""
_EVENT_MASK = (select.POLLIN | select.POLLPRI | select.POLLOUT |
select.POLLERR | select.POLLHUP)
_READ_MASK = select.POLLIN | select.POLLPRI
_WRITE_MASK = select.POLLOUT
READ_MASK = select.POLLIN | select.POLLPRI
ERROR_MASK = select.POLLERR | select.POLLHUP
WRITE_MASK = select.POLLOUT

_READ_POLLER_MASK = READ_MASK | ERROR_MASK
_READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK

def __init__(self, sock):
super(PollSelector, self).__init__(sock)
self.poller = select.poll()
self.poller.register(sock, self._EVENT_MASK)
self.read_poller = select.poll()
self.read_poller.register(sock, self._READ_POLLER_MASK)
self.ready_poller = select.poll()
self.ready_poller.register(sock, self._READY_POLLER_MASK)

def close(self):
"""
Close the selector.
"""
try:
self.poller.unregister(self.sock)
except (KeyError, ValueError):
# KeyError is raised if somehow the socket was not registered
# ValueError is raised if the socket's file descriptor is
# negative.
# In either case, we can't do anything better than to remove
# the reference to the poller.
pass
self.poller = None
for poller in (self.read_poller, self.ready_poller):
try:
self.read_poller.unregister(self.sock)
except (KeyError, ValueError):
# KeyError is raised if somehow the socket was not
# registered
# ValueError is raised if the socket's file descriptor is
# negative.
# In either case, we can't do anything better than to
# remove the reference to the poller.
pass
self.read_poller = None
self.ready_poller = None
self.sock = None

def check_can_read(self, timeout=0):
Expand All @@ -145,16 +152,18 @@ def check_can_read(self, timeout=0):
This doesn't guarentee that the socket is still connected, just
that there is data to read.
"""
events = self.poller.poll(0)
return bool(events and events[0][1] & self._READ_MASK)
timeout = int(timeout * 1000)
events = self.read_poller.poll(timeout)
return bool(events and events[0][1] & self.READ_MASK)

def check_is_ready_for_command(self, timeout=0):
"""
Return True if the socket is ready to send a command,
otherwise False
"""
events = self.poller.poll(0)
return bool(events and events[0][1] == self._WRITE_MASK)
timeout = timeout * 1000
events = self.ready_poller.poll(timeout)
return bool(events and events[0][1] == self.WRITE_MASK)


def has_selector(selector):
Expand Down
48 changes: 0 additions & 48 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import re

from threading import Thread
from redis.client import parse_client_list
from redis.connection import ssl_available, to_bool
from .conftest import skip_if_server_version_lt

Expand Down Expand Up @@ -76,39 +75,6 @@ def test_repr_contains_db_info_unix(self):
expected = 'ConnectionPool<UnixDomainSocketConnection<path=/abc,db=1>>'
assert repr(pool) == expected

def test_pool_provides_healthy_connections(self):
pool = self.get_pool(connection_class=redis.Connection,
max_connections=2)
conn1 = pool.get_connection('_')
conn2 = pool.get_connection('_')

# set a unique name on the connection we'll be testing
conn1._same_connection_value = 'killed-client'
conn1.send_command('client', 'setname', 'redis-py-1')
assert conn1.read_response() == b'OK'
pool.release(conn1)

# find the well named client in the client list
conn2.send_command('client', 'list')
client_list = parse_client_list(conn2.read_response())
for client in client_list:
if client['name'] == 'redis-py-1':
break
else:
assert False, 'Client redis-py-1 not found in client list'

# kill the well named client
conn2.send_command('client', 'kill', client['addr'])
assert conn2.read_response() == b'OK'

# our connection should have been disconnected, but a quality
# connection pool would know this and only provide a healthy
# connection.
conn = pool.get_connection('_')
assert conn == conn1
conn.send_command('ping')
assert conn.read_response() == b'PONG'


class TestBlockingConnectionPool(object):
def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20):
Expand Down Expand Up @@ -541,17 +507,3 @@ def test_connect_from_url_unix(self):
'UnixDomainSocketConnection',
'path=/path/to/socket,db=0',
)

def test_can_read(self, r):
connection = r.connection_pool.get_connection('ping')
assert not connection.can_read()
connection.send_command('ping')
# wait for the server to respond
wait_until = time.time() + 2
while time.time() < wait_until:
if connection.can_read():
break
time.sleep(0.01)
assert connection.can_read()
assert connection.read_response() == b'PONG'
assert not connection.can_read()
122 changes: 122 additions & 0 deletions tests/test_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import pytest
import time
from redis import selector

_SELECTORS = (
'SelectSelector',
'PollSelector',
)


@pytest.mark.parametrize('selector_name', _SELECTORS)
class TestSelector(object):

@pytest.fixture()
def selector_patch(self, selector_name, request):
"A fixture to patch the DefaultSelector with each selector"
if not hasattr(selector, selector_name):
pytest.skip('selector %s unavailable' % selector_name)
default_selector = selector._DEFAULT_SELECTOR

def revert_selector():
selector._DEFAULT_SELECTOR = default_selector
request.addfinalizer(revert_selector)

selector._DEFAULT_SELECTOR = getattr(selector, selector_name)

def kill_connection(self, connection, r):
"Helper that tells the redis server to kill `connection`"
# set a name for the connection so that we can identify and kill it
connection.send_command('client', 'setname', 'redis-py-1')
assert connection.read_response() == b'OK'

# find the client based on its name and kill it
for client in r.client_list():
if client['name'] == 'redis-py-1':
assert r.client_kill(client['addr'])
break
else:
assert False, 'Client redis-py-1 not found in client list'

def test_can_read(self, selector_patch, r):
c = r.connection_pool.get_connection('_')

# a fresh connection should not be readable
assert not c.can_read()

c.send_command('PING')
# a connection should be readable when a response is available
# note that we supply a timeout here to make sure the server has
# a chance to respond
assert c.can_read(1.0)

assert c.read_response() == b'PONG'

# once the response is read, the connection is no longer readable
assert not c.can_read()

def test_is_ready_for_command(self, selector_patch, r):
c = r.connection_pool.get_connection('_')

# a fresh connection should be ready for a new command
assert c.is_ready_for_command()

c.send_command('PING')
# once the server replies with a response, the selector should report
# that the connection is no longer ready since there is data that
# can be read. note that we need to wait for the server to respond
wait_until = time.time() + 2
while time.time() < wait_until:
if not c.is_ready_for_command():
break
time.sleep(0.01)

assert not c.is_ready_for_command()

assert c.read_response() == b'PONG'

# once the response is read, the connection should be ready again
assert c.is_ready_for_command()

def test_killed_connection_no_longer_ready(self, selector_patch, r):
"A connection that becomes disconnected is no longer ready"
c = r.connection_pool.get_connection('_')
# the connection should start as ready
assert c.is_ready_for_command()

self.kill_connection(c, r)

# the selector should immediately report that the socket is no
# longer ready
assert not c.is_ready_for_command()

def test_pool_restores_killed_connection(self, selector_patch, r2):
"""
The ConnectionPool only returns healthy connecdtions, even if the
connection was killed while idle in the pool.
"""
# r2 provides two separate clients/connection pools
r = r2[0]
c = r.connection_pool.get_connection('_')
c._test_client = True
# the connection should start as ready
assert c.is_ready_for_command()

# release the connection back to the pool
r.connection_pool.release(c)

# kill the connection that is now idle in the pool
# use the second redis client/pool instance run the kill command
# such that it doesn't manipulate the primary connection pool
self.kill_connection(c, r2[1])

assert not c.is_ready_for_command()

# retrieving the connection from the pool should provide us with
# the same connection we were previously using and it should now
# be ready for a command
c2 = r.connection_pool.get_connection('_')
assert c2 == c
assert c2._test_client is True

assert c.is_ready_for_command()

0 comments on commit 363c05d

Please sign in to comment.