Skip to content

Commit

Permalink
Fixed a race condition in multi(), Added an example for transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeethu committed Mar 15, 2012
1 parent fcaaa45 commit 4f80c24
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 14 deletions.
63 changes: 63 additions & 0 deletions examples/transaction.py
@@ -0,0 +1,63 @@
#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def transactions():
"""
The multi() method on txredisapi connections returns a transaction.
All redis commands called on transactions are then queued by the server.
Calling the transaction's .commit() method executes all the queued commands
and returns the result for each queued command in a list.
Calling the transaction's .discard() method flushes the queue and returns
the connection to the default non-transactional state.
multi() also takes an optional argument keys, which can be either a
string or an iterable (list,tuple etc) of strings. If present, the keys
are WATCHED on the server, and if any of the keys are modified by
a different connection before the transaction is committed,
commit() raises a WatchError exception.
Transactions with WATCH make multi-command atomic all or nothing operations
possible. If a transaction fails, you can be sure that none of the commands
in it ran and you can retry it again.
Read the redis documentation on Transactions for more.
http://redis.io/topics/transactions
Tip: Try to keep transactions as short as possible.
Connections in a transaction cannot be reused until the transaction
is either committed or discarded. For instance, if you have a
ConnectionPool with 10 connections and all of them are in transactions,
if you try to run a command on the connection pool,
it'll raise a RedisError exception.
"""
conn = yield redis.Connection()
# A Transaction with nothing to watch on
txn = yield conn.multi()
txn.incr('test:a')
txn.lpush('test:l', 'test')
r = yield txn.commit() # Commit txn,call txn.discard() to discard it
print 'Transaction1: %s' % r

# Another transaction with a few values to watch on
txn1 = yield conn.multi(['test:l', 'test:h'])
txn1.lpush('test:l', 'test')
txn1.hset('test:h', 'test', 'true')
# Transactions with watched keys will fail if any of the keys are modified
# externally after calling .multi() and before calling transaction.commit()
r = yield txn1.commit() # This will raise if WatchError if txn1 fails.
print 'Transaction2: %s' % r
yield conn.disconnect()


def main():
transactions().addCallback(lambda ign: reactor.stop())

if __name__ == '__main__':
reactor.callWhenRunning(main)
reactor.run()
3 changes: 3 additions & 0 deletions tests/test_watch.py
Expand Up @@ -58,6 +58,7 @@ def testRedisWatchFail(self):
db1 = yield self._getRedisConnection()
yield self.db.set(self._KEYS[0], 'foo')
t = yield self.db.multi(self._KEYS[0])
self.assertIsInstance(t, redis.RedisProtocol)
yield t.set(self._KEYS[1], 'bar')
# This should trigger a failure
yield db1.set(self._KEYS[0], 'bar1')
Expand All @@ -67,12 +68,14 @@ def testRedisWatchFail(self):
def testRedisWatchSucceed(self):
yield self.db.set(self._KEYS[0], 'foo')
t = yield self.db.multi(self._KEYS[0])
self.assertIsInstance(t, redis.RedisProtocol)
yield t.set(self._KEYS[0], 'bar')
yield t.commit().addBoth(self._check_watcherror, shouldError=False)

@defer.inlineCallbacks
def testRedisMultiNoArgs(self):
yield self.db.set(self._KEYS[0], 'foo')
t = yield self.db.multi()
self.assertIsInstance(t, redis.RedisProtocol)
yield t.set(self._KEYS[1], 'bar')
yield t.commit().addBoth(self._check_watcherror, shouldError=False)
29 changes: 15 additions & 14 deletions txredisapi.py
Expand Up @@ -1000,26 +1000,27 @@ def sort(self, key, start=None, end=None, by=None, get=None,
# That object must be used for further interactions within
# the transaction. At the end, either exec() or discard()
# must be executed.
def multi(self, watch_keys=None):
if watch_keys:
if isinstance(watch_keys, (str, unicode)):
watch_keys = [watch_keys]
def multi(self, keys=None):
if keys:
if isinstance(keys, (str, unicode)):
keys = [keys]
d = defer.Deferred()
self.execute_command("WATCH", *watch_keys).addCallback(
self.execute_command("WATCH", *keys).addCallback(
self._watch_added, d)
return d
return self.execute_command("MULTI").addCallback(self._multi_started)
else:
d = self.execute_command("MULTI").addCallback(self._multi_started)
self.inTransaction = True
return d

def _watch_added(self, response, d):
if response == 'OK':
self.execute_command("MULTI").addCallback(
self._multi_started).chainDeferred(d)
else:
d.errback(WatchError("Invalid WATCH response: %s" % (response)))
if response != 'OK':
d.errback(RedisError('Invalid WATCH response: %s' % response))
self.execute_command("MULTI").addCallback(
self._multi_started).chainDeferred(d)

def _multi_started(self, response):
if response == 'OK':
self.inTransaction = True
if response != 'OK':
raise RedisError('Invalid MULTI response: %s' % response)
return self

def _commit_check(self, response):
Expand Down

0 comments on commit 4f80c24

Please sign in to comment.