Navigation Menu

Skip to content

Commit

Permalink
bugfix for sharded connection pools, and support for unix socket
Browse files Browse the repository at this point in the history
  • Loading branch information
fiorix committed Mar 16, 2012
1 parent 2bd3d05 commit 28f9689
Showing 1 changed file with 141 additions and 18 deletions.
159 changes: 141 additions & 18 deletions txredisapi.py
Expand Up @@ -39,23 +39,35 @@
from twisted.protocols import policies
from twisted.python import log


class RedisError(Exception):
pass


class ConnectionError(RedisError):
pass


class ResponseError(RedisError):
pass


class InvalidResponse(RedisError):
pass


class InvalidData(RedisError):
pass


class WatchError(RedisError):
pass


def list_or_args(command, keys, args):
oldapi = bool(args)
try:
i = iter(keys)
iter(keys)
if isinstance(keys, (str, unicode)):
raise TypeError
except TypeError:
Expand All @@ -69,6 +81,7 @@ def list_or_args(command, keys, args):
keys.extend(args)
return keys


class RedisProtocol(basic.LineReceiver, policies.TimeoutMixin):
"""
Redis client protocol.
Expand Down Expand Up @@ -134,7 +147,7 @@ def lineReceived(self, line):
return

self.resetTimeout()
token = line[0] # first byte indicates reply type
token = line[0] # first byte indicates reply type
data = line[1:]
if token == self.ERROR:
self.errorReceived(data)
Expand All @@ -153,14 +166,15 @@ def lineReceived(self, line):
self.bulkDataReceived(None)
return
else:
self.bulk_length += 2 # \r\n
self.bulk_length += 2 # \r\n
self.setRawMode()
elif token == self.MULTI_BULK:
try:
self.multi_bulk_length = long(data)
except (TypeError, ValueError):
self.replyReceived(InvalidResponse(
"Cannot convert multi-response header '%s' to integer" % data))
"Cannot convert multi-response header '%s' to integer" % \
data))
self.multi_bulk_length = 0
return
if self.multi_bulk_length == -1:
Expand Down Expand Up @@ -696,7 +710,7 @@ def sismember(self, key, value):

def sinter(self, keys, *args):
"""
Return the intersection between the Sets stored at key1, key2, ..., keyN
Return the intersection between the Sets stored at key1, ..., keyN
"""
keys = list_or_args("sinter", keys, args)
return self.execute_command("SINTER", *keys)
Expand Down Expand Up @@ -1129,6 +1143,7 @@ def punsubscribe(self, patterns):
patterns = [patterns]
return self.execute_command("PUNSUBSCRIBE", *patterns)


class SubscriberFactory(protocol.ReconnectingClientFactory):
maxDelay = 120
continueTrying = True
Expand Down Expand Up @@ -1176,6 +1191,17 @@ def __repr__(self):
(cli.host, cli.port, self._factory.size)


class UnixConnectionHandler(ConnectionHandler):
def __repr__(self):
try:
cli = self._factory.pool[0].transport.getPeer()
except:
return "<Redis Connection: Not connected>"
else:
return "<Redis Unix Connection: %s - %d connection(s)>" % \
(cli.name, self._factory.size)


ShardedMethods = [
"decr",
"delete",
Expand Down Expand Up @@ -1227,13 +1253,14 @@ def __repr__(self):

_findhash = re.compile('.+\{(.*)\}.*', re.I)


class HashRing(object):
"""Consistent hash for redis API"""
def __init__(self, nodes=[], replicas=160):
self.nodes=[]
self.replicas=replicas
self.ring={}
self.sorted_keys=[]
self.nodes = []
self.replicas = replicas
self.ring = {}
self.sorted_keys = []

for n in nodes:
self.add_node(n)
Expand Down Expand Up @@ -1264,18 +1291,21 @@ def get_node_pos(self, key):
return [None, None]
crc = zlib.crc32(key)
idx = bisect.bisect(self.sorted_keys, crc)
idx = min(idx, (self.replicas * len(self.nodes))-1) # prevents out of range index
# prevents out of range index
idx = min(idx, (self.replicas * len(self.nodes)) - 1)
return [self.ring[self.sorted_keys[idx]], idx]

def iter_nodes(self, key):
if len(self.ring) == 0: yield None, None
if len(self.ring) == 0:
yield None, None
node, pos = self.get_node_pos(key)
for k in self.sorted_keys[pos:]:
yield k, self.ring[k]

def __call__(self, key):
return self.get_node(key)


class ShardedConnectionHandler(object):
def __init__(self, connections):
if isinstance(connections, defer.DeferredList):
Expand Down Expand Up @@ -1334,7 +1364,7 @@ def mget(self, keys, *args):

deferreds = []
for node, keys in group.items():
nd=node.mget(keys)
nd = node.mget(keys)
deferreds.append(nd)

result = []
Expand All @@ -1358,11 +1388,26 @@ def __repr__(self):
return "<Redis Sharded Connection: %s>" % ", ".join(nodes)


class ShardedUnixConnectionHandler(ShardedConnectionHandler):
def __repr__(self):
nodes = []
for conn in self._ring.nodes:
try:
cli = conn._factory.pool[0].transport.getPeer()
except:
pass
else:
nodes.append("%s/%d" % \
(cli.name, conn._factory.size))
return "<Redis Sharded Connection: %s>" % ", ".join(nodes)


class RedisFactory(protocol.ReconnectingClientFactory):
maxDelay = 10
protocol = RedisProtocol

def __init__(self, dbid, poolsize, isLazy=False):
def __init__(self, dbid, poolsize, isLazy=False,
handler=ConnectionHandler):
if not isinstance(poolsize, int):
raise ValueError("Redis poolsize must be an integer, not %s" % \
repr(poolsize))
Expand All @@ -1379,7 +1424,7 @@ def __init__(self, dbid, poolsize, isLazy=False):
self.size = 0
self.pool = []
self.deferred = defer.Deferred()
self.handler = ConnectionHandler(self)
self.handler = handler(self)

def addConnection(self, conn):
self.pool.append(conn)
Expand Down Expand Up @@ -1419,7 +1464,7 @@ def getConnection(self):


def makeConnection(host, port, dbid, poolsize, reconnect, isLazy):
factory = RedisFactory(dbid, poolsize, isLazy)
factory = RedisFactory(dbid, poolsize, isLazy, ConnectionHandler)
factory.continueTrying = reconnect
for x in xrange(poolsize):
reactor.connectTCP(host, port, factory)
Expand All @@ -1429,6 +1474,7 @@ def makeConnection(host, port, dbid, poolsize, reconnect, isLazy):
else:
return factory.deferred


def makeShardedConnection(hosts, dbid, poolsize, reconnect, isLazy):
err = "Please use a list or tuple of host:port for sharded connections"
if not isinstance(hosts, (list, tuple)):
Expand All @@ -1442,8 +1488,8 @@ def makeShardedConnection(hosts, dbid, poolsize, reconnect, isLazy):
except:
raise ValueError(err)

c = makeConnection(host, port, dbid, poolsize, reconnect, isLazy)
connections.append(c)
c = makeConnection(host, port, dbid, poolsize, reconnect, isLazy)
connections.append(c)

if isLazy:
return ShardedConnectionHandler(connections)
Expand All @@ -1452,38 +1498,115 @@ def makeShardedConnection(hosts, dbid, poolsize, reconnect, isLazy):
ShardedConnectionHandler(deferred)
return deferred


def Connection(host="localhost", port=6379, dbid=0, reconnect=True):
return makeConnection(host, port, dbid, 1, reconnect, False)


def lazyConnection(host="localhost", port=6379, dbid=0, reconnect=True):
return makeConnection(host, port, dbid, 1, reconnect, True)


def ConnectionPool(host="localhost", port=6379, dbid=0,
poolsize=10, reconnect=True):
return makeConnection(host, port, dbid, poolsize, reconnect, False)


def lazyConnectionPool(host="localhost", port=6379, dbid=0,
poolsize=10, reconnect=True):
return makeConnection(host, port, dbid, poolsize, reconnect, True)


def ShardedConnection(hosts, dbid=0, reconnect=True):
return makeShardedConnection(hosts, dbid, 1, reconnect, False)


def lazyShardedConnection(hosts, dbid=0, reconnect=True):
return makeShardedConnection(hosts, dbid, 1, reconnect, True)


def ShardedConnectionPool(hosts, dbid=0, poolsize=10, reconnect=True):
return makeShardedConnection(hosts, dbid, poolsize, reconnect, False)


def lazyShardedConnectionPool(hosts, dbid=0, poolsize=10, reconnect=True):
return makeShardedConnection(hosts, dbid, poolsize, reconnect, True)


def makeUnixConnection(path, dbid, poolsize, reconnect, isLazy):
factory = RedisFactory(dbid, poolsize, isLazy, UnixConnectionHandler)
factory.continueTrying = reconnect
for x in xrange(poolsize):
reactor.connectUNIX(path, factory)

if isLazy:
return factory.handler
else:
return factory.deferred


def makeShardedUnixConnection(paths, dbid, poolsize, reconnect, isLazy):
err = "Please use a list or tuple of paths for sharded unix connections"
if not isinstance(paths, (list, tuple)):
raise ValueError(err)

connections = []
for path in paths:
c = makeUnixConnection(path, dbid, poolsize, reconnect, isLazy)
connections.append(c)

if isLazy:
return ShardedUnixConnectionHandler(connections)
else:
deferred = defer.DeferredList(connections)
ShardedUnixConnectionHandler(deferred)
return deferred


def UnixConnection(path="/tmp/redis.sock", dbid=0, reconnect=True):
return makeUnixConnection(path, dbid, 1, reconnect, False)


def lazyUnixConnection(path="/tmp/redis.sock", dbid=0, reconnect=True):
return makeUnixConnection(path, dbid, 1, reconnect, True)


def UnixConnectionPool(path="/tmp/redis.sock", dbid=0, poolsize=10,
reconnect=True):
return makeUnixConnection(path, dbid, poolsize, reconnect, False)


def lazyUnixConnectionPool(path="/tmp/redis.sock", dbid=0, poolsize=10,
reconnect=True):
return makeUnixConnection(path, dbid, poolsize, reconnect, True)


def ShardedUnixConnection(paths, dbid=0, reconnect=True):
return makeShardedUnixConnection(paths, dbid, 1, reconnect, False)


def lazyShardedUnixConnection(paths, dbid=0, reconnect=True):
return makeShardedUnixConnection(paths, dbid, 1, reconnect, True)


def ShardedUnixConnectionPool(paths, dbid=0, poolsize=10, reconnect=True):
return makeShardedUnixConnection(paths, dbid, poolsize, reconnect, False)


def lazyShardedUnixConnectionPool(paths, dbid=0, poolsize=10, reconnect=True):
return makeShardedUnixConnection(paths, dbid, poolsize, reconnect, True)


__all__ = [
Connection, lazyConnection,
ConnectionPool, lazyConnectionPool,
ShardedConnection, lazyShardedConnection,
ShardedConnectionPool, lazyShardedConnectionPool,
UnixConnection, lazyUnixConnection,
UnixConnectionPool, lazyUnixConnectionPool,
ShardedUnixConnection, lazyShardedUnixConnection,
ShardedUnixConnectionPool, lazyShardedUnixConnectionPool,
]

__version__ = version = "0.3"
__version__ = version = "0.4"
__author__ = "Alexandre Fiori"

0 comments on commit 28f9689

Please sign in to comment.