Skip to content

Commit

Permalink
Merge remote-tracking branch 'andymccurdy/watch' into watch_fixes
Browse files Browse the repository at this point in the history
Conflicts:
	redis/client.py
	tests/server_commands.py
  • Loading branch information
tilgovi committed Jul 12, 2011
2 parents 82ca44f + 9641968 commit 4bc9b77
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 119 deletions.
11 changes: 11 additions & 0 deletions CHANGES
@@ -1,6 +1,17 @@
* 2.4.6 (in development)
* Variadic arguments for SADD, SREM, ZREN, HDEL, LPUSH, and RPUSH. Thanks
Raphaël Vinot.
* Fixed an error in the Hiredis parser that occasionally caused the
socket connection to become corrupted and unusable. This became noticeable
once connection pools started to be used.
* ZRANGE, ZREVRANGE, ZRANGEBYSCORE, and ZREVRANGEBYSCORE now take an
additional optional argument, score_cast_func, which is a callable used
to cast the score value in the return type. The default is float.
* Removed the PUBLISH method from the PubSub class. Connections that are
[P]SUBSCRIBEd cannot issue PUBLISH commands, so it doesn't make to have
it here.
* Pipelines now contain WATCH and UNWATCH. Calling WATCH or UNWATCH from
the base client class will result in a deprecation warning.
* 2.4.5
* The PythonParser now works better when reading zero length strings.
* 2.4.4
Expand Down
235 changes: 128 additions & 107 deletions redis/client.py
Expand Up @@ -81,8 +81,9 @@ def zset_score_pairs(response, **options):
"""
if not response or not options['withscores']:
return response
score_cast_func = options.get('score_cast_func', float)
it = iter(response)
return zip(it, imap(float, it))
return zip(it, imap(score_cast_func, it))

def int_or_none(response):
if response is None:
Expand Down Expand Up @@ -519,6 +520,18 @@ def type(self, name):
"Returns the type of key ``name``"
return self.execute_command('TYPE', name)

def watch(self, *names):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
"""
warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))

def unwatch(self):
"""
Unwatches the value at key ``name``, or None of the key doesn't exist
"""
warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object'))

#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
"""
Expand Down Expand Up @@ -829,27 +842,31 @@ def zinterstore(self, dest, keys, aggregate=None):
"""
return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)

def zrange(self, name, start, end, desc=False, withscores=False):
def zrange(self, name, start, end, desc=False, withscores=False,
score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``end`` sorted in ascending order.
``start`` and ``end`` can be negative, indicating the end of the range.
``desc`` indicates to sort in descending order.
``desc`` a boolean indicating whether to sort the results descendingly
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return value
"""
if desc:
return self.zrevrange(name, start, end, withscores)
pieces = ['ZRANGE', name, start, end]
if withscores:
pieces.append('withscores')
return self.execute_command(*pieces, **{'withscores': withscores})
options = {'withscores': withscores, 'score_cast_func': score_cast_func}
return self.execute_command(*pieces, **options)

def zrangebyscore(self, name, min, max,
start=None, num=None, withscores=False):
start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max``.
Expand All @@ -859,6 +876,8 @@ def zrangebyscore(self, name, min, max,
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
`score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
Expand All @@ -868,7 +887,8 @@ def zrangebyscore(self, name, min, max,
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
return self.execute_command(*pieces, **{'withscores': withscores})
options = {'withscores': withscores, 'score_cast_func': score_cast_func}
return self.execute_command(*pieces, **options)

def zrank(self, name, value):
"""
Expand Down Expand Up @@ -897,7 +917,8 @@ def zremrangebyscore(self, name, min, max):
"""
return self.execute_command('ZREMRANGEBYSCORE', name, min, max)

def zrevrange(self, name, start, num, withscores=False):
def zrevrange(self, name, start, num, withscores=False,
score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``num`` sorted in descending order.
Expand All @@ -906,14 +927,17 @@ def zrevrange(self, name, start, num, withscores=False):
``withscores`` indicates to return the scores along with the values
The return type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return value
"""
pieces = ['ZREVRANGE', name, start, num]
if withscores:
pieces.append('withscores')
return self.execute_command(*pieces, **{'withscores': withscores})
options = {'withscores': withscores, 'score_cast_func': score_cast_func}
return self.execute_command(*pieces, **options)

def zrevrangebyscore(self, name, max, min,
start=None, num=None, withscores=False):
start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max`` in descending order.
Expand All @@ -923,6 +947,8 @@ def zrevrangebyscore(self, name, max, min,
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
Expand All @@ -932,7 +958,8 @@ def zrevrangebyscore(self, name, max, min,
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
return self.execute_command(*pieces, **{'withscores': withscores})
options = {'withscores': withscores, 'score_cast_func': score_cast_func}
return self.execute_command(*pieces, **options)

def zrevrank(self, name, value):
"""
Expand Down Expand Up @@ -1136,13 +1163,6 @@ def unsubscribe(self, channels=[]):
pass
return self.execute_command('UNSUBSCRIBE', *channels)

def publish(self, channel, message):
"""
Publish ``message`` on ``channel``.
Returns the number of subscribers the message was delivered to.
"""
return self.execute_command('PUBLISH', channel, message)

def listen(self):
"Listen for messages on channels this client has been subscribed to"
while self.subscription_count:
Expand Down Expand Up @@ -1185,17 +1205,76 @@ class Pipeline(Redis):
def __init__(self, connection_pool, response_callbacks, transaction,
shard_hint):
self.connection_pool = connection_pool
self.connection = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint

self._real_exec = self.default_execute_command
self._pipe_exec = self.pipeline_execute_command
self._watching = False
self.reset()

def _get_watch(self):
return self._watching

def _set_watch(self, value):
self._watching = value
self.execute_command = value and self._real_exec or self._pipe_exec

watching = property(_get_watch, _set_watch)

def reset(self):
self.command_stack = []
# make sure to reset the connection state in the event that we were
# watching something
if self.watching and self.connection:
try:
# call this manually since our unwatch or
# default_execute_command methods can call reset()
self.connection.send_command('UNWATCH')
self.connection.read_response()
except ConnectionError:
# disconnect will also remove any previous WATCHes
self.connection.disconnect()
# clean up the other instance attributes
self.watching = False
if self.transaction:
self.execute_command('MULTI')
# we can safely return the connection to the pool here since we're
# sure we're no longer WATCHing anything
if self.connection:
self.connection_pool.release(self.connection)
self.connection = None

def execute_command(self, *args, **options):
def multi(self):
"""
Start a transactional block of the pipeline after WATCH commands
are issued. End the transactional block with `execute`.
"""
self.execute_command = self._pipe_exec

def default_execute_command(self, *args, **options):
"""
Execute a command, but don't auto-retry on a ConnectionError. Used
when issuing WATCH or subsequent commands retrieving their values
but before MULTI is called.
"""
command_name = args[0]
conn = self.connection
# if this is the first call, we need a connection
if not conn:
conn = self.connection_pool.get_connection(command_name,
self.shard_hint)
self.connection = conn
try:
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
except ConnectionError:
self.reset()
raise

def pipeline_execute_command(self, *args, **options):
"""
Stage a command to be executed when execute() is next called
Expand Down Expand Up @@ -1229,7 +1308,7 @@ def _execute_transaction(self, connection, commands):

if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
"pipeline execution")
"pipeline execution")
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
Expand All @@ -1242,7 +1321,7 @@ def _execute_transaction(self, connection, commands):
return data

def _execute_pipeline(self, connection, commands):
# build up all commands into a single request to increase network perf
# build up all commands into a single request to increase network perf
all_cmds = ''.join(starmap(connection.pack_command,
[args for args, options in commands]))
connection.send_packed_command(all_cmds)
Expand All @@ -1257,108 +1336,50 @@ def execute(self):
else:
execute = self._execute_pipeline
stack = self.command_stack
self.reset()
conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
conn = self.connection or \
self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
# if we watching a variable, the watch is no longer valid since
# this conncetion has died.
if self.watching:
raise WatchError("Watched variable changed.")
return execute(conn, stack)
finally:
self.connection_pool.release(conn)


class RedisConnection(Redis):
"""
A ``Redis`` which is bound to one single connection, allowing transactional
commands to be run in a thread-safe manner.
Note that, unlike ``Redis``, ``RedisConnection`` may raise a
``ConnectionError`` which should be handled by the caller.
See also: ``Redis.connection()``.
"""

connection = None

def get_connection(self, command_name, options):
if self.connection is None:
# XXX: how is the 'command_name' used?
self.connection = self.connection_pool.get_connection(command_name,
**options)
return self.connection

def execute_command(self, *args, **options):
"""
Execute a command and return a parsed response.
Note: unlike Redis.execute_command, this may raise a
``ConnectionError``, which should be handled by the calling code.
"""

command_name = args[0]
connection = self.get_connection(command_name, options)
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

def close(self):
"If a connection exists, return it to the connection pool."
if self.connection is not None:
# XXX: some logic could be added here to only call ``discard`` if
# ``multi`` or ``watch`` were issued.
self.discard()
self.connection_pool.release(self.connection)
self.connection = None

def pipeline(self, transaction=True, shard_hint=None):
# XXX: I don't think pipelines make any sense on a connection which is
# "bound" like this. Am I wrong in this?
raise Exception("not done yet")
self.reset()

def watch(self, *names):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
Watches the values at keys ``names``
"""
if not self.transaction:
raise RedisError("Can only WATCH when using transactions")
# if more than 'MULTI' is in the command_stack, we can't WATCH anymore
if self.watching and len(self.command_stack) > 1:
raise RedisError("Can only WATCH before issuing pipeline commands")
self.watching = True
return self.execute_command('WATCH', *names)

def unwatch(self):
"""
Unwatches the all watched keys.
"""
return self.execute_command('UNWATCH')

def multi(self):
"""
Marks the start of a transaction block.
All further commands will return None until ``execute`` is called.
"""
self.execute_command('MULTI')

def execute(self):
Unwatches all previously specified keys
"""
Executes all commands which have been executed since the last ``multi``.
Returns a list of each command's result.
"""
self.execute_command('EXEC')
# XXX: Need to collect the results and return them
# XXX: update the docs to note that the command is 'execute' not 'exec'.
raise Exception("not done yet")

def discard(self):
"""
Discards all commands which have been executed since the last ``multi``.
"""
self.execute_command('DISCARD')


if not self.transaction:
raise RedisError("Can only UNWATCH when using transactions")
# if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
if self.watching:
if len(self.command_stack) > 1:
raise RedisError("Can only UNWATCH before issuing "
"pipeline commands")
response = self.execute_command('UNWATCH')
else:
response = True
# it's safe to reset() here because we are no longer bound to a
# single connection and we're sure the command stack is empty.
self.reset()
return response

class LockError(RedisError):
"Errors thrown from the Lock"
Expand Down

0 comments on commit 4bc9b77

Please sign in to comment.