Permalink
Browse files

Merge pull request #33 from dgvncsz0f/master

Adding proper watch support and implements its dual, the unwatch command
  • Loading branch information...
2 parents 843a3ae + 52cdc1a commit d150918b79bedfbd47bdbd405603427292f9bd61 @gleicon gleicon committed Dec 21, 2012
Showing with 56 additions and 18 deletions.
  1. +31 −2 tests/test_transactions.py
  2. +25 −16 txredisapi.py
@@ -30,12 +30,41 @@ def testRedisConnection(self):
rapi = yield txredisapi.Connection(redis_host, redis_port)
# test set() operation
- transaction = yield rapi.multi()
- print "oi"
+ transaction = yield rapi.multi("txredisapi:test_transaction")
+ self.assertTrue(transaction.inTransaction)
for key, value in (("txredisapi:test_transaction", "foo"), ("txredisapi:test_transaction", "bar")):
yield transaction.set(key, value)
yield transaction.commit()
+ self.assertFalse(transaction.inTransaction)
result = yield rapi.get("txredisapi:test_transaction")
self.assertEqual(result, "bar")
yield rapi.disconnect()
+
+ @defer.inlineCallbacks
+ def testRedisWithOnlyWatchUnwatch(self):
+ rapi = yield txredisapi.Connection(redis_host, redis_port)
+
+ k = "txredisapi:testRedisWithOnlyWatchAndUnwatch"
+ tx = yield rapi.watch(k)
+ self.assertTrue(tx.inTransaction)
+ yield tx.set(k, "bar")
+ v = yield tx.get(k)
+ self.assertEqual("bar", v)
+ yield tx.unwatch()
+ self.assertFalse(tx.inTransaction)
+
+ yield rapi.disconnect()
+
+ @defer.inlineCallbacks
+ def testRedisWithWatchAndMulti(self):
+ rapi = yield txredisapi.Connection(redis_host, redis_port)
+
+ tx = yield rapi.watch("txredisapi:testRedisWithWatchAndMulti")
+ yield tx.multi()
+ yield tx.unwatch()
+ self.assertTrue(tx.inTransaction)
+ yield tx.commit()
+ self.assertFalse(tx.inTransaction)
+
+ yield rapi.disconnect()
View
@@ -197,6 +197,7 @@ def __init__(self, charset="utf-8", errors="strict"):
self.transactions = 0
self.inTransaction = False
+ self.unwatch_cc = lambda: ()
@defer.inlineCallbacks
def connectionMade(self):
@@ -1195,32 +1196,40 @@ def sort(self, key, start=None, end=None, by=None, get=None,
return self.execute_command("SORT", *pieces)
+ def _clear_txstate(self):
+ self.inTransaction = self.transactions != 0
+
+ def watch(self, keys):
+ self.inTransaction = True
+ self.unwatch_cc = self._clear_txstate
+ if isinstance(keys, (str, unicode)):
+ keys = [keys]
+ d = self.execute_command("WATCH", *keys).addCallback(self._tx_started)
+ return d
+
+ def unwatch(self):
+ self.unwatch_cc()
+ return self.execute_command("UNWATCH")
+
# Transactions
# multi() will return a deferred with a "connection" object
# That object must be used for further interactions within
# the transaction. At the end, either exec() or discard()
# must be executed.
def multi(self, keys=None):
- self.inTransaction = True
- if keys:
- if isinstance(keys, (str, unicode)):
- keys = [keys]
- d = defer.Deferred()
- self.execute_command("WATCH", *keys).addCallback(
- self._watch_added, d)
+ self.unwatch_cc = lambda: ()
+ if keys is not None:
+ d = self.watch(keys)
+ d.addCallback(lambda _: self.execute_command("MULTI"))
else:
- d = self.execute_command("MULTI").addCallback(self._multi_started)
+ self.inTransaction = True
+ d = self.execute_command("MULTI")
+ d.addCallback(self._tx_started)
return d
- def _watch_added(self, response, d):
- if response != 'OK':
- d.errback(RedisError('Invalid WATCH response: %s' % response))
- self.execute_command("MULTI").addCallback(
- self._multi_started).chainDeferred(d)
-
- def _multi_started(self, response):
+ def _tx_started(self, response):
if response != 'OK':
- raise RedisError('Invalid MULTI response: %s' % response)
+ raise RedisError('Invalid response: %s' % response)
return self
def _commit_check(self, response):

0 comments on commit d150918

Please sign in to comment.